In [195]:
import os
from itertools import product
from typing import NamedTuple

import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath

In [196]:
####### Download Data func ###########
def download_data(bucket: str, training_data_path: str, output_data_path: OutputPath('CSV')):
    """Download one object from S3 and store it at the component's output path.

    Args:
        bucket: Name of the S3 bucket to read from.
        training_data_path: Key of the object inside ``bucket``.
        output_data_path: Local file path the downloaded object is written to.
    """
    # Kept as a local import: this function is handed to func_to_container_op,
    # so its dependencies are resolved inside the function body.
    import boto3

    s3_client = boto3.client("s3", region_name='ap-south-1')
    s3_client.download_file(
        Bucket=bucket,
        Key=training_data_path,
        Filename=output_data_path,
    )

In [197]:
###### Make a component of download_data function ################
# Wrap download_data as a reusable container component; boto3 is installed
# into the image when the step starts, and the spec is saved as YAML.
download_data_op = comp.func_to_container_op(
    func=download_data,
    packages_to_install=['boto3'],
    base_image='python:3.7-slim',
    output_component_file='components/download_data.yaml',
)

In [198]:
########## Split Data func ################################
def split_data(
    input_data_path: InputPath("CSV"),
    output_training_data: OutputPath("CSV"),
    output_training_labels: OutputPath("CSV"),
    output_test_data: OutputPath("CSV"),
    output_test_labels: OutputPath("CSV"),
    test_size: int = 10000,
):
    """Split a labelled CSV into train/test feature and label CSV files.

    The first ``test_size`` rows become the test set and the remaining rows the
    training set. Features are every column except ``label``; they are cast to
    float32 and written without their original column names (pandas writes the
    positional headers 0, 1, ...). Labels are written as a single column whose
    header is ``0``.

    Args:
        input_data_path: CSV file with a header row and a ``label`` column.
        output_training_data: Destination CSV for the training features.
        output_training_labels: Destination CSV for the training labels.
        output_test_data: Destination CSV for the test features.
        output_test_labels: Destination CSV for the test labels.
        test_size: Number of leading rows held out for testing.

    Raises:
        ValueError: If ``test_size`` would leave the test or training split empty.
    """
    import pandas as pd

    raw = pd.read_csv(input_data_path, sep=',', header=0)

    # An empty split is never usable downstream, so fail here with a clear
    # message instead of writing an empty CSV.
    if not 0 < test_size < len(raw):
        raise ValueError(
            "test_size must be between 1 and %d (rows - 1), got %r"
            % (len(raw) - 1, test_size)
        )

    # Select features by name so they stay consistent with the label lookup,
    # regardless of where the 'label' column sits in the file.
    features = raw.drop(columns=['label']).values.astype('float32')
    labels = raw['label'].tolist()

    x_test, y_test = features[:test_size], labels[:test_size]
    x_train, y_train = features[test_size:], labels[test_size:]

    pd.DataFrame(x_train).to_csv(output_training_data, index=False)
    pd.DataFrame(y_train).to_csv(output_training_labels, index=False)

    pd.DataFrame(x_test).to_csv(output_test_data, index=False)
    pd.DataFrame(y_test).to_csv(output_test_labels, index=False)
    print("ALL DONE")

In [199]:
# Wrap split_data as a container component; pandas is the only extra
# package the function imports.
split_data_op = comp.func_to_container_op(
    func=split_data,
    packages_to_install=['pandas'],
    base_image='python:3.7-slim',
    output_component_file='components/split_data.yaml',
)

In [200]:
def normalize_data(
    training_data_path: InputPath('CSV'),
    test_data_path: InputPath('CSV'),
    normalized_training_data: OutputPath('CSV'),
    normalized_test_data: OutputPath('CSV')
):
    """Divide every value of the training and test CSVs by 255.0.

    Args:
        training_data_path: CSV file holding the training features.
        test_data_path: CSV file holding the test features.
        normalized_training_data: Destination CSV for the scaled training features.
        normalized_test_data: Destination CSV for the scaled test features.
    """
    import pandas as pd

    divisor = 255.0
    sources = (training_data_path, test_data_path)
    destinations = (normalized_training_data, normalized_test_data)

    # Load and scale both inputs before anything is written, so no output
    # file is produced if either input cannot be processed.
    scaled_frames = [pd.read_csv(source) / divisor for source in sources]

    for frame, destination in zip(scaled_frames, destinations):
        frame.to_csv(destination, index=False)

    print("ALL DONE")

In [201]:
# Wrap normalize_data as a container component. The component spec is saved
# with a .yaml extension, matching the download and split components.
normalize_data_op = comp.func_to_container_op(normalize_data,
                                          base_image='python:3.7-slim',
                                          output_component_file='components/normalize_data.yaml',
                                          packages_to_install=['pandas'])

In [202]:
def train(
    n_estimators: int,
    depth: int,
    random_state: int,
    mlflow_tracking_uri: str,
    mlflow_experiment_name: str,
    model_name: str,
    training_data_path: InputPath('CSV'),
    training_labels_path: InputPath('CSV'),
    test_data_path: InputPath('CSV'),
    test_labels_path: InputPath('CSV')
    ):
    """Fit a RandomForestClassifier, score it on the test split and log it to MLflow.

    Args:
        n_estimators: Number of trees passed to the classifier.
        depth: Value passed as ``max_depth`` to the classifier.
        random_state: Seed passed to the classifier.
        mlflow_tracking_uri: URI of the MLflow tracking server.
        mlflow_experiment_name: MLflow experiment the run is recorded under.
        model_name: Name the logged model is registered under.
        training_data_path: CSV file with the training features.
        training_labels_path: CSV file with the training labels in a column named ``0``.
        test_data_path: CSV file with the test features.
        test_labels_path: CSV file with the test labels in a column named ``0``.

    Returns:
        A dict with ``accuracy`` (percentage in [0, 100]) and ``model_path``
        (the ``runs:/<run_id>/model`` URI of the logged model).
    """
    # All dependencies are imported up front so a missing package fails the
    # step before any training time is spent.
    import mlflow
    import mlflow.sklearn
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score

    x_train = pd.read_csv(training_data_path)
    # The label files carry a single column whose header is the string '0'.
    y_train = pd.read_csv(training_labels_path)['0'].tolist()

    x_test = pd.read_csv(test_data_path)
    y_test = pd.read_csv(test_labels_path)['0'].tolist()

    # Log only the dataset sizes; printing the full frames floods the step log.
    print("training data:", x_train.shape, "labels:", len(y_train))
    print("test data:", x_test.shape, "labels:", len(y_test))

    model_clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=depth, random_state=random_state)

    # Train the Random Forest algorithm
    model_clf.fit(x_train, y_train)

    # Validate on the held-out split; accuracy is reported as a percentage.
    y_pred = model_clf.predict(x_test)
    accuracy = float(accuracy_score(y_test, y_pred)) * 100.0

    ########## log model ####################
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(mlflow_experiment_name)

    with mlflow.start_run() as run:
        mlflow.log_param("n_estimators", n_estimators)
        mlflow.log_param("max_depth", depth)
        mlflow.log_param("random_state", random_state)
        mlflow.log_metric("accuracy", accuracy)

        mlflow.sklearn.log_model(model_clf, "model", registered_model_name=model_name)

        logged_model = 'runs:/'+run.info.run_id+'/model'

    # NOTE(review): the function has no return annotation, so this dict is
    # presumably not exposed as component outputs by func_to_container_op —
    # confirm, and declare a NamedTuple return type if outputs are needed.
    return { 'accuracy': accuracy, 'model_path': logged_model }

In [203]:
# Wrap train as a container component.
# 'scikit-learn' is the real PyPI distribution; the 'sklearn' name is a
# deprecated placeholder package. The component spec is saved with a .yaml
# extension, matching the download and split components.
train_op = comp.func_to_container_op(train,
                                          base_image='python:3.7',
                                          output_component_file='components/train.yaml',
                                          packages_to_install=['mlflow', 'pandas', 'scikit-learn', 'boto3'])

In [204]:
########## Define Pipeline ######################
@dsl.pipeline(
    name='MNIST Random Forest Pipeline - 2',
    description='Training pipeline that fits Random Forest classifiers on the MNIST dataset and logs them to MLflow.'
)
def training_pipeline(bucket, path):
    """Download, split and normalize the data, then train one model per hyperparameter set.

    Args:
        bucket: S3 bucket that holds the raw training CSV.
        path: Key of the raw training CSV inside ``bucket``.
    """
    # Returns a dsl.ContainerOp class instance.
    download_data_task = download_data_op(
        bucket, path).set_display_name('Download Raw Data')

    # download_data declares exactly one output, so `.output` refers to it
    # directly without depending on the generated output name.
    split_data_task = split_data_op(download_data_task.output).after(
        download_data_task).set_display_name('Split Data')

    training_data_path = split_data_task.outputs['output_training_data']
    test_data_path = split_data_task.outputs['output_test_data']
    training_labels_path = split_data_task.outputs['output_training_labels']
    test_labels_path = split_data_task.outputs['output_test_labels']

    normalize_data_task = normalize_data_op(training_data_path, test_data_path).after(split_data_task).set_display_name('Normalize Data')

    normalized_training_data_path = normalize_data_task.outputs['normalized_training_data']
    normalized_test_data_path = normalize_data_task.outputs['normalized_test_data']

    mlflow_tracking_uri = "http://mlflow-service.kubeflow.svc.cluster.local:80"
    mlflow_experiment_name = "mnist-rf"
    model_name = "MNIST-RF"

    ################################################################
    # One training task per (n_estimators, depth, random_state) tuple; the
    # tasks only depend on the normalize step.
    params = [(100, 1, 0), (500, 4, 3), (800, 3, 0), (50, 4, 0)]

    for model_num, (n_estimators, depth, random_state) in enumerate(params):
        train_op(
            n_estimators,
            depth,
            random_state,
            mlflow_tracking_uri,
            mlflow_experiment_name,
            model_name,
            normalized_training_data_path,
            training_labels_path,
            normalized_test_data_path,
            test_labels_path
        ).after(normalize_data_task).set_display_name('Model Training: '+str(model_num)).set_memory_request('1G').set_cpu_request('0.5')

In [192]:
#################### COMPILE AND TEST PIPELINE DEFINITION (OPTIONAL) ########################
import kfp

# Compile the pipeline function into a workflow package on disk; the upload
# cell further down reads this same file.
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=training_pipeline,
    package_path='workflow.yaml',
)

In [205]:
####################### Run Pipeline ############################################
arguments = {"bucket": "kubeflow-ds-test-raw-lake", "path": "data/train.csv"}

# The authservice session cookie is a credential and must not live in the
# notebook. Export it before starting the kernel, e.g.
#   export KUBEFLOW_AUTHSERVICE_SESSION=<cookie value>
# Any session cookie that was ever saved in this notebook should be treated
# as leaked and invalidated.
session_cookie_prefix = 'authservice_session='
session_token = os.environ.get('KUBEFLOW_AUTHSERVICE_SESSION', '').strip()
if not session_token:
    raise RuntimeError(
        'Set the KUBEFLOW_AUTHSERVICE_SESSION environment variable to the '
        'value of your Kubeflow authservice_session cookie.'
    )
# Accept either the bare cookie value or the full "authservice_session=..." pair.
if session_token.startswith(session_cookie_prefix):
    authservice_session = session_token
else:
    authservice_session = session_cookie_prefix + session_token

client = kfp.Client(host='http://af498242-istiosystem-istio-2af2-1562593475.ap-south-1.elb.amazonaws.com/pipeline', cookies=authservice_session)
namespace='nikunj-sharma'

# Submit a pipeline run
client.create_run_from_pipeline_func(
    training_pipeline, arguments=arguments, namespace=namespace, experiment_name='MNIST-RF-EXP'
)

RunPipelineResult(run_id=917885a2-259e-49fb-a7bc-ad0ae9e34a9c)

In [194]:
################## SAVE PIPELINE On KUBEFLOW ###############################################
# Register the compiled package as a named pipeline. This needs the
# 'workflow.yaml' file written by the compile cell and the `client` created
# in the run cell.
client.upload_pipeline(
    pipeline_name='MNIST RF Pipeline',
    pipeline_package_path='workflow.yaml',
)

{'created_at': datetime.datetime(2021, 11, 24, 6, 56, 34, tzinfo=tzlocal()),
 'default_version': {'code_source_url': None,
                     'created_at': datetime.datetime(2021, 11, 24, 6, 56, 34, tzinfo=tzlocal()),
                     'id': '27f0ead0-adc4-4250-9efd-86c112ed595f',
                     'name': 'MNIST RF Pipeline',
                     'package_url': None,
                     'parameters': [{'name': 'bucket', 'value': None},
                                    {'name': 'path', 'value': None}],
                     'resource_references': [{'key': {'id': '27f0ead0-adc4-4250-9efd-86c112ed595f',
                                                      'type': 'PIPELINE'},
                                              'name': None,
                                              'relationship': 'OWNER'}]},
 'description': None,
 'error': None,
 'id': '27f0ead0-adc4-4250-9efd-86c112ed595f',
 'name': 'MNIST RF Pipeline',
 'parameters': [{'name': 'bucket', 'value': None},
     