In [None]:
!pip install kfp

In [None]:
import kfp
import kfp.components as comp
import requests
import kfp.dsl as dsl

def prepare_data():
  """Download the raw Iris CSV, drop incomplete rows, and save the result.

  The cleaned table is written to ``data/final_df.csv`` without the index
  column.
  """
  # Imported locally because this function is handed to
  # create_component_from_func and must carry its own imports.
  import pandas as pd

  source_url = "https://raw.githubusercontent.com/plotly/datasets/master/iris-data.csv"
  cleaned = pd.read_csv(source_url).dropna()
  cleaned.to_csv("data/final_df.csv", index=False)

def split_data():
  """Split the cleaned dataset into train/test parts and save them as .npy.

  Reads ``data/final_df.csv``, separates the ``class`` column from the
  remaining columns, holds out 30% of the rows for testing (stratified on the
  target, ``random_state=47``) and writes the four resulting parts under
  ``data/``.
  """
  import pandas as pd
  import numpy as np
  from sklearn.model_selection import train_test_split

  target_feature = "class"
  frame = pd.read_csv("data/final_df.csv")

  # Boolean column mask: the target stays a one-column DataFrame (not a
  # Series), and the features are every other column.
  is_target = frame.columns == target_feature
  features = frame.loc[:, ~is_target]
  labels = frame.loc[:, is_target]

  x_train, x_test, y_train, y_test = train_test_split(
      features, labels, test_size=0.3, stratify=labels, random_state=47)
  print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)

  # Persist each part so that later steps can load it from disk.
  outputs = (
      ('data/x_train.npy', x_train),
      ('data/x_test.npy', x_test),
      ('data/y_train.npy', y_train),
      ('data/y_test.npy', y_test),
  )
  for path, part in outputs:
    np.save(path, part)

def training():
  """Fit a logistic-regression classifier and pickle it to disk.

  Loads the training features and labels from ``data/x_train.npy`` and
  ``data/y_train.npy``, fits ``LogisticRegression(max_iter=500)`` on them and
  writes the fitted model to ``data/model.pkl``.
  """
  import pandas as pd
  import numpy as np
  import pickle
  from sklearn.linear_model import LogisticRegression

  features = np.load('data/x_train.npy', allow_pickle=True)
  labels = np.load('data/y_train.npy', allow_pickle=True)

  # fit() returns the estimator itself, so construction and fitting chain.
  model = LogisticRegression(max_iter=500).fit(features, labels)

  with open('data/model.pkl','wb') as model_file:
    pickle.dump(model, model_file)

def predict_test_data():
  """Predict class labels for the held-out test set and save them.

  Loads the pickled model from ``data/model.pkl`` and the test features from
  ``data/x_test.npy``, prints the predicted labels and stores them in
  ``data/y_predict_model.npy``.
  """
  import pandas as pd
  import numpy as np
  import pickle

  # NOTE: unpickling executes arbitrary code; only load model files that this
  # pipeline itself produced.
  with open('data/model.pkl','rb') as model_file:
    model = pickle.load(model_file)

  features = np.load('data/x_test.npy', allow_pickle = True)

  predictions = model.predict(features)
  print(predictions)

  np.save('data/y_predict_model.npy', predictions)

def predict_proba_test_data():
  """Predict class probabilities for the held-out test set and save them.

  Loads the pickled model from ``data/model.pkl`` and the test features from
  ``data/x_test.npy``, prints the result of ``predict_proba`` and stores it in
  ``data/y_predict_proba_model.npy``.
  """
  import pandas as pd
  import numpy as np
  import pickle

  # NOTE: unpickling executes arbitrary code; only load model files that this
  # pipeline itself produced.
  with open('data/model.pkl','rb') as model_file:
    model = pickle.load(model_file)

  features = np.load('data/x_test.npy', allow_pickle = True)

  probabilities = model.predict_proba(features)
  print(probabilities)

  np.save('data/y_predict_proba_model.npy', probabilities)

def metrics():
  """Print evaluation metrics for the saved test-set predictions.

  Loads the true labels, predicted labels and predicted probabilities from
  the ``.npy`` files under ``data/``, then prints accuracy, micro-averaged
  recall and precision, and log loss (each rounded to two decimals), followed
  by scikit-learn's classification report.
  """
  import pandas as pd
  import numpy as np
  from sklearn.metrics import (accuracy_score, classification_report, log_loss,
                               precision_score, recall_score, roc_auc_score)

  actual = np.load('data/y_test.npy', allow_pickle=True)
  predicted = np.load('data/y_predict_model.npy', allow_pickle=True)
  probabilities = np.load('data/y_predict_proba_model.npy', allow_pickle=True)

  summary = {
      "accuracy": round(accuracy_score(actual, predicted), 2),
      "recall": round(recall_score(actual, predicted, average='micro'), 2),
      "precision": round(precision_score(actual, predicted, average='micro'), 2),
      "log loss": round(log_loss(actual, probabilities), 2),
  }
  print("\nMetrics :", summary)

  print(classification_report(actual, predicted))


# Build one component factory per pipeline step. Every step runs in the same
# base image; only the data-preparation step omits scikit-learn.
_BASE_IMAGE = 'python:3.8.10'
_DATA_PACKAGES = ['pandas == 1.3.5', 'numpy == 1.22.4']
_ML_PACKAGES = _DATA_PACKAGES + ['scikit-learn == 1.2.1']

create_component_prepare_data = comp.create_component_from_func(
    func=prepare_data,
    base_image=_BASE_IMAGE,
    packages_to_install=list(_DATA_PACKAGES),
)
create_component_split_data = comp.create_component_from_func(
    func=split_data,
    base_image=_BASE_IMAGE,
    packages_to_install=list(_ML_PACKAGES),
)
create_component_training = comp.create_component_from_func(
    func=training,
    base_image=_BASE_IMAGE,
    packages_to_install=list(_ML_PACKAGES),
)
create_component_predict_test_data = comp.create_component_from_func(
    func=predict_test_data,
    base_image=_BASE_IMAGE,
    packages_to_install=list(_ML_PACKAGES),
)
create_component_predict_proba_test_data = comp.create_component_from_func(
    func=predict_proba_test_data,
    base_image=_BASE_IMAGE,
    packages_to_install=list(_ML_PACKAGES),
)
create_component_metrics = comp.create_component_from_func(
    func=metrics,
    base_image=_BASE_IMAGE,
    packages_to_install=list(_ML_PACKAGES),
)

@dsl.pipeline(
    name = " Logistic Regression model classifier for IRIS dataset",
    description = "simple kubeflow pipeline"
)
def iris_lr_classifier(data_path: str):
  """Define the Iris logistic-regression pipeline.

  Creates a 200M read-write-once volume, mounts it at ``data_path`` in every
  step, and chains the steps sequentially: prepare data -> split -> train ->
  predict classes -> predict probabilities -> metrics.

  Args:
    data_path: Mount path of the shared volume inside each step's container.
  """
  vop = dsl.VolumeOp(
      name = "t-vol",
      resource_name = 't-vol',
      size = '200M',
      modes = dsl.VOLUME_MODE_RWO)

  # Each create_component_* object is a factory: it has to be *called* to
  # produce a task. add_pvolumes()/after() are methods of the task, not of the
  # factory function, so the call parentheses are required.
  prepare_data_task = create_component_prepare_data().add_pvolumes({data_path: vop.volume})
  split_task = create_component_split_data().add_pvolumes({data_path: vop.volume}).after(prepare_data_task)
  classifier_training = create_component_training().add_pvolumes({data_path: vop.volume}).after(split_task)
  log_predicted_class = create_component_predict_test_data().add_pvolumes({data_path: vop.volume}).after(classifier_training)
  log_predicted_probabilities = create_component_predict_proba_test_data().add_pvolumes({data_path: vop.volume}).after(log_predicted_class)
  log_metrics_task = create_component_metrics().add_pvolumes({data_path: vop.volume}).after(log_predicted_probabilities)

  # A maximum cache staleness of 'P0D' is set on every step.
  pipeline_tasks = (
      prepare_data_task,
      split_task,
      classifier_training,
      log_predicted_class,
      log_predicted_probabilities,
      log_metrics_task,
  )
  for task in pipeline_tasks:
    task.execution_options.caching_strategy.max_cache_staleness = 'P0D'

# Compiler.compile is an instance method, so the compiler has to be
# instantiated before calling it.
# NOTE: the package path is relative; a local ``data/`` directory must exist.
kfp.compiler.Compiler().compile(pipeline_func=iris_lr_classifier,
                                package_path='data/lr_iris_classifier_pipeline1.yaml'
)

# Was misspelled as `cleint`, which made the later `client.` call fail with a
# NameError.
client = kfp.Client()

DATA_PATH = '/data'

import datetime
print(datetime.datetime.now().date())


pipeline_func = iris_lr_classifier

experiment_name = 'iris_classifier_experiment_'+ str(datetime.datetime.now().date())

run_name = pipeline_func.__name__+' run'

namespace = 'kubeflow'

# The key must match the pipeline function's parameter name (`data_path`).
arguments = {"data_path": DATA_PATH}

kfp.compiler.Compiler().compile(
    pipeline_func,
    '{}.zip'.format(experiment_name))

run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                  experiment_name = experiment_name,
                                                  run_name = run_name,
                                                  arguments = arguments)
