# RL-specific MLOps workflow

**Critical steps for an end-to-end, RL-based MLOps workflow:**

* RL-specific implementation for training and prediction
* Simulation for initial training data, prediction requests and re-training
* Closing of the feedback loop from prediction results back to training
* Customizable, reusable and shareable KFP components

### Objectives

In this notebook, we will locally build and test all components of an RL-specific MLOps workflow.

**Components of (re)train-deploy pipeline:**
* Generator to generate MovieLens simulation data
* Ingester to ingest data
* Trainer to train the RL policy
* Deployer to deploy the trained policy to a Vertex AI endpoint

**Helper modules for simulating production traffic/monitoring:**
* `Simulator` (which utilizes Cloud Functions, Cloud Scheduler and Pub/Sub) to send simulated MovieLens prediction requests
* `Logger` to asynchronously log prediction inputs and results (which utilizes Cloud Functions, Pub/Sub and a hook in the prediction code)
* `Trigger` to trigger recurrent re-training

## Notebook setup

In [1]:
!pwd

/home/jupyter/jt-github/tf_vertex_agents


### set vars

In [2]:
PREFIX = 'e2ev4'

In [3]:
# creds, PROJECT_ID = google.auth.default()
GCP_PROJECTS             = !gcloud config get-value project
PROJECT_ID               = GCP_PROJECTS[0]

PROJECT_NUM              = !gcloud projects describe $PROJECT_ID --format="value(projectNumber)"
PROJECT_NUM              = PROJECT_NUM[0]

VERTEX_SA                = f'{PROJECT_NUM}-compute@developer.gserviceaccount.com'

VPC_NETWORK_NAME         = "ucaip-haystack-vpc-network"

# locations / regions for cloud resources
LOCATION                 = 'us-central1'        
REGION                   = LOCATION
BQ_LOCATION              = 'us'

print(f"PROJECT_ID       = {PROJECT_ID}")
print(f"PROJECT_NUM      = {PROJECT_NUM}")
print(f"VPC_NETWORK_NAME = {VPC_NETWORK_NAME}")
print(f"LOCATION         = {LOCATION}")
print(f"REGION           = {REGION}")
print(f"BQ_LOCATION      = {BQ_LOCATION}")

PROJECT_ID       = hybrid-vertex
PROJECT_NUM      = 934903580331
VPC_NETWORK_NAME = ucaip-haystack-vpc-network
LOCATION         = us-central1
REGION           = us-central1
BQ_LOCATION      = us


In [4]:
# GCS bucket and paths
BUCKET_NAME              = f'{PREFIX}-{PROJECT_ID}-bucket'
BUCKET_URI               = f'gs://{BUCKET_NAME}'

# Location of the MovieLens 100K dataset's "u.data" file.
RAW_DATA_PATH            = f"{BUCKET_URI}/raw_data/u.data"
DATA_PATH                = f"{BUCKET_URI}/artifacts/u.data"
ARTIFACTS_DIR            = f"{BUCKET_URI}/artifacts"

VPC_NETWORK_FULL         = f"projects/{PROJECT_NUM}/global/networks/{VPC_NETWORK_NAME}"

# BigQuery parameters (used for the Generator, Ingester, Logger)
BIGQUERY_DATASET_ID      = f"{PROJECT_ID}.movielens_dataset_{PREFIX}"
BIGQUERY_TABLE_ID        = f"{BIGQUERY_DATASET_ID}.training_dataset"

print(f"BUCKET_NAME         : {BUCKET_NAME}")
print(f"BUCKET_URI          : {BUCKET_URI}")
print(f"RAW_DATA_PATH       : {RAW_DATA_PATH}")
print(f"DATA_PATH           : {DATA_PATH}")
print(f"VPC_NETWORK_FULL    : {VPC_NETWORK_FULL}")
print(f"BIGQUERY_DATASET_ID : {BIGQUERY_DATASET_ID}")
print(f"BIGQUERY_TABLE_ID   : {BIGQUERY_TABLE_ID}")

BUCKET_NAME         : e2ev4-hybrid-vertex-bucket
BUCKET_URI          : gs://e2ev4-hybrid-vertex-bucket
RAW_DATA_PATH       : gs://e2ev4-hybrid-vertex-bucket/raw_data/u.data
DATA_PATH           : gs://e2ev4-hybrid-vertex-bucket/artifacts/u.data
VPC_NETWORK_FULL    : projects/934903580331/global/networks/ucaip-haystack-vpc-network
BIGQUERY_DATASET_ID : hybrid-vertex.movielens_dataset_e2ev4
BIGQUERY_TABLE_ID   : hybrid-vertex.movielens_dataset_e2ev4.training_dataset


### create GCS bucket

In [49]:
# create bucket
! gsutil mb -l $REGION $BUCKET_URI

In [50]:
! gsutil ls -al $BUCKET_URI

### copy sample data

In [5]:
SAMPLE_DATA_URI = "gs://cloud-samples-data/vertex-ai/community-content/tf_agents_bandits_movie_recommendation_with_kfp_and_vertex_sdk/u.data"

! gsutil cp $SAMPLE_DATA_URI $RAW_DATA_PATH

Copying gs://cloud-samples-data/vertex-ai/community-content/tf_agents_bandits_movie_recommendation_with_kfp_and_vertex_sdk/u.data [Content-Type=application/octet-stream]...
/ [1 files][  1.9 MiB/  1.9 MiB]                                                
Operation completed over 1 objects/1.9 MiB.                                      


### imports

In [6]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

In [7]:
import functools
import json
import collections
from collections import defaultdict
from typing import Callable, Dict, List, Optional, TypeVar, NamedTuple, Any
from datetime import datetime
import time
import sys

import logging
logging.disable(logging.WARNING)

import matplotlib.pyplot as plt
import numpy as np

# google cloud
from google.cloud import aiplatform, storage, bigquery

# Pipelines
from google_cloud_pipeline_components import aiplatform as gcc_aip
# from google_cloud_pipeline_components.experimental.custom_job import utils
from google_cloud_pipeline_components.v1.custom_job import utils
from google_cloud_pipeline_components.types import artifact_types

# Kubeflow SDK
# TODO: fix these
from kfp.v2 import dsl, compiler
import kfp
import kfp.v2.dsl
# from kfp.v2.google import client as pipelines_client
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component)

# tensorflow
import tensorflow as tf
from tf_agents.agents import TFAgent
from tf_agents.bandits.agents import lin_ucb_agent
from tf_agents.bandits.agents.examples.v2 import trainer
from tf_agents.bandits.environments import (environment_utilities,
                                            movielens_py_environment)
from tf_agents.bandits.metrics import tf_metrics as tf_bandit_metrics
from tf_agents.drivers import dynamic_step_driver
from tf_agents.environments import TFEnvironment, tf_py_environment
from tf_agents.eval import metric_utils
from tf_agents.metrics import tf_metrics
from tf_agents.metrics.tf_metric import TFStepMetric
from tf_agents.policies import policy_saver


if tf.__version__[0] != "2":
    raise Exception("The trainer only runs with TensorFlow version 2.")

T = TypeVar("T")

In [8]:
print(f'kfp version: {kfp.__version__}')
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
print(f'vertex_ai SDK version: {aiplatform.__version__}')
print(f'bigquery SDK version: {bigquery.__version__}')

kfp version: 2.0.1
google_cloud_pipeline_components version: 2.0.0
vertex_ai SDK version: 1.26.0
bigquery SDK version: 3.11.1


### initialize SDK clients

In [9]:
# cloud storage client
storage_client = storage.Client(project=PROJECT_ID)

# Vertex client
aiplatform.init(project=PROJECT_ID, location=LOCATION)

# BigQuery client
bqclient = bigquery.Client(
    project=PROJECT_ID,
    # location=LOCATION
)

### set additional parameters

* `BigQuery` parameters (used for the `Generator`, `Ingester`, `Logger`)
* `Dataset` parameters (TFRecords)
* `Logger` parameters (also used for the `Logger` hook in the prediction container)

In [10]:
# BigQuery parameters
BIGQUERY_TMP_FILE = (
    "tmp.json"         # Temporary file for storing data to be loaded into BigQuery.
)
BIGQUERY_MAX_ROWS = 5  # Maximum number of rows of data in BigQuery to ingest.

# Dataset parameters
TFRECORD_FILE = (
    f"{BUCKET_URI}/trainer_input_path/data.tfrecord"  # TFRecord file to be used for training.
)

# Logger parameters 
LOGGER_PUBSUB_TOPIC = f"logger-pubsub-topic-{PREFIX}"  # Pub/Sub topic name for the Logger
LOGGER_CLOUD_FUNCTION = f"logger-cloud-function-{PREFIX}"  # Cloud Functions name for the Logger

print(f"BIGQUERY_TMP_FILE     : {BIGQUERY_TMP_FILE}")
print(f"BIGQUERY_MAX_ROWS     : {BIGQUERY_MAX_ROWS}")
print(f"TFRECORD_FILE         : {TFRECORD_FILE}")
print(f"LOGGER_PUBSUB_TOPIC   : {LOGGER_PUBSUB_TOPIC}")
print(f"LOGGER_CLOUD_FUNCTION : {LOGGER_CLOUD_FUNCTION}")

BIGQUERY_TMP_FILE     : tmp.json
BIGQUERY_MAX_ROWS     : 5
TFRECORD_FILE         : gs://e2ev4-hybrid-vertex-bucket/trainer_input_path/data.tfrecord
LOGGER_PUBSUB_TOPIC   : logger-pubsub-topic-e2ev4
LOGGER_CLOUD_FUNCTION : logger-cloud-function-e2ev4


## RL components

* Create the `Generator` to generate MovieLens simulation data
* Create the `Ingester` to ingest data
* Create the `Trainer` to train the RL policy
* Create the `Deployer` to deploy the trained policy to a Vertex AI endpoint

### Generator component

* Create the `Generator` component to generate the initial set of training data using a MovieLens simulation environment and a random data-collecting policy
* Store the generated data in BigQuery.
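
To make the stored record layout concrete, here is a minimal sketch that writes and re-reads a newline-delimited JSON file with the same seven keys the `Generator` emits per trajectory. It does not run the MovieLens environment or call BigQuery. The helper name `make_fake_record` and all numeric values are invented for illustration, and `policy_info` is assumed to be empty. Observations are wrapped in `{"observation_batch": [...]}` records because BigQuery does not allow an array to directly contain another array.

```python
import json
import os
import tempfile


def make_fake_record(batch_size: int, rank_k: int) -> dict:
    """Builds one illustrative record with the Generator's seven keys (made-up values)."""
    return {
        "step_type": [0] * batch_size,
        # One nested record per batch element, each holding a rank_k-long vector.
        "observation": [
            {"observation_batch": [0.1 * (i + 1)] * rank_k}
            for i in range(batch_size)
        ],
        "action": list(range(batch_size)),
        "policy_info": [],  # Assumption: the data-collecting policy emits no extra info.
        "next_step_type": [2] * batch_size,
        "reward": [1.0] * batch_size,
        "discount": [0.0] * batch_size,
    }


records = [make_fake_record(batch_size=2, rank_k=3) for _ in range(4)]

# Write one JSON object per line (newline-delimited JSON).
path = os.path.join(tempfile.mkdtemp(), "tmp.json")
with open(path, "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

# Read the file back, one record per line.
with open(path) as f:
    loaded = [json.loads(line) for line in f]

print(len(loaded), sorted(loaded[0]))
```

Each line of the file is an independent JSON object, which is the layout a `NEWLINE_DELIMITED_JSON` load job expects.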

In [11]:
REPO_DOCKER_PATH_PREFIX = 'src'
COMPONENT_SUBDIR = "generator"

In [12]:
# Make the generator subfolder
! rm -rf {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}
! mkdir {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}

In [13]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}/generator_component.py

import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (
    Artifact, Dataset, Input, InputPath, 
    Model, Output, OutputPath, component, Metrics
)
@kfp.v2.dsl.component(
    base_image="tensorflow/tensorflow:2.12.0",
    packages_to_install=[
        "google-cloud-bigquery"
        , "tensorflow==2.12.0"
        , "tf-agents==0.16.0"
    ],
    # output_component_file="./pipelines/train_custom_model.yaml",
)
def generate_movielens_dataset_for_bigquery(
    project_id: str
    , raw_data_path: str
    , batch_size: int
    , rank_k: int
    , num_actions: int
    , driver_steps: int
    , bigquery_tmp_file: str
    , bigquery_dataset_id: str
    , bigquery_location: str
    , bigquery_table_id: str
) -> NamedTuple("Outputs", [
    ("bigquery_dataset_id", str)
    , ("bigquery_location", str)
    , ("bigquery_table_id", str)
]):
    """
    Generates BigQuery training data using a MovieLens simulation environment.

    Serves as the Generator pipeline component:
    1. Generates `trajectories.Trajectory` data by applying a random policy on
        MovieLens simulation environment.
    2. Converts `trajectories.Trajectory` data to JSON format.
    3. Loads JSON-formatted data into BigQuery.

    This function is to be built into a Kubeflow Pipelines (KFP) component. As a
    result, this function must be entirely self-contained. This means that the
    import statements and helper functions must reside within itself.

    Args:
      project_id: GCP project ID. This is required because otherwise the BigQuery
        client will use the ID of the tenant GCP project created as a result of
        KFP, which doesn't have proper access to BigQuery.
      raw_data_path: Path to MovieLens 100K's "u.data" file.
      batch_size: Batch size of environment-generated quantities, e.g. rewards.
      rank_k: Rank for matrix factorization in the MovieLens environment; also
        the observation dimension.
      num_actions: Number of actions (movie items) to choose from.
      driver_steps: Number of steps to run per batch.
      bigquery_tmp_file: Path to a JSON file containing the training dataset.
      bigquery_dataset_id: A string of the BigQuery dataset ID in the format of
        "project.dataset".
      bigquery_location: A string of the BigQuery dataset location.
      bigquery_table_id: A string of the BigQuery table ID in the format of
        "project.dataset.table".

    Returns:
      A NamedTuple of (`bigquery_dataset_id`, `bigquery_location`,
        `bigquery_table_id`).
    """
    # pylint: disable=g-import-not-at-top
    import collections
    import json
    from typing import Any, Dict
    import logging

    from google.cloud import bigquery

    from tf_agents import replay_buffers
    from tf_agents import trajectories
    from tf_agents.bandits.agents.examples.v2 import trainer
    from tf_agents.bandits.environments import movielens_py_environment
    from tf_agents.drivers import dynamic_step_driver
    from tf_agents.environments import tf_py_environment
    from tf_agents.policies import random_tf_policy

    def generate_simulation_data(
        raw_data_path: str
        , batch_size: int
        , rank_k: int
        , num_actions: int
        , driver_steps: int
    ) -> replay_buffers.TFUniformReplayBuffer:
        """
        Generates `trajectories.Trajectory` data from the simulation environment.

        Constructs a MovieLens simulation environment, and generates a set of
        `trajectories.Trajectory` data using a random policy.

        Args:
          raw_data_path: Path to MovieLens 100K's "u.data" file.
          batch_size: Batch size of environment-generated quantities, e.g. rewards.
          rank_k: Rank for matrix factorization in the MovieLens environment; also
            the observation dimension.
          num_actions: Number of actions (movie items) to choose from.
          driver_steps: Number of steps to run per batch.

        Returns:
          A replay buffer holding randomly generated `trajectories.Trajectory` data.
        """
        # Create MovieLens simulation environment.
        env = movielens_py_environment.MovieLensPyEnvironment(
            raw_data_path,
            rank_k,
            batch_size,
            num_movies=num_actions,
            csv_delimiter="\t"
        )
        environment = tf_py_environment.TFPyEnvironment(env)

        # Define random policy for collecting data.
        random_policy = random_tf_policy.RandomTFPolicy(
            action_spec=environment.action_spec()
            , time_step_spec=environment.time_step_spec()
        )

        # Use replay buffer and observers to keep track of Trajectory data.
        data_spec = random_policy.trajectory_spec
        replay_buffer = trainer._get_replay_buffer(
            data_spec
            , environment.batch_size
            , driver_steps
            , 1
        )
        observers = [replay_buffer.add_batch]

        # Run driver to apply the random policy in the simulation environment.
        driver = dynamic_step_driver.DynamicStepDriver(
            env=environment
            , policy=random_policy
            , num_steps=driver_steps * environment.batch_size
            , observers=observers
        )
        driver.run()

        return replay_buffer

    def build_dict_from_trajectory(
        trajectory: trajectories.Trajectory
    ) -> Dict[str, Any]:
        """
        Builds a dict from `trajectory` data.

        Args:
          trajectory: A `trajectories.Trajectory` object.

        Returns:
          A dict holding the same data as `trajectory`.
        """
        trajectory_dict = {
            "step_type": trajectory.step_type.numpy().tolist()
            , "observation": [
                {"observation_batch": batch} for batch in trajectory.observation.numpy().tolist()]
            , "action": trajectory.action.numpy().tolist()
            , "policy_info": trajectory.policy_info
            , "next_step_type": trajectory.next_step_type.numpy().tolist()
            , "reward": trajectory.reward.numpy().tolist()
            , "discount": trajectory.discount.numpy().tolist()
        }
        
        return trajectory_dict

    def write_replay_buffer_to_file(
        replay_buffer: replay_buffers.TFUniformReplayBuffer
        , batch_size: int
        , dataset_file: str
    ) -> None:
        """
        Writes replay buffer data to a file, each JSON in one line.

        Each `trajectories.Trajectory` object in `replay_buffer` will be written as
        one line to the `dataset_file` in JSON format. I.e., the `dataset_file`
        would be a newline-delimited JSON file.

        Args:
          replay_buffer: A `replay_buffers.TFUniformReplayBuffer` holding
            `trajectories.Trajectory` objects.
          batch_size: Batch size of environment-generated quantities, e.g. rewards.
          dataset_file: File path. Will be overwritten if already exists.
        """
        dataset = replay_buffer.as_dataset(sample_batch_size=batch_size)
        dataset_size = replay_buffer.num_frames().numpy()

        with open(dataset_file, "w") as f:
            for example in dataset.take(count=dataset_size):
                traj_dict = build_dict_from_trajectory(example[0])
                f.write(json.dumps(traj_dict) + "\n")

    def load_dataset_into_bigquery(
        project_id: str
        , dataset_file: str
        , bigquery_dataset_id: str
        , bigquery_location: str
        , bigquery_table_id: str
    ) -> None:
        """
        Loads training dataset into BigQuery table.

        Loads training dataset of `trajectories.Trajectory` in newline delimited
        JSON into a BigQuery dataset and table, using a BigQuery client.

        Args:
          project_id: GCP project ID. This is required because otherwise the
            BigQuery client will use the ID of the tenant GCP project created as a
            result of KFP, which doesn't have proper access to BigQuery.
          dataset_file: Path to a JSON file containing the training dataset.
          bigquery_dataset_id: A string of the BigQuery dataset ID in the format of
            "project.dataset".
          bigquery_location: A string of the BigQuery dataset location.
          bigquery_table_id: A string of the BigQuery table ID in the format of
            "project.dataset.table".
        """
        # Construct a BigQuery client object.
        client = bigquery.Client(project=project_id)

        # Construct a full Dataset object to send to the API.
        dataset = bigquery.Dataset(bigquery_dataset_id)

        # Specify the geographic location where the dataset should reside.
        dataset.location = bigquery_location

        # Create the dataset, or get the dataset if it exists.
        dataset = client.create_dataset(dataset, exists_ok=True, timeout=30)
        _BIGQUERY_DATASET_ID = dataset.full_dataset_id
        logging.info(f"_BIGQUERY_DATASET_ID: {_BIGQUERY_DATASET_ID}")

        job_config = bigquery.LoadJobConfig(
            schema=[
                bigquery.SchemaField("step_type", "INT64", mode="REPEATED")
                , bigquery.SchemaField(
                    "observation"
                    , "RECORD"
                    , mode="REPEATED"
                    , fields=[
                        bigquery.SchemaField(
                            "observation_batch", "FLOAT64", "REPEATED"
                        )
                    ]
                )                
                , bigquery.SchemaField("action", "INT64", mode="REPEATED")
                , bigquery.SchemaField("policy_info", "FLOAT64", mode="REPEATED")
                , bigquery.SchemaField("next_step_type", "INT64", mode="REPEATED")
                , bigquery.SchemaField("reward", "FLOAT64", mode="REPEATED")
                , bigquery.SchemaField("discount", "FLOAT64", mode="REPEATED")
            ]
            , source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
        )

        with open(dataset_file, "rb") as source_file:
            load_job = client.load_table_from_file(
                source_file
                , bigquery_table_id
                , job_config=job_config
            )

        load_job.result()  # Wait for the job to complete.

    replay_buffer = generate_simulation_data(
        raw_data_path=raw_data_path
        , batch_size=batch_size
        , rank_k=rank_k
        , num_actions=num_actions
        , driver_steps=driver_steps
    )

    write_replay_buffer_to_file(
        replay_buffer=replay_buffer
        , batch_size=batch_size
        , dataset_file=bigquery_tmp_file
    )

    load_dataset_into_bigquery(
        project_id
        , bigquery_tmp_file
        , bigquery_dataset_id
        , bigquery_location
        , bigquery_table_id
    )
        
    return (
        bigquery_dataset_id
        , bigquery_location
        , bigquery_table_id
    )

Writing src/generator/generator_component.py


### Ingester component

* Create the `Ingester` component to ingest data from BigQuery, package them as `tf.train.Example` objects, and output TFRecord files
* Read more about `tf.train.Example` and TFRecord [here](https://www.tensorflow.org/tutorials/load_data/tfrecord)
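
Before a row is packaged as a `tf.train.Example`, its nested `observation` column has to be flattened back into a 2D array. The sketch below illustrates only that reshaping step and leaves out the TensorFlow serialization. A plain `dict` stands in for a `bigquery.table.Row`, the helper names `unnest_observation` and `batch_size_of` are hypothetical, and the values are invented.

```python
from typing import Any, Dict, List


def unnest_observation(row: Dict[str, Any]) -> List[List[float]]:
    """Turns [{"observation_batch": [...]}, ...] into a 2D list [batch, rank_k]."""
    return [item["observation_batch"] for item in row["observation"]]


def batch_size_of(row: Dict[str, Any]) -> int:
    """Returns the batch size after checking that the per-example columns agree."""
    columns = ["step_type", "action", "next_step_type", "reward", "discount"]
    sizes = {len(row[name]) for name in columns}
    sizes.add(len(row["observation"]))
    if len(sizes) != 1:
        raise ValueError(f"Inconsistent batch sizes: {sorted(sizes)}")
    return sizes.pop()


# Stand-in for one BigQuery row (made-up values, batch size 2, rank_k 3).
row = {
    "step_type": [0, 0],
    "observation": [
        {"observation_batch": [0.5, -1.0, 2.0]},
        {"observation_batch": [1.5, 0.0, -0.5]},
    ],
    "action": [3, 7],
    "policy_info": [],
    "next_step_type": [2, 2],
    "reward": [4.0, 1.0],
    "discount": [0.0, 0.0],
}

observation = unnest_observation(row)
print(batch_size_of(row), observation)
```

A consistency check like `batch_size_of` is optional; it is included here only as one way to catch malformed rows before they reach the TFRecord writer.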

In [14]:
COMPONENT_SUBDIR = "ingester"

# Make the ingester subfolder
! rm -rf {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}
! mkdir {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}

In [15]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}/ingester_component.py
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The Ingester component for ingesting BigQuery data into TFRecords.
"""
import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (
    Artifact, Dataset, Input, InputPath, 
    Model, Output, OutputPath, component, Metrics
)
@kfp.v2.dsl.component(
    base_image="tensorflow/tensorflow:2.12.0",
    packages_to_install=[
        "google-cloud-bigquery"
        , "tensorflow==2.12.0"
    ],
    # output_component_file="./pipelines/train_custom_model.yaml",
)
def ingest_bigquery_dataset_into_tfrecord(
    project_id: str
    , bigquery_table_id: str
    , tfrecord_file_input: str
    , tfrecord_file: OutputPath(str)
    , bigquery_max_rows: int = None
# ) -> NamedTuple("Outputs", [
    # ("tfrecord_file", str)]
):
    """
    Ingests data from BigQuery, formats them and outputs TFRecord files.

    Serves as the Ingester pipeline component:
    1. Reads data in BigQuery that contains 7 pieces of data: `step_type`,
      `observation`, `action`, `policy_info`, `next_step_type`, `reward`,
      `discount`.
    2. Packages the data as `tf.train.Example` objects and outputs them as
      TFRecord files.

    This function is to be built into a Kubeflow Pipelines (KFP) component. As a
    result, this function must be entirely self-contained. This means that the
    import statements and helper functions must reside within itself.

    Args:
      project_id: GCP project ID. This is required because otherwise the BigQuery
        client will use the ID of the tenant GCP project created as a result of
        KFP, which doesn't have proper access to BigQuery.
      bigquery_table_id: A string of the BigQuery table ID in the format of
        "project.dataset.table".
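      tfrecord_file_input: Path to file to write the ingestion result TFRecords.
      tfrecord_file: KFP output parameter path; the value of `tfrecord_file_input`
        is written to this file so that downstream components can consume it.
      bigquery_max_rows: Optional; maximum number of rows to ingest.

    Returns:
      None. The path to the output TFRecord file is emitted through the
      `tfrecord_file` output parameter.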
    """
    # pylint: disable=g-import-not-at-top
    import logging
    import collections
    from typing import Optional
    from google.cloud import bigquery
    import tensorflow as tf

    def read_data_from_bigquery(
        project_id: str
        , bigquery_table_id: str
        , bigquery_max_rows: Optional[int]
    ) -> bigquery.table.RowIterator:
        """
        Reads data from BigQuery at `bigquery_table_id` and creates an iterator.

        The table contains 7 columns that form `trajectories.Trajectory` objects:
        `step_type`, `observation`, `action`, `policy_info`, `next_step_type`,
        `reward`, `discount`.

        Args:
          project_id: GCP project ID. This is required because otherwise the
            BigQuery client will use the ID of the tenant GCP project created as a
            result of KFP, which doesn't have proper access to BigQuery.
          bigquery_table_id: A string of the BigQuery table ID in the format of
            "project.dataset.table".
          bigquery_max_rows: Optional; maximum number of rows to fetch.

        Returns:
          A row iterator over all data at `bigquery_table_id`.
        """
        # Construct a BigQuery client object.
        client = bigquery.Client(project=project_id)

        # Get dataset.
        query_job = client.query(
            f"""
            SELECT * FROM {bigquery_table_id}
            """
        )
        table = query_job.result(max_results=bigquery_max_rows)

        return table

    def _bytes_feature(tensor: tf.Tensor) -> tf.train.Feature:
        """
        Returns a `tf.train.Feature` with bytes from `tensor`.

        Args:
          tensor: A `tf.Tensor` object.

        Returns:
          A `tf.train.Feature` object containing bytes that represent the content of
          `tensor`.
        """
        value = tf.io.serialize_tensor(tensor)
        if isinstance(value, type(tf.constant(0))):
            value = value.numpy()
        
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

    def build_example(data_row: bigquery.table.Row) -> tf.train.Example:
        """
        Builds a `tf.train.Example` from `data_row` content.

        Args:
          data_row: A `bigquery.table.Row` object that contains 7 pieces of data:
            `step_type`, `observation`, `action`, `policy_info`, `next_step_type`,
            `reward`, `discount`. Each piece of data except `observation` is a 1D
            array; `observation` is a 1D array of `{"observation_batch": 1D array}`.

        Returns:
          A `tf.train.Example` object holding the same data as `data_row`.
        """
        feature = {
            "step_type":
                _bytes_feature(data_row.get("step_type")),
            "observation":
                _bytes_feature([
                    observation["observation_batch"]
                    for observation in data_row.get("observation")
                ]),
            "action":
                _bytes_feature(data_row.get("action")),
            "policy_info":
                _bytes_feature(data_row.get("policy_info")),
            "next_step_type":
                _bytes_feature(data_row.get("next_step_type")),
            "reward":
                _bytes_feature(data_row.get("reward")),
            "discount":
                _bytes_feature(data_row.get("discount")),
        }
        example_proto = tf.train.Example(
            features=tf.train.Features(feature=feature)
        )
        return example_proto

    def write_tfrecords(
        tfrecord_file: str,
        table: bigquery.table.RowIterator
    ) -> None:
        """
        Writes the row data in `table` into TFRecords in `tfrecord_file`.

        Args:
          tfrecord_file: Path to file to write the TFRecords.
          table: A row iterator over all data to be written.
        """
        with tf.io.TFRecordWriter(tfrecord_file) as writer:
            for data_row in table:
                example = build_example(data_row)
                writer.write(example.SerializeToString())

    table = read_data_from_bigquery(
        project_id=project_id
        , bigquery_table_id=bigquery_table_id
        , bigquery_max_rows=bigquery_max_rows
    )
    logging.info(f"table: {table}")
    logging.info("writing TFRecords...")

    write_tfrecords(tfrecord_file_input, table)
    
    logging.info("writing output_parameter_path...")
    with open(tfrecord_file, "w") as f:
        f.write(f"{tfrecord_file_input}")

Writing src/ingester/ingester_component.py


### Trainer component

* Create the `Trainer` component to train an RL policy on the training dataset, and then submit a remote custom training job to Vertex AI
* This component trains a policy using the TF-Agents LinUCB agent on the MovieLens simulation dataset
* Saves the trained policy as a SavedModel

> The Trainer performs *off-policy training*, where you train a policy on a static set of pre-collected data records containing information including observation, action and reward. For a data record, the policy in training might not output the same action given the observation in that data record.
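
The idea can be sketched without TF-Agents. The toy below is a one-dimensional LinUCB-style learner written from scratch; it is not the TF-Agents `lin_ucb_agent` implementation. The class name `ScalarLinUCB`, the logged records, and the parameter values are all invented for illustration. It learns from logged `(observation, action, reward)` records and then counts how often the trained policy would have chosen a different action than the logged one.

```python
import math
from typing import List, Tuple


class ScalarLinUCB:
    """Toy LinUCB with a scalar observation and one ridge estimate per arm."""

    def __init__(self, num_actions: int, tikhonov_weight: float, alpha: float):
        # Per-arm statistics: a = tikhonov_weight + sum(x^2), b = sum(reward * x).
        self.a = [tikhonov_weight] * num_actions
        self.b = [0.0] * num_actions
        self.alpha = alpha

    def update(self, observation: float, action: int, reward: float) -> None:
        """Off-policy update: uses the logged action, not the policy's own choice."""
        self.a[action] += observation * observation
        self.b[action] += reward * observation

    def action(self, observation: float) -> int:
        """Picks the arm with the highest upper confidence bound."""
        def ucb(arm: int) -> float:
            theta = self.b[arm] / self.a[arm]
            bonus = self.alpha * math.sqrt(observation * observation / self.a[arm])
            return theta * observation + bonus

        return max(range(len(self.a)), key=ucb)


# Pre-collected records: (observation, logged action, reward). Values are made up.
logged: List[Tuple[float, int, float]] = [
    (1.0, 0, 1.0),
    (1.0, 1, 5.0),
    (2.0, 0, 2.0),
    (2.0, 1, 9.0),
    (1.0, 0, 1.0),
]

policy = ScalarLinUCB(num_actions=2, tikhonov_weight=1.0, alpha=0.1)
for observation, logged_action, reward in logged:
    policy.update(observation, logged_action, reward)

# Compare the trained policy's choice with the logged action for each record.
mismatches = sum(
    1 for observation, logged_action, _ in logged
    if policy.action(observation) != logged_action
)
print(f"{mismatches} of {len(logged)} logged actions differ from the trained policy")
```

Here `tikhonov_weight` and `alpha` play the same conceptual roles as the Trainer's `tikhonov_weight` and `agent_alpha` arguments: the first regularizes the per-arm estimate, the second scales the exploration bonus.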

In [16]:
COMPONENT_SUBDIR = "trainer"

# Make the trainer subfolder
! rm -rf {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}
! mkdir {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}

In [17]:
# %%writefile {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}/trainer_component.py
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The Trainer component for training a policy on TFRecord files."""
import kfp
from typing import Any, Callable, Dict, NamedTuple, Optional, List
from kfp.v2.dsl import (
    Artifact, Dataset, Input, InputPath, 
    Model, Output, OutputPath, component, Metrics
)
@kfp.v2.dsl.component(
    base_image="tensorflow/tensorflow:2.12.0",
    packages_to_install=[
        "tf-agents==0.16.0"
        , "tensorflow==2.12.0"
    ],
    # output_component_file="./pipelines/train_custom_model.yaml",
)
def train_reinforcement_learning_policy(
    training_artifacts_dir_input: str
    , tfrecord_file: str
    , num_epochs: int
    , rank_k: int
    , num_actions: int
    , tikhonov_weight: float
    , agent_alpha: float
    , training_artifacts_dir: OutputPath(str)
    , training_artifact_dir: OutputPath(Artifact)
):
# ) -> NamedTuple("Outputs", [
#     ("training_artifacts_dir", str),]
    """
    Implements off-policy training for a policy on dataset of TFRecord files.

    The Trainer's task is to submit a remote training job to Vertex AI, with the
    training logic of a specified custom training container. The task will be
    handled by: `kfp.v2.google.experimental.run_as_aiplatform_custom_job` (which
    takes in the component made from this placeholder function)

    This function is to be built into a Kubeflow Pipelines (KFP) component. As a
    result, this function must be entirely self-contained. This means that the
    import statements and helper functions must reside within itself.

    Args:
      training_artifacts_dir: Path to store the Trainer artifacts (trained
        policy).
      tfrecord_file: Path to the TFRecord file containing the training dataset.
      num_epochs: Number of training epochs.
      rank_k: Rank for matrix factorization in the MovieLens environment; also
        the observation dimension.
      num_actions: Number of actions (movie items) to choose from.
      tikhonov_weight: LinUCB Tikhonov regularization weight of the Trainer.
      agent_alpha: LinUCB exploration parameter that multiplies the confidence
        intervals of the Trainer.

    Returns:
      A NamedTuple of (`training_artifacts_dir`).
    """
    # pylint: disable=g-import-not-at-top
    import collections
    from typing import Dict, List, NamedTuple  # pylint: disable=redefined-outer-name,reimported

    import tensorflow as tf

    from tf_agents import agents
    from tf_agents import policies
    from tf_agents import trajectories
    from tf_agents.bandits.agents import lin_ucb_agent
    from tf_agents.policies import policy_saver
    from tf_agents.specs import tensor_spec

    import logging

    per_arm = False  # Using the non-per-arm version of the MovieLens environment.

    # Mapping from feature name to serialized value
    feature_description = {
        "step_type": tf.io.FixedLenFeature((), tf.string)
        , "observation": tf.io.FixedLenFeature((), tf.string)
        , "action": tf.io.FixedLenFeature((), tf.string)
        , "policy_info": tf.io.FixedLenFeature((), tf.string)
        , "next_step_type": tf.io.FixedLenFeature((), tf.string)
        , "reward": tf.io.FixedLenFeature((), tf.string)
        , "discount": tf.io.FixedLenFeature((), tf.string)
    }

    def _parse_record(raw_record: tf.Tensor) -> Dict[str, tf.Tensor]:
        """
        Parses a serialized `tf.train.Example` proto.

        Args:
          raw_record: A serialized data record of a `tf.train.Example` proto.

        Returns:
          A dict mapping feature names to values as `tf.Tensor` objects of type
          string containing serialized protos, following `feature_description`.
        """
        return tf.io.parse_single_example(raw_record, feature_description)

    def build_trajectory(
        parsed_record: Dict[str, tf.Tensor]
        , policy_info: policies.utils.PolicyInfo
    ) -> trajectories.Trajectory:
        """
        Builds a `trajectories.Trajectory` object from `parsed_record`.

        Args:
          parsed_record: A dict mapping feature names to values as `tf.Tensor`
            objects of type string containing serialized protos.
          policy_info: Policy information specification.

        Returns:
          A `trajectories.Trajectory` object that contains values as de-serialized
          `tf.Tensor` objects from `parsed_record`.
        """
        return trajectories.Trajectory(
            step_type=tf.expand_dims(
                tf.io.parse_tensor(parsed_record["step_type"], out_type=tf.int32)
                , axis=1)
            , observation=tf.expand_dims(
                tf.io.parse_tensor(
                    parsed_record["observation"], out_type=tf.float32)
                , axis=1)
            , action=tf.expand_dims(
                tf.io.parse_tensor(parsed_record["action"], out_type=tf.int32)
                , axis=1)
            , policy_info=policy_info
            , next_step_type=tf.expand_dims(
                tf.io.parse_tensor(
                    parsed_record["next_step_type"], out_type=tf.int32)
                , axis=1)
            , reward=tf.expand_dims(
                tf.io.parse_tensor(parsed_record["reward"], out_type=tf.float32)
                , axis=1)
            , discount=tf.expand_dims(
                tf.io.parse_tensor(parsed_record["discount"], out_type=tf.float32)
                , axis=1)
        )

    def train_policy_on_trajectory(
        agent: agents.TFAgent
        , tfrecord_file: str
        , num_epochs: int
    ) -> NamedTuple("TrainOutputs", [
        ("policy", policies.TFPolicy)
        , ("train_loss", Dict[str, List[float]])
    ]):
        """
        Trains the policy in `agent` on the dataset of `tfrecord_file`.

        Parses `tfrecord_file` as `tf.train.Example` objects, packages them into
        `trajectories.Trajectory` objects, and trains the agent's policy on these
        trajectory objects.

        Args:
          agent: A TF-Agents agent that carries the policy to train.
          tfrecord_file: Path to the TFRecord file containing the training dataset.
          num_epochs: Number of epochs to train the policy.

        Returns:
          A NamedTuple of (a trained TF-Agents policy, a dict mapping from
          "epoch<i>" to lists of loss values produced at each training step).
        """
        raw_dataset = tf.data.TFRecordDataset([tfrecord_file])
        parsed_dataset = raw_dataset.map(_parse_record)

        train_loss = collections.defaultdict(list)
        for epoch in range(num_epochs):
            for parsed_record in parsed_dataset:
                trajectory = build_trajectory(parsed_record, agent.policy.info_spec)
                loss, _ = agent.train(trajectory)
                train_loss[f"epoch{epoch + 1}"].append(loss.numpy())

        train_outputs = collections.namedtuple(
            "TrainOutputs"
            , ["policy", "train_loss"]
        )
        return train_outputs(agent.policy, train_loss)

    def execute_training_and_save_policy(
        training_artifacts_dir: str
        , tfrecord_file: str
        , num_epochs: int
        , rank_k: int
        , num_actions: int
        , tikhonov_weight: float
        , agent_alpha: float
    ) -> None:
        """
        Executes training for the policy and saves the policy.

        Args:
          training_artifacts_dir: Path to store the Trainer artifacts (trained
            policy).
          tfrecord_file: Path to the TFRecord file that holds the ingested
            training data.
          num_epochs: Number of training epochs.
          rank_k: Rank for matrix factorization in the MovieLens environment; also
            the observation dimension.
          num_actions: Number of actions (movie items) to choose from.
          tikhonov_weight: LinUCB Tikhonov regularization weight of the Trainer.
          agent_alpha: LinUCB exploration parameter that multiplies the confidence
            intervals of the Trainer.
        """
        # Define time step and action specs for one batch.
        time_step_spec = trajectories.TimeStep(
            step_type=tensor_spec.TensorSpec(
                shape=(), dtype=tf.int32, name="step_type"
            )
            , reward=tensor_spec.TensorSpec(
                shape=(), dtype=tf.float32, name="reward"
            )
            , discount=tensor_spec.BoundedTensorSpec(
                shape=(), dtype=tf.float32, name="discount", minimum=0.
                , maximum=1.
            )
            , observation=tensor_spec.TensorSpec(
                shape=(rank_k,), dtype=tf.float32
                , name="observation"
            )
        )

        action_spec = tensor_spec.BoundedTensorSpec(
            shape=()
            , dtype=tf.int32
            , name="action"
            , minimum=0
            , maximum=num_actions - 1
        )

        # Define RL agent/algorithm.
        agent = lin_ucb_agent.LinearUCBAgent(
            time_step_spec=time_step_spec
            , action_spec=action_spec
            , tikhonov_weight=tikhonov_weight
            , alpha=agent_alpha
            , dtype=tf.float32
            , accepts_per_arm_features=per_arm
        )
        agent.initialize()
        logging.info("TimeStep Spec (for each batch):\n%s\n", agent.time_step_spec)
        logging.info("Action Spec (for each batch):\n%s\n", agent.action_spec)

        # Perform off-policy training.
        policy, _ = train_policy_on_trajectory(
            agent=agent
            , tfrecord_file=tfrecord_file
            , num_epochs=num_epochs
        )

        # Save trained policy.
        saver = policy_saver.PolicySaver(policy)
        # Use this function's own argument rather than the enclosing variable.
        saver.save(training_artifacts_dir)

    execute_training_and_save_policy(
        training_artifacts_dir=training_artifacts_dir_input
        , tfrecord_file=tfrecord_file
        , num_epochs=num_epochs
        , rank_k=rank_k
        , num_actions=num_actions
        , tikhonov_weight=tikhonov_weight
        , agent_alpha=agent_alpha
    )
    
    logging.info("writing training_artifacts_dir...")
    with open(training_artifacts_dir, "w") as f:
        f.write(f"{training_artifacts_dir_input}")
        
    logging.info("writing training_artifact_dir...")
    with open(training_artifact_dir, "w") as f:
        f.write(f"{training_artifacts_dir_input}")
    
    # return (training_artifacts_dir)
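
The roles of `tikhonov_weight` and `agent_alpha` are easier to see in a stripped-down LinUCB. The following is a minimal textbook sketch in pure Python, not the TF-Agents implementation used by the Trainer; `TinyLinUCB` and its helpers are hypothetical names. It assumes one independent linear model per action, with `tikhonov_weight` as the ridge term on the diagonal and `agent_alpha` scaling the confidence width.

```python
import math
from typing import List

Vector = List[float]


def _dot(u: Vector, v: Vector) -> float:
    return sum(a * b for a, b in zip(u, v))


def _matvec(m: List[Vector], v: Vector) -> Vector:
    return [_dot(row, v) for row in m]


class TinyLinUCB:
    """Textbook LinUCB with one linear model per action (illustrative only)."""

    def __init__(self, num_actions: int, rank_k: int,
                 tikhonov_weight: float, agent_alpha: float) -> None:
        self.alpha = agent_alpha
        # Each A_a starts as tikhonov_weight * I, so its inverse is I / tikhonov_weight.
        self.a_inv = [
            [[1.0 / tikhonov_weight if i == j else 0.0 for j in range(rank_k)]
             for i in range(rank_k)]
            for _ in range(num_actions)
        ]
        self.b = [[0.0] * rank_k for _ in range(num_actions)]

    def score(self, action: int, observation: Vector) -> float:
        """Estimated reward plus alpha times the confidence width."""
        a_inv = self.a_inv[action]
        theta = _matvec(a_inv, self.b[action])  # ridge-regression estimate
        width = math.sqrt(_dot(observation, _matvec(a_inv, observation)))
        return _dot(theta, observation) + self.alpha * width

    def choose(self, observation: Vector) -> int:
        return max(range(len(self.b)), key=lambda a: self.score(a, observation))

    def update(self, action: int, observation: Vector, reward: float) -> None:
        """Sherman-Morrison rank-one update of A_a^{-1}, then b_a += reward * x."""
        a_inv = self.a_inv[action]
        u = _matvec(a_inv, observation)
        denom = 1.0 + _dot(observation, u)
        for i in range(len(u)):
            for j in range(len(u)):
                a_inv[i][j] -= u[i] * u[j] / denom
        self.b[action] = [
            b + reward * x for b, x in zip(self.b[action], observation)
        ]


agent = TinyLinUCB(num_actions=2, rank_k=2, tikhonov_weight=0.01, agent_alpha=10.0)
untrained_score = agent.score(0, [1.0, 0.0])
```

In this sketch a small `tikhonov_weight` makes untrained actions look very uncertain, and a large `agent_alpha` turns that uncertainty into a strong exploration bonus, so the pipeline defaults (0.01 and 10) favor exploration early on.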

In [18]:
TRAINING_MACHINE_TYPE = 'n1-standard-4'
TRAINING_ACCELERATOR_TYPE = 'ACCELERATOR_TYPE_UNSPECIFIED'
TRAINING_ACCELERATOR_COUNT = 0

### Convert self-contained training component

In [19]:
from google_cloud_pipeline_components.v1.custom_job import \
    create_custom_training_job_from_component

custom_job_op = create_custom_training_job_from_component(
    train_reinforcement_learning_policy,
    display_name=f'custom-train-job-pipe-{PREFIX}',
    machine_type=TRAINING_MACHINE_TYPE,
    accelerator_type=TRAINING_ACCELERATOR_TYPE,
    accelerator_count=TRAINING_ACCELERATOR_COUNT,
)

In [20]:
custom_job_op

<kfp.components.yaml_component.YamlComponent at 0x7f3d189b12a0>

## Prediction Container

In [21]:
# Prediction container parameters
PREDICTION_CONTAINER = "prediction_container"
# PREDICTION_CONTAINER_DIR = "src/prediction_container"

DOCKERNAME = 'prediction'

# Make the prediction container subfolder
! rm -rf {REPO_DOCKER_PATH_PREFIX}/{PREDICTION_CONTAINER}
! mkdir {REPO_DOCKER_PATH_PREFIX}/{PREDICTION_CONTAINER}

### CloudBuild YAML

In [22]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PREDICTION_CONTAINER}/cloudbuild.yaml

steps:
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', '$_IMAGE_URI', '$_FILE_LOCATION', '-f', '$_FILE_LOCATION/Dockerfile_$_DOCKERNAME']
  env: ['AIP_STORAGE_URI=$_ARTIFACTS_DIR', 'PROJECT_ID=$_PROJECT_ID', 'LOGGER_PUBSUB_TOPIC=$_LOGGER_PUBSUB_TOPIC']
images:
- '$_IMAGE_URI'

Writing src/prediction_container/cloudbuild.yaml


In [23]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PREDICTION_CONTAINER}/Dockerfile_{DOCKERNAME}
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.10

COPY ./ /app/

RUN pip3 install -r /app/requirements.txt

Writing src/prediction_container/Dockerfile_prediction


In [24]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PREDICTION_CONTAINER}/prestart.sh
#!/bin/bash
export PORT=$AIP_HTTP_PORT

Writing src/prediction_container/prestart.sh


In [25]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PREDICTION_CONTAINER}/requirements.txt
google-cloud-pubsub
pillow
tf-agents==0.16.0
tensorflow==2.12.0

Writing src/prediction_container/requirements.txt


In [26]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PREDICTION_CONTAINER}/main.py
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Prediction server that uses a trained policy to give predicted actions."""
import json
import os
from typing import Dict, List

import fastapi

from google.cloud import pubsub_v1

import tensorflow as tf
import tf_agents
from tf_agents import policies


app = fastapi.FastAPI()
app_vars = {"trained_policy": None}


def _startup_event() -> None:
    """
    Loads the trained policy at startup
    """
    app_vars["trained_policy"] = tf.saved_model.load(
        os.environ["AIP_STORAGE_URI"]
    )

@app.on_event("startup")
async def startup_event() -> None:
    """
    Loads the trained policy at startup
    """
    _startup_event()

def _health() -> Dict[str, str]:
    """
    Handles server health check requests.
    
    Returns:
      An empty dict.
    """
    return {}


@app.get(os.environ["AIP_HEALTH_ROUTE"], status_code=200)
def health() -> Dict[str, str]:
    """
    Handles server health check requests

    Returns:
      An empty dict.
    """
    return _health()


def _message_logger_via_pubsub(
    project_id: str
    , logger_pubsub_topic: str
    , observations: List[Dict[str, List[List[float]]]]
    , predicted_actions: List[Dict[str, List[float]]]
) -> None:
    """
    Send a message to the Pub/Sub topic which triggers the Logger.

    Package observations and the corresponding predicted actions in a message JSON
    and send to Pub/Sub topic.

    Args:
      project_id: GCP project ID.
      logger_pubsub_topic: Name of Pub/Sub topic that triggers the Logger.
      observations: List of `{"observation": <observation>}` in the prediction
        request.
      predicted_actions: List of `{"predicted_action": <predicted_action>}`
        corresponding to the observations.
    """
    # Create message with observations and predicted actions.
    message_json = json.dumps({
        "observations": observations
        , "predicted_actions": predicted_actions
    })
    message_bytes = message_json.encode("utf-8")

    # Instantiate a Pub/Sub client.
    publisher = pubsub_v1.PublisherClient()

    # Get the Logger's Pub/Sub topic.
    topic_path = publisher.topic_path(project_id, logger_pubsub_topic)

    # Send message.
    publish_future = publisher.publish(topic_path, data=message_bytes)
    publish_future.result()

def _predict(
    instances: List[Dict[str, List[List[float]]]]
    , trained_policy: policies.TFPolicy
) -> Dict[str, List[Dict[str, List[int]]]]:
    """
    Gets predictions for the observations in `instances`; triggers the Logger.

    Unpacks observations in `instances` and queries the trained policy for
    predicted actions. Triggers the Logger with observations and predicted
    actions.

    Args:
      instances: List of `{"observation": <observation>}` for which to generate
        predictions.
      trained_policy: Trained policy to generate predictions.

    Returns:
      A dict with the key "predictions" mapping to a list of predicted actions
      corresponding to each observation in the prediction request.
    """
    predictions = []
    predicted_actions = []
    for index, instance in enumerate(instances):
        # Unpack observation and reconstruct TimeStep. Rewards default to 0.
        batch_size = len(instance["observation"])
        time_step = tf_agents.trajectories.restart(
            observation=instance["observation"]
            , batch_size=tf.convert_to_tensor([batch_size]))
        policy_step = trained_policy.action(time_step)

        predicted_action = policy_step.action.numpy().tolist()
        predictions.append(
            {f"PolicyStep {index}": predicted_action}
        )
        predicted_actions.append({"predicted_action": predicted_action})

    # Trigger the Logger to log prediction inputs and results.
    _message_logger_via_pubsub(
        project_id=os.environ["PROJECT_ID"]
        , logger_pubsub_topic=os.environ["LOGGER_PUBSUB_TOPIC"]
        , observations=instances
        , predicted_actions=predicted_actions
    )
    return {"predictions": predictions}

@app.post(os.environ["AIP_PREDICT_ROUTE"])
async def predict(
    request: fastapi.Request
) -> Dict[str, List[Dict[str, List[int]]]]:
    """
    Handles prediction requests.

    Unpacks observations in prediction requests and queries the trained policy for
    predicted actions.

    Args:
      request: Incoming prediction requests that contain observations.

    Returns:
      A dict with the key "predictions" mapping to a list of predicted actions
      corresponding to each observation in the prediction request.
    """
    body = await request.json()
    instances = body["instances"]
    
    return _predict(
        instances, app_vars["trained_policy"]
    )

Writing src/prediction_container/main.py
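
The message that `_message_logger_via_pubsub` publishes is plain JSON, so the Logger can pair each observation with its predicted action after decoding. Below is a minimal sketch of that round trip using only the standard library. `encode_logger_message` and `decode_logger_message` are hypothetical helper names, and the base64 step assumes the Logger runs as a Pub/Sub-triggered background Cloud Function, which receives the payload base64-encoded in `event["data"]`.

```python
import base64
import json
from typing import Any, Dict, List


def encode_logger_message(observations, predicted_actions) -> bytes:
    """Mirrors the payload built in `_message_logger_via_pubsub`."""
    return json.dumps({
        "observations": observations,
        "predicted_actions": predicted_actions,
    }).encode("utf-8")


def decode_logger_message(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical Logger-side decoding of a Pub/Sub event."""
    message = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    return [
        {
            "observation": obs["observation"],
            "predicted_action": act["predicted_action"],
        }
        for obs, act in zip(message["observations"], message["predicted_actions"])
    ]


# Simulate what Pub/Sub would hand to the function.
payload = encode_logger_message(
    observations=[{"observation": [[0.1, 0.2], [0.3, 0.4]]}],
    predicted_actions=[{"predicted_action": [3, 7]}],
)
event = {"data": base64.b64encode(payload)}
records = decode_logger_message(event)
```

Each entry in `records` carries one batch of observations together with the actions predicted for it, which is the pairing the feedback loop needs before rewards can be attached.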


In [27]:
# Docker definitions for the prediction container
IMAGE_URI = f'gcr.io/{PROJECT_ID}/{PREDICTION_CONTAINER}'
MACHINE_TYPE ='e2-highcpu-32'
FILE_LOCATION = f'./{REPO_DOCKER_PATH_PREFIX}/{PREDICTION_CONTAINER}'

print(f"export DOCKERNAME={DOCKERNAME}")
print(f"export IMAGE_URI={IMAGE_URI}")
print(f"export FILE_LOCATION={FILE_LOCATION}")
print(f"export MACHINE_TYPE={MACHINE_TYPE}")
print(f"export ARTIFACTS_DIR={ARTIFACTS_DIR}")

export DOCKERNAME=prediction
export IMAGE_URI=gcr.io/hybrid-vertex/prediction_container
export FILE_LOCATION=./src/prediction_container
export MACHINE_TYPE=e2-highcpu-32
export ARTIFACTS_DIR=gs://e2ev4-hybrid-vertex-bucket/artifacts


In [28]:
# ! gcloud builds submit --config {REPO_DOCKER_PATH_PREFIX}/{PREDICTION_CONTAINER}/cloudbuild.yaml -q \
#     --substitutions _DOCKERNAME=$DOCKERNAME,_IMAGE_URI=$IMAGE_URI,_FILE_LOCATION=$FILE_LOCATION,_ARTIFACTS_DIR=$ARTIFACTS_DIR,_PROJECT_ID=$PROJECT_ID,_LOGGER_PUBSUB_TOPIC=$LOGGER_PUBSUB_TOPIC \
#     --timeout=2h \
#     --machine-type=$MACHINE_TYPE

## Create and run pipeline

> Here, we build a "startup" pipeline that generates randomly sampled training data (with the Generator) as the first step. This pipeline runs only once.

### set pipeline args

In [29]:
ENABLE_CACHING = True
CPU_LIMIT = "8"    # vCPUs
MEMORY_LIMIT = "8G"

# =====================================
# Trainer parameters
# =====================================
# Root directory for training artifacts.
TRAINING_ARTIFACTS_DIR = (
    f"{BUCKET_URI}/artifacts"
)
# Type of machine & number of replicas to run the custom training job
TRAINING_REPLICA_COUNT = 1
TRAINING_MACHINE_TYPE = (
    "n1-standard-4"
)
# Type and count of accelerators to run the custom training job.
TRAINING_ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
TRAINING_ACCELERATOR_COUNT = 0

# =====================================
# Deployer parameters
# =====================================
# Display name of the uploaded and deployed policy.
TRAINED_POLICY_DISPLAY_NAME = (
    "movielens-trained-policy" 
)
TRAFFIC_SPLIT = {"0": 100}
ENDPOINT_DISPLAY_NAME = "movielens-endpoint"                # Display name of the prediction endpoint.
ENDPOINT_MACHINE_TYPE = "n1-standard-4"                     # Type of machine of the prediction endpoint.
ENDPOINT_REPLICA_COUNT = 1                                  # Number of replicas of prediction endpoint.
ENDPOINT_ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"  # Type of accelerators for prediction endpoint
ENDPOINT_ACCELERATOR_COUNT = 0                              # Number of accelerators for prediction endpoint

print(f"ENABLE_CACHING              : {ENABLE_CACHING}")
print(f"CPU_LIMIT                   : {CPU_LIMIT}")
print(f"MEMORY_LIMIT                : {MEMORY_LIMIT}")
print(f"TRAINING_ARTIFACTS_DIR      : {TRAINING_ARTIFACTS_DIR}")
print(f"TRAINING_REPLICA_COUNT      : {TRAINING_REPLICA_COUNT}")
print(f"TRAINING_MACHINE_TYPE       : {TRAINING_MACHINE_TYPE}")
print(f"TRAINING_ACCELERATOR_TYPE   : {TRAINING_ACCELERATOR_TYPE}")
print(f"TRAINING_ACCELERATOR_COUNT  : {TRAINING_ACCELERATOR_COUNT}")
print(f"TRAINED_POLICY_DISPLAY_NAME : {TRAINED_POLICY_DISPLAY_NAME}")
print(f"TRAFFIC_SPLIT               : {TRAFFIC_SPLIT}")
print(f"ENDPOINT_DISPLAY_NAME       : {ENDPOINT_DISPLAY_NAME}")
print(f"ENDPOINT_MACHINE_TYPE       : {ENDPOINT_MACHINE_TYPE}")
print(f"ENDPOINT_REPLICA_COUNT      : {ENDPOINT_REPLICA_COUNT}")
print(f"ENDPOINT_ACCELERATOR_TYPE   : {ENDPOINT_ACCELERATOR_TYPE}")
print(f"ENDPOINT_ACCELERATOR_COUNT  : {ENDPOINT_ACCELERATOR_COUNT}")

ENABLE_CACHING              : True
CPU_LIMIT                   : 8
MEMORY_LIMIT                : 8G
TRAINING_ARTIFACTS_DIR      : gs://e2ev4-hybrid-vertex-bucket/artifacts
TRAINING_REPLICA_COUNT      : 1
TRAINING_MACHINE_TYPE       : n1-standard-4
TRAINING_ACCELERATOR_TYPE   : ACCELERATOR_TYPE_UNSPECIFIED
TRAINING_ACCELERATOR_COUNT  : 0
TRAINED_POLICY_DISPLAY_NAME : movielens-trained-policy
TRAFFIC_SPLIT               : {'0': 100}
ENDPOINT_DISPLAY_NAME       : movielens-endpoint
ENDPOINT_MACHINE_TYPE       : n1-standard-4
ENDPOINT_REPLICA_COUNT      : 1
ENDPOINT_ACCELERATOR_TYPE   : ACCELERATOR_TYPE_UNSPECIFIED
ENDPOINT_ACCELERATOR_COUNT  : 0
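
In a Vertex AI deploy request, the key `"0"` in the traffic split stands for the model being deployed by that request, any other keys are IDs of models already on the endpoint, and the percentages add up to 100. The sketch below checks a split before it is handed to the deploy step; `validate_traffic_split` is a hypothetical helper, not part of any SDK.

```python
from typing import Dict


def validate_traffic_split(traffic_split: Dict[str, int]) -> None:
    """Raises ValueError if the split is not a valid percentage allocation."""
    for deployed_model_id, percent in traffic_split.items():
        if not 0 <= percent <= 100:
            raise ValueError(
                f"Traffic for {deployed_model_id!r} must be in [0, 100], got {percent}"
            )
    total = sum(traffic_split.values())
    if total != 100:
        raise ValueError(f"Traffic percentages must sum to 100, got {total}")


# Same value as TRAFFIC_SPLIT above: all traffic goes to the newly deployed model.
validate_traffic_split({"0": 100})
```

With `{"0": 100}`, each (re)train-deploy run routes all endpoint traffic to the freshly trained policy.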


In [36]:
PIPELINE_VERSION = 'v17'

PIPELINE_TAG = f'movielens-pipeline-startup-{PIPELINE_VERSION}'
print("PIPELINE_TAG:", PIPELINE_TAG)

PIPELINE_NAME = f'tfab-{PREFIX}-{PIPELINE_TAG}'.replace('_', '-')
print("PIPELINE_NAME:", PIPELINE_NAME)

PIPELINE_TAG: movielens-pipeline-startup-v17
PIPELINE_NAME: tfab-e2ev4-movielens-pipeline-startup-v17


In [37]:
import kfp
from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component)

from src.generator import generator_component
from src.ingester import ingester_component
# from src.trainer import trainer_component

@kfp.v2.dsl.pipeline(
    name=f'{PIPELINE_NAME}'.replace('_', '-')
)
def pipeline(
    # Pipeline configs
    project_id: str,
    raw_data_path: str,
    training_artifacts_dir: str,
    pipeline_version: str,
    # BigQuery configs
    bigquery_dataset_id: str,
    bigquery_location: str,
    bigquery_table_id: str,
    bigquery_max_rows: int = 10000,
    # TF-Agents RL configs
    batch_size: int = 8,
    rank_k: int = 20,
    num_actions: int = 20,
    driver_steps: int = 3,
    num_epochs: int = 5,
    tikhonov_weight: float = 0.01,
    agent_alpha: float = 10,
):
    """Authors an RL pipeline for MovieLens movie recommendation system.

    Integrates the Generator, Ingester, Trainer and Deployer components. This
    pipeline generates initial training data with a random policy and runs once
    as the initiation of the system.

    Args:
      project_id: GCP project ID. This is required because otherwise the BigQuery
        client will use the ID of the tenant GCP project created as a result of
        KFP, which doesn't have proper access to BigQuery.
      raw_data_path: Path to MovieLens 100K's "u.data" file.
      training_artifacts_dir: Path to store the Trainer artifacts (trained policy).
      pipeline_version: Version label of the pipeline, e.g. "v17".

      bigquery_dataset_id: A string of the BigQuery dataset ID in the format of
        "project.dataset".
      bigquery_location: A string of the BigQuery dataset location.
      bigquery_table_id: A string of the BigQuery table ID in the format of
        "project.dataset.table".
      bigquery_max_rows: Optional; maximum number of rows to ingest.

      batch_size: Optional; batch size of environment generated quantities, e.g.
        rewards.
      rank_k: Optional; rank for matrix factorization in the MovieLens environment;
        also the observation dimension.
      num_actions: Optional; number of actions (movie items) to choose from.
      driver_steps: Optional; number of steps to run per batch.
      num_epochs: Optional; number of training epochs.
      tikhonov_weight: Optional; LinUCB Tikhonov regularization weight of the
        Trainer.
      agent_alpha: Optional; LinUCB exploration parameter that multiplies the
        confidence intervals of the Trainer.
    """
    
    # ========================================================================
    # generator
    # ========================================================================
    
    generate_task = (
        generator_component.generate_movielens_dataset_for_bigquery(
            project_id = project_id
            , raw_data_path = raw_data_path
            , batch_size = batch_size
            , rank_k = rank_k
            , num_actions = num_actions
            , driver_steps = driver_steps
            , bigquery_tmp_file = BIGQUERY_TMP_FILE
            , bigquery_dataset_id = bigquery_dataset_id
            , bigquery_location = bigquery_location
            , bigquery_table_id = bigquery_table_id
        )
        .set_display_name("generator")
        .set_caching_options(False)
    )
    
    # ========================================================================
    # ingester
    # ========================================================================
    
    ingest_task = (
        ingester_component.ingest_bigquery_dataset_into_tfrecord(
            project_id = project_id
            , bigquery_table_id = generate_task.outputs['bigquery_table_id']
            , tfrecord_file_input = TFRECORD_FILE
            , bigquery_max_rows = bigquery_max_rows
        )
        .set_display_name("ingester")
        .set_caching_options(True)
    )
            
    # ========================================================================
    # trainer
    # ========================================================================

    train_task = (
        custom_job_op(
            training_artifacts_dir_input=training_artifacts_dir
            , tfrecord_file=ingest_task.outputs["tfrecord_file"]
            , num_epochs=num_epochs
            , rank_k=rank_k
            , num_actions=num_actions
            , tikhonov_weight=tikhonov_weight
            , agent_alpha=agent_alpha
            , project=PROJECT_ID
            , location=REGION
        )
        .set_display_name("trainer")
        .set_caching_options(True)
    )
            
    # ========================================================================
    # deployer
    # ========================================================================
    
    # Upload the trained policy as a model.
    model_upload_op = (
        gcc_aip.ModelUploadOp(
            project=project_id
            , display_name=TRAINED_POLICY_DISPLAY_NAME
            , artifact_uri=train_task.outputs["training_artifacts_dir"]
            , serving_container_image_uri=f"gcr.io/{PROJECT_ID}/{PREDICTION_CONTAINER}:latest"
        )
        .set_display_name("register model")
        .set_caching_options(True)
    )
    
    # Create a Vertex AI endpoint
    endpoint_create_op = (
        gcc_aip.EndpointCreateOp(
            project=project_id, display_name=ENDPOINT_DISPLAY_NAME
        )
        .set_display_name("create endpoint")
        .set_caching_options(True)
    )
    
    # Deploy the uploaded, trained policy to the created endpoint
    deploy_model_op = (
        gcc_aip.ModelDeployOp(
            endpoint=endpoint_create_op.outputs["endpoint"]
            , model=model_upload_op.outputs["model"]
            , deployed_model_display_name=TRAINED_POLICY_DISPLAY_NAME
            , traffic_split=TRAFFIC_SPLIT
            , dedicated_resources_machine_type=ENDPOINT_MACHINE_TYPE
            , dedicated_resources_accelerator_type=ENDPOINT_ACCELERATOR_TYPE
            , dedicated_resources_accelerator_count=ENDPOINT_ACCELERATOR_COUNT
            , dedicated_resources_min_replica_count=ENDPOINT_REPLICA_COUNT
        )
        .set_display_name("deploy model")
    )

### compile pipeline

In [38]:
# ! rm -f custom_container_pipeline_spec.json

PIPELINE_JSON_SPEC_LOCAL = "custom_pipeline_spec.json"

! rm -f $PIPELINE_JSON_SPEC_LOCAL

kfp.v2.compiler.Compiler().compile(
    pipeline_func=pipeline, package_path=PIPELINE_JSON_SPEC_LOCAL,
)

### save pipeline spec json

In [39]:
# !gsutil cp custom_container_pipeline_spec.json $PIPELINE_ROOT_PATH/pipeline_spec.json

PIPELINE_ROOT = f"{BUCKET_URI}/pipeline"
print("PIPELINE_ROOT:", PIPELINE_ROOT)

PIPELINES_FILEPATH = f'{PIPELINE_ROOT}/pipeline_spec.json'
print("PIPELINES_FILEPATH:", PIPELINES_FILEPATH)

!gsutil -q cp $PIPELINE_JSON_SPEC_LOCAL $PIPELINES_FILEPATH

PIPELINE_ROOT: gs://e2ev4-hybrid-vertex-bucket/pipeline
PIPELINES_FILEPATH: gs://e2ev4-hybrid-vertex-bucket/pipeline/pipeline_spec.json


In [40]:
!gsutil ls $PIPELINE_ROOT

gs://e2ev4-hybrid-vertex-bucket/pipeline/pipeline_spec.json
gs://e2ev4-hybrid-vertex-bucket/pipeline/934903580331/


## Submit pipeline to Vertex

In [41]:
# Create a pipeline run job.
job = aiplatform.PipelineJob(
    display_name=f"{PIPELINE_NAME}-startup"
    , template_path=PIPELINE_JSON_SPEC_LOCAL
    , pipeline_root=PIPELINE_ROOT
    , failure_policy='fast'
    , parameter_values={
        # Pipeline configs
        "project_id": PROJECT_ID
        , "raw_data_path": RAW_DATA_PATH
        , "training_artifacts_dir": TRAINING_ARTIFACTS_DIR
        , "pipeline_version" : PIPELINE_VERSION
        # BigQuery configs
        , "bigquery_dataset_id": BIGQUERY_DATASET_ID
        , "bigquery_location": BQ_LOCATION
        , "bigquery_table_id": BIGQUERY_TABLE_ID
    }
    , enable_caching=ENABLE_CACHING,
)

job.run(
    sync=False
    , service_account=VERTEX_SA
    , network=VPC_NETWORK_FULL
)

## Create Simulator 

* sends simulated MovieLens prediction requests

Create the Simulator, which [obtains observations](https://github.com/tensorflow/agents/blob/v0.8.0/tf_agents/bandits/environments/movielens_py_environment.py#L118-L125) from the MovieLens simulation environment, formats them, and sends prediction requests to the Vertex AI endpoint.

The workflow is: Cloud Scheduler --> Pub/Sub --> Cloud Functions --> Endpoint

In production, the Simulator logic can be replaced with code that gathers real-world input features as observations, gets prediction results from the endpoint, and communicates those results to real-world users.

In [42]:
# Simulator parameters
SIMULATOR_PUBSUB_TOPIC = (
    "simulator-pubsub-topic"  # Pub/Sub topic name for the Simulator.
)
SIMULATOR_CLOUD_FUNCTION = (
    "simulator-cloud-function"  # Cloud Functions name for the Simulator.
)
SIMULATOR_SCHEDULER_JOB = (
    "simulator-scheduler-job"  # Cloud Scheduler cron job name for the Simulator.
)
SIMULATOR_SCHEDULE = "*/5 * * * *"  # Cloud Scheduler cron job schedule for the Simulator. E.g. "*/5 * * * *" means every 5 mins.
SIMULATOR_SCHEDULER_MESSAGE = (
    "simulator-message"  # Cloud Scheduler message for the Simulator.
)
# TF-Agents RL configs
BATCH_SIZE = 8
RANK_K = 20
NUM_ACTIONS = 20

In [43]:
COMPONENT_SUBDIR = "simulator"

# Make the simulator subfolder
! rm -rf {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}
! mkdir {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}

In [44]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}/main.py
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The Simulator component for sending recurrent prediction requests."""
import logging
import os
from typing import Any, Dict

import dataclasses
from google import cloud  # For patch of google.cloud.aiplatform to work.
from google.cloud import aiplatform  # For using the module.  # pylint: disable=unused-import
import tensorflow as tf  # For tf_agents to work.  # pylint: disable=unused-import
from tf_agents.bandits.environments import movielens_py_environment


@dataclasses.dataclass
class EnvVars:
    """A class containing environment variables and their values.

    Attributes:
      project_id: A string of the GCP project ID.
      region: A string of the GCP service region.
      endpoint_id: A string of the Vertex AI endpoint ID.
      raw_data_path: A string of the path to MovieLens 100K's "u.data" file.
      rank_k: An integer of the rank for matrix factorization in the MovieLens
        environment; also the observation dimension.
      batch_size: An integer of the batch size of environment generated quantities.
      num_actions: An integer of the number of actions (movie items) to choose
        from.
    """
    project_id: str
    region: str
    endpoint_id: str
    raw_data_path: str
    rank_k: int
    batch_size: int
    num_actions: int


def get_env_vars() -> EnvVars:
    """
    Gets a set of environment variables necessary for `simulate`.

    Returns:
      An `EnvVars` of environment variables for configuring `simulate`.
    """
    return EnvVars(
        project_id=os.getenv("PROJECT_ID")
        , region=os.getenv("REGION")
        , endpoint_id=os.getenv("ENDPOINT_ID")
        , raw_data_path=os.getenv("RAW_DATA_PATH")
        , rank_k=int(os.getenv("RANK_K"))
        , batch_size=int(os.getenv("BATCH_SIZE"))
        , num_actions=int(os.getenv("NUM_ACTIONS"))
    )


def simulate(event: Dict[str, Any], context) -> None:  # pylint: disable=unused-argument
    """
    Gets observations and sends prediction requests to endpoints.

    Queries the MovieLens simulation environment for observations and sends
    prediction requests with the observations to the Vertex endpoint.

    Serves as the Simulator and the entrypoint of Cloud Functions.

    Note: In production, this function can be modified to hold the logic of
    gathering real-world input features as observations, getting prediction
    results from the endpoint and communicating those results to real-world
    users.

    Args:
      event: Triggering event of this function.
      context: Triggering context of this function.
        This is of type `functions_v1.context.Context` but not specified since
        it is not importable for a local environment that wants to run unit
        tests.
    """
    env_vars = get_env_vars()

    # Create MovieLens simulation environment.
    env = movielens_py_environment.MovieLensPyEnvironment(
        env_vars.raw_data_path, env_vars.rank_k, env_vars.batch_size
        , num_movies=env_vars.num_actions, csv_delimiter="\t"
    )

    # Get environment observation.
    observation_array = env._observe()  # pylint: disable=protected-access
    # Convert to nested list to be sent to the endpoint for prediction.
    observation = [
        list(observation_batch) for observation_batch in observation_array
    ]

    cloud.aiplatform.init(
        project=env_vars.project_id, location=env_vars.region
    )
    endpoint = cloud.aiplatform.Endpoint(env_vars.endpoint_id)

    # Send prediction request to endpoint and get prediction result.
    predictions = endpoint.predict(
        instances=[
            {"observation": observation},
        ]
    )

    logging.info("prediction result: %s", predictions[0])
    logging.info("prediction model ID: %s", predictions[1])

Writing src/simulator/main.py
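
The request payload that `simulate` builds can be checked locally, without an endpoint or the MovieLens data. The following is a minimal sketch under stated assumptions: `build_instances` is a hypothetical helper that mirrors the nested-list conversion above, and the random rows are only a stand-in for `env._observe()`.

```python
import json
import random

BATCH_SIZE, RANK_K = 8, 20  # illustrative values


def build_instances(observation_rows):
    """Hypothetical helper mirroring the nested-list conversion in `simulate`."""
    observation = [list(row) for row in observation_rows]
    return [{"observation": observation}]


# Stand-in for `env._observe()`: BATCH_SIZE rows of RANK_K features each.
rows = [tuple(random.random() for _ in range(RANK_K)) for _ in range(BATCH_SIZE)]
instances = build_instances(rows)

# Serializing confirms the payload contains only JSON-compatible types.
payload = json.dumps({"instances": instances})
print(len(instances), len(instances[0]["observation"]), len(instances[0]["observation"][0]))
```

With a real NumPy array, `observation_array.tolist()` is an alternative conversion that yields built-in Python floats.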


In [45]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}/requirements.txt
dataclasses
google-cloud-aiplatform
tensorflow==2.12.0
pillow
tf-agents==0.16.0

Writing src/simulator/requirements.txt


### Create Pub/Sub topic
* Read more about creating Pub/Sub topics [here](https://cloud.google.com/functions/docs/tutorials/pubsub)

In [46]:
! gcloud pubsub topics create $SIMULATOR_PUBSUB_TOPIC

Created topic [projects/hybrid-vertex/topics/simulator-pubsub-topic].


### Set up a recurrent Cloud Scheduler job for the Pub/Sub topic
* Read more about possible ways to create cron jobs [here](https://cloud.google.com/scheduler/docs/creating#gcloud).
* Read about the cron job schedule format [here](https://man7.org/linux/man-pages/man5/crontab.5.html)
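
To make the schedule format concrete, here is a small sketch that expands the minute field of a five-field cron expression. It assumes a schedule such as `*/5 * * * *` and handles only the `*`, `*/n` and comma-separated forms; `expand_minute_field` is an illustrative helper, not part of Cloud Scheduler.

```python
def expand_minute_field(field: str) -> list:
    """Expands a cron minute field of the form '*', '*/n' or 'a,b,c'."""
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(0, 60, step))
    return sorted(int(part) for part in field.split(","))


schedule = "*/5 * * * *"
minute, hour, day_of_month, month, day_of_week = schedule.split()

# Minutes of each hour at which the job would fire.
print(expand_minute_field(minute))
```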

In [47]:
scheduler_job_args = " ".join(
    [
        SIMULATOR_SCHEDULER_JOB,
        f"--schedule='{SIMULATOR_SCHEDULE}'",
        f"--topic={SIMULATOR_PUBSUB_TOPIC}",
        f"--message-body={SIMULATOR_SCHEDULER_MESSAGE}",
    ]
)

! echo $scheduler_job_args

simulator-scheduler-job --schedule=*/5 * * * * --topic=simulator-pubsub-topic --message-body=simulator-message


In [50]:
! gcloud scheduler jobs create pubsub $scheduler_job_args --location=$REGION

name: projects/hybrid-vertex/locations/us-central1/jobs/simulator-scheduler-job
pubsubTarget:
  data: c2ltdWxhdG9yLW1lc3NhZ2U=
  topicName: projects/hybrid-vertex/topics/simulator-pubsub-topic
retryConfig:
  maxBackoffDuration: 3600s
  maxDoublings: 16
  maxRetryDuration: 0s
  minBackoffDuration: 5s
schedule: '*/5 * * * *'
state: ENABLED
timeZone: Etc/UTC
userUpdateTime: '2023-07-05T10:48:05Z'
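
The `data` field in the job description is the message body in base64, which is how Pub/Sub payloads are represented. A quick check, with the string copied from the output above:

```python
import base64

# Copied from the `pubsubTarget.data` field of the scheduler job output.
data_field = "c2ltdWxhdG9yLW1lc3NhZ2U="

decoded = base64.b64decode(data_field).decode("utf-8")
print(decoded)
```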


### Define the Simulator logic in a Cloud Function to be triggered periodically, then deploy the Function

* Specify dependencies of the Function in `src/simulator/requirements.txt`.
* Read more about the available configurable arguments for deploying a Function [here](https://cloud.google.com/sdk/gcloud/reference/functions/deploy). For instance, based on the complexity of your Function, you may want to adjust its memory and timeout.
* Note that the environment variables in `ENV_VARS` must be comma-separated, with no extra spaces or other characters in between. Read more about setting/updating/deleting environment variables [here](https://cloud.google.com/functions/docs/env-var).
* Read more about sending predictions to Vertex endpoints [here](https://cloud.google.com/vertex-ai/docs/predictions/online-predictions-custom-models).

In [51]:
endpoints = ! gcloud ai endpoints list --region=$REGION \
    --filter=display_name=$ENDPOINT_DISPLAY_NAME

print("\n".join(endpoints), "\n")

ENDPOINT_ID = endpoints[2].split(" ")[0]
print(f"ENDPOINT_ID={ENDPOINT_ID}")

Using endpoint [https://us-central1-aiplatform.googleapis.com/]
ENDPOINT_ID          DISPLAY_NAME
1283028914544836608  movielens-endpoint 

ENDPOINT_ID=1283028914544836608
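
Indexing `endpoints[2]` depends on the exact number of lines `gcloud` prints before the table. As a sketch of a less position-dependent alternative, the hypothetical helper below looks for the header row and then matches the display name; the sample lines are copied from the output above.

```python
def parse_endpoint_id(lines, display_name):
    """Returns the ID of the endpoint whose display name matches exactly."""
    header_seen = False
    for line in lines:
        fields = line.split()
        if not header_seen:
            # Skip everything up to and including the table header.
            header_seen = fields[:2] == ["ENDPOINT_ID", "DISPLAY_NAME"]
            continue
        if len(fields) >= 2 and " ".join(fields[1:]) == display_name:
            return fields[0]
    raise ValueError(f"No endpoint named {display_name!r} in gcloud output")


sample_output = [
    "Using endpoint [https://us-central1-aiplatform.googleapis.com/]",
    "ENDPOINT_ID          DISPLAY_NAME",
    "1283028914544836608  movielens-endpoint ",
]
print(parse_endpoint_id(sample_output, "movielens-endpoint"))
```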


In [52]:
ENV_VARS = ",".join(
    [
        f"PROJECT_ID={PROJECT_ID}",
        f"REGION={REGION}",
        f"ENDPOINT_ID={ENDPOINT_ID}",
        f"RAW_DATA_PATH={RAW_DATA_PATH}",
        f"BATCH_SIZE={BATCH_SIZE}",
        f"RANK_K={RANK_K}",
        f"NUM_ACTIONS={NUM_ACTIONS}",
    ]
)

! echo $ENV_VARS

PROJECT_ID=hybrid-vertex,REGION=us-central1,ENDPOINT_ID=1283028914544836608,RAW_DATA_PATH=gs://e2ev4-hybrid-vertex-bucket/raw_data/u.data,BATCH_SIZE=8,RANK_K=20,NUM_ACTIONS=20
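
Because the flag value is parsed as a comma-separated list of `KEY=VALUE` pairs, a stray space or comma inside a value can silently corrupt the configuration. A minimal sketch of a guard, where `format_env_vars` is a hypothetical helper and the values are illustrative:

```python
def format_env_vars(values: dict) -> str:
    """Joins KEY=VALUE pairs with commas, rejecting values that would break parsing."""
    pairs = []
    for key, value in values.items():
        text = str(value)
        if "," in text or " " in text:
            raise ValueError(f"{key} contains a comma or space: {text!r}")
        pairs.append(f"{key}={text}")
    return ",".join(pairs)


env_vars = format_env_vars(
    {
        "REGION": "us-central1",
        "BATCH_SIZE": 8,
        "RANK_K": 20,
        "NUM_ACTIONS": 20,
    }
)
print(env_vars)
```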


In [56]:
! gcloud functions deploy $SIMULATOR_CLOUD_FUNCTION --ingress-settings=all \
    --region=$REGION \
    --trigger-topic=$SIMULATOR_PUBSUB_TOPIC \
    --runtime=python310 \
    --memory=2048MB \
    --timeout=200s \
    --source=src/simulator \
    --entry-point=simulate \
    --stage-bucket=$BUCKET_NAME \
    --update-env-vars=$ENV_VARS

Deploying function (may take a while - up to 2 minutes)...⠹                    
For Cloud Build Logs, visit: https://console.cloud.google.com/cloud-build/builds;region=us-central1/16b62bcf-69ab-47b3-af80-24084472b02e?project=934903580331
Deploying function (may take a while - up to 2 minutes)...done.                
availableMemoryMb: 512
buildId: 16b62bcf-69ab-47b3-af80-24084472b02e
buildName: projects/934903580331/locations/us-central1/builds/16b62bcf-69ab-47b3-af80-24084472b02e
dockerRegistry: CONTAINER_REGISTRY
entryPoint: simulate
environmentVariables:
  BATCH_SIZE: '8'
  ENDPOINT_ID: '1283028914544836608'
  NUM_ACTIONS: '20'
  PROJECT_ID: hybrid-vertex
  RANK_K: '20'
  RAW_DATA_PATH: gs://e2ev4-hybrid-vertex-bucket/raw_data/u.data
  REGION: us-central1
eventTrigger:
  eventType: google.pubsub.topic.publish
  failurePolicy: {}
  resource: projects/hybrid-vertex/topics/simulator-pubsub-topic
  service: pubsub.googleapis.com
ingressSettings: ALLOW_ALL
labels:
  deployment-tool: cli-

## Create Logger

> asynchronously log prediction inputs and results

**The Logger**
* gets environment feedback as rewards from the MovieLens simulation environment based on prediction observations and predicted actions, formulates trajectory data, and stores that data back to BigQuery
* closes the RL feedback loop from prediction to training data, and allows re-training of the policy on new training data
* is triggered by a hook in the prediction code: at each prediction request, the prediction code publishes a message to a Pub/Sub topic, which triggers the Logger code

The workflow is: prediction container code (at prediction request) --> Pub/Sub --> Cloud Functions (logging predictions back to BigQuery)

In production, this Logger logic can be modified to gather real-world feedback (rewards) based on observations and predicted actions.
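
The Pub/Sub hop in this workflow can be sketched as an encode/decode round trip. The decoding side follows the Logger code in this section (`observations` and `predicted_actions` keys in a base64-encoded JSON body); the encoding side is an assumption about what the prediction hook publishes, and both helper names are hypothetical.

```python
import base64
import json


def encode_pubsub_event(observations, predicted_actions):
    """Builds an event dict shaped like the one a Pub/Sub-triggered Function receives."""
    message = json.dumps(
        {"observations": observations, "predicted_actions": predicted_actions}
    )
    return {"data": base64.b64encode(message.encode("utf-8")).decode("utf-8")}


def decode_pubsub_event(event):
    """Recovers observations and predicted actions from the event payload."""
    data = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    return data["observations"], data["predicted_actions"]


# Illustrative payload: one request with a batch of two observations.
observations = [{"observation": [[0.1, 0.2], [0.3, 0.4]]}]
predicted_actions = [{"predicted_action": [3, 7]}]

event = encode_pubsub_event(observations, predicted_actions)
decoded_observations, decoded_actions = decode_pubsub_event(event)
print(decoded_observations == observations, decoded_actions == predicted_actions)
```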

In [57]:
COMPONENT_SUBDIR = "logger"

# Make the logger subfolder
! rm -rf {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}
! mkdir {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}

In [58]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}/main.py
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The Logger component for logging prediction inputs and results."""
import base64
import dataclasses
import json
import os
import tempfile
from typing import Any, Dict, List

from google.cloud import bigquery
import tensorflow as tf
from tf_agents import trajectories
from tf_agents.bandits.environments import movielens_py_environment
from tf_agents.environments import tf_py_environment


@dataclasses.dataclass
class EnvVars:
    """
    A class containing environment variables and their values.

    Attributes:
      project_id: A string of the GCP project ID.
      raw_data_path: A string of the path to MovieLens 100K's "u.data" file.
      batch_size: An integer of the batch size of environment generated quantities.
      rank_k: An integer of the rank for matrix factorization in the MovieLens
        environment; also the observation dimension.
      num_actions: An integer of the number of actions (movie items) to choose
        from.
      bigquery_tmp_file: Path to a JSON file containing the training dataset.
      bigquery_dataset_id: A string of the BigQuery dataset ID as
        `project_id.dataset_id`.
      bigquery_location: A string of the BigQuery dataset region.
      bigquery_table_id: A string of the BigQuery table ID as
        `project_id.dataset_id.table_id`.
    """
    project_id: str
    raw_data_path: str
    batch_size: int
    rank_k: int
    num_actions: int
    bigquery_tmp_file: str
    bigquery_dataset_id: str
    bigquery_location: str
    bigquery_table_id: str


def get_env_vars() -> EnvVars:
    """
    Gets a set of environment variables necessary for `log_prediction_to_bigquery`.

    Returns:
      An `EnvVars` of environment variables for configuring `log_prediction_to_bigquery`.
    """
    return EnvVars(
        project_id=os.getenv("PROJECT_ID"),
        raw_data_path=os.getenv("RAW_DATA_PATH"),
        batch_size=int(os.getenv("BATCH_SIZE")),
        rank_k=int(os.getenv("RANK_K")),
        num_actions=int(os.getenv("NUM_ACTIONS")),
        bigquery_tmp_file=os.getenv("BIGQUERY_TMP_FILE"),
        bigquery_dataset_id=os.getenv("BIGQUERY_DATASET_ID"),
        bigquery_location=os.getenv("BIGQUERY_LOCATION"),
        bigquery_table_id=os.getenv("BIGQUERY_TABLE_ID")
    )


def replace_observation_in_time_step(
    original_time_step: trajectories.TimeStep
    , observation: tf.Tensor
) -> trajectories.TimeStep:
    """
    Returns a `trajectories.TimeStep` with the observation field replaced.

    Args:
      original_time_step: The original `trajectories.TimeStep` in which the
        `observation` will be filled in.
      observation: A single, batched observation.

    Returns:
      A `trajectories.TimeStep` with `observation` filled into
      `original_time_step`.
    """
    return trajectories.TimeStep(
        step_type=original_time_step[0]
        , reward=original_time_step[1]
        , discount=original_time_step[2]
        , observation=observation
    )


def get_trajectory_from_environment(
    environment: tf_py_environment.TFPyEnvironment
    , observation: tf.Tensor
    , predicted_action: tf.Tensor
) -> trajectories.Trajectory:
    """
    Gets trajectory data from `environment` based on observation and action.

    Aligns `environment` observation to `observation` so that its feedback aligns
    with `observation`. The `trajectories.Trajectory` object contains time step
    information before and after applying `predicted_action` and feedback in the
    form of a reward.

    In production, this function can be replaced to actually pull feedback from
    some real-world environment.

    Args:
      environment: A TF-Agents environment that holds observations, applies actions
        and returns rewards.
      observation: A single, batched observation.
      predicted_action: A predicted action corresponding to the observation.

    Returns:
      A `trajectories.Trajectory` encapsulating the time step, action and reward.
    """
    environment.reset()

    # Align environment to observation.
    original_time_step = environment.current_time_step()
    time_step = replace_observation_in_time_step(original_time_step, observation)
    environment._time_step = time_step  # pylint: disable=protected-access

    # Apply predicted action to environment.
    environment.step(action=predicted_action)

    # Get next time step.
    next_time_step = environment.current_time_step()

    # Get trajectory as an encapsulation of all feedback from the environment.
    trajectory = trajectories.from_transition(
        time_step=time_step,
        action_step=trajectories.PolicyStep(action=predicted_action),
        next_time_step=next_time_step
    )
    return trajectory


def build_dict_from_trajectory(
    trajectory: trajectories.Trajectory) -> Dict[str, Any]:
    """
    Builds a dict from `trajectory` data.

    Args:
      trajectory: A `trajectories.Trajectory` object.

    Returns:
      A dict holding the same data as `trajectory`.
    """
    trajectory_dict = {
        "step_type": trajectory.step_type.numpy().tolist()
        , "observation": [
            {"observation_batch": batch} for batch in trajectory.observation.numpy().tolist()
        ]
        , "action": trajectory.action.numpy().tolist()
        , "policy_info": trajectory.policy_info
        , "next_step_type": trajectory.next_step_type.numpy().tolist()
        , "reward": trajectory.reward.numpy().tolist()
        , "discount": trajectory.discount.numpy().tolist()
    }
    return trajectory_dict


def write_trajectories_to_file(
    dataset_file: str,
    environment: tf_py_environment.TFPyEnvironment,
    observations: List[Dict[str, List[List[float]]]],
    predicted_actions: List[Dict[str, List[float]]]) -> None:
    """
    Writes trajectory data to a file, each JSON in one line.

    Gets `trajectories.Trajectory` objects that encapsulate environment
    feedback, e.g., rewards, based on `observations` and `predicted_actions`.
    Each `trajectories.Trajectory` object gets written as one line to
    `dataset_file` in JSON format. I.e., the `dataset_file` would be a
    newline-delimited JSON file.

    Args:
      dataset_file: Path to a JSON file containing the training dataset.
      environment: A TF-Agents environment that holds observations, applies actions
        and returns rewards.
      observations: List of `{"observation": <observation>}` in the prediction
        request.
      predicted_actions: List of `{"predicted_action": <predicted_action>}`
        corresponding to the observations.
    """
    with open(dataset_file, "w") as f:
        for observation, predicted_action in zip(observations, predicted_actions):
            trajectory = get_trajectory_from_environment(
                environment=environment
                , observation=tf.constant(observation["observation"])
                , predicted_action=tf.constant(predicted_action["predicted_action"]))
            trajectory_dict = build_dict_from_trajectory(trajectory)
            f.write(json.dumps(trajectory_dict) + "\n")


def append_dataset_to_bigquery(
    project_id: str,
    dataset_file: str,
    bigquery_dataset_id: str,
    bigquery_location: str,
    bigquery_table_id: str) -> None:
    """
    Appends training dataset to BigQuery table.

    Appends training dataset of `trajectories.Trajectory` in newline delimited
    JSON to a BigQuery dataset and table, using a BigQuery client.

    Args:
      project_id: GCP project ID. This is required because otherwise the
        BigQuery client will use the ID of the tenant GCP project created as a
        result of KFP, which doesn't have proper access to BigQuery.
      dataset_file: Path to a JSON file containing the training dataset.
      bigquery_dataset_id: A string of the BigQuery dataset ID in the format of
        "project.dataset".
      bigquery_location: A string of the BigQuery dataset location.
      bigquery_table_id: A string of the BigQuery table ID in the format of
        "project.dataset.table".
    """
    # Construct a BigQuery client object.
    client = bigquery.Client(project=project_id)

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(bigquery_dataset_id)

    # Specify the geographic location where the dataset should reside.
    dataset.location = bigquery_location

    # Create the dataset, or get the dataset if it exists.
    dataset = client.create_dataset(dataset, exists_ok=True, timeout=30)

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND
        , schema=[
            bigquery.SchemaField("step_type", "INT64", mode="REPEATED")
            , bigquery.SchemaField(
                "observation"
                , "RECORD"
                , mode="REPEATED"
                , fields=[
                    bigquery.SchemaField(
                        "observation_batch", "FLOAT64", "REPEATED"
                    )
                ]
            )
            , bigquery.SchemaField("action", "INT64", mode="REPEATED")
            , bigquery.SchemaField("policy_info", "FLOAT64", mode="REPEATED")
            , bigquery.SchemaField("next_step_type", "INT64", mode="REPEATED")
            , bigquery.SchemaField("reward", "FLOAT64", mode="REPEATED")
            , bigquery.SchemaField("discount", "FLOAT64", mode="REPEATED")
        ]
        , source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    )

    with open(dataset_file, "rb") as source_file:
        load_job = client.load_table_from_file(
            source_file, bigquery_table_id, job_config=job_config
        )

    load_job.result()  # Wait for the job to complete.


def log_prediction_to_bigquery(event: Dict[str, Any], context) -> None:  # pylint: disable=unused-argument
    """
    Logs prediction inputs and results to BigQuery.

    Queries the MovieLens simulation environment for rewards and other info based
    on observations and predicted actions, and logs trajectory data to BigQuery.

    Serves as the Logger and the entrypoint of Cloud Functions. The Logger closes
    the feedback loop from prediction results to training data, and allows
    re-training of the policy with new training data.

    Note: In production, this function can be modified to hold the logic of
    gathering real-world feedback for observations and predicted actions,
    formulating trajectory data, and storing back into BigQuery.

    Args:
      event: Triggering event of this function.
      context: Triggering context of this function.
        This is of type `functions_v1.context.Context` but not specified since
        it is not importable for a local environment that wants to run unit
        tests.
    """
    env_vars = get_env_vars()
    # Get a file path with permission for writing.
    dataset_file = os.path.join(tempfile.gettempdir(), env_vars.bigquery_tmp_file)

    data_bytes = base64.b64decode(event["data"])
    data_json = data_bytes.decode("utf-8")
    data = json.loads(data_json)
    observations = data["observations"]
    predicted_actions = data["predicted_actions"]

    # Create MovieLens simulation environment.
    env = movielens_py_environment.MovieLensPyEnvironment(
        env_vars.raw_data_path
        , env_vars.rank_k
        , env_vars.batch_size
        , num_movies=env_vars.num_actions
        , csv_delimiter="\t"
    )
    environment = tf_py_environment.TFPyEnvironment(env)

    # Get environment feedback and write trajectory data.
    write_trajectories_to_file(
        dataset_file=dataset_file
        , environment=environment
        , observations=observations
        , predicted_actions=predicted_actions
    )

    # Add trajectory data as new training data to BigQuery.
    append_dataset_to_bigquery(
        project_id=env_vars.project_id
        , dataset_file=dataset_file
        , bigquery_dataset_id=env_vars.bigquery_dataset_id
        , bigquery_location=env_vars.bigquery_location
        , bigquery_table_id=env_vars.bigquery_table_id
    )

Writing src/logger/main.py
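
To see what one line of the newline-delimited JSON file looks like without running TF-Agents, the sketch below builds a row with the same keys as `build_dict_from_trajectory`. The `build_row` helper and all values are illustrative placeholders. Note that `observation` is a repeated RECORD wrapping `observation_batch` because BigQuery does not allow an array of arrays directly.

```python
import json

# Column names, in the order used by the load job schema above.
SCHEMA_FIELDS = [
    "step_type", "observation", "action", "policy_info",
    "next_step_type", "reward", "discount",
]


def build_row(observation_batches, actions, rewards):
    """Hypothetical helper: one row shaped like `build_dict_from_trajectory` output."""
    batch_size = len(observation_batches)
    return {
        "step_type": [0] * batch_size,       # placeholder values
        "observation": [
            {"observation_batch": batch} for batch in observation_batches
        ],
        "action": actions,
        "policy_info": [],
        "next_step_type": [2] * batch_size,  # placeholder values
        "reward": rewards,
        "discount": [0.0] * batch_size,      # placeholder values
    }


row = build_row(
    observation_batches=[[0.1, 0.2], [0.3, 0.4]],
    actions=[3, 7],
    rewards=[4.0, 2.0],
)
line = json.dumps(row) + "\n"  # one JSON object per line
print(line)
```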


In [59]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{COMPONENT_SUBDIR}/requirements.txt
google-cloud-bigquery
tensorflow==2.12.0
pillow
tf-agents==0.16.0

Writing src/logger/requirements.txt


In [61]:
ENV_VARS = ",".join(
    [
        f"PROJECT_ID={PROJECT_ID}",
        f"RAW_DATA_PATH={RAW_DATA_PATH}",
        f"BATCH_SIZE={BATCH_SIZE}",
        f"RANK_K={RANK_K}",
        f"NUM_ACTIONS={NUM_ACTIONS}",
        f"BIGQUERY_TMP_FILE={BIGQUERY_TMP_FILE}",
        f"BIGQUERY_DATASET_ID={BIGQUERY_DATASET_ID}",
        f"BIGQUERY_LOCATION={BQ_LOCATION}",
        f"BIGQUERY_TABLE_ID={BIGQUERY_TABLE_ID}",
    ]
)

! echo $ENV_VARS

PROJECT_ID=hybrid-vertex,RAW_DATA_PATH=gs://e2ev4-hybrid-vertex-bucket/raw_data/u.data,BATCH_SIZE=8,RANK_K=20,NUM_ACTIONS=20,BIGQUERY_TMP_FILE=tmp.json,BIGQUERY_DATASET_ID=hybrid-vertex.movielens_dataset_e2ev4,BIGQUERY_LOCATION=us,BIGQUERY_TABLE_ID=hybrid-vertex.movielens_dataset_e2ev4.training_dataset


In [64]:
! gcloud functions deploy $LOGGER_CLOUD_FUNCTION --ingress-settings=all \
    --region=$REGION \
    --trigger-topic=$LOGGER_PUBSUB_TOPIC \
    --runtime=python310 \
    --memory=2048MB \
    --timeout=200s \
    --source=src/logger \
    --entry-point=log_prediction_to_bigquery \
    --stage-bucket=$BUCKET_NAME \
    --update-env-vars=$ENV_VARS

Deploying function (may take a while - up to 2 minutes)...⠹                    
For Cloud Build Logs, visit: https://console.cloud.google.com/cloud-build/builds;region=us-central1/a7e6fce8-2452-4f64-96be-182dbbb7d426?project=934903580331
Deploying function (may take a while - up to 2 minutes)...done.                
availableMemoryMb: 512
buildId: a7e6fce8-2452-4f64-96be-182dbbb7d426
buildName: projects/934903580331/locations/us-central1/builds/a7e6fce8-2452-4f64-96be-182dbbb7d426
dockerRegistry: CONTAINER_REGISTRY
entryPoint: log_prediction_to_bigquery
environmentVariables:
  BATCH_SIZE: '8'
  BIGQUERY_DATASET_ID: hybrid-vertex.movielens_dataset_e2ev4
  BIGQUERY_LOCATION: us
  BIGQUERY_TABLE_ID: hybrid-vertex.movielens_dataset_e2ev4.training_dataset
  BIGQUERY_TMP_FILE: tmp.json
  NUM_ACTIONS: '20'
  PROJECT_ID: hybrid-vertex
  RANK_K: '20'
  RAW_DATA_PATH: gs://e2ev4-hybrid-vertex-bucket/raw_data/u.data
eventTrigger:
  eventType: google.pubsub.topic.publish
  failurePolicy: {}
  reso

## Create (re)training pipeline

* Create a pipeline for orchestration on Vertex Pipelines, and a Cloud Scheduler job that recurrently triggers the pipeline
* The Trigger recurrently re-runs the pipeline to re-train the policy on new training data, using `kfp.v2.google.client.AIPlatformClient.create_schedule_from_job_spec`
* This method also automatically creates a Cloud Function that acts as an intermediary between the Scheduler and Pipelines. You can find the source code [here](https://github.com/kubeflow/pipelines/blob/v1.7.0-alpha.3/sdk/python/kfp/v2/google/client/client.py#L347-L391).

When the Simulator sends prediction requests to the endpoint, the hook in the prediction code triggers the Logger, which logs prediction results to BigQuery as new training data. Because this pipeline runs on a recurrent schedule, it uses the new training data to train a new policy, thereby closing the feedback loop. In theory, as the pipeline schedule becomes more frequent, this approaches real-time, continuous training.
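
The interplay between the two schedules can be estimated with simple arithmetic. The sketch below assumes the Simulator fires every 5 minutes, the pipeline every 30 minutes, and that each prediction request is logged as exactly one BigQuery row; these are assumptions for illustration, and the function name is hypothetical.

```python
def rows_per_retraining_run(
    simulator_period_min: int,
    trigger_period_min: int,
    rows_per_request: int = 1,
) -> int:
    """Estimates new training rows logged between two consecutive pipeline runs."""
    requests_between_runs = trigger_period_min // simulator_period_min
    return requests_between_runs * rows_per_request


new_rows = rows_per_retraining_run(simulator_period_min=5, trigger_period_min=30)
print(new_rows)
```

Shortening the pipeline period reduces the number of new rows each run sees, so a very frequent schedule re-trains on mostly unchanged data.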

In [65]:
# TRIGGER_SCHEDULE = "*/30 * * * *"  # Schedule to trigger the pipeline. E.g., "*/30 * * * *" means every 30 minutes.

In [70]:
RE_TRAIN_PIPE_VERSION = 'v1'

PIPELINE_TAG = f'movie-pipe-retraining-{RE_TRAIN_PIPE_VERSION}'
print("PIPELINE_TAG:", PIPELINE_TAG)

PIPELINE_NAME = f'tfab-{PREFIX}-{PIPELINE_TAG}'.replace('_', '-')
print("PIPELINE_NAME:", PIPELINE_NAME)

PIPELINE_TAG: movie-pipe-retraining-v1
PIPELINE_NAME: tfab-e2ev4-movie-pipe-retraining-v1


In [71]:
import kfp
from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component)

from src.generator import generator_component
from src.ingester import ingester_component
# from src.trainer import trainer_component

@kfp.v2.dsl.pipeline(
    name=f'{PIPELINE_NAME}'.replace('_', '-')
)
def pipeline(
    # Pipeline configs
    project_id: str,
    training_artifacts_dir: str,
    # BigQuery configs
    bigquery_table_id: str,
    bigquery_max_rows: int = 10000,
    # TF-Agents RL configs
    rank_k: int = 20,
    num_actions: int = 20,
    num_epochs: int = 5,
    tikhonov_weight: float = 0.01,
    agent_alpha: float = 10,
):  
    # ========================================================================
    # ingester
    # ========================================================================
    # Run the Ingester component.
    ingest_task = (
        ingester_component.ingest_bigquery_dataset_into_tfrecord(
            project_id=project_id,
            bigquery_table_id=bigquery_table_id,
            bigquery_max_rows=bigquery_max_rows,
            tfrecord_file_input=TFRECORD_FILE,
        )
    )
            
    # ========================================================================
    # trainer
    # ========================================================================

    train_task = (
        custom_job_op(
            training_artifacts_dir_input=training_artifacts_dir
            , tfrecord_file=ingest_task.outputs["tfrecord_file"]
            , num_epochs=num_epochs
            , rank_k=rank_k
            , num_actions=num_actions
            , tikhonov_weight=tikhonov_weight
            , agent_alpha=agent_alpha
            , project=PROJECT_ID
            , location=REGION
        )
        .set_display_name("trainer")
        .set_caching_options(True)
    )
    
    # ========================================================================
    # deployer
    # ========================================================================
    
    # Upload the trained policy as a model.
    model_upload_op = (
        gcc_aip.ModelUploadOp(
            project=project_id
            , display_name=TRAINED_POLICY_DISPLAY_NAME
            , artifact_uri=train_task.outputs["training_artifacts_dir"]
            , serving_container_image_uri=f"gcr.io/{PROJECT_ID}/{PREDICTION_CONTAINER}:latest"
        )
        .set_display_name("register model")
        .set_caching_options(True)
    )
    
    # Create a Vertex AI endpoint
    endpoint_create_op = (
        gcc_aip.EndpointCreateOp(
            project=project_id
            , display_name=ENDPOINT_DISPLAY_NAME
        )
        .set_display_name("create endpoint")
        .set_caching_options(True)
    )
    
    # Deploy the uploaded, trained policy to the created endpoint
    deploy_model_op = (
        gcc_aip.ModelDeployOp(
            endpoint=endpoint_create_op.outputs["endpoint"]
            , model=model_upload_op.outputs["model"]
            , deployed_model_display_name=TRAINED_POLICY_DISPLAY_NAME
            , traffic_split=TRAFFIC_SPLIT
            , dedicated_resources_machine_type=ENDPOINT_MACHINE_TYPE
            , dedicated_resources_accelerator_type=ENDPOINT_ACCELERATOR_TYPE
            , dedicated_resources_accelerator_count=ENDPOINT_ACCELERATOR_COUNT
            , dedicated_resources_min_replica_count=ENDPOINT_REPLICA_COUNT
        )
        .set_display_name("deploy model")
    )

### compile pipeline

In [72]:
# ! rm -f custom_container_pipeline_spec.json

RT_PIPELINE_JSON_SPEC_LOCAL = "retraining_pipeline_spec.json"

! rm -f $RT_PIPELINE_JSON_SPEC_LOCAL

kfp.v2.compiler.Compiler().compile(
    pipeline_func=pipeline, package_path=RT_PIPELINE_JSON_SPEC_LOCAL,
)

### save json spec

In [73]:
RT_PIPELINE_ROOT = f"{BUCKET_URI}/retrain_pipeline"
print("RT_PIPELINE_ROOT:", RT_PIPELINE_ROOT)

RT_PIPELINES_FILEPATH = f'{RT_PIPELINE_ROOT}/retraining_pipeline_spec.json'
print("RT_PIPELINES_FILEPATH:", RT_PIPELINES_FILEPATH)

!gsutil -q cp $RT_PIPELINE_JSON_SPEC_LOCAL $RT_PIPELINES_FILEPATH

RT_PIPELINE_ROOT: gs://e2ev4-hybrid-vertex-bucket/retrain_pipeline
RT_PIPELINES_FILEPATH: gs://e2ev4-hybrid-vertex-bucket/retrain_pipeline/retraining_pipeline_spec.json


### submit pipeline to Vertex

In [75]:
# Create a pipeline run job.
job = aiplatform.PipelineJob(
    display_name=f"{PIPELINE_NAME}-retrain"
    , template_path=RT_PIPELINE_JSON_SPEC_LOCAL
    , pipeline_root=RT_PIPELINE_ROOT
    , failure_policy='fast'
    , parameter_values={
        # Pipeline configs
        "project_id": PROJECT_ID
        , "training_artifacts_dir": TRAINING_ARTIFACTS_DIR
        # BigQuery configs
        , "bigquery_table_id": BIGQUERY_TABLE_ID
    }
    , enable_caching=False
)

job.run(
    sync=False
    , service_account=VERTEX_SA
    # , network=VPC_NETWORK_FULL
)

## clean-up

In [None]:
# ! rm -rf custom_pipeline_spec.json