# 07 - Continuous Training

After testing, compiling, and uploading the pipeline definition to Cloud Storage, the pipeline is executed in response to a trigger. We use [Cloud Functions](https://cloud.google.com/functions) and [Cloud Pub/Sub](https://cloud.google.com/pubsub) as the triggering mechanism, and the trigger can be scheduled with [Cloud Scheduler](https://cloud.google.com/scheduler). The trigger source publishes a message to a Cloud Pub/Sub topic that the Cloud Function listens to; the function then submits the pipeline to AI Platform Managed Pipelines for execution.
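What the Cloud Function does with each message can be sketched as follows. This is a minimal sketch that assumes the Pub/Sub message body is a base64-encoded JSON object of parameter values (the format used in the local test later in this notebook) and that only the names listed in the comma-separated `PARAMETER_NAMES` setting are forwarded to the pipeline. `parse_trigger_message` is a hypothetical helper for illustration, not the code in `src/pipeline_triggering/main.py`.

```python
import base64
import json
from typing import Any, Dict


def parse_trigger_message(event: Dict[str, Any], parameter_names: str) -> Dict[str, Any]:
    """Hypothetical helper: turn a Pub/Sub background-function event into pipeline parameters.

    Assumes event['data'] holds base64-encoded JSON, which is how Pub/Sub
    delivers the message payload to a background Cloud Function.
    """
    payload = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    # In this sketch, keep only the parameters the pipeline declares and drop the rest.
    allowed = {name.strip() for name in parameter_names.split(',') if name.strip()}
    return {key: value for key, value in payload.items() if key in allowed}


event = {'data': base64.b64encode(json.dumps({'num_epochs': 5, 'unused_param': 0}).encode())}
print(parse_trigger_message(event, 'num_epochs,hidden_units,learning_rate'))
# {'num_epochs': 5}
```

Filtering against an explicit list keeps a malformed or over-broad message from injecting parameters the compiled pipeline does not declare.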

This notebook covers the following steps:
1. Create the Cloud Pub/Sub topic.
2. Deploy the Cloud Function.
3. Test triggering a pipeline.

## Setup

In [None]:
import os
import logging
import tensorflow as tf
import tfx

logging.getLogger().setLevel(logging.INFO)

print("TFX Version:", tfx.__version__)
print("TensorFlow Version:", tf.__version__)

In [None]:
PROJECT = 'ksalama-cloudml'  # Change to your project ID.
REGION = 'us-central1'
BUCKET = 'ksalama-cloudml-us'  # Change to your bucket.

PIPELINE_NAME = 'chicago_taxi-tips-train-pipeline'
GCS_PIPELINE_FILE_LOCATION = f'gs://{BUCKET}/ucaip_demo/chicago_taxi/complied_pipelines/{PIPELINE_NAME}.json'
PUBSUB_TOPIC = f'trigger-{PIPELINE_NAME}'
CLOUD_FUNCTION_NAME = f'trigger-{PIPELINE_NAME}-fn'
PARAMETER_NAMES='num_epochs,hidden_units,learning_rate'
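Because `PUBSUB_TOPIC` and `CLOUD_FUNCTION_NAME` are derived from `PIPELINE_NAME`, a quick local check can catch names that `gcloud` would reject before any resource is created. The patterns below encode the naming rules as we understand them and should be treated as assumptions to verify against the current documentation: Pub/Sub topic IDs start with a letter, are 3 to 255 characters of letters, digits, `-`, `_`, `.`, `~`, `+` or `%`, and must not start with `goog`; 1st-gen Cloud Functions names start with a letter, end with a letter or digit, and are at most 63 characters of letters, digits, `-` and `_`. `check_resource_names` is a hypothetical helper.

```python
import re
from typing import List

# Assumed naming rules; verify against the current Pub/Sub and Cloud Functions docs.
TOPIC_ID_PATTERN = re.compile(r'[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}')
FUNCTION_NAME_PATTERN = re.compile(r'[A-Za-z](?:[A-Za-z0-9_-]{0,61}[A-Za-z0-9])?')


def check_resource_names(topic_id: str, function_name: str) -> List[str]:
    """Hypothetical helper: return a list of problems; an empty list means both names look valid."""
    problems = []
    if not TOPIC_ID_PATTERN.fullmatch(topic_id) or topic_id.lower().startswith('goog'):
        problems.append(f'invalid Pub/Sub topic id: {topic_id!r}')
    if not FUNCTION_NAME_PATTERN.fullmatch(function_name):
        problems.append(f'invalid Cloud Function name: {function_name!r}')
    return problems


print(check_resource_names('trigger-chicago_taxi-tips-train-pipeline',
                           'trigger-chicago_taxi-tips-train-pipeline-fn'))
# []
```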

## (Optional) Create a Dummy Pipeline for Testing

In [None]:
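DUMMY_PIPELINE_ROOT = f"gs://{BUCKET}/ucaip_demo/dummy_pipeline/local_runner"
# These overrides apply to the rest of the notebook. PUBSUB_TOPIC and CLOUD_FUNCTION_NAME
# were already built from the original PIPELINE_NAME above and are not recomputed here.
PIPELINE_NAME = 'my_dummy_pipeline'
PARAMETER_NAMES = 'source_uri'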

### Implement the pipeline

In [None]:
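from typing import Union

import tfx
# Import the submodules explicitly so that the tfx.orchestration.* attributes used below resolve.
import tfx.orchestration.data_types
import tfx.orchestration.pipeline
from ml_metadata.proto import metadata_store_pb2
from tfx.components.common_nodes.importer_node import ImporterNode
from tfx.types.experimental.simple_artifacts import File

def create_dummy_pipeline(
    metadata_connection_config: metadata_store_pb2.ConnectionConfig,
    pipeline_root: str,
    source_uri: Union[str, tfx.orchestration.data_types.RuntimeParameter] = 'default_dummy_path',
):
    """Builds a one-step pipeline that imports the file at `source_uri` as a `File` artifact.

    `source_uri` can be a literal path (local runs) or a RuntimeParameter
    whose value is supplied when the compiled pipeline is submitted.
    """
    importer = ImporterNode(
        source_uri=source_uri,
        artifact_type=File
    )

    return tfx.orchestration.pipeline.Pipeline(
        pipeline_name='my-dummy-pipeline',
        pipeline_root=pipeline_root,
        components=[importer],
        metadata_connection_config=metadata_connection_config
    )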
    

### Run the pipeline locally

In [None]:

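# Local SQLite file that backs ML Metadata (MLMD) for the local run.
MLMD_SQLITE = 'mlmd.sqlite'

print(f"artifacts location: {DUMMY_PIPELINE_ROOT}")

# Start from a clean state: remove artifacts and metadata left by earlier runs.
if tf.io.gfile.exists(DUMMY_PIPELINE_ROOT):
    print("Removing previous artifacts...")
    tf.io.gfile.rmtree(DUMMY_PIPELINE_ROOT)

if tf.io.gfile.exists(MLMD_SQLITE):
    print("Removing local mlmd SQLite...")
    tf.io.gfile.remove(MLMD_SQLITE)

metadata_connection_config = metadata_store_pb2.ConnectionConfig()
metadata_connection_config.sqlite.filename_uri = MLMD_SQLITE
# Create the database file if it does not exist yet (enum value 3).
metadata_connection_config.sqlite.connection_mode = (
    metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE)
print("ML Metadata connection config is ready.")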

In [None]:
from tfx.orchestration.local.local_dag_runner import LocalDagRunner
runner = LocalDagRunner()

dummy_pipeline = create_dummy_pipeline(
    metadata_connection_config=metadata_connection_config,
    pipeline_root=DUMMY_PIPELINE_ROOT,
    source_uri='path/to/dummy/file.txt'
)

runner.run(dummy_pipeline)

print("Pipeline finished execution.")

### Compile the pipeline

In [None]:
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner

dummy_pipeline_definition_file = f'{PIPELINE_NAME}.json'

dummy_pipeline = create_dummy_pipeline(
    metadata_connection_config=metadata_connection_config,
    pipeline_root=DUMMY_PIPELINE_ROOT,
    source_uri=tfx.orchestration.data_types.RuntimeParameter(
        name='source_uri',
        default='path/to/default/dummy.txt',
        ptype=str,
    )
)

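# The Kubeflow V2 runner compiles the pipeline into a JSON job spec rather than running it;
# write_out=True writes that spec to output_filename.
runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(),
    output_filename=dummy_pipeline_definition_file
)

runner.run(dummy_pipeline, write_out=True)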

### Upload pipeline to Cloud Storage

In [None]:
GCS_PIPELINE_FILE_LOCATION = f'gs://{BUCKET}/ucaip_demo/dummy_pipeline/compiled_pipelines/{PIPELINE_NAME}.json'
!gsutil cp {dummy_pipeline_definition_file} {GCS_PIPELINE_FILE_LOCATION}
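The Cloud Function receives the pipeline location as a single `gs://` URI through the `GCS_PIPELINE_FILE_LOCATION` environment variable. If code needs the bucket and object name separately (for example, to pass them to a storage client that takes them as two arguments), a small helper like the one below can split the URI. `split_gcs_uri` is a hypothetical name used only for illustration.

```python
from typing import Tuple


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Hypothetical helper: split 'gs://bucket/path/to/object' into ('bucket', 'path/to/object')."""
    prefix = 'gs://'
    if not uri.startswith(prefix):
        raise ValueError(f'not a Cloud Storage URI: {uri!r}')
    # Everything up to the first '/' is the bucket; the remainder is the object name.
    bucket, _, blob_name = uri[len(prefix):].partition('/')
    if not bucket or not blob_name:
        raise ValueError(f'URI must name both a bucket and an object: {uri!r}')
    return bucket, blob_name


print(split_gcs_uri('gs://my-bucket/ucaip_demo/dummy_pipeline/my_dummy_pipeline.json'))
# ('my-bucket', 'ucaip_demo/dummy_pipeline/my_dummy_pipeline.json')
```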

### Trigger the pipeline on AI Platform Managed Pipelines

In [None]:
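import base64
import json

# Provide the configuration the Cloud Function reads from its environment. It is set
# before the import in case the module reads these values at import time.
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION
os.environ['GCS_PIPELINE_FILE_LOCATION'] = GCS_PIPELINE_FILE_LOCATION
os.environ['PARAMETER_NAMES'] = PARAMETER_NAMES

from src.pipeline_triggering import main

# 'unused_param' is not listed in PARAMETER_NAMES, so the function presumably ignores it.
parameters = {
    'source_uri': 'source_uri_parameter_value',
    'unused_param': 0}

# Pub/Sub delivers the message payload to background functions as base64-encoded data,
# so the local call mimics that event shape.
message = base64.b64encode(json.dumps(parameters).encode())
main.trigger_pipeline(
    event={'data': message},
    context=None
)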

## 1. Create a Pub/Sub topic

In [None]:
!gcloud pubsub topics create {PUBSUB_TOPIC}
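With the topic in place, the scheduling mentioned in the introduction can be added by creating a Cloud Scheduler job that publishes to it. The sketch below only assembles a `gcloud scheduler jobs create pubsub` command line; the job name, cron schedule, project, and parameter values are illustrative assumptions, and the message body reuses the JSON format the Cloud Function expects. `build_scheduler_command` is a hypothetical helper.

```python
import json
import shlex
from typing import Any, Dict, List


def build_scheduler_command(job_name: str, schedule: str, project: str, region: str,
                            topic: str, parameters: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: assemble a `gcloud scheduler jobs create pubsub` call as an argument list."""
    return [
        'gcloud', 'scheduler', 'jobs', 'create', 'pubsub', job_name,
        f'--project={project}',
        f'--location={region}',
        f'--schedule={schedule}',  # unix-cron format
        f'--topic={topic}',
        f'--message-body={json.dumps(parameters)}',  # same JSON payload the function decodes
    ]


args = build_scheduler_command(
    job_name='trigger-chicago-taxi-daily',  # hypothetical job name
    schedule='0 6 * * *',                   # every day at 06:00 (illustrative)
    project='my-project',
    region='us-central1',
    topic='trigger-chicago_taxi-tips-train-pipeline',
    parameters={'num_epochs': 5},
)
# Quote each argument so the command can be pasted into a shell.
print(' '.join(shlex.quote(arg) for arg in args))
```

Building the command as a list and quoting each element keeps the spaces in the cron expression and the JSON body intact when the command is pasted into a shell.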

## 2. Deploy the Cloud Function

In [None]:
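# gcloud splits --update-env-vars on commas by default, but PARAMETER_NAMES may itself
# contain commas (e.g. 'num_epochs,hidden_units,learning_rate'). The ^@^ prefix tells
# gcloud to use '@' as the separator instead (see `gcloud topic escaping`).
ENV_VARS = '^@^' + '@'.join([
    f'PROJECT={PROJECT}',
    f'REGION={REGION}',
    f'GCS_PIPELINE_FILE_LOCATION={GCS_PIPELINE_FILE_LOCATION}',
    f'PARAMETER_NAMES={PARAMETER_NAMES}',
])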

!echo {ENV_VARS}
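`gcloud` splits the value of a `KEY=VALUE` dictionary flag such as `--update-env-vars` on commas unless the value starts with a `^DELIM^` prefix (documented under `gcloud topic escaping`), which is why a `PARAMETER_NAMES` value like `num_epochs,hidden_units,learning_rate` needs care. The function below is a simplified emulation of that splitting rule, written only to illustrate it; it is not gcloud's own parser, and `parse_env_vars_flag` is a hypothetical name.

```python
from typing import Dict


def parse_env_vars_flag(value: str) -> Dict[str, str]:
    """Simplified emulation of how gcloud splits a KEY=VALUE dictionary flag."""
    delimiter = ','
    # An optional ^X^ prefix switches the separator from ',' to X.
    if value.startswith('^'):
        end = value.find('^', 1)
        if end > 1:
            delimiter, value = value[1:end], value[end + 1:]
    result = {}
    for item in value.split(delimiter):
        key, sep, val = item.partition('=')
        if not sep:
            raise ValueError(f'entry without "=": {item!r}')
        result[key] = val
    return result


print(parse_env_vars_flag('^@^REGION=us-central1@PARAMETER_NAMES=num_epochs,hidden_units'))
# {'REGION': 'us-central1', 'PARAMETER_NAMES': 'num_epochs,hidden_units'}
```

In this emulation, the same value without the `^@^` prefix raises an error on the stray `hidden_units` entry, because the comma inside `PARAMETER_NAMES` is treated as a separator.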

In [None]:
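# Remove Jupyter checkpoint files so they are not uploaded with the function source.
# -f avoids an error when the directory does not exist.
!rm -rf src/pipeline_triggering/.ipynb_checkpoints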

In [None]:
!gcloud functions deploy {CLOUD_FUNCTION_NAME} \
    --region={REGION} \
    --trigger-topic={PUBSUB_TOPIC} \
    --runtime=python37 \
    --source=src/pipeline_triggering \
    --entry-point=trigger_pipeline \
    --stage-bucket={BUCKET} \
    --update-env-vars={ENV_VARS}

## 3. Test Triggering the Pipeline

In [None]:
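import json
from google.cloud import pubsub

publish_client = pubsub.PublisherClient()
# Builds 'projects/{PROJECT}/topics/{PUBSUB_TOPIC}'.
topic = publish_client.topic_path(PROJECT, PUBSUB_TOPIC)
data = {
    'source_uri': 'pubsub/function/pipeline'
}
message = json.dumps(data)

# publish() returns a future; result() blocks until Pub/Sub accepts the message
# and returns its message ID, so publishing errors surface here.
future = publish_client.publish(topic, message.encode())
print("Published message ID:", future.result())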