## Installation

Install the latest version of Vertex SDK for Python.

In [1]:
import os

# Google Cloud Notebook images ship this metadata file; its presence is how
# this notebook detects that environment.
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# On Google Cloud Notebooks, pip installs must use '--user'; elsewhere no flag.
USER_FLAG = "--user" if IS_GOOGLE_CLOUD_NOTEBOOK else ""

Install pandas BigQuery connector

In [23]:
# Install/upgrade pandas-gbq (backs DataFrame.to_gbq, used further down).
# USER_FLAG is "--user" on Google Cloud Notebooks and empty otherwise.
! pip install {USER_FLAG} -U "pandas-gbq"

Collecting pandas-gbq
  Downloading pandas_gbq-0.15.0-py3-none-any.whl (25 kB)
Collecting pydata-google-auth
  Downloading pydata_google_auth-1.2.0-py2.py3-none-any.whl (13 kB)
Installing collected packages: pydata-google-auth, pandas-gbq
Successfully installed pandas-gbq-0.15.0 pydata-google-auth-1.2.0


### Restart the kernel

Once you've installed everything, you need to restart the notebook kernel so it can find the packages.

In [3]:
import os

# Shut the kernel down (restart=True) so the packages installed above become
# importable. Skipped whenever IS_TESTING is set to a non-empty value.
if not os.getenv("IS_TESTING"):
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

Extract project ID

In [2]:
import os

PROJECT_ID = ""

if not os.getenv("IS_TESTING"):
    # Get your Google Cloud project ID from gcloud
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  dataorg-hackweek-2021


Generate a timestamp (used to make resource names unique) and authenticate. Authentication is skipped when running on a Google Cloud (AI Platform) notebook.

In [3]:
from datetime import datetime
import os
import sys

# Authenticate when not on a Google Cloud Notebook, then compute TIMESTAMP,
# which later cells append to job, model and endpoint names.

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# If on Google Cloud Notebooks, then don't execute this code
if not IS_GOOGLE_CLOUD_NOTEBOOK:
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    # NOTE(review): %env takes the rest of the line as the value, so as written
    # this likely sets the variable to the two characters '' rather than leaving
    # it empty — replace with a real key path (or drop the line) before use.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''
# Local wall-clock time formatted as YYYYMMDDHHMMSS; used as a unique name suffix.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Set up the Cloud Storage bucket used to stage training artifacts

In [4]:
# Staging bucket and region passed to aiplatform.init further down.
# NOTE: the "# @param" trailers are Colab form annotations — keep them intact.
BUCKET_NAME = "gs://tmp-hackweek2021-model-training"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}

In [1]:
# uncomment to create the bucket
# ! gsutil mb -l $REGION $BUCKET_NAME

List the items in the bucket

In [5]:
! gsutil ls -al $BUCK`ET_NAME

      4139  2021-09-30T17:05:30Z  gs://tmp-hackweek2021-model-training/aiplatform-2021-09-30-17:05:30.399-aiplatform_custom_trainer_script-0.1.tar.gz#1633021530511685  metageneration=1
       297  2021-09-30T17:04:29Z  gs://tmp-hackweek2021-model-training/mean_and_std.json#1633021469934116  metageneration=1
                                 gs://tmp-hackweek2021-model-training/aiplatform-custom-training-2021-09-30-17:05:30.544/
TOTAL: 2 objects, 4436 bytes (4.33 KiB)


## AI Platform Config

In [6]:
import os
import sys

from google.cloud import aiplatform
from google.cloud.aiplatform import gapic as aip

# Point the Vertex AI SDK at this project/region and stage uploads in BUCKET_NAME.
aiplatform.init(
    staging_bucket=BUCKET_NAME,
    location=REGION,
    project=PROJECT_ID,
)

In [7]:
# Scratch check of how an (accelerator type, count) pair renders.
aip.AcceleratorType.NVIDIA_TESLA_K80, 4

(<AcceleratorType.NVIDIA_TESLA_K80: 1>, 4)

In [8]:
# A single K80 GPU for the training job...
TRAIN_GPU = aip.AcceleratorType.NVIDIA_TESLA_K80
TRAIN_NGPU = 1

# ...and a single K80 GPU for the serving deployment.
DEPLOY_GPU = aip.AcceleratorType.NVIDIA_TESLA_K80
DEPLOY_NGPU = 1

In [9]:
# Prebuilt Vertex AI containers: TF 2.4 GPU image for training and the
# matching TF2 2.4 GPU image for prediction.
TRAIN_VERSION = "tf-gpu.2-4"
DEPLOY_VERSION = "tf2-gpu.2-4"

TRAIN_IMAGE = f"us-docker.pkg.dev/vertex-ai/training/{TRAIN_VERSION}:latest"
DEPLOY_IMAGE = f"us-docker.pkg.dev/vertex-ai/prediction/{DEPLOY_VERSION}:latest"

print("Training:", TRAIN_IMAGE, TRAIN_GPU, TRAIN_NGPU)
print("Deployment:", DEPLOY_IMAGE, DEPLOY_GPU, DEPLOY_NGPU)

Training: us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-4:latest AcceleratorType.NVIDIA_TESLA_K80 1
Deployment: us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-4:latest AcceleratorType.NVIDIA_TESLA_K80 1


In [10]:
# Training and serving both use n1-standard machines with 4 vCPUs.
MACHINE_TYPE = "n1-standard"
VCPU = "4"

TRAIN_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Train machine type", TRAIN_COMPUTE)

DEPLOY_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Deploy machine type", DEPLOY_COMPUTE)

Train machine type n1-standard-4
Deploy machine type n1-standard-4


## Model Train Config

Query the raw data, flatten it, and export the result to a new table.

This makes the train/validation/test split conceptually easier when training the model.
For example, the exact tables the model was trained on can be inspected in the BigQuery console.

In [14]:
# Raw search action events (read-only source table).
BQ_SOURCE_RAW = "bq://moz-fx-data-shared-prod.mlhackweek_search_stable.action_v1"
# Flattened copy written by this notebook. It lives in mozdata because the
# notebook's credentials cannot write to the mlhackweek_search dataset.
BQ_SOURCE_TRANSFORMED = "bq://mozdata.analysis.ccd_action_transformed_v1_tmp"

In [15]:
import json

import numpy as np
from google.cloud import bigquery

os.environ["GCLOUD_PROJECT"] = PROJECT_ID

# NOTE(review): not referenced anywhere in this notebook.
NA_VALUES = ["NA", "."]

# Vertex AI takes "bq://project.dataset.table" URIs; the BigQuery client and
# pandas-gbq take the bare "project.dataset.table" form.
BQ_URI_PREFIX = "bq://"


def strip_bq_prefix(uri: str) -> str:
    """Return ``uri`` without its leading ``bq://`` scheme, if it has one."""
    return uri[len(BQ_URI_PREFIX):] if uri.startswith(BQ_URI_PREFIX) else uri


# Set up BigQuery clients
bqclient = bigquery.Client()

# Flatten the per-result labeled metrics into one row per (search, result
# position). Labels appear to be "<name>_<position>", and the position is
# matched across metrics. The short-description key is split at OFFSET(2),
# presumably because its label contains an extra underscore — verify against
# the table schema. The domain is the host part of the result URL
# ("scheme://host/..." split on "/").
query = '''
select 
    metrics.string.search_meta_search_text as query,
    CAST(SPLIT(u.key, "_")[OFFSET(1)] AS INT64) as position,
    SPLIT(u.value, "/")[OFFSET(2)] as domain,
    p.value as preamble,
    d.value as short_description,
    t.value as title,
    s.value as selected
from `{tbl}`
cross join unnest(metrics.labeled_boolean.search_meta_selected) s
cross join unnest(metrics.labeled_string.search_meta_url) u
cross join unnest(metrics.labeled_string.search_meta_preamble) p
cross join unnest(metrics.labeled_string.search_meta_short_description) d
cross join unnest(metrics.labeled_string.search_meta_title) t
where submission_timestamp >="2021-01-01"
and CAST(SPLIT(s.key, "_")[OFFSET(1)] AS INT64) = CAST(SPLIT(u.key, "_")[OFFSET(1)] AS INT64)
and CAST(SPLIT(s.key, "_")[OFFSET(1)] AS INT64) = CAST(SPLIT(p.key, "_")[OFFSET(1)] AS INT64)
and CAST(SPLIT(s.key, "_")[OFFSET(1)] AS INT64) = CAST(SPLIT(d.key, "_")[OFFSET(2)] AS INT64)
and CAST(SPLIT(s.key, "_")[OFFSET(1)] AS INT64) = CAST(SPLIT(t.key, "_")[OFFSET(1)] AS INT64)
'''


df = bqclient.query(query.format(tbl=strip_bq_prefix(BQ_SOURCE_RAW))).to_dataframe()
# if_exists="replace" keeps the cell re-runnable; the default ("fail") raises
# as soon as the destination table already exists.
df.to_gbq(strip_bq_prefix(BQ_SOURCE_TRANSFORMED), if_exists="replace")

0it [00:00, ?it/s]

1516 out of 1516 rows loaded.


1it [00:05,  5.04s/it]


In [19]:
# Register the flattened BigQuery table as a Vertex AI tabular dataset.
# To train on the raw table instead, use display_name="search_v1" with
# bq_source=BQ_SOURCE_RAW.
dataset = aiplatform.TabularDataset.create(
    bq_source=BQ_SOURCE_TRANSFORMED,
    display_name="search_v1_transformed",
)

INFO:google.cloud.aiplatform.datasets.dataset:Creating TabularDataset
INFO:google.cloud.aiplatform.datasets.dataset:Create TabularDataset backing LRO: projects/424090863877/locations/us-central1/datasets/3809367985592729600/operations/198032764400828416
INFO:google.cloud.aiplatform.datasets.dataset:TabularDataset created. Resource name: projects/424090863877/locations/us-central1/datasets/3809367985592729600
INFO:google.cloud.aiplatform.datasets.dataset:To use this TabularDataset in another session:
INFO:google.cloud.aiplatform.datasets.dataset:ds = aiplatform.TabularDataset('projects/424090863877/locations/us-central1/datasets/3809367985592729600')


In [21]:
JOB_NAME = f"ccd_job_v1_{TIMESTAMP}"

# Two or more GPUs: mirror the model across them; otherwise (one GPU, zero,
# or unset) train on a single device.
TRAIN_STRATEGY = "mirror" if TRAIN_NGPU and TRAIN_NGPU >= 2 else "single"

EPOCHS = 20
BATCH_SIZE = 10

# Command-line flags forwarded to task_search_v1.py.
CMDARGS = [
    f"--epochs={EPOCHS}",
    f"--batch_size={BATCH_SIZE}",
    f"--distribute={TRAIN_STRATEGY}",
]

## Custom Python Training Code

This script is written to a file, uploaded, and run inside the prebuilt training container.

* The training/validation/test splits are passed to the script as environment variables that point to BQ tables.
  - These splits are created when the job runs.
  - They are saved in the running project's BigQuery: a new dataset is created with three tables (e.g., train, validation, test).
  - This dataset can be found in the Vertex AI console: Training -> Custom Jobs -> View Custom Job Inputs in JSON. 
       - (not user-friendly or intuitive).
  - e.g., `dataorg-hackweek-2021.dataset_3809367985592729600_tables_2021_09_30T16_00_27_766Z.train`
  
  
**WARNING** 

This takes around 15 minutes to run, plus however long Google takes to provision the resources.
The job's output can be viewed under Training -> Custom Jobs -> [View Logs](https://console.cloud.google.com/logs/query;query=resource.labels.job_id%3D%228878752107360944128%22%20timestamp%3E%3D%222021-09-30T23:47:01.947333Z%22;cursorTimestamp=2021-09-30T23:57:49.341720103Z?project=dataorg-hackweek-2021).

In [29]:
%%writefile task_search_v1.py

import argparse
import tensorflow as tf
import numpy as np
import os

import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
from tensorflow.keras.models import Sequential
from tensorflow.keras import Input
from tensorflow.keras.layers import Dense, LSTM, Embedding, Dropout

from sklearn.preprocessing import LabelEncoder

from google.cloud import bigquery
from google.cloud import storage

# Read environmental variables
training_data_uri = os.environ["AIP_TRAINING_DATA_URI"]
validation_data_uri = os.environ["AIP_VALIDATION_DATA_URI"]
test_data_uri = os.environ["AIP_TEST_DATA_URI"]

# Read args
parser = argparse.ArgumentParser()
parser.add_argument('--epochs', dest='epochs',
                    default=10, type=int,
                    help='Number of epochs.')
parser.add_argument('--batch_size', dest='batch_size',
                    default=10, type=int,
                    help='Batch size.')
parser.add_argument('--distribute', dest='distribute', type=str, default='single',
                    help='Distributed training strategy.')
args = parser.parse_args()

# Single Machine, single compute device
if args.distribute == 'single':
    if tf.test.is_gpu_available():
        strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
    else:
        strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
# Single Machine, multiple compute device
elif args.distribute == 'mirror':
    strategy = tf.distribute.MirroredStrategy()
# Multiple Machine, multiple compute device
elif args.distribute == 'multi':
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

# BigQuery client shared by the download helper below.
bqclient = bigquery.Client()


def download_table(bq_table_uri: str):
    """Download an entire BigQuery table into a pandas DataFrame.

    Accepts "project.dataset.table" or the "bq://project.dataset.table" form
    that Vertex AI supplies; the "bq://" scheme is removed first. Rows are
    read without creating a BigQuery Storage API client.
    """
    scheme = "bq://"
    table_id = bq_table_uri[len(scheme):] if bq_table_uri.startswith(scheme) else bq_table_uri
    table_ref = bigquery.TableReference.from_string(table_id)
    row_iterator = bqclient.list_rows(table_ref)
    return row_iterator.to_dataframe(create_bqstorage_client=False)


# Training split exported by Vertex AI. Rows without a title cannot be fed to
# the text model, so they are dropped.
df_train = download_table(training_data_uri)
df_train = df_train[df_train["title"].notna()]

# NOTE(review): the validation/test splits are not used yet.
# df_validation = download_table(validation_data_uri)
# df_test = download_table(test_data_uri)

# Encode the target (result domain) as integers 0..num_classes-1.
encoder = LabelEncoder()
encoder.fit(sorted(df_train["domain"].unique()))
labels = encoder.transform(df_train["domain"].tolist())
num_classes = len(encoder.classes_)

# Map raw title strings to fixed-length sequences of token ids, using a
# vocabulary learned from the training titles.
max_tokens = 1000
max_len = 100
vectorize_layer = TextVectorization(
  max_tokens=max_tokens,
  output_mode="int",
  output_sequence_length=max_len,
)

train_texts = df_train["title"].to_numpy()
vectorize_layer.adapt(train_texts)

# NOTE(review): `strategy` is built from --distribute above, but the model is
# not created under `strategy.scope()`, so that flag has no effect yet.
model = Sequential()

model.add(Input(shape=(1,), dtype="string"))
model.add(vectorize_layer)
model.add(Embedding(max_tokens + 1, 128))
model.add(LSTM(64))
model.add(Dense(64, activation="relu"))
# Softmax gives a probability distribution over the mutually exclusive domain
# classes. from_logits=False tells the loss the outputs are already
# probabilities, which keeps the head and the loss consistent.
model.add(Dense(num_classes, activation="softmax"))
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

# train the model with the epoch/batch settings passed on the command line
model.fit(train_texts, labels, epochs=args.epochs, batch_size=args.batch_size)

# Save to the directory Vertex AI provides for the trained model artifact.
tf.saved_model.save(model, os.environ["AIP_MODEL_DIR"])

Writing task_search_v1.py


The cell below launches the Vertex AI training job. The train/validation/test split BQ tables are also created at this step.

In [None]:
job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task_search_v1.py",
    container_uri=TRAIN_IMAGE,
    requirements=["google-cloud-bigquery>=2.20.0"],
    model_serving_container_image_uri=DEPLOY_IMAGE,
)

MODEL_DISPLAY_NAME = "ccd-search-v1-" + TIMESTAMP

# Arguments common to GPU and CPU-only runs. The train/validation/test split
# tables are exported into a new dataset under bq://PROJECT_ID.
run_kwargs = dict(
    dataset=dataset,
    model_display_name=MODEL_DISPLAY_NAME,
    bigquery_destination=f"bq://{PROJECT_ID}",
    args=CMDARGS,
    replica_count=1,
    machine_type=TRAIN_COMPUTE,
)
if TRAIN_GPU:
    run_kwargs.update(accelerator_type=TRAIN_GPU.name, accelerator_count=TRAIN_NGPU)
else:
    run_kwargs.update(accelerator_count=0)

# Start the training
model = job.run(**run_kwargs)

INFO:google.cloud.aiplatform.utils.source_utils:Training script copied to:
gs://tmp-hackweek2021-model-training/aiplatform-2021-09-30-23:42:57.781-aiplatform_custom_trainer_script-0.1.tar.gz.
INFO:google.cloud.aiplatform.training_jobs:Training Output directory:
gs://tmp-hackweek2021-model-training/aiplatform-custom-training-2021-09-30-23:42:57.906 
INFO:google.cloud.aiplatform.training_jobs:No dataset split provided. The service will use a default split.
INFO:google.cloud.aiplatform.training_jobs:View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/2615300556851249152?project=424090863877
INFO:google.cloud.aiplatform.training_jobs:CustomTrainingJob projects/424090863877/locations/us-central1/trainingPipelines/2615300556851249152 current state:
PipelineState.PIPELINE_STATE_RUNNING


## Deploy

In [None]:
DEPLOYED_NAME = "ccd-search-v1-deployed-" + TIMESTAMP

# Send all endpoint traffic to the model being deployed ("0" denotes it).
TRAFFIC_SPLIT = {"0": 100}

MIN_NODES = 1
MAX_NODES = 1

deploy_kwargs = dict(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=DEPLOY_COMPUTE,
    min_replica_count=MIN_NODES,
    max_replica_count=MAX_NODES,
)
if DEPLOY_GPU:
    deploy_kwargs.update(accelerator_type=DEPLOY_GPU.name, accelerator_count=DEPLOY_NGPU)
else:
    # CPU-only serving: leave accelerator_type at the SDK default. DEPLOY_COMPUTE
    # is a machine-type string, not an AcceleratorType, so it has no `.name`.
    deploy_kwargs.update(accelerator_count=0)

endpoint = model.deploy(**deploy_kwargs)

INFO:google.cloud.aiplatform.models:Creating Endpoint
INFO:google.cloud.aiplatform.models:Create Endpoint backing LRO: projects/424090863877/locations/us-central1/endpoints/257250536527495168/operations/5320569477278990336
INFO:google.cloud.aiplatform.models:Endpoint created. Resource name: projects/424090863877/locations/us-central1/endpoints/257250536527495168
INFO:google.cloud.aiplatform.models:To use this Endpoint in another session:
INFO:google.cloud.aiplatform.models:endpoint = aiplatform.Endpoint('projects/424090863877/locations/us-central1/endpoints/257250536527495168')
INFO:google.cloud.aiplatform.models:Deploying model to Endpoint : projects/424090863877/locations/us-central1/endpoints/257250536527495168
INFO:google.cloud.aiplatform.models:Deploy Endpoint model backing LRO: projects/424090863877/locations/us-central1/endpoints/257250536527495168/operations/8765823192217419776


## Predictions

TBD

## Remove Endpoint

In [None]:
# Take the endpoint out of service by undeploying its first deployed model.
# The endpoint resource itself is kept.
first_deployed = endpoint.list_models()[0]
deployed_model_id = first_deployed.id
endpoint.undeploy(deployed_model_id=deployed_model_id)

## Clean-up

Uncomment and run to clean up all the training jobs, models, buckets, etc...

In [None]:
# delete_training_job = True
# delete_model = True
# delete_endpoint = True

# # Warning: Setting this to true will delete everything in your bucket
# delete_bucket = False

# # Delete the training job
# job.delete()

# # Delete the model
# model.delete()

# # Delete the endpoint
# endpoint.delete()

# if delete_bucket and "BUCKET_NAME" in globals():
#     ! gsutil -m rm -r $BUCKET_NAME

# TODO

1. Utilize deployment for prediction
2. Set "task.py" filename by config param
3. Supply train/test/validation split fractions in training job