# What I learned deploying the same pipeline with Sklearn, PySpark and Vaex
The main idea is to put a complicated, real-world data science problem and its solution to the test.    
We want to simulate real-world work.

The first step is a "quick & dirty" POC where we figure stuff out, build features, model, and evaluate.    
The second step is a pipeline for the solution in production.

Pipeline steps:    

 * Filter out rows with illegal *Cabin* values (here, values that contain a space). This cleaning is not strictly necessary, but it is crucial to take data cleaning into account, as it significantly affects the pipeline.    
 * Calculate *FamilySize = Parch + SibSp + 1* (+1 for the passenger).
 * Extract the title ("Initial") from *Name* and map it to either "Mr", "Miss", "Mrs" or "Other".
 * Calculate the mean *Age* for each Initial and use it to fill missing values for *Age*.
 * Create *AgeGroup* from sex (male/female) and age (15 and under / over 15).
 * Bin *FamilySize* into the [0, 1, 2, 5, 7, 100, 1000] bins.
 * Encode *Embarked, Sex, FamilyBin, AgeGroup* with a label/one-hot encoder.
 * Use [LightGBM](https://lightgbm.readthedocs.io/en/latest/) (or Random Forest for PySpark) for modelling.
 * Add the survived/died probability in a consumable way.
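
Every step in this list except the *Age* imputation is stateless, because only that one needs statistics learned from the train data. The stateless steps fit in one plain-pandas function. This is a minimal sketch, not the author's code. The helper name `add_stateless_features` is hypothetical. The title map is copied from the code further down, so titles missing from it (e.g. "Master") end up as NaN, as in the original.

```python
import pandas as pd

# Title -> group mapping, copied from the article's initials_map
TITLE_MAP = {
    'Miss': 'Miss', 'Mr': 'Mr', 'Mrs': 'Mrs', 'Mlle': 'Miss', 'Mme': 'Miss',
    'Ms': 'Miss', 'Dr': 'Mr', 'Major': 'Mr', 'Lady': 'Mrs', 'Countess': 'Mrs',
    'Jonkheer': 'Other', 'Col': 'Other', 'Rev': 'Other',
    'Capt': 'Mr', 'Sir': 'Mr', 'Don': 'Mr',
}
FAMILY_BINS = [0, 1, 2, 5, 7, 100, 1000]


def add_stateless_features(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: add FamilySize, Initial, AgeGroup and FamilyBin to a copy of df."""
    out = df.copy()
    out['FamilySize'] = out['Parch'] + out['SibSp'] + 1  # +1 for the passenger
    # The title is the word right before a dot: "Braund, Mr. Owen" -> "Mr"
    titles = out['Name'].str.extract(r'([A-Za-z]+)\.', expand=False)
    out['Initial'] = titles.map(TITLE_MAP)
    # Rows with a missing Age match no condition and keep AgeGroup = None
    is_male, is_female = out['Sex'] == 'male', out['Sex'] == 'female'
    is_child, is_adult = out['Age'] <= 15, out['Age'] > 15
    out['AgeGroup'] = None
    out.loc[is_male & is_child, 'AgeGroup'] = 'boy'
    out.loc[is_female & is_child, 'AgeGroup'] = 'girl'
    out.loc[is_male & is_adult, 'AgeGroup'] = 'adult male'
    out.loc[is_female & is_adult, 'AgeGroup'] = 'adult female'
    # Right-closed intervals such as "(1, 2]", stored as strings for encoding
    out['FamilyBin'] = pd.cut(out['FamilySize'], FAMILY_BINS).astype(str)
    return out


sample = pd.DataFrame({
    'Name': ['Braund, Mr. Owen Harris', 'Palsson, Miss. Torborg Danira',
             'Byles, Rev. Thomas Roussel Davids'],
    'Sex': ['male', 'female', 'male'],
    'Age': [22.0, 8.0, None],
    'SibSp': [1, 3, 0],
    'Parch': [0, 1, 0],
})
features = add_stateless_features(sample)
print(features[['FamilySize', 'Initial', 'AgeGroup', 'FamilyBin']])
```

Returning a modified copy leaves the caller's DataFrame untouched, which matters once the same object is also used for evaluation.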
 
**Important notes**
* Any calculated values (e.g. the mean *Age* per Initial) must be computed on the train data only.
* Quick-and-dirty things like "dropna" are used in the POC, but not in the pipeline.
* The POC should still be correct.
* Never filter data in production.
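
The first note matters most for stateful steps such as the *Age* imputation. Below is a minimal sketch of the fit/apply split in plain pandas, using the hypothetical helpers `fit_age_means` and `impute_age`. The per-title means are learned once from the train rows. They are then reused, unchanged, for test or production data.

```python
import numpy as np
import pandas as pd


def fit_age_means(train: pd.DataFrame) -> dict:
    """Hypothetical helper: learn the mean Age per Initial from training rows only."""
    return train.groupby('Initial')['Age'].mean().to_dict()


def impute_age(df: pd.DataFrame, age_means: dict) -> pd.DataFrame:
    """Hypothetical helper: fill missing ages with the train means, never recomputing them."""
    out = df.copy()
    out['Age'] = out['Age'].fillna(out['Initial'].map(age_means))
    return out


train = pd.DataFrame({'Initial': ['Mr', 'Mr', 'Mrs', 'Mr'],
                      'Age': [20.0, 40.0, 50.0, np.nan]})
test = pd.DataFrame({'Initial': ['Mr', 'Mrs', 'Other'],
                     'Age': [np.nan, np.nan, np.nan]})

age_means = fit_age_means(train)                     # {'Mr': 30.0, 'Mrs': 50.0}
print(impute_age(test, age_means)['Age'].tolist())   # [30.0, 50.0, nan]
```

Titles never seen in training stay missing here. The pipeline has to decide explicitly what to do with them, instead of dropping those rows.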
 
 
 
* [Heavily inspired by this notebook](https://www.kaggle.com/bombatkarvivek/pyspark-ml-pipeline-with-titanic-dataset-eda).   

# Pandas + Sklearn: Standard data science

## POC
This is the POC stage: it works on my laptop... fast and dirty.
* This will not work on big data.   

In [7]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OrdinalEncoder
from lightgbm.sklearn import LGBMClassifier
from sklearn.metrics import accuracy_score
from icecream import ic 
import warnings
warnings.filterwarnings('ignore')


df = pd.read_csv('../datasets/titanic.csv')

target = 'Survived'
numeric_cols = ['PassengerId','Survived', 'Pclass', 'Age', 'SibSp','Parch','Ticket','Fare'] 
numeric_features = ['PassengerId','Pclass','Age', 'SibSp','Parch','Fare'] 
string_features = [ 'Embarked', 'Sex', 'FamilyBin', 'AgeGroup'] 
features = numeric_features.copy()  # copy, so appending encoded columns leaves numeric_features intact


df = df[df['Cabin'].str.contains(' ') != True]
df['FamilySize'] = df['Parch'] + df['SibSp'] + 1
df['Initial'] = df['Name'].str.extract(r'([A-Za-z]+)\.')

initials_map = {k:v for k,v in (zip(['Miss','Mr','Mrs','Mlle','Mme','Ms','Dr',
                                               'Major','Lady','Countess',
                                               'Jonkheer','Col','Rev',
                                               'Capt','Sir','Don'],
                                             ['Miss','Mr','Mrs','Miss','Miss','Miss',
                                              'Mr','Mr','Mrs','Mrs',
                                              'Other','Other','Other',
                                              'Mr','Mr','Mr']))}
df['Initial'] = df['Initial'].map(initials_map)
train, test = train_test_split(df)

means = train.groupby(['Initial'])['Age'].mean().to_dict() # computed on the train split only
for initial, value in means.items():
    # exact comparison: str.match('Mr') is a prefix match and would also hit 'Mrs'
    df['Age'] = np.where((df['Age'].isnull()) & (df['Initial'] == initial), value, df['Age'])
    
df['AgeGroup'] = None
df.loc[((df['Sex'] == 'male') & (df['Age'] <= 15)), 'AgeGroup'] = 'boy'
df.loc[((df['Sex'] == 'female') & (df['Age'] <= 15)), 'AgeGroup'] = 'girl'
df.loc[((df['Sex'] == 'male') & (df['Age'] > 15)), 'AgeGroup'] = 'adult male'
df.loc[((df['Sex'] == 'female') & (df['Age'] > 15)), 'AgeGroup'] = 'adult female'

df['FamilyBin'] = pd.cut(df['FamilySize'], [0,1, 2, 5, 7, 100,1000])
df['FamilyBin'] = df['FamilyBin'].astype(str)


train = df.loc[train.index].dropna(subset=string_features)
test = df.loc[test.index].dropna(subset=string_features)

encoders = {column: OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=np.nan,
                                     ).fit(train[column].values.reshape(-1,1)) for column in string_features}

for column, encoder in encoders.items():
    string_column = f"le_{column}"
    train[string_column] = encoder.transform(train[column].values.reshape(-1,1)).reshape(-1)
    test[string_column] = encoder.transform(test[column].values.reshape(-1,1)).reshape(-1)
    features.append(string_column)
    
df.head(2)

| | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked | FamilySize | Initial | AgeGroup | FamilyBin |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1 | 0 | A/5 21171 | 7.25 | NaN | S | 2 | Mr | adult male | (1, 2] |
| 1 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | PC 17599 | 71.2833 | C85 | C | 2 | Mrs | adult female | (1, 2] |


In [8]:
# modelling
model = LGBMClassifier()
X = train[features]
y = train[target]
model.fit(X, y)
        
print(f"accuracy: {accuracy_score(test[target],model.predict(test[features]))}")

accuracy: 0.8101851851851852
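
Two pandas details in this POC are easy to get wrong and worth checking in isolation. First, `Series.str.match` is anchored only at the start of the string, so the pattern `'Mr'` also matches `'Mrs'`. Second, the filter `str.contains(' ') != True` keeps rows whose *Cabin* is missing, because `contains` does not return `True` for a missing value. A quick check on toy data:

```python
import numpy as np
import pandas as pd

titles = pd.Series(['Mr', 'Mrs', 'Miss'])
# str.match is a prefix match: 'Mrs' starts with 'Mr', so it matches too
prefix_hits = titles.str.match('Mr').tolist()    # [True, True, False]
# Plain equality is what a per-title lookup actually needs
exact_hits = (titles == 'Mr').tolist()           # [True, False, False]

cabins = pd.Series(['C85', 'B57 B59', np.nan])
# The missing cabin is not flagged True by contains(), so "!= True" keeps it
keep = cabins.str.contains(' ') != True
# An explicit equivalent: treat a missing cabin as "no space"
keep_explicit = ~cabins.str.contains(' ', na=False)

print(prefix_hits, exact_hits, keep.tolist(), keep_explicit.tolist())
```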


## How NOT to go into production
If you want to "copy-paste" this code, once for training and once with adjustments for inference, you are welcome to try...    
In reality, a non-pipeline solution will crash in many unexpected places: missing values, new (unseen) categorical values, illegal values, and so on.
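
New categorical values are the classic example. An encoder with default settings raises on a value it has never seen. An encoder configured for pipelines learns the categories at fit time and maps anything unseen to a sentinel instead. Here is a minimal sketch with scikit-learn's `OrdinalEncoder` on made-up data. The `handle_unknown='use_encoded_value'` option needs scikit-learn 0.24 or later.

```python
import numpy as np
from sklearn.preprocessing import OrdinalEncoder

train_embarked = np.array([['S'], ['C'], ['Q'], ['S']])
serving_embarked = np.array([['S'], ['X']])  # 'X' never appeared in training

# Default behaviour: an unseen value at inference time raises
strict = OrdinalEncoder().fit(train_embarked)
try:
    strict.transform(serving_embarked)
    strict_failed = False
except ValueError:
    strict_failed = True

# Pipeline-friendly behaviour: unseen values get a sentinel code instead
tolerant = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)
tolerant.fit(train_embarked)
encoded = tolerant.transform(serving_embarked).ravel()  # C -> 0, Q -> 1, S -> 2

print(strict_failed, encoded)  # True [ 2. -1.]
```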

Implementing your model as plain code also means implementing a server with a technology like [FastAPI](https://fastapi.tiangolo.com/) (highly recommended), probably Docker. It also means mastering logging, error handling and deployment elements as part of the "data-science" side: environment variables, setting up a Lambda Docker image vs. Kubernetes, and time zones, to name a few.

It also means managing the pipeline steps, feature engineering, model parameters and features in server-side code, probably with Docker image versioning. This forces every "data-science" change into a deployment cycle on the container and server sides, because the code that saved the model must match the code that runs it at inference time.

If you let the DevOps/backend people do it, you get a lot of friction that hinders development; if you let the data scientist do it, you should give her a raise!
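
The point that the saved model must match the code at inference time follows from how Python's standard `pickle` works. It stores a reference to the class (module and name) plus the instance attributes, not the method code. When loading, it uses whatever class definition exists at that moment. Below is a self-contained demonstration with a hypothetical `Scaler` class, where the second definition plays the role of edited server code.

```python
import pickle


class Scaler:
    """Hypothetical class, version 1: the code used when the model is saved."""

    def __init__(self, factor):
        self.factor = factor

    def transform(self, x):
        return x * self.factor


blob = pickle.dumps(Scaler(2))  # stores the class reference and {'factor': 2}


class Scaler:  # noqa: F811 - hypothetical version 2, deployed later on the server
    """Version 2: someone changed the logic but not the saved model."""

    def __init__(self, factor):
        self.factor = factor

    def transform(self, x):
        return x * self.factor + 1


restored = pickle.loads(blob)  # rebuilt with the *current* Scaler class
print(restored.transform(10))  # 21, not 20: the saved state runs the new code
```

`cloudpickle`, used below, serializes classes defined in the notebook by value instead. That is why its files can carry the custom classes along.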

### Not convinced? Here is how you would do it

In [90]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OrdinalEncoder
from lightgbm.sklearn import LGBMClassifier
from sklearn.metrics import accuracy_score
from icecream import ic 
import cloudpickle
import pickle
import warnings
warnings.filterwarnings('ignore')


class CustomModel():
    
    def __init__(self):
        self.model = None
        self.encoders = {}
        self.means = {}
        # this can be parameters too or read from a config file...
        self.target = 'Survived'
        self.numeric_cols = ['PassengerId','Survived', 'Pclass', 'Age', 'SibSp','Parch','Ticket','Fare'] 
        self.numeric_features = ['PassengerId','Pclass','Age', 'SibSp','Parch','Fare'] 
        self.string_features = [ 'Embarked', 'Sex', 'FamilyBin', 'AgeGroup'] 
        self.features = self.numeric_features.copy()
        self.initials_map = {k:v for k,v in (zip(['Miss','Mr','Mrs','Mlle','Mme','Ms','Dr',
                                                       'Major','Lady','Countess',
                                                       'Jonkheer','Col','Rev',
                                                       'Capt','Sir','Don'],
                                                     ['Miss','Mr','Mrs','Miss','Miss','Miss',
                                                      'Mr','Mr','Mrs','Mrs',
                                                      'Other','Other','Other',
                                                      'Mr','Mr','Mr']))}
        self.accuracy = None        
    
    @staticmethod
    def _get_means(df, gb='Initial', column='Age'):
        return df.groupby([gb])[column].mean().to_dict()
        
    def _encode_categoricals(self, df, stage='fit'):
        if stage=='fit':
            self.features = self.numeric_features.copy()
            self.encoders = {column: OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=np.nan,
                                         ).fit(df[column].fillna('').values.reshape(-1,1)) for column in self.string_features}
        for column, encoder in self.encoders.items():
            new_column = f"le_{column}"
            df[new_column] = encoder.transform(df[column].fillna('').values.reshape(-1,1)).reshape(-1)
            if new_column not in self.features:
                self.features.append(new_column)
        return df
    
    def _preprocess(self, df, stage='fit'):
        if stage=='fit':
            df = df[df['Cabin'].str.contains(' ') != True] # training only: never filter at inference time
        df['FamilySize'] = df['Parch'] + df['SibSp'] + 1
        df['Initial'] = df['Name'].str.extract(r'([A-Za-z]+)\.')
        df['Initial'] = df['Initial'].map(self.initials_map)
        if stage=='fit':            
            self.means = self._get_means(df)
        for initial, value in self.means.items():
            df['Age'] = np.where((df['Age'].isnull()) & (df['Initial'] == initial), value, df['Age'])  # exact match: str.match('Mr') also hits 'Mrs'
                    
        df['AgeGroup'] = None
        df.loc[((df['Sex'] == 'male') & (df['Age'] <= 15)), 'AgeGroup'] = 'boy'
        df.loc[((df['Sex'] == 'female') & (df['Age'] <= 15)), 'AgeGroup'] = 'girl'
        df.loc[((df['Sex'] == 'male') & (df['Age'] > 15)), 'AgeGroup'] = 'adult male'
        df.loc[((df['Sex'] == 'female') & (df['Age'] > 15)), 'AgeGroup'] = 'adult female'

        df['FamilyBin'] = pd.cut(df['FamilySize'], [0,1, 2, 5, 7, 100,1000])
        df['FamilyBin'] = df['FamilyBin'].astype(str)

        df = self._encode_categoricals(df, stage=stage) 
        return df
    
    def _fit(self, df):
        copy = self._preprocess(df, 'fit')
        self.model = LGBMClassifier()
        self.model.fit(copy[self.features], copy[self.target])
        return self


    def fit(self, path):
        df = pd.read_csv(path)                
        train, test = train_test_split(df)        
        self._fit(train)
        test = self.inference(test)
        self.accuracy = accuracy_score(test[self.target], test['prediction'])
        self._fit(df)
        return self
    
    def inference(self, df):
        df = self.infer(df)
        copy = self._preprocess(df, 'inference')
        copy['prediction'] = self.model.predict(copy[self.features])
        copy['probabilities'] = [{'died':p[0],'survived':p[1]} for p in self.model.predict_proba(copy[self.features])]
        copy['label'] = copy['prediction'].map({1:'survived',0:'died'})
        return copy
    
    @classmethod
    def infer(cls, df):
        if isinstance(df, pd.DataFrame):
            return df
        if isinstance(df, dict):
            return pd.DataFrame.from_records([df])
        return pd.DataFrame(df)
    
    def save(self, path):
        with open(path, 'wb') as outfile:
            outfile.write(cloudpickle.dumps(self))
            
    @classmethod
    def load(cls, path):
        return pickle.loads(open(path,'rb').read())

model = CustomModel().fit('../datasets/titanic.csv')
model.save('../models/code.pkl')
print(f"Accuracy: {model.accuracy}")

Accuracy: 0.7982062780269058


In [91]:
data = {"PassengerId": 91,
        "Survived": 0, "Pclass": 3,
        "Name": "Christmann, Mr. Emil",
        "Sex": "male",
        "Age": 29.0,
        "SibSp": 0,
        "Parch": 0,
        "Ticket": "343276", "Fare": 8.05,
        "Cabin": "A B", # this makes sure we don't filter
        "Embarked": "S"}
CustomModel.load('../models/code.pkl').inference(data)

| | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | ... | Initial | AgeGroup | FamilyBin | le_Embarked | le_Sex | le_FamilyBin | le_AgeGroup | prediction | probabilities | label |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 91 | 0 | 3 | Christmann, Mr. Emil | male | 29.0 | 0 | 0 | 343276 | 8.05 | ... | Mr | adult male | (0, 1] | 3.0 | 1.0 | 0.0 | 2.0 | 0 | {'died': 0.9736046999325613, 'survived': 0.026... | died |


## Ok, show me a pipeline way

### Sklearn pipeline for deployment
This is data engineering, the non-ops side of it.
* This won't work on big data unless you use a big-ass machine (which is often a good solution).
* A properly designed server can run this pipeline, and in fact fit or serve any pipeline for any problem.
* It is essential to adjust the pipeline so that it does not filter data at inference time.
* A standard sklearn pipeline cannot filter rows, because `y` is passed separately and would fall out of sync with `X`. We work around it by keeping the target column inside the DataFrame and handling it in a custom transformer (`LGBMTransformer`). Serving then only needs *fit* and *transform*; the *predict* on the last step is used just for the accuracy check.
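
The workaround can be shown without LightGBM. The sketch below uses scikit-learn's `LogisticRegression` as a stand-in model, plus two hypothetical classes, `DropRows` and `TargetInFrameModel`. The target travels inside the DataFrame, so a row-filtering step cannot get it out of sync. The fitted steps can then be reused without the filter at inference time.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline


class DropRows(TransformerMixin, BaseEstimator):
    """Hypothetical training-time cleaning: drop rows whose `column` contains a space."""

    def __init__(self, column):
        self.column = column

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X[~X[self.column].str.contains(' ', na=False)]


class TargetInFrameModel(TransformerMixin, BaseEstimator):
    """Hypothetical final step that reads the target from a column, not from y."""

    def __init__(self, target, features):
        self.target = target
        self.features = features

    def fit(self, X, y=None):
        # X has already been filtered, and the target column was filtered with it
        self.model_ = LogisticRegression().fit(X[self.features], X[self.target])
        return self

    def transform(self, X):
        out = X.copy()
        out['prediction'] = self.model_.predict(X[self.features])
        return out


train = pd.DataFrame({
    'Cabin': ['C85', 'B57 B59', None, 'E46', None, 'A B'],
    'Fare': [71.3, 263.0, 7.25, 51.9, 8.05, 30.0],
    'Survived': [1, 1, 0, 1, 0, 0],
})
pipeline = Pipeline([
    ('cleaning', DropRows('Cabin')),
    ('model', TargetInFrameModel(target='Survived', features=['Fare'])),
]).fit(train)  # the model sees only the 4 rows without a space in Cabin

# Same fitted model object, without the filtering step
inference = Pipeline(pipeline.steps[1:])
served = inference.transform(train)
print(len(served), served['prediction'].tolist())  # every input row is scored
```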

In [95]:
import numpy as np
import pandas as pd
import json
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import OrdinalEncoder
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.model_selection import train_test_split
from lightgbm.sklearn import LGBMClassifier
from sklearn.metrics import accuracy_score
from joblib import dump
import warnings
warnings.filterwarnings('ignore')

df = pd.read_csv('../datasets/titanic.csv')
train, test = train_test_split(df)

target = 'Survived'

class PandasTransformer(TransformerMixin, BaseEstimator):
    
    def fit(self, X, y=None, **fit_params):
        return self


class DropSome(PandasTransformer):
    
    def __init__(self, column):
        self.column = column
    
    def transform(self, df, **transform_params):
        return df[df[self.column].str.contains(' ')!=True]
    
    
class FamilySizeTransformer(PandasTransformer):
    def __init__(self, columns):
        self.columns = columns
        
    def transform(self, df, **transform_params):
        df['FamilySize'] = 1
        for column in self.columns:
            df['FamilySize'] = df['FamilySize']+df[column]
        return df

class InitialsTransformer(PandasTransformer):
    def __init__(self, column):
        self.column = column
        self.initials_map = {k:v for k,v in (zip(['Miss','Mr','Mrs','Mlle','Mme','Ms','Dr',
                                               'Major','Lady','Countess',
                                               'Jonkheer','Col','Rev',
                                               'Capt','Sir','Don'],
                                             ['Miss','Mr','Mrs','Miss','Miss','Miss',
                                              'Mr','Mr','Mrs','Mrs',
                                              'Other','Other','Other',
                                              'Mr','Mr','Mr']))}
        
    def transform(self, df, **transform_params):
        df['Initial'] = df[self.column].str.extract(r'([A-Za-z]+)\.')        
        df['Initial'] = df['Initial'].map(self.initials_map)
        return df   
    

class AgeImputer(PandasTransformer):
    def __init__(self, column):
        self.column = column
        self.means = {}
    
    def fit(self, X, y=None, **fit_params):
        self.means = X.groupby(['Initial'])['Age'].mean().round().astype(int).to_dict() 
        return self


    def transform(self, df, **transform_params):
        for initial, value in self.means.items():
            df['Age'] = np.where((df['Age'].isnull()) & (df['Initial'] == initial), value, df['Age'])  # exact match: str.match('Mr') also hits 'Mrs'
        return df   
    
class AgeGroupTransformer(PandasTransformer):
    def __init__(self, column):
        self.column = column
    

    def transform(self, df, **transform_params):
        df['AgeGroup'] = None
        df.loc[((df['Sex'] == 'male') & (df['Age'] <= 15)), 'AgeGroup'] = 'boy'
        df.loc[((df['Sex'] == 'female') & (df['Age'] <= 15)), 'AgeGroup'] = 'girl'
        df.loc[((df['Sex'] == 'male') & (df['Age'] > 15)), 'AgeGroup'] = 'adult male'
        df.loc[((df['Sex'] == 'female') & (df['Age'] > 15)), 'AgeGroup'] = 'adult female'
        return df
  
class BinTransformer(PandasTransformer):
    def __init__(self, column,bins=None):
        self.column = column
        self.bins = bins or [0,1, 2, 5, 7, 100,1000]
    

    def transform(self, df, **transform_params):
        df['FamilyBin'] = pd.cut(df[self.column], self.bins).astype(str)
        return df


class MultiColumnLabelEncoder(PandasTransformer):

    def __init__(self, columns = None, prefix='le_', fillna_value=''):
        self.columns = columns 
        self.encoders = {}
        self.prefix = prefix
        self.fillna_value = fillna_value
        
    def _add_prefix(self, col):
        return f"{self.prefix}{col}"
    
    def preprocess_series(self, s):
        return s.fillna(self.fillna_value).values.reshape(-1,1)
        
    def encode(self, column, X):
        return self.encoders[column].transform(self.preprocess_series(X[column])).reshape(-1)
        
    def fit(self,X, y=None):
        for column in self.columns:
            le = OrdinalEncoder(handle_unknown='use_encoded_value',
                                unknown_value=-1)
            self.encoders[column] = le
            le.fit(self.preprocess_series(X[column]))
        return self 

    def transform(self, X):
        output = X.copy()
        if self.columns is not None:
            for column in self.columns:
                output[self._add_prefix(column)] = self.encode(column, X)
        return output

        
class FeatureSelector(PandasTransformer):

    def __init__(self, columns):
        self.columns = columns
    

    def transform(self, df, **transform_params):        
        return df[self.columns]

class LGBMTransformer(PandasTransformer):

    def __init__(self, target, features, output_column='prediction', **params):
        self.features = features
        self.params = params
        self.model = None
        self.target = target
        self.output_column = output_column
        
    def fit(self,X, y):
        self.model = LGBMClassifier(**self.params).fit(X[self.features], X[self.target])
        return self
    
    def predict(self, X):
        if self.model is None:
            raise RuntimeError("Model is not trained")
        return self.model.predict(X[self.features])

    def transform(self, df, **transform_params):        
        if self.model is None:
            raise RuntimeError("Model is not trained")
        missing_features = [feature for feature in self.features if feature not in df]
        if len(missing_features)>0:
            raise RuntimeError(f"Features missing: {missing_features}")
        
        df[self.output_column] = self.model.predict(df[self.features])
        probabilities = self.model.predict_proba(df[self.features])
        df['probabilities'] = [{'died':p[0],'survived':p[1]} for p in probabilities]
        df['label'] = df[self.output_column].map({1:'survived',0:'died'})
        return df
    

class CleaningTransformer(PandasTransformer):   
    def __init__(self, column):
        self.column = column

    def transform(self, df, **transform_params):        
        return df[df[self.column].str.contains(' ')!=True]
    
    
pipeline = Pipeline([
    ('cleaning',CleaningTransformer('Cabin')),
    ('FamilySizeTransformer', FamilySizeTransformer(['Parch','SibSp'])),
    ('InitialsTransformer', InitialsTransformer('Name')),
    ('AgeImputer', AgeImputer('Age')),
    ('AgeGroupTransformer', AgeGroupTransformer('Age')),
    ('BinTransformer', BinTransformer('FamilySize')),
    ('MultiColumnLabelEncoder', MultiColumnLabelEncoder(columns=['Embarked', 'Sex', 'FamilyBin'])),
    ('model', LGBMTransformer(target='Survived', features=['PassengerId','Pclass', 'Age', 'SibSp', 
                                        'Parch', 'Fare', 'le_Embarked','le_Sex', 'le_FamilyBin'],verbose=-1)),
    ])


# train


pipeline = pipeline.fit(train)
# IMPORTANT - remove the filtering for inference. Build a new Pipeline from the
# remaining (already fitted) steps so that `pipeline` keeps its cleaning step.
inference_pipeline = Pipeline(pipeline.steps[1:])
print(f"Accuracy: {accuracy_score(test[target], inference_pipeline.predict(test))}")

# train on the entire dataset (the cleaning step is still the first step of `pipeline`)
pipeline = pipeline.fit(df)

# save a version for retraining when you have more data
# cloudpickle serializes classes defined in the notebook by value; standard pickle
# and joblib only store a reference to them, so the server would need this exact code
import cloudpickle
with open('../models/sklearn_fit.pkl', 'wb') as outfile:
    outfile.write(cloudpickle.dumps(pipeline))

# save a version for inference, without the filtering step
inference_pipeline = Pipeline(pipeline.steps[1:])
with open('../models/sklearn_inference.pkl', 'wb') as outfile:
    outfile.write(cloudpickle.dumps(inference_pipeline))


print('')
print(f"{json.dumps(test.head(1).to_dict(orient='records')[0])}")

Accuracy: 0.8071748878923767

{"PassengerId": 566, "Survived": 0, "Pclass": 3, "Name": "Davies, Mr. Alfred J", "Sex": "male", "Age": 24.0, "SibSp": 2, "Parch": 0, "Ticket": "A/4 48871", "Fare": 24.15, "Cabin": NaN, "Embarked": "S", "FamilySize": 3, "Initial": "Mr", "AgeGroup": "adult male", "FamilyBin": "(2, 5]"}


* We want to save two pipelines: one for (re)training that includes the cleaning step, and one without it for predictions.
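
The two artifacts can also be produced by slicing. Since version 0.21, scikit-learn supports `pipeline[1:]`, which returns a new `Pipeline` that shares the already-fitted steps, so the training pipeline keeps its cleaning step. The sketch uses hypothetical toy steps and the standard `pickle` module. Plain `pickle` is fine here because the classes are importable in the same process; notebook-defined classes shipped to another machine are where `cloudpickle` comes in.

```python
import pickle

import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline


class DropCabinWithSpace(TransformerMixin, BaseEstimator):
    """Hypothetical training-only cleaning step: drop rows whose Cabin contains a space."""

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X[~X['Cabin'].str.contains(' ', na=False)]


class FareImputer(TransformerMixin, BaseEstimator):
    """Hypothetical stateful step: learns the median Fare during fit."""

    def fit(self, X, y=None):
        self.median_ = X['Fare'].median()
        return self

    def transform(self, X):
        return X.assign(Fare=X['Fare'].fillna(self.median_))


train = pd.DataFrame({'Cabin': ['C85', 'A B', None], 'Fare': [10.0, 1000.0, 30.0]})
full = Pipeline([('cleaning', DropCabinWithSpace()), ('impute', FareImputer())]).fit(train)

fit_blob = pickle.dumps(full)            # keeps the cleaning step, for retraining
inference_blob = pickle.dumps(full[1:])  # new Pipeline without the cleaning step

# On the server: load the inference artifact and score a row with an "illegal" cabin
server_pipeline = pickle.loads(inference_blob)
request = pd.DataFrame({'Cabin': ['A B'], 'Fare': [float('nan')]})
print(server_pipeline.transform(request))  # row kept, Fare filled with 20.0
```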

### Server
In this form, a single server works for every valid pipeline; changes to the data and models should not affect the "backend" and "DevOps" sides.
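
Here is a sketch of what that single, pipeline-agnostic handler could look like. It assumes only that the loaded artifact exposes a DataFrame-in, DataFrame-out `transform`, like the inference pipeline below. `handle_request` and `StubPipeline` are hypothetical names, and the web framework wiring (for example a FastAPI route) is left out.

```python
from typing import Any, Dict, List, Union

import pandas as pd

Payload = Union[Dict[str, Any], List[Dict[str, Any]]]


def handle_request(pipeline, payload: Payload) -> List[Dict[str, Any]]:
    """Hypothetical handler: turn a JSON payload (one record or a list) into a
    DataFrame, run the pipeline's transform, and return JSON-friendly records."""
    records = [payload] if isinstance(payload, dict) else list(payload)
    frame = pd.DataFrame.from_records(records)
    result = pipeline.transform(frame)
    return result.to_dict(orient='records')


class StubPipeline:
    """Hypothetical stand-in for a loaded pipeline: flags passengers travelling alone."""

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(alone=(df['SibSp'] + df['Parch']) == 0)


single = handle_request(StubPipeline(), {'PassengerId': 91, 'SibSp': 0, 'Parch': 0})
batch = handle_request(StubPipeline(), [
    {'PassengerId': 1, 'SibSp': 1, 'Parch': 0},
    {'PassengerId': 2, 'SibSp': 0, 'Parch': 0},
])
print(single, batch)
```

Because the handler never looks inside the pipeline, new features or models only change the pickled artifact, not the server code.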

In [83]:
# server
import pickle # standard pickle can load the cloudpickle file (cloudpickle must be installed on the server)
import pandas as pd
pipeline = pickle.loads(open('../models/sklearn_inference.pkl','rb').read())

data = {"PassengerId": 91,
        "Survived": 0, "Pclass": 3,
        "Name": "Christmann, Mr. Emil",
        "Sex": "male",
        "Age": 29.0,
        "SibSp": 0,
        "Parch": 0,
        "Ticket": "343276", "Fare": 8.05,
        "Cabin": "A B", # this makes sure we don't filter
        "Embarked": "S"}
data = pd.DataFrame.from_records([data])
pd.DataFrame(pipeline.transform(data))

| | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | ... | FamilySize | Initial | AgeGroup | FamilyBin | le_Embarked | le_Sex | le_FamilyBin | prediction | probabilities | label |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 91 | 0 | 3 | Christmann, Mr. Emil | male | 29.0 | 0 | 0 | 343276 | 8.05 | ... | 1 | Mr | adult male | (0, 1] | 3.0 | 1.0 | 0.0 | 0 | {'died': 0.9228505083109125, 'survived': 0.077... | died |


# Vaex
The POC stage is practically the same
* This works with big data

In [1]:
import sys
sys.path.append("..")
from lakeml.vaex import Pipeline

## POC 

In [10]:
import vaex
import numpy as np
from sklearn.model_selection import train_test_split
from lightgbm.sklearn import LGBMClassifier
from vaex.ml.lightgbm import LightGBMModel
from sklearn.metrics import accuracy_score
from vaex.ml import LabelEncoder
from icecream import ic 
from lakeml.vaex import Pipeline

import pyarrow as pa
import re


df = vaex.open('../datasets/titanic.csv')
train, test = df.split_random([0.8, 0.2])

numeric_cols = ['PassengerId','Survived', 'Pclass', 'Age', 'SibSp','Parch','Ticket','Fare'] 
numeric_features = ['PassengerId','Pclass','Age', 'SibSp','Parch','Fare'] 
string_features = [ 'Embarked', 'Sex', 'FamilyBin'] 
features = numeric_features

train = train[train['Cabin'].str.contains(' ') != True]
train['FamilySize'] = train['Parch'] + train['SibSp'] + 1
train['Name'] = train['Name'].fillna('Mr.')
pattern = re.compile(r'([A-Za-z]+)\.')

train['Initial'] = train['Name'].str.extract_regex(r'(?P<initial>[A-Za-z]+)\.').apply(lambda x: x.get('initial','Other'))

initials_map = {k:v for k,v in (zip(['Other','Miss','Mr','Mrs','Master','Mlle','Mme','Ms','Dr',
                                               'Major','Lady','Countess',
                                               'Jonkheer','Col','Rev',
                                               'Capt','Sir','Don'],
                                             ['Other','Miss','Mr','Mrs','Mrs','Miss','Miss','Miss',
                                              'Mr','Mr','Mrs','Mrs',
                                              'Other','Other','Other',
                                              'Mr','Mr','Mr']))}
train['Initial'] = train['Initial'].map(initials_map)

gb = train.groupby(['Initial']).agg({'value':vaex.agg.mean('Age')})
means = {k:v for k,v in zip(gb['Initial'].tolist(), gb['value'].tolist())}

for initial, value in means.items():    
    train['Age'] = train.func.where((train.Age.isna() & train.Initial.str.match(initial)), value, train.Age)

train['AgeGroup'] = train.func.where(((train.Sex.str.match('male')) & (train.Age<=15)), 'boy', '')
train['AgeGroup'] = train.func.where(((train.Sex.str.match('female')) & (train.Age <= 15)), 'girl', train.AgeGroup)
train['AgeGroup'] = train.func.where(((train.Sex.str.match('male')) & (train.Age > 15)), 'adult male', train.AgeGroup)
train['AgeGroup'] = train.func.where(((train.Sex.str.match('female')) & (train.Age > 15)), 'adult female', train.AgeGroup)
train['FamilyBin'] = train['FamilySize'].digitize(bins= [0,1, 2, 5, 7, 100,1000])


string_features = [ 'Embarked', 'Sex', 'FamilyBin', 'AgeGroup'] 
encoder = LabelEncoder(features=string_features, prefix='le_', allow_unseen=True)
train = encoder.fit_transform(train)

features = ['PassengerId','Pclass','Age', 'SibSp','Parch','Fare'] +[f"{encoder.prefix}{column}" for column in string_features]
target = 'Survived'
model = LightGBMModel(features=features, 
                        target=target,                         
                        prediction_name='lgm_predictions', 
                        num_boost_round=500,params={'verbose': -1,
                                                   'application':'binary'})
model.fit(train)
train = model.transform(train)
train['prediction'] = train.func.where(train['lgm_predictions'] > 0.5, 1,0)
train['target_label'] = train.func.where(train['lgm_predictions'] > 0.5, 'survived','died')
pipeline = Pipeline.from_dataframe(train)

predictions = pipeline.inference(test, fillna=False)['prediction']
print(f"accuracy: {accuracy_score(test[target].values, predictions.values)}")

accuracy: 0.8202247191011236
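
One difference from the pandas version is easy to miss. The pandas code bins *FamilySize* with `pd.cut`, which builds right-closed intervals such as `(1, 2]`, while this version uses `digitize`. NumPy's `np.digitize` uses left-closed bins by default. The server output below (a `FamilyBin` of 2 for a family size of 1) suggests vaex's `digitize` behaves the same way; that is an inference from the output, not something verified here. A quick NumPy/pandas comparison:

```python
import numpy as np
import pandas as pd

bins = [0, 1, 2, 5, 7, 100, 1000]
family_size = [1, 2, 3, 5]

# np.digitize default (right=False): bins[i-1] <= x < bins[i]
left_closed = np.digitize(family_size, bins).tolist()
# pd.cut default (right=True): bins[i-1] < x <= bins[i]
cut = pd.cut(family_size, bins)
intervals = [str(interval) for interval in cut]
# right=True makes digitize agree with pd.cut (its indices start at 1, cut's codes at 0)
right_closed = (np.digitize(family_size, bins, right=True) - 1).tolist()

print(left_closed)                       # [2, 3, 3, 4]
print(intervals)                         # ['(0, 1]', '(1, 2]', '(2, 5]', '(2, 5]']
print(right_closed, cut.codes.tolist())  # [0, 1, 2, 2] [0, 1, 2, 2]
```

With left-closed bins, a family of 2 shares a bin with families of 3 and 4. The pandas version keeps it alone in `(1, 2]`, so the two implementations do not produce identical features.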


## Build Pipeline
Oops... one line

In [7]:
Pipeline.from_dataframe(train).save('../models/pipeline.pkl')

'../models/pipeline.pkl'

In [13]:
# server pipeline
from lakeml.vaex.pipeline import Pipeline
pipeline = Pipeline.from_file('../models/pipeline.pkl')

data = {"PassengerId": 91,
        "Survived": 0, "Pclass": 3,
        "Name": "Christmann, Mr. Emil",
        "Sex": "male",
        "Age": 29.0,
        "SibSp": 0,
        "Parch": 0,
        "Ticket": "343276", "Fare": 8.05,
        "Cabin": "A B", # this makes sure we don't filter
        "Embarked": "S"}
pipeline.inference(data, clean=True, set_filter=False)

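| # | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked | FamilySize | Initial | AgeGroup | FamilyBin | le_Embarked | le_Sex | le_FamilyBin | le_AgeGroup | lgm_predictions | prediction | target_label |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 91 | 0 | 3 | Christmann, Mr. Emil | male | 29 | 0 | 0 | 343276 | 8.05 | A B | S | 1 | Mr | adult male | 2 | 3 | 1 | 0 | 1 | 0.000656524 | 0 | died |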


### For retraining, copy-paste does work here...

In [10]:
import vaex
import numpy as np
from sklearn.model_selection import train_test_split
from lightgbm.sklearn import LGBMClassifier
from vaex.ml.lightgbm import LightGBMModel
from sklearn.metrics import accuracy_score
from vaex.ml import LabelEncoder
from icecream import ic 
from lakeml.vaex import Pipeline

import pyarrow as pa
import re


df = vaex.open('../datasets/titanic.csv')


def fit(train):
    numeric_cols = ['PassengerId','Survived', 'Pclass', 'Age', 'SibSp','Parch','Ticket','Fare'] 
    numeric_features = ['PassengerId','Pclass','Age', 'SibSp','Parch','Fare'] 
    string_features = [ 'Embarked', 'Sex', 'FamilyBin'] 
    features = numeric_features

    train = train[train['Cabin'].str.contains(' ') != True]
    train['FamilySize'] = train['Parch'] + train['SibSp'] + 1
    train['Name'] = train['Name'].fillna('Mr.')
    pattern = re.compile(r'([A-Za-z]+)\.')

    train['Initial'] = train['Name'].str.extract_regex(r'(?P<initial>[A-Za-z]+)\.').apply(lambda x: x.get('initial','Other'))

    initials_map = {k:v for k,v in (zip(['Other','Miss','Mr','Mrs','Master','Mlle','Mme','Ms','Dr',
                                                   'Major','Lady','Countess',
                                                   'Jonkheer','Col','Rev',
                                                   'Capt','Sir','Don'],
                                                 ['Other','Miss','Mr','Mrs','Mrs','Miss','Miss','Miss',
                                                  'Mr','Mr','Mrs','Mrs',
                                                  'Other','Other','Other',
                                                  'Mr','Mr','Mr']))}
    train['Initial'] = train['Initial'].map(initials_map)

    gb = train.groupby(['Initial']).agg({'value':vaex.agg.mean('Age')})
    means = {k:v for k,v in zip(gb['Initial'].tolist(), gb['value'].tolist())}

    for initial, value in means.items():    
        train['Age'] = train.func.where((train.Age.isna() & train.Initial.str.match(initial)), value, train.Age)

    train['AgeGroup'] = train.func.where(((train.Sex.str.match('male')) & (train.Age<=15)), 'boy', '')
    train['AgeGroup'] = train.func.where(((train.Sex.str.match('female')) & (train.Age <= 15)), 'girl', train.AgeGroup)
    train['AgeGroup'] = train.func.where(((train.Sex.str.match('male')) & (train.Age > 15)), 'adult male', train.AgeGroup)
    train['AgeGroup'] = train.func.where(((train.Sex.str.match('female')) & (train.Age > 15)), 'adult female', train.AgeGroup)
    train['FamilyBin'] = train['FamilySize'].digitize(bins= [0,1, 2, 5, 7, 100,1000])


    string_features = [ 'Embarked', 'Sex', 'FamilyBin', 'AgeGroup'] 
    encoder = LabelEncoder(features=string_features, prefix='le_', allow_unseen=True)
    train = encoder.fit_transform(train)

    features = ['PassengerId','Pclass','Age', 'SibSp','Parch','Fare'] +[f"{encoder.prefix}{column}" for column in string_features]
    target = 'Survived'
    model = LightGBMModel(features=features, 
                            target=target,                         
                            prediction_name='lgm_predictions', 
                            num_boost_round=500,params={'verbose': -1,
                                                       'application':'binary'})
    model.fit(train)
    train = model.transform(train)
    train['prediction'] = train.func.where(train['lgm_predictions'] > 0.5, 1,0)
    train['target_label'] = train.func.where(train['lgm_predictions'] > 0.5, 'survived','died')
    return train

pipeline = Pipeline.from_dataframe(df, fit=fit)
train, test = df.split_random([0.8, 0.2])
pipeline.fit(train)

predictions = pipeline.inference(test, fillna=False)['prediction']
print(f"accuracy: {accuracy_score(test['Survived'].values, predictions.values)}")

pipeline.fit(df) # fit on the entire dataset
pipeline.save('../models/pipeline.pkl')

ERROR:lakeml.vaex.pipeline:could not sample first: Column or variable '__Age' does not exist. Did you mean: 'Age'


accuracy: 0.8595505617977528


'../models/pipeline.pkl'

# PySpark

## POC with Koalas + PySpark
POC is similar, plus a workaround here and there...    
* This works on big data with a well-calibrated cluster/platform.
* I have kept most of the original non-pythonic syntax, consistent with the PySpark docs and common examples.

In [5]:
# setup
import os

os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/"
os.environ['PYTHONPATH'] = "/usr/local/Cellar/apache-spark/2.4.5/libexec//python/lib/py4j-0.10.7-src.zip:/usr/local/Cellar/apache-spark/2.4.5/libexec//python/:"
os.environ['JRE_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/"
os.environ['SPARK_HOME'] = "/usr/local/Cellar/apache-spark/2.4.5/libexec/"
os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = "1"
os.environ['PYARROW_IGNORE_TIMEZONE'] = "1"
import findspark

findspark.init()

# real code
import pandas as pd
import numpy as np
from time import sleep
import databricks.koalas as ks
from databricks.koalas import DataFrame as KoalasFrame
from pyspark.ml.feature import Imputer, StringIndexer, VectorIndexer, VectorAssembler, OneHotEncoderEstimator, PCA, Bucketizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline as SparkPipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import accuracy_score
import pyspark.sql.functions as F
import warnings
import re
warnings.filterwarnings('ignore')


In [6]:
# IMPORTANT: only run the next cell after the previous one has finished; this prevents crashes
df = ks.read_csv('../datasets/titanic.csv')

numeric_cols = ['PassengerId','Survived', 'Pclass', 'Age', 'SibSp','Parch','Ticket','Fare'] 
numeric_features = ['PassengerId','Pclass','Age', 'SibSp','Parch','Fare'] 
string_features = [ 'Embarked', 'Sex', 'FamilyBin'] 
features = numeric_features


df = df[df['Cabin'].str.contains(' ') != True]
df['FamilySize'] = df['Parch'] + df['SibSp'] + 1

sdf = df.to_spark()

# df['Initial'] = df['Name'].str.extract(r'([A-Za-z]+)\.') # not implemented
# Workaround
def evaluate_initials(sdf):
    dizip_initials = {k:v for k,v in (zip(['Miss','Mr','Mrs','Master','Mlle','Mme','Ms','Dr',
                                               'Major','Lady','Countess',
                                               'Jonkheer','Col','Rev',
                                               'Capt','Sir','Don'],
                                             ['Miss','Mr','Mrs','Mrs','Miss','Miss','Miss',
                                              'Mr','Mr','Mrs','Mrs',
                                              'Other','Other','Other',
                                              'Mr','Mr','Mr']))}
    _sdf = sdf.withColumn('Initial', F.regexp_extract(sdf['Name'], r'([A-Za-z]+)\.', 1))  # raw string for the regex
    _sdf = _sdf.replace(dizip_initials,1,'Initial')
    return _sdf

sdf = evaluate_initials(sdf)

train, test = sdf.randomSplit([0.8, 0.2])

def get_means(df):# pyspark bug -> workaround
    df = KoalasFrame(df)
    means = df.groupby('Initial').agg({'Age': 'mean'})
    index = list(means.index.values) # koalas bug with index
    means = means.toPandas()
    means.index = index
    means = means['Age'].to_dict()
    return means

means = get_means(train)

# Koalas bugs -> workaround with PySpark
# for initial, value in means.items(): 
#     df.loc[((df['Age'].isnull()) & (df['Initial']==initial)), 'Age'] = value

def handle_missing_age(sdf, means):
    _sdf = sdf
    _sdf = _sdf.withColumn('Age', 
           F.when((F.isnull(_sdf['Age'])) & (_sdf['Initial'] == 'Mr') , means.get('Mr') )
            .otherwise(F.when((F.isnull(_sdf['Age'])) 
                              & (_sdf['Initial'] == 'Mrs') ,  means.get('Mrs') )\
            .otherwise(F.when((F.isnull(_sdf['Age'])) 
                              & (_sdf['Initial'] == 'Master') , means.get('Master'))\
            .otherwise(F.when((F.isnull(_sdf['Age'])) 
                              & (_sdf['Initial'] == 'Miss') , means.get('Miss'))\
            .otherwise(F.when((F.isnull(_sdf['Age'])) 
                              & (_sdf['Initial'] == 'Other') , means.get('Other'))\
            .otherwise(_sdf['Age']) )))))
    return _sdf
train = handle_missing_age(train, means)
test = handle_missing_age(test, means)

def age_group(df):
    df = KoalasFrame(df)
    df['AgeGroup'] = None
    df.loc[((df['Sex'] == 'male') & (df['Age'] <= 15)), 'AgeGroup'] = 'boy'
    df.loc[((df['Sex'] == 'female') & (df['Age'] <= 15)), 'AgeGroup'] = 'girl'
    df.loc[((df['Sex'] == 'male') & (df['Age'] > 15)), 'AgeGroup'] = 'adult male'
    df.loc[((df['Sex'] == 'female') & (df['Age'] > 15)), 'AgeGroup'] = 'adult female'
    return df

train = age_group(train)
test = age_group(test)

# move to PySpark for ML oriented transformations
train = train.to_spark()
test = test.to_spark()
train.persist()
test.persist()


numeric_cols = ['PassengerId','Survived', 'Pclass', 'Age', 'SibSp','Parch','Ticket','Fare'] 
numeric_features = ['PassengerId','Pclass','Age', 'SibSp','Parch','Fare'] 
string_features = [ 'Embarked', 'Sex'] 


stages = []
string_indexer =  [StringIndexer(inputCol = column , \
                                 outputCol = column + '_StringIndexer', 
                                 handleInvalid = "skip") for column in string_features]

one_hot_encoder = [OneHotEncoderEstimator(
    inputCols = [column + '_StringIndexer' for column in string_features ], \
    outputCols =  [column + '_OneHotEncoderEstimator' for column in string_features ])]

vect_indexer = [VectorIndexer(
    inputCol = column + '_OneHotEncoderEstimator',
    outputCol = column + '_VectorIndexer', 
    maxCategories=10) for column in string_features]

family_size_splits = [1, 2, 5, 7, 100] 
bucketizer = Bucketizer(splits = family_size_splits, 
                        inputCol = 'FamilySize',
                        outputCol = 'bucketized_FamilySize')

numeric_features += ['bucketized_FamilySize']

assemblerInput =  [f  for f in numeric_features]  
assemblerInput += [f + "_VectorIndexer" for f in string_features]
vector_assembler = VectorAssembler(inputCols = assemblerInput, \
                                   outputCol = 'VectorAssembler_features')

rf = RandomForestClassifier(labelCol = 'Survived', 
                            featuresCol = 'VectorAssembler_features', 
                            numTrees = 100, 
                            maxDepth = 4, 
                            maxBins = 1000)

stages += string_indexer
stages += one_hot_encoder
stages += vect_indexer
stages += [bucketizer]
stages += [vector_assembler]
stages += [rf]

# Train
pipeline = SparkPipeline(stages = stages).fit(train) 
predictions = KoalasFrame(pipeline.transform(test))
results = predictions[['Survived','prediction']].toPandas()


# Evaluation
# PySpark's evaluator is broken here, so we use sklearn instead
# evaluator = MulticlassClassificationEvaluator(
#     labelCol="Survived", predictionCol="prediction", metricName="accuracy")
# accuracy = evaluator.evaluate(predictions)

print(f"accuracy: {accuracy_score(results['Survived'], results['prediction'])}")

accuracy: 0.7967032967032966
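Koalas does not implement `str.extract`, so the POC falls back to `F.regexp_extract` plus `DataFrame.replace`. Below is a minimal stdlib sketch of the same two steps, for checking the logic outside Spark. The title mapping is copied from the production `_evaluate_initials` in the next section. `initial`, `TITLE_MAP` and `TITLE_RE` are hypothetical names used only for illustration.

```python
import re
from typing import Dict

# Title remapping copied from the production `_evaluate_initials`; titles that are
# not keys (Mr, Mrs, Miss, Master) pass through unchanged, as with DataFrame.replace.
TITLE_MAP: Dict[str, str] = {
    "Mlle": "Miss", "Mme": "Miss", "Ms": "Miss",
    "Dr": "Mr", "Major": "Mr", "Lady": "Mrs", "Countess": "Mrs",
    "Jonkheer": "Other", "Col": "Other", "Rev": "Other",
    "Capt": "Mr", "Sir": "Mr", "Don": "Mr",
}

# Raw string: the backslash reaches the regex engine, so "\." matches a literal dot.
TITLE_RE = re.compile(r"([A-Za-z]+)\.")


def initial(name: str) -> str:
    # Like regexp_extract(..., 1): group 1 of the first match, or "" if nothing matches.
    match = TITLE_RE.search(name)
    title = match.group(1) if match else ""
    return TITLE_MAP.get(title, title)


print(initial("Braund, Mr. Owen Harris"))          # Mr
print(initial("Reuchlin, Jonkheer. John George"))  # Other
```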


## PySpark pipeline for production

In [2]:
# setup
import os

os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/"
os.environ['PYTHONPATH'] = "/usr/local/Cellar/apache-spark/2.4.5/libexec//python/lib/py4j-0.10.7-src.zip:/usr/local/Cellar/apache-spark/2.4.5/libexec//python/:"
os.environ['JRE_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/"
os.environ['SPARK_HOME'] = "/usr/local/Cellar/apache-spark/2.4.5/libexec/"
os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = "1"
os.environ['PYARROW_IGNORE_TIMEZONE'] = "1"
import findspark

findspark.init()

import databricks.koalas as ks
import pyspark.sql.functions as F
from pyspark import keyword_only
from pyspark.ml import Transformer as SparkTransformer,PipelineModel
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit # for the dummy _transform
from databricks.koalas import DataFrame as KoalasFrame
from pyspark.ml.feature import Imputer, StringIndexer, VectorIndexer, VectorAssembler, OneHotEncoderEstimator, PCA, Bucketizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline as SparkPipeline

from pyspark.sql import SparkSession, SQLContext, DataFrame
import warnings
warnings.filterwarnings('ignore')

spark = SparkSession.builder.appName("Titanic-Dataset").config('spark.driver.memory','15g').getOrCreate()

df = spark.read.csv('../datasets/titanic.csv', inferSchema = True, header = True)

train, test = df.randomSplit([8.0, 2.0])


numeric_cols = ['PassengerId','Survived', 'Pclass', 'Age', 'SibSp','Parch','Ticket','Fare'] 
numeric_features = ['PassengerId','Pclass','Age', 'SibSp','Parch','Fare'] 
string_features = [ 'Embarked', 'Sex'] 



class DropSome(SparkTransformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    
    def __init__(self):
        super(DropSome, self).__init__()
    

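    def _transform(self, df) -> DataFrame:
        # Keep rows without a Cabin: contains() on null returns null, which where() would drop
        return df.where(F.col("Cabin").isNull() | ~F.col("Cabin").contains(' '))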
    

# This is data engineering: it lives in the pipeline so the same cleaning runs in production
class CleaningTransformer(SparkTransformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    
    def __init__(self):
        super(CleaningTransformer, self).__init__()
    


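    def _transform(self, df) -> DataFrame:
        # Use the DataFrame passed in (not the global `train`), so inference transforms its own input
        sdf = self._create_family_size(df)
        sdf = self._evaluate_initials(sdf)
        sdf = self._handle_missing_age(sdf)
        return self._clean_dataset(sdf, ['Ticket', 'SibSp', 'Parch'], ['Fare'])
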
    @staticmethod
    def _clean_dataset(sdf: DataFrame, col_to_convert: list, col_to_impute: list) -> DataFrame:
        for col in col_to_convert:
            sdf = sdf.withColumn(col,sdf[col].cast('double'))
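        col_to_impute = col_to_impute + col_to_convert  # new list, so the caller's list is not mutated

        # Note: the Imputer is fit on whatever DataFrame is being transformed, so at inference
        # the means come from the serving data, not from the train data
        imputer = Imputer(inputCols = col_to_impute, outputCols = col_to_impute)
        sdf = imputer.fit(sdf).transform(sdf)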

        return sdf
    
    @staticmethod
    def _handle_missing_age(sdf: DataFrame) -> DataFrame:
        # These values should be calculated from the train data in a better world
        _sdf = sdf
        _sdf = _sdf.withColumn('Age', 
               F.when((F.isnull(_sdf['Age'])) & (_sdf['Initial'] == 'Mr') , 33 )
                .otherwise(F.when((F.isnull(_sdf['Age'])) 
                                  & (_sdf['Initial'] == 'Mrs') , 36)\
                .otherwise(F.when((F.isnull(_sdf['Age'])) 
                                  & (_sdf['Initial'] == 'Master') , 5)\
                .otherwise(F.when((F.isnull(_sdf['Age'])) 
                                  & (_sdf['Initial'] == 'Miss') , 22)\
                .otherwise(F.when((F.isnull(_sdf['Age'])) 
                                  & (_sdf['Initial'] == 'Other') , 46)\
                .otherwise(_sdf['Age']) )))))
        return _sdf
    
    @staticmethod
    def _evaluate_initials(sdf: DataFrame) -> DataFrame:
        dizip_initials = {k:v for k,v in (zip(['Mlle','Mme','Ms','Dr',
                                               'Major','Lady','Countess',
                                               'Jonkheer','Col','Rev',
                                               'Capt','Sir','Don'],
                                             ['Miss','Miss','Miss',
                                              'Mr','Mr','Mrs','Mrs',
                                              'Other','Other','Other',
                                              'Mr','Mr','Mr']))}
        _sdf = sdf.withColumn('Initial', F.regexp_extract(sdf['Name'], r'([A-Za-z]+)\.', 1))  # raw string for the regex
        _sdf = _sdf.replace(dizip_initials,1,'Initial')
        return _sdf
    
    @staticmethod
    def _create_family_size(sdf: DataFrame) -> DataFrame :
        _sdf = sdf.withColumn('FamilySize', sdf['Parch'] + sdf['SibSp'] + 1 )

        return _sdf
    

class AgeGroupTransformer(SparkTransformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):

    def __init__(self):
        super(AgeGroupTransformer, self).__init__()

    def _transform(self, df) -> DataFrame:
        df = KoalasFrame(df)
        df['AgeGroup'] = None
        df.loc[((df['Sex'] == 'male') & (df['Age'] <= 15)), 'AgeGroup'] = 'boy'
        df.loc[((df['Sex'] == 'female') & (df['Age'] <= 15)), 'AgeGroup'] = 'girl'
        df.loc[((df['Sex'] == 'male') & (df['Age'] > 15)), 'AgeGroup'] = 'adult male'
        df.loc[((df['Sex'] == 'female') & (df['Age'] > 15)), 'AgeGroup'] = 'adult female'
        return df.to_spark()
    
# Feature engineering
_stages = []

drop_some = [DropSome()] # Cleaning step -> must not run at inference
clean_transformer = [CleaningTransformer()]
age_group_transformer = [AgeGroupTransformer()]
string_indexer =  [StringIndexer(inputCol = column , \
                                 outputCol = column + '_StringIndexer', 
                                 handleInvalid = "skip") for column in string_features]

one_hot_encoder = [OneHotEncoderEstimator(
    inputCols = [column + '_StringIndexer' for column in string_features ], \
    outputCols =  [column + '_OneHotEncoderEstimator' for column in string_features ])]

vect_indexer = [VectorIndexer(
    inputCol = column + '_OneHotEncoderEstimator',
    outputCol = column + '_VectorIndexer', 
    maxCategories=10) for column in string_features]

family_size_splits = [1, 2, 5, 7, 100] 
bucketizer = Bucketizer(splits = family_size_splits, 
                        inputCol = 'FamilySize',
                        outputCol = 'bucketized_FamilySize')

numeric_features += ['bucketized_FamilySize']

assemblerInput =  [f  for f in numeric_features]  
assemblerInput += [f + "_VectorIndexer" for f in string_features]
vector_assembler = VectorAssembler(inputCols = assemblerInput, \
                                   outputCol = 'VectorAssembler_features')

rf = RandomForestClassifier(labelCol = 'Survived', 
                            featuresCol = 'VectorAssembler_features', 
                            numTrees = 100, 
                            maxDepth = 4, 
                            maxBins = 1000)

_stages += drop_some
_stages += clean_transformer
_stages += age_group_transformer
_stages += string_indexer
_stages += one_hot_encoder
_stages += vect_indexer
_stages += [bucketizer]
_stages += [vector_assembler]
_stages += [rf]

# Train
pipeline = SparkPipeline(stages = _stages).fit(train) 

# IMPORTANT: never filter in production, so drop the DropSome stage before saving
pipeline.stages = pipeline.stages[1:]

!rm -rf ../models/spark_pipeline.sp
pipeline.save('../models/spark_pipeline.sp')
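`_handle_missing_age` hardcodes the per-Initial means (33, 36, 5, 22, 46), and its comment notes they should be calculated. The notes at the top also require that calculated values come from the train data only. Below is a minimal stdlib sketch of that fit/transform split: the means are learned once from the training rows and then reused unchanged on any data that is scored. `GroupMeanImputer` and the list-of-dicts row layout are hypothetical. In Spark, the same idea would presumably take the form of a custom `Estimator` whose fitted model stores the means; that is an assumption, not code from this notebook.

```python
from collections import defaultdict
from typing import Dict, List

Row = Dict[str, object]


class GroupMeanImputer:
    """Hypothetical helper: fill a missing column with its per-group train mean."""

    def __init__(self, group_col: str = "Initial", value_col: str = "Age") -> None:
        self.group_col = group_col
        self.value_col = value_col
        self.means_: Dict[object, float] = {}

    def fit(self, rows: List[Row]) -> "GroupMeanImputer":
        # Learn one mean per group from the training rows only, skipping missing values.
        sums: Dict[object, float] = defaultdict(float)
        counts: Dict[object, int] = defaultdict(int)
        for row in rows:
            value = row.get(self.value_col)
            if value is not None:
                group = row.get(self.group_col)
                sums[group] += float(value)
                counts[group] += 1
        self.means_ = {group: sums[group] / counts[group] for group in counts}
        return self

    def transform(self, rows: List[Row]) -> List[Row]:
        # Reuse the stored means; nothing is recomputed on the rows being scored.
        # Groups never seen during fit keep their missing value (None).
        result = []
        for row in rows:
            filled = dict(row)  # copy, so the caller's rows are not mutated
            if filled.get(self.value_col) is None:
                filled[self.value_col] = self.means_.get(filled.get(self.group_col))
            result.append(filled)
        return result


train_rows = [{"Initial": "Mr", "Age": 30.0}, {"Initial": "Mr", "Age": 36.0},
              {"Initial": "Miss", "Age": 22.0}]
test_rows = [{"Initial": "Mr", "Age": None}, {"Initial": "Other", "Age": None}]
imputer = GroupMeanImputer().fit(train_rows)
print(imputer.transform(test_rows))
# [{'Initial': 'Mr', 'Age': 33.0}, {'Initial': 'Other', 'Age': None}]
```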

In [4]:
# server
from pyspark.ml import PipelineModel
ks.DataFrame(PipelineModel.load('../models/spark_pipeline.sp').transform(test)).head(2)

| | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked | FamilySize | Initial | AgeGroup | Embarked_StringIndexer | Sex_StringIndexer | Embarked_OneHotEncoderEstimator | Sex_OneHotEncoderEstimator | Embarked_VectorIndexer | Sex_VectorIndexer | bucketized_FamilySize | VectorAssembler_features | rawPrediction | probability | prediction |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1.0 | 0.0 | 240761.11215 | 7.25 | | S | 2 | Mr | adult male | 0.0 | 0.0 | (1.0, 0.0) | (1.0) | (1.0, 0.0) | (1.0) | 1.0 | [1.0, 3.0, 22.0, 1.0, 0.0, 7.25, 1.0, 1.0, 0.0... | [81.39486731357621, 18.605132686423755] | [0.8139486731357624, 0.1860513268642376] | 0.0 |
| 1 | 2 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1.0 | 0.0 | 240761.11215 | 71.2833 | C85 | C | 2 | Mrs | adult female | 1.0 | 1.0 | (0.0, 1.0) | (0.0) | (0.0, 1.0) | (0.0) | 1.0 | [2.0, 1.0, 38.0, 1.0, 0.0, 71.2833, 1.0, 0.0, ... | [11.994029202166837, 88.00597079783314] | [0.1199402920216684, 0.8800597079783316] | 1.0 |
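The two pipelines bin *FamilySize* with different edges. Vaex calls `digitize` with bins `[0, 1, 2, 5, 7, 100, 1000]`. Spark's `Bucketizer` uses splits `[1, 2, 5, 7, 100]`, where bucket `i` covers `[splits[i], splits[i+1])`, the last bucket also includes its upper bound, and values outside the splits are treated as errors. The output above is consistent with this: a *FamilySize* of 2 lands in `bucketized_FamilySize` 1.0. Below is a stdlib sketch of both rules using `bisect`. It assumes Vaex's `digitize` follows NumPy's default `right=False` convention. The function names are hypothetical.

```python
from bisect import bisect_right
from typing import List

VAEX_BINS = [0, 1, 2, 5, 7, 100, 1000]  # bins passed to digitize in the Vaex pipeline
SPARK_SPLITS = [1, 2, 5, 7, 100]        # splits passed to Bucketizer in the PySpark pipeline


def digitize_like_numpy(x: float, bins: List[float]) -> int:
    # numpy.digitize with increasing bins and right=False returns i such that
    # bins[i-1] <= x < bins[i], which is the number of edges that are <= x.
    return bisect_right(bins, x)


def bucketize_like_spark(x: float, splits: List[float]) -> float:
    # Bucket i covers [splits[i], splits[i+1]); the last bucket also includes splits[-1].
    if not splits[0] <= x <= splits[-1]:
        # Bucketizer treats values outside the splits as errors.
        raise ValueError(f"{x} is outside the splits {splits}")
    if x == splits[-1]:
        return float(len(splits) - 2)
    return float(bisect_right(splits, x) - 1)


for family_size in [1, 2, 4, 5, 11]:
    print(family_size,
          digitize_like_numpy(family_size, VAEX_BINS),
          bucketize_like_spark(family_size, SPARK_SPLITS))
```

With these edges, a *FamilySize* of 1 is bin 2 in the Vaex version but bucket 0.0 in the Spark version. The boundaries at 1, 2, 5, 7 and 100 are shared, but the encoded values differ between the two pipelines.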
