In [1]:
import kfp
import kfp.components as comp
import requests
import kfp.dsl as dsl

In [2]:
!pip show kfp

Name: kfp
Version: 1.6.3
Summary: KubeFlow Pipelines SDK
Home-page: 
Author: google
Author-email: 
License: 
Location: /opt/conda/lib/python3.8/site-packages
Requires: absl-py, click, cloudpickle, Deprecated, docstring-parser, fire, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, PyYAML, requests-toolbelt, strip-hints, tabulate
Required-by: 


In [3]:
def prepare_data(
    data_url: str = "https://aiapstorage.blob.core.windows.net/kfdemo/winequality.csv?sp=r&st=2023-05-15T05:55:30Z&se=2023-06-05T13:55:30Z&spr=https&sv=2022-11-02&sr=b&sig=ST5veeeeir8vcLs8Rj0AucrNrsSmUyMEbxPwHZ3%2FL3g%3D",
    output_path: str = 'data/final_df.csv',
):
    """Download the raw dataset, drop incomplete rows and save it as CSV.

    Imports live inside the function so that it stays self-contained when it
    is passed to ``create_component_from_func``.

    Args:
        data_url: URL or local path of the source CSV (anything
            ``pandas.read_csv`` accepts).
        output_path: Destination CSV path. Missing parent directories are
            created.

    Returns:
        None. The cleaned frame is written to ``output_path``.
    """
    # NOTE(review): the default data_url embeds a time-limited SAS signature
    # (se=2023-06-05, sig=...). Prefer passing data_url explicitly instead of
    # committing signed URLs to the notebook.
    import os

    import pandas as pd

    print("---- Inside prepare_data component ----")
    # Load dataset
    df = pd.read_csv(data_url)
    df = df.dropna()

    # to_csv does not create directories; without this the write fails on a
    # fresh working directory that has no "data/" folder yet.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    df.to_csv(output_path, index=False)
    # Report the path that was actually written rather than a fixed string.
    print(f"\n ---- data csv is saved to PV location {os.path.abspath(output_path)} ----")

In [22]:
def split_and_save_data(config_path: str = 'data/final_df.csv',
                        split_ratio: float = 0.3,
                        random_state: int = 100,
                        train_data_path: str = "data/processed/train_winequality.csv",
                        test_data_path: str = "data/processed/test_winequality.csv"):
    """Split the prepared CSV into train and test CSV files.

    Imports live inside the function so that it stays self-contained when it
    is passed to ``create_component_from_func``.

    Args:
        config_path: Path of the input CSV. Despite the name, this is the
            dataset file itself, not a configuration file.
        split_ratio: Passed to ``train_test_split`` as ``test_size``.
        random_state: Seed passed to ``train_test_split``.
        train_data_path: Destination CSV for the training split.
        test_data_path: Destination CSV for the test split.

    Returns:
        None. Both splits are written to disk.
    """
    import os

    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(config_path, sep=",")
    train, test = train_test_split(
        df,
        test_size=split_ratio,
        random_state=random_state,
    )

    # Create the directories the caller actually asked for, instead of a
    # fixed "data/processed/" that only matches the default paths.
    for destination in (train_data_path, test_data_path):
        parent = os.path.dirname(destination)
        if parent:
            os.makedirs(parent, exist_ok=True)

    train.to_csv(train_data_path, index=False, encoding="utf-8", sep=",")
    test.to_csv(test_data_path, index=False, encoding="utf-8", sep=",")

In [53]:
def train_and_evaluate(config_path: str = 'data/final_df.csv',
                       train_data_path: str = "data/processed/train_winequality.csv",
                       test_data_path: str = "data/processed/test_winequality.csv",
                       random_state: int = 100,
                       model_dir: str = "data/saved_models",
                       alpha: float = 0.96,
                       l1_ratio: float = 0.48,
                       target: str = "quality",
                       scores_file: str = "data/reports/scores.json",
                       params_file: str = "data/reports/params.json"
                       ):
    """Fit an ElasticNet on the training split and persist metrics and model.

    Imports live inside the function so that it stays self-contained when it
    is passed to ``create_component_from_func``.

    Args:
        config_path: Not read by this function; kept so existing callers that
            pass it keep working.
        train_data_path: CSV containing the training split.
        test_data_path: CSV containing the test split.
        random_state: Seed passed to ``ElasticNet``.
        model_dir: Directory that receives ``model.joblib``.
        alpha: ``ElasticNet`` ``alpha``.
        l1_ratio: ``ElasticNet`` ``l1_ratio``.
        target: Name of the label column; it must be present in both CSVs.
        scores_file: JSON file that receives rmse / mae / r2.
        params_file: JSON file that receives alpha / l1_ratio.

    Returns:
        None. Metrics are printed and written to ``scores_file``, the
        hyper-parameters to ``params_file`` and the fitted model to
        ``<model_dir>/model.joblib``.
    """
    import json
    import os

    import joblib
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import ElasticNet
    from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

    def eval_metrics(actual, pred):
        """Return (rmse, mae, r2) for the given targets and predictions."""
        rmse = np.sqrt(mean_squared_error(actual, pred))
        mae = mean_absolute_error(actual, pred)
        r2 = r2_score(actual, pred)
        return rmse, mae, r2

    def write_json(path, payload):
        """Write payload as indented JSON, creating the parent directory."""
        # Derive the directory from the requested path instead of assuming
        # the default "data/reports" location.
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, "w") as f:
            json.dump(payload, f, indent=4)

    train = pd.read_csv(train_data_path, sep=",")
    test = pd.read_csv(test_data_path, sep=",")

    train_y = train[target]
    test_y = test[target]

    train_x = train.drop(target, axis=1)
    test_x = test.drop(target, axis=1)

    lr = ElasticNet(
        alpha=alpha,
        l1_ratio=l1_ratio,
        random_state=random_state)
    lr.fit(train_x, train_y)

    predicted_qualities = lr.predict(test_x)

    (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

    print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
    print("  RMSE: %s" % rmse)
    print("  MAE: %s" % mae)
    print("  R2: %s" % r2)

    # Track the scores and the hyper-parameters locally as JSON reports.
    write_json(scores_file, {
        "rmse": rmse,
        "mae": mae,
        "r2": r2
    })
    write_json(params_file, {
        "alpha": alpha,
        "l1_ratio": l1_ratio,
    })

    os.makedirs(model_dir, exist_ok=True)
    model_path = os.path.join(model_dir, "model.joblib")

    joblib.dump(lr, model_path)

In [9]:
# Run the data-preparation step directly in the notebook kernel (outside Kubeflow).
prepare_data()

---- Inside prepare_data component ----

 ---- data csv is saved to PV location /data/final_df.csv ----


In [24]:
# Run the split step directly in the notebook kernel, spelling out every argument.
split_and_save_data(
    config_path='data/final_df.csv',
    split_ratio=0.3,
    random_state=100,
    train_data_path="data/processed/train_winequality.csv",
    test_data_path="data/processed/test_winequality.csv",
)

In [54]:
# Run training and evaluation directly in the notebook kernel, spelling out every argument.
train_and_evaluate(
    config_path='data/final_df.csv',
    train_data_path="data/processed/train_winequality.csv",
    test_data_path="data/processed/test_winequality.csv",
    random_state=100,
    model_dir="data/saved_models",
    alpha=0.96,
    l1_ratio=0.48,
    target="quality",
    scores_file="data/reports/scores.json",
    params_file="data/reports/params.json",
)

Elasticnet model (alpha=0.960000, l1_ratio=0.480000):
  RMSE: 0.8493911926707736
  MAE: 0.6371715819721345
  R2: 0.04478935385622396


In [50]:
# Turn prepare_data into a component factory; pandas is installed into the
# python:3.7 base image when the step starts.
create_step_prepare_data = comp.create_component_from_func(
    func=prepare_data,
    base_image='python:3.7',
    packages_to_install=['pandas==1.2.4'],
)

In [51]:
# Turn split_and_save_data into a component factory.
# scikit-learn is pinned so that every run installs the same version; 1.0.2 is
# the last release line that supports the Python 3.7 base image.
create_step_split_and_save_data=kfp.components.create_component_from_func(
    func=split_and_save_data,
    base_image='python:3.7',
    packages_to_install=['pandas==1.2.4','scikit-learn==1.0.2']
)



In [55]:
# Turn train_and_evaluate into a component factory.
# scikit-learn is pinned for reproducible installs (1.0.2 is the last release
# line that supports the Python 3.7 base image). joblib is listed explicitly
# because the function imports it directly rather than only through scikit-learn.
create_step_train_and_evaluate=kfp.components.create_component_from_func(
    func=train_and_evaluate,
    base_image='python:3.7',
    packages_to_install=['pandas==1.2.4','scikit-learn==1.0.2','numpy==1.21.0','joblib==1.1.0']
)



In [56]:
# Pipeline definition: one volume shared by three steps that run in sequence
# (prepare -> split -> train).
@dsl.pipeline(
   name='Winequality Kubeflow Demo Pipeline',
   description='A sample pipeline that performs Winequality task'
)
# data_path is the pipeline parameter: the mount point of the shared volume in every step.
def winequality_pipeline(data_path: str):
    # Request a 1Gi ReadWriteOnce volume for the steps to share.
    volume_op = dsl.VolumeOp(
        name="wq-vol",
        resource_name="wq-vol",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # NOTE(review): the step functions read and write relative "data/..." paths,
    # so they only hit this volume if that resolves to data_path inside the
    # container - confirm the working directory of the base image.
    prepare_op = create_step_prepare_data().add_pvolumes({data_path: volume_op.volume})

    split_op = create_step_split_and_save_data().add_pvolumes({data_path: volume_op.volume})
    split_op = split_op.after(prepare_op)

    train_op = create_step_train_and_evaluate().add_pvolumes({data_path: volume_op.volume})
    train_op = train_op.after(split_op)

    # Set the maximum cache staleness to "P0D" on every step.
    for step in (prepare_op, split_op, train_op):
        step.execution_options.caching_strategy.max_cache_staleness = "P0D"
       

In [58]:
# Compile the pipeline function into a workflow package on disk.
kfp.compiler.Compiler().compile(winequality_pipeline, 'Winequality_pipeline.yaml')


In [59]:
import getpass
import os

import requests

# Connection settings. Each value can be overridden through an environment
# variable; the password is never stored in the notebook and is prompted for
# when KUBEFLOW_PASSWORD is not set.
# The trailing slash is stripped so f"{HOST}/pipeline" does not contain "//".
HOST = os.environ.get("KUBEFLOW_HOST", "http://40.70.70.47:8080/").rstrip("/")
USERNAME = os.environ.get("KUBEFLOW_USERNAME", "user@example.com")
PASSWORD = os.environ.get("KUBEFLOW_PASSWORD") or getpass.getpass(f"Password for {USERNAME}: ")
NAMESPACE = "kubeflow"

# The first GET is redirected to the login form; response.url is where the
# credentials have to be posted.
session = requests.Session()
response = session.get(HOST)
response.raise_for_status()

headers = {
    "Content-Type": "application/x-www-form-urlencoded",
}

data = {"login": USERNAME, "password": PASSWORD}
session.post(response.url, headers=headers, data=data)

# A failed login leaves the session without the auth cookie; fail with a clear
# message instead of a bare KeyError.
session_cookie = session.cookies.get_dict().get("authservice_session")
if session_cookie is None:
    raise RuntimeError(
        "Login did not return an 'authservice_session' cookie; "
        "check KUBEFLOW_HOST, KUBEFLOW_USERNAME and the password."
    )

client = kfp.Client(
    host=f"{HOST}/pipeline",
    cookies=f"authservice_session={session_cookie}",
    namespace=NAMESPACE,
)

In [61]:
import datetime

# Mount point of the shared volume, passed to the pipeline as "data_path".
DATA_PATH = '/data'

# Evaluate the date once so the printed value and the experiment name cannot
# disagree if the cell happens to run across midnight.
run_date = datetime.datetime.now().date()
print(run_date)


pipeline_func = winequality_pipeline
experiment_name = f'winequality_pipeline_{run_date}'
run_name = pipeline_func.__name__ + ' run'
# Not referenced anywhere else in this cell.
namespace = "kubeflow"

arguments = {"data_path":DATA_PATH}

# Keep a compiled copy of the pipeline next to the notebook, named after the experiment.
kfp.compiler.Compiler().compile(pipeline_func,
  '{}.zip'.format(experiment_name))

# Submit the pipeline function as a run under the dated experiment.
run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                  experiment_name=experiment_name,
                                                  run_name=run_name,
                                                  arguments=arguments)

2023-05-15
