# Titanic Dataset Training Example

### Running the notebook end-to-end
* Setting the ```skip_test_code``` variable to ```True``` skips the exploratory/test cells,
  so the whole notebook can be executed end-to-end in one go.

In [None]:
# When True, every exploratory/test cell guarded by `if not skip_test_code:` is skipped,
# so the notebook can be run top to bottom without side trips.
skip_test_code: bool = True

## Prepare the Docker container image that will run pipeline steps
* A custom image is required because the default image does not include the Python modules pandas and sklearn (scikit-learn)
* The Docker image is built once with Kaniko inside the Kubernetes cluster and pushed to a Docker registry

In [None]:
def build_docker_image(image_name: str, docker_registry: str, minio_config: "dict | None" = None) -> str:
    """Build the pipeline Docker image in-cluster, push it to the registry, and return its tag.

    ``Dockerfile.pipeline`` is placed into the build context as ``Dockerfile``; the
    context is handed to the fairing ``ClusterBuilder`` through a ``MinioContextSource``.

    Args:
        image_name: Name of the image to build.
        docker_registry: Registry/repository the built image is pushed to.
        minio_config: Optional Minio settings with the keys ``endpoint``, ``username``,
            ``key`` and ``region``. When omitted, ``get_minio_config()`` is used, so the
            Minio config cell must have been executed before calling this function.

    Returns:
        The full, usable image tag reported by the cluster builder.
    """
    import logging

    from kubeflow.fairing.builders import cluster
    from kubeflow.fairing.builders.cluster.minio_context import MinioContextSource
    from kubeflow.fairing.preprocessors import base as base_preprocessor

    # Use the notebook-wide Minio settings unless the caller supplies its own.
    if minio_config is None:
        minio_config = get_minio_config()

    # Extra files to add to the build context: source location -> location inside the context.
    output_map = {
        "Dockerfile.pipeline": "Dockerfile",
    }

    preprocessor = base_preprocessor.BasePreProcessor(
        command=["python"],  # The base class will set this.
        input_files=[],
        path_prefix="/app",  # irrelevant since we aren't preprocessing any files
        output_map=output_map,
    )
    preprocessor.preprocess()

    minio_context_source = MinioContextSource(
        endpoint_url=minio_config["endpoint"],
        minio_secret=minio_config["username"],
        minio_secret_key=minio_config["key"],
        region_name=minio_config["region"],
    )
    cluster_builder = cluster.cluster.ClusterBuilder(
        registry=docker_registry,
        base_image="",  # base_image is set in the Dockerfile
        preprocessor=preprocessor,
        image_name=image_name,
        dockerfile_path="Dockerfile",
        context_source=minio_context_source,
    )
    cluster_builder.build()
    logging.info(f"Built image {cluster_builder.image_tag}")
    return cluster_builder.image_tag

In [None]:
# The image only has to be built once; end-to-end runs (skip_test_code=True) skip this.
if not skip_test_code:
    image_tag = build_docker_image(image_name='titanic-pipeline', docker_registry='pcdas')

### Install Python packages when the container runs

In [None]:
# Utility: install extra pip packages at runtime (not called elsewhere in this notebook)
from typing import List


def install_packages(packages_to_install: List[str]) -> str:
    """Install each package with pip into the environment of the running interpreter.

    Args:
        packages_to_install: pip requirement specifiers, installed one at a time.

    Returns:
        A summary string listing the requested packages.

    Raises:
        subprocess.CalledProcessError: if a ``pip install`` exits with a non-zero status,
            rather than reporting the package as installed anyway.
    """
    import subprocess
    import sys

    for pkg in packages_to_install:
        # check=True surfaces pip failures instead of silently ignoring the exit code.
        subprocess.run([sys.executable, '-m', 'pip', 'install', pkg], check=True)
    return 'installed: ' + str(packages_to_install)

## Set Minio/S3 global config & auth values

In [None]:
def get_minio_config() -> dict:
    """Return the Minio/S3 connection settings used by the upload/download helpers.

    Every value can be overridden through an environment variable (``MINIO_ENDPOINT``,
    ``MINIO_USERNAME``, ``MINIO_KEY``, ``MINIO_REGION``) so credentials do not have to be
    hard-coded. The defaults point at the in-cluster Minio service in the ``kubeflow``
    namespace.

    Returns:
        A dict with the keys ``endpoint``, ``username``, ``key`` and ``region``.
    """
    import os

    return {
        "endpoint": os.environ.get("MINIO_ENDPOINT", "http://minio-service.kubeflow.svc.cluster.local:9000"),
        "username": os.environ.get("MINIO_USERNAME", "minio"),
        "key": os.environ.get("MINIO_KEY", "minio123"),
        "region": os.environ.get("MINIO_REGION", "us-east-1"),
    }

## Download Minio/S3 file into the container

In [None]:
def download_file_to(file_name: str, bucket_name: str, uploaded_file_name: str):
    """Download object ``uploaded_file_name`` from ``bucket_name`` into the local file ``file_name``.

    Connection settings come from ``get_minio_config()``. The S3 client uses signature
    version 4 and ``use_ssl=False``.

    Args:
        file_name: Local path to write; it is opened in binary write mode (truncated).
        bucket_name: Source bucket.
        uploaded_file_name: Object key inside the bucket.
    """
    import boto3
    from botocore.client import Config

    minio = get_minio_config()
    s3 = boto3.client('s3', endpoint_url=minio['endpoint'], aws_access_key_id=minio['username'],
                      aws_secret_access_key=minio['key'], config=Config(signature_version='s3v4'),
                      region_name=minio['region'], use_ssl=False)
    with open(file_name, 'wb') as f:
        s3.download_fileobj(bucket_name, uploaded_file_name, f)

## Upload the prepared file to Minio/S3 storage

In [None]:
def upload_file_to(file_name: str, bucket_name: str, prepared_file: str):
    """Upload the local file ``prepared_file`` to ``bucket_name`` under the key ``file_name``.

    Connection settings come from ``get_minio_config()``. The S3 client uses signature
    version 4 and ``use_ssl=False``.

    Args:
        file_name: Destination object key inside the bucket.
        bucket_name: Destination bucket.
        prepared_file: Local path of the file to upload (read in binary mode).
    """
    import boto3
    from botocore.client import Config

    minio = get_minio_config()
    s3 = boto3.client('s3', endpoint_url=minio['endpoint'], aws_access_key_id=minio['username'],
                      aws_secret_access_key=minio['key'], config=Config(signature_version='s3v4'),
                      region_name=minio['region'], use_ssl=False)
    with open(prepared_file, 'rb') as f:
        s3.upload_fileobj(f, bucket_name, file_name)

## Download Titanic dataset from Minio/S3 and create a Dataframe

In [None]:
import pandas as pd


def create_dataframe(bucket_name: str, uploaded_file_name: str) -> pd.DataFrame:
    """Fetch ``bucket_name/uploaded_file_name`` to ``/tmp`` and parse it as a CSV DataFrame.

    The local copy is written to ``/tmp/<uploaded_file_name>`` and is not removed.
    """
    local_path = '/tmp/' + uploaded_file_name
    download_file_to(local_path, bucket_name, uploaded_file_name)
    return pd.read_csv(local_path)

In [None]:
# test code: load the raw Titanic CSV from the 'pcdas' bucket and show it
if not skip_test_code:
    dft = create_dataframe(bucket_name='pcdas', uploaded_file_name='titanic.csv')
    print(dft)

In [None]:
# test code
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

if not skip_test_code:
    sns.countplot(dft['Sex'], hue=dft['Survived'])

## Data Prep 1

In [None]:
import pandas as pd


def do_dataprep1(dft: pd.DataFrame):
    """First cleaning pass over the raw Titanic frame; modifies ``dft`` in place, returns None.

    - adds ``familySize`` = SibSp + Parch + 1
    - fills missing ``Cabin`` with 'U' and missing ``Embarked`` with the column's mode
    - encodes ``Sex`` as male=0, female=1, anything else=-1
    - encodes ``Embarked`` as S=0, C=1, Q=2, anything else=-1
    - replaces missing ``Age`` with -1 and truncates every age to an int

    NOTE(review): because missing ages become -1 here, the per-Pclass Age median fill in
    do_dataprep2 finds no NaNs to fill when the two passes are chained.
    """
    import numpy as np

    dft['familySize'] = dft['SibSp'] + dft['Parch'] + 1

    # Missing-value handling for the categorical columns.
    dft['Cabin'] = dft['Cabin'].fillna('U')
    most_common_port = dft["Embarked"].mode()[0]
    dft['Embarked'] = dft['Embarked'].fillna(most_common_port)

    # Integer codes for the categorical columns; unmatched values fall through to -1.
    sex = dft['Sex']
    dft['Sex'] = np.select([sex == 'male', sex == 'female'], [0, 1], default=-1)

    port = dft["Embarked"]
    dft['Embarked'] = np.select([port == "S", port == "C", port == "Q"], [0, 1, 2], default=-1)

    # -1 marks an unknown age; int() truncates fractional ages toward zero.
    dft['Age'] = dft['Age'].fillna(-1).apply(int)

In [None]:
# test code
if not skip_test_code:
    do_dataprep1(dft)
    # A bare `dft.head(5)` nested in an `if` is never displayed by Jupyter; print it.
    print(dft.head(5))

## Data Prep 2 (Add Features)

In [None]:
import pandas as pd


def fillna_withdict(df: pd.DataFrame, col: str, acol: str, dict1: dict) -> pd.DataFrame:
    """Fill NaNs in ``df[col]`` with a per-group value looked up by ``df[acol]``.

    ``dict1`` maps a value of ``acol`` to the fill value for rows in that group (e.g.
    the median Age per Pclass). Rows whose group is not a key keep their NaN. The dict
    is printed for visibility. ``df`` is modified in place and also returned.
    """
    import numpy as np

    print(dict1)
    for group_key, fill_value in dict1.items():
        needs_fill = (df[col].isna()) & (df[acol] == group_key)
        df[col] = np.where(needs_fill, fill_value, df[col])
    return df

def do_dataprep2(dft: pd.DataFrame) -> pd.DataFrame:
    """Second preparation pass: impute by class, encode the cabin deck, drop text columns.

    - missing ``Age`` then ``Fare`` values are filled with the median for the row's
      ``Pclass`` (via ``fillna_withdict``)
    - the first letter of ``Cabin`` is mapped to ``Cabintype`` (A=1 ... G=7, T=8, U=9)
    - missing ``Embarked`` values become 0
    - Name, SibSp, Parch, Ticket, Cabin and the helper column v_Cabin are dropped

    The input frame is modified in place (imputed columns, added ``v_Cabin`` and
    ``Cabintype``); the returned frame is a new object without the dropped columns.

    NOTE(review): after do_dataprep1 has run, missing ages are already -1, so the Age
    median fill has nothing to fill.
    """
    # Per-class median imputation, Age before Fare.
    for column in ('Age', 'Fare'):
        class_medians = dft.groupby("Pclass")[column].median().to_dict()
        dft = fillna_withdict(dft, column, 'Pclass', class_medians)

    # Deck letter -> integer code.
    deck_codes = {'A': 1, 'B': 2, 'C': 3, 'D': 4, 'E': 5, 'F': 6, 'G': 7, 'T': 8, 'U': 9}
    dft['v_Cabin'] = dft['Cabin'].str[0]
    dft['Cabintype'] = dft['v_Cabin'].map(deck_codes)

    dft["Embarked"] = dft["Embarked"].fillna(0)

    # Remove redundant and text/string columns.
    unused_columns = ['Name', 'SibSp', 'Parch', 'Ticket', 'Cabin', 'v_Cabin']
    return dft.drop(unused_columns, axis=1)

In [None]:
# test code
if not skip_test_code:
    dft = do_dataprep2(dft)
    # A bare `dft.head(5)` nested in an `if` is never displayed by Jupyter; print it.
    print(dft.head(5))

In [None]:
if not skip_test_code:
    # Explicit keyword arguments: seaborn >= 0.12 accepts only `data` positionally.
    sns.countplot(x='Pclass', hue='Survived', data=dft)

## Write prepared dataframe to a local file system and upload to Minio/S3

In [None]:
import pandas as pd


def write_dataframe_and_upload(dft: pd.DataFrame, bucket_name: str, file_name: str) -> str:
    """Save ``dft`` as a CSV under /tmp and upload it to ``bucket_name`` as ``file_name``.

    The index is not written. The local copy at ``/tmp/<file_name>`` is left in place.

    Returns:
        The uploaded location as ``"<bucket_name>/<file_name>"``.
    """
    local_copy = '/tmp/' + file_name
    dft.to_csv(local_copy, index=False, sep=',')
    upload_file_to(file_name, bucket_name, local_copy)
    return '/'.join([bucket_name, file_name])

In [None]:
# test code: write the prepared frame to Minio as a separate object
if not skip_test_code:
    uploaded_fullpath = write_dataframe_and_upload(dft, bucket_name='pcdas', file_name='titan2.csv')
    print(f"Minio path is {uploaded_fullpath}")

## Create ML Training Pipeline
* Evaluate Logistic Regression (the step named `sklearn_Linear_Regression`) and Random Forest models

### Pipeline step for data preparation
* Data preparation task will be run in a container

In [None]:
def do_dataprep(uploaded_file: str) -> str:
    """Run both data-preparation passes on an uploaded CSV and upload the prepared result.

    Args:
        uploaded_file: Location of the raw CSV as ``"<bucket>/<file name>"``.

    Returns:
        Location of the prepared CSV as ``"<bucket>/<stem>_prepared<extension>"``,
        e.g. ``pcdas/titanic.csv`` -> ``pcdas/titanic_prepared.csv``.
    """
    import os

    parts = uploaded_file.split('/')
    bucket = parts[0]
    file_name = parts[1]

    dft = create_dataframe(bucket, file_name)
    do_dataprep1(dft)  # modifies dft in place
    dft = do_dataprep2(dft)

    # splitext keeps the whole stem and the extension, so names with several dots or
    # without an extension still get a sensible "_prepared" name.
    stem, extension = os.path.splitext(file_name)
    prepared_file_name = stem + '_prepared' + extension
    return write_dataframe_and_upload(dft, bucket, prepared_file_name)

In [None]:
# test code: run the complete data-prep step against the raw CSV in Minio
if not skip_test_code:
    uploaded_file = do_dataprep(uploaded_file='pcdas/titanic.csv')
    print(f'Uploaded file to {uploaded_file}')

## Logistic Regression Model Training (step named `sklearn_Linear_Regression`)

In [None]:
import logging
import pandas as pd


def sklearn_Linear_Regression(prepared_file_name: str) -> str:
    """Pipeline step: load the prepared CSV and return the logistic-regression score.

    Args:
        prepared_file_name: Location of the prepared CSV as "<bucket>/<file name>".

    Returns:
        The accuracy string produced by sklearn_Linear_Regression_1.
    """
    logging.info(f'Prepared filename: {prepared_file_name}')
    location = prepared_file_name.split('/')
    prepared_df = create_dataframe(location[0], location[1])
    return sklearn_Linear_Regression_1(prepared_df)

def sklearn_Linear_Regression_1(train: pd.DataFrame) -> str:
    """Train and score a logistic-regression classifier on the prepared Titanic data.

    Despite the function name, the model is ``LogisticRegression`` (a classifier).
    An 80/20 split is used to fit the model and print its hold-out predictions, their
    mean and the confusion matrix. The returned score is the mean accuracy of a shuffled
    5-fold cross-validation over the full labelled data.

    Args:
        train: Prepared frame containing ``Survived`` (label), ``PassengerId`` (dropped)
            and numeric feature columns.

    Returns:
        Mean cross-validated accuracy, as a string.
    """
    import numpy as np
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import confusion_matrix
    from sklearn.model_selection import KFold, cross_val_score, train_test_split

    features = train.drop(['Survived', 'PassengerId'], axis=1)
    labels = train['Survived']

    # Local hold-out validation split.
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2,
                                                        random_state=None)

    logisticRegression = LogisticRegression(max_iter=10000)
    logisticRegression.fit(X_train, y_train)

    predictions = logisticRegression.predict(X_test)
    print(predictions)
    print("mean prediction: ", round(np.mean(predictions), 2))
    print(confusion_matrix(y_test, predictions))

    # scikit-learn >= 0.24 rejects random_state unless shuffle=True.
    kf = KFold(n_splits=5, shuffle=True, random_state=2)

    # Cross-validate on all labelled rows rather than only the 20% hold-out subset;
    # cross_val_score clones the estimator, so the fitted model above is unaffected.
    acc1 = cross_val_score(logisticRegression, features, labels, cv=kf).mean()
    return str(acc1)


In [None]:
# test code: score the logistic-regression step on the prepared CSV
if not skip_test_code:
    accuracy_result1 = sklearn_Linear_Regression(prepared_file_name='pcdas/titanic_prepared.csv')
    print(accuracy_result1)

### Random Forest Model Training

In [None]:
import logging
import pandas as pd


def random_forest(prepared_file_name: str) -> str:
    """Pipeline step: load the prepared CSV and return the random-forest accuracy.

    Args:
        prepared_file_name: Location of the prepared CSV as "<bucket>/<file name>".

    Returns:
        The accuracy string produced by random_forest_1.
    """
    logging.info(f'Prepared filename: {prepared_file_name}')
    location = prepared_file_name.split('/')
    prepared_df = create_dataframe(location[0], location[1])
    return random_forest_1(prepared_df)

def random_forest_1(train: pd.DataFrame) -> str:
    """Tune and evaluate a random-forest classifier on the prepared Titanic data.

    Hyperparameters are chosen by a 5-fold grid search on an 80% training split. The
    best configuration, refit on that split, is scored on the 20% hold-out split.

    Args:
        train: Prepared frame containing ``Survived`` (label), ``PassengerId`` (dropped)
            and numeric feature columns.

    Returns:
        Hold-out accuracy rounded to two decimals, as a string.
    """
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import GridSearchCV, train_test_split

    # Local hold-out validation split.
    X_train, X_test, y_train, y_test = train_test_split(train.drop(['Survived', 'PassengerId'], axis=1),
                                                        train['Survived'], test_size=0.2,
                                                        random_state=None)

    param_grid = {
        'criterion': ['gini', 'entropy'],
        'n_estimators': [100, 300, 500],
        # For classifiers 'auto' meant 'sqrt'; 'auto' was removed in scikit-learn 1.3.
        'max_features': ['sqrt', 'log2'],
        'max_depth': [3, 5, 7],
    }

    randomForest_CV = GridSearchCV(estimator=RandomForestClassifier(random_state=2),
                                   param_grid=param_grid, cv=5)
    randomForest_CV.fit(X_train, y_train)
    print("best parameters: ", randomForest_CV.best_params_)

    # With the default refit=True, best_estimator_ is the best configuration refit on
    # the whole training split; use it rather than hard-coded hyperparameters.
    randomForestFinalModel = randomForest_CV.best_estimator_
    predictions = randomForestFinalModel.predict(X_test)

    acc2 = round(accuracy_score(y_test, predictions), 2)
    print("accuracy is: ", acc2)
    return str(acc2)

## Collect accuracy results from both models

In [None]:
def final_step(acc1: str, acc2: str) -> str:
    """Combine both model accuracies into a Kubeflow output-viewer metadata document.

    Args:
        acc1: Accuracy string reported by the logistic-regression step.
        acc2: Accuracy string reported by the random-forest step.

    Returns:
        The metadata (two inline markdown outputs) serialized as JSON.
    """
    # Imports must live inside the function: it is converted to a container op from its
    # source code (no pickling) and runs on the op's default base image.
    import json

    metadata = {
        'outputs': [
            {
                'storage': 'inline',
                'source': '# LR Accuracy\n' + acc1,
                'type': 'markdown',
            },
            {
                'storage': 'inline',
                'source': '# RF Accuracy \n' + acc2,
                'type': 'markdown',
            },
        ]
    }
    return json.dumps(metadata)

### Define and compile the training pipeline
* Uses a pre-built Docker image that already contains the pandas and sklearn modules

In [None]:
def get_image_tag() -> str:
    """Return the pre-built pipeline image reference ("<repository>/<name>:<tag>")."""
    image = 'pcdas/titanic-pipeline:4899B81E'
    return image

In [None]:
import kfp
from kfp.components import func_to_container_op

# Turn each step function into a container op. The data-prep and training steps run on
# the pre-built image (pandas + sklearn) and use code pickling, which captures the
# function together with its dependencies instead of copying only its source code.
# final_step uses func_to_container_op's default base image.
image_tag = get_image_tag()
dataprep_operation = func_to_container_op(
    do_dataprep, base_image=image_tag, use_code_pickling=True)
sklearn_operation = func_to_container_op(
    sklearn_Linear_Regression, base_image=image_tag, use_code_pickling=True)
rforest_operation = func_to_container_op(
    random_forest, base_image=image_tag, use_code_pickling=True)
final_step_operation = func_to_container_op(final_step)

from kfp.dsl import pipeline 
@pipeline( # defining pipeline metadata
    name='titanic-prep-training',
    description='Prepare titanic dataset'
)
# Stitch the pipeline steps: data prep feeds both training steps, and final_step
# combines their accuracy strings.
def dataprep_training_pipeline(context: str = "pcdas/titanic.csv"):
    """Titanic pipeline: prepare the data, train two models, report both accuracies.

    Args:
        context: Raw CSV location as "<bucket>/<file name>", passed to do_dataprep.
    """
    import logging
    step_0 = dataprep_operation(context)
    # NOTE(review): the pipeline function body runs when the pipeline is compiled, so this
    # presumably logs the output placeholder rather than the prepared file's real path.
    logging.info(step_0.output)
    # Both training steps only depend on step_0's output (the prepared-file location),
    # not on each other.
    step_1_1 = sklearn_operation(step_0.output) 
    step_1_2 = rforest_operation(step_0.output) 
    step_3 = final_step_operation(step_1_1.output, step_1_2.output) 
    
from kfp.compiler import Compiler

# Compile the pipeline into a package file that can be submitted to Kubeflow Pipelines.
pipeline_file = 'titanic-pipeline.zip'
Compiler().compile(
    pipeline_func=dataprep_training_pipeline,
    package_path=pipeline_file,
)

### Deploy training pipeline

In [None]:
import kfp
from kfp import Client

# Submit the compiled pipeline package as a run inside an experiment.
client = Client()
experiment = client.create_experiment(name="Titanic dataset Experiment2")
run_name = "Titanic ML Test2"
pipeline_filename = pipeline_file
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, params={})
print(run_result)