# Taxi tips Pipeline

### Using what you learnt in the last 2 notebooks, you will create a pipeline with 3 components to load data, train a model and test it

### create all components

In [None]:
# create folder to store the components definitions
!mkdir components

#### Read data from a MinIO bucket

In [None]:
import kfp as kfp
import kfp.dsl as dsl
from kfp import components
import os
from kfp.components import InputPath, OutputPath, create_component_from_func

In [None]:
def get_data_from_minio(
    minio_path: str,
    bucket: str,
    dest_file_path: OutputPath(),
    ):
    '''Exercise skeleton: fetch a dataset from a MinIO bucket and write it to the component output.

    The `...` placeholders (client configuration, object download and the
    output path given to `to_parquet`) are left for you to complete.

    Args:
        minio_path: Presumably the key of the object inside the bucket - confirm
            against the `get_object` call you write.
        bucket: Presumably the name of the bucket holding the object - confirm
            against the `get_object` call you write.
        dest_file_path: Component output path; the dataset is meant to be
            written there with `to_parquet`.
    '''
    
    # Imports live inside the function so that they are part of the component code.
    import numpy
    from io import BytesIO
    import pandas as pd
    import urllib3
    from minio import Minio
    import os
    import pyarrow

    # TODO: configure the client. The pipeline cell of this notebook maps the
    # 'minio-service-account' secret to the MINIO_ACCESS_KEY / MINIO_SECRET_KEY
    # environment variables for this step.
    client = Minio(
    ...
    )

    # Get data from minio using get_object, decode it using BytesIO and read the parquet result with pandas
    # The code written in the `try` body must assign both `response` (released
    # in `finally`) and `data` (written to the output below).
    try:
        ...
    finally:
        # Always release the connection, even when reading fails.
        response.close()
        response.release_conn()
    ### pass dataset to component output
    data.to_parquet(...)

In [None]:
# Build a component from a Python function and write its definition to
# components/get_data_from_minio.yaml.
# TODO: replace the `...` placeholder with the function defined above.
# NOTE(review): this component uses the python:3.8 image while the other
# components of this notebook use python:3.7 - confirm this is intended.
create_component_from_func(
    ...,
    output_component_file='components/get_data_from_minio.yaml',
    base_image='python:3.8',
    packages_to_install=[
        'numpy==1.21.6',
        'minio==6.0.2',
        'pandas==1.0.5',
        'pyarrow==10.0.1'
    ],
)

#### Train a model

In [None]:
def xgboost_train(
    training_data_path: InputPath('CSV'),  # Also supports LibSVM
    model_path: OutputPath('XGBoostModel'),
    model_config_path: OutputPath('XGBoostModelConfig'),
    training_log_path: OutputPath(),
    starting_model_path: InputPath('XGBoostModel') = None,
    


    label_column: int = 0,
    num_iterations: int = 10,
    booster_params: dict = None,

    # Booster parameters
    objective: str = 'reg:squarederror',
    booster: str = 'gbtree',
    learning_rate: float = 0.3,
    min_split_loss: float = 0,
    max_depth: int = 6,
):
    '''Train an XGBoost model.

    Exercise skeleton: the `...` placeholders (data loading, column selection,
    train/test split, DMatrix creation, training call and model saving) are
    left for you to complete.

    Args:
        training_data_path: Path for the training data.
            NOTE(review): the input is typed 'CSV', but the sibling predict
            component reads its data with pandas.read_parquet - confirm the
            format when you write the loading code.
        model_path: Output path for the trained model in binary XGBoost format.
        model_config_path: Output path for the internal parameter configuration of Booster as a JSON string.
        training_log_path: Output path given to the tensorboardX SummaryWriter
            created by TensorBoardCallback.
        starting_model_path: Path for the existing trained model to start from.
        label_column: Column containing the label data.
        num_iterations: Number of boosting iterations (presumably forwarded to
            xgboost.train - the call is left for you to write).
        booster_params: Parameters for the booster. See https://xgboost.readthedocs.io/en/latest/parameter.html
            Entries given here take precedence over the individual arguments
            below, which only fill in missing keys.
        objective: The learning task and the corresponding learning objective.
            See https://xgboost.readthedocs.io/en/latest/parameter.html#learning-task-parameters
            The most common values are:
            "reg:squarederror" - Regression with squared loss (default).
            "reg:logistic" - Logistic regression.
            "binary:logistic" - Logistic regression for binary classification, output probability.
            "binary:logitraw" - Logistic regression for binary classification, output score before logistic transformation
            "rank:pairwise" - Use LambdaMART to perform pairwise ranking where the pairwise loss is minimized
            "rank:ndcg" - Use LambdaMART to perform list-wise ranking where Normalized Discounted Cumulative Gain (NDCG) is maximized
        booster: Default for the 'booster' booster parameter.
        learning_rate: Default for the 'learning_rate' booster parameter.
        min_split_loss: Default for the 'min_split_loss' booster parameter.
        max_depth: Default for the 'max_depth' booster parameter.

    '''
    import pandas
    import xgboost
    from sklearn.metrics import roc_curve
    from tensorboardX import SummaryWriter
    from sklearn.model_selection import train_test_split
    import pyarrow
    
    
    ### embedded function to allow tensorboard to monitor the training ###
    def TensorBoardCallback():
        # One writer per training run, logging under the component output path.
        writer = SummaryWriter(training_log_path)

        def callback(env):
            # Print and log every (name, value) evaluation result of the
            # current iteration.
            for k, v in env.evaluation_result_list:
                print(k,v)
                writer.add_scalar(k, v, env.iteration)

        return callback
    
    ### load data ###
    
    df = ...
    
    ### autoclean data to allow only compatible types in features
    numerics = ['int','float']
    df = df.select_dtypes(...)
    
    ### split data ###

    data=...
    label=...
    X_train, X_test, y_train, y_test = ...
    
    dtrain = ...
    dtest = ...

    ### model HP ###

    # setdefault only fills the keys that booster_params does not already
    # define, so explicit booster_params entries win over the keyword arguments.
    booster_params = booster_params or {}
    booster_params.setdefault('objective', objective)
    booster_params.setdefault('booster', booster)
    booster_params.setdefault('learning_rate', learning_rate)
    booster_params.setdefault('min_split_loss', min_split_loss)
    booster_params.setdefault('max_depth', max_depth)
    
    ### Not from scratch training management ###

    # Load an existing model only when a starting model path was provided.
    starting_model = None
    if starting_model_path:
        starting_model = xgboost.Booster(model_file=starting_model_path)

    ### Model fit to data ###

    model = xgboost.train(
        ...
    )
        
    ### Save the model as an artifact ###
    model.save_model(...)

    # save the model config
    model_config_str = model.save_config()
    with open(model_config_path, 'w') as model_config_file:
        model_config_file.write(...)
        

In [None]:
# Build the training component and write its definition to
# components/xgb_train_dbg.yaml.
# TODO: replace the `...` placeholder with the training function defined above.
create_component_from_func(
    ...,
    output_component_file='components/xgb_train_dbg.yaml',
    base_image='python:3.7',
    packages_to_install=[
        'xgboost==1.1.1',
        'pandas==1.0.5',
        'tensorboardX==2.5.1',
        'scikit-learn==1.0',
        'pyarrow==10.0.1'
    ],
)

#### Make predictions with this model

In [None]:

def xgboost_predict(
    data_path: InputPath('CSV'),  # NOTE: typed 'CSV', but the body reads the file as parquet
    model_path: InputPath('XGBoostModel'),
    predictions_path: OutputPath('Predictions'),
    label_column: int = None,
):
    '''Make predictions using a trained XGBoost model.

    Exercise skeleton: the `...` placeholders (numeric column selection and
    label column removal) are left for you to complete.

    Args:
        data_path: Path for the feature data. It is read with
            pandas.read_parquet, so a parquet file is expected despite the
            'CSV' type name.
        model_path: Path for the trained model in binary XGBoost format.
        predictions_path: Output path for the predictions, written as text
            with numpy.savetxt.
        label_column: Column containing the label data.
    '''
    from pathlib import Path

    import numpy
    import pandas
    import xgboost
    import pyarrow

    df = pandas.read_parquet(
        data_path,
    )
    
    ### autoclean data to allow only compatible types in features
    numerics = ['int','float']
    df = ...

    ### drop label column if provided
    if ... :
        df = ...

    testing_data = xgboost.DMatrix(
        data=df,
    )

    model = xgboost.Booster(model_file=model_path)

    predictions = model.predict(testing_data)

    # Make sure the output folder exists before writing the predictions.
    Path(predictions_path).parent.mkdir(parents=True, exist_ok=True)
    numpy.savetxt(predictions_path, predictions)

In [None]:
# Build the prediction component and write its definition to
# components/xgb_predict.yaml.
# TODO: replace the `...` placeholder with the prediction function defined above.
create_component_from_func(
    ...,
    output_component_file='components/xgb_predict.yaml',
    base_image='python:3.7',
    packages_to_install=[
        'xgboost==1.1.1',
        'pandas==1.0.5',
        'pyarrow==10.0.1'
    ],
)

### Load components to use them in a pipeline

In [None]:
# Load the component definitions back as factories usable in a pipeline.
# TODO: replace each `...` placeholder with the path of the matching YAML file
# written to the components/ folder by the cells above.
get_data_from_minio_op = components.load_component_from_file(...)
xgboost_predict_on_csv_op=components.load_component_from_file(...)
xgboost_train_on_csv_op=components.load_component_from_file(...)

### Pipeline definition

In [None]:
# Fill in your identifier; it is appended to 'kubeflow-user-' to build the
# namespace name used below.
user = ''  # firstname-lastname
namespace = 'kubeflow-user-{}'.format(user)

In [None]:
@dsl.pipeline(name='xgboost_performance')
def xgboost_pipeline(namespace=namespace):
    '''Load the taxi trips dataset from MinIO, train an XGBoost model and predict with it.

    Args:
        namespace: Pipeline parameter kept for the caller; no step below uses it.
    '''
    from kfp.onprem import use_k8s_secret

    bucket = ''  # firstname-lastname

    # Step 1: download the dataset from the bucket.
    data = get_data_from_minio_op(
        minio_path='datasets/chicago/trips.parquet',
        bucket=bucket,
    )

    ### this allows using real secret in a component
    # The keys of the 'minio-service-account' secret are exposed to the loading
    # step as the MINIO_ACCESS_KEY / MINIO_SECRET_KEY environment variables.
    data.apply(
        use_k8s_secret(
            secret_name='minio-service-account',
            k8s_secret_key_to_env={
                'access_key': 'MINIO_ACCESS_KEY',
                'secret_key': 'MINIO_SECRET_KEY',
            },
        )
    )

    # Step 2: train on the downloaded dataset; keep the outputs of the task so
    # that the trained model can be handed to the prediction step.
    model_trained_on_csv = xgboost_train_on_csv_op(
        training_data=data.output,
        label_column=0,
        objective='reg:squarederror',
        num_iterations=200,
    ).set_memory_limit('1Gi').outputs

    # Step 3: predict with the trained model.
    # NOTE: this step receives the same dataset as the training step, so the
    # predictions are made on data the model has already seen.
    xgboost_predict_on_csv_op(
        data=data.output,
        model=model_trained_on_csv['model'],
        label_column=0,
    ).set_memory_limit('1Gi')
    
    

In [None]:
import datetime as dt

### Create the KFP client to link with Kubeflow Pipeline

In [None]:
### A token is automatically provided in the file referenced by the
### KF_PIPELINES_SA_TOKEN_PATH variable. This token gives access to your namespace only.
# os.environ[...] raises an explicit KeyError naming the variable when it is
# not set, instead of the opaque TypeError raised by open(None).
token_file = os.environ["KF_PIPELINES_SA_TOKEN_PATH"]
with open(token_file) as f:
    # strip() keeps a trailing newline out of the token sent to the API.
    token = f.read().strip()
client = kfp.Client(host='http://ml-pipeline.kubeflow.svc.cluster.local:8888',
               existing_token=token)

In [None]:
# Experiment name; presumably the value to pass as `experiment_name` when
# submitting the run below.
EXPERIMENT_NAME = "Aiengineer labs session2"

### Submit the pipeline using the client

In [None]:
# Submit a run of the pipeline and keep its identifier.
# TODO: replace each `...` placeholder (pipeline function, namespace and
# experiment name) with the matching object defined in the cells above.
# The run name embeds the current timestamp in ISO format.
run_id = client.create_run_from_pipeline_func(
    pipeline_func = ..., 
    namespace=..., 
    experiment_name=...,
    run_name=f"XGB_taxi{dt.datetime.today().isoformat()}",
    arguments={},
).run_id
print("Run ID: ", run_id)

### Exercise: Upgrade this pipeline!

#### Now we want to add :

- A preprocessing component with better feature creation
- Pipeline adjustments to be closer to a real ML workflow
- A metric exporter in the train phase, as in the previous notebook
- A model push to MinIO in a separate component
- A prediction push in a separate component

#### Preprocessing component

It will be responsible for:
- separating train/test data from validation data
- imputing missing values on train/test
- getting rid of outliers
- returning 2 parquet datasets: train_test, which will be used by the train component, and validation, which will be used by the predict component

You can debug your code by calling the function in the next cells:

```python
train_test_df, validation_df  = preprocess_data(data)
```

In [None]:
def preprocess_data(
    input_data_path: InputPath('CSV'),
    preprocess_train_test_data: OutputPath(),
    preprocess_validation_data: OutputPath(),
    label_column: int = 0,
):
    '''Split the dataset into train/test and validation sets and clean the train/test part.

    Exercise skeleton: replace every `...` placeholder with your own code.

    Args:
        input_data_path: Path of the input dataset. It is read with
            pandas.read_parquet, so a parquet file is expected despite the
            'CSV' type name.
        preprocess_train_test_data: Output path of the cleaned train/test
            dataset, written as parquet.
        preprocess_validation_data: Output path of the validation dataset,
            written as parquet.
        label_column: Not used by the skeleton; presumably the position of the
            label column - confirm when completing the exercise.
    '''

    import pandas
    import numpy as np
    from sklearn.metrics import roc_curve
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import IsolationForest
    # Missing values imputations with mean values
    from sklearn.impute import SimpleImputer


    ### load data ###

    df = pandas.read_parquet(
        input_data_path,
    )

    ### separate train_test from validation
    train_test = ...
    validation_set = ...

    # Create our imputer to replace missing values with the mean e.g.
    imp = ...
    imp_train = ...

    # Impute our data, then transform train_test dataset 
    train_test_imp = ...

    # Instantiate isolation forest to get rid of outliers
    isolate_forest = ...
    isolate_forest.fit(...)
    isolate_predictions = ...

    ### clean dataset with results
    train_test_imp = ...


    # Both datasets are handed to the next components as parquet files.
    train_test_imp.to_parquet(preprocess_train_test_data)
    validation_set.to_parquet(preprocess_validation_data)


In [None]:
# Build the preprocessing component and write its definition to
# components/preprocess_data.yaml.
# - tensorboardX was dropped from the packages: preprocess_data never imports it.
# - numpy is pinned to the version already used by the data loading component,
#   so that the image build is reproducible.
# NOTE(review): fastparquet is still unpinned - pin it once a version has been
# validated with this image.
create_component_from_func(
    preprocess_data,
    output_component_file='components/preprocess_data.yaml',
    base_image='python:3.7',
    packages_to_install=[
        'pandas==1.0.5',
        'scikit-learn==1.0',
        'pyarrow==10.0.1',
        'fastparquet',
        'numpy==1.21.6',
    ],
)

#### Pipeline adjustments

You can see that in our first taxi_tips pipeline, we call the prediction with data that the model has already seen. This prevents us from getting any usable performance metrics.

Let's implement a better pipeline using the 2 outputs of the preprocessing component.

![pipeline2](./images/pipeline2.png)


For this, we need to use the preprocessing component outputs separately:

```python
preprocess_task = preprocess_op(input_data=get_data.output).set_memory_limit('1Gi')

train_test_set = preprocess_task.outputs["preprocess_train_test_data"]
validation_set = preprocess_task.outputs["preprocess_validation_data"]

model_trained_on_csv = xgboost_train_on_csv_op(
        training_data=train_test_set,
        label_column=0,
        objective='reg:squarederror',
        num_iterations=200,
    ).set_memory_limit('1Gi').outputs
    
xgboost_predict_on_csv_op(
        data=validation_set,
        model=model_trained_on_csv['model'],
        label_column=0,
    ).set_memory_limit('1Gi')

```

#### Metrics exporter

As when we exported the "add_result" variable in lab (TP) 1, you will use the metrics API to export training or performance test metrics.

```python

    ### Save and exports metrics ###
    metrics = {
    'metrics': [{
        'name': 'training_metric_XXXX', 
        'numberValue': .... , 
        }]
    }
    
    with open(mlpipeline_metrics_path, 'w') as f:
        json.dump(metrics, f)


```

Add it to train and predict components!

#### Push model, results

As in the get_data component, we will need to create a component with a MinIO client, and look for the right way to `put` or `fput` our object with the right `type`.


The objects to export are:

```python
#####
model_trained_on_csv['model']
    
xgboost_predict_on_csv_op.outputs['results']
```

Code skeleton for pickle model export:

In [None]:
def save_xgboost_model_pkl(
    bucket: str,
    model_path: InputPath('XGBoostModel'),
):
    '''Pickle a trained XGBoost model and upload the pickle to a MinIO bucket.

    Exercise skeleton: replace every `...` placeholder with your own code.

    Args:
        bucket: Bucket name used in Minio to store the model
        model_path: Path for the trained model in binary XGBoost format.
    '''
    import pickle
    import xgboost
    import urllib3
    from minio import Minio
    from datetime import datetime

    # load model using input model_path
    model = ...

    # Create a date to store the model time & save the model as pickle
    inference_date = ...

    ### define file name to identify which object put
    # The same name is used for the local pickle file and for the uploaded object.
    minio_model_name = 'xgboost_model_{}.pkl'.format(inference_date)
    with open(minio_model_name, 'wb') as f:
        pickle.dump(model, f)

    # TODO: replace the placeholders with the client connection settings.
    client = Minio(
        ...,
        ...,
        ...,
    )

    ### put object
    # TODO: upload `minio_model_name` to `bucket` with the client's put/fput call.
    ...
    

In [None]:
# Build the model export component from save_xgboost_model_pkl and write its
# definition to components/save_xgboost_model_pkl.yaml.
create_component_from_func(
    save_xgboost_model_pkl,
    base_image='python:3.7',
    packages_to_install=['minio==6.0.2', 'xgboost==1.1.1'],
    output_component_file='components/save_xgboost_model_pkl.yaml',
)