In [6]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

class TitanicPredictor:
    """Weighted category-rate model that predicts Titanic passenger survival.

    For every configured feature the model stores the survival rate observed
    for each category of that feature in the training split.  A passenger's
    score is the weighted sum of the rates of the categories the passenger
    falls into; the passenger is predicted to survive when the score reaches
    ``threshold``.

    Attributes:
        features: Column names used for scoring.
        probabilities: ``{feature: {category: survival rate}}``; filled by
            :meth:`train_model`.
        weights: ``{feature: weight}``; filled by :meth:`train_model`.
        threshold: Score at or above which a passenger is predicted to
            survive.  Starts at 0.5 and is tuned by :meth:`train_model`.
    """

    # Probability assumed for a category that was not seen during training.
    _UNSEEN_PROBABILITY = 0.5
    # Titles kept verbatim; every other extracted title becomes 'Other'.
    _COMMON_TITLES = ('Mr', 'Miss', 'Mrs', 'Master')

    def __init__(self, features):
        """Initializes the model with a list of features to use."""
        self.features = features
        self.probabilities = {}
        self.weights = {}
        self.threshold = 0.5

    @staticmethod
    def _print_evaluation_metrics(y_true, y_pred, dataset_name):
        """Prints the accuracy of ``y_pred`` against ``y_true`` under a heading."""
        print(f"\n=== {dataset_name} metrics ===")
        print(f"Accuracy: {accuracy_score(y_true, y_pred):.4f}")

    def display_metrics(self, X_train, y_train, X_val, y_val):
        """Predicts on the training and validation sets and prints both accuracies."""
        train_predictions = self.predict(X_train)
        self._print_evaluation_metrics(y_train, train_predictions, "Training")

        val_predictions = self.predict(X_val)
        self._print_evaluation_metrics(y_val, val_predictions, "Validation")

    def _clean_data(self, data):
        """Returns a copy of ``data`` with missing Age, Fare and Embarked filled.

        Cabin is deliberately left unfilled: whether it is present is what
        the ``HasCabin`` feature encodes later on.
        """
        df = data.copy()

        # Fill missing ages with the median age of the passenger's
        # (Pclass, Sex) group; known ages are left untouched.
        group_median_age = df.groupby(['Pclass', 'Sex'])['Age'].transform('median')
        df['Age'] = df['Age'].fillna(group_median_age)

        # A group whose ages are all missing has no median, so fall back to
        # the global median for whatever is still empty.
        df['Age'] = df['Age'].fillna(df['Age'].median())

        # Fill missing fares with the median fare.
        df['Fare'] = df['Fare'].fillna(df['Fare'].median())

        # Fill missing ports with the most common one.  mode() is empty when
        # the whole column is missing; in that case there is nothing to fill with.
        embarked_mode = df['Embarked'].mode()
        if not embarked_mode.empty:
            df['Embarked'] = df['Embarked'].fillna(embarked_mode.iloc[0])
        return df

    def _apply_feature_engineering(self, data):
        """Returns a copy of ``data`` with the engineered columns added.

        Added columns: ``FamilySize`` (Parch + SibSp + 1), ``HasCabin``
        (1 when Cabin is present, else 0), ``FarePerPerson`` (Fare divided by
        FamilySize) and ``Title`` (taken from Name, reduced to Mr / Miss /
        Mrs / Master / Other).
        """
        df = data.copy()
        df['FamilySize'] = df['Parch'] + df['SibSp'] + 1
        df['HasCabin'] = df['Cabin'].notnull().astype(int)
        df['FarePerPerson'] = df['Fare'] / df['FamilySize']

        # The title is the word that follows a space and ends with a period.
        # Raw string: '\.' in a plain literal is an invalid escape sequence.
        titles = df['Name'].str.extract(r' ([A-Za-z]+)\.', expand=False)

        # Collapse rare titles (and names without a title) into 'Other'.
        df['Title'] = titles.where(titles.isin(self._COMMON_TITLES), 'Other')
        return df

    def _calculate_weights(self, data):
        """Returns a weight per feature, normalised by the sum of all spreads.

        A feature's spread is the difference between the highest and the
        lowest survival rate among its categories.  Each weight is the
        feature's spread divided by the sum of all spreads.
        """
        diffs = {}
        for feature in self.features:
            survival_rates = data.groupby(feature)['Survived'].mean()
            diffs[feature] = survival_rates.max() - survival_rates.min()

        total_diff = sum(diffs.values()) or 1  # Avoid division by zero
        return {feature: diff / total_diff for feature, diff in diffs.items()}

    def _convert_probs_to_dict(self, data):
        """Returns ``{feature: {category: mean of Survived}}`` for every feature."""
        return {
            feature: data.groupby(feature)['Survived'].mean().to_dict()
            for feature in self.features
        }

    def _calculate_weighted_score(self, row):
        """Returns the weighted survival score of a single passenger row."""
        score = 0
        for feature in self.features:
            # Categories that were not seen in training get the neutral default.
            prob = self.probabilities.get(feature, {}).get(
                row[feature], self._UNSEEN_PROBABILITY
            )
            weight = self.weights.get(feature, 0)
            score += prob * weight
        return score

    def _score_frame(self, X):
        """Returns the weighted survival score of every row of ``X`` as a Series.

        Column-wise equivalent of calling :meth:`_calculate_weighted_score`
        on each row: features are added in the same order, without a
        Python-level loop over the rows.
        """
        scores = pd.Series(0.0, index=X.index)
        for feature in self.features:
            column = X[feature]
            weight = self.weights.get(feature, 0)
            lookup = self.probabilities.get(feature, {})
            if lookup:
                # map() leaves NaN for categories missing from the lookup.
                probs = column.map(lookup).astype(float).fillna(self._UNSEEN_PROBABILITY)
            else:
                probs = pd.Series(self._UNSEEN_PROBABILITY, index=X.index)
            scores = scores + probs * weight
        return scores

    def _find_best_threshold(self, X_val, y_val):
        """Returns the threshold with the highest accuracy on the validation data.

        Candidates run from 0.1 up to (but excluding) 0.9 in steps of 0.01.
        On ties the lowest candidate wins, because only a strictly better
        accuracy replaces the current best.
        """
        # Scores do not depend on the threshold, so compute them once.
        scores = self._score_frame(X_val)

        best_accuracy = 0
        best_threshold = 0.5
        for threshold in np.arange(0.1, 0.9, 0.01):
            predictions = (scores >= threshold).astype(int)
            accuracy = accuracy_score(y_val, predictions)
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_threshold = threshold
        return best_threshold

    def train_model(self, X, y, test_size=0.2, random_state=1):
        """Fits weights, probabilities and threshold, then prints accuracies.

        1. Splits data into training and validation sets.
        2. Learns weights and probabilities from the training set.
        3. Finds the optimal threshold on the validation set.

        Args:
            X: Feature frame containing every column in ``self.features``.
            y: The ``Survived`` target aligned with ``X``.
            test_size: Share of the rows held out for validation.
            random_state: Seed passed to ``train_test_split``.
        """
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )

        # The group-by helpers expect features and target in one frame.
        train_data = pd.concat([X_train, y_train], axis=1)

        self.weights = self._calculate_weights(train_data)
        self.probabilities = self._convert_probs_to_dict(train_data)
        self.threshold = self._find_best_threshold(X_val, y_val)

        self.display_metrics(X_train, y_train, X_val, y_val)

    def predict(self, X):
        """Returns a 0/1 Series: 1 where the row's score reaches ``self.threshold``."""
        return (self._score_frame(X) >= self.threshold).astype(int)

    def run(self,
            train_path='/kaggle/input/titanic/train.csv',
            test_path='/kaggle/input/titanic/test.csv',
            submission_path='submission.csv'):
        """Runs the full pipeline: load, clean, engineer, train, predict, save.

        Args:
            train_path: CSV with the labelled passengers (needs ``Survived``).
            test_path: CSV with the passengers to predict (needs ``PassengerId``).
            submission_path: Where the ``PassengerId, Survived`` CSV is written.
        """
        # Load data
        train_df = pd.read_csv(train_path)
        test_df = pd.read_csv(test_path)

        # Separate features from the target variable
        X = train_df.drop('Survived', axis=1)
        y = train_df['Survived']

        # Clean data and add engineered features (both helpers return copies,
        # so test_df itself is left as loaded)
        X = self._apply_feature_engineering(self._clean_data(X))
        X_test = self._apply_feature_engineering(self._clean_data(test_df))

        # Train model
        self.train_model(X, y)

        # Make predictions on the test set
        predictions = self.predict(X_test)

        # Generate the submission file
        submission = pd.DataFrame({'PassengerId': test_df['PassengerId'], 'Survived': predictions})
        submission.to_csv(submission_path, index=False)

# Script entry point: build the predictor and run the whole pipeline.
if __name__ == "__main__":
    # Categorical columns whose per-category survival rates feed the score.
    selected_features = ['Sex', 'Pclass', 'Title', 'FamilySize']

    # Instantiate the model with those columns, then load the data, train,
    # predict and write the submission file.
    predictor = TitanicPredictor(features=selected_features)
    predictor.run()


=== Training metrics ===
Accuracy: 0.8202

=== Validation metrics ===
Accuracy: 0.7989
