# Training at scale with the Vertex AI Training Service
**Learning Objectives:**
  1. Learn how to organize your training code into a Python package
  1. Train your model using cloud infrastructure via Google Cloud Vertex AI Training Service
  1. (optional) Learn how to run your training package using Docker containers and push training Docker images on a Docker registry

## Introduction

In this notebook we'll make the jump from training locally, to do training in the cloud. We'll take advantage of Google Cloud's [Vertex AI Training Service](https://cloud.google.com/vertex-ai/). 

Vertex AI Training Service is a managed service that allows the training and deployment of ML models without having to provision or maintain servers. The infrastructure is handled seamlessly by the managed service for us.

Each learning objective will correspond to a __#TODO__ in this student lab notebook -- try to complete this notebook first and then review the [Solution Notebook](../solutions/1_training_at_scale.ipynb) for reference. 


Specify your project name and bucket name in the cell below.

In [1]:
from google import api_core
from google.cloud import bigquery

Change the following cell as necessary:

In [2]:
# Change below if necessary
PROJECT = !gcloud config get-value project  # noqa: E999
PROJECT = PROJECT[0]
BUCKET = PROJECT
REGION = "us-central1"

OUTDIR = f"gs://{BUCKET}/taxifare/data"

%env PROJECT=$PROJECT
%env BUCKET=$BUCKET
%env REGION=$REGION
%env OUTDIR=$OUTDIR
%env TFVERSION=2.5

env: PROJECT=qwiklabs-gcp-04-24f0cc8000a3
env: BUCKET=qwiklabs-gcp-04-24f0cc8000a3
env: REGION=us-central1
env: OUTDIR=gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/data
env: TFVERSION=2.5


Confirm below that the bucket is regional and its region equals to the specified region:

In [3]:
%%bash
gsutil ls -Lb gs://$BUCKET | grep "gs://\|Location"
echo $REGION

gs://qwiklabs-gcp-04-24f0cc8000a3/ :
	Location type:			region
	Location constraint:		US-CENTRAL1
us-central1


In [4]:
%%bash
gcloud config set project $PROJECT
gcloud config set ai/region $REGION

Updated property [core/project].
Updated property [ai/region].


## Create BigQuery tables

If you have not already created a BigQuery dataset for our data, run the following cell:

In [5]:
bq = bigquery.Client(project=PROJECT)
dataset = bigquery.Dataset(bq.dataset("taxifare"))

try:
    bq.create_dataset(dataset)
    print("Dataset created")
except api_core.exceptions.Conflict:
    print("Dataset already exists")

Dataset already exists


Let's create a table with 1 million examples.

Note that the order of columns is exactly what was in our CSV files.

In [6]:
%%bigquery

CREATE OR REPLACE TABLE taxifare.feateng_training_data AS

SELECT
    (tolls_amount + fare_amount) AS fare_amount,
    pickup_datetime,
    pickup_longitude AS pickuplon,
    pickup_latitude AS pickuplat,
    dropoff_longitude AS dropofflon,
    dropoff_latitude AS dropofflat,
    passenger_count*1.0 AS passengers,
    'unused' AS key
FROM `nyc-tlc.yellow.trips`
WHERE ABS(MOD(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING)), 1000)) = 1
AND
    trip_distance > 0
    AND fare_amount >= 2.5
    AND pickup_longitude > -78
    AND pickup_longitude < -70
    AND dropoff_longitude > -78
    AND dropoff_longitude < -70
    AND pickup_latitude > 37
    AND pickup_latitude < 45
    AND dropoff_latitude > 37
    AND dropoff_latitude < 45
    AND passenger_count > 0

Query complete after 0.01s: 100%|██████████| 3/3 [00:00<00:00, 2086.37query/s]                        


Make the validation dataset be 1/10 the size of the training dataset.

In [7]:
%%bigquery

CREATE OR REPLACE TABLE taxifare.feateng_valid_data AS

SELECT
    (tolls_amount + fare_amount) AS fare_amount,
    pickup_datetime,
    pickup_longitude AS pickuplon,
    pickup_latitude AS pickuplat,
    dropoff_longitude AS dropofflon,
    dropoff_latitude AS dropofflat,
    passenger_count*1.0 AS passengers,
    'unused' AS key
FROM `nyc-tlc.yellow.trips`
WHERE ABS(MOD(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING)), 10000)) = 2
AND
    trip_distance > 0
    AND fare_amount >= 2.5
    AND pickup_longitude > -78
    AND pickup_longitude < -70
    AND dropoff_longitude > -78
    AND dropoff_longitude < -70
    AND pickup_latitude > 37
    AND pickup_latitude < 45
    AND dropoff_latitude > 37
    AND dropoff_latitude < 45
    AND passenger_count > 0

Query complete after 0.00s: 100%|██████████| 3/3 [00:00<00:00, 1777.00query/s]                        


## Export the tables as CSV files

In [8]:
%%bash

echo "Deleting current contents of $OUTDIR"
gsutil -m -q rm -rf $OUTDIR

echo "Extracting training data to $OUTDIR"
bq --location=US extract \
   --destination_format CSV  \
   --field_delimiter "," --noprint_header \
   taxifare.feateng_training_data \
   $OUTDIR/taxi-train-*.csv

echo "Extracting validation data to $OUTDIR"
bq --location=US extract \
   --destination_format CSV  \
   --field_delimiter "," --noprint_header \
   taxifare.feateng_valid_data \
   $OUTDIR/taxi-valid-*.csv

gsutil ls -l $OUTDIR

Deleting current contents of gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/data
Extracting training data to gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/data
Extracting validation data to gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/data
  88345235  2021-12-06T14:16:21Z  gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/data/taxi-train-000000000000.csv
   8725746  2021-12-06T14:16:32Z  gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/data/taxi-valid-000000000000.csv
TOTAL: 2 objects, 97070981 bytes (92.57 MiB)


Waiting on bqjob_r390e45c404eed62e_0000017d901a0900_1 ... (22s) Current status: DONE   
Waiting on bqjob_r2a157bd1596a6922_0000017d901a6abe_1 ... (2s) Current status: DONE   


Confirm that you have created both the training and validation datasets in Google Cloud Storage.

In [9]:
!gsutil cat gs://$BUCKET/taxifare/data/taxi-train-000000000000.csv | head -2

57.33,2013-06-08 14:30:00 UTC,-73.985512,40.757657,-73.77637,40.64547,2,unused
62.66,2015-03-20 09:21:00 UTC,-73.981071472167969,40.7790641784668,-73.781478881835938,40.648330688476562,2,unused


In [10]:
!gsutil cat gs://$BUCKET/taxifare/data/taxi-valid-000000000000.csv | head -2

2.5,2011-04-14 20:48:49 UTC,-73.989506,40.736013,-73.988979,40.70337,2,unused
2.5,2012-07-14 17:15:31 UTC,-73.996098,40.739591,-73.996754,40.739967,2,unused


## Make code compatible with Vertex AI Training Service
In order to make our code compatible with Vertex AI Training Service we need to make the following changes:

1. Upload data to Google Cloud Storage 
2. Move code into a trainer Python package
4. Submit training job with `gcloud` to train on Vertex AI

### Upload data to Google Cloud Storage (GCS)

Cloud services don't have access to our local files, so we need to upload them to a location the Cloud servers can read from. In this case we'll use GCS.

To do this run the notebook [0_export_data_from_bq_to_gcs.ipynb](./0_export_data_from_bq_to_gcs.ipynb), which will export the taxifare data from BigQuery directly into a GCS bucket. If all ran smoothly, you should be able to list the data bucket by running the following command:

In [11]:
!gsutil ls gs://$BUCKET/taxifare/data

gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/data/taxi-train-000000000000.csv
gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/data/taxi-valid-000000000000.csv


### Move code into a python package

The first thing to do is to convert your training code snippets into a regular Python package. 

A Python package is simply a collection of one or more `.py` files along with an `__init__.py` file to identify the containing directory as a package. The `__init__.py` sometimes contains initialization code but for our purposes an empty file suffices.

#### Create the package directory

Our package directory contains 3 files:

In [12]:
ls ./taxifare/trainer/

__init__.py  [0m[01;34m__pycache__[0m/  model.py  task.py


#### Paste existing code into model.py

A Python package requires our code to be in a .py file, as opposed to notebook cells. So, we simply copy and paste our existing code for the previous notebook into a single file.

In the cell below, we write the contents of the cell into `model.py` packaging the model we 
developed in the previous labs so that we can deploy it to Vertex AI Training Service.  

**Lab Task #1**: Organizing your training code into a Python package

There are two places to fill in TODOs in `model.py`. 

 * in the `build_dnn_model` function, add code to use a optimizer with a custom learning rate.
 * in the `train_and_evaluate` function, add code to define variables using the `hparams` dictionary.

In [13]:
%%writefile ./taxifare/trainer/model.py
"""Data prep, train and evaluate DNN model."""

import datetime
import logging
import os

import numpy as np
import tensorflow as tf
from tensorflow import feature_column as fc
from tensorflow.keras import activations, callbacks, layers, models

logging.info(tf.version.VERSION)


CSV_COLUMNS = [
    "fare_amount",
    "pickup_datetime",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "passenger_count",
    "key",
]

# inputs are all float except for pickup_datetime which is a string
STRING_COLS = ["pickup_datetime"]
LABEL_COLUMN = "fare_amount"
DEFAULTS = [[0.0], ["na"], [0.0], [0.0], [0.0], [0.0], [0.0], ["na"]]
DAYS = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]


def features_and_labels(row_data):
    for unwanted_col in ["key"]:
        row_data.pop(unwanted_col)
    label = row_data.pop(LABEL_COLUMN)
    return row_data, label


def load_dataset(pattern, batch_size, num_repeat):
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        num_epochs=num_repeat,
        shuffle_buffer_size=1000000,
    )
    return dataset.map(features_and_labels)


def create_train_dataset(pattern, batch_size):
    dataset = load_dataset(pattern, batch_size, num_repeat=None)
    return dataset.prefetch(1)


def create_eval_dataset(pattern, batch_size):
    dataset = load_dataset(pattern, batch_size, num_repeat=1)
    return dataset.prefetch(1)


def parse_datetime(s):
    if not isinstance(s, str):
        s = s.numpy().decode("utf-8")
    return datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S %Z")


def euclidean(params):
    lon1, lat1, lon2, lat2 = params
    londiff = lon2 - lon1
    latdiff = lat2 - lat1
    return tf.sqrt(londiff * londiff + latdiff * latdiff)


def get_dayofweek(s):
    ts = parse_datetime(s)
    return DAYS[ts.weekday()]


@tf.function
def dayofweek(ts_in):
    return tf.map_fn(
        lambda s: tf.py_function(get_dayofweek, inp=[s], Tout=tf.string), ts_in
    )


@tf.function
def fare_thresh(x):
    return 60 * activations.relu(x)


def transform(inputs, numeric_cols, nbuckets):
    # Pass-through columns
    transformed = inputs.copy()
    del transformed["pickup_datetime"]

    feature_columns = {
        colname: fc.numeric_column(colname) for colname in numeric_cols
    }

    # Scaling longitude from range [-70, -78] to [0, 1]
    for lon_col in ["pickup_longitude", "dropoff_longitude"]:
        transformed[lon_col] = layers.Lambda(
            lambda x: (x + 78) / 8.0, name=f"scale_{lon_col}"
        )(inputs[lon_col])

    # Scaling latitude from range [37, 45] to [0, 1]
    for lat_col in ["pickup_latitude", "dropoff_latitude"]:
        transformed[lat_col] = layers.Lambda(
            lambda x: (x - 37) / 8.0, name=f"scale_{lat_col}"
        )(inputs[lat_col])

    # Adding Euclidean dist (no need to be accurate: NN will calibrate it)
    transformed["euclidean"] = layers.Lambda(euclidean, name="euclidean")(
        [
            inputs["pickup_longitude"],
            inputs["pickup_latitude"],
            inputs["dropoff_longitude"],
            inputs["dropoff_latitude"],
        ]
    )
    feature_columns["euclidean"] = fc.numeric_column("euclidean")

    # hour of day from timestamp of form '2010-02-08 09:17:00+00:00'
    transformed["hourofday"] = layers.Lambda(
        lambda x: tf.strings.to_number(
            tf.strings.substr(x, 11, 2), out_type=tf.dtypes.int32
        ),
        name="hourofday",
    )(inputs["pickup_datetime"])
    feature_columns["hourofday"] = fc.indicator_column(
        fc.categorical_column_with_identity("hourofday", num_buckets=24)
    )

    latbuckets = np.linspace(0, 1, nbuckets).tolist()
    lonbuckets = np.linspace(0, 1, nbuckets).tolist()
    b_plat = fc.bucketized_column(
        feature_columns["pickup_latitude"], latbuckets
    )
    b_dlat = fc.bucketized_column(
        feature_columns["dropoff_latitude"], latbuckets
    )
    b_plon = fc.bucketized_column(
        feature_columns["pickup_longitude"], lonbuckets
    )
    b_dlon = fc.bucketized_column(
        feature_columns["dropoff_longitude"], lonbuckets
    )
    ploc = fc.crossed_column([b_plat, b_plon], nbuckets * nbuckets)
    dloc = fc.crossed_column([b_dlat, b_dlon], nbuckets * nbuckets)
    pd_pair = fc.crossed_column([ploc, dloc], nbuckets ** 4)
    feature_columns["pickup_and_dropoff"] = fc.embedding_column(pd_pair, 100)

    return transformed, feature_columns


def rmse(y_true, y_pred):
    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))


def build_dnn_model(nbuckets, nnsize, lr, string_cols):
    numeric_cols = set(CSV_COLUMNS) - {LABEL_COLUMN, "key"} - set(string_cols)
    inputs = {
        colname: layers.Input(name=colname, shape=(), dtype="float32")
        for colname in numeric_cols
    }
    inputs.update(
        {
            colname: layers.Input(name=colname, shape=(), dtype="string")
            for colname in string_cols
        }
    )

    # transforms
    transformed, feature_columns = transform(inputs, numeric_cols, nbuckets)
    dnn_inputs = layers.DenseFeatures(feature_columns.values())(transformed)

    x = dnn_inputs
    for layer, nodes in enumerate(nnsize):
        x = layers.Dense(nodes, activation="relu", name=f"h{layer}")(x)
    output = layers.Dense(1, name="fare")(x)

    model = models.Model(inputs, output)

    # TODO 1a: Your code here
    model.compile(optimizer="adam",
                  loss="mse",
                  metrics=[rmse, "mse"])

    return model


def train_and_evaluate(hparams):
    # TODO 1b: Your code here
    nbuckets = hparams["nbuckets"]
    lr = hparams["lr"]
    batch_size = hparams["batch_size"]
    nnsize = [int(s) for s in hparams["nnsize"].split()]
    eval_data_path = hparams["eval_data_path"]
    num_evals = hparams["num_evals"]
    num_examples_to_train_on = hparams["num_examples_to_train_on"]
    output_dir = hparams["output_dir"]
    train_data_path = hparams["train_data_path"]

    model_export_path = os.path.join(output_dir, "savedmodel")
    checkpoint_path = os.path.join(output_dir, "checkpoints")
    tensorboard_path = os.path.join(output_dir, "tensorboard")

    if tf.io.gfile.exists(output_dir):
        tf.io.gfile.rmtree(output_dir)

    model = build_dnn_model(nbuckets, nnsize, lr, STRING_COLS)
    logging.info(model.summary())

    trainds = create_train_dataset(train_data_path, batch_size)
    evalds = create_eval_dataset(eval_data_path, batch_size)

    steps_per_epoch = num_examples_to_train_on // (batch_size * num_evals)

    checkpoint_cb = callbacks.ModelCheckpoint(
        checkpoint_path, save_weights_only=True, verbose=1
    )
    tensorboard_cb = callbacks.TensorBoard(tensorboard_path, histogram_freq=1)

    history = model.fit(
        trainds,
        validation_data=evalds,
        epochs=num_evals,
        steps_per_epoch=max(1, steps_per_epoch),
        verbose=2,  # 0=silent, 1=progress bar, 2=one line per epoch
        callbacks=[checkpoint_cb, tensorboard_cb],
    )

    # Exporting the model with default serving function.
    model.save(model_export_path)
    return history


Overwriting ./taxifare/trainer/model.py


### Modify code to read data from and write checkpoint files to GCS 

If you look closely above, you'll notice a new function, `train_and_evaluate` that wraps the code that actually trains the model. This allows us to parametrize the training by passing a dictionary of parameters to this function (e.g, `batch_size`, `num_examples_to_train_on`, `train_data_path` etc.)

This is useful because the output directory, data paths and number of train steps will be different depending on whether we're training locally or in the cloud. Parametrizing allows us to use the same code for both.

We specify these parameters at run time via the command line. Which means we need to add code to parse command line parameters and invoke `train_and_evaluate()` with those params. This is the job of the `task.py` file. 

In [14]:
%%writefile taxifare/trainer/task.py
"""Argument definitions for model training code in `trainer.model`."""

import argparse

from trainer import model

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--batch_size",
        help="Batch size for training steps",
        type=int,
        default=32,
    )
    parser.add_argument(
        "--eval_data_path",
        help="GCS location pattern of eval files",
        required=True,
    )
    parser.add_argument(
        "--nnsize",
        help="Hidden layer sizes (provide space-separated sizes)",
        default="32 8",
    )
    parser.add_argument(
        "--nbuckets",
        help="Number of buckets to divide lat and lon with",
        type=int,
        default=10,
    )
    parser.add_argument(
        "--lr", help="learning rate for optimizer", type=float, default=0.001
    )
    parser.add_argument(
        "--num_evals",
        help="Number of times to evaluate model on eval data training.",
        type=int,
        default=5,
    )
    parser.add_argument(
        "--num_examples_to_train_on",
        help="Number of examples to train on.",
        type=int,
        default=100,
    )
    parser.add_argument(
        "--output_dir",
        help="GCS location to write checkpoints and export models",
        required=True,
    )
    parser.add_argument(
        "--train_data_path",
        help="GCS location pattern of train files containing eval URLs",
        required=True,
    )
    args = parser.parse_args()
    hparams = args.__dict__

    model.train_and_evaluate(hparams)


Overwriting taxifare/trainer/task.py


### Run trainer module package locally

Now we can test our training code locally as follows using the local test data. We'll run a very small training job over a single file with a small batch size and one eval step.

In [15]:
%%bash

EVAL_DATA_PATH=./taxifare/tests/data/taxi-valid*
TRAIN_DATA_PATH=./taxifare/tests/data/taxi-train*
OUTPUT_DIR=./taxifare-model

test ${OUTPUT_DIR} && rm -rf ${OUTPUT_DIR}
export PYTHONPATH=${PYTHONPATH}:${PWD}/taxifare
    
python3 -m trainer.task \
--eval_data_path $EVAL_DATA_PATH \
--output_dir $OUTPUT_DIR \
--train_data_path $TRAIN_DATA_PATH \
--batch_size 5 \
--num_examples_to_train_on 100 \
--num_evals 1 \
--nbuckets 10 \
--lr 0.001 \
--nnsize "32 8"

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
dropoff_latitude (InputLayer)   [(None,)]            0                                            
__________________________________________________________________________________________________
dropoff_longitude (InputLayer)  [(None,)]            0                                            
__________________________________________________________________________________________________
pickup_longitude (InputLayer)   [(None,)]            0                                            
__________________________________________________________________________________________________
pickup_latitude (InputLayer)    [(None,)]            0                                            
______________________________________________________________________________________________

2021-12-06 14:16:41.402430: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
2021-12-06 14:16:41.737219: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-12-06 14:16:41.737262: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-12-06 14:16:41.737328: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-12-06 14:16:41.969128: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
2021-12-06 14:16:43.186977: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-12-06 14:16:43.187110: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-12-06 14:16:43.202767: I tensorflow/core/profiler/lib/profiler_session.cc:

### Run your training package on Vertex AI using a pre-built container

Once the code works in standalone mode locally, you can run it on the Cloud using Vertex AI and use pre-built containers. First, we need to package our code as a source distribution. For this, we can use `setuptools`. 

In [16]:
%%writefile taxifare/setup.py
from setuptools import find_packages
from setuptools import setup

setup(
    name="taxifare_trainer",
    version="0.1",
    packages=find_packages(),
    include_package_data=True,
    description="Taxifare model training application.",
)

Overwriting taxifare/setup.py


In [17]:
%%bash
cd taxifare
python ./setup.py sdist --formats=gztar
cd ..

running sdist
running egg_info
writing taxifare_trainer.egg-info/PKG-INFO
writing dependency_links to taxifare_trainer.egg-info/dependency_links.txt
writing top-level names to taxifare_trainer.egg-info/top_level.txt
reading manifest file 'taxifare_trainer.egg-info/SOURCES.txt'
writing manifest file 'taxifare_trainer.egg-info/SOURCES.txt'
running check
creating taxifare_trainer-0.1
creating taxifare_trainer-0.1/taxifare_trainer.egg-info
creating taxifare_trainer-0.1/trainer
copying files to taxifare_trainer-0.1...
copying setup.py -> taxifare_trainer-0.1
copying taxifare_trainer.egg-info/PKG-INFO -> taxifare_trainer-0.1/taxifare_trainer.egg-info
copying taxifare_trainer.egg-info/SOURCES.txt -> taxifare_trainer-0.1/taxifare_trainer.egg-info
copying taxifare_trainer.egg-info/dependency_links.txt -> taxifare_trainer-0.1/taxifare_trainer.egg-info
copying taxifare_trainer.egg-info/top_level.txt -> taxifare_trainer-0.1/taxifare_trainer.egg-info
copying trainer/__init__.py -> taxifare_trainer-






We will store our package in the Cloud Storage bucket.

In [18]:
%%bash
gsutil cp taxifare/dist/taxifare_trainer-0.1.tar.gz gs://${BUCKET}/taxifare/

Copying file://taxifare/dist/taxifare_trainer-0.1.tar.gz [Content-Type=application/x-tar]...
/ [1 files][  3.6 KiB/  3.6 KiB]                                                
Operation completed over 1 objects/3.6 KiB.                                      


#### Submit Custom Job using the `gcloud` CLI

To submit this source distribution the Cloud we use [`gcloud ai custom-jobs create`](https://cloud.google.com/sdk/gcloud/reference/ai/custom-jobs/create) and simply specify some additional parameters for Vertex AI Training service:
- job_name: A unique identifier for the Cloud job. We usually append system time to ensure uniqueness
- region: Cloud region to train in. See [here](https://cloud.google.com/vertex-ai/docs/general/locations) for supported Vertex AI Custom model training regions

The arguments within `--args` are sent to our `task.py`.

Because this is on the entire dataset, it will take a while. You can monitor the job from the GCP console in the Vertex AI Training section.

**Lab Task #2**: Train your model using cloud infrastructure via Google Cloud Vertex AI Training Service
Fill in the TODOs in the code below to submit your job for training on Vertex AI. 

In [29]:
%%bash

# Output directory and jobID
TIMESTAMP=$(date -u +%Y%m%d_%H%M%S)
OUTDIR=gs://${BUCKET}/taxifare/trained_model_$TIMESTAMP
JOB_NAME=taxifare_$TIMESTAMP
echo ${OUTDIR} ${REGION} ${JOB_NAME}

PYTHON_PACKAGE_URIS=gs://${BUCKET}/taxifare/taxifare_trainer-0.1.tar.gz
MACHINE_TYPE=n1-standard-4
REPLICA_COUNT=1
PYTHON_PACKAGE_EXECUTOR_IMAGE_URI="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-5:latest"
PYTHON_MODULE=trainer.task

# Model and training hyperparameters
BATCH_SIZE=50
NUM_EXAMPLES_TO_TRAIN_ON=5000
NUM_EVALS=100
NBUCKETS=10
LR=0.001
NNSIZE="32 8"

# GCS paths
GCS_PROJECT_PATH=gs://$BUCKET/taxifare
DATA_PATH=$GCS_PROJECT_PATH/data
TRAIN_DATA_PATH=$DATA_PATH/taxi-train*
EVAL_DATA_PATH=$DATA_PATH/taxi-valid*

WORKER_POOL_SPEC="machine-type=$MACHINE_TYPE,\
replica-count=$REPLICA_COUNT,\
executor-image-uri=$PYTHON_PACKAGE_EXECUTOR_IMAGE_URI,\
python-module=$PYTHON_MODULE"

ARGS="--eval_data_path=$EVAL_DATA_PATH,\
--output_dir=$OUTDIR,\
--train_data_path=$TRAIN_DATA_PATH,\
--batch_size=$BATCH_SIZE,\
--num_examples_to_train_on=100,\
--num_evals=$NUM_EVALS,\
--nbuckets=$NBUCKETS,\
--lr=$LR,\
--nnsize=$NNSIZE"

gcloud ai custom-jobs create \
    --region=$REGION \
    --display-name=$JOB_NAME \
    --python-package-uris=$PYTHON_PACKAGE_URIS \
    --worker-pool-spec=$WORKER_POOL_SPEC \
    --args="$ARGS"
# TODO 2: Your code here

gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/trained_model_20211206_151443 us-central1 taxifare_20211206_151443


Using endpoint [https://us-central1-aiplatform.googleapis.com/]
CustomJob [projects/961854438162/locations/us-central1/customJobs/1882329821892050944] is submitted successfully.

Your job is still active. You may view the status of your job with the command

  $ gcloud ai custom-jobs describe projects/961854438162/locations/us-central1/customJobs/1882329821892050944

or continue streaming the logs with the command

  $ gcloud ai custom-jobs stream-logs projects/961854438162/locations/us-central1/customJobs/1882329821892050944


#### Submit Custom Job using the Vertex AI Python SDK

The `gcloud` CLI is just one of multiple ways to interact with Vertex AI, which also include the Console GUI, directly calling the REST APIs (e.g. using `curl`), and the most flexible interface being the Vertex AI SDK available in multiple languages.

Below, we use the Vertex AI Python SDK to accomplish the same Pre-Built Container training as above -- getting familiar with the SDK will come in handy later when we use advanced features such as hyperparameter tuning.

### Run your training package using a custom container

Vertex AI Training also supports training in custom containers, allowing users to bring their own Docker containers with any pre-installed ML framework or algorithm to run on Vertex AI Training. 

In this last section, we'll see how to submit a Cloud training job using a customized Docker image. 

Containerizing our `./taxifare/trainer` package involves 3 steps:

* Writing a Dockerfile in `./taxifare`
* Building the Docker image
* Pushing it to the Google Cloud container registry in our GCP project

The `Dockerfile` specifies
1. How the container needs to be provisioned so that all the dependencies in our code are satisfied
2. Where to copy our trainer Package in the container
3. What command to run when the container is ran (the `ENTRYPOINT` line)

**Lab Task #3**: Running your training package using Docker containers.
Fill in the TODOs in the code below for Dockerfile

In [22]:
%%writefile ./taxifare/Dockerfile
FROM us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-5:latest
# TODO 3: Your code here

COPY . /code

WORKDIR /code

ENTRYPOINT ["python3", "-m", "trainer.task"]

Writing ./taxifare/Dockerfile


In [23]:
%%bash 

PROJECT_DIR=$(cd ./taxifare && pwd)
IMAGE_NAME=taxifare_training_container
DOCKERFILE=$PROJECT_DIR/Dockerfile
IMAGE_URI=gcr.io/$PROJECT/$IMAGE_NAME

docker build $PROJECT_DIR -f $DOCKERFILE -t $IMAGE_URI

docker push $IMAGE_URI

Sending build context to Docker daemon  123.9kB
Step 1/4 : FROM us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-5:latest
latest: Pulling from vertex-ai/training/tf-cpu.2-5
25fa05cd42bd: Pulling fs layer
334312f3cce5: Pulling fs layer
69ae64a48940: Pulling fs layer
fe5bbb0e39a4: Pulling fs layer
4f4fb700ef54: Pulling fs layer
80ddd5a9f1f3: Pulling fs layer
76405ca8c0d7: Pulling fs layer
96b547bd5851: Pulling fs layer
165d356cc142: Pulling fs layer
1f14d2578dc7: Pulling fs layer
05c08f7ba2fb: Pulling fs layer
b1c9ec7e36f6: Pulling fs layer
94e3f0ba3fb3: Pulling fs layer
00f212e657e4: Pulling fs layer
7d32fad557e1: Pulling fs layer
ea77f6786456: Pulling fs layer
3e6559c347aa: Pulling fs layer
1005710da1b9: Pulling fs layer
1cb94f682bbc: Pulling fs layer
f13c8af07f6a: Pulling fs layer
fe5bbb0e39a4: Waiting
648aea4595d4: Pulling fs layer
4f4fb700ef54: Waiting
80ddd5a9f1f3: Waiting
96b547bd5851: Waiting
a988524b4d09: Pulling fs layer
885e92cf2b8f: Pulling fs layer
e2e7332254a4: Pulling fs laye

**Remark:** If you prefer to build the container image from the command line, we have written a script for that `./taxifare/scripts/build.sh`. This script reads its configuration from the file `./taxifare/scripts/env.sh`. You can configure these arguments the way you want in that file. You can also simply type `make build` from within `./taxifare` to build the image (which will invoke the build script). Similarly, we wrote the script `./taxifare/scripts/push.sh` to push the Docker image, which you can also trigger by typing `make push` from within `./taxifare`.

#### Train using a custom container on Vertex AI

TODO: To submit to the Cloud we use [`gcloud ai custom-jobs create`](https://cloud.google.com/sdk/gcloud/reference/ai/custom-jobs/create) and simply specify some additional parameters for Vertex AI Training Service:
- job_name: A unique identifier for the Cloud job. We usually append system time to ensure uniqueness
- image-uri: The uri of the Docker image we pushed in the Google Cloud registry
- region: Cloud region to train in. See [here](https://cloud.google.com/vertex-ai/docs/general/locations) for supported Vertex AI Training Service regions

The arguments within `--args` are sent to our `task.py`.

You can track your job and view logs using [cloud console](https://console.cloud.google.com/mlengine/jobs).

In [24]:
%%bash

# Output directory and jobID
TIMESTAMP=$(date -u +%Y%m%d_%H%M%S)
OUTDIR=gs://${BUCKET}/taxifare/trained_model_$TIMESTAMP
JOB_NAME=taxifare_container_$TIMESTAMP
echo ${OUTDIR} ${REGION} ${JOB_NAME}

# Model and training hyperparameters
BATCH_SIZE=50
NUM_EXAMPLES_TO_TRAIN_ON=5000
NUM_EVALS=100
NBUCKETS=10
LR=0.001
NNSIZE="32 8"

# Vertex AI machines to use for training
MACHINE_TYPE=n1-standard-4
REPLICA_COUNT=1

# GCS paths.
GCS_PROJECT_PATH=gs://$BUCKET/taxifare
DATA_PATH=$GCS_PROJECT_PATH/data
TRAIN_DATA_PATH=$DATA_PATH/taxi-train*
EVAL_DATA_PATH=$DATA_PATH/taxi-valid*

IMAGE_NAME=taxifare_training_container
IMAGE_URI=gcr.io/$PROJECT/$IMAGE_NAME

WORKER_POOL_SPEC="machine-type=$MACHINE_TYPE,\
replica-count=$REPLICA_COUNT,\
container-image-uri=$IMAGE_URI"

ARGS="--eval_data_path=$EVAL_DATA_PATH,\
--output_dir,=$OUTDIR,\
--train_data_path=$TRAIN_DATA_PATH,\
--batch_size=$BATCH_SIZE,\
--num_examples_to_train_on=$NUM_EXAMPLES_TO_TRAIN_ON,\
--num_evals=$NUM_EVALS,\
--nbuckets=$NBUCKETS,\
--lr=$LR,\
--nnsize=$NNSIZE"

gcloud ai custom-jobs create \
  --region=$REGION \
  --display-name=$JOB_NAME \
  --worker-pool-spec=$WORKER_POOL_SPEC \
  --args="$ARGS"

gs://qwiklabs-gcp-04-24f0cc8000a3/taxifare/trained_model_20211206_143325 us-central1 taxifare_container_20211206_143325


Using endpoint [https://us-central1-aiplatform.googleapis.com/]
CustomJob [projects/961854438162/locations/us-central1/customJobs/8687268858848870400] is submitted successfully.

Your job is still active. You may view the status of your job with the command

  $ gcloud ai custom-jobs describe projects/961854438162/locations/us-central1/customJobs/8687268858848870400

or continue streaming the logs with the command

  $ gcloud ai custom-jobs stream-logs projects/961854438162/locations/us-central1/customJobs/8687268858848870400


As seen with the Pre-built Container, we can use the Vertex AI Python SDK also here as an alternative to the `gcloud` CLI as below.

Copyright 2021 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License