# Setup & Imports

In [None]:
from matplotlib.pyplot import xlabel

%load_ext autoreload
%autoreload 2

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import polars as pl

from src.titanic.constants import PROJECT_ROOT

DATA_DIR = PROJECT_ROOT / "data"
RESULTS_DIR = PROJECT_ROOT / "results"

In [None]:
# Load the training split from DATA_DIR into a polars DataFrame.
df_train = pl.read_csv(DATA_DIR / "train.csv")

In [None]:
# Bare expression: the notebook renders the frame as the cell output.
df_train

# EDA (Exploratory Data Analysis)

In [None]:
def get_colormap(df: pl.DataFrame, column: str = "survival_mean", cmap: str = "cividis") -> list:
    """Return one colour per row of ``df``, scaled by the values of ``column``.

    The column's minimum maps to the low end of the colormap and its maximum
    to the high end.

    Args:
        df: Frame holding the values to colour by.
        column: Name of the numeric column used for scaling.
        cmap: Name of a matplotlib colormap.

    Returns:
        A list of colours (as returned by the colormap), in row order.
    """
    # Imported locally: the notebook only imports matplotlib.colors in a later
    # cell, so relying on a global `mcolors` fails when cells run top to bottom.
    import matplotlib.colors as mcolors

    values = df[column]
    norm = mcolors.Normalize(vmin=values.min(), vmax=values.max())
    colormap = plt.get_cmap(cmap)  # separate name: do not rebind the str parameter
    return [colormap(norm(value)) for value in values.to_list()]


def plot_barplot(
    df: pl.DataFrame,
    x: list,
    y: list,
    labels: list,
    title: str,
    xlabel: str,
    different_labels: list | None = None,
    fontsize: str = "large",
    title_fontsize: str = "xx-large",
    ylabel: str = "Survival Mean",
    figsize: tuple[int, int] = (12, 6),
    set_ylim: bool = True,
    **kwargs,
):
    """Draw an annotated, colour-scaled bar chart and return its Axes.

    Args:
        df: Frame passed to ``get_colormap`` to derive the bar colours.
        x: Bar positions.
        y: Bar heights.
        labels: Per-bar legend labels.
        title: Text used as the *legend* title.
        xlabel: X-axis label.
        different_labels: Values to print above the bars instead of ``y``;
            ``None`` or an empty list falls back to ``y``.
        fontsize: Legend entry font size.
        title_fontsize: Legend title font size.
        ylabel: Y-axis label.
        figsize: Figure size handed to ``plt.subplots``.
        set_ylim: When true, fix the y-axis to the range 0..1.
        **kwargs: Forwarded to ``get_colormap`` (e.g. ``column``, ``cmap``).

    Returns:
        The matplotlib Axes the chart was drawn on.
    """
    fig, ax = plt.subplots(figsize=figsize)

    bars = ax.bar(x, y, label=labels, color=get_colormap(df, **kwargs))

    if set_ylim:
        ax.set_ylim(0, 1)

    # Decide once which values are printed above the bars.
    annotations = different_labels if different_labels else y

    for bar, value in zip(bars, annotations):
        # Draw on `ax` explicitly rather than on pyplot's implicit current axes.
        ax.text(
            bar.get_x() + bar.get_width() / 2,
            bar.get_height(),
            f"{value:.2f}",
            ha="center",
            va="bottom",
        )

    ax.set_ylabel(ylabel)
    ax.set_xlabel(xlabel)
    ax.legend(title=title, title_fontsize=title_fontsize, fontsize=fontsize)

    return ax

#### Pclass

In [None]:
# Mean of `Survived` per ticket class. Aggregating directly avoids collecting
# every column of each group into a list just to average one of them.
pclass_vs_survival_mean = (
    df_train.group_by("Pclass")
    .agg(pl.col("Survived").mean().alias("survival_mean"))
    .sort(by="Pclass", descending=False)
)

pclass = pclass_vs_survival_mean["Pclass"].to_list()
survivability = pclass_vs_survival_mean["survival_mean"].to_list()
pclass_bar_labels = pclass

ax = plot_barplot(
    df=pclass_vs_survival_mean,
    x=pclass,
    y=survivability,
    labels=pclass_bar_labels,
    title="Survival Rate across different Ticket Classes",
    xlabel="Ticket Class",
    fontsize="xx-large",
)

# One tick per class value instead of matplotlib's automatic tick positions.
ax.set_xticks(pclass)

plt.show()

In [None]:
# Row count per ticket class, ordered by class value.
pclass_prevalence = (
    df_train.get_column("Pclass")
    .value_counts()
    .sort(by="Pclass", descending=False)
)

fig, ax = plt.subplots(figsize=(12, 6))

# Bar colours scale with the counts themselves.
colors = get_colormap(pclass_prevalence, column="count")

ax.bar(
    pclass_prevalence["Pclass"].cast(pl.String),  # strings -> categorical axis
    pclass_prevalence["count"],
    color=colors,
    label=pclass_bar_labels,
)
ax.set_ylabel("Prevalence")
ax.set_xlabel("Pclass")
ax.legend(fontsize="xx-large", title="Pclass Distribution", title_fontsize="xx-large")

plt.show()

In [None]:
"""Map Pclass values: 3 -> 1, 2 -> 2, and 1 -> 3 so that they are consistent with the increase in survivability rates."""

#### Name

In [None]:
import matplotlib.cm as cm
import matplotlib.colors as mcolors

# Title prefix: the last space-separated token before the first "." in Name.
prefix_expr = (
    pl.col("Name").str.split(by=".").list.get(0).str.split(by=" ").list.get(-1).alias("prefix")
)

# Survival rate and row count per prefix, computed in one pass. The secondary
# sort key keeps the order stable when prefixes share a survival rate.
prefix_stats = (
    df_train.with_columns(prefix_expr)
    .group_by("prefix")
    .agg(
        pl.col("Survived").mean().alias("survival_mean"),
        # NOTE(review): counts `Survived` entries per prefix; this equals the
        # row count only if `Survived` has no nulls — confirm.
        pl.col("Survived").count().alias("count"),
    )
    .sort(by=["survival_mean", "prefix"], descending=False)
)

prefix_vs_survival_mean = prefix_stats.select("prefix", "survival_mean")
prefix_prevalence = prefix_stats.select("prefix", "count")

prefixes = prefix_stats["prefix"].to_list()
survivability = prefix_stats["survival_mean"].to_list()
prevalence = prefix_stats["count"].to_list()
prefix_bar_labels = prefixes

# Bar height is the prevalence; bar colour and the number printed above each
# bar are the survival rate.
ax = plot_barplot(
    df=prefix_vs_survival_mean,
    x=prefixes,
    y=prevalence,
    labels=prefix_bar_labels,
    different_labels=survivability,
    title="Survival Rate across different Prefixes",
    ylabel="Prevalence",
    xlabel="Prefix",
    title_fontsize="x-large",
    fontsize="medium",
    figsize=(15, 6),
    set_ylim=False,
)

plt.show()

In [None]:
"""Group rare prefixes (prevalence < 3) and then use one hot encoding to encode them with the rest of them."""

#### Sex

In [None]:
# Mean of `Survived` per sex, highest rate first. Aggregated directly rather
# than materialising every column as per-group lists.
sex_vs_survival_mean = (
    df_train.group_by("Sex")
    .agg(pl.col("Survived").mean().alias("survival_mean"))
    .sort(by="survival_mean", descending=True)
)

genders = sex_vs_survival_mean["Sex"].to_list()
survivability = sex_vs_survival_mean["survival_mean"].to_list()
sex_bar_labels = genders

ax = plot_barplot(
    df=sex_vs_survival_mean,
    x=genders,
    y=survivability,
    labels=sex_bar_labels,
    title="Survival Rate across different Genders",
    xlabel="Gender",
    fontsize="xx-large",
)

plt.show()

In [None]:
# Row count per sex, ordered alphabetically by the `Sex` value.
sex_prevalence = df_train.get_column("Sex").value_counts().sort(by="Sex")

colors = get_colormap(sex_prevalence, column="count")

fig, ax = plt.subplots(figsize=(12, 6))

# Take the legend labels from this frame so they follow the bar order here;
# `sex_bar_labels` is ordered by survival rate, not by `Sex`.
ax.bar(
    sex_prevalence["Sex"],
    sex_prevalence["count"],
    label=sex_prevalence["Sex"].to_list(),
    color=colors,
)
ax.set_xlabel("Sex")
ax.set_ylabel("Prevalence")
ax.legend(title="Sex Distribution", title_fontsize="xx-large", fontsize="xx-large")

plt.show()

In [None]:
"""Encode gender in a binary column where female = 1 and male = 0 to maintain the meaningful difference in survival rates."""

In [None]:
# Width of one age bucket, in years.
jump = 10
# Maps a bucket label such as "0-10" to its index. The bucket count is rounded
# up so the highest bucket's upper bound is never below the maximum age
# (floor division could leave the oldest passengers without a bucket).
age_grouping = {
    f"{i * jump}-{i * jump + jump}": i
    for i in range(int(np.ceil(df_train["Age"].max() / jump)))
}


def map_age(age):
    """Return the index of the first bucket in ``age_grouping`` containing ``age``.

    A bucket matches when ``age`` is less than or equal to the upper bound
    parsed from its label; ``None`` is returned when no bucket matches.
    """
    upper_bounds = (
        (float(label.split("-")[-1]), group_index)
        for label, group_index in age_grouping.items()
    )
    return next(
        (group_index for upper, group_index in upper_bounds if float(age) <= upper),
        None,
    )


# Mean of `Survived` per age bucket. Rows whose bucket is null are put into an
# explicit -1 group instead of overwriting the first list entry afterwards.
age_group_vs_survival_mean = (
    df_train.with_columns(
        pl.col("Age")
        .map_elements(map_age, return_dtype=pl.Int8)
        .fill_null(-1)
        .alias("age_group")
    )
    .group_by("age_group")
    .agg(pl.col("Survived").mean().alias("survival_mean"))
    .sort(by="age_group", descending=False)
)

age_groups = age_group_vs_survival_mean["age_group"].to_list()
survivability = age_group_vs_survival_mean["survival_mean"].to_list()

# Build the labels from the groups actually present so that labels and bars
# stay aligned even if a bucket is empty or there is no unknown group.
label_by_group = {-1: "Unknown", **{index: label for label, index in age_grouping.items()}}
age_bar_labels = [label_by_group[group] for group in age_groups]

ax = plot_barplot(
    df=age_group_vs_survival_mean,
    x=age_groups,
    y=survivability,
    labels=age_bar_labels,
    title="Survival Rate across different Age Groups",
    xlabel="Age Group",
    fontsize="medium",
)

ax.set_xticks(age_groups)
ax.set_xticklabels(age_bar_labels)

plt.show()

In [None]:
# Row count per age bucket. Null buckets become -1 so the "Unknown" bar has a
# real x position.
age_group_prevalence = (
    df_train.with_columns(
        pl.col("Age")
        .map_elements(map_age, return_dtype=pl.Int8)
        .fill_null(-1)
        .alias("age_group")
    )
    .get_column("age_group")
    .value_counts()
    .sort(by="age_group", descending=False)
)

colors = get_colormap(age_group_prevalence, column="count")

fig, ax = plt.subplots(figsize=(12, 6))

ax.bar(
    age_group_prevalence["age_group"],
    age_group_prevalence["count"],
    label=age_bar_labels,
    color=colors,
)
ax.set_xlabel("Age Group")
ax.set_ylabel("Prevalence")
# Ticks sit exactly on the bar positions; shifting them by one put each label
# under the neighbouring bar.
# NOTE(review): assumes `age_bar_labels` lists the same groups in the same
# ascending order as this frame (unknown first) — confirm.
ax.set_xticks(age_group_prevalence["age_group"].to_list())
ax.set_xticklabels(age_bar_labels)
ax.legend(title="Age Distribution", title_fontsize="xx-large", fontsize="x-large")

plt.show()

In [None]:
"""Split into age categories (every 10 years), fill the nulls with median value for age."""

In [None]:
# Mean of `Survived` per SibSp value, aggregated directly instead of via
# per-group lists of every column.
sibsp_vs_survival_mean = (
    df_train.group_by("SibSp")
    .agg(pl.col("Survived").mean().alias("survival_mean"))
    .sort(by="SibSp", descending=False)
)

sibsp_sizes = sibsp_vs_survival_mean["SibSp"].to_list()
survivability = sibsp_vs_survival_mean["survival_mean"].to_list()
sibsp_bar_labels = sibsp_sizes

ax = plot_barplot(
    df=sibsp_vs_survival_mean,
    x=sibsp_sizes,
    y=survivability,
    labels=sibsp_bar_labels,
    title="Survival Rate across different SibSp values",
    xlabel="SibSp",
    fontsize="x-large",
)

# One tick per observed SibSp value.
ax.set_xticks(sibsp_sizes)

plt.show()

In [None]:
# Row count per SibSp value, ordered by SibSp.
sibsp_prevalence = df_train.get_column("SibSp").value_counts().sort(by="SibSp", descending=False)

colors = get_colormap(sibsp_prevalence, column="count")

fig, ax = plt.subplots(figsize=(12, 6))

ax.bar(
    sibsp_prevalence["SibSp"].cast(pl.String),  # strings -> categorical axis
    sibsp_prevalence["count"],
    label=sibsp_bar_labels,
    color=colors,
)
ax.set_xlabel("SibSp")
ax.set_ylabel("Prevalence")
# The legend title previously said "Pclass Distribution" (copy-paste slip).
ax.legend(title="SibSp Distribution", title_fontsize="xx-large", fontsize="xx-large")

plt.show()

In [None]:
# Mean of `Survived` per Parch value, aggregated directly instead of via
# per-group lists of every column.
parch_vs_survival_mean = (
    df_train.group_by("Parch")
    .agg(pl.col("Survived").mean().alias("survival_mean"))
    .sort(by="Parch", descending=False)
)

parch_sizes = parch_vs_survival_mean["Parch"].to_list()
survivability = parch_vs_survival_mean["survival_mean"].to_list()
parch_bar_labels = parch_sizes

ax = plot_barplot(
    df=parch_vs_survival_mean,
    x=parch_sizes,
    y=survivability,
    labels=parch_bar_labels,
    title="Survival Rate across different Parch values",
    xlabel="Parch",
)

# One tick per observed Parch value.
ax.set_xticks(parch_sizes)

plt.show()

In [None]:
# Row count per Parch value, ordered by Parch.
parch_prevalence = (
    df_train.get_column("Parch")
    .value_counts()
    .sort(by="Parch", descending=False)
)

# This cell plots through pandas rather than through matplotlib directly.
parch_plot_options = {
    "kind": "bar",
    "x": "Parch",
    "y": "count",
    "figsize": (12, 6),
    "title": "Parch Distribution",
    "legend": False,
    "rot": 0,
    "colormap": "cividis",
}
ax = parch_prevalence.to_pandas().plot(**parch_plot_options)

plt.show()

#### Family Size

In [None]:
# Mean of `Survived` per family size, where family size here is SibSp + Parch.
# Aggregated directly instead of via per-group lists of every column.
family_size_vs_survival_mean = (
    df_train.with_columns((pl.col("SibSp") + pl.col("Parch")).alias("family_size"))
    .group_by("family_size")
    .agg(pl.col("Survived").mean().alias("survival_mean"))
    .sort(by="family_size", descending=False)
)

family_sizes = family_size_vs_survival_mean["family_size"].to_list()
survivability = family_size_vs_survival_mean["survival_mean"].to_list()
bar_labels = family_sizes

ax = plot_barplot(
    df=family_size_vs_survival_mean,
    x=family_sizes,
    y=survivability,
    labels=bar_labels,
    title="Survival Rate across different Family Sizes",
    xlabel="Family Size",
)

# One tick per observed family size.
ax.set_xticks(family_sizes)

plt.show()

In [None]:
# Row count per family size (SibSp + Parch), ordered by family size. The
# original expression was left unfinished (`df_train.`) and did not parse;
# this completes it in the same shape as the other prevalence cells.
family_size_prevalence = (
    df_train.select((pl.col("SibSp") + pl.col("Parch")).alias("family_size"))
    .get_column("family_size")
    .value_counts()
    .sort(by="family_size", descending=False)
)

In [None]:
# Pandas copy of the training frame.
# NOTE(review): the later cells visible here reference `X` and `y`, not `x` —
# confirm whether this variable is still needed.
x = df_train.to_pandas()

In [None]:
# Number of null values per column of the training frame (shown as cell output).
df_train.null_count()

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin


class AttributeTransformer(BaseEstimator, TransformerMixin):
    """Stateless feature engineering for the categorical passenger columns.

    Each step is switched by a constructor flag:

    * ``transform_cabin``: reduce ``Cabin`` to its first character, using
      ``"0"`` when the value is missing or empty.
    * ``transform_family``: add ``FamilySize = SibSp + Parch + 1`` and drop
      ``SibSp`` and ``Parch``.
    * ``extract_prefix``: add ``Prefix`` (the last whitespace-separated token
      before the first ``.`` in ``Name``) and drop ``Name``.
    * ``extract_ticket_class``: add ``TicketClass`` (first character of
      ``Ticket``) and drop ``Ticket``.
    * ``drop_id``: drop ``PassengerId``.

    ``transform`` expects a pandas DataFrame holding the columns the enabled
    steps read, and returns a NumPy array.
    """

    def __init__(
        self,
        drop_id: bool = True,
        transform_cabin: bool = True,
        transform_family: bool = True,
        extract_prefix: bool = True,
        extract_ticket_class: bool = True,
    ):
        self.drop_id = drop_id
        self.transform_cabin = transform_cabin
        self.transform_family = transform_family
        self.extract_prefix = extract_prefix
        self.extract_ticket_class = extract_ticket_class

    def fit(self, X, y=None):
        """Nothing is learned from the data; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Apply the enabled steps to a copy of ``X`` and return an array."""
        # Work on a copy so the caller's frame is never modified in place.
        X = X.copy()
        if self.transform_cabin:
            # Non-string values (None/NaN) and empty strings fall back to "0";
            # indexing a NaN float directly would raise a TypeError.
            X["Cabin"] = X["Cabin"].apply(
                lambda cabin: cabin[0] if isinstance(cabin, str) and cabin else "0"
            )
        if self.transform_family:
            X["FamilySize"] = X["SibSp"] + X["Parch"] + 1
            X = X.drop(columns=["SibSp", "Parch"])
        if self.extract_prefix:
            # Split on "." first: taking the second whitespace token picks up
            # part of the surname when the surname has more than one word.
            X["Prefix"] = X["Name"].apply(lambda name: name.split(".")[0].split()[-1])
            X = X.drop(columns="Name")
        if self.extract_ticket_class:
            # Store under a new name; writing the value back into "Ticket" and
            # then dropping "Ticket" discarded the extracted feature.
            X["TicketClass"] = X["Ticket"].apply(lambda ticket: ticket[0])
            X = X.drop(columns="Ticket")
        if self.drop_id:
            X = X.drop(columns="PassengerId")
        return X.to_numpy()

In [None]:
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

cat_features = ["Pclass", "Sex", "SibSp", "Parch", "Cabin", "Name", "PassengerId", "Ticket"]

# Impute before encoding: one-hot output contains no missing values, so an
# imputer placed after the encoder has nothing to fill. This order also
# matches the other categorical pipeline in this notebook.
cat_pipeline = Pipeline(
    [
        ("attribs_transformer", AttributeTransformer()),
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("one_hot_encoder", OneHotEncoder(sparse_output=False, handle_unknown="ignore")),
    ]
)

In [None]:
from sklearn.preprocessing import StandardScaler

num_features = ["Age", "Fare"]

# Numeric columns: fill missing values with the column mean, then standardise.
num_steps = [
    ("imputer", SimpleImputer(strategy="mean")),
    ("standard_scaler", StandardScaler()),
]
num_pipeline = Pipeline(steps=num_steps)

In [None]:
embarked_feature = ["Embarked"]

# Embarked: fill missing values with the most frequent one, then one-hot encode.
embark_steps = [
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("one_hot_encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
]
embark_pipeline = Pipeline(steps=embark_steps)

In [None]:
from sklearn.compose import ColumnTransformer

# Route each column group through its own pipeline. Columns not named in any
# group are passed through untouched (remainder="passthrough").
column_routes = [
    ("embarked", embark_pipeline, embarked_feature),
    ("num", num_pipeline, num_features),
    ("cat", cat_pipeline, cat_features),
]
full_pipeline = ColumnTransformer(transformers=column_routes, remainder="passthrough")

In [None]:
from sklearn.model_selection import train_test_split

random_state = 42

# `X` and `y` were never defined in the cells above. Derive them from the
# training frame: every column except the target, and the target itself.
# NOTE(review): assumes `Survived` is the target column — confirm.
X = df_train.drop("Survived")
y = df_train.get_column("Survived")

# Hold out 20% of the rows; a later cell splits the hold-out again.
X_train, X_test, y_train, y_test = train_test_split(
    X.to_pandas(), y.to_pandas(), test_size=0.2, random_state=random_state
)

In [None]:
# Split the held-out rows in half: validation and final test.
holdout_parts = train_test_split(X_test, y_test, test_size=0.5, random_state=random_state)
X_val, X_test, y_val, y_test = holdout_parts

In [None]:
# Fit the preprocessing on the training rows only, then apply it to each split.
fitted_pipeline = full_pipeline.fit(X_train)

X_train_transformed, X_val_transformed, X_test_transformed = (
    fitted_pipeline.transform(split) for split in (X_train, X_val, X_test)
)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

rf_param_grid = {
    "n_estimators": [10, 13, 15, 20],
    "criterion": ["gini", "entropy"],
    "max_depth": [10, 15, 20, 30, 50],
    "min_samples_split": [2, 3, 4, 5, 10],
    "min_samples_leaf": [1, 2, 3, 5],
    "max_features": ["sqrt", "log2"],
}

# Seed the forest with the notebook's `random_state` so the search results and
# the selected parameters are reproducible between runs.
rf_grid_search = GridSearchCV(
    estimator=RandomForestClassifier(random_state=random_state),
    param_grid=rf_param_grid,
    scoring="accuracy",
    n_jobs=-1,
    cv=20,
    verbose=1,
).fit(X_train_transformed, y_train)

In [None]:
# Best cross-validated accuracy, then the parameters that produced it.
print(rf_grid_search.best_score_, rf_grid_search.best_params_, sep="\n")

In [None]:
from sklearn.model_selection import cross_val_score

rf_clf = rf_grid_search.best_estimator_

# 10-fold cross-validated accuracy computed on the validation split.
rf_scores = cross_val_score(
    estimator=rf_clf,
    X=X_val_transformed,
    y=y_val,
    scoring="accuracy",
    cv=10,
)

print(rf_scores.mean())

In [None]:
from sklearn.metrics import accuracy_score

# Accuracy of the tuned random forest on the held-out test split.
rf_preds = rf_clf.predict(X_test_transformed)
# accuracy_score takes (y_true, y_pred); the arguments were passed reversed.
rf_score_test = accuracy_score(y_test, rf_preds)
print(f"Random Forest score on test set: {rf_score_test:.3f}")

In [None]:
from sklearn.linear_model import LogisticRegression

# Settings shared by every solver/penalty combination below.
lr_common_grid = {
    "tol": [0.001, 0.01, 0.1, 1],
    "C": [0.1, 1, 10, 20, 30, 40, 50, 100],
    "max_iter": [1000, 1500, 2000],
}

# Only pair each solver with penalties it supports: "sag" handles just "l2",
# "liblinear" does not handle "elasticnet", and "elasticnet" needs an
# `l1_ratio`. A single flat grid made many candidate fits fail.
lr_param_grid = [
    {**lr_common_grid, "solver": ["liblinear", "saga"], "penalty": ["l1", "l2"]},
    {**lr_common_grid, "solver": ["sag"], "penalty": ["l2"]},
    {
        **lr_common_grid,
        "solver": ["saga"],
        "penalty": ["elasticnet"],
        "l1_ratio": [0.25, 0.5, 0.75],
    },
]

lr_grid_search = GridSearchCV(
    # Seeded so the search is reproducible between runs.
    estimator=LogisticRegression(random_state=random_state),
    param_grid=lr_param_grid,
    scoring="accuracy",
    n_jobs=-1,
    cv=20,
    verbose=1,
).fit(X_train_transformed, y_train)

In [None]:
# Best cross-validated accuracy, then the parameters that produced it.
print(lr_grid_search.best_score_, lr_grid_search.best_params_, sep="\n")

In [None]:
lr_clf = lr_grid_search.best_estimator_

# 10-fold cross-validated accuracy computed on the validation split.
lr_scores = cross_val_score(
    estimator=lr_clf,
    X=X_val_transformed,
    y=y_val,
    scoring="accuracy",
    cv=10,
)

print(lr_scores.mean())

In [None]:
# Accuracy of the tuned logistic regression on the held-out test split.
lr_preds = lr_clf.predict(X_test_transformed)
# accuracy_score takes (y_true, y_pred); the arguments were passed reversed.
lr_score_test = accuracy_score(y_test, lr_preds)
print(f"Logistic Regression score on test set: {lr_score_test:.3f}")

In [None]:
from sklearn.svm import SVC

# Settings shared by both kernels.
svc_common_grid = {
    "C": [10, 20, 30],
    "tol": [0.1, 1, 3, 5],
}

# `degree` only affects the "poly" kernel and `gamma` is not used by the
# "linear" kernel, so varying them there only repeated identical fits.
# One grid per kernel searches the same distinct models with fewer fits.
svc_param_grid = [
    {**svc_common_grid, "kernel": ["linear"]},
    {**svc_common_grid, "kernel": ["rbf"], "gamma": [0.001, 0.1, 1, 3, 5]},
]

svc_grid_search = GridSearchCV(
    estimator=SVC(),
    param_grid=svc_param_grid,
    scoring="accuracy",
    n_jobs=-1,
    cv=20,
    verbose=1,
).fit(X_train_transformed, y_train)

In [None]:
# Best cross-validated accuracy, then the parameters that produced it.
print(svc_grid_search.best_score_, svc_grid_search.best_params_, sep="\n")

In [None]:
svc_clf = svc_grid_search.best_estimator_

# 10-fold cross-validated accuracy computed on the validation split.
svc_scores = cross_val_score(
    estimator=svc_clf,
    X=X_val_transformed,
    y=y_val,
    scoring="accuracy",
    cv=10,
)

print(svc_scores.mean())

In [None]:
# Accuracy of the tuned SVC on the held-out test split.
svc_preds = svc_clf.predict(X_test_transformed)
# accuracy_score takes (y_true, y_pred); the arguments were passed reversed.
svc_score_test = accuracy_score(y_test, svc_preds)
print(f"SVC score on test set: {svc_score_test:.3f}")

In [None]:
from sklearn.ensemble import VotingClassifier

# Majority-vote ensemble of the three tuned classifiers.
voting_members = [
    ("random_forest", rf_clf),
    ("logistic_regression", lr_clf),
    ("svc", svc_clf),
]
vote_clf = VotingClassifier(estimators=voting_members, voting="hard")
vote_clf.fit(X_train_transformed, y_train)

In [None]:
# 10-fold cross-validated accuracy computed on the validation split.
vote_scores = cross_val_score(
    estimator=vote_clf,
    X=X_val_transformed,
    y=y_val,
    scoring="accuracy",
    cv=10,
)

print(vote_scores.mean())

In [None]:
# Accuracy of the voting ensemble on the held-out test split.
vote_preds = vote_clf.predict(X_test_transformed)
# accuracy_score takes (y_true, y_pred); the arguments were passed reversed.
vote_score_test = accuracy_score(y_test, vote_preds)
# The message previously said "SVC", copied from the SVC cell.
print(f"Voting Classifier score on test set: {vote_score_test:.3f}")

In [None]:
# `df_test` was never loaded in the cells above.
# NOTE(review): assumes the submission split is stored next to train.csv as
# DATA_DIR / "test.csv" — confirm the file name.
df_test = pl.read_csv(DATA_DIR / "test.csv")

# Apply the preprocessing fitted on the training rows to the submission rows.
X_submit = fitted_pipeline.transform(df_test.to_pandas())

In [None]:
import os

# `CHAPTER_ROOT` is not defined anywhere in this notebook; use the same
# PROJECT_ROOT-based location that the setup cell assigns to RESULTS_DIR.
RESULTS_DIR = PROJECT_ROOT / "results"

# exist_ok=True replaces the separate isdir check and avoids failing if the
# directory appears between the check and the creation.
os.makedirs(RESULTS_DIR, exist_ok=True)

In [None]:
# Single-column frame with the submission passenger ids.
passenger_ids = df_test.select("PassengerId")

In [None]:
# Random forest predictions for the submission rows, written as
# PassengerId/Survived pairs.
rf_preds = pl.DataFrame({"Survived": rf_clf.predict(X_submit).tolist()})

rf_preds_df = pl.concat([passenger_ids, rf_preds], how="horizontal")

rf_preds_df.write_csv(RESULTS_DIR / "rf_preds.csv")

In [None]:
# Logistic regression predictions for the submission rows, written as
# PassengerId/Survived pairs.
lr_preds = pl.DataFrame({"Survived": lr_clf.predict(X_submit).tolist()})

lr_preds_df = pl.concat([passenger_ids, lr_preds], how="horizontal")

lr_preds_df.write_csv(RESULTS_DIR / "lr_preds.csv")

In [None]:
# SVC predictions for the submission rows, written as PassengerId/Survived
# pairs.
svc_preds = pl.DataFrame({"Survived": svc_clf.predict(X_submit).tolist()})

svc_preds_df = pl.concat([passenger_ids, svc_preds], how="horizontal")

svc_preds_df.write_csv(RESULTS_DIR / "svc_preds.csv")

In [None]:
# Voting ensemble predictions for the submission rows, written as
# PassengerId/Survived pairs.
vote_preds = pl.DataFrame({"Survived": vote_clf.predict(X_submit).tolist()})

vote_preds_df = pl.concat([passenger_ids, vote_preds], how="horizontal")

vote_preds_df.write_csv(RESULTS_DIR / "vote_preds.csv")