In [None]:
from typing import Optional, Dict, Any
import pandas as pd
import numpy as np
import optuna

from sklearn.impute import KNNImputer
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import StratifiedKFold

from sklearn.ensemble import HistGradientBoostingClassifier, VotingClassifier, RandomForestClassifier

from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

In [None]:
class FeatureEngineering:
    """Feature-engineering helpers for the passenger data processed in this notebook.

    The feature-building methods (`ship_related_features`, `passenger_features`,
    `service_features`, `destination_features`) and `missing_value_imputation`
    write onto the DataFrame they receive and return that same object.
    `data_split` and `get_dummies` return new objects.
    """

    @staticmethod
    def _as_binary_flag(values: pd.Series) -> pd.Series:
        """Maps a True/False column to 1/0, treating anything that is not true as 0.

        Both real booleans and the strings 'True'/'False' are accepted; going
        through `str()` makes `True`, `numpy.bool_(True)` and `'True'` all
        compare equal to 'True', while missing values become 0.
        """
        return values.map(lambda value: int(str(value) == 'True')).astype(int)

    @staticmethod
    def _age_category(age):
        """Returns the age bucket label for one age, or NaN when the age is missing."""
        if pd.isna(age):
            # Every comparison against NaN is False, so without this guard a
            # missing age would fall through to the last bucket ('Senior').
            return float("nan")
        if age <= 12:
            return 'Child'
        if age < 18:
            return 'Teen'
        if age < 64:
            return 'Adult'
        return 'Senior'

    def ship_related_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Splits 'Cabin' into 'Deck' and 'Cabin_part'.

        'Cabin' is split on "/"; the first piece becomes 'Deck' and the third
        piece becomes 'Cabin_part'. Rows whose cabin is missing, or has fewer
        than three pieces, get NaN for the pieces that do not exist.

        Args:
            data (pd.DataFrame): The input DataFrame containing the 'Cabin' column.

        Returns:
            pd.DataFrame: The same DataFrame with 'Deck' and 'Cabin_part' columns added.
        """
        cabin_parts = data['Cabin'].str.split("/", expand=True)

        # Select by label with reindex instead of by position: if no cabin in
        # the frame has three pieces the split result has fewer columns, and a
        # positional lookup would raise instead of yielding NaN.
        cabin_parts = cabin_parts.reindex(columns=[0, 2])

        data['Deck'] = cabin_parts[0]
        data['Cabin_part'] = cabin_parts[2]

        return data

    def passenger_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Creates passenger-related features including group size and family details.

        Reads 'PassengerId', 'Cabin', 'Age' and 'Name'. The travel group is the
        part of 'PassengerId' before the first "_"; the last name is the second
        whitespace-separated token of 'Name'.

        Args:
            data (pd.DataFrame): The input DataFrame containing passenger information.

        Returns:
            pd.DataFrame: The same DataFrame with the additional passenger features.
        """
        # Travel-group identifier shared by passengers who booked together
        group_id = data['PassengerId'].str.split("_").str[0]

        # Head-counts per cabin and per travel group (NaN cabin -> NaN count)
        data['NUMBER_OF_PASSENGERS_PER_CABIN'] = data.groupby('Cabin')['PassengerId'].transform('count')
        data['NUMBER_OF_PASSENGERS_PER_GROUP'] = group_id.map(group_id.value_counts())

        # Mean age within each travel group. Grouping must use the group id:
        # grouping by the group *size* would average together every passenger
        # whose group merely has the same number of members.
        data['Avg_Age_Per_Group'] = data['Age'].groupby(group_id).transform('mean')

        # Binary flags: passenger is the only member of their group / cabin
        data['wasAlonePerGroup'] = (data['NUMBER_OF_PASSENGERS_PER_GROUP'] == 1).astype(int)
        data['wasAlonePerCabin'] = (data['NUMBER_OF_PASSENGERS_PER_CABIN'] == 1).astype(int)

        # Last name = second token of 'Name'
        data['LAST_NAME'] = data['Name'].str.split().str[1]

        # Family size is only needed to derive the flag, so it is kept as a
        # local Series rather than written to the frame and dropped again.
        family_size = data['LAST_NAME'].map(data['LAST_NAME'].value_counts())
        data['TRAVELLED_WITH_FAMILY'] = (family_size > 1).astype(int)

        return data

    def service_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Calculates service-related spending features and a few passenger flags.

        For each service column, adds mean/median/total spending keyed by
        'PassengerId' (suffix '_Pass') and by 'LAST_NAME' (suffix '_Fam').
        Also adds 'isMinor', 'Age_Cat', 'isCyroSleep', 'isVIP' and
        'Average_Family_Size'. Requires 'LAST_NAME' to exist already
        (it is created by `passenger_features`).

        Args:
            data (pd.DataFrame): The input DataFrame containing service spending information.

        Returns:
            pd.DataFrame: The same DataFrame with service-related features added.
        """
        # List of columns related to service spending
        service_columns = ['RoomService', 'Spa', 'FoodCourt', 'ShoppingMall', 'VRDeck']

        # Aggregate once per grouping key, before any new column is written
        grouping_keys = (('PassengerId', 'Pass'), ('LAST_NAME', 'Fam'))
        spending_stats = {}
        for key, _suffix in grouping_keys:
            grouped = data.groupby(key)[service_columns]
            spending_stats[key] = (
                ('Mean', grouped.mean()),
                ('Median', grouped.median()),
                ('Total', grouped.sum()),
            )

        # Map every statistic back onto the rows through the grouping key
        for column in service_columns:
            for key, suffix in grouping_keys:
                for label, table in spending_stats[key]:
                    data[f'{label}_Spending_On_{column}_{suffix}'] = data[key].map(table[column])

        # Age indicators; a missing age yields isMinor == 0 and a missing Age_Cat
        data['isMinor'] = (data['Age'] < 18).astype(int)
        data['Age_Cat'] = data['Age'].apply(self._age_category)

        # CryoSleep / VIP flags. These columns hold booleans when loaded with
        # pandas, so a plain comparison with the string 'True' never matches
        # and would leave both flags at 0 for every row.
        data['isCyroSleep'] = self._as_binary_flag(data['CryoSleep'])
        data['isVIP'] = self._as_binary_flag(data['VIP'])

        # Number of passengers sharing each last name
        data['Average_Family_Size'] = data.groupby('LAST_NAME')['PassengerId'].transform('count')

        return data

    def handling_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
        """Handles missing values in the dataset.

        Args:
            data (pd.DataFrame): The input DataFrame to handle missing values.

        Raises:
            NotImplementedError: Always; this method is a placeholder.
        """
        # Placeholder: signal clearly that no behaviour exists yet
        raise NotImplementedError("This method is not yet implemented!")

    def destination_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Creates destination-related features.

        Adds 'Destination_Count_By_HomePlanet': for every row, the number of
        rows with the same 'Destination' whose 'HomePlanet' is not missing.
        Rows with a missing 'Destination' receive NaN.

        Args:
            data (pd.DataFrame): The input DataFrame containing destination related features.

        Returns:
            pd.DataFrame: The same DataFrame with the destination feature added.
        """
        data['Destination_Count_By_HomePlanet'] = data.groupby('Destination')['HomePlanet'].transform('count')

        return data

    def missing_value_imputation(self, data: pd.DataFrame, n_neighbors: int, columns_to_impute: list) -> pd.DataFrame:
        """Imputes missing values using the KNNImputer.

        The imputer is fitted on, and applied to, only the listed columns of
        `data`; the imputed values are then written back into `data`.

        Args:
            data (pd.DataFrame): The DataFrame containing missing values.
            n_neighbors (int): Number of neighbors for KNNImputer.
            columns_to_impute (list): Columns to be imputed.

        Returns:
            pd.DataFrame: The same DataFrame with the listed columns imputed.
        """
        # Initialize the KNN imputer with the specified number of neighbors
        imputer = KNNImputer(n_neighbors=n_neighbors)

        # Select only the columns to be imputed
        data_to_impute = data[columns_to_impute]

        # Keep the original index so `update` lines the values up row by row
        data_imputed = pd.DataFrame(
            imputer.fit_transform(data_to_impute),
            columns=columns_to_impute,
            index=data.index,
        )
        data.update(data_imputed)

        return data

    def feature_pipeline(self, data: pd.DataFrame) -> pd.DataFrame:
        """Runs the full feature engineering pipeline on data.

        Order matters: `service_features` needs the 'LAST_NAME' column that
        `passenger_features` creates.

        Args:
            data (pd.DataFrame): The input DataFrame to process.

        Returns:
            pd.DataFrame: The processed DataFrame with all features engineered.
        """
        steps = (
            self.ship_related_features,
            self.passenger_features,
            self.service_features,
            self.destination_features,
        )

        data_processed = data
        for step in steps:
            data_processed = step(data_processed)

        return data_processed

    def data_split(self, data: pd.DataFrame, features: list[str], target: str) -> tuple[pd.DataFrame, pd.Series]:
        """Splits the data into features and target.

        Args:
            data (pd.DataFrame): The input DataFrame containing features and target.
            features (list[str]): The list of feature column names.
            target (str): The name of the target column.

        Returns:
            tuple[pd.DataFrame, pd.Series]: A tuple containing the features DataFrame and target Series.
        """
        # Plain column selection; `target` is used as given, so passing a list
        # of names yields a DataFrame instead of a Series.
        X_train = data[features]
        y_train = data[target]

        return X_train, y_train

    def get_dummies(self, data: pd.DataFrame, dtype: type) -> pd.DataFrame:
        """Creates dummy variables for categorical features.

        Args:
            data (pd.DataFrame): The input DataFrame to create dummy variables from.
            dtype (type): The desired data type for the resulting dummy variables.

        Returns:
            pd.DataFrame: A new DataFrame with dummy variables added.
        """
        # Convert categorical columns to dummy/one-hot encoded columns
        dummied_data = pd.get_dummies(data, dtype=dtype)

        return dummied_data


In [None]:
class Modelling:
    """
    A class to handle machine learning model initialization and hyperparameter tuning using Optuna.

    Every tuning method scores a trial by mean accuracy over a shuffled,
    stratified k-fold split of the training data and returns the parameters
    of the best trial.

    Attributes:
        model (Any): The initialized machine learning model.
        available_models (Dict[str, Any]): A dictionary mapping model names to their respective classes.
    """

    # Seed shared by the estimators, the CV splitter and the Optuna sampler so
    # that a tuning run can be replayed.
    RANDOM_STATE = 42

    # Number of stratified folds used to score each trial.
    CV_FOLDS = 5

    def __init__(self):
        """
        Initializes the Modelling class and defines available models for initialization.
        """
        # No model initialized yet
        self.model = None

        # Dictionary to hold available model classes for initialization
        self.available_models = {
            'RandomForestClassifier': RandomForestClassifier,
            'XGBClassifier': XGBClassifier,
            'LGBMClassifier': LGBMClassifier,
            'HistGradientBoostingClassifier': HistGradientBoostingClassifier,
        }

    def _validate_model_name(self, model_name: str) -> None:
        """Prints the supported names and raises ValueError if `model_name` is unknown."""
        if model_name not in self.available_models:
            print(f"The {model_name} is not yet available, you can select a following model: {list(self.available_models.keys())}")
            raise ValueError(f"{model_name} is not a valid model.")

    def initialize_model(self, model_name: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """
        Initializes and returns a machine learning model based on the model_name.

        Args:
            model_name (str): Name of the model to initialize.
            params (Optional[Dict[str, Any]]): Hyperparameters for the model. Defaults to None.

        Returns:
            Any: An instance of the selected model.

        Raises:
            ValueError: If the model_name is not in available_models.
        """
        self._validate_model_name(model_name)

        # `params or {}` lets callers omit hyperparameters entirely
        return self.available_models[model_name](**(params or {}))

    def predict_test_set(self, model: Any, X_test: pd.DataFrame) -> np.ndarray:
        """
        Predicts the target variable for the given test set using the provided model.

        Args:
            model (Any): The trained model used for making predictions.
            X_test (pd.DataFrame): The test set features for which predictions are to be made.

        Returns:
            np.ndarray: The predicted values for the test set.
        """
        predictions = model.predict(X_test)
        return predictions

    def fit_model(self, model: Any, X_train: pd.DataFrame, y_train: pd.Series) -> Any:
        """
        Fits the provided model to the training data.

        Args:
            model (Any): The model to be trained.
            X_train (pd.DataFrame): The training set features.
            y_train (pd.Series): The target variable corresponding to the training set features.

        Returns:
            Any: Whatever `model.fit` returns (the fitted model for scikit-learn style estimators).
        """
        return model.fit(X_train, y_train)

    def tune_model(self, model_name: str, X_train: Any, y_train: Any, n_trials: int) -> Dict[str, Any]:
        """
        Tunes the specified model's hyperparameters using Optuna.

        Args:
            model_name (str): Name of the model to tune.
            X_train (Any): Training features.
            y_train (Any): Training labels.
            n_trials (int): Number of trials for hyperparameter tuning.

        Returns:
            Dict[str, Any]: The best parameters found during tuning.

        Raises:
            ValueError: If the model_name is not in available_models, or no
                tuning routine exists for it.
        """
        # Fail fast on a bad name before touching any global Optuna state
        self._validate_model_name(model_name)

        # Model name -> tuning routine
        tuners = {
            'RandomForestClassifier': self.tune_random_forest,
            'XGBClassifier': self.tune_xgb,
            'LGBMClassifier': self.tune_lgbm,
            'HistGradientBoostingClassifier': self.tune_hgbc,
        }
        tuner = tuners.get(model_name)
        if tuner is None:
            # Reached only when available_models was extended without a tuner
            raise ValueError(f"{model_name} is not a valid model.")

        # Suppress Optuna's verbose logging
        optuna.logging.set_verbosity(optuna.logging.WARNING)

        return tuner(X_train, y_train, n_trials)

    @staticmethod
    def _flatten_target(y_train: Any) -> Any:
        """Returns a single-column 2-D target as a 1-D array; anything else is returned unchanged."""
        target = np.asarray(y_train)
        if target.ndim == 2 and target.shape[1] == 1:
            return target.ravel()
        return y_train

    def _cv_accuracy(self, estimator: Any, X_train: Any, y_train: Any) -> float:
        """Mean accuracy of `estimator` over a shuffled, seeded stratified k-fold split."""
        cv = StratifiedKFold(n_splits=self.CV_FOLDS, shuffle=True, random_state=self.RANDOM_STATE)
        fold_scores = cross_val_score(
            estimator,
            X_train,
            self._flatten_target(y_train),
            cv=cv,
            scoring="accuracy",
            n_jobs=-1,
        )
        return fold_scores.mean()

    def _run_study(self, objective: Any, n_trials: int, label: str) -> Dict[str, Any]:
        """Maximizes `objective` with a seeded Optuna study, prints and returns the best parameters."""
        # Seeding the sampler makes the sequence of suggested trials repeatable
        sampler = optuna.samplers.TPESampler(seed=self.RANDOM_STATE)
        study = optuna.create_study(direction="maximize", sampler=sampler)
        study.optimize(objective, n_trials=n_trials)

        print(f"Best parameters for {label}:", study.best_params)
        print(f"Best score for {label}:", study.best_value)

        return study.best_params

    def tune_random_forest(self, X_train: Any, y_train: Any, n_trials: int) -> Dict[str, Any]:
        """
        Tunes hyperparameters for the RandomForestClassifier.

        Args:
            X_train (Any): Training features.
            y_train (Any): Training labels.
            n_trials (int): Number of trials for hyperparameter tuning.

        Returns:
            Dict[str, Any]: The best parameters found during tuning.
        """
        def rfc_objective(trial):
            # Search space for one trial
            params = {
                "n_estimators": trial.suggest_int("n_estimators", 50, 500),
                "max_depth": trial.suggest_int("max_depth", 3, 20),
                "min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
                "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 10),
                "bootstrap": trial.suggest_categorical("bootstrap", [True, False]),
                "criterion": trial.suggest_categorical("criterion", ["gini", "entropy"]),
                "max_leaf_nodes": trial.suggest_int("max_leaf_nodes", 10, 100),
            }
            rf = RandomForestClassifier(**params, random_state=self.RANDOM_STATE)
            return self._cv_accuracy(rf, X_train, y_train)

        return self._run_study(rfc_objective, n_trials, "RandomForestClassifier")

    def tune_xgb(self, X_train: Any, y_train: Any, n_trials: int) -> Dict[str, Any]:
        """
        Tunes hyperparameters for the XGBClassifier.

        Args:
            X_train (Any): Training features.
            y_train (Any): Training labels.
            n_trials (int): Number of trials for hyperparameter tuning.

        Returns:
            Dict[str, Any]: The best parameters found during tuning.
        """
        def xgb_objective(trial):
            # Search space for one trial
            params = {
                "n_estimators": trial.suggest_int("n_estimators", 50, 500),
                "max_depth": trial.suggest_int("max_depth", 3, 15),
                "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3),
                "subsample": trial.suggest_float("subsample", 0.5, 1.0),
                "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
                "gamma": trial.suggest_float("gamma", 0, 5),
                "reg_alpha": trial.suggest_float("reg_alpha", 0, 10),
                "reg_lambda": trial.suggest_float("reg_lambda", 0, 10),
                "scale_pos_weight": trial.suggest_float("scale_pos_weight", 0.5, 1.0),
                "min_child_weight": trial.suggest_int("min_child_weight", 1, 10),
            }
            xgb = XGBClassifier(**params, random_state=self.RANDOM_STATE)
            return self._cv_accuracy(xgb, X_train, y_train)

        return self._run_study(xgb_objective, n_trials, "XGBClassifier")

    def tune_lgbm(self, X_train: Any, y_train: Any, n_trials: int) -> Dict[str, Any]:
        """
        Tunes hyperparameters for the LGBMClassifier.

        Args:
            X_train (Any): Training features.
            y_train (Any): Training labels.
            n_trials (int): Number of trials for hyperparameter tuning.

        Returns:
            Dict[str, Any]: The best parameters found during tuning.
        """
        def lgbm_objective(trial):
            # Search space for one trial
            # NOTE(review): LightGBM appears to apply `subsample` only when
            # `subsample_freq` > 0, which is not set here - confirm against the
            # LightGBM parameter docs; this dimension may currently be inert.
            params = {
                "n_estimators": trial.suggest_int("n_estimators", 50, 500),
                "max_depth": trial.suggest_int("max_depth", -1, 15),  # -1 means no limit
                "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3),
                "num_leaves": trial.suggest_int("num_leaves", 2, 256),
                "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
                "subsample": trial.suggest_float("subsample", 0.5, 1.0),
                "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
                "reg_alpha": trial.suggest_float("reg_alpha", 0, 10),
                "reg_lambda": trial.suggest_float("reg_lambda", 0, 10),
            }
            lgbm = LGBMClassifier(**params, random_state=self.RANDOM_STATE)
            return self._cv_accuracy(lgbm, X_train, y_train)

        return self._run_study(lgbm_objective, n_trials, "LGBMClassifier")

    def tune_hgbc(self, X_train: Any, y_train: Any, n_trials: int) -> Dict[str, Any]:
        """
        Tunes hyperparameters for the HGBClassifier.

        Args:
            X_train (Any): Training features.
            y_train (Any): Training labels.
            n_trials (int): Number of trials for hyperparameter tuning.

        Returns:
            Dict[str, Any]: The best parameters found during tuning.
        """
        def hgbc_objective(trial):
            # Search space for one trial
            params = {
                "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
                "max_iter": trial.suggest_int("max_iter", 100, 1000),
                "max_leaf_nodes": trial.suggest_int("max_leaf_nodes", 10, 255),
                "max_depth": trial.suggest_int("max_depth", 3, 15),
                "min_samples_leaf": trial.suggest_int("min_samples_leaf", 10, 200),
                "l2_regularization": trial.suggest_float("l2_regularization", 0.0, 10.0),
                "max_bins": trial.suggest_int("max_bins", 50, 255),
                "early_stopping": trial.suggest_categorical("early_stopping", [True, False]),
            }
            hgbc = HistGradientBoostingClassifier(
                **params,
                scoring="accuracy",  # Setting scoring to accuracy
                # Seeded like the other three estimators so trials are repeatable
                random_state=self.RANDOM_STATE,
            )
            return self._cv_accuracy(hgbc, X_train, y_train)

        return self._run_study(hgbc_objective, n_trials, "HGBClassifier")



In [None]:
# Load the two CSV files from the working directory into `train` and `test`.
train, test = (pd.read_csv(csv_name) for csv_name in ("train.csv", "test.csv"))

In [None]:
# Columns fed to the models, in the order they are selected.
FEATURES = [
    # Cabin-derived
    'Deck',
    'Cabin_part',
    # Head-counts and "alone" / family flags
    'NUMBER_OF_PASSENGERS_PER_CABIN',
    'NUMBER_OF_PASSENGERS_PER_GROUP',
    'wasAlonePerGroup',
    'wasAlonePerCabin',
    'TRAVELLED_WITH_FAMILY',
    # Age-derived
    'isMinor',
    'Age_Cat',
    # Mean spending keyed by passenger
    'Mean_Spending_On_RoomService_Pass',
    'Mean_Spending_On_Spa_Pass',
    'Mean_Spending_On_FoodCourt_Pass',
    'Mean_Spending_On_ShoppingMall_Pass',
    'Mean_Spending_On_VRDeck_Pass',
    # Mean spending keyed by last name
    'Mean_Spending_On_RoomService_Fam',
    'Mean_Spending_On_Spa_Fam',
    'Mean_Spending_On_FoodCourt_Fam',
    'Mean_Spending_On_ShoppingMall_Fam',
    'Mean_Spending_On_VRDeck_Fam',
    # Passenger status and route
    'isCyroSleep',
    'HomePlanet',
    'isVIP',
    'Destination',
    'Destination_Count_By_HomePlanet',
]

# Label column, kept as a one-element list (so selecting it yields a DataFrame).
TARGET = ['Transported']

In [None]:
# One instance of each helper class, reused by every cell below.
feature_engineering, modelling = FeatureEngineering(), Modelling()

In [None]:
# Spending columns whose gaps are filled by the KNN imputer, and the
# neighbourhood size it uses.
SPENDING_COLUMNS = ['Spa', 'RoomService', 'FoodCourt', 'ShoppingMall', 'VRDeck']
KNN_NEIGHBORS = 5

# The imputation and feature methods write onto the frame they receive, so
# they are given copies: `train` / `test` stay exactly as loaded and this
# cell can be re-run without compounding earlier edits.
imputed_train = feature_engineering.missing_value_imputation(train.copy(), KNN_NEIGHBORS, SPENDING_COLUMNS)
processed_train = feature_engineering.feature_pipeline(imputed_train)

# The test split is imputed with its own, separately fitted imputer.
imputed_test = feature_engineering.missing_value_imputation(test.copy(), KNN_NEIGHBORS, SPENDING_COLUMNS)
processed_test = feature_engineering.feature_pipeline(imputed_test)

In [None]:
# Separate model inputs from the label.
X_train, y_train = feature_engineering.data_split(processed_train, features=FEATURES, target=TARGET)

# One-hot encode the categorical inputs.
X_train = feature_engineering.get_dummies(X_train, dtype='int')

# Labels as a flat 0/1 integer vector. get_dummies is the wrong tool here:
# it leaves a boolean DataFrame column untouched (so the target stays 2-D and
# boolean) and would split a string/Series target into two columns.
y_train = np.ravel(y_train).astype(int)

# Encode the test inputs, then force them onto the training column layout.
# Encoding each split on its own yields different columns whenever a category
# is present in only one of them; missing dummies are filled with 0 and
# columns unseen during training are dropped.
X_test = feature_engineering.get_dummies(processed_test[FEATURES], dtype='int')
X_test = X_test.reindex(columns=X_train.columns, fill_value=0)

In [None]:
# Optuna trial budget per model; lower it for a quick end-to-end check.
N_TRIALS = 300

# Tune each base model on the same training data with the same budget.
xgb_best_optuna_params = modelling.tune_model('XGBClassifier', X_train=X_train, y_train=y_train, n_trials=N_TRIALS)
rfr_best_optuna_params = modelling.tune_model('RandomForestClassifier', X_train=X_train, y_train=y_train, n_trials=N_TRIALS)
lgbm_best_optuna_params = modelling.tune_model('LGBMClassifier', X_train=X_train, y_train=y_train, n_trials=N_TRIALS)
hgbc_best_optuna_params = modelling.tune_model('HistGradientBoostingClassifier', X_train=X_train, y_train=y_train, n_trials=N_TRIALS)

In [None]:
# Seed for the final estimators. The tuned parameter dicts only contain the
# searched hyperparameters, so the fixed settings used while scoring trials
# (random_state=42 for the forest / XGBoost / LightGBM models, and
# scoring="accuracy" for HistGradientBoosting) have to be supplied again here;
# otherwise the fitted models differ from the ones that were evaluated and the
# fit is not repeatable.
ENSEMBLE_SEED = 42

clf1 = RandomForestClassifier(**{**rfr_best_optuna_params, 'random_state': ENSEMBLE_SEED})
clf2 = LGBMClassifier(**{**lgbm_best_optuna_params, 'random_state': ENSEMBLE_SEED})
clf3 = XGBClassifier(**{**xgb_best_optuna_params, 'random_state': ENSEMBLE_SEED})
clf4 = HistGradientBoostingClassifier(
    **{**hgbc_best_optuna_params, 'scoring': 'accuracy', 'random_state': ENSEMBLE_SEED}
)

# Voting ensemble over the four tuned models (default voting mode).
eclf1 = VotingClassifier(estimators=[('rfr', clf1), ('lgb', clf2), ('xgb', clf3), ('hgbc', clf4)])
eclf1 = eclf1.fit(X_train, y_train)

In [None]:
# Predict with the fitted ensemble and store the raw predictions on `test`.
test['Transported'] = modelling.predict_test_set(model=eclf1, X_test=X_test)

# Normalise the predictions to booleans: True where the prediction equals 1.
test['Transported'] = test['Transported'].eq(1)

# Keep only the id and the prediction, and write them without the index.
submission = test.loc[:, ['PassengerId', 'Transported']]
submission.to_csv("submission.csv", index=False)