In [None]:
import pandas as pd
import ast  # To safely evaluate list-like strings

# Load the disease dataset from the Colab-local CSV into the notebook-global `df`,
# then display its first rows as a quick sanity check.
df = pd.read_csv("/content/final_disease_dataset_complete.csv")
df.head()



In [None]:
import pandas as pd
import ast
import numpy as np
import re

# Re-read the same CSV so this cell starts from the raw file contents;
# this rebinds the notebook-global `df`.
df = pd.read_csv('/content/final_disease_dataset_complete.csv')

# Function to clean and extract symptom list
def extract_symptoms(raw):
    """Turn one raw ``Symptoms`` cell into a clean list of symptom strings.

    Parameters
    ----------
    raw : str, list, tuple, or missing value
        Normally the CSV text of a Python-style list, optionally followed by
        a ``Detailed:`` section that is discarded. A list/tuple (for example
        when this cell is re-run on an already converted column) is accepted
        and simply normalised.

    Returns
    -------
    list of str
        Symptom names with surrounding whitespace and quote characters
        removed. An empty list is returned for missing values, non-string
        input, text that is not a valid Python literal, or a literal that is
        not a list.
    """
    def _tidy(item):
        # str() keeps numeric/other literals instead of failing the whole row
        return str(item).strip().strip("'").strip('"')

    # Already parsed (idempotent re-run): pd.isna() on a list would be ambiguous
    if isinstance(raw, (list, tuple)):
        return [_tidy(item) for item in raw]

    # NaN / None / any other non-text value carries no symptoms
    if not isinstance(raw, str):
        return []

    # Remove everything from 'Detailed:' onward
    cleaned = raw.split('Detailed:', 1)[0].strip()

    try:
        # literal_eval only evaluates Python literals, never arbitrary code
        symptoms_list = ast.literal_eval(cleaned)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # These are the errors literal_eval raises for malformed input
        return []

    if not isinstance(symptoms_list, list):
        return []
    return [_tidy(item) for item in symptoms_list]

# Parse every raw Symptoms entry into a list of symptom names
df['Symptoms'] = df['Symptoms'].map(extract_symptoms)

# Comma-joined text form of each list, for modeling or an LLM prompt
df['SymptomsText'] = df['Symptoms'].map(', '.join)

# Show the first ten diseases next to their flattened symptom text
df.loc[:, ['Disease', 'SymptomsText']].head(10)


In [None]:
# Inspect the first rows of the current notebook-global `df`
df.head()

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

# Features are the flattened symptom strings; the target is the disease name
X = df['SymptomsText']
y = df['Disease']

# TF-IDF vectorization of symptoms text
# NOTE(review): the vectorizer is fitted on all rows before the split below, so its
# vocabulary and IDF weights also reflect the held-out rows — consider fitting on the
# training split only.
vectorizer = TfidfVectorizer()
X_vec = vectorizer.fit_transform(X)

# Hold out 20% of the rows for evaluation; random_state=42 makes the split repeatable
X_train, X_test, y_train, y_test = train_test_split(X_vec, y, test_size=0.2, random_state=42)


## Train a Logistic Regression model

In [None]:
# Fit a logistic-regression classifier on the TF-IDF training matrix
# (max_iter=1000 caps the number of solver iterations)
model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)

# Predict on the held-out split and print the per-class classification report
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))


In [None]:
def predict_disease(symptoms_input):
    """Predict a disease from free-text symptoms and print its stored details.

    The text is vectorised with the notebook-level ``vectorizer`` and
    classified with ``model``. The first row of ``df`` whose ``Disease``
    equals the prediction supplies the diagnosis, treatment and precaution
    text. Nothing is returned; the report is written to stdout.
    """
    features = vectorizer.transform([symptoms_input])
    predicted_labels = model.predict(features)
    prediction = predicted_labels[0]

    # The first dataset row for the predicted label carries the descriptive fields
    matching_rows = df.loc[df['Disease'] == prediction]
    details = matching_rows.iloc[0]

    print(f"🔍 Predicted Disease: {prediction}\n")
    print(f"🧠 Diagnosis: {details.get('Diagnosis', 'N/A')}")
    print(f"💊 Treatment: {details.get('Treatment', 'N/A')}")
    print(f"🛡️  Precautions: {details.get('Basic Precautions', 'N/A')}")
    # Disabled in the original: the "When to See a Doctor" line
    # print(f"⚠️  When to See a Doctor: {details.get('When to See a Doctor', 'N/A')}")

# Smoke test: run the predictor on a sample free-text symptom description
predict_disease("high fever, headache, body pain")


# New approach: embedding-based nearest-neighbour diagnosis

In [None]:
# Shell commands that install/upgrade the notebook's dependencies.
# NOTE(review): versions are unpinned, and of these packages only sentence-transformers,
# scikit-learn, pandas and numpy are imported in the visible cells — confirm the rest are needed.
!pip install -U transformers sentence-transformers torch scikit-learn pandas numpy flask datasets accelerate
!pip install peft bitsandbytes  # For fine-tuning

In [None]:
import pandas as pd
import numpy as np
from ast import literal_eval

# 1. Proper CSV Loading with Error Handling
def load_medical_data(filepath, expected_columns=None):
    """Read the disease CSV into a DataFrame, tolerating malformed rows.

    Parameters
    ----------
    filepath : str or path-like
        Location of the CSV file.
    expected_columns : sequence of str, optional
        Column names the file is expected to contain. Defaults to the eight
        columns of the disease dataset. When the parsed frame has a different
        number of columns, the file is re-read with these names replacing its
        header row.

    Returns
    -------
    pandas.DataFrame
        The parsed data. Rows that cannot be parsed are dropped on the
        fallback paths.
    """
    if expected_columns is None:
        cols = ['Disease', 'Symptoms', 'Causes', 'Treatment',
                'Basic Precautions', 'Additional Precautions',
                'Diagnosis', 'When to See a Doctor']
    else:
        cols = list(expected_columns)

    try:
        # First attempt with standard reading
        df = pd.read_csv(filepath)
    except pd.errors.ParserError:
        # Retry with the more forgiving python engine; bad lines are reported and dropped
        df = pd.read_csv(filepath, on_bad_lines='warn', engine='python')
        print("Warning: Some rows had formatting issues and were skipped")

    # The expected count is derived from the column list rather than a separate magic number
    if len(df.columns) != len(cols):
        # header=0 discards the file's own header row in favour of `cols`
        df = pd.read_csv(filepath, names=cols, header=0, on_bad_lines='skip')

    return df

# Load the dataset from the Colab-local CSV via load_medical_data; rebinds the notebook-global `df`
df = load_medical_data('/content/final_disease_dataset_complete.csv')

# 2. Enhanced Data Cleaning
def clean_medical_data(df):
    """Fill missing values and normalise list/text columns of the disease frame.

    The frame is modified in place and also returned, so both
    ``clean_medical_data(df)`` and ``df = clean_medical_data(df)`` work.
    Running the function again on its own output leaves the data unchanged.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``Symptoms``, ``Causes``, ``Treatment``,
        ``Basic Precautions``, ``Additional Precautions``, ``Diagnosis`` and
        ``When to See a Doctor`` columns.

    Returns
    -------
    pandas.DataFrame
        The same object, with ``Symptoms``/``Causes`` holding Python lists and
        the text columns stripped of surrounding whitespace.
    """
    # Handle missing values
    df.fillna({
        'Symptoms': '[]',
        'Causes': '[]',
        'Treatment': 'No treatment information available',
        'Basic Precautions': 'No precautions specified',
        'Additional Precautions': '',
        'Diagnosis': 'Diagnosis method not specified',
        'When to See a Doctor': 'Consult a doctor if symptoms persist'
    }, inplace=True)

    # Convert string lists to actual lists
    def safe_convert(x):
        # Already a list (the function was re-run): keep it instead of re-wrapping
        if isinstance(x, list):
            return x
        if isinstance(x, str) and x.startswith('['):
            try:
                return literal_eval(x)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                # Not a valid literal (e.g. unquoted items): split manually and
                # drop the brackets/quotes that would otherwise stick to the items
                inner = x.strip().strip('[]')
                pieces = (piece.strip().strip('\'"').strip() for piece in inner.split(','))
                return [piece for piece in pieces if piece]
        return [x] if pd.notna(x) else []

    df['Symptoms'] = df['Symptoms'].apply(safe_convert)
    df['Causes'] = df['Causes'].apply(safe_convert)

    # Clean text fields
    text_cols = ['Treatment', 'Basic Precautions', 'Additional Precautions',
                 'Diagnosis', 'When to See a Doctor']
    for col in text_cols:
        df[col] = df[col].str.strip()

    return df

# Clean the loaded frame and rebind the notebook-global `df` to the result
df = clean_medical_data(df)

# 3. Verify the cleaned data: overall shape, a three-row sample, and per-column null counts
print(f"Dataset shape: {df.shape}")
print("\nSample data:")
print(df.head(3))
print("\nMissing values:")
print(df.isnull().sum())

# 4. Create a more robust version of the diagnosis system
from sentence_transformers import SentenceTransformer
from sklearn.neighbors import NearestNeighbors
import numpy as np

class MedicalDiagnosisSystem:
    """Nearest-neighbour disease lookup over sentence embeddings.

    Every dataset row is flattened into one descriptive sentence, embedded
    with a SentenceTransformer model, and indexed with a cosine-distance
    ``NearestNeighbors`` model. ``diagnose`` embeds the user's symptom text
    and returns the closest rows.
    """

    def __init__(self, data):
        """Build the embeddings and neighbour index for ``data``.

        ``data`` is a DataFrame with the disease columns read by
        ``_create_combined_info`` and ``diagnose``. It is copied so the
        helper column added during preparation does not leak into the
        caller's frame.
        """
        self.df = data.copy()
        self.embedder = SentenceTransformer('paraphrase-MiniLM-L6-v2')
        self._prepare_system()

    def _prepare_system(self):
        """Create the combined text column, its embeddings, and the NN index."""
        # Create combined text for each disease
        self.df['combined_info'] = self.df.apply(self._create_combined_info, axis=1)

        # Generate embeddings
        self.embeddings = self.embedder.encode(self.df['combined_info'].tolist())

        # Build nearest neighbors model
        self.nn = NearestNeighbors(n_neighbors=3, metric='cosine')
        self.nn.fit(self.embeddings)

    @staticmethod
    def _join_if_list(value):
        """Return list items as comma-separated text; pass other values through.

        Items are converted with ``str`` so non-string literals (numbers
        parsed from the CSV, for instance) do not break the join.
        """
        if isinstance(value, list):
            return ', '.join(map(str, value))
        return value

    def _create_combined_info(self, row):
        """Flatten one dataset row into a single descriptive sentence."""
        symptoms = self._join_if_list(row['Symptoms'])
        causes = self._join_if_list(row['Causes'])
        return (
            f"Disease: {row['Disease']}. Symptoms: {symptoms}. Causes: {causes}. "
            f"Treatment: {row['Treatment']}. Precautions: {row['Basic Precautions']}. "
            f"Diagnosis: {row['Diagnosis']}. When to see doctor: {row['When to See a Doctor']}"
        )

    def diagnose(self, symptoms, top_n=3):
        """Return up to ``top_n`` dataset rows closest to the symptom text.

        Parameters
        ----------
        symptoms : str
            Free-text symptom description.
        top_n : int, default 3
            Maximum number of matches. It is capped at the number of rows in
            the dataset, because the neighbour search cannot return more
            neighbours than it was fitted on.

        Returns
        -------
        list of dict
            One entry per match, closest first, with ``rank``, ``disease``,
            ``confidence`` (1 minus the cosine distance) and the row's
            descriptive fields.
        """
        neighbour_count = min(top_n, len(self.df))
        if neighbour_count < 1:
            return []

        # Embed the input symptoms
        symptom_embedding = self.embedder.encode([symptoms])

        # Find nearest matches
        distances, indices = self.nn.kneighbors(symptom_embedding, n_neighbors=neighbour_count)

        # Prepare results
        results = []
        for rank, (dist, idx) in enumerate(zip(distances[0], indices[0]), start=1):
            disease_info = self.df.iloc[idx]
            results.append({
                'rank': rank,
                'disease': disease_info['Disease'],
                'confidence': float(1 - dist),
                'symptoms': disease_info['Symptoms'],
                'causes': disease_info['Causes'],
                'treatment': disease_info['Treatment'],
                'precautions': disease_info['Basic Precautions'],
                'additional_precautions': disease_info['Additional Precautions'],
                'diagnosis_methods': disease_info['Diagnosis'],
                'when_to_see_doctor': disease_info['When to See a Doctor']
            })

        return results

# Initialize the system from the cleaned frame; construction loads the sentence-embedding
# model and encodes every row, so this is the expensive step of the cell
diagnosis_system = MedicalDiagnosisSystem(df)

# 5. Example Usage
def interactive_diagnosis():
    """Prompt for symptoms on stdin and print the closest diagnoses.

    Reads one line of text, passes it to the notebook-level
    ``diagnosis_system.diagnose`` and prints each returned match with its
    symptoms, causes, treatment, precautions, diagnosis methods and guidance
    on when to see a doctor. Returns nothing.
    """
    def _as_text(value):
        # Lists are shown comma-separated; anything else is printed as-is
        if isinstance(value, list):
            return ', '.join(value)
        return value

    print("Medical Diagnosis Assistant")
    print("Enter your symptoms (e.g., 'headache, fever'):")
    symptoms = input("> ")

    matches = diagnosis_system.diagnose(symptoms)

    print("\nTop Possible Diagnoses:")
    for match in matches:
        symptom_text = _as_text(match['symptoms'])
        cause_text = _as_text(match['causes'])
        extra_precautions = match['additional_precautions']

        print(f"\n{match['rank']}. {match['disease']} (Confidence: {match['confidence']:.2f})")
        print(f"   Symptoms: {symptom_text}")
        print(f"   Possible Causes: {cause_text}")
        print(f"\n   Treatment: {match['treatment']}")
        print(f"\n   Precautions: {match['precautions']}")
        # The additional-precautions line is omitted when the field is empty
        if extra_precautions:
            print(f"   Additional Precautions: {extra_precautions}")
        print(f"\n   Diagnosis Methods: {match['diagnosis_methods']}")
        print(f"\n   When to See a Doctor: {match['when_to_see_doctor']}")

    print("\nNote: This is for informational purposes only. Please consult a healthcare professional for proper diagnosis.")

# Run the interactive diagnosis (blocks until a line is entered at the input prompt)
interactive_diagnosis()

# New version: robust loading, hybrid scoring, and improved interface

1. Robust Data Loading and Cleaning

In [None]:
import pandas as pd
import numpy as np
from ast import literal_eval
import re

def load_and_clean_data(filepath):
    """Load the disease CSV and return a cleaned, de-duplicated DataFrame.

    Steps: read the file (malformed rows are skipped), strip whitespace from
    column names, fill missing values with defaults, convert ``Symptoms`` and
    ``Causes`` to Python lists, collapse whitespace in the text columns, and
    keep only the first row for each ``Disease``.

    Parameters
    ----------
    filepath : str or path-like
        Location of the CSV file.

    Returns
    -------
    pandas.DataFrame or None
        The cleaned frame with a fresh 0..n-1 index, or ``None`` when the
        file could not be read (the error is printed).
    """
    # Errors ast.literal_eval is documented to raise on malformed input
    parse_errors = (ValueError, SyntaxError, TypeError, MemoryError, RecursionError)

    # Load data with flexible parsing
    try:
        df = pd.read_csv(filepath, on_bad_lines='skip', engine='python')
    except Exception as e:
        # Deliberate best-effort: report the problem and signal failure with None
        print(f"Error loading CSV: {e}")
        return None
    print(f"Initial load: {len(df)} rows")

    # Clean column names
    df.columns = df.columns.str.strip()

    # Handle missing values (literal 'nan' strings count as missing too)
    df = df.replace('nan', np.nan).fillna({
        'Symptoms': '[]',
        'Causes': '[]',
        'Treatment': 'No treatment information available',
        'Basic Precautions': 'No precautions specified',
        'Additional Precautions': '',
        'Diagnosis': 'Diagnosis method not specified',
        'When to See a Doctor': 'Consult a doctor if symptoms persist'
    })

    def split_items(text):
        """Split on '|' when present, otherwise on ','; drop empty pieces."""
        separator = '|' if '|' in text else ','
        return [item.strip() for item in text.split(separator) if item.strip()]

    # Improved list conversion
    def safe_convert_to_list(x):
        if not isinstance(x, str):
            return [x] if pd.notna(x) else []
        text = x.strip()
        if text.startswith('[') and text.endswith(']'):
            try:
                parsed = literal_eval(text)
            except parse_errors:
                # Malformed list (e.g. unquoted items): drop the brackets and split
                # by hand instead of returning the whole string as one item
                unquoted = (item.strip('\'"').strip() for item in split_items(text[1:-1]))
                return [item for item in unquoted if item]
            return parsed if isinstance(parsed, list) else [parsed]
        # Handle pipe or comma separated values (or a single plain value)
        return split_items(text)

    df['Symptoms'] = df['Symptoms'].apply(safe_convert_to_list)
    df['Causes'] = df['Causes'].apply(safe_convert_to_list)

    # Clean text fields: collapse runs of whitespace, then trim the ends
    text_cols = ['Treatment', 'Basic Precautions', 'Additional Precautions',
                 'Diagnosis', 'When to See a Doctor']
    for col in text_cols:
        df[col] = df[col].astype(str).str.replace(r'\s+', ' ', regex=True).str.strip()

    # Remove duplicates
    df = df.drop_duplicates(subset=['Disease'], keep='first').reset_index(drop=True)
    print(f"Final cleaned dataset: {len(df)} rows")
    return df

# Load and clean the dataset; rebinds the notebook-global `df`
# (load_and_clean_data returns None if the CSV cannot be read)
df = load_and_clean_data('/content/final_disease_dataset_complete.csv')

2. Enhanced Diagnosis System

In [None]:
from sentence_transformers import SentenceTransformer
from sklearn.neighbors import NearestNeighbors
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

class AdvancedMedicalDiagnosisSystem:
    """Disease lookup that re-ranks embedding neighbours with TF-IDF overlap.

    Candidates are retrieved by cosine distance between sentence embeddings
    of the symptom text. Each candidate is then scored as
    ``0.7 * (1 - cosine distance) + 0.3 * TF-IDF similarity`` and the
    candidates are returned best score first.
    """

    def __init__(self, data):
        """Fit the TF-IDF, embedding and neighbour models on ``data``.

        ``data`` is a DataFrame with the disease columns read by
        ``_get_disease_info``. It is copied so the helper column added during
        preparation does not leak into the caller's frame.
        """
        self.df = data.copy()
        self._prepare_models()

    def _prepare_models(self):
        """Build the symptom text column and fit all models on it."""
        # Create symptom strings for each disease; items go through str() so
        # non-string literals parsed from the CSV cannot break the join
        self.df['symptom_text'] = self.df['Symptoms'].apply(
            lambda x: ', '.join(map(str, x)) if isinstance(x, list) else str(x))

        # Initialize embedding model
        self.embedder = SentenceTransformer('paraphrase-MiniLM-L6-v2')

        # Create both TF-IDF and dense embeddings
        self.tfidf = TfidfVectorizer(max_features=1000)
        self.tfidf_vectors = self.tfidf.fit_transform(self.df['symptom_text'])

        self.dense_vectors = self.embedder.encode(self.df['symptom_text'].tolist())

        # Candidate retrieval uses the dense vectors only; TF-IDF enters at scoring time
        self.nn_model = NearestNeighbors(n_neighbors=5, metric='cosine')
        self.nn_model.fit(self.dense_vectors)

    def _get_disease_info(self, idx):
        """Return the descriptive fields of the row at position ``idx`` as a dict."""
        row = self.df.iloc[idx]
        return {
            'disease': row['Disease'],
            'symptoms': row['Symptoms'],
            'causes': row['Causes'],
            'treatment': row['Treatment'],
            'precautions': row['Basic Precautions'],
            'additional_precautions': row['Additional Precautions'],
            'diagnosis_methods': row['Diagnosis'],
            'when_to_see_doctor': row['When to See a Doctor']
        }

    def diagnose(self, symptoms, top_k=5):
        """Return up to ``top_k`` candidate diseases for the symptom text.

        Parameters
        ----------
        symptoms : str
            Free-text symptom description.
        top_k : int, default 5
            Maximum number of candidates. It is capped at the number of rows
            in the dataset, because the neighbour search cannot return more
            neighbours than it was fitted on.

        Returns
        -------
        list of dict
            The fields from ``_get_disease_info`` plus a float ``confidence``,
            sorted by confidence in descending order.
        """
        neighbour_count = min(top_k, len(self.df))
        if neighbour_count < 1:
            return []

        # Generate both types of embeddings
        dense_embedding = self.embedder.encode([symptoms])
        tfidf_embedding = self.tfidf.transform([symptoms])

        # Find similar diseases using dense embeddings
        distances, indices = self.nn_model.kneighbors(dense_embedding, n_neighbors=neighbour_count)
        neighbour_ids = indices[0]

        # TF-IDF similarity of the query against every candidate in one sparse product.
        # The sparse `@` operator is used because numpy.dot does not understand scipy
        # sparse operands; array indexing keeps the operands two-dimensional.
        tfidf_sims = (self.tfidf_vectors[neighbour_ids] @ tfidf_embedding.T).toarray().ravel()

        # Prepare results with combined confidence scores
        results = []
        for dist, idx, tfidf_sim in zip(distances[0], neighbour_ids, tfidf_sims):
            confidence = (0.7 * (1 - dist) + 0.3 * tfidf_sim)
            results.append({
                **self._get_disease_info(idx),
                'confidence': float(confidence)
            })

        # Sort by confidence
        results.sort(key=lambda x: x['confidence'], reverse=True)

        return results

# Initialize the system; this rebinds the notebook-global `diagnosis_system` to the
# advanced implementation and fits its TF-IDF, embedding and neighbour models
diagnosis_system = AdvancedMedicalDiagnosisSystem(df)

3. Improved User Interface

In [None]:
def format_diagnosis_output(results):
    """Convert raw diagnosis results into display-ready entries.

    Parameters
    ----------
    results : list of dict
        Entries as produced by ``diagnose``: each has ``disease``,
        ``confidence``, ``symptoms``, ``causes``, ``treatment``,
        ``precautions``, ``additional_precautions``, ``diagnosis_methods``
        and ``when_to_see_doctor``.

    Returns
    -------
    list of dict
        One entry per result, in input order, with a 1-based ``rank``, the
        ``disease``, the ``confidence`` formatted to two decimals, and a
        ``details`` mapping of human-readable labels to values.
    """
    def _as_text(value):
        # Lists become comma-separated text; other values pass through untouched
        return ', '.join(value) if isinstance(value, list) else value

    def _entry(position, item):
        return {
            'rank': position,
            'disease': item['disease'],
            'confidence': f"{item['confidence']:.2f}",
            'details': {
                'Symptoms': _as_text(item['symptoms']),
                'Possible Causes': _as_text(item['causes']),
                'Treatment': item['treatment'],
                'Precautions': item['precautions'],
                # Empty/falsy additional precautions are shown as the text 'None'
                'Additional Precautions': item['additional_precautions'] or 'None',
                'Diagnosis Methods': item['diagnosis_methods'],
                'When to See a Doctor': item['when_to_see_doctor'],
            },
        }

    return [_entry(position, item) for position, item in enumerate(results, start=1)]


In [None]:

def interactive_diagnosis():
    """Prompt for symptoms, run the diagnosis, and print a formatted report.

    Reads one line from stdin and stops with a hint when it is blank.
    Otherwise the text goes to the notebook-level ``diagnosis_system`` and the
    results are rendered through ``format_diagnosis_output``. Detail fields
    whose value is empty or reads 'none', 'nan' or 'not specified' (in any
    letter case) are left out. Returns nothing.
    """
    hidden_values = ('none', 'nan', 'not specified')

    print("\n=== Medical Diagnosis Assistant ===")
    print("Enter your symptoms (e.g., 'fever, headache, cough'):")
    symptoms = input("> ").strip()

    if symptoms == "":
        print("Please enter at least one symptom.")
        return

    formatted_results = format_diagnosis_output(diagnosis_system.diagnose(symptoms))

    print("\n📝 Input Symptoms:")
    print(f"{symptoms}\n")

    print("\nTop Possible Diagnoses:")
    for entry in formatted_results:
        print(f"\n{entry['rank']}. {entry['disease']} (Confidence: {entry['confidence']})")
        for label, text in entry['details'].items():
            # Skip fields that carry no real information
            if not text or str(text).lower() in hidden_values:
                continue
            print(f"   {label}: {text}")

    print("\nImportant Note: This tool provides informational suggestions only.")
    print("Always consult a healthcare professional for proper diagnosis and treatment.")

# Run the interactive diagnosis (blocks until a line is entered at the input prompt)
interactive_diagnosis()

In [None]:
import joblib

# Serialize the whole diagnosis_system object to 'healthqai_model.pkl' in the working directory.
# NOTE(review): the class is defined inside this notebook, so loading the file presumably needs
# the same class definition available — confirm before relying on it elsewhere. Only load
# pickle files from trusted sources.
joblib.dump(diagnosis_system, 'healthqai_model.pkl')

