<a href="https://colab.research.google.com/github/tommycon/Deeplearning-GoogleCloud/blob/main/BigQuery-Keras-AI-Platform/BQ_to_Train_Keras_on_AI_Platform_Model_Deployment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Ensure the right version of Tensorflow is installed.
!pip freeze | grep tensorflow==2.1

In [None]:
# Authenticate this Colab session with your Google account (google.colab.auth)
# before any Google Cloud calls below.
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


In [None]:
# Project-specific settings -- change these to try this notebook out.
PROJECT = 'tc-test-project-260312'   # GCP project owning the dataset, jobs and model
BUCKET = 'bq-deploy-ml-pipeline'     # GCS bucket for exported CSVs, staging and models
REGION = 'us-central1'               # region for the bucket and the training job

In [None]:
import os

# Expose the notebook settings to the %%bash cells as $PROJECT, $BUCKET, ...
# (TFVERSION / PYTHONVERSION select the AI Platform runtime).
os.environ.update({
    "PROJECT": PROJECT,
    "BUCKET": BUCKET,
    "REGION": REGION,
    "TFVERSION": "2.1",
    "PYTHONVERSION": "3.7",
})

In [None]:
%%bash
# Install the pinned BigQuery client only if that exact version is missing.
# -F: fixed string (dots are not regex wildcards); -x: whole-line match, so
# e.g. google-cloud-bigquery==1.6.10 does not count as 1.6.1.
sudo pip freeze | grep -Fx 'google-cloud-bigquery==1.6.1' || \
sudo pip install google-cloud-bigquery==1.6.1

google-cloud-bigquery==1.6.1


ERROR: Could not find a version that satisfies the requirement hypertune (from versions: none)
ERROR: No matching distribution found for hypertune


In [None]:
%%bash
# Make this notebook's project and region the gcloud defaults for later cells.
gcloud config set project "${PROJECT}"
gcloud config set compute/region "${REGION}"

Updated property [core/project].
Updated property [compute/region].


In [None]:
import os
from google.cloud import bigquery

In [None]:
%%bash

# Create the citibike BigQuery dataset in $PROJECT if it doesn't exist yet.
datasetexists=$(bq ls -d --project_id="${PROJECT}" | grep -w citibike)

if [ -n "$datasetexists" ]; then
    echo "BigQuery dataset already exists, let's not recreate it."

else
    echo "Creating BigQuery dataset titled: citibike"

    bq --location=US mk --dataset \
        --description "citibike" \
        "${PROJECT}:citibike"
    echo "Here are your current datasets:"
    bq ls
fi

## Create GCS bucket if it doesn't exist already...
exists=$(gsutil ls -d | grep -w "gs://${BUCKET}/")

if [ -n "$exists" ]; then
    echo "Bucket exists, let's not recreate it."

else
    echo "Creating a new GCS bucket."
    gsutil mb -l "${REGION}" "gs://${BUCKET}"
    echo "Here are your current buckets:"
    gsutil ls
fi

Creating BigQuery dataset titled: babyweight
Dataset 'tc-test-project-260312:citibike' successfully created.
Here are your current datasets:
   datasetId    
 -------------- 
  babyweight    
  bqml          
  citibike      
  fastly_logs   
  weather_demo  
Bucket exists, let's not recreate it.


In [None]:
%%bigquery --project {PROJECT}
-- Copy three years of NYC Citi Bike trips into the working dataset.
-- The cutoff is pinned to three years before the original run (the training
-- job id is dated 2020-08-27) instead of
-- date_add(CURRENT_DATE(), interval -3 year), so re-running the notebook later
-- selects the same rows rather than a moving window.
CREATE OR REPLACE TABLE
    citibike.citibike_data AS
SELECT
    tripduration,
    starttime,
    start_station_latitude,
    start_station_longitude,
    end_station_latitude,
    end_station_longitude,
    usertype,
    birth_year,
    gender
FROM
    `bigquery-public-data.new_york_citibike.citibike_trips`
WHERE starttime > DATE '2017-08-27'

### Engineer features for training

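Next we derive the model's features from the raw trips. `euclidean_diff` is the straight-line distance between the start and end stations, measured in degrees of latitude/longitude. `hashmonth` is a fingerprint of each trip's year and month; the next cells use it to split the data into training and evaluation sets by whole months.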

In [None]:
%%bigquery --project {PROJECT}
-- Derive the model features from the raw trips:
--   euclidean_diff: straight-line distance between start and end stations,
--                   in raw degrees of latitude/longitude.
--   hashmonth:      fingerprint of the trip's year and month, used to split
--                   train/eval by whole months.
CREATE OR REPLACE TABLE citibike.citibike_augmented_data AS
SELECT
    tripduration,
    SQRT(
        POW(start_station_latitude - end_station_latitude, 2)
        + POW(start_station_longitude - end_station_longitude, 2)
    ) AS euclidean_diff,
    usertype,
    birth_year,
    gender,
    FARM_FINGERPRINT(
        CAST(EXTRACT(YEAR FROM starttime) AS STRING)
        || CAST(EXTRACT(MONTH FROM starttime) AS STRING)
    ) AS hashmonth
FROM citibike.citibike_data

#### Split augmented dataset: training set (month-hash buckets 0–2, roughly 3/4 of the months)

In [None]:
%%bigquery --project {PROJECT}
-- Training split: trips whose month hash falls in buckets 0-2 of 4
-- (ABS(MOD(hashmonth, 4)) is always 0-3). Column order must match
-- CSV_COLUMNS in trainer/model.py.
CREATE OR REPLACE TABLE citibike.citibike_data_train AS
SELECT
    tripduration,
    euclidean_diff,
    usertype,
    birth_year,
    gender
FROM citibike.citibike_augmented_data
WHERE ABS(MOD(hashmonth, 4)) IN (0, 1, 2)

#### Split augmented dataset: evaluation set (month-hash bucket 3, roughly 1/4 of the months)

In [None]:
%%bigquery --project {PROJECT}
-- Evaluation split: the remaining month-hash bucket (3 of 0-3), so no month
-- appears in both splits. Column order must match CSV_COLUMNS in
-- trainer/model.py.
CREATE OR REPLACE TABLE citibike.citibike_data_eval AS
SELECT
    tripduration,
    euclidean_diff,
    usertype,
    birth_year,
    gender
FROM citibike.citibike_augmented_data
WHERE ABS(MOD(hashmonth, 4)) = 3

## Verify table creation

Verify that the training and evaluation tables were created by previewing a few rows of each.

In [None]:
%%bigquery --project {PROJECT}
-- Preview a few rows to confirm the training table exists with the expected
-- columns. Note: LIMIT only truncates the returned rows; BigQuery still scans
-- (and bills) every column selected by SELECT *.
SELECT * FROM citibike.citibike_data_train
LIMIT 10

Unnamed: 0,tripduration,euclidean_diff,usertype,birth_year,gender
0,441,0.011829,Subscriber,1952,male
1,1799,0.027878,Subscriber,1949,male
2,488,0.019923,Subscriber,1900,male
3,107,0.003452,Subscriber,1952,male
4,2016,0.025544,Subscriber,1953,male
5,427,0.01185,Subscriber,1945,male
6,493,0.012062,Subscriber,1946,female
7,458,0.014109,Subscriber,1952,male
8,603,0.011565,Subscriber,1948,male
9,603,0.025594,Subscriber,2001,male


In [None]:
%%bigquery --project {PROJECT}
-- Preview a few rows to confirm the evaluation table exists with the expected
-- columns. Note: LIMIT only truncates the returned rows; BigQuery still scans
-- (and bills) every column selected by SELECT *.
SELECT * FROM citibike.citibike_data_eval
LIMIT 10

Unnamed: 0,tripduration,euclidean_diff,usertype,birth_year,gender
0,857,0.024528,Subscriber,1949,male
1,1348,0.0,Subscriber,1951,male
2,510,0.010083,Subscriber,1946,male
3,99,0.003223,Subscriber,1949,male
4,223,0.011211,Subscriber,2000,male
5,669,0.015776,Subscriber,1947,male
6,121,0.00247,Subscriber,1945,male
7,503,0.009004,Subscriber,1941,male
8,305,0.012938,Subscriber,2000,male
9,1839,0.052289,Subscriber,1951,male


In [None]:
# Export the train and eval tables to CSV files in GCS, where the training
# job reads them (gs://BUCKET/citibike/data/{train,eval}*.csv).
client = bigquery.Client(project=PROJECT)
dataset_name = "citibike"
dataset_ref = client.dataset(dataset_id=dataset_name, project=client.project)

for step in ["train", "eval"]:
    table_name = "citibike_data_{}".format(step)
    table_ref = dataset_ref.table(table_name)
    destination_uri = os.path.join(
        "gs://", BUCKET, dataset_name, "data", "{}*.csv".format(step))

    # Location must match that of the source table.
    extract_job = client.extract_table(table_ref, destination_uri, location="US")
    extract_job.result()  # block until the export job completes

    print("Exported {}:{}.{} to {}".format(
        client.project, dataset_name, table_name, destination_uri))



Exported tc-test-project-260312:citibike.citibike_data_train to gs://bq-deploy-ml-pipeline/citibike/data/train*.csv
Exported tc-test-project-260312:citibike.citibike_data_eval to gs://bq-deploy-ml-pipeline/citibike/data/eval*.csv


In [None]:
%%bash
# Confirm the exported CSV shards landed in GCS. The pattern is quoted so that
# gsutil, not the local shell, expands the wildcard.
gsutil ls "gs://${BUCKET}/citibike/data/*.csv"

gs://bq-deploy-ml-pipeline/citibike/data/eval000000000000.csv
gs://bq-deploy-ml-pipeline/citibike/data/train000000000000.csv


In [None]:
%%bash
# Lay out the trainer as an importable Python package (citibike/trainer/).
TRAINER_DIR=citibike/trainer
mkdir -p "${TRAINER_DIR}"
touch "${TRAINER_DIR}/__init__.py"

In [None]:
%%writefile citibike/trainer/task.py
import argparse
import json
import os

from trainer import model

import tensorflow as tf

if __name__ == "__main__":
    # Command-line flags for the trainer.
    parser = argparse.ArgumentParser()
    add_arg = parser.add_argument
    add_arg("--job-dir",
            default="junk",
            help="this model ignores this field, but it is required by gcloud")
    add_arg("--train_data_path", required=True,
            help="GCS location of training data")
    add_arg("--eval_data_path", required=True,
            help="GCS location of evaluation data")
    add_arg("--output_dir", required=True,
            help="GCS location to write checkpoints and export models")
    add_arg("--batch_size", type=int, default=512,
            help="Number of examples to compute gradient over.")
    add_arg("--nnsize", nargs="+", type=int, default=[128, 32, 4],
            help="Hidden layer sizes for DNN -- provide space-separated layers")
    add_arg("--nembeds", type=int, default=3,
            help="Embedding size of a cross of n key real-valued parameters")
    add_arg("--num_epochs", type=int, default=10,
            help="Number of epochs to train the model.")
    add_arg("--train_examples", type=int, default=5000,
            help="""Number of examples (in thousands) to run the training job over.
        If this is more than actual # of examples available, it cycles through
        them. So specifying 1000 here when you have only 100k examples makes
        this 10 epochs.""")
    add_arg("--eval_steps", type=int, default=None,
            help="""Positive number of steps for which to evaluate model. Default
        to None, which means to evaluate until input_fn raises an end-of-input
        exception""")

    # vars() hands back the namespace's own __dict__, i.e. a mutable dict.
    arguments = vars(parser.parse_args())

    # gcloud supplies --job-dir, but the trainer never reads it.
    for unused_key in ("job_dir", "job-dir"):
        arguments.pop(unused_key, None)

    # --train_examples is expressed in thousands of examples.
    arguments["train_examples"] = arguments["train_examples"] * 1000

    # When hyperparameter tuning, nest each trial's output under its trial id
    # from TF_CONFIG; otherwise the trial id is "" and only a trailing
    # separator is appended. Remove if not using hyperparameter tuning.
    tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
    trial_id = tf_config.get("task", {}).get("trial", "")
    arguments["output_dir"] = os.path.join(arguments["output_dir"], trial_id)

    # Run the training job
    model.train_and_evaluate(arguments)

Overwriting citibike/trainer/task.py


In [None]:
%%writefile citibike/trainer/model.py
import datetime
import os
import shutil
import numpy as np
import tensorflow as tf
import hypertune

# Column order of the exported CSV files (the SELECT order of the
# citibike_data_train / citibike_data_eval tables) and the label column.
CSV_COLUMNS = ["tripduration",
               "euclidean_diff",
               "usertype",
               "birth_year",
               "gender"]
LABEL_COLUMN = "tripduration"

# Default value for each CSV column, in CSV_COLUMNS order, used for empty
# fields. The defaults also fix each column's parsed type: tripduration,
# euclidean_diff and birth_year as floats; usertype and gender as strings.
DEFAULTS = [[0.0], [0.0], ["null"], [0.0], ["null"]]


def features_and_labels(row_data):
    """Separates the label from a parsed CSV row.

    Args:
        row_data: Dictionary of CSV column names and tensor values. It is
            modified in place: the LABEL_COLUMN entry is removed.
    Returns:
        Tuple (features, label): the same dictionary without the label, and
        the label tensor.
    """
    label_tensor = row_data.pop(LABEL_COLUMN)
    features = row_data
    return features, label_tensor


def load_dataset(pattern, batch_size=1, mode='eval'):
    """Loads dataset using the tf.data API from CSV files.

    Args:
        pattern: str, file pattern to glob into list of files.
        batch_size: int, the number of examples per batch.
        mode: 'train' | 'eval' to determine if training or evaluating.
    Returns:
        `Dataset` of (features dict, label) batches. In 'train' mode it is
        shuffled and repeats indefinitely; in any other mode it makes a single
        unshuffled pass over the files, so evaluation terminates on its own.
    """
    print("mode = {}".format(mode))
    is_training = mode == 'train'

    # make_csv_dataset defaults to num_epochs=None (repeat forever) and
    # shuffle=True. That suits training, but an endlessly repeating eval set
    # means validation without eval_steps would never finish.
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        num_epochs=None if is_training else 1,
        shuffle=is_training)

    # Map dataset to features and label
    dataset = dataset.map(map_func=features_and_labels)  # features, label

    if is_training:
        # Also shuffle whole batches (rows are already shuffled above).
        dataset = dataset.shuffle(buffer_size=1000).repeat()

    # Overlap input preparation with training; AUTOTUNE lets tf.data choose
    # the prefetch buffer size.
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

    return dataset


def create_input_layers():
    """Builds one scalar Keras Input layer per model feature.

    Returns:
        Dictionary mapping feature name to `tf.keras.layers.Input`: string
        inputs for usertype and gender, float32 inputs for euclidean_diff
        and birth_year.
    """
    def _scalar_inputs(names, dtype):
        # One scalar (shape=()) Input per name, named after the feature.
        return {name: tf.keras.layers.Input(name=name, shape=(), dtype=dtype)
                for name in names}

    numeric_inputs = _scalar_inputs(["euclidean_diff", "birth_year"], "float32")
    string_inputs = _scalar_inputs(["usertype", "gender"], "string")

    # String features first, then numeric ones.
    inputs = dict(string_inputs)
    inputs.update(numeric_inputs)
    return inputs


def categorical_fc(name, values):
    """Builds a vocabulary-list categorical column plus its one-hot wrapper.

    Args:
        name: str, name of feature.
        values: list, list of strings of categorical values.
    Returns:
        Tuple (categorical column, indicator column) for the feature.
    """
    vocab_column = tf.feature_column.categorical_column_with_vocabulary_list(
        key=name, vocabulary_list=values)
    return (vocab_column,
            tf.feature_column.indicator_column(categorical_column=vocab_column))


def create_feature_columns(nembeds):
    """Creates wide and deep dictionaries of feature columns from inputs.

    Args:
        nembeds: int, dimension of the embedding of the hashed
            birth_year x euclidean_diff bucket cross.
    Returns:
        Tuple (wide_fc, deep_fc) of dictionaries mapping a name to a feature
        column.
    """
    deep_fc = {
        colname: tf.feature_column.numeric_column(key=colname)
        for colname in ["euclidean_diff", "birth_year"]
    }
    wide_fc = {}
    # Only the one-hot indicator columns are used; discard the raw
    # categorical columns returned alongside them.
    _, wide_fc["usertype"] = categorical_fc(
        "usertype", ["Customer", "Subscriber"])
    _, wide_fc["gender"] = categorical_fc(
        "gender", ["male", "female"])

    # Bucketize the float fields. This makes them wide.
    # birth_year: 5-year bins with boundaries 1900, 1905, ..., 2015.
    birth_buckets = tf.feature_column.bucketized_column(
        source_column=deep_fc["birth_year"],
        boundaries=np.arange(1900, 2020, 5).tolist())
    wide_fc["birth_buckets"] = tf.feature_column.indicator_column(
        categorical_column=birth_buckets)

    # euclidean_diff: 0.0001-wide bins from 0 to 3 (30000 boundaries).
    # NOTE(review): the previewed values are all below ~0.06, so most of these
    # buckets are presumably never populated; coarser or narrower boundaries
    # would shrink the wide side (but would change the trained model).
    euclidean_buckets = tf.feature_column.bucketized_column(
        source_column=deep_fc["euclidean_diff"],
        boundaries=np.arange(0, 3, .0001).tolist())
    wide_fc["euclidean_buckets"] = tf.feature_column.indicator_column(
        categorical_column=euclidean_buckets)

    # Cross the two bucketized columns (crossing needs the categorical
    # buckets, not their one-hot form), hash the cross into 1000 buckets and
    # embed it on the deep side.
    crossed = tf.feature_column.crossed_column(
        keys=[birth_buckets, euclidean_buckets],
        hash_bucket_size=1000)
    deep_fc["crossed_embeds"] = tf.feature_column.embedding_column(
        categorical_column=crossed, dimension=nembeds)

    return wide_fc, deep_fc


def get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units):
    """Stacks the deep tower and the wide projection into a regression head.

    Args:
        wide_inputs: Dense tensor used as inputs to wide side of model.
        deep_inputs: Dense tensor used as inputs to deep side of model.
        dnn_hidden_units: Iterable of layer widths; one ReLU Dense layer is
            created per entry, named dnn_1, dnn_2, ...
    Returns:
        Output tensor of the final single-unit layer named "time".
    """
    # Convert every width up front, before any layer is built.
    widths = [int(units) for units in dnn_hidden_units]

    deep_out = deep_inputs
    for position, width in enumerate(widths, start=1):
        deep_out = tf.keras.layers.Dense(
            units=width,
            activation="relu",
            name="dnn_{}".format(position))(deep_out)

    # Wide side: a single 10-unit Dense layer. It is named "linear", but its
    # activation is ReLU.
    wide_out = tf.keras.layers.Dense(
        units=10, activation="relu", name="linear")(wide_inputs)

    both = tf.keras.layers.concatenate(
        inputs=[deep_out, wide_out], name="both")

    # Linear activation on the single output unit: this is a regression.
    return tf.keras.layers.Dense(
        units=1, activation="linear", name="time")(both)


def rmse(y_true, y_pred):
    """Calculates RMSE evaluation metric.

    Args:
        y_true: tensor, true labels.
        y_pred: tensor, predicted labels.
    Returns:
        Scalar tensor with the RMSE between true and predicted labels.
    """
    # The model's output layer has a single unit, so predictions are
    # (batch, 1) while batched labels are (batch,). Flatten both so they are
    # compared element-wise; a bare y_pred - y_true would broadcast to a
    # (batch, batch) matrix whenever the caller has not aligned the ranks.
    y_pred = tf.reshape(y_pred, [-1])
    y_true = tf.cast(tf.reshape(y_true, [-1]), y_pred.dtype)
    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))


def build_wide_deep_model(dnn_hidden_units=(64, 32), nembeds=3):
    """Builds wide and deep model using Keras Functional API.

    Args:
        dnn_hidden_units: Sequence of ints, number of neurons in each hidden
            layer on the deep side. A tuple default avoids a shared mutable
            default; any list or tuple works.
        nembeds: int, embedding dimension of the crossed bucket feature.
    Returns:
        `tf.keras.models.Model` compiled with the Adam optimizer, MSE loss
        and rmse / mse metrics.
    """
    # Create input layers
    inputs = create_input_layers()

    # Create feature columns for both wide and deep
    wide_fc, deep_fc = create_feature_columns(nembeds)

    # The constructor for DenseFeatures takes a list of feature columns.
    # The Functional API in Keras requires: LayerConstructor()(inputs)
    wide_inputs = tf.keras.layers.DenseFeatures(
        feature_columns=list(wide_fc.values()), name="wide_inputs")(inputs)
    deep_inputs = tf.keras.layers.DenseFeatures(
        feature_columns=list(deep_fc.values()), name="deep_inputs")(inputs)

    # Get output of model given inputs
    output = get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units)

    # Build model and compile it all together
    model = tf.keras.models.Model(inputs=inputs, outputs=output)
    model.compile(optimizer="adam", loss="mse", metrics=[rmse, "mse"])

    return model


def train_and_evaluate(args):
    """Builds, trains and exports the model, then reports the tuning metric.

    Args:
        args: dict of parsed trainer arguments (see task.py). Reads "nnsize",
            "nembeds", "train_data_path", "eval_data_path", "batch_size",
            "eval_steps", "num_epochs", "train_examples" and "output_dir".
    """
    model = build_wide_deep_model(args["nnsize"], args["nembeds"])
    print("Here is our Wide-and-Deep architecture so far:\n")
    # summary() prints on its own and returns None; wrapping it in print()
    # only added a stray "None" line.
    model.summary()

    trainds = load_dataset(
        args["train_data_path"],
        args["batch_size"],
        'train')

    evalds = load_dataset(
        args["eval_data_path"], 1000, 'eval')
    if args["eval_steps"]:
        evalds = evalds.take(count=args["eval_steps"])

    # train_examples is the total number of examples for the whole job, spread
    # evenly over num_epochs epochs of batch_size batches. Clamp to at least
    # one step so a small train_examples cannot yield steps_per_epoch == 0.
    examples_per_step_all_epochs = args["batch_size"] * args["num_epochs"]
    steps_per_epoch = max(
        1, args["train_examples"] // examples_per_step_all_epochs)

    checkpoint_path = os.path.join(args["output_dir"], "checkpoints/citibike")
    cp_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path, verbose=1, save_weights_only=True)

    history = model.fit(
        trainds,
        validation_data=evalds,
        epochs=args["num_epochs"],
        steps_per_epoch=steps_per_epoch,
        verbose=2,  # 0=silent, 1=progress bar, 2=one line per epoch
        callbacks=[cp_callback])

    # Each export goes to its own timestamped directory under output_dir.
    export_path = os.path.join(
        args["output_dir"], datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    tf.saved_model.save(
        obj=model, export_dir=export_path)  # with default serving function

    # Report the final epoch's validation RMSE as the hyperparameter-tuning
    # metric.
    hp_metric = history.history['val_rmse'][-1]
    hpt = hypertune.HyperTune()
    hpt.report_hyperparameter_tuning_metric(
        hyperparameter_metric_tag='rmse',
        metric_value=hp_metric,
        global_step=args['num_epochs'])

    print("Exported trained model to {}".format(export_path))

Overwriting citibike/trainer/model.py


In [None]:
%%bash

# Train on AI Platform. The trainer writes checkpoints and a timestamped
# SavedModel export under OUTDIR.
OUTDIR=gs://${BUCKET}/citibike/trained_model_keras
JOBID=citibike_$(date -u +%y%m%d_%H%M%S)

# The '*' in the data paths is a file pattern the trainer globs on GCS; the
# paths are quoted so bash never tries to expand them against local files.
# $(pwd) is quoted in case the working directory contains spaces.
gcloud ai-platform jobs submit training "${JOBID}" \
    --region="${REGION}" \
    --module-name=trainer.task \
    --package-path="$(pwd)/citibike/trainer" \
    --job-dir="${OUTDIR}" \
    --staging-bucket="gs://${BUCKET}" \
    --master-machine-type=n1-standard-8 \
    --scale-tier=CUSTOM \
    --runtime-version="${TFVERSION}" \
    --python-version="${PYTHONVERSION}" \
    -- \
    --train_data_path="gs://${BUCKET}/citibike/data/train*.csv" \
    --eval_data_path="gs://${BUCKET}/citibike/data/eval*.csv" \
    --output_dir="${OUTDIR}" \
    --num_epochs=10 \
    --train_examples=10000 \
    --eval_steps=100 \
    --batch_size=32 \
    --nembeds=8

jobId: citibike_200827_144418
state: QUEUED


Job [citibike_200827_144418] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe citibike_200827_144418

or continue streaming the logs with the command

  $ gcloud ai-platform jobs stream-logs citibike_200827_144418


In [None]:
%%bash
# List the newest SavedModel export. The trainer writes each export to a
# timestamped (YYYYmmddHHMMSS) directory under trained_model_keras/, next to
# checkpoints/. Pick the latest 14-digit directory instead of hard-coding one
# run's timestamp.
EXPORTS="gs://${BUCKET}/citibike/trained_model_keras/"
MODEL_LOCATION=$(gsutil ls "${EXPORTS}" | grep -E '/[0-9]{14}/$' | sort | tail -1)
if [ -z "${MODEL_LOCATION}" ]; then
    echo "No exported model found under ${EXPORTS}" >&2
    exit 1
fi
gsutil ls "${MODEL_LOCATION}"

gs://bq-deploy-ml-pipeline/citibike/trained_model_keras/20200827150750/
gs://bq-deploy-ml-pipeline/citibike/trained_model_keras/20200827150750/saved_model.pb
gs://bq-deploy-ml-pipeline/citibike/trained_model_keras/20200827150750/assets/
gs://bq-deploy-ml-pipeline/citibike/trained_model_keras/20200827150750/variables/


In [None]:
%%bash
MODEL_NAME="citibike"
MODEL_VERSION="keras_on_gcp"

# Deploy the newest timestamped export written by the trainer (directories
# named YYYYmmddHHMMSS under trained_model_keras/).
EXPORTS="gs://${BUCKET}/citibike/trained_model_keras/"
MODEL_LOCATION=$(gsutil ls "${EXPORTS}" | grep -E '/[0-9]{14}/$' | sort | tail -1)
if [ -z "${MODEL_LOCATION}" ]; then
    echo "No exported model found under ${EXPORTS}" >&2
    exit 1
fi
echo "Deleting and deploying $MODEL_NAME $MODEL_VERSION from $MODEL_LOCATION ... this will take a few minutes"

# A version needs its parent model resource: create it on the first run.
if ! gcloud ai-platform models describe "${MODEL_NAME}" > /dev/null 2>&1; then
    gcloud ai-platform models create "${MODEL_NAME}" --regions "${REGION}"
fi
# Version names are unique per model: replace an existing one on re-runs.
if gcloud ai-platform versions describe "${MODEL_VERSION}" --model "${MODEL_NAME}" > /dev/null 2>&1; then
    gcloud ai-platform versions delete "${MODEL_VERSION}" --model "${MODEL_NAME}" --quiet
fi
gcloud ai-platform versions create "${MODEL_VERSION}" --model "${MODEL_NAME}" \
    --origin "${MODEL_LOCATION}" --runtime-version "${TFVERSION}"

Deleting and deploying citibike keras_on_gcp from gs://bq-deploy-ml-pipeline/citibike/trained_model_keras/20200827150750 ... this will take a few minutes


Using endpoint [https://ml.googleapis.com/]
Creating version (this might take a few minutes)......
..........................................................................................................................................................................................................................................................................done.


In [None]:
from oauth2client.client import GoogleCredentials
import requests
import json

MODEL_NAME = 'citibike'
MODEL_VERSION = 'keras_on_gcp'

# Online-prediction REST endpoint of the deployed version, authorized with an
# access token from the application-default credentials.
token = GoogleCredentials.get_application_default().get_access_token().access_token
api = 'https://ml.googleapis.com/v1/projects/{}/models/{}/versions/{}:predict' \
         .format(PROJECT, MODEL_NAME, MODEL_VERSION)
headers = {'Authorization': 'Bearer ' + token}


def make_instance(euclidean_diff, usertype, birth_year, gender):
    """Returns one prediction instance; keys match the model's input layer names."""
    return {
        "euclidean_diff": euclidean_diff,
        "usertype": usertype,
        "birth_year": birth_year,
        "gender": gender,
    }


# Vary one feature at a time (gender, birth year, user type) around the first
# two trips of the training-table preview to see how each moves the prediction.
data = {
    'instances': [
        make_instance(0.011828876033106495, "Subscriber", 1952, "male"),
        make_instance(0.011828876033106495, "Subscriber", 1952, "female"),
        make_instance(0.02787824432188638, "Subscriber", 1949, "male"),
        make_instance(0.02787824432188638, "Subscriber", 1949, "female"),
        make_instance(0.02787824432188638, "Subscriber", 1985, "male"),
        make_instance(0.02787824432188638, "Subscriber", 2000, "male"),
        make_instance(0.02787824432188638, "Customer", 2000, "male"),
    ]
}

response = requests.post(api, json=data, headers=headers, timeout=60)
# Fail loudly on HTTP errors (expired token, unknown model/version) instead
# of printing an error body as if it were a result.
response.raise_for_status()
body = response.json()
if "predictions" not in body:
    raise RuntimeError("Prediction request failed: {}".format(body))

# Each prediction looks like {"time": [value]}, keyed by the output layer name.
for instance, prediction in zip(data['instances'], body['predictions']):
    print(instance, '->', prediction['time'][0])

b'{"predictions": [{"time": [476.48712158203125]}, {"time": [567.864990234375]}, {"time": [1241.9930419921875]}, {"time": [1333.370849609375]}, {"time": [1190.6685791015625]}, {"time": [1082.8656005859375]}, {"time": [2718.666015625]}]}'
