In [1]:
import kfp
# Naming constants reused by the component and pipeline definitions.
EXPERIMENT_NAME = 'dphi'
PREFIX = "dphi_"

# Create the KFP client and ask it for the current user's namespace.
client = kfp.Client()
NAMESPACE = client.get_user_namespace()

print(NAMESPACE)

admin


In [2]:
from collections import namedtuple

# Immutable record of the pinned package versions and the base image that the
# component definitions read when declaring their container environment.
Settings = namedtuple(
    'Settings',
    'pandas_version sklearn_version numpy_version '
    'base_python_image xgboost_version evidently_version',
)

settings = Settings(**{
    "pandas_version": "1.5.3",
    "sklearn_version": "1.2.2",
    "numpy_version": "1.24.2",
    "base_python_image": "python:3.10.11",
    "xgboost_version": "1.7.5",
    "evidently_version": "0.2.8",
})
print(f"{settings}")

Settings(pandas_version='1.5.3', sklearn_version='1.2.2', numpy_version='1.24.2', base_python_image='python:3.10.11', xgboost_version='1.7.5', evidently_version='0.2.8')


In [3]:
# import kfp dsl components
import kfp.dsl as dsl
from functools import partial
from kfp.dsl import pipeline, ContainerOp
from kfp.components import InputPath, OutputPath, create_component_from_func

In [4]:
@partial(
    create_component_from_func,
    output_component_file=f"{PREFIX}_download_component.yaml",
    base_image=settings.base_python_image, 
    packages_to_install=[
        f"pandas=={settings.pandas_version}",
        f"numpy=={settings.numpy_version}",
        f"scikit-learn=={settings.sklearn_version}"
    ] 
)
def download_data(output_path: OutputPath("CSV")):
    """Download the housing data, build the model matrices and save them as JSON.

    The CSV is read from Google Drive, the row(s) with 33 bedrooms are dropped,
    and the frame is cut into a reference part (first 15000 rows) and a
    production part (the rest). The reference part is split 80/20 into
    train/validation. Categorical columns are one-hot encoded with an encoder
    fitted on the training split only, and the price is log1p-transformed.

    Args:
        output_path: Destination file. The payload written is JSON-encoded
            text even though the output is declared as "CSV".
    """
    import json

    import numpy as np
    import pandas as pd

    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import OneHotEncoder

    # Column groups are defined once and shared by every step below.
    categorical = ['grade', 'view', 'waterfront']
    numerical = ['sqft_living', 'sqft_above', 'sqft_living15',
                 'bathrooms', 'sqft_basement', 'lat', 'long', 'yr_built',
                 'bedrooms']
    features = ['sqft_living', 'grade', 'sqft_above', 'sqft_living15',
                'bathrooms', 'view', 'sqft_basement', 'lat', 'long', 'waterfront',
                'yr_built', 'bedrooms']

    # Turn the Google Drive share link into a direct-download link (the file
    # id is the second-to-last path segment).
    url = 'https://drive.google.com/file/d/1e83nTKyy9UiiparbtiLMTbF1HC40lr0r/view?usp=share_link'
    url = 'https://drive.google.com/uc?id=' + url.split('/')[-2]
    df = pd.read_csv(url)
    df[categorical] = df[categorical].astype('object')

    # Delete entry with 33 bedrooms
    df = df[df["bedrooms"] != 33]

    # Two datasets for the Evidently comparison: reference vs. production.
    ref_data = df.iloc[:15000]
    prod_data = df.iloc[15000:]

    X_train, X_val, y_train, y_val = train_test_split(
        ref_data[features], ref_data['price'],
        test_size=0.2, shuffle=True, random_state=42,
    )

    # Fit on the training split only; unknown categories seen later encode
    # as all-zero rows.
    ohe = OneHotEncoder(handle_unknown='ignore')
    ohe = ohe.fit(X_train[categorical])

    def preprocessing(X, y, ohe):
        """Return (encoded feature matrix, log1p target) without modifying X."""
        # Cast on a selected copy instead of assigning back into X: X is a
        # slice of `df`, so writing to it would mutate the caller's frame and
        # raise SettingWithCopyWarning.
        X_cat = ohe.transform(X[categorical].astype('object')).toarray()
        X_num = np.array(X[numerical])

        # One-hot columns first, numerical columns after.
        X_out = np.concatenate([X_cat, X_num], axis=1)

        print('Shape after one-hot encoding')
        print(f'X shape: {X_out.shape}')

        # log transform the target variable
        return X_out, np.log1p(y)

    X_train, y_train = preprocessing(X_train, y_train, ohe)
    X_val, y_val = preprocessing(X_val, y_val, ohe)
    X_prod, y_prod = preprocessing(prod_data[features], prod_data['price'], ohe)
    X_train_full, y_train_full = preprocessing(ref_data[features], ref_data['price'], ohe)

    # Everything is converted to plain lists so it can be JSON-serialised.
    # The raw reference/production rows are shipped without column labels.
    data = {'x_train' : X_train.tolist(),
            'y_train' : y_train.tolist(),
            'x_train_full' : X_train_full.tolist(),
            'y_train_full' : y_train_full.tolist(),
            'x_val' : X_val.tolist(),
            'y_val' : y_val.tolist(),
            'x_prod' : X_prod.tolist(),
            'y_prod' : y_prod.tolist(),
            'ref_data' : ref_data.to_numpy().tolist(),
            'prod_data' : prod_data.to_numpy().tolist()}

    # NOTE: the payload is deliberately JSON-encoded twice (a JSON string
    # containing JSON). The process_data component decodes it with
    # json.load followed by json.loads, so the format must stay as is.
    data_json = json.dumps(data)

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data_json, f)

In [5]:
@partial(
    create_component_from_func,
    output_component_file=f"{PREFIX}_extremegboost_component.yaml",
    base_image=settings.base_python_image, 
    packages_to_install=[
        f"pandas=={settings.pandas_version}",
        f"numpy=={settings.numpy_version}",
        f"scikit-learn=={settings.sklearn_version}",
        f"xgboost=={settings.xgboost_version}"
    ] 
)
def process_data(input_path: InputPath("CSV"), output_path: OutputPath("CSV")):
    """Train an XGBoost regressor and attach its predictions to the raw rows.

    Fits an XGBRegressor on the training split, prints the train/validation
    RMSE, then rebuilds the reference and production frames and appends a
    'prediction' column (model output) and a 'price_log' column
    (log1p of 'price') to each.

    Args:
        input_path: File holding the doubly JSON-encoded payload with the keys
            x_train, y_train, x_val, y_val, x_train_full, x_prod, ref_data
            and prod_data.
        output_path: Destination file; receives a doubly JSON-encoded dict
            with the keys 'ref_data' and 'prod_data' (lists of rows).
    """
    import json

    import xgboost as xgb
    import numpy as np
    import pandas as pd

    from sklearn.metrics import mean_squared_error

    # The file holds a JSON string that itself contains JSON: json.load
    # yields a str, json.loads turns that into the dict.
    with open(input_path, encoding="utf-8") as f:
        data = json.loads(json.load(f))

    # Convert the nested lists to arrays once, up front, so fit/predict
    # receive array input rather than Python lists.
    x_train = np.asarray(data['x_train'])
    y_train = np.asarray(data['y_train'])
    x_val = np.asarray(data['x_val'])
    y_val = np.asarray(data['y_val'])
    x_train_full = np.asarray(data['x_train_full'])
    x_prod = np.asarray(data['x_prod'])

    # Initialize XGB with objective function
    parameters = {"objective": 'reg:squarederror',
                  "n_estimators": 100,
                  "verbosity": 0}

    model = xgb.XGBRegressor(**parameters)
    model.fit(x_train, y_train)

    # generate predictions
    y_pred_train = model.predict(x_train)
    y_pred_val = model.predict(x_val)

    # calculate errors (arguments in the documented y_true, y_pred order)
    rmse_train = mean_squared_error(y_train, y_pred_train, squared=False)
    rmse_val = mean_squared_error(y_val, y_pred_val, squared=False)
    print(f"rmse training: {rmse_train:.3f}\t rmse validation: {rmse_val:.3f}")

    # The raw rows arrive as unlabeled lists, so names are re-applied by
    # position.
    # NOTE(review): "date" and "condition" replace the former placeholder
    # labels "data" and "..." — presumably the original CSV header names;
    # confirm against the source file.
    column_names = ["id", "date", "price", "bedrooms", "bathrooms", "sqft_living", "sqft_lot", "floors", "waterfront", "view", "condition", "grade", "sqft_above", "sqft_basement", "yr_built", "yr_renovated", "zipcode", "lat", "long", "sqft_living15", "sqft_lot15"]

    ref_df = pd.DataFrame.from_records(data['ref_data'], columns=column_names)
    prod_df = pd.DataFrame.from_records(data['prod_data'], columns=column_names)

    # The model was trained on log1p(price), so the comparable target is
    # stored next to the prediction.
    ref_df['prediction'] = model.predict(x_train_full)
    prod_df['prediction'] = model.predict(x_prod)
    ref_df['price_log'] = np.log1p(ref_df['price'])
    prod_df['price_log'] = np.log1p(prod_df['price'])

    evidently_data = {
        'ref_data' : ref_df.to_numpy().tolist(),
        'prod_data' : prod_df.to_numpy().tolist()
    }

    # NOTE: kept doubly JSON-encoded on purpose; the evidently_monitoring
    # component decodes with json.load followed by json.loads.
    evidently_data_json = json.dumps(evidently_data)

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(evidently_data_json, f)
    

In [6]:
@partial(
    create_component_from_func,
    output_component_file=f"{PREFIX}evidently_monitoring_component.yaml",
    base_image=settings.base_python_image, # use python base image
    packages_to_install=[
        f"pandas=={settings.pandas_version}",
        f"evidently=={settings.evidently_version}"
    ]    
)
def evidently_monitoring(intput_path: InputPath(), mlpipeline_ui_metadata_path: OutputPath(str)):
    """Build an Evidently data-drift dashboard and emit it as inline UI metadata.

    Rebuilds the reference and production frames from the input payload,
    computes a data-drift dashboard between them, saves it as HTML and writes
    that HTML into a metadata JSON document as an inline "web-app" output.

    Args:
        intput_path: (name kept as is for interface compatibility) File
            holding the doubly JSON-encoded dict with 'ref_data' and
            'prod_data' row lists.
        mlpipeline_ui_metadata_path: Destination file for the metadata JSON.
    """
    import json
    import os

    import pandas as pd

    from evidently.dashboard import Dashboard
    from evidently.pipeline.column_mapping import ColumnMapping
    # packages for interactive dashboards
    from evidently.dashboard.tabs import DataDriftTab

    # The file holds a JSON string that itself contains JSON: json.load
    # yields a str, json.loads turns that into the dict.
    with open(intput_path, encoding="utf-8") as data_file:
        evidently_data = json.loads(json.load(data_file))

    # Rows arrive unlabeled, so names are re-applied by position; the last two
    # columns are the ones appended by the upstream component.
    # NOTE(review): "date" and "condition" replace the former placeholder
    # labels "data" and "..." — presumably the original CSV header names;
    # confirm against the source file.
    column_names = ["id", "date", "price", "bedrooms", "bathrooms", "sqft_living", "sqft_lot", "floors", "waterfront", "view", "condition", "grade", "sqft_above", "sqft_basement", "yr_built", "yr_renovated", "zipcode", "lat", "long", "sqft_living15", "sqft_lot15", "prediction", "price_log"]

    ref_df = pd.DataFrame.from_records(evidently_data['ref_data'], columns=column_names)
    prod_df = pd.DataFrame.from_records(evidently_data['prod_data'], columns=column_names)

    # Tell Evidently which columns are target, prediction and features.
    column_mapping = ColumnMapping()
    column_mapping.target = 'price_log'
    column_mapping.prediction = 'prediction'
    column_mapping.numerical_features = ['sqft_living', 'sqft_above', 'sqft_living15',
                                         'bathrooms', 'sqft_basement', 'lat', 'long',
                                         'yr_built', 'bedrooms']
    column_mapping.categorical_features = ['grade', 'view', 'waterfront']

    print(column_mapping)

    data_drift_dashboard = Dashboard(tabs=[DataDriftTab(verbose_level=1)])
    data_drift_dashboard.calculate(ref_df, prod_df, column_mapping=column_mapping)

    # Save the dashboard to disk, then read the HTML back so it can be
    # embedded inline in the metadata document.
    local_dir = "/tmp/artifact_downloads"
    # exist_ok avoids the check-then-create race of os.path.exists + os.mkdir.
    os.makedirs(local_dir, exist_ok=True)
    static_html_path = os.path.join(local_dir, "data_drift.html")
    data_drift_dashboard.save(static_html_path)
    with open(static_html_path, "r", encoding="utf-8") as f:
        inline_report = f.read()

    metadata = {
        "outputs": [
            {
                "storage": "inline",
                "source": inline_report,
                "type": "web-app",
            },
        ]
    }

    with open(mlpipeline_ui_metadata_path, 'w', encoding="utf-8") as metadata_file:
        json.dump(metadata, metadata_file)

In [8]:
@pipeline(
    name = EXPERIMENT_NAME,
)
def custom_pipeline():
    
    '''pipeline'''   
    # Stage 1: fetch and pre-process the dataset.
    fetch_op = download_data().set_display_name("download data")

    # Stage 2: train the model on the downloaded data.
    # The component parameter is called "output_path"; the "_path" suffix is
    # removed by the system, so the output is addressed as "output".
    train_op = process_data(fetch_op.outputs["output"]).set_display_name("extreme_gboost")

    # Stage 3: build the drift dashboard from the training stage's output.
    evidently_monitoring(train_op.outputs["output"]).set_display_name("evidently monitoring")


In [9]:
# Compile the pipeline function into a YAML package in the working directory.
PIPE_LINE_FILE_NAME=f"{PREFIX}_assignment_pipeline"
kfp.compiler.Compiler().compile(
    pipeline_func=custom_pipeline,
    package_path=f"{PIPE_LINE_FILE_NAME}.yaml",
)