In [None]:
#!python -m pip install --user --upgrade pip

#!pip3 install pandas==0.23.4 matplotlib==3.0.3 scipy==1.2.1 scikit-learn==0.22 tensorflow==2.0 keras==1.2.2 --user

In [None]:
#!pip3 install seaborn

In [None]:
# install kubeflow pipeline sdk
#!pip3 install kfp --user  

In [None]:
# import libraries for pipeline
import kfp
import kfp.dsl as dsl
import kfp.components as comp

In [None]:
# Directory used for the data files of the local runs in this notebook.
# Note: this cell only records the path; it does not create the directory.
output_dir = '/home/jovyan/Road-Safety-OSP/data/'


In [None]:
# create preprocessing function

def preprocess(data_path):
    """Download, merge and clean the road-safety data, then pickle the result.

    The accident and vehicle CSV files are read from GitHub and inner-joined
    on ``Accident_Index``. ``Month``, ``Year`` and ``Hour_of_the_day`` columns
    are derived from ``Date`` and ``Time``, four unused columns are dropped,
    rows that still contain NaN are removed, and the resulting DataFrame is
    written with ``pickle`` to ``<data_path>/clean_data``.

    Imports are kept inside the body so the function stays self-contained
    when it is turned into a pipeline component.

    Args:
        data_path: Directory that receives the ``clean_data`` file. It is
            created if it does not exist yet.

    Returns:
        None. ``Done!`` is printed once the file has been written.

    Raises:
        ValueError: If a ``Date`` value is present but is not ``dd/mm/YYYY``.
    """
    import os
    import pickle
    import subprocess
    import sys

    # Best-effort install of the pinned pandas version: the return code is
    # not checked, so a failed install does not stop the step by itself.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    import pandas as pd

    # import data
    accident_data = pd.read_csv('https://raw.githubusercontent.com/Uthmanic/07-road-safety/master/data/dftRoadSafety_Accidents_2016.csv')
    vehicle_data = pd.read_csv('https://raw.githubusercontent.com/Uthmanic/07-road-safety/master/data/Veh.csv')

    all_data = pd.merge(vehicle_data, accident_data, how='inner', on='Accident_Index')

    # Parse every date once and derive both calendar columns from it.
    # A missing date becomes NaT here and the row is removed by dropna() below.
    dates = pd.to_datetime(all_data['Date'], format='%d/%m/%Y')
    all_data['Month'] = dates.dt.month
    all_data['Year'] = dates.dt.year

    # Hour of the day; missing or unparsable times are mapped to hour 0.
    times = pd.to_datetime(all_data['Time'], format='%H:%M', errors='coerce')
    all_data['Hour_of_the_day'] = times.dt.hour.fillna(0).astype('int64')

    # drop irrelevant columns, then rows with nan values
    clean_data = all_data.drop(
        columns=['LSOA_of_Accident_Location', 'Local_Authority_(Highway)', 'Time', 'Date']
    ).dropna()

    # Month/Year are float when a date was missing; restore integer columns
    # now that those rows are gone.
    clean_data = clean_data.astype({'Month': 'int64', 'Year': 'int64'})

    # serialize clean data to output directory (create it on first use)
    os.makedirs(data_path, exist_ok=True)
    with open(f'{data_path}/clean_data', 'wb') as f:
        pickle.dump(clean_data, f)

    print('Done!')


In [None]:
# Run the preprocessing step directly in the notebook, using output_dir as its data path.
preprocess(output_dir)

In [None]:
# create training and prediction function

def train_predict(data_path):
    """Train a random-forest classifier on the clean data and save its results.

    Loads the pickled DataFrame from ``<data_path>/clean_data``, uses every
    column except ``Accident_Index`` and ``Accident_Severity`` as features and
    ``Accident_Severity`` as the target, holds out a stratified 30% test set,
    fits a ``RandomForestClassifier`` and writes the outcome to
    ``<data_path>/results.txt``.

    The results file contains the micro-averaged F1 score on its first line,
    an ``Actual,Prediction`` header, and then one ``actual,predicted`` line
    per test row, so no value is elided.

    Imports are kept inside the body so the function stays self-contained
    when it is turned into a pipeline component.

    Args:
        data_path: Directory holding ``clean_data``; ``results.txt`` is
            written to the same directory.

    Returns:
        None. The F1 score and ``Done!`` are printed.
    """
    import pickle
    import subprocess
    import sys

    # Best-effort installs of the pinned versions; return codes are not checked.
    # pandas is needed to unpickle the DataFrame even though it is not imported here.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])

    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import f1_score
    from sklearn.model_selection import train_test_split

    # deserialize clean data from output directory
    # NOTE(security): pickle.load can execute arbitrary code; only point this
    # at a clean_data file that this workflow produced itself.
    with open(f'{data_path}/clean_data', 'rb') as f:
        all_data = pickle.load(f)

    # create features and targets
    X = all_data.drop(columns=['Accident_Index', 'Accident_Severity'])
    y = all_data['Accident_Severity']

    # split data based on y categories
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.3, stratify=y, random_state=1)

    # fit the model on the train set and predict the test set
    RFC = RandomForestClassifier(random_state=1)
    RFC.fit(x_train, y_train)
    RFC_pred = RFC.predict(x_test)

    # micro-averaged f1-score on the test set
    RFC_f1 = f1_score(y_test, RFC_pred, average='micro')
    print('RFC f1_score: {}'.format(RFC_f1))

    # Write the score and every (actual, predicted) pair. Formatting the whole
    # array/Series in one string would abbreviate long results with "...".
    with open(f'{data_path}/results.txt', 'w') as result:
        result.write(f'RFC f1_score: {RFC_f1}\n')
        result.write('Actual,Prediction\n')
        result.writelines(
            f'{actual},{predicted}\n' for actual, predicted in zip(y_test, RFC_pred)
        )

    print('Done!')

In [None]:
# Run the training/prediction step directly in the notebook, using output_dir as its data path.
train_predict(output_dir)

In [None]:
# create lightweight components from the two step functions
# (a custom image can be requested by passing base_image=..., e.g.
#  base_image="tensorflow/tensorflow:latest-gpu-py3", to func_to_container_op)

preprocess_op, train_predict_op = (
    comp.func_to_container_op(step) for step in (preprocess, train_predict)
)


In [None]:
# create client that would enable communication with the Pipelines API server
# (no arguments are passed, so the connection settings are left to kfp.Client's defaults)
client = kfp.Client()


In [None]:
#get_ipython().system('which dsl-compile')


In [None]:
# define pipeline
@dsl.pipeline(
    name="Road Safety ML Pipeline",
    description="Performs Preprocessing, training and prediction",
)
def road_safety_pipeline(data_path: str):
    # Pipeline layout: create a volume, preprocess, train and predict, then
    # print the results file. Each step after the volume creation receives the
    # volume handed on by the step before it, keyed by `data_path`.

    # Volume used to share data between the components.
    shared_volume = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # Preprocessing component.
    preprocess_step = preprocess_op(data_path).add_pvolumes(
        {data_path: shared_volume.volume}
    )

    # Training and prediction component.
    train_predict_step = train_predict_op(data_path).add_pvolumes(
        {data_path: preprocess_step.pvolume}
    )

    # Print the result of the prediction.
    # (alternative image: 'gcr.io/kubeflow-images-public/tensorflow-2.1.0-notebook-gpu:1.0.0')
    print_step = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        arguments=['cat', f'{data_path}/results.txt'],
        pvolumes={data_path: train_predict_step.pvolume},
    )

In [None]:
# Value handed to the pipeline as its `data_path` argument.
DATA_PATH = '/home/jovyan/Road-Safety-OSP/data/clean_data'

pipeline_func = road_safety_pipeline
experiment_name = 'road_safety_kubeflow'
run_name = pipeline_func.__name__ + ' run'
arguments = {"data_path": DATA_PATH}

# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func, '{}.zip'.format(experiment_name))

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    arguments=arguments,
    run_name=run_name,
    experiment_name=experiment_name,
)
