
# ExoHabitAI – Module 3: Machine Learning Dataset Preparation

📌 **Goal of Module 3**  
Convert the cleaned & engineered dataset from Module 2 into **ML-ready train/test datasets**
without changing any scientific or logical assumptions.

⚠️ **Important**  
- Logic is **IDENTICAL** to the original implementation  
- Code is only **restructured for readability & explanation**



## 1️⃣ Imports

These libraries are used for:
- Data handling
- Train/test splitting
- Feature scaling
- Feature selection
- Saving ML artifacts


In [22]:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.feature_selection import SelectKBest, f_classif

import joblib
import os
import warnings
warnings.filterwarnings('ignore')



## 2️⃣ MLDatasetPreparator Class

This class prepares datasets for machine learning by:
- Identifying or creating a target variable
- Selecting high-quality numerical features
- Preserving class proportions with a stratified train/test split
- Scaling features
- Saving reusable ML artifacts



### 2.1 `__init__()`

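Initializes:
- Placeholders for the dataframe and the train/test splits
- A `RobustScaler`, which centers on the median and scales by the interquartile range, so outliers influence it less than mean/variance scaling
- Feature name and target column tracking

The cell below defines the complete class; sections 2.2–2.8 repeat each method on its own for explanation.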


In [23]:

class MLDatasetPreparator:
    def __init__(self):
        self.df = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.feature_names = []
        self.scaler = RobustScaler()
        self.target_column = None
    
    def find_target_variable(self):
        print("\nLooking for target variable...")

        target_priority = [
            'is_potentially_habitable',
            'is_habitable',
            'habitability_binary',
            'habitability_class',
            'habitability_tier',
            'habitability_label',
        ]

        for target in target_priority:
            if target in self.df.columns:
                print(f"✓ Found target variable: '{target}'")
                self.target_column = target
                return True

        print("Checking for binary columns that could be targets...")
        binary_candidates = []

        for col in self.df.columns:
            if self.df[col].dtype in ['int64', 'float64']:
                unique_vals = self.df[col].dropna().unique()
                if len(unique_vals) == 2 and set(unique_vals).issubset({0, 1}):
                    binary_candidates.append(col)

        if binary_candidates:
            for col in binary_candidates:
                if 'habit' in col.lower() or 'class' in col.lower():
                    self.target_column = col
                    print(f"✓ Selected '{col}' as target variable")
                    return True
            self.target_column = binary_candidates[0]
            print(f"✓ Selected '{binary_candidates[0]}' as target variable")
            return True

        if 'habitability_score' in self.df.columns:
            print("✓ Found 'habitability_score', will create binary target")
            self.target_column = 'habitability_score'
            return True

        print("⚠ No suitable target variable found. Creating one...")
        return self.create_target_variable()

    def create_target_variable(self):
        print("Creating binary habitability target...")

        if 'in_habitable_zone' in self.df.columns:
            self.df['is_habitable'] = self.df['in_habitable_zone']
            self.target_column = 'is_habitable'
            return True

        elif all(col in self.df.columns for col in ['pl_eqt', 'pl_rade']):
            temp_condition = (self.df['pl_eqt'] >= 250) & (self.df['pl_eqt'] <= 350)
            size_condition = (self.df['pl_rade'] >= 0.8) & (self.df['pl_rade'] <= 1.5)
            self.df['is_habitable'] = (temp_condition & size_condition).astype(int)
            self.target_column = 'is_habitable'
            return True

        else:
            print("❌ Not enough data to create target variable")
            return False

    def prepare_target_variable(self):
        print(f"\nPreparing target variable '{self.target_column}'...")
        y = self.df[self.target_column].copy()

        if y.dtype in ['float64', 'int64']:
            unique_vals = y.dropna().unique()
            if len(unique_vals) == 2 and set(unique_vals) != {0, 1}:
                y = y.replace({unique_vals[0]: 0, unique_vals[1]: 1})
            elif 'score' in self.target_column.lower():
                y = (y >= 50).astype(int)
            else:
                y = (y >= y.median()).astype(int)

        elif y.dtype == 'object':
            unique_cats = y.dropna().unique()
            y = (y == unique_cats[0]).astype(int)

        y.fillna(0, inplace=True)

        self.df['target_binary'] = y.astype(int)
        self.target_column = 'target_binary'

        print("Target distribution:")
        print(self.df['target_binary'].value_counts())

        return True
    
    def select_features(self):
        id_cols = ['pl_name', 'hostname', 'rowid', 'loc_rowid', 'index']
        features_to_remove = [c for c in id_cols if c in self.df.columns]

        if self.target_column in self.df.columns:
            features_to_remove.append(self.target_column)

        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        features = []

        for col in numeric_cols:
            if col in features_to_remove:
                continue
            if self.df[col].isnull().mean() > 0.5:
                continue
            if self.df[col].nunique() <= 2:
                continue

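            # Assign back rather than calling inplace on a column selection;
            # under pandas Copy-on-Write the inplace form no longer updates self.df
            self.df[col] = self.df[col].replace([np.inf, -np.inf], np.nan)
            self.df[col] = self.df[col].fillna(self.df[col].median())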
            features.append(col)

        if len(features) > 30:
            features = self.df[features].var().sort_values(ascending=False).head(30).index.tolist()

        self.feature_names = features
        print(f"Selected {len(features)} features")
        return features
 
    def create_train_test_split(self, test_size=0.2, random_state=42):
        X = self.df[self.feature_names]
        y = self.df[self.target_column].astype(int)

        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y,
            test_size=test_size,
            random_state=random_state,
            stratify=y
        )

        print("Train size:", len(self.X_train))
        print("Test size:", len(self.X_test))

    def scale_features(self):
        self.scaler.fit(self.X_train)

        self.X_train_scaled = pd.DataFrame(
            self.scaler.transform(self.X_train),
            columns=self.feature_names,
            index=self.X_train.index
        )

        self.X_test_scaled = pd.DataFrame(
            self.scaler.transform(self.X_test),
            columns=self.feature_names,
            index=self.X_test.index
        )

        joblib.dump(self.scaler, 'ml_scaler.pkl')

    def save_datasets(self):
        self.X_train_scaled.to_csv('X_train_scaled.csv', index=False)
        self.X_test_scaled.to_csv('X_test_scaled.csv', index=False)
        self.y_train.to_csv('y_train.csv', index=False)
        self.y_test.to_csv('y_test.csv', index=False)

        pd.DataFrame({'feature': self.feature_names}).to_csv('feature_names.csv', index=False)



### 2.2 `find_target_variable()`

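Automatically detects the best target column:
- Prefers habitability labels from Module 2
- Falls back to numeric 0/1 columns, favoring names that contain `habit` or `class`
- Falls back to `habitability_score`, which is binarized later
- Triggers target creation if nothing else is found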


In [24]:

def find_target_variable(self):
        print("\nLooking for target variable...")

        target_priority = [
            'is_potentially_habitable',
            'is_habitable',
            'habitability_binary',
            'habitability_class',
            'habitability_tier',
            'habitability_label',
        ]

        for target in target_priority:
            if target in self.df.columns:
                print(f"✓ Found target variable: '{target}'")
                self.target_column = target
                return True

        print("Checking for binary columns that could be targets...")
        binary_candidates = []

        for col in self.df.columns:
            if self.df[col].dtype in ['int64', 'float64']:
                unique_vals = self.df[col].dropna().unique()
                if len(unique_vals) == 2 and set(unique_vals).issubset({0, 1}):
                    binary_candidates.append(col)

        if binary_candidates:
            for col in binary_candidates:
                if 'habit' in col.lower() or 'class' in col.lower():
                    self.target_column = col
                    print(f"✓ Selected '{col}' as target variable")
                    return True
            self.target_column = binary_candidates[0]
            print(f"✓ Selected '{binary_candidates[0]}' as target variable")
            return True

        if 'habitability_score' in self.df.columns:
            print("✓ Found 'habitability_score', will create binary target")
            self.target_column = 'habitability_score'
            return True

        print("⚠ No suitable target variable found. Creating one...")
        return self.create_target_variable()
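
The binary-column fallback can be tried in isolation. The sketch below repeats the same checks on a toy frame; the column names are made up for illustration and are not taken from the Module 2 dataset.

```python
import pandas as pd

# Toy frame; column names are illustrative assumptions.
toy = pd.DataFrame({
    "pl_rade": [1.0, 2.3, 0.9, 11.2],
    "flag_a": [0, 1, 0, 1],
    "habitable_flag": [1.0, 0.0, None, 0.0],
    "two_values": [3, 7, 3, 7],  # two unique values, but not 0/1
})

binary_candidates = []
for col in toy.columns:
    if toy[col].dtype in ["int64", "float64"]:
        unique_vals = toy[col].dropna().unique()
        if len(unique_vals) == 2 and set(unique_vals).issubset({0, 1}):
            binary_candidates.append(col)

# Prefer names containing 'habit' or 'class', otherwise take the first candidate.
preferred = [c for c in binary_candidates
             if "habit" in c.lower() or "class" in c.lower()]
chosen = preferred[0] if preferred else binary_candidates[0]
print(binary_candidates, chosen)
```

Note that `two_values` is rejected even though it has exactly two values, and that missing entries are ignored when counting unique values.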



### 2.3 `create_target_variable()`

Creates a binary habitability label when none exists, using:
- The `in_habitable_zone` flag, or
- Temperature and radius heuristics (`pl_eqt` between 250 and 350, `pl_rade` between 0.8 and 1.5, both inclusive)


In [25]:

def create_target_variable(self):
        print("Creating binary habitability target...")

        if 'in_habitable_zone' in self.df.columns:
            self.df['is_habitable'] = self.df['in_habitable_zone']
            self.target_column = 'is_habitable'
            return True

        elif all(col in self.df.columns for col in ['pl_eqt', 'pl_rade']):
            temp_condition = (self.df['pl_eqt'] >= 250) & (self.df['pl_eqt'] <= 350)
            size_condition = (self.df['pl_rade'] >= 0.8) & (self.df['pl_rade'] <= 1.5)
            self.df['is_habitable'] = (temp_condition & size_condition).astype(int)
            self.target_column = 'is_habitable'
            return True

        else:
            print("❌ Not enough data to create target variable")
            return False
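
To see what the temperature and radius heuristic does row by row, here is a small sketch on invented values. It uses `Series.between`, which is inclusive on both ends by default and therefore matches the `>=` / `<=` comparisons in the method.

```python
import pandas as pd

# Illustrative values only; not taken from the real catalogue.
planets = pd.DataFrame({
    "pl_eqt":  [255.0, 288.0, 1500.0, 300.0, None],
    "pl_rade": [1.0,   2.5,   1.1,    0.8,   1.0],
})

temp_ok = planets["pl_eqt"].between(250, 350)   # inclusive bounds
size_ok = planets["pl_rade"].between(0.8, 1.5)
planets["is_habitable"] = (temp_ok & size_ok).astype(int)
print(planets["is_habitable"].tolist())
```

A row with a missing temperature or radius fails its comparison, so it is labeled 0 rather than left undefined.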



### 2.4 `prepare_target_variable()`

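This method:
- Converts the target to binary (0/1)
- Handles categorical and continuous targets (scores are thresholded at 50, other continuous columns at the median)
- Fills missing labels with 0
- Stores the result in a unified `target_binary` column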


In [26]:

def prepare_target_variable(self):
        print(f"\nPreparing target variable '{self.target_column}'...")
        y = self.df[self.target_column].copy()

        if y.dtype in ['float64', 'int64']:
            unique_vals = y.dropna().unique()
            if len(unique_vals) == 2 and set(unique_vals) != {0, 1}:
                y = y.replace({unique_vals[0]: 0, unique_vals[1]: 1})
            elif 'score' in self.target_column.lower():
                y = (y >= 50).astype(int)
            else:
                y = (y >= y.median()).astype(int)

        elif y.dtype == 'object':
            unique_cats = y.dropna().unique()
            y = (y == unique_cats[0]).astype(int)

        y.fillna(0, inplace=True)

        self.df['target_binary'] = y.astype(int)
        self.target_column = 'target_binary'

        print("Target distribution:")
        print(self.df['target_binary'].value_counts())

        return True
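
One branch is worth checking before reusing this method on other data. A numeric column that is already coded 0/1 skips the first condition and, unless its name contains `score`, reaches the median threshold. When most labels are 0 the median is 0, so every non-missing row passes `y >= 0`. The run in section 3 prints a mixed distribution, so the column there evidently did not take this path. The sketch below reproduces the effect and shows a hypothetical guard (`binarize` is not part of the original class).

```python
import pandas as pd

y = pd.Series([0, 0, 0, 1, 0, 1], dtype="int64")

# Same expression the method applies to a numeric, non-'score' column.
thresholded = (y >= y.median()).astype(int)

# Hypothetical guard: leave columns that are already 0/1 untouched.
def binarize(series: pd.Series) -> pd.Series:
    values = set(series.dropna().unique())
    if values <= {0, 1}:
        return series.fillna(0).astype(int)
    return (series >= series.median()).astype(int)

print(thresholded.tolist())
print(binarize(y).tolist())
```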



### 2.5 `select_features()`

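Selects **high-quality numerical features** by:
- Removing identifiers
- Removing the target column
- Dropping columns with more than 50% missing values or at most two unique values
- Replacing infinities and remaining gaps with the column median
- Keeping the 30 highest-variance features when more than 30 remain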


In [27]:

def select_features(self):
        id_cols = ['pl_name', 'hostname', 'rowid', 'loc_rowid', 'index']
        features_to_remove = [c for c in id_cols if c in self.df.columns]

        if self.target_column in self.df.columns:
            features_to_remove.append(self.target_column)

        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        features = []

        for col in numeric_cols:
            if col in features_to_remove:
                continue
            if self.df[col].isnull().mean() > 0.5:
                continue
            if self.df[col].nunique() <= 2:
                continue

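            # Assign back rather than calling inplace on a column selection;
            # under pandas Copy-on-Write the inplace form no longer updates self.df
            self.df[col] = self.df[col].replace([np.inf, -np.inf], np.nan)
            self.df[col] = self.df[col].fillna(self.df[col].median())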
            features.append(col)

        if len(features) > 30:
            features = self.df[features].var().sort_values(ascending=False).head(30).index.tolist()

        self.feature_names = features
        print(f"Selected {len(features)} features")
        return features
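
The filters can be followed on a four-row toy frame. This is a sketch with invented values; the column names only resemble the dataset's naming.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "mostly_missing": [1.0, np.nan, np.nan, np.nan],  # 75% missing, dropped
    "binary_flag":    [0, 1, 0, 1],                   # two unique values, dropped
    "st_teff":        [5700.0, np.inf, 4200.0, 6100.0],
    "pl_orbper":      [365.0, 12.0, np.nan, 88.0],
})

kept = []
for col in toy.select_dtypes(include=[np.number]).columns:
    if toy[col].isnull().mean() > 0.5 or toy[col].nunique() <= 2:
        continue
    cleaned = toy[col].replace([np.inf, -np.inf], np.nan)
    toy[col] = cleaned.fillna(cleaned.median())
    kept.append(col)

# Rank the survivors by variance, as the method does when more than 30 remain.
ranked = toy[kept].var().sort_values(ascending=False).index.tolist()
print(kept, ranked)
```

Because the ranking uses raw variance, it depends on each column's units: here `st_teff` outranks `pl_orbper` largely because its values are numerically bigger.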



### 2.6 `create_train_test_split()`

Creates a **stratified** train/test split so that both sets keep the same class proportions.


In [28]:

def create_train_test_split(self, test_size=0.2, random_state=42):
        X = self.df[self.feature_names]
        y = self.df[self.target_column].astype(int)

        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y,
            test_size=test_size,
            random_state=random_state,
            stratify=y
        )

        print("Train size:", len(self.X_train))
        print("Test size:", len(self.X_test))
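
The effect of `stratify=y` is easy to verify on synthetic data. The sketch below assumes a 15% positive rate purely for illustration and checks that both halves keep it.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
X = pd.DataFrame({"f1": rng.normal(size=1000), "f2": rng.normal(size=1000)})
y = pd.Series([1] * 150 + [0] * 850)  # 15% positives, chosen for illustration

X_tr, X_te, y_tr, y_te = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
print(y.mean(), y_tr.mean(), y_te.mean())
```

Without `stratify`, the positive rate in a small test set can drift away from the overall rate just by chance.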



### 2.7 `scale_features()`

Applies **RobustScaler** to reduce the effect of outliers. The scaler is fitted on the training set only, applied to both sets, and saved to `ml_scaler.pkl`.


In [29]:

def scale_features(self):
        self.scaler.fit(self.X_train)

        self.X_train_scaled = pd.DataFrame(
            self.scaler.transform(self.X_train),
            columns=self.feature_names,
            index=self.X_train.index
        )

        self.X_test_scaled = pd.DataFrame(
            self.scaler.transform(self.X_test),
            columns=self.feature_names,
            index=self.X_test.index
        )

        joblib.dump(self.scaler, 'ml_scaler.pkl')
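
A minimal sketch of what `RobustScaler` computes, using a single made-up feature with one extreme value. The scaler subtracts the training median and divides by the training interquartile range.

```python
import numpy as np
from sklearn.preprocessing import RobustScaler

train = np.array([[1.0], [2.0], [3.0], [4.0], [1000.0]])  # one extreme value
test = np.array([[2.5], [5.0]])

scaler = RobustScaler().fit(train)        # statistics come from training data only
print(scaler.center_, scaler.scale_)      # median and interquartile range
scaled_test = scaler.transform(test).ravel()
print(scaled_test)
```

The value 1000 barely moves the median or the quartiles, so ordinary rows are scaled much as they would be without it.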



### 2.8 `save_datasets()`

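Saves:
- Scaled feature sets (`X_train_scaled.csv`, `X_test_scaled.csv`)
- Target labels (`y_train.csv`, `y_test.csv`)
- Feature list (`feature_names.csv`)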


In [30]:

def save_datasets(self):
        self.X_train_scaled.to_csv('X_train_scaled.csv', index=False)
        self.X_test_scaled.to_csv('X_test_scaled.csv', index=False)
        self.y_train.to_csv('y_train.csv', index=False)
        self.y_test.to_csv('y_test.csv', index=False)

        pd.DataFrame({'feature': self.feature_names}).to_csv('feature_names.csv', index=False)
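
A later module will need to read these artifacts back. The sketch below shows one way to do that, assuming the file names used above; it writes stand-in files to a temporary directory so it runs on its own.

```python
import os
import tempfile

import joblib
import pandas as pd
from sklearn.preprocessing import RobustScaler

with tempfile.TemporaryDirectory() as tmp:
    # Stand-ins for the files written by scale_features() and save_datasets().
    X = pd.DataFrame({"pl_rade": [1.0, 2.0, 3.0], "pl_eqt": [250.0, 300.0, 350.0]})
    joblib.dump(RobustScaler().fit(X), os.path.join(tmp, "ml_scaler.pkl"))
    pd.DataFrame({"feature": X.columns}).to_csv(
        os.path.join(tmp, "feature_names.csv"), index=False
    )

    # Reload, then apply the scaler to new rows in the saved column order.
    features = pd.read_csv(os.path.join(tmp, "feature_names.csv"))["feature"].tolist()
    loaded = joblib.load(os.path.join(tmp, "ml_scaler.pkl"))
    new_rows = pd.DataFrame({"pl_eqt": [300.0], "pl_rade": [2.0]})
    scaled = loaded.transform(new_rows[features])

print(features, scaled)
```

Since the CSVs are written with `index=False`, features and labels line up only by row position, so the files should be read back without reordering.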



## 3️⃣ Run Module 3 Pipeline

This cell executes **all steps** in the correct order.


In [31]:

prep = MLDatasetPreparator()

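for fname in ['exoplanets_cleaned_ready.csv', 'exoplanets_processed.csv', 'exoplanet.csv']:
    if os.path.exists(fname):
        prep.df = pd.read_csv(fname)
        break
else:
    # No candidate file exists; stop here instead of failing later on prep.df = None
    raise FileNotFoundError("No Module 2 output file found in the working directory")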

prep.find_target_variable()
prep.prepare_target_variable()
prep.select_features()
prep.create_train_test_split()
prep.scale_features()
prep.save_datasets()



Looking for target variable...
✓ Found target variable: 'is_potentially_habitable'

Preparing target variable 'is_potentially_habitable'...
Target distribution:
target_binary
0    33156
1     5963
Name: count, dtype: int64
Selected 30 features
Train size: 31295
Test size: 7824
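
The printed sizes can be cross-checked with a few lines of arithmetic. The counts below are copied from the output above; the rounding rule (test count rounded up) is how scikit-learn turns a fractional `test_size` into a row count.

```python
import math

n_zero, n_one = 33156, 5963          # counts from the printed target distribution
n_total = n_zero + n_one
test_size = 0.2

n_test = math.ceil(test_size * n_total)   # test count is rounded up
n_train = n_total - n_test
positive_rate = n_one / n_total
print(n_total, n_train, n_test, round(positive_rate, 4))
```

Roughly 15% of the rows are positive, which is why the stratified split matters for this dataset.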



## ✅ Module 3 Completed

You now have:
- ML-ready train/test datasets
- Robust scaling applied
- Clean feature selection

➡️ **Next: Module 4 – Model Training & Evaluation**
