In [None]:
import os
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
from sklearn.model_selection import train_test_split, StratifiedKFold, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    roc_auc_score,
    roc_curve,
    precision_recall_curve,
    average_precision_score,
)

In [None]:
# Global Config
# Single seed reused throughout the notebook: it seeds NumPy's global RNG here
# and is passed as `random_state` to the train/test split, the stratified CV
# splitter, the random forest and the randomized hyper-parameter search.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)  # seeds NumPy's legacy global generator

In [None]:
# Output location for the trained pipeline. Both values are relative paths,
# so they resolve against the current working directory of the process.
MODEL_DIR = "./Models"
# Pickle file written by `save_model`.
MODEL_PATH = os.path.join(MODEL_DIR, "habitable_planet_model_irs.pkl")

In [None]:
# Utility Functions
def ensure_dir(path: str):
    """Create directory ``path`` (including missing parents) if needed.

    ``exist_ok=True`` replaces the previous "check, then create" sequence, so
    there is no window in which another process can create the directory
    between the check and the ``makedirs`` call and make it fail.

    Args:
        path: Directory to create. Calling this on an existing directory is a
            no-op.

    Raises:
        FileExistsError: If ``path`` already exists but is not a directory
            (previously this case was silently accepted and only failed later
            when something tried to write inside it).
    """
    os.makedirs(path, exist_ok=True)

In [None]:
# Data Loading
def load_dataset(file_path: str) -> pd.DataFrame:
    """Read the CSV at ``file_path`` into a DataFrame and report its shape.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        The parsed DataFrame, exactly as produced by ``pd.read_csv`` with
        default options.

    Raises:
        FileNotFoundError: If nothing exists at ``file_path``.
    """
    if os.path.exists(file_path):
        frame = pd.read_csv(file_path)
        print(f"Dataset loaded | Shape: {frame.shape}")
        return frame

    # Fail with an explicit message before pandas gets a chance to.
    raise FileNotFoundError(f"Dataset file not found: {file_path}")

In [None]:
# Feature Selection
def select_features_and_target(df: pd.DataFrame) -> pd.DataFrame:
    """Keep the modelling features and derive a binary integer ``Target``.

    The ``P_HABITABLE`` column is read as text, stripped and lower-cased, and
    mapped ``'yes' -> 1`` / ``'no' -> 0``. Rows whose value is anything else
    (missing, other spellings, numeric codes) cannot be labelled; they are
    dropped with a printed notice instead of being returned with a NaN target,
    which would otherwise break the stratified split / model fit later on.

    Args:
        df: Raw dataset. Must contain every feature column listed below plus
            ``P_HABITABLE``. It is not modified.

    Returns:
        A new DataFrame with the feature columns (in the order listed below)
        followed by an integer ``Target`` column. The original row index of the
        kept rows is preserved.

    Raises:
        KeyError: If a required column is missing from ``df``.
        ValueError: If ``df`` has rows but none of them carries a 'yes'/'no'
            label.
    """
    allowed_features = [
        'P_MASS', 'P_RADIUS', 'P_DENSITY', 'P_GRAVITY', 'P_ESCAPE', 'P_TYPE',
        'P_PERIOD', 'P_SEMI_MAJOR_AXIS', 'P_ECCENTRICITY', 'P_INCLINATION',
        'P_OMEGA', 'P_PERIASTRON', 'P_APASTRON', 'P_IMPACT_PARAMETER', 'P_HILL_SPHERE',
        'S_MASS', 'S_RADIUS', 'S_LUMINOSITY', 'S_TEMPERATURE', 'S_AGE',
        'S_METALLICITY', 'S_LOG_G', 'S_TYPE', 'S_MAG', 'S_DISC', 'S_MAGNETIC_FIELD',
        'S_SNOW_LINE', 'S_TIDAL_LOCK', 'P_DETECTION', 'P_DISTANCE'
    ]

    target_column = 'P_HABITABLE'

    # Selecting first makes a missing column fail fast with a KeyError.
    selected = df[allowed_features + [target_column]]

    labels = selected[target_column].astype(str).str.strip().str.lower()
    target = labels.map({'yes': 1, 'no': 0})  # anything else becomes NaN
    labelled = target.notna()

    if len(target) > 0 and not labelled.any():
        observed = sorted(labels.unique().tolist())[:10]
        raise ValueError(
            f"No row in '{target_column}' is labelled 'yes' or 'no' "
            f"(observed values include: {observed})"
        )

    unlabelled_count = int((~labelled).sum())
    if unlabelled_count:
        print(
            f"Dropping {unlabelled_count} row(s) whose '{target_column}' "
            "value is not 'yes'/'no'"
        )

    result = selected.loc[labelled, allowed_features].copy()
    # to_numpy() sidesteps index alignment, which matters if the index has
    # duplicate labels.
    result['Target'] = target[labelled].astype(int).to_numpy()

    print("Target Distribution:\n", result['Target'].value_counts(normalize=True))
    return result

In [None]:
# Preprocessing
class LabelEncoderWrapper:
    """Column-wise label encoding usable as a step in a sklearn ``Pipeline``.

    One ``LabelEncoder`` is fitted per input column on the column's values
    converted to ``str``. At transform time each value is replaced by its index
    in the fitted encoder's ``classes_``; values that were not present during
    ``fit`` are encoded as ``UNKNOWN_CODE`` instead of raising, because
    ``LabelEncoder.transform`` itself rejects unseen labels with a
    ``ValueError`` and that would abort cross-validation folds or test-set
    scoring whenever a rare category is missing from the fitting data.

    NOTE(review): the class exposes only ``fit``/``transform``; deriving from
    ``sklearn.base.BaseEstimator`` and ``TransformerMixin`` would be the
    conventional way to get ``get_params``/``fit_transform`` — confirm whether
    the installed scikit-learn version requires it.
    """

    # Integer code emitted for categories not seen during `fit`. Fitted
    # categories always receive codes >= 0, so -1 cannot collide with them.
    UNKNOWN_CODE = -1

    def __init__(self):
        # Maps column label -> fitted LabelEncoder; populated by `fit`.
        self.encoders = {}

    def fit(self, X, y=None):
        """Fit one ``LabelEncoder`` per column of ``X``.

        Args:
            X: 2-D array-like or DataFrame of categorical values.
            y: Ignored; accepted so the object can sit inside a ``Pipeline``.

        Returns:
            ``self``.
        """
        X = pd.DataFrame(X)
        # Build into a fresh dict so a re-fit never keeps encoders for columns
        # that belonged to an earlier, differently shaped input.
        fitted = {}
        for col in X.columns:
            le = LabelEncoder()
            le.fit(X[col].astype(str))
            fitted[col] = le
        self.encoders = fitted
        return self

    def transform(self, X):
        """Replace every fitted column of ``X`` by its integer codes.

        Args:
            X: 2-D array-like or DataFrame with the same column labels that
                were seen in ``fit``. The caller's object is not modified.

        Returns:
            ``numpy.ndarray`` holding the encoded columns.

        Raises:
            KeyError: If a column seen during ``fit`` is absent from ``X``.
        """
        # Copy so that a DataFrame passed by the caller is not overwritten.
        frame = pd.DataFrame(X).copy()
        for col, le in self.encoders.items():
            # Position in `classes_` is exactly the code LabelEncoder assigns.
            lookup = {str(label): code for code, label in enumerate(le.classes_)}
            codes = frame[col].astype(str).map(lookup)  # NaN for unseen values
            frame[col] = codes.fillna(self.UNKNOWN_CODE).astype(int)
        return frame.values

In [None]:
def build_preprocessor(X: pd.DataFrame) -> ColumnTransformer:
    """Assemble the (unfitted) column-wise preprocessing for ``X``.

    Only the dtypes of ``X`` are inspected here; nothing is fitted.

    * ``number`` columns: median imputation followed by standard scaling.
    * ``object`` columns: most-frequent imputation followed by
      ``LabelEncoderWrapper``.

    NOTE(review): columns that are neither ``object`` nor ``number`` (e.g.
    ``bool`` or ``category``) are assigned to neither branch — presumably they
    are then left out by the ``ColumnTransformer``; confirm this is intended.

    Args:
        X: Feature frame whose column dtypes decide the routing.

    Returns:
        A ``ColumnTransformer`` with a ``'num'`` and a ``'cat'`` branch.
    """
    numeric_branch = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])
    categorical_branch = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', LabelEncoderWrapper()),
    ])

    object_columns = X.select_dtypes(include=['object']).columns.tolist()
    number_columns = X.select_dtypes(include=['number']).columns.tolist()

    transformers = [
        ('num', numeric_branch, number_columns),
        ('cat', categorical_branch, object_columns),
    ]
    return ColumnTransformer(transformers)

In [None]:
# Model Training
def train_model(X_train, y_train, preprocessor) -> Pipeline:
    """Tune and fit a random-forest pipeline on the training data.

    The preprocessor and a class-weight-balanced ``RandomForestClassifier``
    are chained into one ``Pipeline``. ``RandomizedSearchCV`` then draws 20
    candidates from the 36-combination search space below and scores each with
    ROC-AUC over 5 shuffled stratified folds. All random components use
    ``RANDOM_SEED``.

    Args:
        X_train: Training features.
        y_train: Training labels.
        preprocessor: Unfitted transformer used as the first pipeline step.

    Returns:
        The search's ``best_estimator_`` (the winning pipeline).
    """
    search_space = {
        'classifier__n_estimators': [200, 300, 400],
        'classifier__max_depth': [None, 10, 20],
        'classifier__min_samples_split': [2, 5],
        'classifier__min_samples_leaf': [1, 2]
    }

    forest = RandomForestClassifier(
        class_weight='balanced',
        n_jobs=-1,
        random_state=RANDOM_SEED,
    )
    estimator = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', forest),
    ])
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_SEED)

    tuner = RandomizedSearchCV(
        estimator=estimator,
        param_distributions=search_space,
        n_iter=20,
        scoring='roc_auc',
        cv=folds,
        n_jobs=-1,
        random_state=RANDOM_SEED,
    )
    tuner.fit(X_train, y_train)

    print("Best Parameters:", tuner.best_params_)
    return tuner.best_estimator_

In [None]:
# Evaluation
def evaluate_model(model: Pipeline, X_test, y_test):
    """Print hold-out metrics for ``model`` and draw the diagnostic plots.

    Prints accuracy (as a percentage), ROC-AUC and the classification report,
    then shows the confusion matrix, ROC curve and precision-recall curve.

    Args:
        model: Fitted estimator exposing ``predict`` and ``predict_proba``.
        X_test: Hold-out features.
        y_test: Hold-out labels.
    """
    y_pred = model.predict(X_test)
    # Second probability column is used as the positive-class score.
    y_score = model.predict_proba(X_test)[:, 1]

    accuracy_pct = accuracy_score(y_test, y_pred) * 100
    print(f"Accuracy: {accuracy_pct:.2f}%")

    auc_value = roc_auc_score(y_test, y_score)
    print(f"ROC-AUC: {auc_value:.4f}\n")

    report = classification_report(y_test, y_pred)
    print("Classification Report:\n", report)

    plot_confusion_matrix(confusion_matrix(y_test, y_pred))
    plot_roc_curve(y_test, y_score)
    plot_precision_recall(y_test, y_score)

In [None]:
# Visualization
def plot_confusion_matrix(cm):
    """Show ``cm`` as an annotated heat map.

    Args:
        cm: Confusion matrix of integer counts (cells are formatted with
            ``'d'``); rows are labelled "Actual", columns "Predicted".
    """
    fig, ax = plt.subplots(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    ax.set_title('Confusion Matrix')
    fig.tight_layout()
    plt.show()

In [None]:
def plot_roc_curve(y_true, probs):
    """Show the ROC curve for ``probs`` against ``y_true``.

    Args:
        y_true: True binary labels.
        probs: Scores for the positive class.
    """
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, probs)

    fig, ax = plt.subplots(figsize=(6, 5))
    ax.plot(false_pos_rate, true_pos_rate, linewidth=2)
    ax.plot([0, 1], [0, 1], '--')  # diagonal reference line
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('ROC Curve')
    fig.tight_layout()
    plt.show()

In [None]:
def plot_precision_recall(y_true, probs):
    """Show the precision-recall curve, with average precision in the title.

    Args:
        y_true: True binary labels.
        probs: Scores for the positive class.
    """
    precision_vals, recall_vals, _ = precision_recall_curve(y_true, probs)
    avg_precision = average_precision_score(y_true, probs)

    fig, ax = plt.subplots(figsize=(6, 5))
    ax.plot(recall_vals, precision_vals, linewidth=2)
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.set_title(f'Precision-Recall Curve (AP={avg_precision:.3f})')
    fig.tight_layout()
    plt.show()

In [None]:
# Model Saving
def save_model(model: Pipeline, model_path: "str | None" = None):
    """Pickle ``model`` to disk, creating the destination directory if needed.

    Args:
        model: Fitted pipeline to serialize.
        model_path: Destination file. Defaults to the module-level
            ``MODEL_PATH`` so existing ``save_model(model)`` calls behave as
            before.

    NOTE(review): pickle stores classes by reference, and the pipeline built
    in this file contains ``LabelEncoderWrapper``, which is defined here rather
    than in an importable module. Loading the file from another program
    presumably requires that class to be available under the same qualified
    name — confirm with the consumer of the model.
    """
    target_path = MODEL_PATH if model_path is None else model_path

    # Create the directory of the actual destination (for the default path
    # this is MODEL_DIR). A bare file name has no directory part to create.
    target_dir = os.path.dirname(target_path)
    if target_dir:
        ensure_dir(target_dir)

    with open(target_path, 'wb') as f:
        pickle.dump(model, f)
    print(f"Model saved at: {target_path}")

In [None]:
# Main Pipeline
def main(dataset_path: str = "../data/full_data.csv"):
    """Run the whole workflow: load, select, split, train, evaluate, save.

    Args:
        dataset_path: CSV file to train on. Defaults to the path that was
            previously hard-coded, so ``main()`` keeps its old behaviour.
    """
    raw_df = load_dataset(dataset_path)
    model_df = select_features_and_target(raw_df)

    X = model_df.drop(columns=['Target'])
    y = model_df['Target']

    # Stratified 80/20 split so both parts keep the class proportions.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y
    )

    # Column routing is derived from the training split only; it has the same
    # columns as X, and no preprocessing decision is taken from test rows.
    preprocessor = build_preprocessor(X_train)

    model = train_model(X_train, y_train, preprocessor)

    evaluate_model(model, X_test, y_test)

    save_model(model)

In [None]:
# Entry point: run the full pipeline only when this code executes as the
# top-level program, not when it is imported from elsewhere.
if __name__ == '__main__':
    main()