# Impact Analytics Sandbox Demo

This demo shows how to create Kubeflow ML pipelines for Impact Analytics.

## Step 1: Build and Push Base Container Image
We have created a base container that has all dependencies installed. Because the dependencies live in the base container, we do not have to reinstall them for every custom component.

Build and push the container to Container Registry:

In [None]:
IMAGE_URI="gcr.io/impact-analytics-sandbox/base_container:v3"

In [None]:
cd base_container

In [None]:
!docker build ./ -t $IMAGE_URI

In [None]:
!docker push $IMAGE_URI
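
If the build and push steps need to run from a script rather than from notebook shell magics, the same two commands can be assembled in Python. This is a minimal sketch: the helper name `docker_commands` is hypothetical, and the commands are only constructed and printed here, not executed.

```python
import shlex
from typing import List


def docker_commands(image_uri: str, context_dir: str = "./") -> List[List[str]]:
    """Return the build and push commands for an image without running them."""
    build = ["docker", "build", context_dir, "-t", image_uri]
    push = ["docker", "push", image_uri]
    return [build, push]


IMAGE_URI = "gcr.io/impact-analytics-sandbox/base_container:v3"
commands = docker_commands(IMAGE_URI)
for cmd in commands:
    # shlex.join quotes each argument so the line can be pasted into a shell
    print(shlex.join(cmd))
```

Each list could be passed to `subprocess.run(cmd, check=True)` so that a failed build stops the script before the push is attempted.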

## Step 2: Defining the pipeline components

In [13]:
# importing libraries
from typing import NamedTuple

import matplotlib.pyplot as plt
import pandas as pd
from google.cloud import aiplatform, aiplatform_v1, bigquery
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import (
    Artifact,
    ClassificationMetrics,
    Dataset,
    Input,
    InputPath,
    Metrics,
    Model,
    Output,
    OutputPath,
    component,
)

Because we train a custom model with the DARTS package, we define custom components.

### a) Fetching data

The component below fetches data from BigQuery; its output is passed as input to the training component.

In [14]:
# Custom component to fetch data from BigQuery
@component(
    base_image="gcr.io/impact-analytics-sandbox/create_base_image:v1",
    output_component_file="create_dataset.yaml"
)
def get_air_data(
    #bq_table: str,
    output_data_path: OutputPath("Dataset")
):
    from google.cloud import bigquery
    import pandas as pd
    bqclient = bigquery.Client(project="impact-analytics-sandbox")

    # Download query results.
    query_string = """
    SELECT *
    FROM `impact-analytics-sandbox.poc_dataset.AirPassengersDataset`
    """
    # get dataframe by querying bigquery table
    air_df = (
        bqclient.query(query_string)
            .result()
            .to_dataframe(
            create_bqstorage_client=True,
        )
    )
    
    air_df.to_csv(output_data_path,index=False)
    print(output_data_path)
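
The handoff between components is file based: the producer writes to the path it is given, and the consumer reads from the path it receives. The sketch below imitates that contract with the standard library only, outside of Kubeflow. The function names, the `Passengers` column name, and the sample rows are illustrative assumptions, not taken from the BigQuery table.

```python
import csv
import os
import tempfile


def write_dataset(rows, output_data_path):
    """Producer: write rows to the path it was handed (stands in for OutputPath)."""
    with open(output_data_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["Month", "Passengers"])
        writer.writeheader()
        writer.writerows(rows)


def read_dataset(input_path):
    """Consumer: read the rows back (stands in for reading dataset.path)."""
    with open(input_path, newline="") as f:
        return list(csv.DictReader(f))


# Illustrative rows only
rows = [
    {"Month": "1949-01-01", "Passengers": 112},
    {"Month": "1949-02-01", "Passengers": 118},
]

with tempfile.TemporaryDirectory() as tmp_dir:
    artifact_path = os.path.join(tmp_dir, "dataset.csv")
    write_dataset(rows, artifact_path)
    loaded = read_dataset(artifact_path)

print(loaded)
```

Note that the values come back as strings after the CSV round trip, which is why the training component re-parses the `Month` column after loading.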

### b) Training and Evaluation of model

The `sequential_model` component handles model training and evaluation. It transforms the data, creates and saves a DARTS model chosen by the `model_type` parameter, and evaluates the model with the MAPE, MSE, and RMSE metrics.
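
The component fits its scaler on the training split only and then reuses it for the validation split. As a rough illustration independent of DARTS, and assuming min-max scaling to the range [0, 1], the idea can be sketched in plain Python. The helper names `fit_min_max` and `apply_min_max` and the sample values are hypothetical.

```python
def fit_min_max(values):
    """Learn the scaling bounds from the training data only."""
    return min(values), max(values)


def apply_min_max(values, lo, hi):
    """Scale values with previously learned bounds."""
    span = (hi - lo) or 1.0  # avoid division by zero for a constant series
    return [(v - lo) / span for v in values]


train = [100.0, 150.0, 200.0]
val = [250.0]

lo, hi = fit_min_max(train)                 # bounds come from train, never from val
train_scaled = apply_min_max(train, lo, hi)
val_scaled = apply_min_max(val, lo, hi)

print(train_scaled, val_scaled)
```

Validation values can fall outside [0, 1] because the bounds were learned without them; that is expected and avoids leaking validation information into training.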

In [15]:
# Custom component for Model training and evaluation
@component(
    base_image="gcr.io/impact-analytics-sandbox/create_base_image:v1",
    output_component_file="beans_model_component.yaml",
)
def sequential_model(
    dataset:  Input[Dataset],
    model_type: str,
    model: Output[Model],
    metrics: Output[Metrics],
    model_path: OutputPath("Model"),
) -> NamedTuple('ExampleOutputs', [('tar_path', str)]):
    import pandas as pd
    import numpy as np
    import torch
    import matplotlib.pyplot as plt
    import json
    from darts import TimeSeries
    from darts.utils.timeseries_generation import (
        gaussian_timeseries,
        linear_timeseries,
        sine_timeseries,
    )
    from darts.models import RNNModel
    from darts.metrics import mape, mse, rmse
    from darts.dataprocessing.transformers import Scaler
    from darts.utils.timeseries_generation import datetime_attribute_timeseries
    from google.cloud import storage
    import glob
    import shutil
    from collections import namedtuple
    from typing import NamedTuple
    def upload_blob(bucket_name, source_file_name, destination_blob_name):
        """Uploads a file to the bucket."""

        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)

        blob.upload_from_filename(source_file_name)

        print(
            "File {} uploaded to {}.".format(
                source_file_name, destination_blob_name
            )
        )    
    # Data Preparation
    air_df = pd.read_csv(dataset.path)
    air_df['Month']=pd.to_datetime(air_df['Month'])
    air_df.sort_values(by="Month",inplace=True)
    air_df.index = air_df['Month']
    air_df.drop("Month",inplace=True,axis=1)
    series = TimeSeries.from_dataframe(air_df)
    # Create training and validation sets:
    train, val = series.split_after(pd.Timestamp("19590101"))

    # Normalize the time series (note: we avoid fitting the transformer on the validation set)
    transformer = Scaler()
    train_transformed = transformer.fit_transform(train)
    val_transformed = transformer.transform(val)
    series_transformed = transformer.transform(series)

    # create month and year covariate series
    year_series = datetime_attribute_timeseries(
        pd.date_range(start=series.start_time(), freq=series.freq_str, periods=1000),
        attribute="year",
        one_hot=False,
    )
    year_series = Scaler().fit_transform(year_series)
    month_series = datetime_attribute_timeseries(
        year_series, attribute="month", one_hot=True
    )
    covariates = year_series.stack(month_series)
    cov_train, cov_val = covariates.split_after(pd.Timestamp("19590101"))
    
    
    #setting hyperparameters
    hidden_dim=20
    dropout=0
    batch_size=16
    epochs=300
    learning_rate=1e-3
    optimizer_kwargs={"lr":learning_rate }
    model_name="Air_"+model_type
    log_tensorboard=True
    random_state=42
    training_length=20
    input_chunk_length=14
    force_reset=True
    save_checkpoints=True
    
    # Model Creation
    my_model = RNNModel(
        model=model_type,
        hidden_dim=hidden_dim,
        dropout=dropout,
        batch_size=batch_size,
        n_epochs=epochs,
        optimizer_kwargs=optimizer_kwargs,
        model_name=model_name,
        log_tensorboard=log_tensorboard,
        random_state=random_state,
        training_length=training_length,
        input_chunk_length=input_chunk_length,
        force_reset=force_reset,
        save_checkpoints=save_checkpoints,
    )
    
    my_model.fit(
        train_transformed,
        future_covariates=covariates,
        val_series=val_transformed,
        val_future_covariates=covariates,
        verbose=False,
    )
    
    # metadata about model
    model.metadata["hidden_dim"] = hidden_dim
    model.metadata["dropout"] = dropout
    model.metadata["batch_size"] = batch_size
    model.metadata["n_epochs"]=  epochs
    model.metadata["learning rate"] = learning_rate
    #model.metadata["model_name"]="Air_"+model_type
    
    model.metadata["random_state"] = random_state
    model.metadata["training_length"] = training_length
    model.metadata["input_chunk_length"] = input_chunk_length
     
    from datetime import datetime
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    k = glob.glob('darts_logs' + "/**", recursive=True)
    blobs_list = []
    for i in k:
        if '.' in i:
            blobs_list.append(i)

    bucket_name = 'impact-analytics-experiments-bucket01'
    
    # Saving ML models to bucket
    path = ""  # stays empty if no .pth.tar checkpoint is found
    for blob in blobs_list:
        print(blob)    
        source_file_name = blob 
        destination_blob_name = 'model_logs{}/'.format(TIMESTAMP)+blob
        upload_blob(bucket_name, source_file_name, destination_blob_name)
        if blob.endswith(".pth.tar"):
            # remember the uploaded checkpoint so it can be returned as tar_path
            path = bucket_name + "/" + destination_blob_name

    my_model.save_model(model_path+ ".pth.tar")
    
    

    # Evaluating model
    def eval_model(model):
        pred_series = model.predict(n=26, future_covariates=covariates)
        # darts metrics take (actual_series, pred_series); the order matters for MAPE
        mape1 = mape(val_transformed, pred_series)
        mse1 = mse(val_transformed, pred_series)
        rmse1 = rmse(val_transformed, pred_series)
        print("MAPE: {:.2f}%".format(mape1))
        print("MSE: {:.4f}".format(mse1))
        print("RMSE: {:.4f}".format(rmse1))
        # log numeric values so the metrics stay comparable across runs
        metrics.log_metric("MAPE", float(mape1))
        metrics.log_metric("MSE", float(mse1))
        metrics.log_metric("RMSE", float(rmse1))
    
    eval_model(my_model)
    example_output = namedtuple('ExampleOutputs', ['tar_path'])
    return example_output(path)
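
For reference, the three evaluation metrics can be written out in a few lines of plain Python. This is a sketch of the standard definitions on plain lists, not the DARTS implementation; the function names and sample values are illustrative.

```python
import math


def mape_value(actual, predicted):
    """Mean absolute percentage error, in percent."""
    return 100.0 * sum(abs((a - p) / a) for a, p in zip(actual, predicted)) / len(actual)


def mse_value(actual, predicted):
    """Mean squared error."""
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


def rmse_value(actual, predicted):
    """Root mean squared error."""
    return math.sqrt(mse_value(actual, predicted))


actual = [100.0, 200.0, 400.0]
predicted = [110.0, 180.0, 400.0]

mape_result = mape_value(actual, predicted)
mse_result = mse_value(actual, predicted)
rmse_result = rmse_value(actual, predicted)
print(mape_result, mse_result, rmse_result)
```

MAPE divides by the actual values, so swapping the two arguments changes the result, whereas MSE and RMSE are symmetric. Only MAPE is a percentage.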
    

### c) Model Deployment

The `deploy_to_run` component handles model deployment. Deploying the model from inside the pipeline currently fails with a permission error, although we can deploy the model manually; pipeline-based deployment still needs work.

In [16]:
@component(
    base_image="gcr.io/impact-analytics-sandbox/base_container:v3",
    output_component_file="deployment_component.yaml",
    )
def deploy_to_run(
    tar_path: str,
    output_data_path: OutputPath("Dataset")
    ):
    from google.cloud import storage
    import os
    from github import Github

   
    storage_client = storage.Client()

    bucket = storage_client.get_bucket(tar_path.split("/")[0])
    blob = bucket.blob("/".join(tar_path.split("/")[1:]))
    blob.download_to_filename("/tmp/model.pth.tar")

    # Read the GitHub token from the environment instead of hardcoding it.
    # Assumption: GITHUB_TOKEN is made available to the component at runtime.
    g = Github(os.environ["GITHUB_TOKEN"])

    repo = g.get_repo("munadkatSearce/darts-model-serving")
    contents = repo.get_contents("model.pth.tar", ref="main")

    with open("/tmp/model.pth.tar", "rb") as model_file:
        file_content = model_file.read()

    repo.update_file(path=contents.path, message="new_model_trained", content=file_content, sha=contents.sha)
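
The component receives `tar_path` as a single `bucket/path/to/file` string and splits it into a bucket name and a blob name. A slightly more defensive version of that split might look like the sketch below. The helper name `split_gcs_path` and the example path are hypothetical, and accepting a `gs://` prefix is an added assumption.

```python
def split_gcs_path(path):
    """Split 'bucket/dir/file' (optionally prefixed with 'gs://') into (bucket, blob_name)."""
    if path.startswith("gs://"):
        path = path[len("gs://"):]
    bucket, _, blob_name = path.partition("/")
    if not bucket or not blob_name:
        raise ValueError("expected '<bucket>/<blob path>', got {!r}".format(path))
    return bucket, blob_name


# Hypothetical path in the shape produced by the training component
example_path = "impact-analytics-experiments-bucket01/model_logs20220301111833/model.pth.tar"
bucket_name, blob_name = split_gcs_path(example_path)
print(bucket_name, blob_name)
```

Raising early on a malformed or empty path gives a clearer error than a failed bucket lookup later in the component.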


In [17]:
from datetime import datetime
TIMESTAMP =datetime.now().strftime("%Y%m%d%H%M%S")
DISPLAY_NAME = 'air-job{}'.format(TIMESTAMP)

We set the global variables that are passed to the pipeline.

## Step 3: Defining the pipeline

In [18]:
# Setting Global variables
PROJECT_ID="impact-analytics-sandbox"
REGION = "us-central1"
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# Set bucket name
BUCKET_NAME="gs://"+PROJECT_ID+"-bucket"

# Set the pipeline root inside the bucket
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root_air/"
PIPELINE_ROOT

env: PATH=/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin:/home/jupyter/.local/bin


'gs://impact-analytics-sandbox-bucket/pipeline_root_air/'

The code below defines the ML pipeline. It first fetches data from the BigQuery source, then trains three models (LSTM, GRU, and RNN) sequentially and deploys each of them.

In [19]:
# Defining pipeline
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline. Use to determine the pipeline Context.
    name="sequential-pipeline",
    
)
def pipeline(
    #url: str = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv",
    project: str = PROJECT_ID,
    region: str = REGION, 
    display_name: str = DISPLAY_NAME,
    api_endpoint: str = REGION+"-aiplatform.googleapis.com",
    #thresholds_dict_str: str = '{"roc":0.8}',
    serving_container_image_uri: str = "gcr.io/impact-analytics-sandbox/base_container:v3"
    ):
    
    # fetch data
    data_op = get_air_data()
    
    # train and evaluate multiple models
    train_model_op_lstm = sequential_model(model_type="LSTM",dataset= data_op.output)
    train_model_op_GRU = sequential_model(model_type="GRU", dataset=data_op.output).after(train_model_op_lstm)
    train_model_op_RNN = sequential_model(model_type="RNN", dataset=data_op.output).after(train_model_op_GRU)
    
    # deploy models
    deploy_task1 = deploy_to_run(
        tar_path=train_model_op_lstm.outputs["tar_path"],
    )
    deploy_task2 = deploy_to_run(
        tar_path=train_model_op_GRU.outputs["tar_path"],
    )
    deploy_task3 = deploy_to_run(
        tar_path=train_model_op_RNN.outputs["tar_path"],
    )
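
The execution order follows from two kinds of dependencies: data dependencies (a task consumes another task's output) and the explicit `.after(...)` calls that force the three training tasks to run one at a time. The sketch below models that graph with the standard library (Python 3.9+) to show one valid order. The task labels are hypothetical names for the steps above, not identifiers generated by Kubeflow.

```python
from graphlib import TopologicalSorter

# Each key runs only after every task in its value set has finished.
dependencies = {
    "get_air_data": set(),
    "train_lstm": {"get_air_data"},
    "train_gru": {"get_air_data", "train_lstm"},   # .after(train_model_op_lstm)
    "train_rnn": {"get_air_data", "train_gru"},    # .after(train_model_op_GRU)
    "deploy_lstm": {"train_lstm"},
    "deploy_gru": {"train_gru"},
    "deploy_rnn": {"train_rnn"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The deployment tasks depend only on their own training task, so a deployment can start while the next model is still training.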

The pipeline compiler compiles the pipeline and stores its configuration in a JSON file.

In [20]:
# Compile the pipeline
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="custom_train_pipeline.json"
)

In [21]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

## Step 4: Running the Pipeline Job

Here we create the pipeline job that will be submitted for execution.

In [22]:
pipeline_job = aiplatform.PipelineJob(
    display_name="custom-train-pipeline",
    template_path="custom_train_pipeline.json",
    job_id="custom-train-pipeline-{0}".format(TIMESTAMP),
    enable_caching=False,
)
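
The job ID combines a fixed prefix with a timestamp so that every submission gets a distinct ID. A small helper makes that pattern reusable and testable. The name `make_job_id` is hypothetical, and the fixed datetime is used only to make the example reproducible.

```python
from datetime import datetime


def make_job_id(prefix, now=None):
    """Build '<prefix>-<YYYYmmddHHMMSS>' from the given (or current) time."""
    now = now or datetime.now()
    return "{0}-{1}".format(prefix, now.strftime("%Y%m%d%H%M%S"))


# Fixed time for a reproducible example; omit `now` to use the current time
job_id = make_job_id("custom-train-pipeline", datetime(2022, 3, 1, 11, 18, 33))
print(job_id)
```

Because the timestamp has one-second resolution, two submissions within the same second would produce the same ID.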

The code below submits the job to run the pipeline. Use the link at the bottom of the output to monitor the pipeline's execution.

In [23]:
# Submit pipeline job
pipeline_job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/90786640424/locations/us-central1/pipelineJobs/custom-train-pipeline-20220301111833
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/90786640424/locations/us-central1/pipelineJobs/custom-train-pipeline-20220301111833')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-train-pipeline-20220301111833?project=90786640424
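
The resource name printed in the log has the shape `projects/<project>/locations/<location>/pipelineJobs/<job id>`. If the parts are needed separately, for example to rebuild the console link or to record the job ID, they can be extracted with a regular expression. This is a sketch based only on the format visible in the log above; the helper name `parse_pipeline_job_name` is hypothetical.

```python
import re

_JOB_NAME = re.compile(
    r"^projects/(?P<project>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/pipelineJobs/(?P<job_id>[^/]+)$"
)


def parse_pipeline_job_name(name):
    """Return the project, location and job ID contained in a resource name."""
    match = _JOB_NAME.match(name)
    if match is None:
        raise ValueError("unexpected resource name: {!r}".format(name))
    return match.groupdict()


resource_name = (
    "projects/90786640424/locations/us-central1/"
    "pipelineJobs/custom-train-pipeline-20220301111833"
)
parts = parse_pipeline_job_name(resource_name)
print(parts)
```

The full resource name is also what the log suggests passing to `aiplatform.PipelineJob.get(...)` to pick the job up again in another session.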
