In [None]:
import kfp 
import kserve 
from kfp import dsl 
from kfp.components import func_to_container_op, OutputPath, InputPath
from kfp import components
import os 
from functools import partial
# https://www.kubeflow.org/docs/distributions/azure/azureendtoend/
# https://medium.com/kubeflow/an-end-to-end-ml-pipeline-on-prem-notebooks-kubeflow-pipelines-on-the-new-minikf-33b7d8e9a836
 

In [None]:
@partial(
    components.create_component_from_func,
    base_image="python:3.8",
    # load_boston was removed in scikit-learn 1.2, so pin below that release.
    packages_to_install=["pandas", "scikit-learn<1.2"],
)
def process(train_x_path: OutputPath("pickle"),
            train_y_path: OutputPath("pickle"),
            test_x_path: OutputPath("pickle"),
            test_y_path: OutputPath("pickle")):
    """Load the Boston housing data, split it 70/30 and pickle the four splits.

    Args:
        train_x_path: Output file receiving the training feature DataFrame.
        train_y_path: Output file receiving the training target Series.
        test_x_path: Output file receiving the test feature DataFrame.
        test_y_path: Output file receiving the test target Series.
    """
    import pickle

    import pandas as pd
    from sklearn.datasets import load_boston
    from sklearn.model_selection import train_test_split

    dataset = load_boston()
    # The returned Bunch keeps the target (median house value) separate from
    # `data`, which holds only the feature columns. Slicing the last column of
    # `data` would use a feature (LSTAT) as the label instead.
    X = pd.DataFrame(dataset.data, columns=dataset.feature_names)
    y = pd.Series(dataset.target, name="MEDV")

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.30, random_state=42)

    # Write each split to the output file KFP allocated for it.
    splits = (
        (train_x_path, X_train),
        (train_y_path, y_train),
        (test_x_path, X_test),
        (test_y_path, y_test),
    )
    for path, split in splits:
        with open(path, "wb") as f:
            pickle.dump(split, f)
    

In [None]:
# train step: baseline decision-tree regressor
@partial(
    components.create_component_from_func,
    base_image="python:3.8",
    packages_to_install=["pandas", "scikit-learn", "joblib"],
)
def train(train_x_path: InputPath("pickle"),
          train_y_path: InputPath("pickle"),
          test_x_path: InputPath("pickle"),
          test_y_path: InputPath("pickle"),
          model_path: OutputPath("dump")) -> float:
    """Fit a depth-5 decision tree, save it with joblib and return its test MSE.

    Args:
        train_x_path: Pickled training features (from the ``process`` step).
        train_y_path: Pickled training targets.
        test_x_path: Pickled test features.
        test_y_path: Pickled test targets.
        model_path: Output file receiving the joblib-dumped regressor.

    Returns:
        Mean squared error on the test split. Emitted as a component output so
        a downstream step can compare it against the challenger model.
    """
    import pickle

    import sklearn.metrics as metrics
    from joblib import dump
    from sklearn.tree import DecisionTreeRegressor

    def _load(path):
        # Artifacts come from the upstream pipeline step. pickle is only
        # acceptable because they are produced inside this same pipeline.
        with open(path, "rb") as f:
            return pickle.load(f)

    X_train, y_train = _load(train_x_path), _load(train_y_path)
    X_test, y_test = _load(test_x_path), _load(test_y_path)

    regressor = DecisionTreeRegressor(random_state=42, max_depth=5)
    regressor.fit(X_train, y_train)

    with open(model_path, "wb") as f:
        dump(regressor, f)

    y_pred = regressor.predict(X_test)
    return float(metrics.mean_squared_error(y_test, y_pred))


In [None]:
# challenger model: LightGBM regressor
# NOTE(review): this cell rebinds the module-level name `train`, shadowing the
# decision-tree component defined in the previous cell. Rename one of them
# before wiring both into a pipeline.
@partial(
    components.create_component_from_func,
    base_image="python:3.8",
    packages_to_install=["pandas", "lightgbm", "scikit-learn", "joblib"],
)
def train(train_x_path: InputPath("pickle"),
          train_y_path: InputPath("pickle"),
          test_x_path: InputPath("pickle"),
          test_y_path: InputPath("pickle"),
          model_path: OutputPath("dump")) -> float:
    """Train a LightGBM regressor with early stopping and return its test MSE.

    Args:
        train_x_path: Pickled training features (from the ``process`` step).
        train_y_path: Pickled training targets.
        test_x_path: Pickled test features, also used as the validation set.
        test_y_path: Pickled test targets.
        model_path: Output file receiving the joblib-dumped ``lgb.Booster``.

    Returns:
        Mean squared error on the test split. Because the same split drives
        early stopping, this estimate is somewhat optimistic.
    """
    # Based on
    # https://www.datatechnotes.com/2022/03/lightgbm-regression-example-in-python.html
    import pickle

    import lightgbm as lgb
    import sklearn.metrics as metrics
    from joblib import dump

    def _load(path):
        # Artifacts come from the upstream pipeline step. pickle is only
        # acceptable because they are produced inside this same pipeline.
        with open(path, "rb") as f:
            return pickle.load(f)

    X_train, y_train = _load(train_x_path), _load(train_y_path)
    X_test, y_test = _load(test_x_path), _load(test_y_path)

    train_set = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=train_set)
    params = {
        'task': 'train',
        'boosting': 'gbdt',
        'objective': 'regression',
        'num_leaves': 10,
        'learning_rate': 0.05,
        # A list keeps metric order deterministic (a set does not).
        'metric': ['l2', 'l1'],
        'verbose': -1,
    }
    # `early_stopping_rounds=` was removed from lgb.train in LightGBM 4.0.
    # The callback form is also available on 3.3+.
    model = lgb.train(
        params,
        train_set,
        num_boost_round=100,
        valid_sets=[lgb_eval],
        callbacks=[lgb.early_stopping(stopping_rounds=10, verbose=False)],
    )

    with open(model_path, "wb") as f:
        dump(model, f)

    # Booster.predict uses the best iteration found by early stopping by default.
    y_pred = model.predict(X_test)
    return float(metrics.mean_squared_error(y_test, y_pred))
    

In [None]:
# model evaluation and interpretation step: LIME explanation of one test row
@partial(
    components.create_component_from_func,
    base_image="python:3.8",
    # `lime` is what the body imports. scikit-learn and lightgbm are needed to
    # unpickle either the decision-tree or the LightGBM model artifact.
    packages_to_install=["pandas", "scikit-learn", "lightgbm", "joblib", "lime"],
)
def evaluate(model_path: InputPath("dump"),
             test_x_path: InputPath("pickle"),
             test_y_path: InputPath("pickle"),
             lime_path: OutputPath("pickle")):
    """Explain one test-set prediction with LIME and save the report as HTML.

    Args:
        model_path: joblib-dumped model exposing ``predict(2-D array)``.
        test_x_path: Pickled test feature DataFrame (from ``process``).
        test_y_path: Pickled test targets. Not read here; kept so the
            component interface is unchanged.
        lime_path: Output file receiving the LIME explanation. The content is
            HTML, despite the "pickle" type name on the output.

    Raises:
        ValueError: If the test split is empty.
    """
    import pickle

    import lime.lime_tabular
    from joblib import load

    with open(model_path, "rb") as f:
        model = load(f)
    with open(test_x_path, "rb") as f:
        X_test = pickle.load(f)

    X_values = X_test.to_numpy()
    if len(X_values) == 0:
        raise ValueError("test split is empty; nothing to explain")

    # The training split is not an input of this step, so the test rows serve
    # as LIME's background data for feature statistics.
    explainer = lime.lime_tabular.LimeTabularExplainer(
        X_values,
        feature_names=list(X_test.columns),
        class_names=['price'],
        verbose=True,
        mode='regression',
        random_state=42,
    )

    # Positional row index; clamp it so small test splits still work.
    row = min(25, len(X_values) - 1)
    exp = explainer.explain_instance(X_values[row], model.predict, num_features=5)
    print(exp.as_list())

    # save_to_file takes a path and writes the HTML report itself.
    exp.save_to_file(lime_path)


In [None]:
# upload step: log the trained model to MLflow and register it. Artifacts are
# written to the S3-compatible (MinIO) endpoint configured below.
@partial(
    components.create_component_from_func,
    base_image="python:3.8",
    # scikit-learn / lightgbm are required to unpickle the model artifact.
    packages_to_install=["mlflow", "joblib", "boto3", "scikit-learn", "lightgbm"],
)
def upload_mod(model_name: str,
               model_path: InputPath("dump"),
               ) -> str:
    """Log the joblib model to MLflow, register it as "BCI-Model" and return its URI.

    Args:
        model_name: Artifact path of the model inside the MLflow run.
        model_path: joblib-dumped model from a train step (a scikit-learn
            estimator or a LightGBM Booster).

    Returns:
        The artifact URI of the logged model. This is intended as the storage
        URI for the serving step.

    Raises:
        RuntimeError: If the S3 credentials are not present in the environment.
    """
    import os

    import joblib
    import mlflow
    import mlflow.lightgbm
    import mlflow.sklearn

    os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://mlflow-minio.mlflow-system.svc:9000"
    # Credentials must be injected into the pod environment (e.g. from a
    # Kubernetes secret); never hard-code them here.
    missing = [key for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
               if not os.environ.get(key)]
    if missing:
        raise RuntimeError(
            "missing S3 credentials in environment: " + ", ".join(missing))

    mlflow.set_tracking_uri("http://mlflow-service.mlflow-system.svc:5000")
    with open(model_path, mode="rb") as file_reader:
        model = joblib.load(file_reader)

    # mlflow.pyfunc.log_model(python_model=...) only accepts PythonModel
    # subclasses, so log with the flavour matching the artifact instead.
    is_lightgbm = type(model).__module__.split(".")[0] == "lightgbm"
    with mlflow.start_run():
        if is_lightgbm:
            mlflow.lightgbm.log_model(
                lgb_model=model,
                artifact_path=model_name,
                registered_model_name="BCI-Model",
            )
        else:
            mlflow.sklearn.log_model(
                sk_model=model,
                artifact_path=model_name,
                registered_model_name="BCI-Model",
            )
        return mlflow.get_artifact_uri(model_name)
    
    
    


In [None]:
# kserve portion: create, update or delete a KServe InferenceService
@partial(
    components.create_component_from_func,
    base_image="python:3.8",
    packages_to_install=["kserve", "kubernetes"],
)
def serve_model(model_name: str,
                model_path: InputPath("s3"),
                namespace: str = "kserve-test",
                action: str = "apply",
                framework: str = "sklearn",
                ):
    """Manage a KServe InferenceService for the model stored at a URI.

    Talks to the cluster directly through the KServe SDK. The KFP ``kserve``
    component op cannot be used from inside a running step: ``kfp`` is not
    installed in this image, and component ops only take effect while a
    pipeline is being compiled.

    Args:
        model_name: Name of the InferenceService.
        model_path: File whose content is the model storage URI (for example
            the ``s3://...`` URI returned by the upload step).
        namespace: Kubernetes namespace of the InferenceService.
        action: "create", "update", "apply" (create, or update if it
            exists) or "delete".
        framework: Predictor type: "sklearn" or "lightgbm".

    Raises:
        ValueError: On an unsupported ``action`` or ``framework``.
    """
    from kubernetes import client as k8s
    from kserve import (KServeClient, V1beta1InferenceService,
                        V1beta1InferenceServiceSpec, V1beta1LightGBMSpec,
                        V1beta1PredictorSpec, V1beta1SKLearnSpec, constants)

    kserve_client = KServeClient()

    if action == "delete":
        kserve_client.delete(model_name, namespace=namespace)
        return

    with open(model_path) as f:
        storage_uri = f.read().strip()

    spec_classes = {"sklearn": V1beta1SKLearnSpec, "lightgbm": V1beta1LightGBMSpec}
    if framework not in spec_classes:
        raise ValueError(f"unsupported framework: {framework!r}")
    predictor = V1beta1PredictorSpec(
        **{framework: spec_classes[framework](storage_uri=storage_uri)})

    isvc = V1beta1InferenceService(
        api_version=constants.KSERVE_V1BETA1,
        kind=constants.KSERVE_KIND,
        metadata=k8s.V1ObjectMeta(name=model_name, namespace=namespace),
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )

    if action == "create":
        kserve_client.create(isvc, namespace=namespace)
    elif action == "update":
        kserve_client.patch(model_name, isvc, namespace=namespace)
    elif action == "apply":
        # The KServe client wraps the API lookup failure in a RuntimeError.
        try:
            kserve_client.get(model_name, namespace=namespace)
        except RuntimeError:
            kserve_client.create(isvc, namespace=namespace)
        else:
            kserve_client.patch(model_name, isvc, namespace=namespace)
    else:
        raise ValueError(f"unsupported action: {action!r}")



In [None]:
# end to end pipeline definition - may need to adjust some of this
@dsl.pipeline(
    name = "end to end pipeline",
    description = "end to end pipeline that ..."
)
# https://github.com/jerife/MLOps-on-kubernetes/blob/main/pipeline.py
def end_to_end_pipeline():
    """Run the five containerised stages strictly one after another.

    KFP schedules ContainerOps that have no dependency between them in
    parallel. The explicit ``.after()`` chain keeps training from starting
    before processing, and so on down the line.

    NOTE(review): the ops exchange no KFP artifacts, so each image presumably
    reads and writes shared storage itself; confirm.
    TODO: the lightweight components defined in earlier cells are not wired in yet.
    """
    data_process = dsl.ContainerOp(
        name = "data_processing",
        image = "jacobkun/red_etl:alpha",
        command = ["python", "data_process.py"],
    )

    model_train = dsl.ContainerOp(
        name = "model_training",
        image = "jacobkun/model_train:alpha",
        command = ["python", "model_train.py"],
    ).after(data_process)

    model_upload = dsl.ContainerOp(
        name = "model_upload",
        image = "jacobkun/model_upload:alpha",
        command = ["python", "model_upload.py"],
    ).after(model_train)

    # Named model_deploy so it does not shadow the `serve_model` component.
    model_deploy = dsl.ContainerOp(
        name = "model_deploy",
        image = "jacobkun/model_deploy:alpha",
        command = ["python", "model_deploy.py"],
    ).after(model_upload)

    # Performance testing etc. against the deployed model; still being designed.
    model_check = dsl.ContainerOp(
        name = "model_check",
        image = "jacobkun/model_check:alpha",
        command = ["python", "model_check.py"],
    ).after(model_deploy)


In [None]:
# submit the pipeline
if __name__ == "__main__":
    PIPELINE_PACKAGE = "end_to_end.yaml"
    EXPERIMENT_NAME = "end_to_end"

    kfp.compiler.Compiler().compile(end_to_end_pipeline, PIPELINE_PACKAGE)

    # Authenticate with the service-account token mounted into the notebook pod.
    token_path = os.environ.get('KF_PIPELINES_SA_TOKEN_PATH')
    if not token_path:
        raise RuntimeError(
            "KF_PIPELINES_SA_TOKEN_PATH is not set; mount the pipelines "
            "service-account token into this notebook pod")
    with open(token_path, "r") as f:
        # Strip stray whitespace so the bearer header stays well-formed.
        TOKEN = f.read().strip()

    client = kfp.Client(
        host='http://ml-pipeline.kubeflow.svc.cluster.local:8888',
        existing_token=TOKEN)
    # create our experiment (returns the existing one if it already exists)
    client.create_experiment(EXPERIMENT_NAME)
    # Submit the package compiled above rather than compiling a second time.
    client.create_run_from_pipeline_package(
        PIPELINE_PACKAGE, arguments = {}, experiment_name = EXPERIMENT_NAME)
