In [None]:
import pandas as pd
import re
from typing import Dict, List, Optional

class UnifiedFeatureMapper:
    """
    Map UK Biobank feature codes to cleaned, human-readable disease names.

    The data dictionary CSV is expected to provide a ``name`` column (feature
    code) and a ``title`` column (description). The disease name is taken from
    the first parenthesised part of the title when there is one, otherwise
    from the whole title, and is then normalised by ``_clean_disease_name``.
    """

    # Compiled once; these are applied to every title that gets processed.
    _PARENTHESES_RE = re.compile(r'\(([^)]+)\)')
    _ICD_PREFIX_RE = re.compile(r'^[A-Z]\d+(\.\d+)?\s*')
    _WHITESPACE_RE = re.compile(r'\s+')

    # (category, keywords) pairs checked in order; the first match wins.
    # NOTE(review): any name containing "sclerosis" is counted as
    # 'Multiple Sclerosis', not only names containing "multiple sclerosis".
    _CATEGORY_RULES = (
        ('Meningitis', ('meningitis',)),
        ('Encephalitis', ('encephalitis',)),
        ('Epilepsy/Seizures', ('epilepsy', 'seizure')),
        ('Parkinson Disease', ('parkinson',)),
        ('Dementia/Alzheimer', ('dementia', 'alzheimer')),
        ('Headache/Migraine', ('migraine', 'headache')),
        ('Multiple Sclerosis', ('sclerosis',)),
        ('Cerebrovascular', ('cerebral', 'stroke')),
    )

    def __init__(self, data_dict_path: str):
        """
        Initialize the UnifiedFeatureMapper with data dictionary path.

        Nothing is read from disk here; the file is loaded by
        ``load_and_process``.

        Args:
            data_dict_path (str): Path to the UK Biobank data dictionary CSV file
        """
        self.data_dict_path = data_dict_path
        self.dict_df = None  # DataFrame of the data dictionary once loaded
        self.clean_mapping = {}  # feature code -> cleaned disease name

    def load_and_process(self, feature_list: List[str]) -> Dict[str, str]:
        """
        Load data dictionary, process features, and return clean mappings.

        Args:
            feature_list (List[str]): List of feature column names (p-codes)

        Returns:
            Dict[str, str]: Clean mapping of feature codes to disease names.
            Empty if the data dictionary could not be loaded.
        """
        if not self._load_data_dictionary():
            # Drop any mapping left over from an earlier successful run so the
            # stored state agrees with the empty result returned here.
            self.clean_mapping = {}
            return {}

        raw_mapping = self._create_raw_mapping(feature_list)
        self.clean_mapping = self._extract_clean_disease_names(raw_mapping)
        self._print_summary(feature_list)

        return self.clean_mapping

    def _load_data_dictionary(self) -> bool:
        """
        Load the UK Biobank data dictionary into ``self.dict_df``.

        Returns:
            bool: True on success; False (after printing the reason) otherwise.
        """
        try:
            self.dict_df = pd.read_csv(self.data_dict_path, low_memory=False)
            print(f"✓ Data dictionary loaded successfully. Shape: {self.dict_df.shape}")
            return True
        except FileNotFoundError:
            print(f"❌ Error: Data dictionary file not found at {self.data_dict_path}")
            return False
        except Exception as e:
            # Best-effort loader: report any other read/parse failure.
            print(f"❌ Error loading data dictionary: {e}")
            return False

    def _create_raw_mapping(self, feature_list: List[str]) -> Dict[str, str]:
        """
        Create initial mapping from feature codes to full titles.

        Args:
            feature_list (List[str]): Feature codes to look up in the dictionary

        Returns:
            Dict[str, str]: Feature code -> raw title for every requested code
            found in the dictionary. Titles are ``'N/A'`` when the dictionary
            has no ``title`` column. Empty if no dictionary is loaded or it
            lacks a ``name`` column.
        """
        if self.dict_df is None:
            return {}

        if 'name' not in self.dict_df.columns:
            print("❌ Error: data dictionary has no 'name' column")
            return {}

        # Keep only dictionary rows for the requested features.
        relevant_features = self.dict_df[self.dict_df['name'].isin(feature_list)]

        if 'title' in relevant_features.columns:
            titles = relevant_features['title']
        else:
            titles = ['N/A'] * len(relevant_features)

        # If a code occurs more than once, the last row wins.
        return dict(zip(relevant_features['name'], titles))

    def _extract_clean_disease_names(self, raw_mapping: Dict[str, str]) -> Dict[str, str]:
        """
        Extract and clean disease names from parentheses.

        Args:
            raw_mapping (Dict[str, str]): Feature code -> raw title

        Returns:
            Dict[str, str]: Feature code -> cleaned name. A missing or
            ``'N/A'`` title falls back to the feature code itself.
        """
        clean_mapping = {}

        for feature_code, title in raw_mapping.items():
            if pd.isna(title) or title == 'N/A':
                clean_mapping[feature_code] = feature_code  # Fallback
                continue

            match = self._PARENTHESES_RE.search(str(title))
            # Without parentheses the whole title is used instead.
            candidate = match.group(1) if match else title
            clean_mapping[feature_code] = self._clean_disease_name(candidate)

        return clean_mapping

    def _clean_disease_name(self, disease_name: str) -> str:
        """
        Clean and format disease names.

        Strips a leading code such as ``"G00.0 "``, collapses runs of
        whitespace, and upper-cases a lowercase first letter.

        Returns:
            str: The cleaned name; ``"Unknown"`` for a missing value and
            ``"Unknown condition"`` when nothing is left after cleaning.
        """
        if pd.isna(disease_name):
            return "Unknown"

        cleaned = str(disease_name).strip()
        cleaned = self._ICD_PREFIX_RE.sub('', cleaned)
        cleaned = self._WHITESPACE_RE.sub(' ', cleaned).strip()

        if not cleaned:
            return "Unknown condition"

        if cleaned[0].islower():
            cleaned = cleaned[0].upper() + cleaned[1:]

        return cleaned

    def _print_summary(self, feature_list: List[str]) -> None:
        """
        Print summary of the mapping process.

        Args:
            feature_list (List[str]): The features that were requested; its
                length is the denominator of the success rate.
        """
        mapped_count = len(self.clean_mapping)
        total_count = len(feature_list)

        print(f"\n📊 Mapping Summary:")
        print(f"   Total features: {total_count}")
        print(f"   Successfully mapped: {mapped_count}")
        if total_count:
            print(f"   Mapping success rate: {mapped_count/total_count*100:.1f}%")
        else:
            # An empty feature list has no meaningful rate; avoid dividing by zero.
            print("   Mapping success rate: n/a (no features supplied)")

        # Show the first few mappings as a sample.
        print(f"\n📋 Sample Clean Mappings:")
        for feature in list(self.clean_mapping)[:8]:
            print(f"   {feature} → {self.clean_mapping[feature]}")

    def get_clean_mapping(self) -> Dict[str, str]:
        """Return a shallow copy of the clean mapping dictionary."""
        return self.clean_mapping.copy()

    def save_clean_mapping(self, output_path: str = "clean_disease_mapping.csv") -> None:
        """
        Save the clean mapping to a CSV file.

        The file has the columns ``feature_code`` and ``disease_name``.
        Nothing is written when no mapping is available; write failures are
        printed rather than raised.

        Args:
            output_path (str): Destination CSV path
        """
        if not self.clean_mapping:
            print("❌ No clean mapping available. Run load_and_process() first.")
            return

        df = pd.DataFrame(
            list(self.clean_mapping.items()),
            columns=['feature_code', 'disease_name'],
        )

        try:
            df.to_csv(output_path, index=False)
            print(f"✓ Clean mapping saved to: {output_path}")
        except Exception as e:
            print(f"❌ Error saving mapping: {e}")

    def apply_to_dataframe(self, df: pd.DataFrame, save_original_names: bool = False) -> pd.DataFrame:
        """
        Apply clean disease names to rename DataFrame columns.

        Args:
            df (pd.DataFrame): DataFrame with feature code columns
            save_original_names (bool): If True, each mapped column is replaced
                by a column named ``"<clean name> (<original code>)"`` that is
                appended after the unmapped columns. If False, mapped columns
                are renamed in place.

        Returns:
            pd.DataFrame: DataFrame with renamed columns. The input ``df`` is
            returned unchanged when no mapping is available.
        """
        if not self.clean_mapping:
            print("❌ No clean mapping available. Run load_and_process() first.")
            return df

        # Only columns present in both the DataFrame and the mapping are renamed.
        rename_dict = {
            col: self.clean_mapping[col]
            for col in df.columns
            if col in self.clean_mapping
        }

        if save_original_names:
            result_df = df.copy()
            for old_col, new_col in rename_dict.items():
                result_df[f"{new_col} ({old_col})"] = result_df[old_col]
            # Drop all originals in one pass instead of once per column.
            result_df = result_df.drop(columns=list(rename_dict))
        else:
            result_df = df.rename(columns=rename_dict)
            # Several codes can share one clean name; the rename then yields
            # identical column labels, so make that visible to the caller.
            duplicated = result_df.columns[result_df.columns.duplicated()].unique()
            if len(duplicated) > 0:
                print(
                    f"⚠ Warning: {len(duplicated)} column name(s) are duplicated "
                    "after renaming; use save_original_names=True to keep "
                    "columns distinguishable."
                )

        renamed_count = len(rename_dict)
        print(f"✓ Renamed {renamed_count} columns with clean disease names")

        return result_df

    def _categorize_disease(self, disease: str) -> str:
        """Return the first category whose keyword occurs in ``disease``, else 'Other'."""
        lower_disease = disease.lower()
        for category, keywords in self._CATEGORY_RULES:
            if any(keyword in lower_disease for keyword in keywords):
                return category
        return 'Other'

    def get_feature_analysis(self) -> Dict[str, object]:
        """
        Analyze the types of features in the mapping.

        Returns:
            Dict[str, object]: Empty when no mapping is available, otherwise
            ``total_features`` (int), ``unique_diseases`` (int) and
            ``disease_categories`` (dict of category name -> count).
        """
        if not self.clean_mapping:
            return {}

        disease_counts = {}
        for disease in self.clean_mapping.values():
            category = self._categorize_disease(disease)
            disease_counts[category] = disease_counts.get(category, 0) + 1

        return {
            'total_features': len(self.clean_mapping),
            'unique_diseases': len(set(self.clean_mapping.values())),
            'disease_categories': disease_counts,
        }

    def print_analysis(self) -> None:
        """Print detailed analysis of the mapped features."""
        analysis = self.get_feature_analysis()

        if not analysis:
            print("❌ No analysis available. Run load_and_process() first.")
            return

        print(f"\n" + "="*60)
        print("FEATURE ANALYSIS")
        print("="*60)
        print(f"Total features: {analysis['total_features']}")
        print(f"Unique diseases: {analysis['unique_diseases']}")

        if 'disease_categories' in analysis:
            print(f"\nDisease categories breakdown:")
            # Largest categories first.
            sorted_categories = sorted(analysis['disease_categories'].items(),
                                       key=lambda x: x[1], reverse=True)
            for category, count in sorted_categories:
                print(f"  {category}: {count}")


def main():
    """
    Run the example pipeline end to end.

    Reads the phenotype CSV, maps its feature columns to clean disease names
    using the data dictionary, saves the mapping, prints the analysis, and
    writes a copy of the phenotype table with renamed columns.

    Returns:
        tuple: ``(clean_mapping, pheno_clean)`` - the code -> name mapping and
        the phenotype DataFrame with renamed columns.
    """
    # Path to UK Biobank data dictionary
    data_dict_path = 'database.dataset.data_dictionary.csv'

    # Phenotype table; every column after the first is treated as a feature
    # (the first column is assumed to be the 'eid' identifier).
    pheno = pd.read_csv('Nervous system disorders.csv', low_memory=False)
    features_list = pheno.columns[1:].tolist()

    # Build the code -> clean name mapping.
    mapper = UnifiedFeatureMapper(data_dict_path)
    clean_mapping = mapper.load_and_process(features_list)

    # Persist the mapping and report on it.
    mapper.save_clean_mapping("clean_disease_mapping.csv")
    mapper.print_analysis()

    # Rename the phenotype columns and save the result.
    pheno_clean = mapper.apply_to_dataframe(pheno, save_original_names=False)
    pheno_clean.to_csv('Nervous_system_disorders_clean_names.csv', index=False)
    print("✓ Final DataFrame saved with clean disease names.")

    return clean_mapping, pheno_clean


if __name__ == "__main__":
    # NOTE: main() returns a (clean_mapping, pheno_clean) tuple, so despite its
    # name this variable holds both the mapping dict and the renamed DataFrame.
    clean_mapping = main()

✓ Data dictionary loaded successfully. Shape: (36015, 16)

📊 Mapping Summary:
   Total features: 134
   Successfully mapped: 134
   Mapping success rate: 100.0%

📋 Sample Clean Mappings:
   p130992 → Bacterial meningitis, not elsewhere classified
   p130993 → Bacterial meningitis, not elsewhere classified
   p130994 → Meningitis in bacterial diseases classified elsewhere
   p130995 → Meningitis in bacterial diseases classified elsewhere
   p130996 → Meningitis in other infectious and parasitic diseases classified elsewhere
   p130997 → Meningitis in other infectious and parasitic diseases classified elsewhere
   p130998 → Meningitis due to other and unspecified causes
   p130999 → Meningitis due to other and unspecified causes
✓ Clean mapping saved to: clean_disease_mapping.csv

FEATURE ANALYSIS
Total features: 134
Unique diseases: 67

Disease categories breakdown:
  Other: 102
  Meningitis: 8
  Parkinson Disease: 6
  Encephalitis: 4
  Headache/Migraine: 4
  Cerebrovascular: 4
  Dement