In [4]:

import pandas as pd
import numpy as np
import os
import re
import json
import logging
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from datetime import datetime
import shutil
from google.colab import drive
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import LabelEncoder

# Configure logging: root logger at INFO with a "time - level - message" format.
# basicConfig is called at import time, before any class below logs anything.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger shared by every class and function in this file.
logger = logging.getLogger(__name__)

@dataclass
class Medication:
    """One prescribed drug plus the patient context it was prescribed in."""

    # Drug name; InteractionChecker.standardize_medication overwrites it with
    # the normalised (lower-cased, whitespace-collapsed) form.
    name: str
    # Free-text dose such as "5 mg tablet"; only format-checked, never parsed.
    dosage: str
    # Free-text schedule such as "daily"; not interpreted by this file.
    frequency: str
    # Patient attributes; the code in this file reads the 'age' and
    # 'conditions' keys and ignores any others.
    patient_factors: Dict

@dataclass
class Interaction:
    """The outcome of checking one pair of drugs against each other."""

    # The two drug names, in the order the pair was examined.
    drug1: str
    drug2: str
    # One of 'MINOR', 'MODERATE' or 'MAJOR' as produced by the checker.
    severity: str
    # Human-readable explanation of the interaction.
    description: str
    # Suggested clinical handling of the interaction.
    recommendation: str
    # Score in [0, 1] attached to the severity by the checker.
    confidence: float

class DrugDatabase:
    def __init__(self, interaction_data_path: str = 'ddinter_downloads_code_V.csv'):
        """Load the interaction CSV and build the lookup structures derived from it.

        Args:
            interaction_data_path: Path of the CSV file passed to
                ``_load_interaction_data``.
        """
        # Created first: _load_interaction_data fits this encoder on the drug names.
        self.drug_encoder = LabelEncoder()
        # None when the file is missing or could not be processed.
        self.interactions_df = self._load_interaction_data(interaction_data_path)
        # Must be assigned before _build_interaction_dict runs, because that
        # method calls normalize_drug_name, which reads self.drug_aliases.
        self.drug_aliases = self._build_aliases_from_data()
        # Mapping of sorted (drug, drug) name pairs to their interaction record.
        self.interactions_db = self._build_interaction_dict()

    def _load_interaction_data(self, path: str) -> Optional[pd.DataFrame]:
        if not os.path.exists(path):
            logger.error(f"Interaction data file not found at: {path}")
            return None
        try:
            df = pd.read_csv(path)

            # Flexible column mapping
            column_map = {}
            for col in df.columns:
                col_lower = col.lower()
                if 'drug_a' in col_lower or 'drug 1' in col_lower or 'drug1' in col_lower:
                    column_map[col] = 'drug1'
                elif 'drug_b' in col_lower or 'drug 2' in col_lower or 'drug2' in col_lower:
                    column_map[col] = 'drug2'
                elif 'level' in col_lower or 'severity' in col_lower:
                    column_map[col] = 'severity'
                elif 'description' in col_lower:
                    column_map[col] = 'description'
                elif 'recommendation' in col_lower or 'management' in col_lower:
                    column_map[col] = 'recommendation'
            df.rename(columns=column_map, inplace=True)

            logger.info(f"Successfully loaded interaction data from {path}. Rows: {len(df)}")
            logger.info(f"Columns after rename: {df.columns.tolist()}")

            # Required columns
            required_cols = ['drug1', 'drug2', 'severity']
            if not all(col in df.columns for col in required_cols):
                missing = [col for col in required_cols if col not in df.columns]
                logger.error(f"Missing required columns: {missing}")
                return None

            # Data cleaning
            df.dropna(subset=required_cols, inplace=True)
            df['drug1'] = df['drug1'].astype(str).str.lower().str.strip()
            df['drug2'] = df['drug2'].astype(str).str.lower().str.strip()
            df['severity'] = df['severity'].astype(str).str.lower().str.strip()

            # Map severity values
            severity_mapping = {
                'mini': 'MINOR',
                'minor': 'MINOR',
                'moderate': 'MODERATE',
                'major': 'MAJOR',
                'unknown': 'MINOR'
            }
            df['severity'] = df['severity'].map(severity_mapping).fillna('MINOR').str.upper()

            # Add default description and recommendation
            if 'description' not in df.columns:
                df['description'] = df.apply(lambda row: f"Interaction between {row['drug1']} and {row['drug2']} with {row['severity']} severity.", axis=1)
                logger.warning("No 'description' column found. Generated defaults.")
            if 'recommendation' not in df.columns:
                df['recommendation'] = 'Consult a healthcare professional for management.'
                logger.warning("No 'recommendation' column found. Using default.")

            # Add manual interactions
            manual_interactions = pd.DataFrame([
                {'drug1': 'warfarin', 'drug2': 'ibuprofen', 'severity': 'MAJOR', 'description': 'Increased bleeding risk due to combined anticoagulant and antiplatelet effects.', 'recommendation': 'Avoid combination or monitor INR closely.'},
                {'drug1': 'simvastatin', 'drug2': 'clarithromycin', 'severity': 'MAJOR', 'description': 'Increased risk of myopathy and rhabdomyolysis due to CYP3A4 inhibition.', 'recommendation': 'Use alternative antibiotic like azithromycin.'},
                {'drug1': 'warfarin', 'drug2': 'aspirin', 'severity': 'MODERATE', 'description': 'Potential for increased bleeding risk.', 'recommendation': 'Monitor for signs of bleeding.'},
                {'drug1': 'warfarin', 'drug2': 'clarithromycin', 'severity': 'MAJOR', 'description': 'Clarithromycin may increase warfarin levels via CYP3A4 inhibition.', 'recommendation': 'Monitor INR and adjust warfarin dose.'},
                {'drug1': 'ibuprofen', 'drug2': 'aspirin', 'severity': 'MODERATE', 'description': 'May reduce aspirin’s cardioprotective effect.', 'recommendation': 'Consider alternative analgesic.'}
            ])
            df = pd.concat([df, manual_interactions], ignore_index=True)

            # Encode drug names
            all_drugs = pd.concat([df['drug1'], df['drug2']]).unique()
            self.drug_encoder.fit(all_drugs)
            df['drug1_encoded'] = self.drug_encoder.transform(df['drug1'])
            df['drug2_encoded'] = self.drug_encoder.transform(df['drug2'])

            # Debug: Check input drugs and pairs
            input_drugs = ['warfarin', 'ibuprofen', 'simvastatin', 'clarithromycin', 'aspirin', 'amoxicillin', 'metformin']
            found_drugs = [d for d in input_drugs if d in df['drug1'].values or d in df['drug2'].values]
            logger.info(f"Input drugs found in dataset: {found_drugs}")
            logger.info(f"Manual interactions added: {len(manual_interactions)}")
            logger.info(f"Sample interactions_df tail:\n{df.tail().to_string()}")
            for d1 in input_drugs:
                for d2 in input_drugs:
                    if d1 < d2:
                        matches = df[(df['drug1'] == d1) & (df['drug2'] == d2) | (df['drug1'] == d2) & (df['drug2'] == d1)]
                        logger.info(f"Pair {d1}-{d2}: {len(matches)} matches")

            return df
        except Exception as e:
            logger.error(f"Error loading or processing interaction data: {str(e)}")
            return None

    def _build_aliases_from_data(self) -> Dict[str, str]:
        aliases = {}
        if self.interactions_df is not None:
            all_drugs = pd.concat([self.interactions_df['drug1'], self.interactions_df['drug2']]).unique()
            for drug in all_drugs:
                normalized = self.normalize_drug_name(drug, skip_alias_lookup=True)
                aliases[normalized] = normalized
        return aliases

    def _build_interaction_dict(self) -> Dict[Tuple[str, str], Dict]:
        interactions_dict = {}
        if self.interactions_df is not None:
            for index, row in self.interactions_df.iterrows():
                drug1 = self.normalize_drug_name(row['drug1'])
                drug2 = self.normalize_drug_name(row['drug2'])
                key = tuple(sorted((drug1, drug2)))
                interactions_dict[key] = {
                    'description': row['description'],
                    'recommendation': row['recommendation'],
                    'severity': row['severity']
                }
            logger.info(f"Built interaction dictionary with {len(interactions_dict)} unique pairs.")
            logger.info(f"Sample interaction_db keys: {list(interactions_dict.keys())[:5]}")
        return interactions_dict

    def normalize_drug_name(self, drug_name: str, skip_alias_lookup: bool = False) -> str:
        if not isinstance(drug_name, str):
            drug_name = str(drug_name)
        drug_name = re.sub(r'\s+', ' ', drug_name.lower().strip())
        if skip_alias_lookup:
            return drug_name
        return self.drug_aliases.get(drug_name, drug_name)

    def get_interaction(self, drug1: str, drug2: str) -> Optional[Dict]:
        if self.interactions_db is None:
            logger.warning("Interaction database not loaded.")
            return None
        normalized_drug1 = self.normalize_drug_name(drug1)
        normalized_drug2 = self.normalize_drug_name(drug2)
        key = tuple(sorted((normalized_drug1, normalized_drug2)))
        interaction = self.interactions_db.get(key)
        if interaction is None:
            logger.debug(f"No interaction found for pair: {normalized_drug1} - {normalized_drug2}")
        else:
            logger.debug(f"Found interaction for {normalized_drug1} - {normalized_drug2}: {interaction['severity']}")
        return interaction

    def get_all_interactions_df(self) -> Optional[pd.DataFrame]:
        return self.interactions_df

class InteractionChecker:
    def __init__(self, interaction_data_path: str = 'ddinter_downloads_code_V.csv'):
        """Build the drug database, the severity encoder and the ML model.

        Args:
            interaction_data_path: CSV path forwarded to ``DrugDatabase``.
        """
        self.db = DrugDatabase(interaction_data_path)
        # Ordered from least to most severe; positions are used for ranking.
        self.severity_levels = ['MINOR', 'MODERATE', 'MAJOR']
        self.label_encoder = LabelEncoder()

        frame = self.db.get_all_interactions_df()
        if frame is None or 'severity' not in frame.columns:
            logger.warning("No severity data found. Using default levels.")
            encoder_labels = ['MINOR', 'MODERATE', 'MAJOR']
        else:
            # Fit only on the recognised severities that actually occur.
            encoder_labels = [s for s in frame['severity'].unique() if s in self.severity_levels]
            if not encoder_labels:
                logger.warning("No valid severity levels in data. Using default levels.")
                encoder_labels = ['MINOR', 'MODERATE', 'MAJOR']
        self.label_encoder.fit(encoder_labels)

        # Trained last: _init_ml_model reads self.db and self.label_encoder.
        self.model = self._init_ml_model()

    def _init_ml_model(self) -> Optional[RandomForestClassifier]:
        """Train a random-forest severity classifier on the encoded drug pairs.

        Features are the ``drug1_encoded`` / ``drug2_encoded`` columns and the
        target is the label-encoded severity. 20% of the rows are held out
        (stratified by severity) to log accuracy and a classification report.

        Returns:
            The fitted model, or None when there is no data, fewer than two
            severity classes, or the data cannot be split in a stratified way
            (for example a class with a single row).
        """
        source = self.db.get_all_interactions_df()
        if source is None or source.empty:
            logger.warning("Interaction database not loaded or empty. Cannot train ML model.")
            return None

        df = source.copy()
        if 'severity' not in df.columns or not df['severity'].isin(self.severity_levels).any():
            logger.warning("Severity column not available or invalid. ML model not trained.")
            return None

        # Keep only the rows whose label the encoder knows how to transform.
        df = df[df['severity'].isin(self.label_encoder.classes_)]
        if df.empty or df['severity'].nunique() < 2:
            logger.warning("Not enough diverse severity data to train ML model.")
            return None

        X = df[['drug1_encoded', 'drug2_encoded']]
        y = self.label_encoder.transform(df['severity'])
        try:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42, stratify=y
            )
        except ValueError as e:
            # A stratified split needs at least two rows per class; without
            # this guard the error would abort construction of the checker.
            logger.warning(f"Cannot create a stratified train/test split: {e}. ML model not trained.")
            return None

        model = RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced')
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        # Passing labels keeps target_names aligned even if a class is absent
        # from the held-out rows.
        class_ids = np.arange(len(self.label_encoder.classes_))
        report = classification_report(
            y_test,
            y_pred,
            labels=class_ids,
            target_names=self.label_encoder.classes_,
            zero_division=0,
        )
        logger.info(f"ML Model Accuracy: {accuracy:.2f}")
        logger.info(f"ML Model Classification Report:\n{report}")
        return model

    def suggest_alternative(self, drug: str, severity: str) -> Optional[str]:
        """Suggest a substitute for *drug*, but only for MAJOR interactions.

        Args:
            drug: Drug name; matched case-insensitively.
            severity: Severity of the interaction the drug takes part in.

        Returns:
            The substitute from the built-in table, or None when the severity
            is not 'MAJOR' or the drug has no entry.
        """
        substitutes = {
            'ibuprofen': 'acetaminophen',
            'warfarin': 'apixaban',
            'clarithromycin': 'azithromycin',
            'simvastatin': 'atorvastatin',
            'aspirin': 'clopidogrel',
            'amoxicillin': 'penicillin',
            'metformin': 'sitagliptin'
        }
        if severity == 'MAJOR':
            return substitutes.get(drug.lower())
        return None

    def standardize_medication(self, medication: Medication) -> Medication:
        """Normalise the medication's name in place and sanity-check its dosage.

        The dosage is only validated (a warning is logged when it does not
        start with a number followed by a known unit or form); it is never
        modified. Any error is logged and the medication is returned as it is
        at that point.

        Args:
            medication: The medication to standardise; its ``name`` is mutated.

        Returns:
            The same ``Medication`` object.
        """
        dosage_pattern = r'^\s*\d+(\.\d+)?\s*(mg|ml|g|mcg|units|tablet|tablets|tab|capsule|capsules|cap|spray|solution|cream|ointment|patch|inh|neb|vial|lozenge|suppository|drop(s)?|syr|susp|amp|bag|bar|cap|disk|dose|er|ir|la|liq|lot|oin|pow|sol|spr|sr|tab|tr|unit|vial|kit|swab|wafer|wipe|form|disk|gel|granules|gum|implant|insert|jelly|paste|pellet|pledget|plug|sponge|strip|tampon|tape|tip).*$'
        try:
            medication.name = self.db.normalize_drug_name(medication.name)
            dosage_ok = re.match(dosage_pattern, medication.dosage, re.IGNORECASE) is not None
            if not dosage_ok:
                logger.warning(f"Potentially invalid dosage format for {medication.name}: '{medication.dosage}'")
        except Exception as e:
            logger.error(f"Error standardizing medication {medication.name}: {str(e)}")
        return medication

    def predict_severity(self, med1: Medication, med2: Medication, interaction_data: Optional[Dict]) -> Tuple[str, float]:
        """Estimate the severity of an interaction and a confidence for it.

        Args:
            med1: First medication (name already normalised).
            med2: Second medication (name already normalised).
            interaction_data: Database record for the pair, or None when the
                pair is unknown.

        Returns:
            ``(severity, confidence)``. Unknown pairs yield ``('MINOR', 0.1)``.
            Otherwise the severity comes from the ML model when one is
            available and can encode both drugs, else from the database record
            with confidence 0.5. For a patient older than 70 with at least two
            distinct conditions the severity is raised by one level.
        """
        if interaction_data is None:
            return 'MINOR', 0.1

        database_severity = interaction_data.get('severity', 'MODERATE')
        predicted_severity_str, confidence = database_severity, 0.5

        # NOTE(review): when a model exists its prediction replaces the
        # database severity even for known pairs - confirm this is intended.
        if self.model is not None:
            try:
                drug1_encoded = self.db.drug_encoder.transform([med1.name])[0]
                drug2_encoded = self.db.drug_encoder.transform([med2.name])[0]
                features = pd.DataFrame([[drug1_encoded, drug2_encoded]], columns=['drug1_encoded', 'drug2_encoded'])
                predicted_encoded_severity = self.model.predict(features)[0]
                predicted_severity_str = self.label_encoder.inverse_transform([predicted_encoded_severity])[0]
                probabilities = self.model.predict_proba(features)[0]
                # Plain float so the declared return type holds.
                confidence = float(np.max(probabilities))
            except ValueError as e:
                logger.warning(f"ML prediction failed for ({med1.name}, {med2.name}): {e}. Using database severity.")
                predicted_severity_str, confidence = database_severity, 0.5

        # Gather patient context from both medications. Missing or None values
        # count as "no information" rather than raising, and 'conditions' may
        # be a dict (its keys are used), any other iterable, or a single string.
        patient_age = 0
        condition_names = set()
        for med in (med1, med2):
            factors = med.patient_factors or {}
            patient_age = max(patient_age, factors.get('age') or 0)
            conditions = factors.get('conditions') or ()
            if isinstance(conditions, str):
                conditions = (conditions,)
            condition_names.update(conditions)
        num_conditions = len(condition_names)

        if predicted_severity_str not in self.severity_levels:
            predicted_severity_str = 'MODERATE'
        current_severity_index = self.severity_levels.index(predicted_severity_str)

        if patient_age > 70 and num_conditions >= 2 and current_severity_index < len(self.severity_levels) - 1:
            logger.debug(f"Adjusting severity upwards for elderly patient with conditions ({predicted_severity_str} -> {self.severity_levels[current_severity_index + 1]})")
            predicted_severity_str = self.severity_levels[current_severity_index + 1]
            confidence = min(confidence + 0.2, 1.0) if self.model is not None else 0.7

        return predicted_severity_str, confidence

    def check_interactions(self, medications: List[Medication]) -> List[Interaction]:
        """Check every unordered pair of medications for an interaction.

        Each medication is standardised first (which normalises its name in
        place). Pairs with identical names are skipped. Known pairs are
        reported with the predicted severity; unknown pairs are reported as
        MINOR with confidence 0.1.

        A failure while evaluating one pair is logged and confined to that
        pair, so the remaining pairs are still checked. If the database record
        was already retrieved, the pair is reported with the database severity
        and confidence 0.5; otherwise it is skipped.

        Args:
            medications: Medications taken together by one patient.

        Returns:
            Interactions ordered from most to least severe. The list is left
            unsorted if a severity outside the known levels is encountered.
        """
        interactions = []
        try:
            medications = [self.standardize_medication(med) for med in medications]
        except Exception as e:
            logger.error(f"Error during interaction check: {str(e)}")
            return interactions

        for i, med1 in enumerate(medications):
            for med2 in medications[i + 1:]:
                if med1.name == med2.name:
                    continue

                interaction_data = None
                try:
                    interaction_data = self.db.get_interaction(med1.name, med2.name)
                    predicted_severity, confidence = self.predict_severity(med1, med2, interaction_data)
                except Exception as e:
                    logger.error(f"Error during interaction check for {med1.name} and {med2.name}: {str(e)}")
                    if not interaction_data:
                        continue
                    # The record is known, so still surface it rather than
                    # dropping a documented interaction.
                    predicted_severity = interaction_data.get('severity', 'MODERATE')
                    confidence = 0.5

                if interaction_data:
                    interactions.append(Interaction(
                        drug1=med1.name,
                        drug2=med2.name,
                        severity=predicted_severity,
                        description=interaction_data.get('description', 'No description available.'),
                        recommendation=interaction_data.get('recommendation', 'Consult a healthcare professional.'),
                        confidence=confidence
                    ))
                else:
                    interactions.append(Interaction(
                        drug1=med1.name,
                        drug2=med2.name,
                        severity='MINOR',
                        description='No specific interaction data found in the database.',
                        recommendation='Monitor for adverse effects.',
                        confidence=0.1
                    ))

        try:
            sorted_interactions = sorted(interactions, key=lambda x: self.severity_levels.index(x.severity), reverse=True)
        except ValueError as e:
            logger.error(f"Error sorting interactions by severity: {e}. Returning unsorted list.")
            sorted_interactions = interactions

        return sorted_interactions

    def generate_alerts(self, interactions: List[Interaction]) -> List[Dict]:
        """Turn interactions into JSON-serialisable alert dictionaries.

        Args:
            interactions: Interactions as returned by ``check_interactions``.

        Returns:
            One dict per interaction with the keys ``timestamp``, ``severity``,
            ``drugs``, ``description``, ``recommendation``, ``confidence``
            (a percentage string, or 'N/A') and ``alternative`` (a
            "Consider ..." sentence, or None when no substitute is known).
        """
        alerts = []
        for interaction in interactions:
            # Keep only the substitutes that exist, so a drug without one does
            # not show up as the literal text "None" in the sentence.
            alternatives = [
                substitute
                for substitute in (
                    self.suggest_alternative(interaction.drug1, interaction.severity),
                    self.suggest_alternative(interaction.drug2, interaction.severity),
                )
                if substitute
            ]
            alternative_text = "Consider " + " or ".join(alternatives) if alternatives else None

            # Format once and reuse for both the alert and the log line, so a
            # missing confidence cannot break the log call.
            if interaction.confidence is not None:
                confidence_text = f"{interaction.confidence:.2%}"
            else:
                confidence_text = 'N/A'

            alert = {
                'timestamp': datetime.now().isoformat(),
                'severity': interaction.severity,
                'drugs': [interaction.drug1, interaction.drug2],
                'description': interaction.description,
                'recommendation': interaction.recommendation,
                'confidence': confidence_text,
                'alternative': alternative_text
            }
            alerts.append(alert)
            logger.info(f"Generated alert: {interaction.severity} between {interaction.drug1} and {interaction.drug2} (Confidence: {confidence_text})")
        return alerts

def main():
    """Run the demo: fetch the dataset from Google Drive, check a sample
    medication list and print the resulting alerts as JSON.

    Returns early (after logging an error) when Drive cannot be mounted, the
    dataset is not found in Drive, or the interaction database fails to load.
    """
    try:
        drive.mount('/content/drive', force_remount=True)
        logger.info("Google Drive mounted successfully.")
    except Exception as e:
        logger.error(f"Failed to mount Google Drive: {str(e)}")
        return

    dataset_path_in_drive = '/content/drive/My Drive/ddinter_downloads_code_V.csv'
    local_dataset_path = 'ddinter_downloads_code_V.csv'

    if not os.path.exists(dataset_path_in_drive):
        logger.error(f"Dataset not found in Google Drive at '{dataset_path_in_drive}'.")
        return
    logger.info(f"Copying dataset from Drive: {dataset_path_in_drive} to {local_dataset_path}")
    shutil.copy(dataset_path_in_drive, local_dataset_path)
    logger.info("Dataset copied successfully.")

    # Sample prescriptions: (name, dosage, frequency, age, weight, conditions).
    sample_prescriptions = [
        ("warfarin", "5 mg tablet", "daily", 75, 70, {'renal_impairment': True, 'atrial_fibrillation': True, 'hypertension': True}),
        ("ibuprofen", "400 mg tablet", "as needed", 75, 70, {'osteoarthritis': True}),
        ("simvastatin", "20 mg tablet", "daily", 60, 80, {'hyperlipidemia': True}),
        ("clarithromycin", "500 mg tablet", "twice daily", 75, 70, {'pneumonia': True}),
        ("aspirin", "81 mg tablet", "daily", 75, 70, {'heart_disease': True}),
        ("amoxicillin", "500 mg capsule", "three times daily", 30, 65, {}),
        ("metformin", "500 mg tablet", "twice daily", 65, 90, {'diabetes type 2': True, 'hypertension': True}),
    ]
    medications = [
        Medication(
            name=drug_name,
            dosage=dose,
            frequency=schedule,
            patient_factors={'age': age, 'weight': weight, 'conditions': conditions},
        )
        for drug_name, dose, schedule, age, weight, conditions in sample_prescriptions
    ]

    logger.info("Initializing InteractionChecker...")
    checker = InteractionChecker(interaction_data_path=local_dataset_path)

    if checker.db.get_all_interactions_df() is None:
        logger.error("Failed to load interaction database. Cannot proceed.")
        return
    if checker.model is None:
        # Not fatal: severities then come from the database records.
        logger.warning("ML model could not be trained. Using rule-based checks and database severity.")

    logger.info("Checking for interactions...")
    interactions = checker.check_interactions(medications)

    logger.info("Generating alerts...")
    alerts = checker.generate_alerts(interactions)
    print("\n--- Medication Interaction Alerts ---")
    print(json.dumps(alerts, indent=2))
    print("------------------------------------")

# Run the demo only when this file is executed as the main module (for
# example as a notebook cell or script), not when it is imported.
if __name__ == "__main__":
    main()


Mounted at /content/drive





--- Medication Interaction Alerts ---
[
  {
    "timestamp": "2025-07-08T07:59:05.739901",
    "severity": "MAJOR",
    "drugs": [
      "warfarin",
      "ibuprofen"
    ],
    "description": "Increased bleeding risk due to combined anticoagulant and antiplatelet effects.",
    "recommendation": "Avoid combination or monitor INR closely.",
    "confidence": "69.00%",
    "alternative": "Consider apixaban or acetaminophen"
  },
  {
    "timestamp": "2025-07-08T07:59:05.739933",
    "severity": "MAJOR",
    "drugs": [
      "warfarin",
      "clarithromycin"
    ],
    "description": "Clarithromycin may increase warfarin levels via CYP3A4 inhibition.",
    "recommendation": "Monitor INR and adjust warfarin dose.",
    "confidence": "60.00%",
    "alternative": "Consider apixaban or azithromycin"
  },
  {
    "timestamp": "2025-07-08T07:59:05.739942",
    "severity": "MAJOR",
    "drugs": [
      "warfarin",
      "aspirin"
    ],
    "description": "Potential for increased bleeding ris