# Pipeline
1. Push code [optional]
2. BigQuery (source ratings table)
3. Dataset splitting -> train, validation -> GCS
4. GCS -> data prep -> GCS
5. GCS -> training -> model upload to bucket
6. Deployment

# Todos

- [ ] CI/CD
- [ ] Data Splitting component
- [X] Add data preprocessing component
- [X] Recommender logic in-place
- [ ] Conditional deployment (champion-challenger model)
- [ ] Use tf.data for preprocessing

# Code

In [110]:
REGION = "us-central1"
# `!` captures the command's stdout as a list of lines; the project ID is the first line.
PROJECT_ID = !(gcloud config get-value project)
PROJECT_ID = PROJECT_ID[0]

DATASET = "movielens"
# Local folder that the %%writefile cells below write component/pipeline sources into.
FOLDER = "match"
PIPELINE_JSON = f"{FOLDER}/{DATASET}_kfp_pipeline.json"

# One GCS bucket per dataset/project, with sub-prefixes for pipeline runs, data and jobs.
ARTIFACT_STORE = f"gs://kfp-{DATASET}-artifact-store-{PROJECT_ID}"
PIPELINE_ROOT = f"{ARTIFACT_STORE}/pipeline"
DATA_ROOT = f"{ARTIFACT_STORE}/data"
JOB_DIR_ROOT = f"{ARTIFACT_STORE}/jobs"
# Raw train/validation CSVs written by the `bq extract` commands in the split cells.
TRAINING_FILE_PATH = f"{DATA_ROOT}/training/dataset.csv"
VALIDATION_FILE_PATH = f"{DATA_ROOT}/validation/dataset.csv"

In [111]:
IMAGE_NAME = "trainer_image_movielens"
TAG = "latest"
# Container Registry URI of the custom trainer image (built by the commented-out
# `gcloud builds submit` cell further down).
TRAINING_CONTAINER_IMAGE_URI = "gcr.io/{project}/{image}:{tag}".format(
    project=PROJECT_ID, image=IMAGE_NAME, tag=TAG
)
TRAINING_CONTAINER_IMAGE_URI

'gcr.io/qwiklabs-gcp-04-853e5675f5e8/trainer_image_movielens:latest'

In [112]:
# https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
# Vertex AI pre-built TensorFlow CPU prediction image (tf2-cpu.2-3).
# https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
# NOTE(review): the train component installs an unpinned tensorflow; confirm the
# SavedModel it writes is loadable by this (older) serving image.
SERVING_CONTAINER_IMAGE_URI = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-3:latest"

In [113]:
# Export the configuration as environment variables: pipeline.py and
# training_lightweight_component.py read them via os.getenv / os.environ when
# the pipeline module is imported for compilation.
%env DATASET={DATASET}
%env DATA_ROOT={DATA_ROOT}
%env ARTIFACT_STORE={ARTIFACT_STORE}
%env PIPELINE_ROOT={PIPELINE_ROOT}
%env PROJECT_ID={PROJECT_ID}
%env REGION={REGION}
%env JOB_DIR_ROOT={JOB_DIR_ROOT}
%env TRAINING_FILE_PATH={TRAINING_FILE_PATH}
%env VALIDATION_FILE_PATH={VALIDATION_FILE_PATH}
%env SERVING_CONTAINER_IMAGE_URI={SERVING_CONTAINER_IMAGE_URI}
%env TRAINING_CONTAINER_IMAGE_URI={TRAINING_CONTAINER_IMAGE_URI}

# Set `PATH` to include the directory containing KFP CLI
# NOTE(review): this prepends unconditionally, so every re-run of the cell adds
# another copy of /home/jupyter/.local/bin (see the duplicated entries in the output).
PATH = %env PATH
%env PATH=/home/jupyter/.local/bin:{PATH}

env: DATASET=movielens
env: DATA_ROOT=gs://kfp-movielens-artifact-store-qwiklabs-gcp-04-853e5675f5e8/data
env: ARTIFACT_STORE=gs://kfp-movielens-artifact-store-qwiklabs-gcp-04-853e5675f5e8
env: PIPELINE_ROOT=gs://kfp-movielens-artifact-store-qwiklabs-gcp-04-853e5675f5e8/pipeline
env: PROJECT_ID=qwiklabs-gcp-04-853e5675f5e8
env: REGION=us-central1
env: JOB_DIR_ROOT=gs://kfp-movielens-artifact-store-qwiklabs-gcp-04-853e5675f5e8/jobs
env: TRAINING_FILE_PATH=gs://kfp-movielens-artifact-store-qwiklabs-gcp-04-853e5675f5e8/data/training/dataset.csv
env: VALIDATION_FILE_PATH=gs://kfp-movielens-artifact-store-qwiklabs-gcp-04-853e5675f5e8/data/validation/dataset.csv
env: SERVING_CONTAINER_IMAGE_URI=us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-3:latest
env: TRAINING_CONTAINER_IMAGE_URI=gcr.io/qwiklabs-gcp-04-853e5675f5e8/trainer_image_movielens:latest
env: PATH=/home/jupyter/.local/bin:/home/jupyter/.local/bin:/home/jupyter/.local/bin:/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/us

In [56]:
import os

# Create the local folder that the %%writefile cells write their sources into.
if not os.path.isdir(FOLDER):
    os.makedirs(FOLDER)

# Dataset

## Import dataset

In [6]:
# NOTE(review): leftover from a covertype example (covertype_dataset / covertype
# schema) and fully commented out; it does not load the movielens data used below.
# %%bash

# DATASET_LOCATION=US
# DATASET_ID=covertype_dataset
# TABLE_ID=covertype
# DATA_SOURCE=gs://workshop-datasets/covertype/small/dataset.csv
# SCHEMA=Elevation:INTEGER,\
# Aspect:INTEGER,\
# Slope:INTEGER,\
# Horizontal_Distance_To_Hydrology:INTEGER,\
# Vertical_Distance_To_Hydrology:INTEGER,\
# Horizontal_Distance_To_Roadways:INTEGER,\
# Hillshade_9am:INTEGER,\
# Hillshade_Noon:INTEGER,\
# Hillshade_3pm:INTEGER,\
# Horizontal_Distance_To_Fire_Points:INTEGER,\
# Wilderness_Area:STRING,\
# Soil_Type:STRING,\
# Cover_Type:INTEGER

# bq --location=$DATASET_LOCATION --project_id=$PROJECT_ID mk --dataset $DATASET_ID

# bq --project_id=$PROJECT_ID --dataset_id=$DATASET_ID load \
# --source_format=CSV \
# --skip_leading_rows=1 \
# --replace \
# $TABLE_ID \
# $DATA_SOURCE \
# $SCHEMA

In [7]:
# Create the artifact-store bucket in REGION unless `gsutil ls` already lists it.
!gsutil ls | grep ^{ARTIFACT_STORE}/$ || gsutil mb -l {REGION} {ARTIFACT_STORE}

gs://kfp-movielens-artifact-store-qwiklabs-gcp-04-853e5675f5e8/


## Train/validation split

In [8]:
# TODO: create data prep KFP component

In [9]:
# CREATE TRAIN DATASET
!bq query \
-n 0 \
--destination_table movielens.training \
--replace \
--use_legacy_sql=false \
'SELECT * \
FROM `movielens.ratings` AS table \
WHERE \
MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(table))), 10) IN (1, 2, 3, 4)' 

!bq extract \
--destination_format CSV \
movielens.training \
$TRAINING_FILE_PATH

Waiting on bqjob_r19b278b7f034f05f_0000017f97ea6b85_1 ... (23s) Current status: DONE   
Waiting on bqjob_r4abf94e0b0f8b7c3_0000017f97ead083_1 ... (34s) Current status: DONE   


In [10]:
# CREATE VALID DATASET
# Same row-fingerprint hashing as the training split; bucket 8 (~10% of the
# ratings) is the validation set, disjoint from training buckets 1-4.
!bq query \
-n 0 \
--destination_table $DATASET.validation \
--replace \
--use_legacy_sql=false \
'SELECT * \
FROM `movielens.ratings` AS table \
WHERE \
MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(table))), 10) IN (8)' 

!bq extract \
--destination_format CSV \
$DATASET.validation \
$VALIDATION_FILE_PATH

Waiting on bqjob_r53f7547101c14ec2_0000017f97eb6143_1 ... (15s) Current status: DONE   
Waiting on bqjob_rd4a28b17b21772_0000017f97eba789_1 ... (5s) Current status: DONE   


In [11]:
# Sanity check: load both exported splits from GCS and report their shapes.
import pandas as pd

df_train, df_validation = (
    pd.read_csv(csv_path) for csv_path in (TRAINING_FILE_PATH, VALIDATION_FILE_PATH)
)
print(df_train.shape, "Training path: " + TRAINING_FILE_PATH + "\n")
print(df_validation.shape, "Validation path: " + VALIDATION_FILE_PATH)

(7996867, 4) Training path: gs://kfp-movielens-artifact-store-qwiklabs-gcp-04-853e5675f5e8/data/training/dataset.csv

(2002053, 4) Validation path: gs://kfp-movielens-artifact-store-qwiklabs-gcp-04-853e5675f5e8/data/validation/dataset.csv


# Training and Tuning

## Training model image

In [12]:
%%writefile {FOLDER}/train.py

"""Trainer script.

Placeholder: training currently runs inside the ``train`` lightweight KFP
component (see its "TODO: Move to train.py script"); this file is what the
Dockerfile copies into the custom trainer image.
"""


Overwriting match/train.py


In [13]:
%%writefile {FOLDER}/Dockerfile
# Custom trainer image: runs train.py as its entrypoint.
# NOTE(review): the pinned scikit-learn/pandas versions do not match the
# TensorFlow-based training in the pipeline components; revisit once train.py
# contains the real trainer.
FROM gcr.io/deeplearning-platform-release/base-cpu
RUN pip install -U fire cloudml-hypertune scikit-learn==0.20.4 pandas==0.24.2
WORKDIR /app
COPY train.py .

ENTRYPOINT ["python", "train.py"]

Overwriting match/Dockerfile


In [14]:
# Build and push the custom trainer image with Cloud Build. Left commented out:
# the pipeline currently trains in a lightweight component instead.
# !gcloud builds submit --timeout 15m --tag $TRAINING_CONTAINER_IMAGE_URI $FOLDER

## Components

### Data prep

In [146]:
%%writefile {FOLDER}/data.py
from kfp.v2.dsl import component

@component(
    # This component re-indexes user/movie IDs for the Keras recommender.
    # The body only needs pandas (plus fsspec/gcsfs for gs:// paths), so the
    # unused tensorflow / tensorflow_datasets / BigQuery packages are not installed.
    packages_to_install=["pandas", "fsspec", "gcsfs", "pyarrow", "fastparquet"],
    base_image="python:3.9",
    output_component_file="data_prep.yaml"
)
def data_prep(training_file_path: str,
              validation_file_path: str,
              training_clean_file_path: str,
              validation_clean_file_path: str):
    """Map raw user/movie IDs to contiguous indices and write cleaned CSVs.

    Both splits share one ID -> index mapping per column, built from the sorted
    union of the IDs seen in train and validation, so a raw ID gets the same
    index in both outputs. ``userId``, ``movieId`` and ``rating`` are then cast
    to float32; any other columns are written through unchanged.

    Args:
        training_file_path: input training CSV (local path or gs:// URI).
        validation_file_path: input validation CSV.
        training_clean_file_path: destination of the re-indexed training CSV.
        validation_clean_file_path: destination of the re-indexed validation CSV.

    NOTE(review): the mapping itself is not persisted; a caller of the deployed
    model must rebuild it (sorted union of raw IDs) to translate raw IDs.
    """
    # Imports live inside the function: KFP lightweight components ship only
    # this function's source code to the container.
    import numpy as np
    import pandas as pd

    df_train = pd.read_csv(training_file_path)
    df_val = pd.read_csv(validation_file_path)

    for column in ("movieId", "userId"):
        # Sorted union of both splits' IDs: reproducible, unlike set iteration order.
        raw_ids = sorted(set(df_train[column].unique()) | set(df_val[column].unique()))
        index_of = {raw_id: idx for idx, raw_id in enumerate(raw_ids)}
        df_train[column] = df_train[column].map(index_of)
        df_val[column] = df_val[column].map(index_of)

    col = ["userId", "movieId", "rating"]
    df_train[col] = df_train[col].astype(np.float32)
    df_val[col] = df_val[col].astype(np.float32)

    # save to bucket
    df_train.to_csv(training_clean_file_path, index=False)
    df_val.to_csv(validation_clean_file_path, index=False)
    

Overwriting match/data.py


### Training

In [148]:
%%writefile {FOLDER}/training_lightweight_component.py

"""Lightweight component training function."""
from kfp.v2.dsl import component
import os

# Required when this module is imported (KeyError if unset); only used below to
# name the deploy component's output YAML. The notebook exports it with %env.
DATASET = os.environ["DATASET"]

@component(
    # This component builds and trains the Keras recommender model.
    # "sklearn" is not listed: that PyPI name is a deprecated stub whose install
    # now fails, and scikit-learn is not used here. The unused BigQuery and
    # tensorflow_datasets packages are omitted as well.
    packages_to_install=["tensorflow", "pandas", "fsspec", "gcsfs", "pyarrow", "fastparquet"],
    base_image="python:3.9",
    output_component_file="train_and_fit.yaml"
)
def train(training_clean_file_path: str,
          validation_clean_file_path: str,
          model_dir: str):
    """Train an embedding-based rating model and save it as a SavedModel.

    Args:
        training_clean_file_path: CSV written by ``data_prep`` with re-indexed
            ``userId``/``movieId`` columns and a ``rating`` column.
        validation_clean_file_path: validation CSV in the same format.
        model_dir: destination passed to ``tf.saved_model.save``.
    """
    # TODO: Move to train.py script
    # Imports live inside the function: KFP lightweight components ship only
    # this function's source code to the container.
    import logging

    import pandas as pd
    import tensorflow as tf
    from tensorflow.keras import Model
    from tensorflow.keras import optimizers as opt
    from tensorflow.keras.layers import Dense, Embedding, Flatten, Input, multiply

    logging.info("Reading train data...")
    df_train = pd.read_csv(training_clean_file_path)
    logging.info("Reading valid data...")
    df_val = pd.read_csv(validation_clean_file_path)

    # Embedding tables are sized from the distinct-ID count across both splits
    # (+1); this relies on data_prep having mapped IDs to 0..n-1.
    num_unique_users = len(set(df_train.userId.unique()) | set(df_val.userId.unique()))
    num_unique_movies = len(set(df_train.movieId.unique()) | set(df_val.movieId.unique()))

    users_input = Input(shape=(1,), name="users_input")
    users_embedding = Embedding(num_unique_users + 1, 50, name="users_embeddings")(users_input)
    users_bias = Embedding(num_unique_users + 1, 1, name="users_bias")(users_input)

    movies_input = Input(shape=(1,), name="movies_input")
    movies_embedding = Embedding(num_unique_movies + 1, 50, name="movies_embedding")(movies_input)
    movies_bias = Embedding(num_unique_movies + 1, 1, name="movies_bias")(movies_input)

    # Element-wise (not summed) product of the 50-d embeddings; the 1-d biases
    # broadcast across it, and the Dense layer learns how to combine the terms.
    users_movies_product = multiply([users_embedding, movies_embedding])
    input_terms = users_movies_product + users_bias + movies_bias
    input_terms = Flatten(name="fl_inputs")(input_terms)
    output = Dense(1, activation="relu", name="output")(input_terms)
    model = Model(inputs=[users_input, movies_input], outputs=output)

    # `learning_rate`, not the deprecated `lr` alias: tensorflow is unpinned and
    # newer Keras releases ignore or reject `lr`.
    opt_adam = opt.Adam(learning_rate=0.005)
    model.compile(optimizer=opt_adam, loss=["mse"], metrics=["mean_absolute_error"])

    model.fit(x=[df_train.userId, df_train.movieId],
              y=df_train.rating,
              batch_size=512,
              epochs=1,
              verbose=1,
              validation_data=([df_val.userId, df_val.movieId], df_val.rating))

    # save model
    # NOTE(review): tensorflow is unpinned here while serving uses a tf2-cpu.2-3
    # image; pin a compatible version and confirm the SavedModel loads there.
    tf.saved_model.save(model, model_dir)


@component(
    # Same Python base image as the other components. Only the Vertex AI SDK is
    # needed; "sklearn" (a deprecated PyPI stub that now fails to install),
    # joblib and the BigQuery client were unused.
    base_image="python:3.9",
    packages_to_install=["google-cloud-aiplatform"],
    output_component_file=f"{DATASET}_kfp_deploy.yaml",
)
def deploy(
    project: str,
    location: str,
    serving_container_uri: str,
    display_name: str,
    artifact_uri: str,
):
    """Upload a SavedModel to Vertex AI and deploy it to a new endpoint.

    Args:
        project: GCP project that owns the model and endpoint.
        location: Vertex AI region for the model and endpoint.
        serving_container_uri: prediction container image used to serve the model.
        display_name: display name for the uploaded model.
        artifact_uri: GCS directory holding the SavedModel.
    """
    from google.cloud import aiplatform

    # Pass `location` through; without it the SDK falls back to its default
    # region regardless of what the pipeline requested.
    aiplatform.init(project=project, location=location)

    deployed_model = aiplatform.Model.upload(
        display_name=display_name,
        artifact_uri=artifact_uri,
        serving_container_image_uri=serving_container_uri,
    )
    # "0" designates the model deployed by this call; it receives all traffic.
    deployed_model.deploy(
        traffic_split={"0": 100},
        machine_type="n1-standard-4",
    )

Overwriting match/training_lightweight_component.py


# Pipeline

In [154]:
%%writefile {FOLDER}/pipeline.py


import os
from kfp import dsl
# change the below imports if you change the module name
from data import data_prep
from training_lightweight_component import train, deploy


# Configuration is read from environment variables exported by the notebook
# (%env) before compiling. os.getenv returns None for unset variables, which
# would silently yield paths such as "None/training_clean/dataset.csv".
DATASET = os.getenv("DATASET")
DATA_ROOT = os.getenv("DATA_ROOT")
ARTIFACT_STORE = os.getenv("ARTIFACT_STORE")
PIPELINE_ROOT = os.getenv("PIPELINE_ROOT")
PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")

TRAINING_CONTAINER_IMAGE_URI = os.getenv("TRAINING_CONTAINER_IMAGE_URI")
SERVING_CONTAINER_IMAGE_URI = os.getenv("SERVING_CONTAINER_IMAGE_URI")

# Raw splits exported from BigQuery, and the re-indexed copies written by data_prep.
TRAINING_FILE_PATH = os.getenv("TRAINING_FILE_PATH")
VALIDATION_FILE_PATH = os.getenv("VALIDATION_FILE_PATH")
TRAINING_CLEAN_FILE_PATH = f"{DATA_ROOT}/training_clean/dataset.csv"
VALIDATION_CLEAN_FILE_PATH= f"{DATA_ROOT}/validation_clean/dataset.csv"

# Tuning / deployment-gate settings; currently only used as pipeline parameter
# defaults (no step consumes them yet).
MAX_TRIAL_COUNT = int(os.getenv("MAX_TRIAL_COUNT", "5"))
PARALLEL_TRIAL_COUNT = int(os.getenv("PARALLEL_TRIAL_COUNT", "5"))
THRESHOLD = float(os.getenv("THRESHOLD", "0.6"))

# SavedModel directory written by the train step and uploaded by the deploy step.
MODEL_DIR = f"{ARTIFACT_STORE}/model"

@dsl.pipeline(
    name=f"{DATASET}-kfp-pipeline",
    description=f"The pipeline training and deploying the {DATASET} model",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    training_container_uri: str = TRAINING_CONTAINER_IMAGE_URI,
    serving_container_uri: str = SERVING_CONTAINER_IMAGE_URI,
    training_file_path: str = TRAINING_FILE_PATH,
    validation_file_path: str = VALIDATION_FILE_PATH,
    training_clean_file_path: str = TRAINING_CLEAN_FILE_PATH,
    validation_clean_file_path: str = VALIDATION_CLEAN_FILE_PATH,
    model_dir: str = MODEL_DIR,
    accuracy_deployment_threshold: float = THRESHOLD,
    max_trial_count: int = MAX_TRIAL_COUNT,
    parallel_trial_count: int = PARALLEL_TRIAL_COUNT,
    pipeline_root: str = PIPELINE_ROOT,
):
    """Prepare the data, train the Keras recommender, then upload and deploy it.

    The steps pass data only through GCS path strings, not KFP artifacts, so
    their order is declared explicitly with ``.after()``.

    NOTE(review): ``training_container_uri``, ``accuracy_deployment_threshold``,
    ``max_trial_count``, ``parallel_trial_count`` and ``pipeline_root`` are
    accepted but not used by any step yet; deployment is unconditional.
    """
    data_prep_op = data_prep(training_file_path=training_file_path,
                             validation_file_path=validation_file_path,
                             training_clean_file_path= training_clean_file_path,
                             validation_clean_file_path= validation_clean_file_path)
    
    train_op   = train(training_clean_file_path = training_clean_file_path,
                       validation_clean_file_path= validation_clean_file_path,
                       model_dir = model_dir)
    # Train reads the CSVs data_prep writes, but only via paths, so order explicitly.
    train_op.after(data_prep_op)
    
    # project/location come from the module-level PROJECT_ID/REGION constants,
    # not from pipeline parameters.
    deploy_op = deploy(
        project=PROJECT_ID,
        location=REGION,
        serving_container_uri=serving_container_uri,
        display_name='movie-recommender-keras',
        artifact_uri= model_dir
    )
    deploy_op.after(train_op)


Overwriting match/pipeline.py


In [155]:
# compile the pipeline

In [156]:
# Compile pipeline.py into the JSON pipeline spec submitted by the PipelineJob cell.
!dsl-compile-v2 --py {FOLDER}/pipeline.py --output $PIPELINE_JSON



In [157]:
# Peek at the first lines of the compiled pipeline spec.
!head {PIPELINE_JSON}

{
  "pipelineSpec": {
    "components": {
      "comp-data-prep": {
        "executorLabel": "exec-data-prep",
        "inputDefinitions": {
          "parameters": {
            "training_clean_file_path": {
              "type": "STRING"
            },


E0317 16:23:57.075449896   31284 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


# Run the pipeline (trains and deploys the model)

In [158]:
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

# Submit the compiled spec as a Vertex AI Pipelines run. Caching is disabled so
# every step re-executes; sync=False submits without blocking the notebook.
job_config = {
    "display_name": f"{DATASET}_kfp_pipeline",
    "template_path": PIPELINE_JSON,
    "enable_caching": False,
    "project": PROJECT_ID,
}
pipeline = aiplatform.PipelineJob(**job_config)
pipeline.run(sync=False)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/1076138843678/locations/us-central1/pipelineJobs/movielens-kfp-pipeline-20220317162358
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/1076138843678/locations/us-central1/pipelineJobs/movielens-kfp-pipeline-20220317162358')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/movielens-kfp-pipeline-20220317162358?project=1076138843678
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/1076138843678/locations/us-central1/pipelineJobs/movielens-kfp-pipeline-20220317162238 current state:
PipelineState.PIPELINE_STATE_CANCELLING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/1076138843678/loca