In [1]:
# !pip install --upgrade --user "kfp" google-cloud-aiplatform google-cloud-storage category_encoders pandas google_cloud_pipeline_components

In [2]:
# --- GCP / Vertex AI settings -------------------------------------------------
PROJECT_ID = "blackfridayintelia"
REGION = "us-central1"
SERVICE_ACCOUNT = "397218668039-compute@developer.gserviceaccount.com"

# --- Storage locations --------------------------------------------------------
BUCKET_URI = "gs://blackfriday_kfp_pipeline"
DATA_URI = "gs://blackfriday_data/train.csv"  # raw CSV pulled in by the pipeline importer
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/xgb"  # artifact root for pipeline runs

# NOTE(review): the pipeline cell hardcodes the xgboost-cpu.1-7 serving image
# (matching the xgboost==1.7 training pin); this 1-1 image does not appear to be
# referenced in the cells shown -- confirm before relying on it.
DEPLOY_IMAGE = "gcr.io/cloud-aiplatform/prediction/xgboost-cpu.1-1:latest"

In [3]:
# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $DATA_URI
# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI
# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $DATA_URI

In [4]:
import os
from typing import NamedTuple
import google.cloud.aiplatform as aip
import kfp
from kfp import compiler, dsl
# from kfp.v2.components import importer_node
from kfp.dsl import Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component, Metrics
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
                                                              ModelDeployOp)
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp

In [5]:
# Initialise the Vertex AI SDK for this session. `location` is pinned to REGION
# so SDK-side calls (e.g. the PipelineJob submitted below) run in the same region
# the pipeline creates its endpoints in, rather than whatever default region the
# environment happens to resolve.
aip.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [6]:
@component(base_image='python:3.9',
           packages_to_install=['pandas', 'pyarrow'])
def dataloader(
    message: str,
    imported_dataset: Input[Dataset],
    output_dataset: Output[Dataset]
):
    """Read the raw CSV dataset, normalise column types and save it as parquet.

    Identifier / category columns are cast to ``str`` so that later steps
    (e.g. target encoding) treat them as categorical features.

    Args:
        message: Free-text note copied into the output artifact's metadata.
        imported_dataset: CSV dataset artifact (from the pipeline importer).
        output_dataset: Parquet dataset artifact produced by this step.
    """
    import pandas as pd

    # Columns that are semantically categorical and must be handled as text.
    as_text = (
        'User_ID',
        'Occupation',
        'Marital_Status',
        'Product_Category_1',
        'Product_Category_2',
        'Product_Category_3',
    )

    frame = pd.read_csv(imported_dataset.path)
    for column in as_text:
        frame[column] = frame[column].astype(str)

    print("description:")
    print(frame['Purchase'].describe())

    output_dataset.metadata.update({
        "message": message,
        "shape": frame.shape,
        "format": "parquet",
    })
    frame.to_parquet(output_dataset.path)
    

In [7]:
@component(base_image='python:3.9',
           packages_to_install=['pandas', 'pyarrow'])
def transformation(
    imported_dataset: Input[Dataset],
    output_dataset: Output[Dataset]
):
    """Rescale the ``Purchase`` target with a shifted, scaled square root.

    Computes ``(sqrt(Purchase) - 3.464) / 15``; per the log message this is
    intended to map the target roughly into [0, 10]. The training components
    undo it with ``(y * 15 + 3.464) ** 2`` when reporting RMSE.

    Args:
        imported_dataset: Parquet dataset artifact from the dataloader step.
        output_dataset: Parquet dataset artifact with the rescaled target.
    """
    import pandas as pd

    # 3.464 ~= sqrt(12); presumably the smallest expected Purchase -- TODO confirm.
    offset = 3.464
    divisor = 15

    frame = pd.read_parquet(imported_dataset.path)

    print('data scaling by sqrt and shrink into [0, 10]')
    rooted = frame['Purchase'].pow(1/2)
    frame['Purchase'] = (rooted - offset) / divisor

    print("description:")
    print(frame['Purchase'].describe())

    output_dataset.metadata.update({
        "message": "Feature 'Purchase' sqrt transformed and shrinked into [0.0, 10.0]",
        "shape": frame.shape,
        "format": "parquet",
    })
    frame.to_parquet(output_dataset.path)
    

In [8]:
@component(base_image='python:3.9',
           packages_to_install=['pandas', 'pyarrow', 'scikit-learn'])
def traintestsplit(
    imported_dataset: Input[Dataset],
    random_seed: int,
    test_size: float,
    X_train_dataset: Output[Dataset],
    y_train_dataset: Output[Dataset],
    X_test_dataset: Output[Dataset],
    y_test_dataset: Output[Dataset]
):
    """Split the dataset into train/test features and ``Purchase`` targets.

    Feature frames are written as parquet; target series are pickled. Both
    test artifacts also record ``odd_users``, the number of test rows whose
    ``User_ID`` does not occur in the training features.

    Args:
        imported_dataset: Parquet dataset with a ``Purchase`` column.
        random_seed: Seed passed to ``train_test_split``.
        test_size: Fraction of rows held out for testing.
        X_train_dataset / X_test_dataset: Output feature artifacts (parquet).
        y_train_dataset / y_test_dataset: Output target artifacts (pickle).
    """
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split

    frame = pd.read_parquet(imported_dataset.path)

    print('train test split')
    features = frame.drop('Purchase', axis=1)
    target = frame['Purchase']
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, random_state=random_seed, test_size=test_size)

    known_users = X_train.User_ID.unique()
    odd_users = X_test[~X_test.User_ID.isin(known_users)].shape[0]

    def tag(artifact, label, shape, fmt, **extra):
        # Metadata keys are written in a fixed order: message, shape, format, extras.
        artifact.metadata["message"] = label
        artifact.metadata["shape"] = shape
        artifact.metadata["format"] = fmt
        artifact.metadata.update(extra)

    def dump_pickle(obj, artifact):
        with open(artifact.path, 'wb') as handle:
            pickle.dump(obj, handle)

    tag(X_train_dataset, "X_train_dataset", X_train.shape, "parquet")
    X_train.to_parquet(X_train_dataset.path)

    tag(X_test_dataset, "X_test_dataset", X_test.shape, "parquet", odd_users=odd_users)
    X_test.to_parquet(X_test_dataset.path)

    tag(y_train_dataset, "y_train_dataset", y_train.shape, "pickle")
    dump_pickle(y_train, y_train_dataset)

    tag(y_test_dataset, "y_test_dataset", y_test.shape, "pickle", odd_users=odd_users)
    dump_pickle(y_test, y_test_dataset)

    print(min(y_train), max(y_train))
    print(min(y_test), max(y_test))
    

In [9]:
@component(base_image='python:3.9',
           packages_to_install=['pandas', 'pyarrow', 'category_encoders'])
def target_encoding(
    imported_X_train: Input[Dataset],
    imported_y_train: Input[Dataset],
    imported_X_test: Input[Dataset],
    output_X_train: Output[Dataset],
    output_X_test: Output[Dataset]
):
    """Target-encode categorical features, fitting on the training split only.

    A ``TargetEncoder`` (default column selection -- presumably the
    string/object columns; confirm against category_encoders) is fitted on
    ``(X_train, y_train)`` and then applied to both feature sets, so the
    encoder never sees the test targets.

    Args:
        imported_X_train: Training features (parquet).
        imported_y_train: Training targets (pickled series).
        imported_X_test: Test features (parquet).
        output_X_train: Encoded training features (parquet).
        output_X_test: Encoded test features (parquet).
    """
    import pickle

    import pandas as pd
    from category_encoders import TargetEncoder

    train_features = pd.read_parquet(imported_X_train.path)
    test_features = pd.read_parquet(imported_X_test.path)

    # Pickled by the upstream traintestsplit step of this pipeline.
    with open(imported_y_train.path, 'rb') as handle:
        train_target = pickle.load(handle)

    print('target encode all the categorical features')
    encoder = TargetEncoder().fit(X=train_features, y=train_target)

    encoded_train = encoder.transform(train_features)
    encoded_test = encoder.transform(test_features)

    print('train head')
    print(encoded_train.head())

    for artifact, encoded, split in (
        (output_X_train, encoded_train, "X_train"),
        (output_X_test, encoded_test, "X_test"),
    ):
        artifact.metadata["message"] = "Target encode all the categorical features"
        artifact.metadata["shape"] = encoded.shape
        artifact.metadata["format"] = "parquet"
        artifact.metadata["type"] = split
        encoded.to_parquet(artifact.path)
    

In [10]:
@component(base_image='python:3.9',
           packages_to_install=['pandas', 'pyarrow', 'xgboost==1.7', 'scikit-learn'])
def train_xgb(
    imported_X_train: Input[Dataset],
    imported_y_train: Input[Dataset],
    imported_X_test: Input[Dataset],
    imported_y_test: Input[Dataset],
    random_seed: int,
    model: Output[Artifact],
    model_path: OutputPath(str),
    metrics: Output[Metrics]
):
    """Train an XGBoost regressor and export it for Vertex AI serving.

    The model is fitted on the training features, evaluated on the test split
    and pickled to ``<model.path>_export/model.pkl``. The URI of that export
    directory (``/gcs/`` FUSE prefix rewritten to ``gs://``) is written to
    ``model_path`` so the pipeline can import it as an unmanaged container model.

    Metrics logged:
        scaled_rmse: RMSE in the transformed target space.
        rmse: RMSE after inverting the transformation step, i.e. on
            ``(y * 15 + 3.464) ** 2``.

    Args:
        imported_X_train / imported_X_test: Feature frames (parquet).
        imported_y_train / imported_y_test: Target series (pickle).
        random_seed: Seed passed to ``XGBRegressor``.
        model: Output model artifact; its path is the base of the export dir.
        model_path: Output parameter receiving the export directory URI.
        metrics: Output metrics artifact.
    """
    from math import sqrt
    import os
    import pickle

    import pandas as pd
    from sklearn.metrics import mean_squared_error
    from xgboost import XGBRegressor

    def to_original_scale(values):
        # Inverse of `(sqrt(Purchase) - 3.464) / 15` from the transformation step.
        unscaled = values * 15 + 3.464
        return unscaled * unscaled

    df_train_X = pd.read_parquet(imported_X_train.path)
    df_test_X = pd.read_parquet(imported_X_test.path)

    # Targets are pickled by the upstream traintestsplit step (trusted artifacts).
    with open(imported_y_train.path, 'rb') as f:
        y_train = pickle.load(f)
    with open(imported_y_test.path, 'rb') as f:
        y_test = pickle.load(f)

    xgb_reg = XGBRegressor(seed=random_seed)
    xgb_reg.fit(df_train_X, y_train)
    xgb_y_pred = xgb_reg.predict(df_test_X)

    scaled_rmse = sqrt(mean_squared_error(y_test, xgb_y_pred))
    rmse = sqrt(mean_squared_error(to_original_scale(y_test),
                                   to_original_scale(xgb_y_pred)))
    print('Scaled RMSE:', scaled_rmse)
    print("RMSE of XGBoost Model on the original test data is ", rmse)

    metrics.log_metric('scaled_rmse', scaled_rmse)
    metrics.log_metric('rmse', rmse)
    model.metadata['type'] = 'XGBRegressor'

    # exist_ok: a retried task may find the directory already present.
    export_dir = f'{model.path}_export'
    os.makedirs(export_dir, exist_ok=True)
    with open(os.path.join(export_dir, 'model.pkl'), 'wb') as f:
        pickle.dump(xgb_reg, f)

    print('the path:::::', model.path)
    # Derive the URI from the directory actually written. String-replacing
    # '/model' (or '/gcs/') anywhere in the path would also rewrite unrelated
    # path segments that happen to contain those substrings.
    gcs_fuse_prefix = '/gcs/'
    if export_dir.startswith(gcs_fuse_prefix):
        export_uri = 'gs://' + export_dir[len(gcs_fuse_prefix):]
    else:
        export_uri = export_dir
    with open(model_path, 'w') as f:
        f.write(export_uri)
    

In [11]:
@component(base_image='python:3.9',
           packages_to_install=['pandas', 'pyarrow', 'fastai', 'torch', 'torchvision', 'torchaudio', 'scikit-learn'])
def train_dnn(
    imported_X_train: Input[Dataset],
    imported_y_train: Input[Dataset],
    imported_X_test: Input[Dataset],
    imported_y_test: Input[Dataset],
    random_seed: int,
    model: Output[Artifact],
    model_path: OutputPath(str),
    metrics: Output[Metrics]
):
    """Train a fastai collaborative-filtering network on (product, user) pairs.

    Only the ``Product_ID`` and ``User_ID`` feature columns are used, with the
    transformed ``Purchase`` target as the rating (model ``y_range`` is
    ``(0, 10)``). The trained ``learn.model`` is saved with ``torch.save`` to
    ``<model.path>_export/model.mar``. The URI of that directory (``/gcs/``
    FUSE prefix rewritten to ``gs://``) is written to ``model_path``.

    Metrics logged:
        scaled_rmse: RMSE in the transformed target space.
        rmse: RMSE after inverting the transformation step, i.e. on
            ``(y * 15 + 3.464) ** 2``.

    Args:
        imported_X_train / imported_X_test: Feature frames (parquet) that must
            contain ``Product_ID`` and ``User_ID`` columns.
        imported_y_train / imported_y_test: Target series (pickle).
        random_seed: Seed for python, numpy, torch and the fastai dataloaders.
        model: Output model artifact; its path is the base of the export dir.
        model_path: Output parameter receiving the export directory URI.
        metrics: Output metrics artifact.
    """
    from math import sqrt
    import os
    import pickle
    import random
    import warnings

    import numpy as np
    import pandas as pd
    import torch
    from fastai.collab import CollabDataLoaders, collab_learner
    from sklearn.metrics import mean_squared_error

    def to_original_scale(value):
        # Inverse of `(sqrt(Purchase) - 3.464) / 15` from the transformation step.
        unscaled = value * 15 + 3.464
        return unscaled * unscaled

    # Seed every RNG involved so repeated runs are comparable.
    random.seed(random_seed)
    np.random.seed(random_seed)
    torch.manual_seed(random_seed)            # CPU (torch.random.manual_seed is the same function)
    torch.cuda.manual_seed_all(random_seed)   # all GPUs, if any
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    df_train_X = pd.read_parquet(imported_X_train.path)
    df_test_X = pd.read_parquet(imported_X_test.path)

    # Targets are pickled by the upstream traintestsplit step (trusted artifacts).
    with open(imported_y_train.path, 'rb') as f:
        y_train = pickle.load(f)
    with open(imported_y_test.path, 'rb') as f:
        y_test = pickle.load(f)

    # list(...) drops the pandas index so rows pair up purely by position.
    ratings = pd.DataFrame({'item': list(df_train_X.Product_ID),
                            'user': list(df_train_X.User_ID),
                            'rating': list(y_train)})
    ratings_test = pd.DataFrame({'item': list(df_test_X.Product_ID),
                                 'user': list(df_test_X.User_ID),
                                 'rating': list(y_test)})

    dls = CollabDataLoaders.from_df(ratings, bs=64, seed=random_seed)
    learn = collab_learner(dls, n_factors=160, use_nn=True, y_range=(0, 10))

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        learn.fit_one_cycle(5, 5e-3, wd=0.1)

    # Evaluate on the held-out split.
    dl = learn.dls.test_dl(ratings_test, with_labels=True)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        pred = learn.get_preds(dl=dl)

    # pred[0] holds the predictions; take the first value of each row (once).
    y_pred = [row.tolist()[0] for row in pred[0]]

    scaled_rmse = sqrt(mean_squared_error(y_test, y_pred))
    rmse = sqrt(mean_squared_error(to_original_scale(y_test),
                                   [to_original_scale(p) for p in y_pred]))
    print('Scaled RMSE:', scaled_rmse)
    print("RMSE of DNN Model on the original test data is ", rmse)

    metrics.log_metric('scaled_rmse', scaled_rmse)
    metrics.log_metric('rmse', rmse)
    model.metadata['type'] = 'FastAI_collab_learner'
    model.metadata['DNN_technology'] = 'Pytorch'

    # NOTE(review): torch.save writes a pickled nn.Module, not a TorchServe model
    # archive. Confirm the pytorch-cpu prediction container can load this
    # 'model.mar'; it likely needs an archive built with torch-model-archiver.
    # exist_ok: a retried task may find the directory already present.
    export_dir = f'{model.path}_export'
    os.makedirs(export_dir, exist_ok=True)
    with open(os.path.join(export_dir, 'model.mar'), 'wb') as f:
        torch.save(learn.model, f)

    print('the path:::::', model.path)
    # Derive the URI from the directory actually written rather than
    # string-replacing '/model' / '/gcs/' anywhere in the path.
    gcs_fuse_prefix = '/gcs/'
    if export_dir.startswith(gcs_fuse_prefix):
        export_uri = 'gs://' + export_dir[len(gcs_fuse_prefix):]
    else:
        export_uri = export_dir
    with open(model_path, 'w') as f:
        f.write(export_uri)
    
    

In [12]:
random_seed = 42


@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline. Used to determine the pipeline Context.
    name="blackfriday-pipeline-v0",
)
def pipeline(message: str):
    """Black Friday purchase-prediction training and deployment pipeline.

    Imports the raw CSV, casts categorical columns, rescales the target and
    splits train/test. It then trains two models in parallel:

    * XGBoost on target-encoded features;
    * a fastai collaborative-filtering DNN on the raw split features.

    Each model's export directory is imported as an unmanaged container model,
    uploaded to Vertex AI and deployed to its own endpoint, using the
    configured SERVICE_ACCOUNT.

    Args:
        message: Free-text note stored in the dataloader output metadata.
    """
    importer = kfp.dsl.importer(
        artifact_uri=DATA_URI,
        artifact_class=Dataset,
        reimport=False,
    )
    dataloading_task = dataloader(
        message=message,
        imported_dataset=importer.output
    )
    transformation_task = transformation(
        imported_dataset=dataloading_task.output
    )
    traintestsplit_task = traintestsplit(
        imported_dataset=transformation_task.output,
        random_seed=random_seed,
        test_size=0.25
    )
    target_encoding_task = target_encoding(
        imported_X_train=traintestsplit_task.outputs["X_train_dataset"],
        imported_y_train=traintestsplit_task.outputs["y_train_dataset"],
        imported_X_test=traintestsplit_task.outputs["X_test_dataset"],
    )
    # XGBoost consumes target-encoded features.
    train_xgb_task = train_xgb(
        imported_X_train=target_encoding_task.outputs["output_X_train"],
        imported_y_train=traintestsplit_task.outputs["y_train_dataset"],
        imported_X_test=target_encoding_task.outputs["output_X_test"],
        imported_y_test=traintestsplit_task.outputs["y_test_dataset"],
        random_seed=random_seed
    )
    # The collaborative-filtering DNN needs the raw User_ID / Product_ID values.
    train_dnn_task = train_dnn(
        imported_X_train=traintestsplit_task.outputs["X_train_dataset"],
        imported_y_train=traintestsplit_task.outputs["y_train_dataset"],
        imported_X_test=traintestsplit_task.outputs["X_test_dataset"],
        imported_y_test=traintestsplit_task.outputs["y_test_dataset"],
        random_seed=random_seed
    )
    endpoint_xgb_op = EndpointCreateOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="blackfriday_xgb_endpoint",
    )
    endpoint_dnn_op = EndpointCreateOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="blackfriday_dnn_endpoint",
    )
    import_unmanaged_xgb_model_task = kfp.dsl.importer(
        artifact_uri=train_xgb_task.outputs["model_path"],
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                # Matches the xgboost==1.7 pin in the train_xgb component.
                "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest",
            },
        },
    )
    xgb_upload_op = ModelUploadOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="blackfriday_xgb",
        unmanaged_container_model=import_unmanaged_xgb_model_task.outputs["artifact"]
    )
    import_unmanaged_dnn_model_task = kfp.dsl.importer(
        artifact_uri=train_dnn_task.outputs["model_path"],
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/pytorch-cpu.2-0:latest",
            },
        },
    )
    dnn_upload_op = ModelUploadOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="blackfriday_dnn",
        unmanaged_container_model=import_unmanaged_dnn_model_task.outputs["artifact"]
    )

    ModelDeployOp(
        model=xgb_upload_op.outputs["model"],
        endpoint=endpoint_xgb_op.outputs["endpoint"],
        dedicated_resources_machine_type="n1-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
        service_account=SERVICE_ACCOUNT
    )
    ModelDeployOp(
        model=dnn_upload_op.outputs["model"],
        endpoint=endpoint_dnn_op.outputs["endpoint"],
        dedicated_resources_machine_type="n1-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
        service_account=SERVICE_ACCOUNT
    )

In [13]:
# Serialise the pipeline definition into a YAML package for Vertex AI Pipelines.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="blackfriday_pipeline_20231101.yaml",
)
 

In [14]:
DISPLAY_NAME = "BlackFriday_pipeline_20231101"

# Submit the compiled package to Vertex AI Pipelines. Caching is disabled, so
# every step re-executes on each submission.
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="blackfriday_pipeline_20231101.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"message": "BlackFriday Sales Prediction Case Study"},
    enable_caching=False,
)

# Called without arguments; the cell output shows it polling the run state.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/397218668039/locations/us-central1/pipelineJobs/blackfriday-pipeline-v0-20231101052843
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/397218668039/locations/us-central1/pipelineJobs/blackfriday-pipeline-v0-20231101052843')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/blackfriday-pipeline-v0-20231101052843?project=397218668039
PipelineJob projects/397218668039/locations/us-central1/pipelineJobs/blackfriday-pipeline-v0-20231101052843 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/397218668039/locations/us-central1/pipelineJobs/blackfriday-pipeline-v0-20231101052843 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/397218668039/locations/us-central1/pipelineJobs/blackfriday-pipeline-v0-20231101052843 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/39