In [1]:
#jupyter nbconvert --to script Model.ipynb


#TODO: Añadir las métricas que voy a usar finales
#TODO: Platt scaling
#TODO: Hyperband
#TODO: Corregir los logs
#TODO: Conformal prediction

# Optional with Shapely or sth like that?
#TODO: Estudiar relevancia de las features para cada modelo y cada grupo.

In [2]:
# Remove any existing log files
import os
import glob
import logging

# Reset logger to avoid any issues with permissions
logging.shutdown()
# Remove loggers
for log_file in glob.glob("*.log"):
    os.remove(log_file)



# Star-Galaxy Classification using ALHAMBRA Photometry

This notebook implements and evaluates several machine learning models for classifying astronomical objects as stars or galaxies based on multi-band photometric data from the ALHAMBRA survey, using labels derived from higher-resolution COSMOS2020 data.

**Target Variable:** `acs_mu_class` (from COSMOS2020)
 - Which is 1 for Galaxy and 2 for Star. We will remap this to 0 (Galaxy, majority class) and 1 (Star, minority class).

**Features:** Selected columns from the ALHAMBRA survey data.

**Models:**
1. Support Vector Machine (SVM)
2. Decision Tree (CART)
3. Random Forest
4. XGBoost
5. LightGBM

## 0. Setup and Configuration

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import logging
from datetime import datetime
import joblib # For saving/loading models efficiently
import glob

# Scikit-learn imports
from sklearn.model_selection import train_test_split, StratifiedKFold, RandomizedSearchCV,  ParameterSampler
from sklearn.linear_model import LogisticRegression
from scipy.stats import loguniform # For hyperparameter distributions
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score, classification_report, confusion_matrix,
    precision_recall_fscore_support, roc_auc_score,
    brier_score_loss, precision_recall_curve, auc
)   
import seaborn as sns # For confusion matrix heatmap

# Boosting models
import xgboost as xgb
import lightgbm as lgb


# Configure logging
logging.shutdown()
logging.basicConfig(
    filename=f'models_{datetime.now().strftime("%d_%H-%M-%S")}.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True
)
# Prevent logs from being printed to console
logging.getLogger().handlers = [h for h in logging.getLogger().handlers if isinstance(h, logging.FileHandler)]

## 1. Loading Dataset & Feature Selection

**Interesting Feature Combinations for Modeling:**
 
 The feature groups are defined as follows:
 - Group 1: Morphology features and their uncertainties
 - Group 2: Photometry magnitudes
 - Group 3: Photometry magnitude and errors
 - Group 4: Redshift features and their uncertainties
 - Group 5: Combination of photometry magnitude errors and morphology features (including uncertainties)
 - Group 6: Combination of photometry magnitude errors, morphology features (including uncertainties), and redshift features (including uncertainties)




In [4]:
# Read the df
df = pd.read_csv('data/match_alhambra_cosmos2020_ACS_class_0.8arcsec.csv')
logging.info(f"DataFrame created with shape: {df.shape}")
# Map ACS classification: 1 (Galaxy, Majority) -> 0, 2 (Star, minority) -> 1, 3 (Fake) -> drop
logging.info("Original class counts:")
logging.info(df['acs_mu_class'].value_counts().to_string())

# Drop fake detections (class 3)
# Drop fake detections
n_fakes = (df['acs_mu_class'] == 3).sum()
logging.info(f"Number of fake detections (class 3): {n_fakes}")
df = df[df['acs_mu_class'] != 3]

# Map classifications
df['acs_mu_class'] = df['acs_mu_class'].map({1: 0, 2: 1})

logging.info("After dropping fakes and mapping classes (0: Galaxy, 1: Star):")
logging.info(df['acs_mu_class'].value_counts().to_string())

In [5]:
# Input features

# --- Define feature categories based on ALHAMBRA data using exact names ---

# 1. ALHAMBRA Morphology Features (SExtractor-based)
morphology_features = [
    'area', 'fwhm', 'stell', 'ell', 'a', 'b', 'theta', 'rk', 'rf'
]

morphology_err = [
    's2n'
]

morphology_mags_errors = morphology_features + morphology_err

# 2. ALHAMBRA Photometry Magnitudes (Optical + NIR + Synthetic)
OPTICAL_MAG_COLS = [
    'F365W', 'F396W', 'F427W', 'F458W', 'F489W', 'F520W', 'F551W',
    'F582W', 'F613W', 'F644W', 'F675W', 'F706W', 'F737W', 'F768W',
    'F799W', 'F830W', 'F861W', 'F892W', 'F923W', 'F954W'
]
photometry_magnitudes = (
    OPTICAL_MAG_COLS +
    ['J', 'H', 'KS', 'F814W']
)

# 3. ALHAMBRA Photometry Uncertainties
OPTICAL_ERR_COLS = [
    'dF365W', 'dF396W', 'dF427W', 'dF458W', 'dF489W', 'dF520W', 'dF551W',
    'dF582W', 'dF613W', 'dF644W', 'dF675W', 'dF706W', 'dF737W', 'dF768W',
    'dF799W', 'dF830W', 'dF861W', 'dF892W', 'dF923W', 'dF954W'
]
photometry_uncertainties = (
    OPTICAL_ERR_COLS +
    ['dJ', 'dH', 'dKS', 'dF814W']
)

photometry_mags_errors = photometry_magnitudes + photometry_uncertainties

# 4. ALHAMBRA Photometric Redshift & Derived Features (BPZ-based)
redshift_features = [
    'zb_1', 'zb_Min_1', 'zb_Max_1', 'Tb_1',
    'z_ml', 't_ml',
    'Stell_Mass_1', 'M_Abs_1', 'MagPrior'
]

redshift_uncertainties = [
    'Odds_1', 'Chi2'
]


redshift_mags_errors = redshift_features + redshift_uncertainties

# 5. ALHAMBRA Quality/Auxiliary Features (per-band quality etc.)
OPTICAL_IRMS_COLS = [
    'irms_F365W', 'irms_F396W', 'irms_F427W', 'irms_F458W', 'irms_F489W',
    'irms_F520W', 'irms_F551W', 'irms_F582W', 'irms_F613W', 'irms_F644W',
    'irms_F675W', 'irms_F706W', 'irms_F737W', 'irms_F768W', 'irms_F799W',
    'irms_F830W', 'irms_F861W', 'irms_F892W', 'irms_F923W', 'irms_F954W'
]
quality_aux_features = (
    ['nfobs'] +
    OPTICAL_IRMS_COLS +
    ['irms_J', 'irms_H', 'irms_KS', 'irms_F814W']
)

# --- Define lists of features NOT used for modeling ---

non_modeling_identifiers = ['ID_1', 'id_2'] # ALHAMBRA ID, COSMOS ID

non_modeling_astrometry = [
    'RA_1', 'Dec_1', 'x', 'y', # ALHAMBRA Astrometry
    'ra_2', 'dec_2',          # COSMOS Astrometry
    'Separation'              # Matching Quality
]

non_modeling_flags = [
    'photoflag', 'xray', 'PercW', 'Satur_Flag', # ALHAMBRA Object/Photometry Flags
    'irms_OPT_Flag', 'irms_NIR_Flag'           # ALHAMBRA Overall Quality Flags
]

alhambra_prediction = ['Stellar_Flag'] # ALHAMBRA's own classification

non_modeling_aperture_mags = [ # Specific aperture mags, usually use total mags
    'F814W_3arcs', 'dF814W_3arcs', 'F814W_3arcs_corr'
]

non_modeling_cosmos_features = [ # Measurements/flags derived from COSMOS data (HST, HSC, VISTA...)
    'model_flag',
    'flag_hsc', 'flag_supcam', 'flag_udeep', 'flag_uvista',
    'hsc_r_mag', 'hsc_r_magerr', 'hsc_r_valid',
    'hsc_i_mag', 'hsc_i_magerr', 'hsc_i_valid',
    'uvista_j_mag', 'uvista_j_magerr', 'uvista_j_valid',
    'uvista_ks_mag', 'uvista_ks_magerr', 'uvista_ks_valid',
    'acs_f814w_mag', 'acs_f814w_magerr',
    'acs_fwhm_world', 'acs_mu_max',
    'solution_model' # This is categorical, but still COSMOS-derived info
]

target_variable = ['acs_mu_class'] # The COSMOS classification label to predict

##########################################################################################
#! --- Consolidate into the main dictionary for easy access ---
##########################################################################################

feature_sets = {
        # --- Potential Input Feature Sets ---
        'morphology_only': morphology_mags_errors,
        'photometry_magnitudes_only': photometry_magnitudes,
        'photometry_mags_errors': photometry_mags_errors,
        'photometry_plus_morphology': photometry_mags_errors + morphology_mags_errors,
        'photometry_no_redshift': photometry_mags_errors + morphology_mags_errors + quality_aux_features,
        'redshift_only': redshift_mags_errors,
        'full_alhambra_all': (morphology_mags_errors +
                            photometry_mags_errors +
                            redshift_mags_errors + 
                            quality_aux_features),

        # --- Excluded Feature Sets ---
        'non_modeling_identifiers': non_modeling_identifiers,
        'non_modeling_astrometry': non_modeling_astrometry,
        'non_modeling_flags': non_modeling_flags,
        'non_modeling_aperture_mags': non_modeling_aperture_mags,
        'non_modeling_cosmos_features': non_modeling_cosmos_features,
        'alhambra_prediction': alhambra_prediction,
        'target_variable': target_variable
    }

#! This is excluding the quality aux.
# Include target_variable in each group by appending it to the feature list
groups = {
        'group_1': feature_sets.get('morphology_only', []) + feature_sets.get('target_variable', []),
        'group_2': feature_sets.get('photometry_magnitudes_only', []) + feature_sets.get('target_variable', []),
        'group_3': feature_sets.get('photometry_mags_errors', []) + feature_sets.get('target_variable', []),
        'group_4': feature_sets.get('redshift_only', []) + feature_sets.get('target_variable', []),
        'group_5': feature_sets.get('photometry_plus_morphology', []) + feature_sets.get('target_variable', []),
        'group_6': (feature_sets.get('photometry_mags_errors', []) +
                   feature_sets.get('morphology_only', []) +
                   feature_sets.get('redshift_only', []) +
                   feature_sets.get('target_variable', [])),
        'group_7': feature_sets.get('full_alhambra_all', []) + feature_sets.get('target_variable', [])
    }

# --- Function to get a specific feature set (Unchanged from before) ---

def get_feature_set(df, set_name, groups = groups):
    """
    Selects columns from a DataFrame based on a predefined feature set name,
    including six specific groups defined by combinations of morphology,
    photometry magnitudes, uncertainties, and redshift features.

    Args:
        df (pd.DataFrame): The input DataFrame.
        set_name (str): The name of the desired feature set group:
                        'group_1' to 'group_6' as defined below.

    Returns:
        pd.DataFrame: A DataFrame containing only the columns
                      belonging to the specified feature set group.
                      Returns an empty DataFrame if no columns are found.
    """

    if set_name not in groups:
        raise ValueError(f"Feature set group '{set_name}' not defined. "
                         f"Available groups: {list(groups.keys())}")

    required_cols_in_set = groups[set_name]

    # Find which of these columns actually exist in the DataFrame
    available_cols = [col for col in required_cols_in_set if col in df.columns]

    # Warn if some columns from the set definition are missing
    missing_cols = [col for col in required_cols_in_set if col not in available_cols]
    if missing_cols:
        print(f"Warning: The following columns defined for feature set group '{set_name}'"
              f" were not found in the DataFrame and will be excluded: {missing_cols}")

    if not available_cols:
        print(f"Warning: No columns for feature set group '{set_name}' found in the DataFrame.")
        return pd.DataFrame()  # Return empty DataFrame

    print(f"Selecting feature set group '{set_name}' with {len(available_cols)} columns.")
    return df[available_cols]


In [6]:
# Quality check to see which cols are excluded and contained in each group
all_feature_cols = set()
for cols in feature_sets.values():
    all_feature_cols.update(cols)

df_cols_set = set(df.columns)
not_in_feature_sets = df_cols_set - all_feature_cols

if not_in_feature_sets:
    print(f"Columns in df not included in any feature_sets: {sorted(not_in_feature_sets)}")
else:
    print("All df columns are included in feature_sets.")


# Check which columns are in each feature group
for group_name in ['group_1', 'group_2', 'group_3', 'group_4', 'group_5', 'group_6', 'group_7']:
    print(f"\n=== {group_name} ===")
    
    # Get the feature set definition
    feature_set = groups[group_name]
    
    # Get the actual columns that exist in the data
    group_df = get_feature_set(df, group_name)
    

    available_cols = list(group_df.columns)
    
    # Find columns that are defined but not in the data
    missing_cols = [col for col in list(df.columns) if col not in feature_set]
    
    print(f"\nFeatures present ({len(available_cols)} columns):")
    print(list(sorted(available_cols)))
    
    print(f"\nFeatures missing ({len(missing_cols)} columns):")
    print(list(sorted(missing_cols)))




All df columns are included in feature_sets.

=== group_1 ===
Selecting feature set group 'group_1' with 11 columns.

Features present (11 columns):
['a', 'acs_mu_class', 'area', 'b', 'ell', 'fwhm', 'rf', 'rk', 's2n', 'stell', 'theta']

Features missing (125 columns):
['Chi2', 'Dec_1', 'F365W', 'F396W', 'F427W', 'F458W', 'F489W', 'F520W', 'F551W', 'F582W', 'F613W', 'F644W', 'F675W', 'F706W', 'F737W', 'F768W', 'F799W', 'F814W', 'F814W_3arcs', 'F814W_3arcs_corr', 'F830W', 'F861W', 'F892W', 'F923W', 'F954W', 'H', 'ID_1', 'J', 'KS', 'M_Abs_1', 'MagPrior', 'Odds_1', 'PercW', 'RA_1', 'Satur_Flag', 'Separation', 'Stell_Mass_1', 'Stellar_Flag', 'Tb_1', 'acs_f814w_mag', 'acs_f814w_magerr', 'acs_fwhm_world', 'acs_mu_max', 'dF365W', 'dF396W', 'dF427W', 'dF458W', 'dF489W', 'dF520W', 'dF551W', 'dF582W', 'dF613W', 'dF644W', 'dF675W', 'dF706W', 'dF737W', 'dF768W', 'dF799W', 'dF814W', 'dF814W_3arcs', 'dF830W', 'dF861W', 'dF892W', 'dF923W', 'dF954W', 'dH', 'dJ', 'dKS', 'dec_2', 'flag_hsc', 'flag_supcam

## 2. Data Preprocessing and Splitting

In [7]:
# Data splitting parameters
TEST_SIZE = 0.20 # Test set proportion
VAL_SIZE = 0.00 # Validation set proportion
CAL_SIZE = 0.10 # Calibration set proportion
# Train size will be 1 - (TEST_SIZE + VAL_SIZE + CAL_SIZE) = 0.70

TARGET_COLUMN = feature_sets.get('target_variable', [])[0]
RANDOM_SEED = 33 # For reproducibility

# Model saving directory
MODEL_DIR = "trained_models"
os.makedirs(MODEL_DIR, exist_ok=True)

# Data splitting strategy ('stratified' or 'random')
SPLIT_STRATEGY = 'stratified' # Recommended for imbalanced datasets


In [8]:
# --- Data Cleaning ---
logging.info(f"Original dataset size: {df.shape}")

# Choose the feature group to use (e.g., 'group_1', 'group_2', etc.)
FEATURE_GROUP = 'group_7'  # Change this to select a different group

# Get the feature columns for the selected group using get_feature_set
df_clean = get_feature_set(df, FEATURE_GROUP).dropna().copy()
logging.info(f"Dataset size after dropping NaNs: {df_clean.shape}")

# Ensure TARGET_COLUMN is defined correctly
if TARGET_COLUMN not in df_clean.columns:
    raise KeyError(f"Target column '{TARGET_COLUMN}' not found in the cleaned DataFrame columns: {df_clean.columns.tolist()}")

# Log value counts for target
logging.info(f"Value counts for target:\n1 (Star): {(df_clean[TARGET_COLUMN] == 1).sum()}\n0 (Galaxy): {(df_clean[TARGET_COLUMN] == 0).sum()}")

# Separate features (X) and target (y) for the cleaned DataFrame
X = df_clean.drop(columns=[TARGET_COLUMN])
y = df_clean[TARGET_COLUMN]

Selecting feature set group 'group_7' with 95 columns.


In [9]:
# --- Data Splitting ---
import numpy as np # Ensure numpy is imported
from sklearn.model_selection import train_test_split # Ensure train_test_split is imported

logging.info(f"Splitting data using '{SPLIT_STRATEGY}' strategy...")

# --- Validate Proportions ---
if not (0 <= TEST_SIZE <= 1 and 0 <= VAL_SIZE <= 1 and 0 <= CAL_SIZE <= 1):
     raise ValueError("Split proportions (TEST_SIZE, VAL_SIZE, CAL_SIZE) must be between 0 and 1.")

TRAIN_SIZE = 1.0 - TEST_SIZE - VAL_SIZE - CAL_SIZE
if not (0 <= TRAIN_SIZE <= 1):
     raise ValueError(f"Calculated TRAIN_SIZE ({TRAIN_SIZE:.3f}) is invalid. Sum of TEST_SIZE, VAL_SIZE, and CAL_SIZE must be between 0 and 1.")

if not np.isclose(TRAIN_SIZE + TEST_SIZE + VAL_SIZE + CAL_SIZE, 1.0):
    # This check might be redundant given the calculation of TRAIN_SIZE, but good for safety.
    raise ValueError("Sum of split proportions must be equal to 1.")

if np.isclose(TRAIN_SIZE, 0) and (np.isclose(VAL_SIZE, 0) or np.isclose(TEST_SIZE, 0) or np.isclose(CAL_SIZE, 0)):
     # Avoid scenarios where train is 0 but other splits are also 0, leading to ambiguity.
     # If only train is 0, it might be valid in some rare cases, but usually requires at least one other non-zero split.
     # Let's enforce Train > 0 for typical ML workflows.
     # If you need zero training data, adjust this check.
     logging.warning("TRAIN_SIZE is zero or near zero. Ensure this is intended.")
     if TRAIN_SIZE < 0: # Definitely an error
         raise ValueError("TRAIN_SIZE cannot be negative.")
     # Allow TRAIN_SIZE = 0 only if explicitly handled later, otherwise raise error?
     # For now, let's proceed but log a warning. If TRAIN_SIZE must be > 0, uncomment the raise below.
     # raise ValueError("TRAIN_SIZE must be greater than 0 for typical model training.")


logging.info(f"Target split ratios: Train={TRAIN_SIZE:.2f}, Val={VAL_SIZE:.2f}, Test={TEST_SIZE:.2f}, Cal={CAL_SIZE:.2f}")

# --- Initialize Splits ---
# Use iloc[0:0] to create empty DataFrames/Series with the same columns/dtype
empty_X = X.iloc[0:0]
empty_y = y.iloc[0:0]
X_train, y_train = empty_X.copy(), empty_y.copy()
X_val, y_val = empty_X.copy(), empty_y.copy()
X_test, y_test = empty_X.copy(), empty_y.copy()
X_cal, y_cal = empty_X.copy(), empty_y.copy()

# Temporary variables for sequential splitting
X_remaining, y_remaining = X.copy(), y.copy() # Use copies to avoid modifying original X, y

# --- Stratification Option ---
# Define stratify_func only once
def get_stratify_array(y_arr):
    return y_arr if SPLIT_STRATEGY == 'stratified' and not y_arr.empty else None

# --- First Split: Train vs. Remainder (Val + Test + Cal) ---
val_test_cal_size = VAL_SIZE + TEST_SIZE + CAL_SIZE

if np.isclose(val_test_cal_size, 0): # Only Train set needed
    X_train, y_train = X_remaining, y_remaining
    logging.info("All data assigned to Train set (Val, Test, Cal sizes are 0).")
    X_remaining, y_remaining = empty_X.copy(), empty_y.copy() # No remainder
elif np.isclose(TRAIN_SIZE, 0): # No Train set needed
    logging.info("Train set is empty (TRAIN_SIZE=0). Remainder passed to next splits.")
    # X_remaining, y_remaining already hold all data
else: # Split Train vs Remainder
    split_test_size = val_test_cal_size # Proportion of remainder relative to total (1.0)
    X_train, X_remaining, y_train, y_remaining = train_test_split(
        X_remaining, y_remaining,
        test_size=split_test_size,
        random_state=RANDOM_SEED,
        stratify=get_stratify_array(y_remaining)
    )
logging.info(f"Train set shape: {X_train.shape}")


# --- Second Split: Val vs. Remainder (Test + Cal) ---
if not X_remaining.empty:
    test_cal_size = TEST_SIZE + CAL_SIZE
    # Denominator for relative size calculation: size of the current remaining pool
    current_remaining_size_frac = VAL_SIZE + test_cal_size # = val_test_cal_size

    if np.isclose(VAL_SIZE, 0): # No Val set, pass remainder to next stage
        X_temp2, y_temp2 = X_remaining, y_remaining # Remainder is Test + Cal
        logging.info("Validation set is empty (VAL_SIZE=0).")
    elif np.isclose(test_cal_size, 0): # Only Val set left in remainder
        X_val, y_val = X_remaining, y_remaining
        X_temp2, y_temp2 = empty_X.copy(), empty_y.copy() # No data left for Test/Cal
        logging.info(f"Validation set shape: {X_val.shape}")
    else: # Split Val vs (Test + Cal)
        # Proportion of (Test + Cal) relative to (Val + Test + Cal)
        split_test_size = test_cal_size / current_remaining_size_frac
        X_val, X_temp2, y_val, y_temp2 = train_test_split(
            X_remaining, y_remaining,
            test_size=split_test_size,
            random_state=RANDOM_SEED,
            stratify=get_stratify_array(y_remaining)
        )
        logging.info(f"Validation set shape: {X_val.shape}")
else: # No data remaining after train split
    X_temp2, y_temp2 = empty_X.copy(), empty_y.copy()
    if not np.isclose(VAL_SIZE, 0): # Log only if Val set was expected
       logging.info("Validation set is empty (no data remaining after train split).")


# --- Third Split: Test vs. Cal ---
if not X_temp2.empty:
    # Denominator for relative size calculation: size of the current remaining pool
    current_remaining_size_frac = TEST_SIZE + CAL_SIZE # = test_cal_size

    if np.isclose(CAL_SIZE, 0): # No Cal set, remainder is Test
        X_test, y_test = X_temp2, y_temp2
        logging.info("Calibration set is empty (CAL_SIZE=0).")
    elif np.isclose(TEST_SIZE, 0): # Only Cal set left in remainder
        X_cal, y_cal = X_temp2, y_temp2
        logging.info("Test set is empty (TEST_SIZE=0).")
    else: # Split Test vs Cal
        # Proportion of Cal relative to (Test + Cal)
        split_test_size = CAL_SIZE / current_remaining_size_frac
        X_test, X_cal, y_test, y_cal = train_test_split(
            X_temp2, y_temp2,
            test_size=split_test_size,
            random_state=RANDOM_SEED,
            stratify=get_stratify_array(y_temp2)
        )
        # Logging shapes done after the if/else block
else: # No data remaining for Test/Cal split
    if not (np.isclose(TEST_SIZE, 0) and np.isclose(CAL_SIZE, 0)): # Log only if Test or Cal were expected
        logging.info("Test and Calibration sets are empty (no data remaining for final split).")

# Log final shapes for Test and Cal
logging.info(f"Test set shape: {X_test.shape}")
logging.info(f"Calibration set shape: {X_cal.shape}")


# --- Verification and Final Logging ---
total_len = len(X_train) + len(X_val) + len(X_test) + len(X_cal)
original_len = len(X)

if total_len != original_len:
     # Calculate actual proportions based on lengths
     actual_train = len(X_train) / original_len if original_len > 0 else 0
     actual_val = len(X_val) / original_len if original_len > 0 else 0
     actual_test = len(X_test) / original_len if original_len > 0 else 0
     actual_cal = len(X_cal) / original_len if original_len > 0 else 0
     logging.warning(f"Total split length ({total_len}) does not exactly match original length ({original_len}). "
                     f"This can happen with stratification or rounding. "
                     f"Target proportions: Train={TRAIN_SIZE:.3f}, Val={VAL_SIZE:.3f}, Test={TEST_SIZE:.3f}, Cal={CAL_SIZE:.3f}. "
                     f"Actual proportions: Train={actual_train:.3f}, Val={actual_val:.3f}, Test={actual_test:.3f}, Cal={actual_cal:.3f}")
else:
    logging.info("Split lengths verification successful.")

logging.info("Data splitting complete.")

# Log distributions, handling empty sets
def log_distribution(name, y_set):
    if y_set.empty:
        logging.info(f"{name} target distribution: Set is empty.")
    else:
        try:
            # Use normalize=True, handle potential division by zero if counts are zero (though unlikely if not empty)
            counts = y_set.value_counts()
            dist = counts / counts.sum() if counts.sum() > 0 else counts
            logging.info(f"{name} target distribution:\n{dist}")
            # Log absolute counts as well for clarity
            logging.info(f"{name} target counts:\n{counts}")
        except Exception as e:
            logging.error(f"Could not calculate distribution for {name}: {e}")
            # Attempt to log raw value counts even if normalization fails
            try:
                logging.info(f"{name} raw value counts:\n{y_set.value_counts()}")
            except Exception as e_raw:
                 logging.error(f"Could not get raw value counts for {name}: {e_raw}")


log_distribution("Train", y_train)
log_distribution("Validation", y_val)
log_distribution("Test", y_test)
log_distribution("Calibration", y_cal)

### Feature Scaling

In [10]:
# --- Feature Scaling ---
# Important for SVM, can be beneficial for others too.
# Fit scaler ONLY on training data, then transform all sets.

# Check if training set and other datasets are non-empty before scaling
if len(X_train) > 0 and TRAIN_SIZE > 0:
    logging.info("Applying StandardScaler to features...")
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
else:
    logging.info("Empty training set, NOT able to apply StandardScaler!")
    X_train_scaled = X_train

if len(X_val) > 0 and VAL_SIZE > 0:
    X_val_scaled = scaler.transform(X_val)
else:
    X_val_scaled = X_val

if len(X_test) > 0 and TEST_SIZE > 0:
    X_test_scaled = scaler.transform(X_test)
else:
    X_test_scaled = X_test

if len(X_cal) > 0 and CAL_SIZE > 0:
    X_cal_scaled = scaler.transform(X_cal)
else:
    X_cal_scaled = X_cal

# Save the scaler if it was fitted
if 'scaler' in locals():
    scaler_filename = os.path.join(MODEL_DIR, f"scaler_{datetime.now().strftime('%Y%m%d_%H%M%S')}.joblib")
    joblib.dump(scaler, scaler_filename)
    logging.info(f"Scaler saved to {scaler_filename}")

# Use scaled data for models sensitive to scale (like SVM)
# For tree-based models, scaling is not strictly necessary, but using scaled
# data consistently here won't hurt and simplifies the workflow.
X_train_processed = X_train_scaled
X_val_processed = X_val_scaled
X_test_processed = X_test_scaled
X_cal_processed = X_cal_scaled

# (Optional) Convert back to DataFrames for easier column inspection if needed
# X_train_processed = pd.DataFrame(X_train_scaled, columns=FEATURE_COLUMNS, index=X_train.index)
# X_val_processed = pd.DataFrame(X_val_scaled, columns=FEATURE_COLUMNS, index=X_val.index)
# X_test_processed = pd.DataFrame(X_test_scaled, columns=FEATURE_COLUMNS, index=X_test.index)
# X_cal_processed = pd.DataFrame(X_cal_scaled, columns=FEATURE_COLUMNS, index=X_cal.index)

logging.info("Feature scaling complete.")

### Platt Scaling

In [11]:
def train_platt_scaler(base_estimator_class, best_params, X_train, y_train, n_splits=5, random_state=None):
    """
    Trains a base estimator and calibrates its outputs using Platt scaling
    with k-fold cross-validation to obtain out-of-fold decision scores.

    Args:
        base_estimator_class: The class of the base estimator (e.g., SVC).
        best_params (dict): Dictionary of best hyperparameters for the base estimator.
        X_train (np.ndarray): Training features.
        y_train (np.ndarray): Training labels.
        n_splits (int): Number of folds for cross-validation.
        random_state (int): Random state for reproducibility.

    Returns:
        tuple: (fitted_base_estimator, fitted_platt_scaler)
               Returns (None, None) if an error occurs.
    """
    logging.info("--- Starting Platt Scaling Training ---")
    try:
        # 1. Train the final base model on the entire training set
        logging.info("Training final base model on full training data...")
        final_base_estimator = base_estimator_class(**best_params)
        final_base_estimator.fit(X_train, y_train)
        logging.info("Final base model trained.")

        # 2. Get out-of-fold decision scores using k-fold CV
        logging.info(f"Performing {n_splits}-fold CV to get out-of-fold decision scores...")
        cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
        oof_decision_scores = np.zeros_like(y_train, dtype=float)
        oof_true_labels = np.zeros_like(y_train, dtype=int)
        fold_indices = np.zeros_like(y_train, dtype=int) # To store which fold each sample was held out from

        for fold, (train_idx, val_idx) in enumerate(cv.split(X_train, y_train)):
            logging.info(f"Processing Fold {fold+1}/{n_splits}...")
            X_train_fold, X_val_fold = X_train[train_idx], X_train[val_idx]
            y_train_fold, y_val_fold = y_train.iloc[train_idx], y_train.iloc[val_idx] # Use iloc for Series indexing

            # Clone and train estimator on the fold's training data
            estimator_fold = base_estimator_class(**best_params) # Instantiate fresh estimator
            estimator_fold.fit(X_train_fold, y_train_fold)

            # Get decision function scores for the validation fold
            decision_scores_fold = estimator_fold.decision_function(X_val_fold)
            
            # Store results
            oof_decision_scores[val_idx] = decision_scores_fold
            oof_true_labels[val_idx] = y_val_fold.values # Get numpy array from Series
            fold_indices[val_idx] = fold + 1


        logging.info("Out-of-fold decision scores collected.")
        
        # Ensure correct shapes and types
        oof_decision_scores = oof_decision_scores.reshape(-1, 1)

        # 3. Train the Logistic Regression scaler
        logging.info("Training Logistic Regression (Platt) scaler...")
        # Use high C to approximate original Platt scaling (low regularization)
        platt_scaler = LogisticRegression(C=1e10, solver='liblinear', random_state=random_state)
        platt_scaler.fit(oof_decision_scores, oof_true_labels)
        logging.info("Platt scaler trained.")

        # Verify shapes one last time
        if oof_decision_scores.shape[0] != len(oof_true_labels):
            raise ValueError(f"Shape mismatch after collecting OOF scores: scores {oof_decision_scores.shape[0]}, labels {len(oof_true_labels)}")


        logging.info("--- Platt Scaling Training Complete ---")
        return final_base_estimator, platt_scaler

    except Exception as e:
        logging.error(f"Error during Platt scaling: {e}", exc_info=True)
        return None, None

### Conformal Prediction

In [12]:
def calculate_ncm_scores(calibrated_probs, true_labels):
    """Calculates non-conformity scores (1 - probability of true class)."""
    if not isinstance(calibrated_probs, np.ndarray) or not isinstance(true_labels, np.ndarray):
         # Ensure inputs are numpy arrays for proper indexing
        calibrated_probs = np.asarray(calibrated_probs)
        true_labels = np.asarray(true_labels)
        
    if calibrated_probs.shape[0] != true_labels.shape[0]:
         raise ValueError("Probs and labels must have the same number of samples.")
    if calibrated_probs.shape[1] < np.max(true_labels) + 1:
        raise ValueError("Probs array has fewer columns than needed for max label index.")

    # Get probability of the true class for each sample
    # Using np.take_along_axis for efficient indexing
    true_class_probs = np.take_along_axis(calibrated_probs, true_labels[:, np.newaxis], axis=1).squeeze()

    return 1.0 - true_class_probs


def calibrate_conformal_threshold(ncm_scores, alpha):
    """Calculates the quantile threshold q for ICP."""
    n = len(ncm_scores)
    q_level = np.ceil((n + 1) * (1 - alpha)) / n
    q_threshold = np.quantile(ncm_scores, q_level, method='higher') # Use 'higher' to ensure coverage
    logging.info(f"Calibrated CP: n={n}, alpha={alpha}, q_level={q_level:.4f}, q_threshold={q_threshold:.6f}")
    return q_threshold

def predict_conformal_sets(calibrated_probs, q_threshold):
    """Generates prediction sets based on calibrated probabilities and threshold."""
    prediction_sets = []
    ncm_scores_per_class = 1.0 - calibrated_probs # NCM score for *each* potential class

    for scores in ncm_scores_per_class:
        set_for_sample = [i for i, score in enumerate(scores) if score <= q_threshold]
        # Handle empty sets - should be rare with correct quantile calculation
        # but as a fallback, could include the most likely class
        if not set_for_sample:
             set_for_sample = [np.argmin(scores)] # Index of lowest NCM score == highest prob
             logging.warning(f"Empty prediction set generated, falling back to most likely class: {set_for_sample}")
        prediction_sets.append(set(set_for_sample)) # Store as sets
    return prediction_sets

def evaluate_conformal_prediction(y_true, prediction_sets, alpha, model_name="Model"):
    """Evaluates the performance of conformal prediction sets."""
    if not isinstance(y_true, (pd.Series, np.ndarray)):
        y_true = np.asarray(y_true) # Ensure y_true is indexable

    n_samples = len(y_true)
    if n_samples == 0:
        logging.warning(f"[{model_name} CP Eval] Empty y_true provided.")
        return None, None

    # Empirical Coverage
    coverage_count = sum(y_true.iloc[i] in prediction_sets[i] for i in range(n_samples))
    empirical_coverage = coverage_count / n_samples

    # Average Set Size
    average_set_size = np.mean([len(s) for s in prediction_sets])

    logging.info(f"--- {model_name} Conformal Prediction Evaluation (alpha={alpha}) ---")
    logging.info(f"Target Coverage: {1 - alpha:.2f}")
    logging.info(f"Empirical Coverage: {empirical_coverage:.4f} ({coverage_count}/{n_samples})")
    logging.info(f"Average Prediction Set Size: {average_set_size:.4f}")

    return empirical_coverage, average_set_size

### Metrics

In [13]:
# --- Define Comprehensive Metrics ---

def calculate_metrics(y_true, y_pred, y_proba, model_name="Model"):
    """
    Calculates a comprehensive set of classification metrics.

    Args:
        y_true (array-like): Ground truth labels.
        y_pred (array-like): Predicted labels.
        y_proba (array-like): Predicted probabilities for the positive class (class 1).
        model_name (str): Name of the model for logging.

    Returns:
        dict: A dictionary containing calculated metrics.
              Returns None if input arrays are empty or invalid.
    """
    if len(y_true) == 0 or len(y_pred) == 0 or len(y_proba) == 0:
        logging.error(f"[{model_name}] Empty input arrays provided for metric calculation.")
        return None
    if len(y_true) != len(y_pred) or len(y_true) != len(y_proba):
        logging.error(f"[{model_name}] Mismatched lengths in input arrays for metric calculation.")
        return None

    metrics = {}

    # --- Threshold-based Metrics (using y_pred) ---
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary', zero_division=0)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

    metrics['accuracy'] = accuracy_score(y_true, y_pred)
    metrics['precision'] = precision
    metrics['recall_tpr'] = recall # True Positive Rate (Sensitivity)
    metrics['f1_score'] = f1

    # Specificity (True Negative Rate)
    metrics['specificity_tnr'] = tn / (tn + fp) if (tn + fp) > 0 else 0.0

    # Geometric Mean
    metrics['g_mean'] = np.sqrt(metrics['recall_tpr'] * metrics['specificity_tnr'])

    # Confusion Matrix
    metrics['confusion_matrix'] = {'tn': tn, 'fp': fp, 'fn': fn, 'tp': tp}

    # --- Ranking/Probabilistic Metrics (using y_proba) ---
    try:
        metrics['roc_auc'] = roc_auc_score(y_true, y_proba)
    except ValueError as e:
        logging.warning(f"[{model_name}] Could not calculate ROC AUC: {e}. Setting to 0.0.")
        metrics['roc_auc'] = 0.0 # Handle cases with only one class present

    # PR AUC
    pr_curve_precision, pr_curve_recall, _ = precision_recall_curve(y_true, y_proba)
    metrics['pr_auc'] = auc(pr_curve_recall, pr_curve_precision) # Note order: recall is x, precision is y

    # Brier Score
    metrics['brier_score'] = brier_score_loss(y_true, y_proba)

    logging.info(f"--- {model_name} Metrics ---")
    logging.info(f"Accuracy: {metrics['accuracy']:.4f}")
    logging.info(f"Precision: {metrics['precision']:.4f}")
    logging.info(f"Recall (TPR): {metrics['recall_tpr']:.4f}")
    logging.info(f"Specificity (TNR): {metrics['specificity_tnr']:.4f}")
    logging.info(f"F1-Score: {metrics['f1_score']:.4f}")
    logging.info(f"G-Mean: {metrics['g_mean']:.4f}")
    logging.info(f"ROC AUC: {metrics['roc_auc']:.4f}")
    logging.info(f"PR AUC: {metrics['pr_auc']:.4f}")
    logging.info(f"Brier Score: {metrics['brier_score']:.4f}")
    logging.info(f"Confusion Matrix (TN, FP, FN, TP): ({tn}, {fp}, {fn}, {tp})")

    # Optional: Plot Confusion Matrix
    plt.figure(figsize=(6, 4))
    sns.heatmap([[tn, fp], [fn, tp]], annot=True, fmt='d', cmap='Blues',
                xticklabels=['Predicted Galaxy (0)', 'Predicted Star (1)'],
                yticklabels=['Actual Galaxy (0)', 'Actual Star (1)'])
    plt.title(f'{model_name} Confusion Matrix')
    plt.ylabel('Actual Label')
    plt.xlabel('Predicted Label')
    cm_filename = os.path.join(MODEL_DIR, f"{model_name}_confusion_matrix_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
    plt.savefig(cm_filename)
    plt.close()
    logging.info(f"Confusion matrix plot saved to {cm_filename}")


    return metrics

## 3. Model Implementation: Support Vector Machine (SVM)

In [16]:
# --- 3.1 SVM: Define Base Model and Hyperparameter Search ---
model_name_svm = "svm_rbf_tuned"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Timestamp for this run

# Base SVM instance (untrained)
# Ensure probability=False initially, as we handle calibration separately.
# decision_function is needed for independent Platt scaling.
base_svm = SVC(
    kernel='rbf',
    random_state=RANDOM_SEED,
    class_weight='balanced',
    # probability=False # Not needed if using decision_function + external Platt
)

# Hyperparameter distribution for Randomized Search
# Use loguniform for C and gamma as they often span orders of magnitude
param_dist_svm = {
    'C': loguniform(1e-2, 1e3),       # Regularization parameter (e.g., 0.01 to 1000)
    'gamma': loguniform(1e-4, 1e1)   # Kernel coefficient for 'rbf' (e.g., 0.0001 to 10)
    # 'kernel': ['rbf'] # Keep kernel fixed or add 'linear' etc. if desired
}

# Randomized Search Cross-Validation
N_ITER_SEARCH = 5 # Number of parameter settings sampled (adjust as needed)
CV_FOLDS_SEARCH = 5 # Number of folds for cross-validation during search

logging.info(f"--- [{model_name_svm}] Setting up RandomizedSearchCV ---")
logging.info(f"Parameter distributions: {param_dist_svm}")
logging.info(f"Number of iterations: {N_ITER_SEARCH}")
logging.info(f"CV folds: {CV_FOLDS_SEARCH}")

random_search_svm = RandomizedSearchCV(
    estimator=base_svm,
    param_distributions=param_dist_svm,
    n_iter=N_ITER_SEARCH,
    cv=StratifiedKFold(n_splits=CV_FOLDS_SEARCH, shuffle=True, random_state=RANDOM_SEED), # Explicit StratifiedKFold
    scoring='f1', # Optimize for F1
    n_jobs=-1,         # Use all available CPU cores
    random_state=RANDOM_SEED,
    verbose=1          # Set verbosity level for RandomizedSearchCV
)

logging.info(f"Starting hyperparameter search for {model_name_svm}...")
start_time_hpo = datetime.now()

# Perform the search on the scaled training data
random_search_svm.fit(X_train_processed, y_train)

end_time_hpo = datetime.now()
hpo_duration = end_time_hpo - start_time_hpo
logging.info(f"Hyperparameter search finished in: {hpo_duration}")

# Get best parameters
best_params_svm = random_search_svm.best_params_
best_score_svm = random_search_svm.best_score_
logging.info(f"Best parameters found: {best_params_svm}")
logging.info(f"Best cross-validation score ({random_search_svm.scoring}): {best_score_svm:.4f}")

# Add required parameters if not tuned but needed by SVC
best_params_svm['random_state'] = RANDOM_SEED
best_params_svm['class_weight'] = 'balanced'
# best_params_svm['probability'] = False # Ensure this is set if needed by base class


Fitting 5 folds for each of 5 candidates, totalling 25 fits


In [17]:
# --- 3.2 SVM: Train Final Model and Platt Scaler ---
# Now train the final base SVM with best params and the Platt scaler
logging.info(f"--- [{model_name_svm}] Training final model and Platt scaler ---")

CV_FOLDS_PLATT = 5 # Folds for Platt scaling CV

# Ensure y_train is passed as a Series/DataFrame if X_train_processed is NumPy
# Ensure that the base estimator class SVC is passed, not an instance
fitted_svm_base, platt_scaler_svm = train_platt_scaler(
    base_estimator_class=SVC, # Pass the class itself
    best_params=best_params_svm,
    X_train=X_train_processed, # Use scaled data
    y_train=y_train,           # Pass original Series/DF for StratifiedKFold indexing
    n_splits=CV_FOLDS_PLATT,
    random_state=RANDOM_SEED
)

# Save the final base model and the scaler
if fitted_svm_base and platt_scaler_svm:
    base_model_filename_svm = os.path.join(MODEL_DIR, f"{model_name_svm}_base_{timestamp}.joblib")
    scaler_filename_svm = os.path.join(MODEL_DIR, f"{model_name_svm}_platt_scaler_{timestamp}.joblib")

    logging.info(f"Saving final base SVM model to {base_model_filename_svm}")
    joblib.dump(fitted_svm_base, base_model_filename_svm)
    logging.info(f"Saving Platt scaler to {scaler_filename_svm}")
    joblib.dump(platt_scaler_svm, scaler_filename_svm)
    logging.info("Base model and Platt scaler saved successfully.")
else:
    logging.error(f"[{model_name_svm}] Failed to train base model or Platt scaler. Aborting further steps for this model.")
    # Set models to None to prevent errors in subsequent steps
    fitted_svm_base = None
    platt_scaler_svm = None

In [None]:
# --- (Optional) Load Models if already trained ---
# This section is useful if you want to run evaluation separately

LOAD_EXISTING = False # Set to True to load previously saved models/scalers
target_timestamp = "YYYYMMDD_HHMMSS" # Replace with the actual timestamp if LOAD_EXISTING is True

if LOAD_EXISTING:
    base_model_filename_svm_load = os.path.join(MODEL_DIR, f"{model_name_svm}_base_{target_timestamp}.joblib")
    scaler_filename_svm_load = os.path.join(MODEL_DIR, f"{model_name_svm}_platt_scaler_{target_timestamp}.joblib")

    if os.path.exists(base_model_filename_svm_load) and os.path.exists(scaler_filename_svm_load):
        logging.info(f"Loading existing base model: {base_model_filename_svm_load}")
        fitted_svm_base = joblib.load(base_model_filename_svm_load)
        logging.info(f"Loading existing Platt scaler: {scaler_filename_svm_load}")
        platt_scaler_svm = joblib.load(scaler_filename_svm_load)
        logging.info("Models loaded successfully.")
    else:
        logging.error("Load existing set to True, but model/scaler files not found. Cannot proceed.")
        fitted_svm_base = None
        platt_scaler_svm = None


In [18]:
# --- 3.3 SVM: Calibrate Conformal Prediction Threshold ---
ALPHA = 0.1 # Miscoverage rate (e.g., 0.1 for 90% coverage)
q_threshold_svm = None # Initialize

if fitted_svm_base and platt_scaler_svm:
    logging.info(f"--- [{model_name_svm}] Calibrating Conformal Prediction (alpha={ALPHA}) ---")

    # 1. Get calibrated probabilities for the calibration set
    logging.info("Getting decision scores for calibration set...")
    decision_scores_cal_svm = fitted_svm_base.decision_function(X_cal_processed) # Use scaled cal data
    logging.info("Predicting calibrated probabilities for calibration set...")
    # Platt scaler expects 2D input
    calibrated_probs_cal_svm = platt_scaler_svm.predict_proba(decision_scores_cal_svm.reshape(-1, 1))

    # 2. Calculate NCM scores for the calibration set
    logging.info("Calculating NCM scores for calibration set...")
    ncm_scores_cal_svm = calculate_ncm_scores(calibrated_probs_cal_svm, y_cal.values) # Use .values for numpy array

    # 3. Determine the quantile threshold q
    logging.info("Determining CP threshold q...")
    q_threshold_svm = calibrate_conformal_threshold(ncm_scores_cal_svm, ALPHA)

    # Save the threshold
    cp_threshold_filename_svm = os.path.join(MODEL_DIR, f"{model_name_svm}_cp_threshold_alpha{ALPHA}_{timestamp}.joblib")
    logging.info(f"Saving CP threshold ({q_threshold_svm:.6f}) to {cp_threshold_filename_svm}")
    joblib.dump(q_threshold_svm, cp_threshold_filename_svm)

else:
    logging.warning(f"[{model_name_svm}] Base model or Platt scaler not available. Skipping CP calibration.")

In [None]:
# --- (Optional) Load CP Threshold ---
if LOAD_EXISTING and q_threshold_svm is None: # Only load if not calculated above
    cp_threshold_filename_svm_load = os.path.join(MODEL_DIR, f"{model_name_svm}_cp_threshold_alpha{ALPHA}_{target_timestamp}.joblib")
    if os.path.exists(cp_threshold_filename_svm_load):
        logging.info(f"Loading existing CP threshold: {cp_threshold_filename_svm_load}")
        q_threshold_svm = joblib.load(cp_threshold_filename_svm_load)
        logging.info(f"CP threshold ({q_threshold_svm:.6f}) loaded.")
    else:
        logging.error("Load existing set to True, but CP threshold file not found.")
        q_threshold_svm = None

In [19]:
# --- 3.4 SVM: Final Evaluation on Test Set ---
if fitted_svm_base and platt_scaler_svm:
    logging.info(f"--- [{model_name_svm}] Final Evaluation on Test Set ---")

    # 1. Get calibrated probabilities for the test set
    logging.info("Getting decision scores for test set...")
    decision_scores_test_svm = fitted_svm_base.decision_function(X_test_processed) # Use scaled test data
    logging.info("Predicting calibrated probabilities for test set...")
    calibrated_probs_test_svm = platt_scaler_svm.predict_proba(decision_scores_test_svm.reshape(-1, 1))
    
    # Get probabilities for the positive class (Star=1) for standard metrics
    y_proba_test_svm = calibrated_probs_test_svm[:, 1]

    # 2. Get point predictions (e.g., using 0.5 threshold on calibrated probs)
    y_pred_test_svm = (y_proba_test_svm >= 0.5).astype(int)

    # 3. Calculate standard performance metrics
    logging.info("Calculating standard performance metrics...")
    metrics_svm = calculate_metrics(y_test, y_pred_test_svm, y_proba_test_svm, model_name=model_name_svm)

    # 4. Evaluate Conformal Prediction (if threshold is available)
    if q_threshold_svm is not None:
        logging.info("Generating and evaluating conformal prediction sets...")
        prediction_sets_test_svm = predict_conformal_sets(calibrated_probs_test_svm, q_threshold_svm)
        cp_coverage_svm, cp_avg_set_size_svm = evaluate_conformal_prediction(
            y_test, prediction_sets_test_svm, ALPHA, model_name=model_name_svm
        )
    else:
        logging.warning(f"[{model_name_svm}] CP threshold not available. Skipping CP evaluation.")
        cp_coverage_svm, cp_avg_set_size_svm = None, None

else:
    logging.warning(f"[{model_name_svm}] Base model or Platt scaler not loaded/trained. Skipping final evaluation.")

## 4. Model Implementation: Decision Tree (CART)

In [39]:
# --- 4.1 CART: Define Model ---
model_name_cart = "cart"
model_cart = None

# Hyperparameters (Defaults tend to overfit, apply some basic constraints)
cart_params = {
    'criterion': 'gini',        # Split quality measure ('gini' or 'entropy')
    'max_depth': 15,            # Max depth to prevent overfitting (tune later)
    'min_samples_split': 10,    # Min samples required to split an internal node (tune later)
    'min_samples_leaf': 5,      # Min samples required at a leaf node (tune later)
    'random_state': RANDOM_SEED,
    'class_weight': 'balanced' # Handle imbalance
}

model_cart = DecisionTreeClassifier(**cart_params)
logging.info(f"Defined CART model '{model_name_cart}' with params: {cart_params}")

In [40]:
# --- 4.2 CART: Train Model ---
logging.info(f"--- Training {model_name_cart} ---")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
model_filename_cart = os.path.join(MODEL_DIR, f"{model_name_cart}_{timestamp}.joblib")

if os.path.exists(model_filename_cart):
    logging.warning(f"Model file {model_filename_cart} already exists. Skipping training.")
else:
    logging.info(f"Starting training for {model_name_cart}...")
    start_time = datetime.now()

    # Tree models don't strictly need scaling, but we use processed data for consistency
    model_cart.fit(X_train_processed, y_train)

    end_time = datetime.now()
    training_duration = end_time - start_time
    logging.info(f"Training finished in: {training_duration}")
    logging.info(f"Saving trained model to {model_filename_cart}")
    joblib.dump(model_cart, model_filename_cart)
    logging.info("Model saved successfully.")

In [41]:
# --- 4.3 CART: Load Model ---
list_of_cart_files = glob.glob(os.path.join(MODEL_DIR, f"{model_name_cart}_*.joblib"))
if list_of_cart_files:
    latest_cart_file = max(list_of_cart_files, key=os.path.getctime)
    logging.info(f"Loading latest CART model: {latest_cart_file}")
    try:
        model_cart_loaded = joblib.load(latest_cart_file)
        logging.info("CART model loaded successfully.")
    except Exception as e:
        logging.error(f"Error loading CART model: {e}")
        model_cart_loaded = None
else:
    logging.warning(f"No saved models found for {model_name_cart} in {MODEL_DIR}")
    model_cart_loaded = None

In [42]:
# --- 4.4 CART: Test Model ---
if model_cart_loaded:
    logging.info(f"--- Testing {model_name_cart} ---")
    y_pred_cart = model_cart_loaded.predict(X_test_processed)
    y_proba_cart = model_cart_loaded.predict_proba(X_test_processed)[:, 1]

    # Evaluate
    accuracy_cart = accuracy_score(y_test, y_pred_cart)
    roc_auc_cart = roc_auc_score(y_test, y_proba_cart)
    report_cart = classification_report(y_test, y_pred_cart, target_names=['Galaxy (0)', 'Star (1)'])
    cm_cart = confusion_matrix(y_test, y_pred_cart)

    logging.info(f"CART Test Accuracy: {accuracy_cart:.4f}")
    logging.info(f"CART Test ROC AUC: {roc_auc_cart:.4f}")
    logging.info(f"CART Classification Report:\n{report_cart}")
    logging.info(f"CART Confusion Matrix:\n{cm_cart}")
else:
    logging.warning("CART model not loaded. Skipping testing.")

## 5. Model Implementation: Random Forest

In [43]:
# --- 5.1 RF: Define Model ---
model_name_rf = "random_forest"
model_rf = None

# Hyperparameters (Good starting point)
rf_params = {
    'n_estimators': 200,        # Number of trees in the forest
    'criterion': 'gini',
    'max_depth': None,          # Grow trees fully (or set a limit like CART)
    'min_samples_split': 2,     # Default
    'min_samples_leaf': 1,      # Default (can increase for regularization)
    'max_features': 'sqrt',     # Number of features to consider for best split ('sqrt', 'log2', or int/float)
    'bootstrap': True,          # Use bootstrap samples
    'random_state': RANDOM_SEED,
    'n_jobs': -1,               # Use all available CPU cores
    'class_weight': 'balanced'  # Handle imbalance
}

model_rf = RandomForestClassifier(**rf_params)
logging.info(f"Defined RF model '{model_name_rf}' with params: {rf_params}")

In [44]:
# --- 5.2 RF: Train Model ---
logging.info(f"--- Training {model_name_rf} ---")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
model_filename_rf = os.path.join(MODEL_DIR, f"{model_name_rf}_{timestamp}.joblib")

if os.path.exists(model_filename_rf):
    logging.warning(f"Model file {model_filename_rf} already exists. Skipping training.")
else:
    logging.info(f"Starting training for {model_name_rf}...")
    start_time = datetime.now()

    model_rf.fit(X_train_processed, y_train)

    end_time = datetime.now()
    training_duration = end_time - start_time
    logging.info(f"Training finished in: {training_duration}")
    logging.info(f"Saving trained model to {model_filename_rf}")
    joblib.dump(model_rf, model_filename_rf)
    logging.info("Model saved successfully.")


In [45]:
# --- 5.3 RF: Load Model ---
list_of_rf_files = glob.glob(os.path.join(MODEL_DIR, f"{model_name_rf}_*.joblib"))
if list_of_rf_files:
    latest_rf_file = max(list_of_rf_files, key=os.path.getctime)
    logging.info(f"Loading latest RF model: {latest_rf_file}")
    try:
        model_rf_loaded = joblib.load(latest_rf_file)
        logging.info("RF model loaded successfully.")
    except Exception as e:
        logging.error(f"Error loading RF model: {e}")
        model_rf_loaded = None
else:
    logging.warning(f"No saved models found for {model_name_rf} in {MODEL_DIR}")
    model_rf_loaded = None

In [46]:
# --- 5.4 RF: Test Model ---
if model_rf_loaded:
    logging.info(f"--- Testing {model_name_rf} ---")
    y_pred_rf = model_rf_loaded.predict(X_test_processed)
    y_proba_rf = model_rf_loaded.predict_proba(X_test_processed)[:, 1]

    # Evaluate
    accuracy_rf = accuracy_score(y_test, y_pred_rf)
    roc_auc_rf = roc_auc_score(y_test, y_proba_rf)
    report_rf = classification_report(y_test, y_pred_rf, target_names=['Galaxy (0)', 'Star (1)'])
    cm_rf = confusion_matrix(y_test, y_pred_rf)

    logging.info(f"RF Test Accuracy: {accuracy_rf:.4f}")
    logging.info(f"RF Test ROC AUC: {roc_auc_rf:.4f}")
    logging.info(f"RF Classification Report:\n{report_rf}")
    logging.info(f"RF Confusion Matrix:\n{cm_rf}")
else:
    logging.warning("RF model not loaded. Skipping testing.")

In [47]:
# --- 6.1 XGB: Define Model ---
model_name_xgb = "xgboost"
model_xgb = None

# Hyperparameters
xgb_params = {
    'objective': 'binary:logistic', # Objective function for binary classification
    'eval_metric': 'auc',           # Evaluation metric ('logloss', 'auc', 'error')
    'n_estimators': 200,            # Number of boosting rounds/trees
    'learning_rate': 0.1,           # Step size shrinkage
    'max_depth': 5,                 # Maximum tree depth
    'subsample': 0.8,               # Fraction of samples used per tree
    'colsample_bytree': 0.8,        # Fraction of features used per tree
    'gamma': 0,                     # Minimum loss reduction required to make a split
    'reg_alpha': 0,                 # L1 regularization
    'reg_lambda': 1,                # L2 regularization (default)
    'use_label_encoder': False,     # Recommended setting for recent versions
    'random_state': RANDOM_SEED,
    'n_jobs': -1
    # scale_pos_weight can be used for imbalance, but often handled by eval_metric='auc' and tuning
}

model_xgb = xgb.XGBClassifier(**xgb_params)
logging.info(f"Defined XGBoost model '{model_name_xgb}' with params: {xgb_params}")

In [48]:
# --- 6.2 XGB: Train Model ---
logging.info(f"--- Training {model_name_xgb} ---")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# XGBoost has its own save method, but joblib works too for consistency here
model_filename_xgb = os.path.join(MODEL_DIR, f"{model_name_xgb}_{timestamp}.joblib")

if os.path.exists(model_filename_xgb):
    logging.warning(f"Model file {model_filename_xgb} already exists. Skipping training.")
else:
    logging.info(f"Starting training for {model_name_xgb}...")
    start_time = datetime.now()

    # Use validation set for early stopping
    eval_set = [(X_val_processed, y_val)]
    early_stopping_rounds = 15 # Stop if no improvement on eval_set for 15 rounds

    model_xgb.fit(X_train_processed, y_train,
                  eval_set=eval_set,
                  early_stopping_rounds=early_stopping_rounds,
                  verbose=False) # Set verbose=True or integer for progress logs

    end_time = datetime.now()
    training_duration = end_time - start_time
    logging.info(f"Training finished in: {training_duration}")
    logging.info(f"Best iteration: {model_xgb.best_iteration}, Best score ({xgb_params['eval_metric']}): {model_xgb.best_score:.4f}")
    logging.info(f"Saving trained model to {model_filename_xgb}")
    joblib.dump(model_xgb, model_filename_xgb)
    # Alternatively: model_xgb.save_model(model_filename_xgb.replace('.joblib', '.xgb'))
    logging.info("Model saved successfully.")

TypeError: fit() got an unexpected keyword argument 'early_stopping_rounds'

In [None]:
# --- 6.3 XGB: Load Model ---
list_of_xgb_files = glob.glob(os.path.join(MODEL_DIR, f"{model_name_xgb}_*.joblib"))
if list_of_xgb_files:
    latest_xgb_file = max(list_of_xgb_files, key=os.path.getctime)
    logging.info(f"Loading latest XGBoost model: {latest_xgb_file}")
    try:
        model_xgb_loaded = joblib.load(latest_xgb_file)
        # If using native save:
        # model_xgb_loaded = xgb.XGBClassifier()
        # model_xgb_loaded.load_model(latest_xgb_file.replace('.joblib', '.xgb'))
        logging.info("XGBoost model loaded successfully.")
    except Exception as e:
        logging.error(f"Error loading XGBoost model: {e}")
        model_xgb_loaded = None
else:
    logging.warning(f"No saved models found for {model_name_xgb} in {MODEL_DIR}")
    model_xgb_loaded = None

In [None]:
# --- 6.4 XGB: Test Model ---
if model_xgb_loaded:
    logging.info(f"--- Testing {model_name_xgb} ---")
    y_pred_xgb = model_xgb_loaded.predict(X_test_processed)
    y_proba_xgb = model_xgb_loaded.predict_proba(X_test_processed)[:, 1]

    # Evaluate
    accuracy_xgb = accuracy_score(y_test, y_pred_xgb)
    roc_auc_xgb = roc_auc_score(y_test, y_proba_xgb)
    report_xgb = classification_report(y_test, y_pred_xgb, target_names=['Galaxy (0)', 'Star (1)'])
    cm_xgb = confusion_matrix(y_test, y_pred_xgb)

    logging.info(f"XGBoost Test Accuracy: {accuracy_xgb:.4f}")
    logging.info(f"XGBoost Test ROC AUC: {roc_auc_xgb:.4f}")
    logging.info(f"XGBoost Classification Report:\n{report_xgb}")
    logging.info(f"XGBoost Confusion Matrix:\n{cm_xgb}")
else:
    logging.warning("XGBoost model not loaded. Skipping testing.")


## 7. Model Implementation: LightGBM

In [None]:
# --- 7.1 LGBM: Define Model ---
model_name_lgbm = "lightgbm"
model_lgbm = None

# Hyperparameters
lgbm_params = {
    'objective': 'binary',          # Binary classification
    'metric': 'auc',                # Evaluation metric ('auc', 'binary_logloss')
    'n_estimators': 200,
    'learning_rate': 0.1,
    'num_leaves': 31,               # Default, main parameter to control complexity
    'max_depth': -1,                # Default: no limit (num_leaves is often preferred)
    'feature_fraction': 0.8,        # Equivalent to colsample_bytree
    'bagging_fraction': 0.8,        # Equivalent to subsample
    'bagging_freq': 1,              # Perform bagging at every iteration
    'reg_alpha': 0,
    'reg_lambda': 0,                # Default: 0 for LightGBM
    'random_state': RANDOM_SEED,
    'n_jobs': -1,
    'class_weight': 'balanced'      # Handle imbalance
}

model_lgbm = lgb.LGBMClassifier(**lgbm_params)
logging.info(f"Defined LightGBM model '{model_name_lgbm}' with params: {lgbm_params}")

In [None]:
# --- 7.2 LGBM: Train Model ---
logging.info(f"--- Training {model_name_lgbm} ---")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# LightGBM also has native save, using joblib here
model_filename_lgbm = os.path.join(MODEL_DIR, f"{model_name_lgbm}_{timestamp}.joblib")

if os.path.exists(model_filename_lgbm):
    logging.warning(f"Model file {model_filename_lgbm} already exists. Skipping training.")
else:
    logging.info(f"Starting training for {model_name_lgbm}...")
    start_time = datetime.now()

    # Use validation set for early stopping
    eval_set = [(X_val_processed, y_val)]
    early_stopping_rounds = 15

    # Need callbacks for early stopping in older versions, but newer ones have direct param
    # from lightgbm import early_stopping
    # callbacks = [early_stopping(stopping_rounds=early_stopping_rounds, verbose=1)]

    model_lgbm.fit(X_train_processed, y_train,
                   eval_set=eval_set,
                   eval_metric=lgbm_params['metric'],
                   # callbacks=callbacks # Use this if early_stopping_rounds is not in fit()
                   # For newer versions:
                   early_stopping_rounds=early_stopping_rounds
                   )

    end_time = datetime.now()
    training_duration = end_time - start_time
    logging.info(f"Training finished in: {training_duration}")
    logging.info(f"Best iteration: {model_lgbm.best_iteration_}, Best score ({lgbm_params['metric']}): {model_lgbm.best_score_['valid_0'][lgbm_params['metric']]:.4f}")
    logging.info(f"Saving trained model to {model_filename_lgbm}")
    joblib.dump(model_lgbm, model_filename_lgbm)
    # Alternatively: model_lgbm.booster_.save_model(model_filename_lgbm.replace('.joblib', '.txt'))
    logging.info("Model saved successfully.")

In [None]:
# --- 7.3 LGBM: Load Model ---
list_of_lgbm_files = glob.glob(os.path.join(MODEL_DIR, f"{model_name_lgbm}_*.joblib"))
if list_of_lgbm_files:
    latest_lgbm_file = max(list_of_lgbm_files, key=os.path.getctime)
    logging.info(f"Loading latest LightGBM model: {latest_lgbm_file}")
    try:
        model_lgbm_loaded = joblib.load(latest_lgbm_file)
        # If using native save:
        # model_lgbm_loaded = lgb.Booster(model_file=latest_lgbm_file.replace('.joblib', '.txt')) # Note: loads Booster, need wrapper for predict
        # model_lgbm_loaded_clf = lgb.LGBMClassifier()
        # model_lgbm_loaded_clf.booster_ = model_lgbm_loaded
        logging.info("LightGBM model loaded successfully.")
    except Exception as e:
        logging.error(f"Error loading LightGBM model: {e}")
        model_lgbm_loaded = None
else:
    logging.warning(f"No saved models found for {model_name_lgbm} in {MODEL_DIR}")
    model_lgbm_loaded = None

In [None]:
# --- 7.4 LGBM: Test Model ---
if model_lgbm_loaded:
    logging.info(f"--- Testing {model_name_lgbm} ---")
    # If loaded Booster directly, need wrapper or use booster_.predict
    # y_pred_lgbm = (model_lgbm_loaded.predict(X_test_processed) > 0.5).astype(int) # Booster predicts scores
    # y_proba_lgbm = model_lgbm_loaded.predict(X_test_processed)
    # If loaded via joblib (as LGBMClassifier):
    y_pred_lgbm = model_lgbm_loaded.predict(X_test_processed)
    y_proba_lgbm = model_lgbm_loaded.predict_proba(X_test_processed)[:, 1]


    # Evaluate
    accuracy_lgbm = accuracy_score(y_test, y_pred_lgbm)
    roc_auc_lgbm = roc_auc_score(y_test, y_proba_lgbm)
    report_lgbm = classification_report(y_test, y_pred_lgbm, target_names=['Galaxy (0)', 'Star (1)'])
    cm_lgbm = confusion_matrix(y_test, y_pred_lgbm)

    logging.info(f"LightGBM Test Accuracy: {accuracy_lgbm:.4f}")
    logging.info(f"LightGBM Test ROC AUC: {roc_auc_lgbm:.4f}")
    logging.info(f"LightGBM Classification Report:\n{report_lgbm}")
    logging.info(f"LightGBM Confusion Matrix:\n{cm_lgbm}")
else:
    logging.warning("LightGBM model not loaded. Skipping testing.")

In [None]:
# ## 9. Next Steps
#
# - **Hyperparameter Tuning:** Use the validation set (`X_val_processed`, `y_val`) with techniques like `GridSearchCV` or `RandomizedSearchCV` to find optimal hyperparameters for each model.
# - **Feature Engineering:** Create new features (e.g., colors like F365W - F814W) and evaluate their impact.
# - **Feature Importance:** Analyze feature importance plots (especially for tree-based models) to understand which ALHAMBRA measurements are most predictive.
# - **Calibration:** Implement calibration methods using the calibration set.
# - **Error Analysis:** Investigate misclassified examples in the test set to understand model weaknesses.
# - **Comparison:** Systematically compare the performance metrics of all tuned and calibrated models.