In [1]:
# Imports
from google.colab import drive
import os
import json
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder, label_binarize
from sklearn.pipeline import Pipeline
from sklearn.neural_network import MLPClassifier   # Backpropagation (Neural Net)
from sklearn.metrics import accuracy_score, f1_score, classification_report, roc_curve, auc, confusion_matrix
import joblib
import matplotlib.pyplot as plt
import seaborn as sns


# ---------------- Configuration ----------------

# Experiment settings: plain constants that do not depend on Drive being mounted.
random_state = 42        # seed shared by the train/test split, the MLP and the CV splitter
test_size = 0.20         # fraction of rows held out by train_test_split
cv_folds = 5             # number of folds for grid search and cross-validation
do_grid_search = False   # Grid search for tuning

# Google Drive has to be mounted before any path below /content/drive is used.
drive.mount('/content/drive')

# Input dataset (CSV) on the mounted Drive.
file_path = "/content/drive/My Drive/Major Project (Medical System)/Symptoms_Illness_Prediction_Dataset.csv"

# Directory that receives every saved artifact; created on first run.
output_dir = "/content/drive/My Drive/Major Project (Medical System)/Symptoms_Illness_Prediction_Output"
os.makedirs(output_dir, exist_ok=True)


# ---------------- Helper Functions ----------------

def load_data(path):
    """Read the CSV file at ``path`` and return its contents as a DataFrame."""
    return pd.read_csv(path)

def detect_target(df):
    """Pick the label column of ``df``.

    Returns the first column whose name, stripped and lower-cased, equals
    ``"prognosis"``; when no such column exists, the last column is returned.
    """
    prognosis_like = [
        col for col in df.columns
        if str(col).strip().lower() == "prognosis"
    ]
    if prognosis_like:
        return prognosis_like[0]
    return df.columns[-1]

def simplify_label(name):
    """Turn a raw column name into a human-readable label.

    ``_``, ``-`` and ``.`` become spaces, whitespace runs collapse to a single
    space, and the result is lower-cased with only its first character
    capitalised (e.g. ``"skin_rash"`` -> ``"Skin rash"``).
    """
    separators_to_space = str.maketrans("_-.", "   ")
    # split() with no argument both trims the ends and collapses inner runs.
    words = str(name).translate(separators_to_space).split()
    return " ".join(words).lower().capitalize()

def preprocess_features(df, target_col):
    nrows = len(df)
    drop_cols = [c for c in df.columns if df[c].nunique() == nrows]
    if drop_cols:
        print("Dropping unique columns:", drop_cols)
        df = df.drop(columns=drop_cols)
    X = df.drop(columns=[target_col]).copy()
    y = df[target_col].astype(str).str.strip().copy()
    for c in X.columns:
        if X[c].dtype == object:
            vals = set(map(str.lower, X[c].dropna().astype(str).unique()))
            if vals <= {"yes","no","true","false","0","1"}:
                X[c] = X[c].astype(str).str.lower().map(
                    {"yes":1,"true":1,"1":1,"no":0,"false":0,"0":0}).astype(float)
    non_numeric = X.select_dtypes(include=["object","category"]).columns.tolist()
    if non_numeric:
        print("One-hot encoding:", non_numeric)
        X = pd.get_dummies(X, columns=non_numeric, drop_first=True)
    X = X.fillna(X.median())
    return X, y

def save_artifacts(best_estimator, label_encoder, feature_mapping, out_dir):
    """Persist the trained model artifacts into ``out_dir``.

    Writes ``best_pipeline.joblib`` and ``label_encoder.joblib`` via joblib and
    ``feature_mapping.json`` as UTF-8 JSON, then prints a confirmation line.
    """
    joblib_targets = (
        ("best_pipeline.joblib", best_estimator),
        ("label_encoder.joblib", label_encoder),
    )
    for filename, obj in joblib_targets:
        joblib.dump(obj, os.path.join(out_dir, filename))

    mapping_path = os.path.join(out_dir, "feature_mapping.json")
    with open(mapping_path, "w", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII labels readable in the file.
        json.dump(feature_mapping, f, indent=2, ensure_ascii=False)

    print("Saved artifacts to", out_dir)

def _normalize_symptom_key(text):
    """Lower-case ``text`` and collapse ``_``, ``-``, ``.`` and whitespace runs to single spaces."""
    spaced = str(text).replace("_", " ").replace("-", " ").replace(".", " ")
    return " ".join(spaced.split()).lower()


def predict_from_simplified_input(simplified_dict, feature_mapping, pipeline, label_encoder):
    """Predict a label from a sparse ``{symptom name: value}`` dictionary.

    Every feature listed in ``feature_mapping`` defaults to 0; entries of
    ``simplified_dict`` overwrite the matching feature. A key is matched, in
    order, against (1) the simplified labels, case-insensitively, (2) the raw
    feature names, and (3) the simplified labels after normalising ``_``,
    ``-``, ``.`` and whitespace. Unrecognised keys are reported and ignored.

    Args:
        simplified_dict: Mapping of symptom name to a value convertible to float.
        feature_mapping: Mapping of raw feature name -> simplified label; its
            key order defines the column order handed to ``pipeline``.
        pipeline: Fitted estimator exposing ``predict_proba`` (and normally
            ``classes_``).
        label_encoder: Object whose ``inverse_transform`` turns encoded classes
            back into labels; when it lacks that method, encoded classes are
            returned as-is.

    Returns:
        ``(label, top, probs)``: the most probable label, up to five
        ``(label, probability)`` pairs in descending probability order, and the
        full probability vector.
    """
    features = list(feature_mapping.keys())
    row = dict.fromkeys(features, 0.0)

    inverse_map = {str(v).lower(): k for k, v in feature_mapping.items()}
    normalized_map = {_normalize_symptom_key(v): k for k, v in feature_mapping.items()}

    for k, v in simplified_dict.items():
        key = str(k)
        if key.lower() in inverse_map:
            orig = inverse_map[key.lower()]
        elif k in feature_mapping:
            orig = k
        else:
            orig = normalized_map.get(_normalize_symptom_key(key))
        if orig is None:
            print(f"Warning: input key '{k}' not recognized. Ignored.")
            continue
        row[orig] = float(v)
    X_in = pd.DataFrame([row], columns=features).astype(float)

    probs = pipeline.predict_proba(X_in)[0]

    # Column i of predict_proba belongs to pipeline.classes_[i]; that only
    # equals i when every encoded class was present during training, so go
    # through classes_ instead of feeding the column position to the encoder.
    classes = getattr(pipeline, "classes_", None)
    if classes is None:
        classes = np.arange(len(probs))
    can_decode = hasattr(label_encoder, "inverse_transform")

    def decode(position):
        encoded = classes[position]
        if can_decode:
            return label_encoder.inverse_transform([encoded])[0]
        return encoded

    label = decode(np.argmax(probs))
    top_idx = np.argsort(probs)[::-1][:5]
    top = [(decode(i), float(probs[i])) for i in top_idx]
    return label, top, probs

def build_preprocessing_pipeline():
    """Return an unfitted Pipeline whose single step, ``"scaler"``, is a StandardScaler."""
    steps = [("scaler", StandardScaler())]
    return Pipeline(steps)

def evaluate_models(X_train, y_train, X_test, y_test, perform_grid=False):
    """Fit each candidate model on the training split and report test metrics.

    Each candidate is wrapped in a ``scaler`` -> ``clf`` pipeline. With
    ``perform_grid=True`` the pipeline is tuned with ``GridSearchCV`` (using the
    module-level ``cv_folds``) and the best estimator is kept; otherwise the
    pipeline is fitted directly. Accuracy, macro-F1 and the classification
    report on the test split are printed for every candidate.

    Args:
        X_train, y_train: Training features and encoded labels.
        X_test, y_test: Held-out features and encoded labels.
        perform_grid: Whether to run the hyper-parameter grid search.

    Returns:
        Dict mapping model name to ``{"estimator": fitted_pipeline}``, plus a
        ``"grid_search"`` entry holding the fitted ``GridSearchCV`` when
        ``perform_grid`` is true.
    """
    results = {}
    models = {
        "Backpropagation": MLPClassifier(
            hidden_layer_sizes=(100,),
            activation='relu',
            solver='adam',
            max_iter=500,
            random_state=random_state)
    }
    param_grids = {
        "Backpropagation": {
            "clf__hidden_layer_sizes": [(50,), (100,), (100,50)],
            "clf__activation": ["relu", "tanh"],
            "clf__solver": ["adam", "sgd"]
        }
    }
    for name, model in models.items():
        print("\n--- Evaluating", name, "---")
        # Give every candidate its own unfitted scaler. Fitting the pipeline
        # (or the grid search) fits the scaler on the training data, so no
        # separate pre-fit is needed and no scaler instance is shared between
        # the returned estimators.
        pipeline = Pipeline([
            ("scaler", build_preprocessing_pipeline().named_steps["scaler"]),
            ("clf", model)
        ])
        if perform_grid:
            grid = GridSearchCV(pipeline, param_grids[name], cv=cv_folds, n_jobs=-1, verbose=2)
            grid.fit(X_train, y_train)
            best = grid.best_estimator_
            print("Best params:", grid.best_params_)
            y_pred = best.predict(X_test)
            results[name] = {"estimator": best, "grid_search": grid}
        else:
            pipeline.fit(X_train, y_train)
            y_pred = pipeline.predict(X_test)
            results[name] = {"estimator": pipeline}
        acc = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred, average="macro")
        print(f"{name} test accuracy: {acc:.4f}  |  macro-F1: {f1:.4f}")
        print("Classification report:")
        # zero_division=0 avoids warnings for classes absent from y_pred.
        print(classification_report(y_test, y_pred, zero_division=0))
    return results

def cross_validation_evaluation(X, y):
    """Print stratified k-fold accuracy of the scaler + MLP pipeline on ``(X, y)``.

    Uses the module-level ``cv_folds`` and ``random_state``; the per-fold
    scores and their mean / standard deviation are printed, nothing is
    returned.
    """
    print("\nCross-Validation Evaluation with Backpropagation:")

    classifier = MLPClassifier(
        hidden_layer_sizes=(100,),
        activation='relu',
        solver='adam',
        max_iter=500,
        random_state=random_state,
    )
    model = Pipeline([("scaler", StandardScaler()), ("clf", classifier)])

    # Shuffled, seeded folds so repeated runs produce the same splits.
    folds = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=random_state)
    cv_scores = cross_val_score(model, X, y, cv=folds, scoring="accuracy", n_jobs=-1)

    print(f"Cross-validation accuracy scores: {cv_scores}")
    print(f"Mean CV accuracy: {cv_scores.mean():.4f} | Std: {cv_scores.std():.4f}")


# ---------------- Main ----------------

def main():
    """Run the end-to-end workflow: load, clean, train, evaluate, save, demo.

    Reads the dataset at ``file_path``, removes duplicate rows, builds the
    feature matrix, writes the dataset summary and feature mapping JSON files,
    trains and evaluates the model(s), runs cross-validation on the full data,
    saves the artifacts to ``output_dir`` and prints one example prediction.
    """
    # ---- Load and de-duplicate ----
    print("Loading data from:", file_path)
    df = load_data(file_path)
    print("Data shape:", df.shape)
    target_col = detect_target(df)
    print("Detected target column:", target_col)
    n_duplicates = df.duplicated().sum()
    print(f"Duplicates before removal: {n_duplicates}")
    df = df.drop_duplicates()
    print(f"Data shape after removing duplicates: {df.shape}")

    # ---- Features and human-readable names ----
    X, y = preprocess_features(df, target_col)
    print("Final feature matrix shape:", X.shape)
    feature_mapping = {column: simplify_label(column) for column in X.columns}

    # ---- Dataset metadata ----
    summary = {
        "shape": df.shape,
        "n_features": X.shape[1],
        "target_col": target_col,
        "n_classes": int(y.nunique()),
    }
    summary_path = os.path.join(output_dir, "dataset_summary.json")
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    mapping_path = os.path.join(output_dir, "feature_mapping.json")
    with open(mapping_path, "w", encoding="utf-8") as f:
        json.dump(feature_mapping, f, indent=2, ensure_ascii=False)

    # ---- Encode labels and split ----
    le = LabelEncoder()
    y_enc = le.fit_transform(y)
    print("Classes (first 20):", list(le.classes_)[:20])

    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y_enc,
        test_size=test_size,
        stratify=y_enc,
        random_state=random_state,
    )
    print(f"Train size: {X_train.shape}, Test size: {X_test.shape}")

    # ---- Train / evaluate, then keep the most accurate estimator ----
    results = evaluate_models(X_train, y_train, X_test, y_test, perform_grid=do_grid_search)

    best_name, best_score, best_estimator = None, -1, None
    for name, info in results.items():
        candidate = info["estimator"]
        candidate_acc = accuracy_score(y_test, candidate.predict(X_test))
        if not candidate_acc > best_score:
            continue  # strictly better only: the first of any tied models wins
        best_name, best_score, best_estimator = name, candidate_acc, candidate

    print(f"\nBest model on test set: {best_name} with accuracy {best_score:.4f}")

    cross_validation_evaluation(X, y_enc)

    save_artifacts(best_estimator, le, feature_mapping, output_dir)

    # ---- Smoke-test the prediction helper with an all-zero input ----
    first_labels = list(feature_mapping.values())[:4]
    example_input = {label: 0 for label in first_labels}
    print("Example prediction call (zeros):", example_input)
    predicted_label, top5, _ = predict_from_simplified_input(
        example_input, feature_mapping, best_estimator, le)
    print("Predicted (example):", predicted_label)
    print("Top 5 probabilities (example):")
    for disease, prob in top5:
        print(f"  {disease}: {prob:.2f}")

    print("All artifacts saved. Ready for downstream integration.")


# Entry point: run the training workflow only when this file is executed
# directly (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


# ---------------- Sample Predictions ----------------

# Load saved artifacts from `output_dir`, the directory main() saves them to,
# so this section cannot drift from the training step if the path changes.
# NOTE: joblib.load unpickles arbitrary Python objects; only load artifact
# files that this notebook produced (or that come from a trusted source).
best_estimator = joblib.load(os.path.join(output_dir, "best_pipeline.joblib"))
label_encoder = joblib.load(os.path.join(output_dir, "label_encoder.joblib"))
with open(os.path.join(output_dir, "feature_mapping.json"), "r", encoding="utf-8") as f:
    feature_mapping = json.load(f)

# Sample inputs: symptom label -> 1 (present) / 0 (absent). Symptoms that are
# not listed default to 0 inside predict_from_simplified_input.
sample_tests = [
    {"Itching": 1, "Skin rash": 1, "Nodal skin eruptions": 0, "Continuous sneezing": 0, "Shivering": 0, "High fever": 1},
    {"Headache": 1, "Cold hands and feets": 1, "Mood swings": 1, "Weight loss": 1, "High fever": 1, "Fatigue": 1},
    {"Chest pain": 1, "Palpitations": 1, "Breathlessness": 1, "Fast heart rate": 1, "Weakness in limbs": 1, "Swelling joints": 0},
    {"Acidity": 1, "Stomach pain": 1, "Vomiting": 1, "Loss of appetite": 1, "Nausea": 1, "Diarrhoea": 0},
    {"Muscle weakness": 1, "Back pain": 1, "Joint pain": 1, "Movement stiffness": 1, "Loss of balance": 0, "Headache": 0}
]

# Run predictions and print the top candidates for each sample.
for i, sample in enumerate(sample_tests):
    print(f"\nSample Test {i+1}:")
    predicted_label, top5, _ = predict_from_simplified_input(sample, feature_mapping, best_estimator, label_encoder)
    print(f"Predicted disease: {predicted_label}")
    print("Top 5 predicted diseases with probabilities:")
    for disease, prob in top5:
        print(f"  {disease}: {prob:.2f}")


Mounted at /content/drive
Loading data from: /content/drive/My Drive/Major Project (Medical System)/Symptoms_Illness_Prediction_Dataset.csv
Data shape: (4961, 133)
Detected target column: prognosis
Duplicates before removal: 4657
Data shape after removing duplicates: (304, 133)
Final feature matrix shape: (304, 132)
Classes (first 20): ['AIDS', 'Acne', 'Alcoholic Hepatitis', 'Allergy', 'Arthritis', 'Bronchial Asthma', 'Cervical Spondylosis', 'Chickenpox', 'Chronic Cholestasis', 'Common Cold', 'Dengue', 'Diabetes', 'Dimorphic Hemmorhoids (piles)', 'Drug Reaction', 'Fungal Infection', 'GERD', 'Gastroenteritis', 'Heart Attack', 'Hepatitis A', 'Hepatitis B']
Train size: (243, 132), Test size: (61, 132)

--- Evaluating Backpropagation ---
Backpropagation test accuracy: 0.9836  |  macro-F1: 0.9707
Classification report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         1
           1       1.00      1.00      1.00         1
           