<a href="https://colab.research.google.com/github/sayakpaul/CI-CD-for-Model-Training/blob/main/cloud_build_tfx.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## References

* https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_vertex_training
* https://github.com/GoogleCloudPlatform/mlops-with-vertex-ai/

## Setting up

In [None]:
!gcloud init

In [None]:
from google.colab import auth
auth.authenticate_user()

In [None]:
GOOGLE_CLOUD_PROJECT = "fast-ai-exploration"
GOOGLE_CLOUD_REGION = "us-central1"
GCS_BUCKET_NAME = "vertex-tfx-mlops"

PIPELINE_NAME = "penguin-vertex-training"
DATA_ROOT = "gs://{}/data/{}".format(GCS_BUCKET_NAME, PIPELINE_NAME)
MODULE_ROOT = "gs://{}/pipeline_module/{}".format(GCS_BUCKET_NAME, PIPELINE_NAME)

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging

    logging.error("Please set all required parameters.")

In [None]:
!gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/

## Training module for TFX

In [None]:
_trainer_module_file = 'penguin_trainer.py'

In [None]:
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    "culmen_length_mm",
    "culmen_depth_mm",
    "flipper_length_mm",
    "body_mass_g",
]
_LABEL_KEY = "species"

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64),
}


def _input_fn(
    file_pattern: List[str],
    data_accessor: tfx.components.DataAccessor,
    schema: schema_pb2.Schema,
    batch_size: int,
) -> tf.data.Dataset:
    """Generates features and label for training.

    Args:
      file_pattern: List of paths or patterns of input tfrecord files.
      data_accessor: DataAccessor for converting input to RecordBatch.
      schema: schema of the input data.
      batch_size: representing the number of consecutive elements of returned
        dataset to combine in a single batch

    Returns:
      A dataset that contains (features, indices) tuple where features is a
        dictionary of Tensors, and indices is a single Tensor of label indices.
    """
    return data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size=batch_size, label_key=_LABEL_KEY),
        schema=schema,
    ).repeat()


def _make_keras_model(learning_rate: float) -> tf.keras.Model:
    """Creates a DNN Keras model for classifying penguin data.

    Returns:
      A Keras Model.
    """
    # The model below is built with Functional API, please refer to
    # https://www.tensorflow.org/guide/keras/overview for all API options.
    inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
    d = keras.layers.concatenate(inputs)
    for _ in range(2):
        d = keras.layers.Dense(8, activation="relu")(d)
    outputs = keras.layers.Dense(3)(d)

    model = keras.Model(inputs=inputs, outputs=outputs)
    optimizer = keras.optimizers.Adam(learning_rate)
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )

    model.summary(print_fn=logging.info)
    return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
    if fn_args.custom_config.get("use_gpu", False):
        logging.info("Using MirroredStrategy with one GPU.")
        return tf.distribute.MirroredStrategy(devices=["device:GPU:0"])
    return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
    """Train the model based on given args.

    Args:
      fn_args: Holds args used to train the model as name/value pairs.
    """

    # This schema is usually either an output of SchemaGen or a manually-curated
    # version provided by pipeline author. A schema can also derived from TFT
    # graph if a Transform component is used. In the case when either is missing,
    # `schema_from_feature_spec` could be used to generate schema from very simple
    # feature_spec, but the schema returned would be very primitive.
    schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
    hyperparameters = fn_args.hyperparameters
    logging.info("Hyperparameters:")
    logging.info(hyperparameters)

    train_dataset = _input_fn(
        fn_args.train_files, fn_args.data_accessor, schema, batch_size=_TRAIN_BATCH_SIZE
    )
    eval_dataset = _input_fn(
        fn_args.eval_files, fn_args.data_accessor, schema, batch_size=_EVAL_BATCH_SIZE
    )

    # NEW: If we have a distribution strategy, build a model in a strategy scope.
    strategy = _get_distribution_strategy(fn_args)
    if strategy is None:
        model = _make_keras_model(hyperparameters["learning_rate"])
    else:
        with strategy.scope():
            model = _make_keras_model(hyperparameters["learning_rate"])

    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        epochs=hyperparameters["num_epochs"],
    )

    # The result of the training should be saved in `fn_args.serving_model_dir`
    # directory.
    model.save(fn_args.serving_model_dir, save_format="tf")

In [None]:
!gsutil cp {_trainer_module_file} {MODULE_ROOT}/

## Cloud Build configurations

In [None]:
REPO_URL = "https://github.com/sayakpaul/CI-CD-for-Model-Training"
BRANCH = "dev"

PIPELINE_ROOT = "gs://{}/pipeline_root/{}".format(GCS_BUCKET_NAME, PIPELINE_NAME)

SERVING_MODEL_DIR = "gs://{}/serving_model/{}".format(GCS_BUCKET_NAME, PIPELINE_NAME)

VERSION = "1.0.0"
CICD_IMAGE_URI = f"gcr.io/tfx-oss-public/tfx:{VERSION}"
TFX_IMAGE_URI = f"gcr.io/{GOOGLE_CLOUD_PROJECT}/{PIPELINE_NAME}:{VERSION}"

In [None]:
SUBSTITUTIONS=f"""\
_REPO_URL='{REPO_URL}',\
_BRANCH={BRANCH},\
_PROJECT={GOOGLE_CLOUD_PROJECT},\
_REGION={GOOGLE_CLOUD_REGION},\
_PIPELINE_NAME={PIPELINE_NAME},\
_PIPELINE_ROOT={PIPELINE_ROOT},\
_MODULE_ROOT={MODULE_ROOT},\
_DATA_ROOT={DATA_ROOT},\
_SERVING_MODEL_DIR={SERVING_MODEL_DIR},\
_CICD_IMAGE_URI={CICD_IMAGE_URI},\
_TFX_IMAGE_URI={TFX_IMAGE_URI}
"""

!echo $SUBSTITUTIONS

## Submit to Cloud Build

The output of Cloud Build, in this case, is a compiled pipeline uploaded to GCS Bucket.

In [None]:
!git clone https://github.com/sayakpaul/CI-CD-for-Model-Training --quiet

In [None]:
!gcloud builds submit --no-source --timeout=60m \
    --config CI-CD-for-Model-Training/build/pipeline-deployment.yaml \
    --substitutions {SUBSTITUTIONS} \
    --machine-type=e2-highcpu-8

Output:

```shell
ID                                    CREATE_TIME                DURATION  SOURCE  IMAGES  STATUS
1619041e-a192-4de0-91f5-6799afa647ca  2021-08-24T08:16:37+00:00  7M45S     -       -       SUCCESS
```

In [None]:
!gsutil ls -lh {PIPELINE_ROOT}/

## Next steps

The output of the build is a compiled pipeline specification file (`.json`) uploaded to the specified GCS Bucket. This file can be provided like so in order to start the execution on Vertex AI:

```python
from kfp.v2.google import client

pipelines_client = client.AIPlatformClient(
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
)

_ = pipelines_client.create_run_from_job_spec(f"{PIPELINE_ROOT}/{PIPELINE_NAME}.json", 
    parameter_values={
        'num_epochs': 3, 'learning_rate': 0.01
    },
    enable_caching=True)
```

However, we will use this pipeline slightly differently in order to allow ***continuous training***. For more details, follow [this notebook](https://colab.research.google.com/github/sayakpaul/CI-CD-for-Model-Training/blob/dev/cloud_function_trigger.ipynb).