### Iris Classifier - Machine Learning Pipeline

#### 1. Import the required libraries

In [1]:
from kfp.dsl import component, pipeline
import kfp
from kfp import kubernetes

#### 2. Data Preparation

In [2]:
@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9"
)
def prepare_data(data_path: str):
    """Write the Iris dataset as a CSV table into the shared data directory.

    The measurement columns come from scikit-learn's bundled Iris dataset and
    the integer class label is appended as a ``species`` column. Rows with
    missing values are dropped before the table is written to
    ``{data_path}/final_df.csv``.

    Args:
        data_path: Directory shared between pipeline steps; receives
            ``final_df.csv``.
    """
    # KFP lightweight components must be self-contained, so imports stay
    # inside the function body.
    import pandas as pd
    from sklearn import datasets

    bunch = datasets.load_iris()
    table = pd.DataFrame(bunch.data, columns=bunch.feature_names).assign(
        species=bunch.target
    )

    cleaned = table.dropna()
    cleaned.to_csv(f'{data_path}/final_df.csv', index=False)

#### 3. Split data into training and test sets

In [3]:
@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9",
)
def train_test_split(data_path: str):
    """Split the prepared Iris table into stratified train/test arrays.

    Reads ``{data_path}/final_df.csv`` and writes ``X_train.npy``,
    ``X_test.npy``, ``y_train.npy`` and ``y_test.npy`` back into
    ``data_path``. 30% of the rows go to the test set. The split is
    stratified on the ``species`` column and seeded (``random_state=47``),
    so it is reproducible.

    Args:
        data_path: Directory shared between pipeline steps.
    """
    import numpy as np
    import pandas as pd
    # Alias the sklearn helper so it does not shadow this component's own name.
    from sklearn.model_selection import train_test_split as sk_train_test_split

    final_data = pd.read_csv(f'{data_path}/final_df.csv')

    target_column = 'species'
    X = final_data.drop(columns=[target_column])
    # Select the target as a 1-D Series, not a one-column DataFrame. A column
    # vector would be saved as an (n, 1) array, and scikit-learn estimators
    # warn (DataConversionWarning) when fitted on such a y.
    y = final_data[target_column]

    X_train, X_test, y_train, y_test = sk_train_test_split(
        X, y, test_size=0.3, stratify=y, random_state=47
    )

    np.save(f'{data_path}/X_train.npy', X_train)
    np.save(f'{data_path}/X_test.npy', X_test)
    np.save(f'{data_path}/y_train.npy', y_train)
    np.save(f'{data_path}/y_test.npy', y_test)

#### 4. Model training

In [4]:
@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9",
)
def training_basic_classifier(data_path: str):
    """Fit a logistic-regression classifier on the training split and pickle it.

    Reads ``X_train.npy`` and ``y_train.npy`` from ``data_path`` and writes
    the fitted estimator to ``{data_path}/model.pkl``.

    Args:
        data_path: Directory shared between pipeline steps.
    """
    import pickle

    import numpy as np
    from sklearn.linear_model import LogisticRegression

    X_train = np.load(f'{data_path}/X_train.npy', allow_pickle=True)
    # Flatten the labels to 1-D. An (n, 1) column vector would make
    # LogisticRegression.fit emit a DataConversionWarning; ravel() is a no-op
    # when the labels are already 1-D.
    y_train = np.load(f'{data_path}/y_train.npy', allow_pickle=True).ravel()

    classifier = LogisticRegression(max_iter=500)
    classifier.fit(X_train, y_train)

    with open(f'{data_path}/model.pkl', 'wb') as f:
        pickle.dump(classifier, f)

#### 5. Model registration (MLflow)

In [5]:
@component(
    packages_to_install=["pandas", "numpy", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.9",
)
def register_model(data_path: str, aws_access_key_id: str, aws_secret_access_key: str, aws_default_region: str) -> dict:
    """Log the trained classifier to MLflow and register it in the model registry.

    Loads ``{data_path}/model.pkl`` and infers a model signature from its
    predictions on ``X_test.npy``. It then logs the model under the
    ``sklearn-model`` artifact directory of a new run in the ``test-1``
    experiment and registers it as ``SklearnLogisticRegression``.

    Args:
        data_path: Directory shared between pipeline steps.
        aws_access_key_id: Exported as ``AWS_ACCESS_KEY_ID`` for the artifact store.
        aws_secret_access_key: Exported as ``AWS_SECRET_ACCESS_KEY``.
        aws_default_region: Exported as ``AWS_DEFAULT_REGION``.

    Returns:
        dict with ``artifact_path`` (the run-relative model directory) and
        ``artifact_uri`` (the run's artifact root).
    """
    import os
    import pickle

    import mlflow
    import mlflow.sklearn
    import numpy as np
    from mlflow.models import infer_signature

    with open(f'{data_path}/model.pkl', 'rb') as f:
        logistic_reg_model = pickle.load(f)

    # Infer the model signature from real inputs and outputs.
    X_test = np.load(f'{data_path}/X_test.npy', allow_pickle=True)
    y_pred = logistic_reg_model.predict(X_test)
    signature = infer_signature(X_test, y_pred)

    # The MLflow client reads the artifact-store credentials from the environment.
    os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
    os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
    os.environ["AWS_DEFAULT_REGION"] = aws_default_region

    mlflow.set_tracking_uri("http://mlflowserver.kubeflow:5000")
    reg_model_name = "SklearnLogisticRegression"
    artifact_path = "sklearn-model"

    # set_experiment creates "test-1" on first use and reuses it afterwards.
    # create_experiment raised on every run after the first because the
    # experiment already existed.
    experiment = mlflow.set_experiment("test-1")

    with mlflow.start_run(experiment_id=experiment.experiment_id) as run:
        # Log the hyperparameter the model was actually trained with.
        mlflow.log_param('max_iter', logistic_reg_model.max_iter)

        # NOTE(review): mlflow.sklearn.log_model below also writes a model.pkl
        # into this same directory, so this raw upload looks redundant —
        # confirm nothing depends on it before removing.
        mlflow.log_artifact(local_path=f'{data_path}/model.pkl', artifact_path=artifact_path)

        # registered_model_name registers a new model version here. A separate
        # mlflow.register_model call afterwards would create a duplicate version.
        mlflow.sklearn.log_model(
            sk_model=logistic_reg_model,
            artifact_path=artifact_path,
            signature=signature,
            registered_model_name=reg_model_name,
        )

    return {"artifact_path": artifact_path, "artifact_uri": run.info.artifact_uri}

#### 6. Model evaluation (predictions on the test set)

In [6]:
@component(
    packages_to_install=["pandas", "numpy", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.9",
)
def predict_on_test_data(data_path: str, model_info: dict, aws_access_key_id: str, aws_secret_access_key: str, aws_default_region: str) -> str:
    """Load the logged MLflow model and score the held-out test features.

    Writes class predictions to ``{data_path}/y_pred.npy`` and class
    probabilities to ``{data_path}/y_pred_prob.npy``.

    Args:
        data_path: Directory shared between pipeline steps.
        model_info: Mapping with ``artifact_path`` and ``artifact_uri`` keys
            (in the pipeline this is the output of ``register_model``).
        aws_access_key_id: Exported as ``AWS_ACCESS_KEY_ID`` for the artifact store.
        aws_secret_access_key: Exported as ``AWS_SECRET_ACCESS_KEY``.
        aws_default_region: Exported as ``AWS_DEFAULT_REGION``.

    Returns:
        The model URI ``{artifact_uri}/{artifact_path}`` that was loaded.
    """
    import os

    import mlflow
    import mlflow.sklearn
    import numpy as np

    # The MLflow client reads the artifact-store credentials from the environment.
    os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
    os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
    os.environ["AWS_DEFAULT_REGION"] = aws_default_region

    artifact_path = model_info["artifact_path"]
    artifact_uri = model_info["artifact_uri"]

    mlflow.set_tracking_uri("http://mlflowserver.kubeflow:5000")
    model_uri = f"{artifact_uri}/{artifact_path}"
    logistic_reg_model = mlflow.sklearn.load_model(model_uri)

    # Load the test features once and reuse them for both outputs.
    X_test = np.load(f'{data_path}/X_test.npy', allow_pickle=True)
    np.save(f'{data_path}/y_pred.npy', logistic_reg_model.predict(X_test))
    np.save(f'{data_path}/y_pred_prob.npy', logistic_reg_model.predict_proba(X_test))

    return model_uri

#### 7. Model deployment

In [7]:
@component(
    packages_to_install=["kserve"],
    base_image="python:3.9",
)
def model_serving(model_uri: str):
    """Deploy the model stored at ``model_uri`` as a KServe InferenceService.

    Creates the ``sklearn-iris-v2`` InferenceService (sklearn predictor,
    service account ``mlflow-sa``, Istio sidecar injection disabled) in the
    default target namespace. If a service of that name already exists, it is
    replaced instead.

    Args:
        model_uri: Storage URI of the model directory handed to the sklearn
            predictor.
    """
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1SKLearnSpec

    namespace = utils.get_default_target_namespace()

    name = 'sklearn-iris-v2'
    kserve_version = 'v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=name, namespace=namespace, annotations={'sidecar.istio.io/inject': 'false'}),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                service_account_name='mlflow-sa',
                sklearn=V1beta1SKLearnSpec(storage_uri=model_uri))))

    # The original code built the spec but never submitted it, so nothing was
    # deployed. Submit it now, replacing the service when re-running.
    kserve_client = KServeClient()
    try:
        kserve_client.get(name, namespace=namespace)
    except RuntimeError:
        # NOTE(review): KServeClient.get wraps API errors (e.g. 404 not found)
        # in RuntimeError. Any other failure will resurface from create().
        kserve_client.create(isvc)
    else:
        kserve_client.replace(name, isvc, namespace=namespace)

#### 8. The complete pipeline

In [None]:
from kubernetes import client, config
import base64

@pipeline(
    name="iris-pipeline",
)
def iris_pipeline(data_path: str):
    """Wire the Iris steps: prepare -> split -> train -> register -> predict -> serve.

    The data/model steps share a freshly created PVC mounted at ``/data``;
    the PVC is deleted after the serving step finishes.

    NOTE(review): every step receives ``data_path`` but the scratch volume is
    always mounted at ``/data``, so presumably ``data_path`` must be ``/data``
    — TODO confirm or derive the mount path from it.

    Args:
        data_path: Directory the components read from and write to.
    """
    scratch_pvc = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-iris-mlflow-pvc',
        access_modes=['ReadWriteMany'],
        size='1Mi',
        storage_class_name='standard'
    )
    scratch_pvc_name = scratch_pvc.outputs['name']

    def _mount_scratch(task):
        """Attach the shared scratch PVC to ``task`` at /data."""
        kubernetes.mount_pvc(task, pvc_name=scratch_pvc_name, mount_path='/data')

    # NOTE(review): this body runs when the pipeline is compiled, on the
    # compiling machine. The decoded credentials are therefore passed to the
    # components as constant strings and are embedded in the compiled pipeline
    # spec. Consider kubernetes.use_secret_as_env on the tasks instead.
    config.load_kube_config()
    core_api = client.CoreV1Api()
    aws_secret = core_api.read_namespaced_secret("aws-credentials", namespace="kubeflow")

    def _decode(key):
        """Return secret entry ``key`` as a UTF-8 string (secret data is base64)."""
        return base64.b64decode(aws_secret.data[key]).decode('utf-8')

    aws_credentials = {
        'aws_access_key_id': _decode("AWS_ACCESS_KEY_ID"),
        'aws_secret_access_key': _decode("AWS_SECRET_ACCESS_KEY"),
        'aws_default_region': _decode("AWS_DEFAULT_REGION"),
    }

    prepare_task = prepare_data(data_path=data_path)
    _mount_scratch(prepare_task)

    split_task = train_test_split(data_path=data_path)
    _mount_scratch(split_task)
    split_task.after(prepare_task)

    train_task = training_basic_classifier(data_path=data_path)
    _mount_scratch(train_task)
    train_task.after(split_task)

    register_task = register_model(data_path=data_path, **aws_credentials)
    _mount_scratch(register_task)
    kubernetes.mount_pvc(register_task, pvc_name="mlflow-pvc", mount_path='/opt/mlflow/')
    register_task.after(train_task)

    predict_task = predict_on_test_data(
        data_path=data_path,
        model_info=register_task.output,
        **aws_credentials,
    )
    _mount_scratch(predict_task)
    predict_task.after(register_task)

    serving_task = model_serving(model_uri=predict_task.output)
    serving_task.after(predict_task)

    # Clean up the scratch volume once serving has been set up.
    kubernetes.DeletePVC(pvc_name=scratch_pvc_name).after(serving_task)

#### 9. Compile and run the ML pipeline

In [None]:
from kfp import compiler  # noqa: F811

PIPELINE_PACKAGE = 'iris_mlflow_kserve_pipeline.yaml'

compiler.Compiler().compile(
    pipeline_func=iris_pipeline,
    package_path=PIPELINE_PACKAGE
)

# Use a distinct name. Rebinding ``client`` would shadow the
# ``kubernetes.client`` module that ``iris_pipeline`` looks up as a global
# whenever the pipeline is compiled, so a recompile inside
# create_run_from_pipeline_func failed with AttributeError on CoreV1Api.
kfp_client = kfp.Client(host='http://localhost:8080')

# Submit the package compiled above instead of compiling the pipeline again.
kfp_client.create_run_from_pipeline_package(
    PIPELINE_PACKAGE,
    arguments={
        'data_path': '/data'
    },
    enable_caching=True,
)