In [1]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath
from typing import NamedTuple

## Download and Load the Dataset

In [2]:
# load data step
def load_data(download_link: str, data_path: OutputPath(str)):
    """Download the delivery-trip workbook and pickle it as a DataFrame.

    Kubeflow lightweight component: all imports live inside the function
    because only this function's source is shipped to the step's container.

    Args:
        download_link: URL of the ``.xlsx`` workbook to fetch.
        data_path: KFP-managed output directory. Receives the downloaded
            ``Delivery_truck_trip_data.xlsx`` plus a pickled DataFrame named
            ``data`` that the ``transform_data`` step reads.
    """
    import os, sys, pickle, subprocess
    import urllib.request

    # Every pip call goes through the running interpreter so the packages are
    # importable by this very process (a bare "python" may be a different one).
    subprocess.run([sys.executable, "-m", "pip", "install", "--upgrade", "pip"])
    for package in ('pandas', 'openpyxl'):
        subprocess.run([sys.executable, '-m', 'pip', 'install', package])

    import pandas as pd

    os.makedirs(data_path, exist_ok=True)

    # Stdlib download (urlretrieve follows HTTP redirects such as GitHub's
    # ?raw=true) instead of the third-party `wget` package, which required an
    # additional runtime install.
    workbook_path = f'{data_path}/Delivery_truck_trip_data.xlsx'
    urllib.request.urlretrieve(download_link, workbook_path)

    data = pd.read_excel(workbook_path, engine='openpyxl')

    # Save data as a pickle file to be used by the transform_data component.
    with open(f'{data_path}/data', 'wb') as f:
        pickle.dump(data, f)

    print('Done!')

## Transform Data

In [3]:
# transform data step

def transform_data(data_path: InputPath(str), 
              transform_data_path: OutputPath(str)):
    """Clean the raw trip data and keep the columns used for modelling.

    Kubeflow lightweight component: all imports live inside the function
    because only this function's source is shipped to the step's container.

    Steps, in order:
      * ``ontime``: ``'G'`` -> 1 (on time), missing -> 0 (delayed).
      * ``vehicleType``: missing -> ``'unknown'``.
      * ``TRANSPORTATION_DISTANCE_IN_KM``: missing values are filled with the
        geopy distance between ``Org_lat_lon`` and ``Des_lat_lon``.
      * ``expected_travelhours``: planned hours, floored, negatives set to 0.
        It is computed but not part of the saved column set.
      * drop row label 6784, select the model columns, drop rows with any NaN,
        and cast the location/supplier codes to ``str``.

    Args:
        data_path: Directory holding the ``data`` pickle written by ``load_data``.
        transform_data_path: KFP-managed output directory; receives the cleaned
            DataFrame pickled as ``data`` for the feature_engineering component.
    """
    import sys, subprocess
    # Every pip call goes through the running interpreter.
    subprocess.run([sys.executable, "-m", "pip", "install", "--upgrade", "pip"])
    for package in ('pandas', 'geopy', 'numpy'):
        subprocess.run([sys.executable, '-m', 'pip', 'install', package])

    import os, pickle
    import numpy as np
    import pandas as pd
    from geopy import distance

    # load data from data_path
    with open(f'{data_path}/data', 'rb') as f:
        data = pickle.load(f)

    # Target: 1 for on time ('G'), 0 for delayed (missing). Column assignment
    # instead of chained `inplace=True`, which stops propagating under pandas
    # copy-on-write; np.nan because the np.NaN alias was removed in NumPy 2.0.
    data['ontime'] = data['ontime'].replace({'G': 1, np.nan: 0})

    # Unknown vehicle types become an explicit category instead of NaN.
    data['vehicleType'] = data['vehicleType'].fillna('unknown')

    # Fill missing transportation distances with the distance geopy computes
    # from the origin/destination coordinates (presumably "lat,lon" values —
    # confirm against the source sheet).
    data['geodistance_km'] = [
        distance.distance(origin, destination).km
        for origin, destination in zip(data['Org_lat_lon'], data['Des_lat_lon'])
    ]
    data['TRANSPORTATION_DISTANCE_IN_KM'] = (
        data['TRANSPORTATION_DISTANCE_IN_KM'].fillna(data['geodistance_km'])
    )

    # Planned travel time in whole hours (floored), negatives clipped to 0 and
    # NaT kept as NaN. Integer Timedelta floor division replaces
    # astype('timedelta64[h]'), which pandas >= 2.0 rejects.
    planned_duration = data['Planned_ETA'] - data['trip_start_date']
    data['expected_travelhours'] = (
        (planned_duration // pd.Timedelta(hours=1)).astype('float64').clip(lower=0)
    )

    # NOTE(review): row label 6784 was judged an outlier in this specific
    # dataset; drop() raises KeyError if that label is absent.
    data = data.drop(index=6784)

    # The first 8 columns are the input features, the last one is the target.
    # ('expected_travelhours' is deliberately left out.)
    model_columns = ['Market/Regular ', 'OriginLocation_Code', 'DestinationLocation_Code',
                     'TRANSPORTATION_DISTANCE_IN_KM', 'vehicleType',
                     'customerID', 'supplierID', 'Material Shipped', 'ontime']
    # Few rows have missing values, so drop them. The explicit copy keeps the
    # column assignments below from writing through a slice of `data`
    # (SettingWithCopyWarning).
    data_use = data[model_columns].dropna(axis=0).copy()

    # Codes are identifiers, not quantities: treat them as strings.
    for column in ('OriginLocation_Code', 'DestinationLocation_Code', 'supplierID'):
        data_use[column] = data_use[column].apply(str)

    os.makedirs(transform_data_path, exist_ok=True)

    # Save data as a pickle file to be used by the feature_engineering component.
    with open(f'{transform_data_path}/data', 'wb') as f:
        pickle.dump(data_use, f)

    print('Done!')

## Feature Engineering

In [4]:
# feature engineering step

def feature_engineering(transform_data_path: InputPath(str), 
            feat_eng_path: OutputPath(str)):
    """Ordinal-encode the categorical columns and make a stratified train/test split.

    Kubeflow lightweight component: all imports live inside the function
    because only this function's source is shipped to the step's container.

    Args:
        transform_data_path: Directory holding the ``data`` pickle written by
            ``transform_data``.
        feat_eng_path: KFP-managed output directory; receives ``split_data``, a
            pickled ``(X_train, X_test, y_train, y_test)`` tuple (25% test,
            ``random_state=22``, stratified on ``ontime``).
    """
    import sys, subprocess
    subprocess.run(["python", "-m", "pip", "install", "--upgrade", "pip"])
    for requirement in ('pandas', 'scikit-learn'):
        subprocess.run([sys.executable, '-m', 'pip', 'install', requirement])

    import os, pickle
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import OrdinalEncoder

    # Work on a private copy of the cleaned frame.
    with open(f'{transform_data_path}/data', 'rb') as handle:
        frame = pickle.load(handle).copy()

    target = frame['ontime']
    features = frame.drop(columns='ontime', axis=1)

    # Every remaining non-distance column is categorical; map categories to integers.
    categorical = ['Market/Regular ', 'OriginLocation_Code', 'DestinationLocation_Code',
                   'vehicleType', 'customerID', 'supplierID', 'Material Shipped']
    features[categorical] = OrdinalEncoder().fit_transform(features[categorical])

    # train_test_split yields [X_train, X_test, y_train, y_test].
    splits = train_test_split(features, target, test_size=0.25,
                              random_state=22, stratify=target)

    os.makedirs(feat_eng_path, exist_ok=True)
    with open(f'{feat_eng_path}/split_data', 'wb') as handle:
        pickle.dump(tuple(splits), handle)

    print('Done!')

## Modelling: XGBoost

In [5]:
# xgboost modeling step

def xgboost_modeling(feat_eng_path: InputPath(str), 
                     xgb_ensemble_path: OutputPath(str),
                     mlpipeline_ui_metadata_path: OutputPath(str)):
    """Train an XGBoost classifier and publish its test-set predictions.

    Kubeflow lightweight component: all imports and helpers live inside the
    function because only this function's source is shipped to the container.

    Args:
        feat_eng_path: Directory with the ``split_data`` pickle
            ``(X_train, X_test, y_train, y_test)`` from ``feature_engineering``.
        xgb_ensemble_path: KFP-managed output directory; receives ``xgb_pred``
            (pickled test-set predictions) for the ensembling component.
        mlpipeline_ui_metadata_path: Output file for the Kubeflow UI
            confusion-matrix metadata (JSON).

    Returns:
        ``conf_m_result`` namedtuple holding the same metadata as a JSON string.
        There is no return annotation, so this value is only visible to direct
        Python callers.
    """
    import sys, subprocess
    # Every pip call goes through the running interpreter.
    subprocess.run([sys.executable, "-m", "pip", "install", "--upgrade", "pip"])
    for package in ('pandas', 'xgboost', 'scikit-learn'):
        subprocess.run([sys.executable, '-m', 'pip', 'install', package])

    import os, json, pickle
    import numpy as np
    import pandas as pd
    from xgboost import XGBClassifier
    from sklearn.metrics import confusion_matrix
    from collections import namedtuple

    def _confusion_matrix_metadata(y_true, y_pred):
        """Build the Kubeflow UI confusion-matrix metadata for y_true vs y_pred."""
        # Explicit, sorted label set shared by confusion_matrix and `vocab`, so
        # rows/columns stay aligned even if y_pred holds a class absent from y_true.
        vocab = list(np.union1d(np.asarray(y_true), np.asarray(y_pred)))
        cm = confusion_matrix(y_true, y_pred, labels=vocab)
        pairs = [(vocab[t], vocab[p], count)
                 for t, cm_row in enumerate(cm)
                 for p, count in enumerate(cm_row)]
        df = pd.DataFrame(pairs, columns=['target', 'predicted', 'count'])
        # The UI expects category values as integer strings ("0", "1").
        df[['target', 'predicted']] = df[['target', 'predicted']].astype(int).astype(str)
        return {
            "outputs": [
                {
                    "type": "confusion_matrix",
                    "format": "csv",
                    "schema": [
                        {"name": "target", "type": "CATEGORY"},
                        {"name": "predicted", "type": "CATEGORY"},
                        {"name": "count", "type": "NUMBER"},
                    ],
                    "source": df.to_csv(header=False, index=False),
                    "storage": "inline",
                    "labels": [str(int(label)) for label in vocab],
                }
            ]
        }

    # loading the split_data data
    with open(f'{feat_eng_path}/split_data', 'rb') as f:
        X_train, X_test, y_train, y_test = pickle.load(f)

    os.makedirs(xgb_ensemble_path, exist_ok=True)

    # `eval_metric` (not `metrics`, which XGBoost ignores) so early stopping
    # actually monitors AUC; `verbosity` replaces the deprecated `silent`.
    # Constructor `early_stopping_rounds` requires XGBoost >= 1.6 and is the
    # only form XGBoost 2.x accepts.
    # NOTE(review): scale_pos_weight < 1 down-weights label 1; how 0.3627 was
    # derived is not shown here — TODO confirm.
    xgb = XGBClassifier(scale_pos_weight=0.3627,
                        max_depth=10,
                        learning_rate=0.1043242,
                        n_estimators=600,
                        colsample_bylevel=0.8,
                        reg_alpha=0.8,
                        verbosity=0,
                        eval_metric='auc',
                        early_stopping_rounds=50,
                        random_state=22)

    # NOTE(review): early stopping watches the last eval_set entry (X_test), the
    # same data the confusion matrix below is computed on, so the reported
    # metrics are optimistic.
    xgb.fit(X_train, y_train,
            eval_set=[(X_train, y_train), (X_test, y_test)],
            verbose=50)

    xgb_pred = xgb.predict(X_test)

    # Save the predicted data as a pickle file to be used by the ensembling component.
    with open(f'{xgb_ensemble_path}/xgb_pred', 'wb') as f:
        pickle.dump(xgb_pred, f)

    metadata = _confusion_matrix_metadata(y_test, xgb_pred)
    with open(mlpipeline_ui_metadata_path, 'w') as metadata_file:
        json.dump(metadata, metadata_file)

    conf_m_result = namedtuple('conf_m_result', ['mlpipeline_ui_metadata'])
    return conf_m_result(json.dumps(metadata))

## Modelling: LightGBM

In [6]:
# lightgbm modeling step

def lightgbm_modeling(feat_eng_path: InputPath(str), 
                      lgbm_ensemble_path: OutputPath(str),
                      mlpipeline_ui_metadata_path: OutputPath(str)):
    """Train a LightGBM classifier and publish its test-set predictions.

    Kubeflow lightweight component: all imports and helpers live inside the
    function because only this function's source is shipped to the container.

    Args:
        feat_eng_path: Directory with the ``split_data`` pickle
            ``(X_train, X_test, y_train, y_test)`` from ``feature_engineering``.
        lgbm_ensemble_path: KFP-managed output directory; receives ``lgbm_pred``,
            a pickled ``(y_test, predictions)`` tuple. The ensembling component
            takes its ground truth from here.
        mlpipeline_ui_metadata_path: Output file for the Kubeflow UI
            confusion-matrix metadata (JSON).

    Returns:
        ``conf_m_result`` namedtuple holding the same metadata as a JSON string.
        There is no return annotation, so this value is only visible to direct
        Python callers.
    """
    import sys, subprocess
    # Every pip call goes through the running interpreter.
    subprocess.run([sys.executable, "-m", "pip", "install", "--upgrade", "pip"])
    for package in ('pandas', 'lightgbm', 'scikit-learn'):
        subprocess.run([sys.executable, '-m', 'pip', 'install', package])

    import os, json, pickle
    import numpy as np
    import pandas as pd
    from lightgbm import LGBMClassifier
    from sklearn.metrics import confusion_matrix
    from collections import namedtuple

    def _confusion_matrix_metadata(y_true, y_pred):
        """Build the Kubeflow UI confusion-matrix metadata for y_true vs y_pred."""
        # Explicit, sorted label set shared by confusion_matrix and `vocab`, so
        # rows/columns stay aligned even if y_pred holds a class absent from y_true.
        vocab = list(np.union1d(np.asarray(y_true), np.asarray(y_pred)))
        cm = confusion_matrix(y_true, y_pred, labels=vocab)
        pairs = [(vocab[t], vocab[p], count)
                 for t, cm_row in enumerate(cm)
                 for p, count in enumerate(cm_row)]
        df = pd.DataFrame(pairs, columns=['target', 'predicted', 'count'])
        # The UI expects category values as integer strings ("0", "1").
        df[['target', 'predicted']] = df[['target', 'predicted']].astype(int).astype(str)
        return {
            "outputs": [
                {
                    "type": "confusion_matrix",
                    "format": "csv",
                    "schema": [
                        {"name": "target", "type": "CATEGORY"},
                        {"name": "predicted", "type": "CATEGORY"},
                        {"name": "count", "type": "NUMBER"},
                    ],
                    "source": df.to_csv(header=False, index=False),
                    "storage": "inline",
                    "labels": [str(int(label)) for label in vocab],
                }
            ]
        }

    # loading the split_data data
    with open(f'{feat_eng_path}/split_data', 'rb') as f:
        X_train, X_test, y_train, y_test = pickle.load(f)

    os.makedirs(lgbm_ensemble_path, exist_ok=True)

    # NOTE(review): scale_pos_weight < 1 down-weights label 1; how 0.362 was
    # derived is not shown here — TODO confirm.
    lgbm = LGBMClassifier(random_state=22, scale_pos_weight=0.362)

    # No eval_set / verbose: nothing consumed the validation scores (no early
    # stopping or callbacks), and fit(verbose=...) was removed in LightGBM 4.0,
    # where the old call raised TypeError. Training itself is unaffected.
    lgbm.fit(X_train, y_train, categorical_feature='auto', feature_name='auto')

    lgbm_pred = lgbm.predict(X_test)

    # Save the predictions (plus ground truth) for the ensembling component.
    with open(f'{lgbm_ensemble_path}/lgbm_pred', 'wb') as f:
        pickle.dump((y_test, lgbm_pred), f)

    metadata = _confusion_matrix_metadata(y_test, lgbm_pred)
    with open(mlpipeline_ui_metadata_path, 'w') as metadata_file:
        json.dump(metadata, metadata_file)

    conf_m_result = namedtuple('conf_m_result', ['mlpipeline_ui_metadata'])
    return conf_m_result(json.dumps(metadata))

## Ensembling

In [7]:
# ensembling step

def ensembling(lgbm_ensemble_path: InputPath(str),
               xgb_ensemble_path: InputPath(str),
               mlpipeline_ui_metadata_path: OutputPath(str)) -> NamedTuple('conf_m_result', [('mlpipeline_ui_metadata', 'UI_metadata')]):
    """Combine the XGBoost and LightGBM test-set predictions by per-row mode.

    Kubeflow lightweight component: all imports and helpers live inside the
    function because only this function's source is shipped to the container.

    Args:
        lgbm_ensemble_path: Directory with ``lgbm_pred``, a pickled
            ``(y_test, predictions)`` tuple; also the source of ground truth.
        xgb_ensemble_path: Directory with ``xgb_pred``, pickled test-set predictions.
        mlpipeline_ui_metadata_path: Output file for the Kubeflow UI
            confusion-matrix metadata (JSON).

    Returns:
        ``conf_m_result`` namedtuple with the same metadata as a JSON string.
    """
    # NOTE(review): the return annotation declares an output named
    # 'mlpipeline_ui_metadata', the same name the OutputPath parameter above maps
    # to; check the generated component spec for a duplicated/renamed output.
    import sys, subprocess
    # Every pip call goes through the running interpreter.
    subprocess.run([sys.executable, "-m", "pip", "install", "--upgrade", "pip"])
    for package in ('pandas', 'scipy', 'scikit-learn'):
        subprocess.run([sys.executable, '-m', 'pip', 'install', package])

    import os, json, pickle
    import numpy as np
    import pandas as pd
    from scipy import stats
    from sklearn.metrics import confusion_matrix
    from collections import namedtuple

    def _confusion_matrix_metadata(y_true, y_pred):
        """Build the Kubeflow UI confusion-matrix metadata for y_true vs y_pred."""
        # Explicit, sorted label set shared by confusion_matrix and `vocab`, so
        # rows/columns stay aligned even if y_pred holds a class absent from y_true.
        vocab = list(np.union1d(np.asarray(y_true), np.asarray(y_pred)))
        cm = confusion_matrix(y_true, y_pred, labels=vocab)
        pairs = [(vocab[t], vocab[p], count)
                 for t, cm_row in enumerate(cm)
                 for p, count in enumerate(cm_row)]
        df = pd.DataFrame(pairs, columns=['target', 'predicted', 'count'])
        # The UI expects category values as integer strings ("0", "1").
        df[['target', 'predicted']] = df[['target', 'predicted']].astype(int).astype(str)
        return {
            "outputs": [
                {
                    "type": "confusion_matrix",
                    "format": "csv",
                    "schema": [
                        {"name": "target", "type": "CATEGORY"},
                        {"name": "predicted", "type": "CATEGORY"},
                        {"name": "count", "type": "NUMBER"},
                    ],
                    "source": df.to_csv(header=False, index=False),
                    "storage": "inline",
                    "labels": [str(int(label)) for label in vocab],
                }
            ]
        }

    with open(f'{lgbm_ensemble_path}/lgbm_pred', 'rb') as f:
        y_test, lgbm_pred = pickle.load(f)
    with open(f'{xgb_ensemble_path}/xgb_pred', 'rb') as f:
        xgb_pred = pickle.load(f)

    # One row per model, one column per test sample.
    predictions = np.array([xgb_pred, lgbm_pred])

    # Per-sample mode across models. With two voters every disagreement is a
    # tie, and scipy.stats.mode returns the smallest tied value, so
    # disagreements resolve to the smaller label (0 for this 0/1 target).
    # reshape(-1) instead of [0][0]: SciPy >= 1.11 no longer keeps the reduced
    # axis by default, which made [0][0] pick a single scalar.
    pred_mode = np.asarray(stats.mode(predictions, axis=0)[0]).reshape(-1)

    metadata = _confusion_matrix_metadata(y_test, pred_mode)
    with open(mlpipeline_ui_metadata_path, 'w') as metadata_file:
        json.dump(metadata, metadata_file)

    conf_m_result = namedtuple('conf_m_result', ['mlpipeline_ui_metadata'])
    return conf_m_result(json.dumps(metadata))

## Create Pipeline Components

In [8]:
# Wrap each step function as a lightweight KFP component. All steps share one
# stock Python image and pip-install their own libraries at runtime.
# NOTE(review): Python 3.7 is end-of-life, which caps the library versions pip
# can resolve inside these containers.
_BASE_IMAGE = "python:3.7.1"

load_op = comp.create_component_from_func(load_data, base_image=_BASE_IMAGE)
transform_op = comp.create_component_from_func(transform_data, base_image=_BASE_IMAGE)
feature_eng_op = comp.create_component_from_func(feature_engineering, base_image=_BASE_IMAGE)
xgboost_modeling_op = comp.create_component_from_func(xgboost_modeling, base_image=_BASE_IMAGE)
lightgbm_modeling_op = comp.create_component_from_func(lightgbm_modeling, base_image=_BASE_IMAGE)
ensembling_op = comp.create_component_from_func(ensembling, base_image=_BASE_IMAGE)

In [9]:
# Pipeline definition: load -> transform -> feature engineering -> two models -> ensemble.
@dsl.pipeline(name="logistics_service_analysis",
              description="Predicting Ontime Delivery.")
def logistics_service_analysis_pipeline(
        download_link: str,
        data_path: str,
        transform_data_path: str,
        feat_eng_data_path: str,
        xgb_ensemble_path: str,
        lgbm_ensemble_path: str,
):
    # Only download_link feeds a step. The *_path parameters are accepted (the
    # run arguments supply them) but unused here: each step's storage location
    # comes from the InputPath/OutputPath annotations on the component functions.
    # NOTE(review): consider dropping them once callers stop passing them.

    # Linear preparation chain; each step consumes its predecessor's only output.
    loaded = load_op(download_link)
    transformed = transform_op(loaded.output)
    features = feature_eng_op(transformed.output)

    # Two independent models trained on the same split.
    xgb_task = xgboost_modeling_op(features.output)
    lgbm_task = lightgbm_modeling_op(features.output)

    # Ensemble step; positional order matches ensembling(lgbm_..., xgb_...).
    ensembling_op(lgbm_task.outputs["lgbm_ensemble"],
                  xgb_task.outputs["xgb_ensemble"])

In [10]:
# Client for the Kubeflow Pipelines API server, used below to submit the run.
# Constructed with no arguments, so the endpoint/credentials come from kfp's
# defaults — verify they match the environment this notebook runs in.
client = kfp.Client()

In [11]:
# Run arguments for the pipeline. Only `download_link` is consumed by the
# pipeline body; the *_path values fill parameters that no step reads.
download_link = 'https://github.com/saurabhpimpalkar8/Logistic_Analysis/blob/main/data/Delivery_truck_trip_data.xlsx?raw=true'
data_path, transform_data_path, feat_eng_data_path = "data", "tdp", "feat"
xgb_ensemble_path, lgbm_ensemble_path = "xep", "lep"

In [12]:
pipeline_func = logistics_service_analysis_pipeline

experiment_name = 'logistics_service_analysis_pipeline'
run_name = f"{pipeline_func.__name__} run"

# Keys must match the pipeline function's parameter names.
arguments = dict(
    download_link=download_link,
    data_path=data_path,
    transform_data_path=transform_data_path,
    feat_eng_data_path=feat_eng_data_path,
    xgb_ensemble_path=xgb_ensemble_path,
    lgbm_ensemble_path=lgbm_ensemble_path,
)

# Write a compiled, compressed pipeline package next to the notebook.
package_path = f"{experiment_name}.zip"
kfp.compiler.Compiler().compile(pipeline_func, package_path)

# Submit the pipeline function directly as a run of the named experiment.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)