# Let's solve the Titanic as if it were a real-world problem

> We want to make it diverse enough that we can ensure we won't get stuck the moment we use Daft on real-world data.

### Pipeline steps
* Clean out rows whose cabin value is illegal. This is not strictly necessary here, but it is crucial to take data cleaning into account, as it significantly affects the pipeline.
* Calculating family_size = parch + sibsp + 1 (self)
* Get the initials (title) from the name, and map them to one of "Mr", "Miss", "Mrs" or "Other".
* Calculate the mean age for each Initial and use it to fill missing values for age.
* Create age_group for each male/female and under/over the age of 15.
* Bin family_size into the bins [0, 1, 2, 5, 7, 100, 1000].
* Encode embarked, sex, family_bin, age_group with a label/one-hot encoder.
* Use LightGBM for modelling.
* Add the survived/died probability in a consumable way.

### Context for applying prediction:
* [MLFlow](https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#pyfunc-create-custom)
* [Ray-serve](https://docs.ray.io/en/latest/serve/getting_started.html)
* [Cog](https://github.com/replicate/cog)
* [FastAPI](http://fastapi.tiangolo.com)

In [24]:
import warnings

import numpy as np
import pandas as pd
from lightgbm.sklearn import LGBMClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OrdinalEncoder

warnings.filterwarnings('ignore')

df = pd.read_csv('~/development/datasets/titanic.csv')
target = 'survived'
numeric_features = ['pclass', 'age', 'sibsp', 'parch', 'fare']
string_features = ['embarked', 'sex', 'family_bin', 'age_group']
# Copy the list: the encoded column names are appended to `features` below, and a plain
# alias would silently grow `numeric_features` as well.
features = list(numeric_features)

# Drop rows whose cabin value contains a space; rows with a missing cabin are kept (na=False).
# .copy() detaches the result from the loaded frame, so the column assignments below are
# not writes to a slice of another frame.
df = df[~df['cabin'].str.contains(' ', na=False)].copy()
df['family_size'] = df['parch'] + df['sibsp'] + 1  # +1 counts the passenger themselves

# Title = the first run of letters that is followed by a '.', collapsed into a few groups.
# Titles that are not listed in the map become NaN.
df['initial'] = df['name'].str.extract(r'([A-Za-z]+)\.', expand=False)
initials_map = {
    'Miss': 'Miss', 'Mlle': 'Miss', 'Mme': 'Miss', 'Ms': 'Miss',
    'Mr': 'Mr', 'Dr': 'Mr', 'Major': 'Mr', 'Capt': 'Mr', 'Sir': 'Mr', 'Don': 'Mr',
    'Mrs': 'Mrs', 'Lady': 'Mrs', 'Countess': 'Mrs',
    'Jonkheer': 'Other', 'Col': 'Other', 'Rev': 'Other',
}
df['initial'] = df['initial'].map(initials_map)

# NOTE: no random_state is passed, so the split differs from run to run.
train, test = train_test_split(df)
# Mean age per title, computed on the training rows only.
means = train.groupby('initial')['age'].mean().to_dict()

# Fill missing ages through an exact title lookup. A regex/prefix match is not safe here:
# 'Mr' is a prefix of 'Mrs', so the 'Mr' mean would also be written into the 'Mrs' rows.
df['age'] = df['age'].fillna(df['initial'].map(means))

# Rows whose age is still missing keep age_group = None and are dropped further down.
is_male = df['sex'] == 'male'
is_female = df['sex'] == 'female'
is_child = df['age'] <= 15
is_adult = df['age'] > 15
df['age_group'] = None
df.loc[is_male & is_child, 'age_group'] = 'boy'
df.loc[is_female & is_child, 'age_group'] = 'girl'
df.loc[is_male & is_adult, 'age_group'] = 'adult male'
df.loc[is_female & is_adult, 'age_group'] = 'adult female'

# The interval labels are turned into plain strings so they can be encoded like the other
# categorical columns.
df['family_bin'] = pd.cut(df['family_size'], [0, 1, 2, 5, 7, 100, 1000]).astype(str)

# Re-select the split rows from the enriched frame and drop rows lacking any categorical value.
train = df.loc[train.index].dropna(subset=string_features)
test = df.loc[test.index].dropna(subset=string_features)

# One encoder per categorical column, fitted on the training rows only; categories that
# were not seen during fitting are encoded as NaN.
encoders = {
    column: OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=np.nan)
    .fit(train[[column]].to_numpy())
    for column in string_features
}

for column, encoder in encoders.items():
    encoded_column = f"le_{column}"
    train[encoded_column] = encoder.transform(train[[column]].to_numpy()).ravel()
    test[encoded_column] = encoder.transform(test[[column]].to_numpy()).ravel()
    features.append(encoded_column)

df.head(2)

Unnamed: 0,pclass,survived,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,boat,body,home_dest,family_size,initial,age_group,family_bin
0,1,0,"Allen, Miss. Elisabeth Walton",female,29.0,0,0,24160,211.3375,B5,S,2,,"St Louis, MO",1,Miss,adult female,"(0, 1]"
5,1,0,"Anderson, Mr. Harry",male,48.0,0,0,19952,26.55,E12,S,3,,"New York, NY",1,Mr,adult male,"(0, 1]"


In [28]:
# modelling: fit a LightGBM classifier on the training split, then score the held-out split
X = train[features]
y = train[target]
model = LGBMClassifier()
model.fit(X, y)

predictions = model.predict(test[features])
accuracy = accuracy_score(test[target], predictions)
print(f"Accuracy: {accuracy}")

Accuracy: 0.8296529968454258


# Daft approaches

## Model class (pytorch lightning style)
Seems like the first approach to try in terms of daft design.    
First, you clean, explore and write the functions during exploration and modelling.
Second, you wrap those steps in a *Model* class for deployment.

In [None]:
class Model():
    """sketch of a deployable model: the feature-engineering steps and the classifier in one class"""

    # NOTE(review): this is an annotation only, so it does not bind `classifier`;
    # `self.classifier` has to be provided before `fit`/`predict` are used.
    # `LightGBMClassifier` and `params` are placeholders that this notebook does not define.
    classifier: LightGBMClassifier(params)

    def clean_cabin(self):
        ...

    def calculate_family_size(self):
        ...

    # NOTE(review): the method name is misspelled ("initials"); kept as-is for compatibility.
    @polars_udf(return_type=str)
    def clean_initlas(self):
        ...

    def _pre_fit(self, df):
        """feature engineering before fitting"""
        ...

    def _pre_predict(self, data):
        """feature engineering before predictions"""
        ...

    def fit(self, df):
        ...
        self.classifier = self.classifier.fit(self._pre_fit(df))

    def transform(self):
        ...

    # Single definition of `predict`: a second, empty definition further down the class
    # would replace this implementation.
    @polars_udf(return_type=int)
    def predict(self, df):
        return self.classifier.predict(self._pre_predict(df))
    

## A static approach
Simpler, and closer to a POC, but less deployment-friendly, as we need to save both the artifacts and the code, and manage both.

In [None]:
from daft import polars_udf
import polars as pl


@polars_udf(return_type=float)
class LightGBM:
    """Stateful UDF sketch: holds a classifier and applies it to two input columns."""

    def __init__(self, classifier=None):
        # Explicit None check instead of truthiness: an estimator object may define
        # __len__/__bool__ and must not be replaced just because it evaluates as falsy.
        self.classifier = classifier if classifier is not None else self._load_classifier()

    @staticmethod
    def _load_classifier():
        """Placeholder for loading a persisted classifier."""
        pass

    def __call__(self, a_data: pl.Series, b_data: pl.Series):
        # NOTE(review): placeholder computation. It multiplies the stored object with the
        # stacked inputs, which only works if the loaded classifier is array-like.
        # TODO: replace with the real inference call.
        return np.matmul(self.classifier, np.array([a_data.to_numpy(), b_data.to_numpy()]))
    

# NOTE(review): cabin values are strings (e.g. 'B5'), so the cleaned column is declared as
# str. Adjust the return type if this UDF ends up producing a keep/drop mask instead.
@polars_udf(return_type=str)
def clean_cabin(data):
    """Sketch: clean the cabin values in `data`."""
    ...
    
# family_size is a head count (parch + sibsp + 1), hence an int return type.
@polars_udf(return_type=int)
def calculate_family_size(data):
    """Sketch: compute the family size from `data`."""
    ...

train, test = df.ml.random_split()  # or something similar


def fit(df):  # where does this code live?
    """Apply the training-time feature engineering to `df`, fit the model and return it."""
    df = df.with_column(...)  # clean
    df = df.with_column(...)  # calculate size
    # NOTE(review): `model` is not created in this function, so it must already exist in
    # the enclosing scope when `fit` is called.
    model.fit(df)
    return model


model = fit(train)  # artifact

def predict(test, model):  # where is this code managed?
    """Apply the inference-time feature engineering to `test` and return the model's predictions."""
    # calculate size and other feature engineering, without dropping rows
    test = test.with_column(...)
    return model.predict(test)


## A state approach
I consider it a balanced approach. It relies on lazy evaluation on new data, which Daft doesn't have yet.  
[PR Reference](https://github.com/Eventual-Inc/Daft/pull/496)

In [None]:
from daft import polars_udf
import polars as pl


@polars_udf(return_type=float)
class LightGBM:

    def __init__(self, classifier=None):
        # Initialize and cache an "expensive" model between invocations of the UDF
        self.classifier = classifier or _load_classifier()
        
    def _load_classifier():
        pass
        
    def __call__(self, a_data: pl.Series, b_data: pl.Series):
        return np.matmul(self.model, np.array([a_data.to_numpy(), b_data.to_numpy()]))
    

# NOTE(review): cabin values are strings (e.g. 'B5'), so the cleaned column is declared as
# str. Adjust the return type if this UDF ends up producing a keep/drop mask instead.
@polars_udf(return_type=str)
def clean_cabin(data):
    """Sketch: clean the cabin values in `data`."""
    ...
    
# family_size is a head count (parch + sibsp + 1), hence an int return type.
@polars_udf(return_type=int)
def calculate_family_size(data):
    """Sketch: compute the family size from `data`."""
    ...

# Split the data; the exact API for this is still open.
train, test = df.ml.random_split() # or something similar

train = train.with_column(...) # clean
train = train.with_column(...) # calculate size
... # feature engineering
# NOTE(review): the UDF is fed columns selected from `df`, not from `train` - confirm this is intended.
train = train.with_column('prediction', LightGBM(df[features]))

# NOTE(review): `State` is not defined in this notebook - presumably the lazy-evaluation
# state proposed in the referenced PR; TODO confirm.
state = State.from_dataframe(train)

# Presumably replays the steps recorded on `train` against the held-out frame - TODO confirm.
state.inference(test)

## Sklearn approach
* Not sure exactly how to apply this - but this is the most standard solution today.

In [None]:
class Cleaner:
    """Sketch of the cleaning step as an sklearn-style transformer over a daft DataFrame."""

    def fit(self, df: "daft.DataFrame", y=None):
        """Learn whatever the cleaning step needs from `df`; `y` is accepted and ignored."""
        ...
        return self  # returning self lets fit(...).transform(...) be chained

    def transform(self, df: "daft.DataFrame"):
        """Apply the cleaning step to `df`."""
        ...

class FamilySize():
    """Sketch of the family-size step as an sklearn-style transformer."""

    def fit(self, df, y=None):
        """Learn whatever the step needs from `df`; `y` is accepted and ignored."""
        ...
        return self  # returning self lets fit(...).transform(...) be chained

    def transform(self, df):
        """Apply the family-size step to `df`."""
        ...
        
from sklearn.pipeline import make_pipeline

# Steps have to be estimator instances, not classes; make_pipeline names them automatically.
pipeline = make_pipeline(
    Cleaner(),
    FamilySize(),
    # ... remaining feature-engineering steps go here
    LGBMClassifier(),
)
pipeline.fit(train)
pipeline.predict(test)

# [Unpublished notebook article](https://github.com/xdssio/goldilox/blob/master/notebooks/sklearn_vs_vaex_vs_pyspark.ipynb) 

Here you can find a reference for what this would look like in sklearn, pandas, Vaex (which is similar to the PR and the state idea) and Spark.