In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Fraudfinder - ML Pipeline

<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/raw/main/06_model_training_pipeline.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/06_model_training_pipeline.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Open in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/06_model_training_pipeline.ipynb">
        <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

[Fraudfinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the Fraudfinder labs, you will learn how to read historical bank transaction data stored in a data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using the feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with the feature store, and monitor your model.

### Objective

This notebook shows how to use Feature Store, Pipelines and Model Monitoring for building an end-to-end demo using both components defined in `google_cloud_pipeline_components` and custom components. 

This lab uses the following Google Cloud services and resources:

- [Vertex AI](https://cloud.google.com/vertex-ai/)
- [BigQuery](https://cloud.google.com/bigquery/)

Steps performed in this notebook:

* Create a Feature Store for storing and sharing features
* Create a Pipeline to deploy the model
* Create a Model Monitoring Job to check the status of the model

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* BigQuery

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), [BigQuery pricing](https://cloud.google.com/bigquery/pricing) and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Load configuration settings from the setup notebook

Set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb` notebook.

In [52]:
# Ask gcloud for the active project; `!cmd` captures the command output as a list of lines.
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
# The workshop bucket name is derived from the project ID.
BUCKET_NAME = f"{PROJECT_ID}-vision-workshop"
# Read the settings file stored in the bucket (presumably written by the setup notebook — confirm).
config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py
# `.n` is the captured output joined into one newline-separated string.
print(config.n)
# NOTE(review): exec() runs whatever text the bucket object contains as Python in this kernel.
# Only use this with a bucket you control; anyone who can write that object can run code here.
exec(config.n)


BUCKET_NAME          = "temp-vision-workshop-vision-workshop"
PROJECT              = "temp-vision-workshop"
REGION               = "europe-west4"
ID                   = "7l3oe"
MODEL_NAME           = "vision_workshop_model"
ENDPOINT_NAME        = "vision_workshop_endpoint"



### Import libraries and define constants

#### Libraries

In [53]:
# General
import os
import sys
import random
from datetime import datetime, timedelta
import json

# # Vertex Pipelines
# from typing import NamedTuple
# import kfp
# from kfp.v2 import dsl
# from kfp.v2.dsl import Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, Metrics, ClassificationMetrics, Condition, component
# from kfp.v2 import compiler

from google.cloud import aiplatform as vertex_ai
from google_cloud_pipeline_components import aiplatform as vertex_ai_components
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
# from kfp.v2.google.client import AIPlatformClient as VertexAIClient


# import google.cloud.aiplatform as aip
# from google_cloud_pipeline_components.experimental.custom_job import utils
# from kfp.v2 import compiler, dsl
# from kfp.v2.dsl import component

import google.cloud.aiplatform as vertex_ai
import tensorflow as tf
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

In [54]:
# Show the installed KFP SDK version; the pipeline code below uses the kfp.v2 namespace.
print(f"kfp version: {kfp.__version__}")

kfp version: 1.8.14


#### Variables

In [55]:
# --- Pipeline ---------------------------------------------------------------
# Local folders for the compiled pipeline spec and component files, plus the
# GCS location used as the pipeline root.
PIPELINE_NAME = f"vision-workshop-tf-pipeline-{ID}"
PIPELINE_DIR = os.path.join(os.curdir, "pipelines")
COMPONENTS_DIR = os.path.join(PIPELINE_DIR, "components")
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipelines"
PIPELINE_PACKAGE_PATH = f"{PIPELINE_DIR}/pipeline_{ID}.json"

# --- Dataset ----------------------------------------------------------------
DATASET_NAME = "vision_workshop_dataset"

# --- Training ---------------------------------------------------------------
JOB_NAME = f"image-classifier-train-tf-{ID}"
# Rebinds MODEL_NAME, which the loaded config also sets; from here on the
# ID-suffixed value below is the one in effect.
MODEL_NAME = f"image-classifier-tf-{ID}"
TRAIN_MACHINE_TYPE = "n1-standard-4"
MODEL_SERVING_IMAGE_URI = "europe-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest"
# Command-line flags handed to the training container.
ARGS = ["--lr=0.003", "--epochs=5"]

# Training container image, addressed as <host>/<project>/<repository>/<name>:<tag>.
IMAGE_REPOSITORY = f"vision-{ID}"
IMAGE_NAME = "image-classifier"
IMAGE_TAG = "v1"
IMAGE_URI = (
    f"europe-west4-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REPOSITORY}/{IMAGE_NAME}:{IMAGE_TAG}"
)

#### Initiate Client

In [56]:
# Set the Vertex AI SDK defaults (project, region, staging bucket) for the calls below.
vertex_ai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_NAME,
)

In [57]:
# Echo the region loaded from the config as a sanity check.
REGION

'europe-west4'

#### Set folders

In [16]:
# Echo the local components folder path before creating it.
COMPONENTS_DIR

'./pipelines/components'

In [17]:
# Create the local output folders for the compiled pipeline and its components.
# os.makedirs replaces the previous `mkdir -m 777` shell call: it does not make
# the folders world-writable, does not depend on a POSIX shell, and
# exist_ok=True keeps the cell safe to re-run.
for folder in (PIPELINE_DIR, COMPONENTS_DIR):
    os.makedirs(folder, exist_ok=True)

### Define Custom Components

#### Define feature store component

Note that this component assumes the read-instances table, which contains the entity IDs and timestamps to query, has already been created.

In [None]:
# !gsutil ubla set on gs://{BUCKET_NAME}

In [None]:
# @component(output_component_file=INGEST_FEATURE_STORE, 
#            base_image=BASE_IMAGE, 
#            packages_to_install=["git+https://github.com/googleapis/python-aiplatform.git@main"])

# def ingest_features_gcs(project_id:str, region:str, bucket_name:str,
#                        feature_store_id: str, read_instances_uri:str) -> NamedTuple("Outputs",
#                                                                        [("snapshot_uri_paths", str),],):
    
#     # Libraries --------------------------------------------------------------------------------------------------------------------------
#     from datetime import datetime
#     import glob
#     import urllib
#     import json
    
#     #Feature Store
#     from google.cloud.aiplatform import Featurestore, EntityType, Feature
    
#     # Variables --------------------------------------------------------------------------------------------------------------------------
#     timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
#     api_endpoint = region + "-aiplatform.googleapis.com"
#     bucket = urllib.parse.urlsplit(bucket_name).netloc
#     export_uri = f'{bucket_name}/data/snapshots/{timestamp}' #format as new gsfuse requires
#     export_uri_path = f'/gcs/{bucket}/data/snapshots/{timestamp}'
#     customer_entity = 'customer'
#     terminal_entity = 'terminal'
#     serving_feature_ids = {customer_entity: ['*'], terminal_entity: ['*']}
    
#     # Main -------------------------------------------------------------------------------------------------------------------------------

#     ## Define the feature store resource path
#     feature_store_resource_path = f"projects/{project_id}/locations/{region}/featurestores/{feature_store_id}"
#     print("Feature Store: \t", feature_store_resource_path)
    
#     ## Run batch job request
#     try:
#         ff_feature_store = Featurestore(feature_store_resource_path)
#         ff_feature_store.batch_serve_to_gcs(
#             gcs_destination_output_uri_prefix = export_uri,
#             gcs_destination_type = 'csv',
#             serving_feature_ids = serving_feature_ids,
#             read_instances_uri = read_instances_uri,
#             pass_through_fields = ['tx_fraud','tx_amount']
#         )
#     except Exception as error:
#         print(error)
    
#     #Store metadata
#     snapshot_pattern = f'{export_uri_path}/*.csv'
#     snapshot_files = glob.glob(snapshot_pattern)
#     snapshot_files_fmt = [p.replace('/gcs/', 'gs://') for p in snapshot_files]
#     snapshot_files_string = json.dumps(snapshot_files_fmt)
    
#     component_outputs = NamedTuple("Outputs",
#                                 [("snapshot_uri_paths", str),],)
    
#     return component_outputs(snapshot_files_string)

#### Define an evaluate custom component

In [None]:
# @component(
#     output_component_file=EVALUATE
# )
# def evaluate_model(
#     model_in: Input[Artifact],
#     metrics_uri: str,
#     meta_metrics: Output[Metrics],
#     graph_metrics: Output[ClassificationMetrics],
#     model_out: Output[Artifact]) -> NamedTuple("Outputs",
#                                             [("metrics_thr", float),],):
    
#     # Libraries --------------------------------------------------------------------------------------------------------------------------
#     import json
    
#     # Variables --------------------------------------------------------------------------------------------------------------------------
#     metrics_path = metrics_uri.replace('gs://', '/gcs/')
#     labels = ['not fraud', 'fraud']
    
#     # Main -------------------------------------------------------------------------------------------------------------------------------
#     with open(metrics_path, mode='r') as json_file:
#         metrics = json.load(json_file)

#     ## metrics
#     fpr = metrics['fpr']
#     tpr = metrics['tpr']
#     thrs = metrics['thrs']
#     c_matrix = metrics['confusion_matrix']
#     avg_precision_score = metrics['avg_precision_score']
#     f1 = metrics['f1_score']
#     lg_loss = metrics['log_loss']
#     prec_score = metrics['precision_score']
#     rec_score = metrics['recall_score']
    
#     meta_metrics.log_metric('avg_precision_score', avg_precision_score)
#     meta_metrics.log_metric('f1_score', f1)
#     meta_metrics.log_metric('log_loss', lg_loss)
#     meta_metrics.log_metric('precision_score', prec_score)
#     meta_metrics.log_metric('recall_score', rec_score)
#     graph_metrics.log_roc_curve(fpr, tpr, thrs)
#     graph_metrics.log_confusion_matrix(labels, c_matrix)
    
#     ## model metadata
#     model_framework = 'xgb.dask'
#     model_type = 'DaskXGBClassifier'
#     model_user = 'author' 
#     model_function = 'classification'
#     model_out.metadata["framework"] = model_framework
#     model_out.metadata["type"] = model_type
#     model_out.metadata["model function"] = model_function
#     model_out.metadata["modified by"] = model_user
    
#     component_outputs = NamedTuple("Outputs",
#                                 [("metrics_thr", float),],)
    
#     return component_outputs(float(avg_precision_score))

### Define the pipeline using ```kfp``` and ```google_cloud_pipeline_components```

#### Build pipeline

In [58]:
@dsl.pipeline(
    name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    project_id: str = PROJECT_ID,
    region: str = REGION,
    bucket_name: str = f"gs://{BUCKET_NAME}",
    replica_count: int = 1,
    machine_type: str = "n1-standard-4",
):
    # End-to-end image-classification pipeline:
    #   dataset creation -> custom-container training -> batch prediction,
    #   and, after training, endpoint creation -> model deployment.
    #
    # Parameters:
    #   project_id    - project every step runs in
    #   region        - location passed to every step
    #   bucket_name   - gs:// URI used as training staging bucket and base output dir
    #   replica_count - training replica count and minimum serving replica count
    #   machine_type  - machine type for training, batch prediction and serving

    # The bucket URI is built from the notebook-level constant, not the
    # `bucket_name` parameter, for the dataset source and the batch-prediction I/O.
    bucket_uri = f"gs://{BUCKET_NAME}"

    # 1) Managed image dataset imported from the CSV manifest.
    dataset_task = vertex_ai_components.ImageDatasetCreateOp(
        project=project_id,
        location=region,
        display_name=DATASET_NAME,
        gcs_source=f"{bucket_uri}/prod/flowers.csv",
        import_schema_uri=vertex_ai.schema.dataset.ioformat.image.single_label_classification,
    )

    # 2) Training in the custom container image, with one T4 accelerator requested.
    training_task = vertex_ai_components.CustomContainerTrainingJobRunOp(
        project=project_id,
        location=region,
        display_name=JOB_NAME,
        model_display_name=MODEL_NAME,
        container_uri=IMAGE_URI,
        model_serving_container_image_uri=MODEL_SERVING_IMAGE_URI,
        dataset=dataset_task.outputs['dataset'],
        annotation_schema_uri=vertex_ai.schema.dataset.annotation.image.classification,
        staging_bucket=bucket_name,
        base_output_dir=bucket_name,
        args=ARGS,
        replica_count=replica_count,
        machine_type=machine_type,
        accelerator_type="NVIDIA_TESLA_T4",
        accelerator_count=1,
    )
    training_task.after(dataset_task)

    # Model artifact produced by training; used by batch prediction and deployment.
    trained_model = training_task.outputs["model"]

    # 3) Batch prediction over the JSONL test file, written back to the bucket.
    ModelBatchPredictOp(
        project=project_id,
        location=region,
        job_display_name="batch_predict_job",
        model=trained_model,
        gcs_source_uris=[f"{bucket_uri}/test2.jsonl"],
        gcs_destination_output_uri_prefix=bucket_uri,
        instances_format="jsonl",
        predictions_format="jsonl",
        model_parameters={},
        machine_type=machine_type,
        starting_replica_count=1,
        max_replica_count=1,
    )

    # 4) Endpoint for online serving, created once training has finished.
    endpoint_task = vertex_ai_components.EndpointCreateOp(
        project=project_id,
        location=region,
        display_name=ENDPOINT_NAME,
    )
    endpoint_task.after(training_task)

    # 5) Deploy the trained model to that endpoint.
    deploy_task = vertex_ai_components.ModelDeployOp(
        model=trained_model,
        endpoint=endpoint_task.outputs["endpoint"],
        deployed_model_display_name=MODEL_NAME,
        dedicated_resources_machine_type=machine_type,
        dedicated_resources_min_replica_count=replica_count,
    )
    deploy_task.after(endpoint_task)
    

#### Compile and run the pipeline

In [59]:
# Compile the pipeline function into the JSON spec at PIPELINE_PACKAGE_PATH.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(package_path=PIPELINE_PACKAGE_PATH, pipeline_func=pipeline)

In [60]:
# Build the Vertex AI pipeline job from the compiled spec; nothing is submitted yet.
pipeline_job = vertex_ai.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINE_PACKAGE_PATH,
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    enable_caching=True,
)

In [61]:
#!gsutil uniformbucketlevelaccess set on gs://{BUCKET_NAME}

In [None]:
# Submit the pipeline job. With sync=True the cell keeps polling and printing the
# job state (see the output below) — presumably until the run finishes; confirm in the SDK docs.
pipeline_job.run(sync=True)

Creating PipelineJob
PipelineJob created. Resource name: projects/446303513828/locations/europe-west4/pipelineJobs/vision-workshop-tf-pipeline-7l3oe-20221118123432
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/446303513828/locations/europe-west4/pipelineJobs/vision-workshop-tf-pipeline-7l3oe-20221118123432')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west4/pipelines/runs/vision-workshop-tf-pipeline-7l3oe-20221118123432?project=446303513828
PipelineJob projects/446303513828/locations/europe-west4/pipelineJobs/vision-workshop-tf-pipeline-7l3oe-20221118123432 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/446303513828/locations/europe-west4/pipelineJobs/vision-workshop-tf-pipeline-7l3oe-20221118123432 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/446303513828/locations/europe-west4/pipelineJobs/vision-workshop-tf-pipeline-7l3oe-20221118123432 current s

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

{TODO: Include commands to delete individual resources below}

In [None]:
# Delete endpoint resource
! gcloud ai endpoints delete $ENDPOINT_NAME --quiet --region $REGION_NAME

# Delete model resource
! gcloud ai models delete $MODEL_NAME --quiet

# Delete Cloud Storage objects that were created
! gsutil -m rm -r $JOB_DIR