# Training and deploying a model on Google Cloud Platform

## Code based on [this](https://cloud.google.com/vertex-ai/docs/tutorials/train-tensorflow-bigquery) GCP tutorial

## Author

<a href="https://github.com/mibanell">
<img src="../img/avatar_square.png" width="50"/>
</a>

-------------------

This notebook contains the code needed to train and deploy a custom TensorFlow model on Google Cloud Platform (Vertex AI), using the iris dataset stored in BigQuery.

In [1]:
!pip install --upgrade google-cloud-aiplatform
!pip install -U google-cloud-storage
!pip install -U "google-cloud-bigquery[all]"



In [2]:
import os
import sys

from google.cloud import aiplatform
from google.cloud.aiplatform import gapic as aip

from datetime import datetime

In [3]:
# Google Cloud project ID. Later cells derive the staging bucket name and the
# BigQuery destination from it.
PROJECT_ID = "mibanell"

In [4]:
# Authenticate against Google Cloud, depending on where the notebook runs.
#
# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# The Google Cloud Notebook product has specific requirements.
# The presence of this metadata file is used as the marker for that product.
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# If on Google Cloud Notebooks, then don't execute this code
if not IS_GOOGLE_CLOUD_NOTEBOOK:
    # Colab is detected by its module already being loaded in this process.
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    # NOTE(review): the '' below is a placeholder; presumably the variable is set
    # to that literal text until it is replaced with a real key path - confirm.
    # The branch is skipped when the IS_TESTING environment variable is set.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

In [5]:
# Region used by later cells, and the Cloud Storage staging location
# (a gs:// URI derived from the project ID).
REGION = "us-central1"
BUCKET_NAME = f"gs://{PROJECT_ID}-iris-code"

print(BUCKET_NAME)

gs://mibanell-iris-code


In [6]:
!gsutil mb -p $PROJECT_ID -l $REGION $BUCKET_NAME

Creating gs://mibanell-iris-code/...
ServiceException: 409 A Cloud Storage bucket named 'mibanell-iris-code' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [7]:
# List the bucket contents in long format to confirm the bucket is reachable.
!gsutil ls -al $BUCKET_NAME

      3509  2021-09-19T10:04:43Z  gs://mibanell-iris-code/aiplatform-2021-09-19-10:04:42.929-aiplatform_custom_trainer_script-0.1.tar.gz#1632045883044336  metageneration=1
      3496  2021-09-19T10:17:25Z  gs://mibanell-iris-code/aiplatform-2021-09-19-10:17:24.972-aiplatform_custom_trainer_script-0.1.tar.gz#1632046645083659  metageneration=1
                                 gs://mibanell-iris-code/aiplatform-custom-training-2021-09-19-10:17:25.112/
TOTAL: 2 objects, 7005 bytes (6.84 KiB)


In [8]:
# Init aiplatform: register the project, region and staging bucket once so the
# SDK calls in later cells do not have to repeat them.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_NAME)

In [9]:
# Set training and deployment Docker images.
# TRAIN_IMAGE is the container the training script runs in; DEPLOY_IMAGE is the
# container that serves the trained model.
# NOTE(review): both use the mutable `:latest` tag - consider pinning a digest
# or version tag for reproducible runs.
TRAIN_IMAGE = "us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-6:latest"
DEPLOY_IMAGE = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest"

In [10]:
# Set machines
# Training and deployment share the same machine family and vCPU count; the
# full machine type is "<family>-<vcpus>".
MACHINE_TYPE = "n1-standard"
VCPU = "4"

TRAIN_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Train machine type", TRAIN_COMPUTE)

DEPLOY_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Deploy machine type", DEPLOY_COMPUTE)

Train machine type n1-standard-4
Deploy machine type n1-standard-4


In [11]:
# Timestamp suffix shared by the job, model and deployment names of this run.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
JOB_NAME = f"custom_job_{TIMESTAMP}"

# Hyperparameters forwarded to task.py as command-line arguments.
TRAIN_STRATEGY = "single"
EPOCHS = 20
BATCH_SIZE = 50

CMDARGS = [
    f"--epochs={EPOCHS}",
    f"--batch_size={BATCH_SIZE}",
    f"--distribute={TRAIN_STRATEGY}",
]

In [12]:
# Create dataset from BigQuery data.
# The resulting `dataset` object is handed to the training job in a later cell.

# Source table, written as bq://<project>.<dataset>.<table>.
BQ_SOURCE = "bq://mibanell.iris.iris"

dataset = aiplatform.TabularDataset.create(
    display_name="iris-data", bq_source=BQ_SOURCE
)

INFO:google.cloud.aiplatform.datasets.dataset:Creating TabularDataset
INFO:google.cloud.aiplatform.datasets.dataset:Create TabularDataset backing LRO: projects/987533379656/locations/us-central1/datasets/414498291485507584/operations/8453791687859240960
INFO:google.cloud.aiplatform.datasets.dataset:TabularDataset created. Resource name: projects/987533379656/locations/us-central1/datasets/414498291485507584
INFO:google.cloud.aiplatform.datasets.dataset:To use this TabularDataset in another session:
INFO:google.cloud.aiplatform.datasets.dataset:ds = aiplatform.TabularDataset('projects/987533379656/locations/us-central1/datasets/414498291485507584')


## Create training package task.py

In [13]:
%%writefile task.py

import argparse
import tensorflow as tf
import numpy as np
import os

import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.cloud import storage

# BigQuery URIs of the three data splits, taken from the AIP_* environment
# variables. A missing variable raises KeyError.
training_data_uri = os.environ["AIP_TRAINING_DATA_URI"]
validation_data_uri = os.environ["AIP_VALIDATION_DATA_URI"]
test_data_uri = os.environ["AIP_TEST_DATA_URI"]

# Command-line hyperparameters (destinations are derived from the flag names).
parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=10, help="Number of epochs.")
parser.add_argument("--batch_size", type=int, default=10, help="Batch size.")
parser.add_argument(
    "--distribute",
    type=str,
    default="single",
    help="Distributed training strategy.",
)
args = parser.parse_args()

# Pick the distribution strategy requested with --distribute.
# "mirror" replicates the model across the local devices; any other value
# (including the default "single") trains on one device. That device is the
# first GPU when TensorFlow can see one and the CPU otherwise, instead of
# unconditionally naming "/gpu:0" (the notebook submits this job with a CPU
# image and accelerator_count=0).
if args.distribute == "mirror":
    strategy = tf.distribute.MirroredStrategy()
else:
    device = "/gpu:0" if tf.config.list_physical_devices("GPU") else "/cpu:0"
    strategy = tf.distribute.OneDeviceStrategy(device=device)

# Set up training variables
# LABEL_COLUMN is the target that gets popped out of the feature frame.
LABEL_COLUMN = "Species"
# Columns dropped by preprocess() before training.
UNUSED_COLUMNS = ['Id']
# Placeholder strings that clean_dataframe() turns into NaN before dropping rows.
NA_VALUES = ["NA", "."]

# Possible categorical values.
# The position in this list becomes the integer class code of each species.
SPECIES = ['Iris-versicolor', 'Iris-virginica', 'Iris-setosa']

# Set up BigQuery clients (module-level client shared by download_table()).
bqclient = bigquery.Client()

# Download a table
def download_table(bq_table_uri: str):
    """Read every row of a BigQuery table and return it via ``to_dataframe``.

    Args:
      bq_table_uri: table identifier, with or without a leading ``bq://``.

    Returns:
      The result of ``RowIterator.to_dataframe`` for the full table, fetched
      without creating a BigQuery Storage client.
    """
    scheme = "bq://"
    table_id = bq_table_uri
    if table_id.startswith(scheme):
        # Remove the URI scheme before parsing the table reference.
        table_id = table_id[len(scheme):]

    table_ref = bigquery.TableReference.from_string(table_id)
    return bqclient.list_rows(table_ref).to_dataframe(create_bqstorage_client=False)


# Download the three splits referenced by the AIP_* environment variables.
df_train = download_table(training_data_uri)
df_validation = download_table(validation_data_uri)
df_test = download_table(test_data_uri)

# Remove NA values
def clean_dataframe(df):
    """Return a copy of ``df`` without rows that contain missing values.

    Every cell equal to one of the ``NA_VALUES`` placeholders is first turned
    into NaN; afterwards each row holding at least one NaN is dropped. The
    input frame is left unchanged.

    Args:
      df: Pandas df with raw data

    Returns:
      New df containing only complete rows
    """
    # np.nan is the canonical spelling; the np.NaN alias was removed in NumPy 2.0.
    return df.replace(to_replace=NA_VALUES, value=np.nan).dropna()


# Only the training and validation splits are cleaned here; df_test is left as
# downloaded.
df_train = clean_dataframe(df_train)
df_validation = clean_dataframe(df_validation)

# Maps each categorical column name to the dtype preprocess() uses to encode it.
# The category order follows SPECIES, so the codes are positions in that list.
_CATEGORICAL_TYPES = {
    "Species": pd.api.types.CategoricalDtype(categories=SPECIES)
}


def preprocess(df):
    """Converts categorical features to numeric. Removes unused columns.

    All numeric columns (integers of any width as well as floats) are cast to
    float32. Every ``object`` column is replaced by the integer position of
    its value in the matching ``_CATEGORICAL_TYPES`` entry.

    Args:
      df: Pandas df with raw data

    Returns:
      df with preprocessed data (a new frame; the input is not modified)

    Raises:
      KeyError: if an ``object`` column has no entry in ``_CATEGORICAL_TYPES``.
    """
    # drop() and dropna() return new frames, so the caller's df stays untouched.
    df = df.drop(columns=UNUSED_COLUMNS).dropna()

    # Convert numeric columns to floating point. Selecting "number" covers
    # integers of every width (e.g. int64) as well as floats.
    numeric_columns = df.select_dtypes(include="number").columns
    df = df.astype({column: "float32" for column in numeric_columns})

    # Convert categorical (object) columns to their integer category codes.
    for column in df.select_dtypes(include=["object"]).columns:
        df[column] = df[column].astype(_CATEGORICAL_TYPES[column]).cat.codes
    return df


def convert_dataframe_to_dataset(
    df_train,
    df_validation
):
    """Build (features, one-hot label) tf.data datasets from two raw frames.

    Both frames go through ``preprocess``; the ``LABEL_COLUMN`` is then split
    off and one-hot encoded over ``len(SPECIES)`` classes.

    Args:
      df_train: Pandas df with the raw training rows
      df_validation: Pandas df with the raw validation rows

    Returns:
      Tuple ``(dataset_train, dataset_validation)`` of unbatched
      ``tf.data.Dataset`` objects yielding ``(features, one_hot_label)`` pairs.
    """
    # preprocess() returns new frames, so popping the label below does not
    # touch the frames passed in by the caller.
    df_train_x = preprocess(df_train)
    df_validation_x = preprocess(df_validation)

    # pop() removes the label column in place, leaving only the features.
    df_train_y = df_train_x.pop(LABEL_COLUMN)
    df_validation_y = df_validation_x.pop(LABEL_COLUMN)

    # Convert to numpy representation
    x_train = np.asarray(df_train_x)
    x_validation = np.asarray(df_validation_x)
    y_train = np.asarray(df_train_y).astype("float32")
    y_validation = np.asarray(df_validation_y).astype("float32")

    # Convert to one-hot representation
    num_classes = len(SPECIES)
    y_train = tf.keras.utils.to_categorical(y_train, num_classes=num_classes)
    y_validation = tf.keras.utils.to_categorical(y_validation, num_classes=num_classes)

    dataset_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    dataset_validation = tf.data.Dataset.from_tensor_slices((x_validation, y_validation))
    return (dataset_train, dataset_validation)

# Create datasets (unbatched; batching happens after the model is created)
dataset_train, dataset_validation = convert_dataframe_to_dataset(df_train, df_validation)

# Shuffle train set.
# The buffer size is the row count of the cleaned training frame.
dataset_train = dataset_train.shuffle(len(df_train))

def create_model(num_features):
    """Build and compile the Keras classifier.

    Args:
      num_features: number of input feature columns, forwarded as
        ``input_dim`` to the first Dense layer.

    Returns:
      A compiled ``tf.keras.Sequential`` model: a Normalization layer, four
      ReLU Dense layers (100, 75, 50, 25 units) and a softmax output with one
      unit per entry in ``SPECIES``. It is compiled with categorical
      cross-entropy loss, RMSprop and an accuracy metric.
    """
    Dense = tf.keras.layers.Dense
    model = tf.keras.Sequential(
        [
            # NOTE(review): nothing in this script calls adapt() on this layer
            # or passes it mean/variance - confirm it normalizes as intended.
            tf.keras.layers.Normalization(axis=None),
            Dense(
                100,
                activation=tf.nn.relu,
                kernel_initializer="uniform",
                input_dim=num_features,
            ),
            Dense(75, activation=tf.nn.relu),
            Dense(50, activation=tf.nn.relu),
            Dense(25, activation=tf.nn.relu),
            # One output unit per class, matching the one-hot label width.
            Dense(len(SPECIES), activation=tf.nn.softmax),
        ]
    )

    # Compile Keras model. `learning_rate` replaces the deprecated `lr` keyword.
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
    model.compile(
        loss="categorical_crossentropy", metrics=["accuracy"], optimizer=optimizer
    )

    return model

# Create the model inside the strategy scope.
# element_spec is the public tf.data description of one dataset element; its
# first component is the feature vector, whose length is the feature count.
with strategy.scope():
    model = create_model(num_features=dataset_train.element_spec[0].shape[0])

# Set up datasets
NUM_WORKERS = strategy.num_replicas_in_sync
# Here the batch size scales up by number of workers since
# `tf.data.Dataset.batch` expects the global batch size.
GLOBAL_BATCH_SIZE = args.batch_size * NUM_WORKERS
dataset_train = dataset_train.batch(GLOBAL_BATCH_SIZE)
dataset_validation = dataset_validation.batch(GLOBAL_BATCH_SIZE)

# Train the model
model.fit(dataset_train, epochs=args.epochs, validation_data=dataset_validation)

# Export the trained model as a SavedModel to the directory named by the
# AIP_MODEL_DIR environment variable.
tf.saved_model.save(model, os.environ["AIP_MODEL_DIR"])

Writing task.py


## Create training pipeline

In [14]:
# Display name under which the trained model is registered.
MODEL_DISPLAY_NAME = f"iris-{TIMESTAMP}"

# Wrap task.py in a custom training job that runs in TRAIN_IMAGE and whose
# resulting model is served with DEPLOY_IMAGE.
job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task.py",
    container_uri=TRAIN_IMAGE,
    requirements=["google-cloud-bigquery>=2.20.0"],
    model_serving_container_image_uri=DEPLOY_IMAGE,
)

# Start the training on a single CPU-only replica.
model = job.run(
    dataset=dataset,
    model_display_name=MODEL_DISPLAY_NAME,
    bigquery_destination=f"bq://{PROJECT_ID}",
    args=CMDARGS,
    replica_count=1,
    machine_type=TRAIN_COMPUTE,
    accelerator_count=0,
)

INFO:google.cloud.aiplatform.utils.source_utils:Training script copied to:
gs://mibanell-iris-code/aiplatform-2021-09-19-18:57:31.513-aiplatform_custom_trainer_script-0.1.tar.gz.
INFO:google.cloud.aiplatform.training_jobs:Training Output directory:
gs://mibanell-iris-code/aiplatform-custom-training-2021-09-19-18:57:31.740 
INFO:google.cloud.aiplatform.training_jobs:No dataset split provided. The service will use a default split.
INFO:google.cloud.aiplatform.training_jobs:View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/1685738242357329920?project=987533379656
INFO:google.cloud.aiplatform.training_jobs:CustomTrainingJob projects/987533379656/locations/us-central1/trainingPipelines/1685738242357329920 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:CustomTrainingJob projects/987533379656/locations/us-central1/trainingPipelines/1685738242357329920 current state:
PipelineState.PIPELINE_STATE_RUNNING
IN

## Deploy model

In [17]:
# Deployment settings: identical minimum and maximum replica counts (one node).
MIN_NODES = 1
MAX_NODES = 1

# All traffic (100) is routed to key "0".
# NOTE(review): presumably "0" denotes the model being deployed - confirm.
TRAFFIC_SPLIT = {"0": 100}

DEPLOYED_NAME = f"iris_deployed-{TIMESTAMP}"

endpoint = model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=DEPLOY_COMPUTE,
    min_replica_count=MIN_NODES,
    max_replica_count=MAX_NODES,
)

INFO:google.cloud.aiplatform.models:Creating Endpoint
INFO:google.cloud.aiplatform.models:Create Endpoint backing LRO: projects/987533379656/locations/us-central1/endpoints/7359708623867478016/operations/1037489001486876672
INFO:google.cloud.aiplatform.models:Endpoint created. Resource name: projects/987533379656/locations/us-central1/endpoints/7359708623867478016
INFO:google.cloud.aiplatform.models:To use this Endpoint in another session:
INFO:google.cloud.aiplatform.models:endpoint = aiplatform.Endpoint('projects/987533379656/locations/us-central1/endpoints/7359708623867478016')
INFO:google.cloud.aiplatform.models:Deploying model to Endpoint : projects/987533379656/locations/us-central1/endpoints/7359708623867478016
INFO:google.cloud.aiplatform.models:Deploy Endpoint model backing LRO: projects/987533379656/locations/us-central1/endpoints/7359708623867478016/operations/1141071792916398080
INFO:google.cloud.aiplatform.models:Endpoint model deployed. Resource name: projects/98753337965

In [18]:
# Prediction
# `instances` is a list of instances, so the single sample (four feature
# values) is wrapped in an outer list to form a batch of one.
endpoint.predict(instances=[[5.1, 2.5, 3.0, 1.1]])

Prediction(predictions=[[0.631752431, 0.123804167, 0.244443342]], deployed_model_id='8104869644243828736', explanations=None)