In [0]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

<table align="left">
  <td>
  <a href="https://colab.research.google.com/github/GoogleCloudPlatform/ai-platform-samples/blob/main/notebooks/samples/predictions_logging/training_and_serving.ipynb">
  <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab </a>
  </td>
  <td> 
  <a href="https://github.com/GoogleCloudPlatform/ai-platform-samples/blob/main/notebooks/samples/predictions_logging/training_and_serving">
  <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"> View on GitHub </a>
  </td>
 </table>

# Overview

This tutorial shows how to train a TensorFlow classification model, using the Keras API, and deploy it to AI Platform for online prediction. The tutorial also shows how to enable [AI Platform Prediction request-response logging](https://cloud.google.com/ai-platform/prediction/docs/online-predict#requesting_logs_for_online_prediction_requests) to BigQuery.

### Dataset

We use the [Covertype dataset](https://archive.ics.uci.edu/ml/datasets/covertype) from the UCI Machine Learning Repository. The task is to predict forest cover type from cartographic variables only.

Note that the aim is to build and deploy a **minimal model** to showcase the AI Platform Prediction request-response **logging capabilities**.
Such logs enable further analysis, for example detecting skew in the serving data.

### Objective

The tutorial covers the following steps:

1. Prepare the data and generate metadata
2. Train and evaluate a TensorFlow classification model using the Keras API
3. Export the trained model as a SavedModel for serving
4. Deploy the trained model to AI Platform Prediction
5. Enable request-response logging to BigQuery
6. Parse and query logs from BigQuery

### Costs 

This tutorial uses billable components of Google Cloud Platform (GCP):

* BigQuery
* Cloud AI Platform
* Cloud Storage

You can use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

This example uses **TensorFlow 2.x** (the install step below pins TensorFlow 2.1).

## Before you begin

Complete the following steps to set up your development environment, install the required packages, set up your GCP project, and authenticate your GCP account.

### Set up your local development environment

**If you are using Colab or AI Platform Notebooks**, your environment already meets
all the requirements to run this notebook. You can skip this step. **Otherwise**, make sure your environment meets this notebook's requirements. The Google Cloud guide to [Setting up a Python development
environment](https://cloud.google.com/python/setup) and the [Jupyter
installation guide](https://jupyter.org/install) provide detailed instructions
for meeting these requirements.

### Install packages and dependencies with pip

In [0]:
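!pip install -q -U tensorflow==2.1
!pip install -U -q google-api-python-client
!pip install -U -q pandas
# Needed for `from google.cloud import bigquery` and `pd.io.gbq.read_gbq` outside Colab.
!pip install -q google-cloud-bigquery pandas-gbq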

In [0]:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)  

### Set up your GCP project and GCS bucket

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a GCP project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project.](https://cloud.google.com/billing/docs/how-to/modify-project)

3. [Enable the AI Platform APIs and Compute Engine APIs.](https://console.cloud.google.com/flows/enableapi?apiid=ml.googleapis.com,compute_component)

4. If you are running this notebook locally, you will need to install [Google Cloud SDK](https://cloud.google.com/sdk).


When you deploy a model to AI Platform Prediction, you need to upload your model artifacts to a Cloud Storage bucket. You can then
create an AI Platform model version based on these artifacts in order to serve online predictions. **Make sure that your GCS bucket already exists**. See [Creating storage buckets](https://cloud.google.com/storage/docs/creating-buckets).

Enter your project ID, bucket name, and region below. Make sure to [choose a region where Cloud AI Platform services are
available](https://cloud.google.com/ml-engine/docs/tensorflow/regions).

In [0]:
PROJECT_ID = "[your-project-Id]" #@param {type:"string"}
BUCKET = '[your-bucket-name]' #@param {type:"string"}
REGION = 'us-central1' #@param {type:"string"}
!gcloud config set project $PROJECT_ID

### Authenticate your GCP account

**If you are using AI Platform Notebooks**, your environment is already
authenticated. Skip this step.

**If you are using Colab**, run the cell below and follow the instructions
when prompted to authenticate your account via OAuth.

In [0]:
try:
  from google.colab import auth
  auth.authenticate_user()
  print("Colab user is authenticated.")
except ImportError:
  # Not running in Colab; rely on the existing gcloud credentials.
  pass

### Import libraries

In [0]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import tensorflow as tf
import pandas as pd
from google.cloud import bigquery

print("TF version: {}".format(tf.__version__))

### Define constants

You can change the default values for the following constants.

In [0]:
LOCAL_WORKSPACE = './workspace'
LOCAL_DATA_DIR = os.path.join(LOCAL_WORKSPACE, 'data')
BQ_DATASET_NAME = 'prediction_logs'
BQ_TABLE_NAME = 'covertype_classifier_logs' 
MODEL_NAME = 'covertype_classifier'
VERSION_NAME = 'v1' 
TRAINING_DIR = os.path.join(LOCAL_WORKSPACE, 'training')
MODEL_DIR = os.path.join(TRAINING_DIR, 'exported_model')

### Create a local workspace

In [0]:
if tf.io.gfile.exists(LOCAL_WORKSPACE):
  print("Removing previous workspace artifacts...")
  tf.io.gfile.rmtree(LOCAL_WORKSPACE)

print("Creating a new workspace...")
tf.io.gfile.makedirs(LOCAL_WORKSPACE)
tf.io.gfile.makedirs(LOCAL_DATA_DIR)

## 1. Dataset preparation and schema generation 

The dataset has already been preprocessed, split, and uploaded to the `gs://workshop-datasets/covertype` public GCS location.

We use this preprocessed version of the dataset in this notebook. For more information, see [Cover Type Dataset](https://github.com/GoogleCloudPlatform/mlops-on-gcp/tree/main/datasets/covertype).

### 1.1. Download the data

In [0]:
LOCAL_TRAIN_DATA = os.path.join(LOCAL_DATA_DIR, 'train.csv') 
LOCAL_EVAL_DATA = os.path.join(LOCAL_DATA_DIR, 'eval.csv') 

In [0]:
!gsutil cp gs://workshop-datasets/covertype/data_validation/training/dataset.csv {LOCAL_TRAIN_DATA}
!gsutil cp gs://workshop-datasets/covertype/data_validation/evaluation/dataset.csv {LOCAL_EVAL_DATA}
!wc -l {LOCAL_TRAIN_DATA}

View a sample of the downloaded data

In [0]:
sample = pd.read_csv(LOCAL_TRAIN_DATA).head()
sample.T

### 1.2. Define metadata
The following metadata describes the dataset and is used to create the data input function, feature columns, and serving function.

In [0]:
HEADER = ['Elevation', 'Aspect', 'Slope','Horizontal_Distance_To_Hydrology',
          'Vertical_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
          'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm',
          'Horizontal_Distance_To_Fire_Points', 'Wilderness_Area', 'Soil_Type',
          'Cover_Type']

TARGET_FEATURE_NAME = 'Cover_Type'

FEATURE_LABELS = ['0', '1', '2', '3', '4', '5', '6']

NUMERIC_FEATURE_NAMES = ['Aspect', 'Elevation', 'Hillshade_3pm', 
                         'Hillshade_9am', 'Hillshade_Noon', 
                         'Horizontal_Distance_To_Fire_Points',
                         'Horizontal_Distance_To_Hydrology',
                         'Horizontal_Distance_To_Roadways','Slope',
                         'Vertical_Distance_To_Hydrology']

CATEGORICAL_FEATURES_WITH_VOCABULARY = {
    'Soil_Type': ['2702', '2703', '2704', '2705', '2706', '2717', '3501', '3502', 
                  '4201', '4703', '4704', '4744', '4758', '5101', '6101', '6102', 
                  '6731', '7101', '7102', '7103', '7201', '7202', '7700', '7701', 
                  '7702', '7709', '7710', '7745', '7746', '7755', '7756', '7757', 
                  '7790', '8703', '8707', '8708', '8771', '8772', '8776'], 
    'Wilderness_Area': ['Cache', 'Commanche', 'Neota', 'Rawah']
}

FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys()) + NUMERIC_FEATURE_NAMES

HEADER_DEFAULTS = [[0] if feature_name in NUMERIC_FEATURE_NAMES + [TARGET_FEATURE_NAME] else ['NA'] 
                   for feature_name in HEADER]

NUM_CLASSES = len(FEATURE_LABELS)
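`HEADER_DEFAULTS` plays two roles when it is passed as `column_defaults` to `make_csv_dataset`: the type of each default fixes the column's type (an integer for numeric columns and the target, a string for categorical columns), and the default value fills in empty fields. The following pure-Python sketch approximates that behavior for a single CSV line, so you can see how a row is typed before it reaches TensorFlow. It is an illustration only, not TensorFlow's parser; `parse_csv_row`, the three-column header, and the sample rows are hypothetical.

```python
import csv
import io

# Hypothetical three-column subset of the notebook's HEADER, for illustration only.
SAMPLE_HEADER = ['Elevation', 'Wilderness_Area', 'Cover_Type']
SAMPLE_NUMERIC = {'Elevation', 'Cover_Type'}
# Same rule as HEADER_DEFAULTS: [0] for numeric/target columns, ['NA'] otherwise.
SAMPLE_DEFAULTS = [[0] if name in SAMPLE_NUMERIC else ['NA'] for name in SAMPLE_HEADER]

def parse_csv_row(line, header=SAMPLE_HEADER, defaults=SAMPLE_DEFAULTS):
    """Approximates column_defaults: the default's type sets the column type,
    and the default value replaces an empty field."""
    fields = next(csv.reader(io.StringIO(line)))
    record = {}
    # Each default is a one-element list, hence the (default,) unpacking.
    for name, field, (default,) in zip(header, fields, defaults):
        if field == '':
            record[name] = default
        elif isinstance(default, int):
            record[name] = int(field)
        else:
            record[name] = field
    return record

print(parse_csv_row('3091,Commanche,1'))  # {'Elevation': 3091, 'Wilderness_Area': 'Commanche', 'Cover_Type': 1}
print(parse_csv_row(',,1'))               # {'Elevation': 0, 'Wilderness_Area': 'NA', 'Cover_Type': 1}
```

Because the numeric defaults are integers, the numeric columns are read as integers; the `numeric_column` feature columns cast them to floats later.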

## 2. Model training and evaluation

### 2.1. Implement data input pipeline

In [0]:
RANDOM_SEED = 19830610
import multiprocessing

def create_dataset(file_pattern, 
                  batch_size=128, num_epochs=1, shuffle=False):
  
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        column_names=HEADER,
        column_defaults=HEADER_DEFAULTS,
        label_name=TARGET_FEATURE_NAME,
        field_delim=',',
        header=True,
        num_epochs=num_epochs,
        shuffle=shuffle,
        shuffle_buffer_size=(5 * batch_size),
        shuffle_seed=RANDOM_SEED,
        num_parallel_reads=multiprocessing.cpu_count(),
        sloppy=True,
    )
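    # Cache only unshuffled data: caching after the shuffle inside
    # make_csv_dataset would replay the first epoch's order in every epoch.
    return dataset if shuffle else dataset.cache()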

The following code tests the data input function by reading a few batches of data.

In [0]:
index = 1
for batch in create_dataset(LOCAL_TRAIN_DATA, batch_size=5, shuffle=False).take(2):
  print("Batch: {}".format(index))
  print("========================")
  record, target = batch
  print("Input features:")
  for key in record:
    print(" - {}:{}".format(key, record[key].numpy()))
  print("Target: {}".format(target))
  index += 1
  print()

### 2.2. Create feature columns

In [0]:
import math

def create_feature_columns():
  feature_columns = []
  
  for feature_name in FEATURE_NAMES:
    # Categorical features
    if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
      
      vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
      vocab_size = len(vocabulary)
      
      # Create an embedding column for each categorical feature with a vocabulary
      embedding_feature_column = tf.feature_column.embedding_column(
          categorical_column = tf.feature_column.categorical_column_with_vocabulary_list(
              key=feature_name,
              vocabulary_list=vocabulary), dimension=int(math.sqrt(vocab_size) + 1))
            
      feature_columns.append(embedding_feature_column)

    # Numeric features
    else:
      numeric_column = tf.feature_column.numeric_column(feature_name)
      feature_columns.append(numeric_column)

  return feature_columns


The following code creates the feature columns and prints them for inspection.

In [0]:
feature_columns = create_feature_columns()

for column in feature_columns:
  print(column)
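Each categorical feature is embedded with dimension `int(math.sqrt(vocab_size) + 1)`, a rule of thumb used by this notebook rather than a TensorFlow requirement. The sketch below recomputes those sizes from the vocabulary lengths (39 soil types and 4 wilderness areas, counted from `CATEGORICAL_FEATURES_WITH_VOCABULARY`). Since each numeric column contributes one value and each embedding contributes its dimension, the total should match the output width of the `DenseFeatures` layer reported by `model.summary()`.

```python
import math

# Vocabulary sizes counted from CATEGORICAL_FEATURES_WITH_VOCABULARY above.
VOCAB_SIZES = {'Soil_Type': 39, 'Wilderness_Area': 4}
NUM_NUMERIC_FEATURES = 10  # len(NUMERIC_FEATURE_NAMES)

def embedding_dimension(vocab_size):
    # Same rule as create_feature_columns(): floor(sqrt(vocab_size) + 1).
    return int(math.sqrt(vocab_size) + 1)

dims = {name: embedding_dimension(size) for name, size in VOCAB_SIZES.items()}
# DenseFeatures concatenates every column, so the widths add up.
dense_width = NUM_NUMERIC_FEATURES + sum(dims.values())

print(dims)         # {'Soil_Type': 7, 'Wilderness_Area': 3}
print(dense_width)  # 20
```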

### 2.3. Create and compile the model



In [0]:
def create_model(params):

  feature_columns = create_feature_columns()
  
  layers = []
  layers.append(tf.keras.layers.DenseFeatures(feature_columns))
  for units in params.hidden_units:
    layers.append(tf.keras.layers.Dense(units=units, activation='relu'))
    layers.append(tf.keras.layers.BatchNormalization())
    layers.append(tf.keras.layers.Dropout(rate=params.dropout))
  
  layers.append(tf.keras.layers.Dense(units=NUM_CLASSES, activation='softmax'))
  
  model = tf.keras.Sequential(layers=layers, name='classifier')
    
  adam_optimizer = tf.keras.optimizers.Adam(learning_rate=params.learning_rate)

  model.compile(
        optimizer=adam_optimizer, 
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False), 
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()], 
        loss_weights=None,
        sample_weight_mode=None, 
        weighted_metrics=None, 
    )

  return model  

### 2.4. Train and evaluate experiment

#### Experiment

In [0]:
def run_experiment(model, params):

  # TensorBoard callback
  LOG_DIR = os.path.join(TRAINING_DIR, 'logs')
  tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=LOG_DIR)

  # early stopping callback
  earlystopping_callback = tf.keras.callbacks.EarlyStopping(
      monitor='val_sparse_categorical_accuracy', 
      patience=3, 
      restore_best_weights=True
  )

  callbacks = [
        tensorboard_callback,
        earlystopping_callback]

  # train dataset
  train_dataset = create_dataset(
      LOCAL_TRAIN_DATA,
      batch_size=params.batch_size,
      shuffle=True)
    
  # eval dataset
  eval_dataset = create_dataset(
      LOCAL_EVAL_DATA,
      batch_size=params.batch_size)
    
  # Prep training directory
  if tf.io.gfile.exists(TRAINING_DIR):
    print("Removing previous training artefacts...")
    tf.io.gfile.rmtree(TRAINING_DIR)

  print("Creating training directory...")
  tf.io.gfile.mkdir(TRAINING_DIR)

  print("Experiment started...")
  print(".......................................")
  
  # Run train and evaluate.
  history = model.fit(
    x=train_dataset, 
    epochs=params.epochs, 
    callbacks=callbacks,
    validation_data=eval_dataset,
  )

  print(".......................................")
  print("Experiment finished.")
  print("")

  return history


#### Hyperparameters

In [0]:
class Parameters():
    pass

TRAIN_DATA_SIZE = 431010

params = Parameters()
params.learning_rate = 0.01
params.hidden_units = [128, 128]
params.dropout = 0.15
params.batch_size = 265
params.steps_per_epoch = int(math.ceil(TRAIN_DATA_SIZE / params.batch_size))
params.epochs = 10

#### Run experiment

In [0]:
model = create_model(params)
example_batch, _ = list(
    create_dataset(LOCAL_TRAIN_DATA, batch_size=2, shuffle=True).take(1))[0]
model(example_batch)
model.summary()

In [0]:
import logging
logger = tf.get_logger()
logger.setLevel(logging.ERROR)

history = run_experiment(model, params)

#### Visualize training history

In [0]:
import matplotlib.pyplot as plt
fig, (ax1, ax2) = plt.subplots(1, 2)
fig.set_size_inches(10, 5)

# Plot training & validation accuracy values
ax1.plot(history.history['sparse_categorical_accuracy'])
ax1.plot(history.history['val_sparse_categorical_accuracy'])
ax1.set_title('Model accuracy')
ax1.set(xlabel='Epoch', ylabel='Accuracy')
ax1.legend(['Train', 'Eval'], loc='upper left')

# Plot training & validation loss values
ax2.plot(history.history['loss'])
ax2.plot(history.history['val_loss'])
ax2.set_title('Model loss')
ax2.set(xlabel='Epoch', ylabel='Loss')
ax2.legend(['Train', 'Eval'], loc='upper left')


## 3. Model export for serving

In [0]:
MODEL_OUTPUT_KEY = 'probabilities'
SIGNATURE_NAME = 'serving_default'

### 3.1. Implement serving input receiver functions

#### Serving function

We create a serving function that expects a dictionary of features and returns the prediction probabilities from the model.

In [0]:
def make_features_serving_fn(model):

  @tf.function
  def serve_features_fn(features):
    return {MODEL_OUTPUT_KEY: model(features)}

  return serve_features_fn

#### Feature spec

We create a `feature_spec` dictionary for the input features based on the dataset metadata.

In [0]:
feature_spec = {}
for feature_name in FEATURE_NAMES:
    if feature_name in CATEGORICAL_FEATURES_WITH_VOCABULARY:
        feature_spec[feature_name] = tf.io.FixedLenFeature(
            shape=(1,), dtype=tf.string)
    else:
        feature_spec[feature_name] = tf.io.FixedLenFeature(
            shape=(1,), dtype=tf.float32)

for key, value in feature_spec.items():
  print("{}: {}".format(key, value))
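The feature spec expects one string value per categorical feature and one float per numeric feature. With the vocabulary-list columns used here (no out-of-vocabulary buckets), an unknown category, such as a misspelled wilderness area, is mapped to ID -1 instead of raising an error, so a bad instance can still return a prediction. The following hypothetical `validate_instance` helper checks an instance against the metadata before it is sent for prediction. It uses a trimmed copy of the metadata so that it runs on its own; in the notebook you would build it from `CATEGORICAL_FEATURES_WITH_VOCABULARY` and `NUMERIC_FEATURE_NAMES` instead.

```python
# Hypothetical, trimmed copy of the notebook metadata so the sketch runs standalone.
SAMPLE_VOCABULARIES = {'Wilderness_Area': ['Cache', 'Commanche', 'Neota', 'Rawah']}
SAMPLE_NUMERIC_FEATURES = ['Elevation', 'Slope']

def validate_instance(instance):
    """Returns a list of problems found in one serving instance (empty if valid)."""
    expected = set(SAMPLE_VOCABULARIES) | set(SAMPLE_NUMERIC_FEATURES)
    problems = ['missing feature: {}'.format(name)
                for name in sorted(expected - set(instance))]
    problems += ['unexpected feature: {}'.format(name)
                 for name in sorted(set(instance) - expected)]
    # Categorical values must come from the vocabulary list.
    for name, vocabulary in SAMPLE_VOCABULARIES.items():
        if name in instance and instance[name] not in vocabulary:
            problems.append('{}: {!r} not in vocabulary'.format(name, instance[name]))
    # Numeric values must be real numbers (bool is excluded although it subclasses int).
    for name in SAMPLE_NUMERIC_FEATURES:
        value = instance.get(name)
        if value is not None and (isinstance(value, bool)
                                  or not isinstance(value, (int, float))):
            problems.append('{}: expected a number, got {!r}'.format(name, value))
    return problems

print(validate_instance({'Wilderness_Area': 'Commanche', 'Elevation': 3091, 'Slope': 8}))  # []
print(validate_instance({'Wilderness_Area': 'Comanche', 'Elevation': '3091'}))
```

Note that the vocabulary spells the area `Commanche`, as in the dataset, so the common spelling `Comanche` is flagged.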

### 3.2. Export the model

In [0]:
features_input_signature = {
    feature: tf.TensorSpec(shape=spec.shape, dtype=spec.dtype, name=feature)
    for feature, spec in feature_spec.items()
    }

signatures = {        
    SIGNATURE_NAME: make_features_serving_fn(model).get_concrete_function(
        features_input_signature),
    }

model.save(MODEL_DIR, save_format='tf', signatures=signatures)
print("Model is exported to: {}.".format(MODEL_DIR))

Verify the signature (inputs and outputs) of the exported model using `saved_model_cli`

In [0]:
!saved_model_cli show --dir {MODEL_DIR} --tag_set serve --signature_def {SIGNATURE_NAME}

### 3.3. Test exported model locally

Create a sample instance for prediction

In [0]:
instances = [
      { 
        'Soil_Type': '7202',
        'Wilderness_Area': 'Commanche',
        'Aspect': 61,
        'Elevation': 3091,
        'Hillshade_3pm': 129,
        'Hillshade_9am': 227,
        'Hillshade_Noon': 223,
        'Horizontal_Distance_To_Fire_Points': 2868,
        'Horizontal_Distance_To_Hydrology': 134,
        'Horizontal_Distance_To_Roadways': 0, 
        'Slope': 8, 
        'Vertical_Distance_To_Hydrology': 10,
    }
]

Prepare the sample instance in the format expected by the model signature

In [0]:
import numpy as np

def create_tf_features(instance):
 
  new_instance = {}
  for key, value in instance.items():
    if key in CATEGORICAL_FEATURES_WITH_VOCABULARY:
      new_instance[key] = tf.constant(value, dtype=tf.string)
    else:
      new_instance[key] = tf.constant(value, dtype=tf.float32)
  
  return new_instance

Load the SavedModel for prediction, and create a function that computes the prediction probabilities with the model and returns the class label with the highest probability.

In [0]:
features_predictor = tf.saved_model.load(MODEL_DIR).signatures[SIGNATURE_NAME]

def local_predict(instance):

  features = create_tf_features(instance)
  probabilities = features_predictor(**features)[MODEL_OUTPUT_KEY].numpy()
  predictions = FEATURE_LABELS[int(np.argmax(probabilities))]

  return predictions

Predict using the local SavedModel

In [0]:
local_predict(instances[0])
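`local_predict` maps the index of the largest probability back to a label in `FEATURE_LABELS`. The same step in plain Python, applied to a hypothetical probability vector, also shows how to return the top-k labels, which can help when inspecting borderline predictions. The `top_k_labels` helper is illustrative and not part of the notebook.

```python
FEATURE_LABELS = ['0', '1', '2', '3', '4', '5', '6']

def top_k_labels(probabilities, k=1):
    """Returns the k (label, probability) pairs with the highest probability."""
    # sorted() is stable, so ties keep the lower class index first, like np.argmax.
    ranked = sorted(enumerate(probabilities), key=lambda pair: pair[1], reverse=True)
    return [(FEATURE_LABELS[index], p) for index, p in ranked[:k]]

# Hypothetical softmax output for one instance.
probabilities = [0.05, 0.62, 0.20, 0.01, 0.02, 0.04, 0.06]
print(top_k_labels(probabilities))       # [('1', 0.62)]
print(top_k_labels(probabilities, k=2))  # [('1', 0.62), ('2', 0.2)]
```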

### 3.4. Upload the exported model to GCS

In [0]:
!gsutil rm -r gs://{BUCKET}/models/{MODEL_NAME}
!gsutil cp -r {MODEL_DIR} gs://{BUCKET}/models/{MODEL_NAME}

## 4. Model deployment to AI Platform 


### 4.1. Create model in AI Platform

In [0]:
!gcloud ai-platform models create {MODEL_NAME} \
  --project {PROJECT_ID} \
  --regions {REGION}

# list the models
!gcloud ai-platform models list --project {PROJECT_ID}

### 4.2. Create a model version

In [0]:
!gcloud ai-platform versions create {VERSION_NAME} \
  --model={MODEL_NAME} \
  --origin=gs://{BUCKET}/models/{MODEL_NAME} \
  --runtime-version=2.1 \
  --framework=TENSORFLOW \
  --python-version=3.7 \
  --project={PROJECT_ID}

# list the model versions
!gcloud ai-platform versions list --model={MODEL_NAME} --project={PROJECT_ID}

### 4.3. Test deployed model

Create a function to call the AI Platform Prediction model version

In [0]:
from googleapiclient import discovery

service = discovery.build('ml', 'v1')
name = 'projects/{}/models/{}/versions/{}'.format(PROJECT_ID, MODEL_NAME, VERSION_NAME)
print("Service name: {}".format(name))

def caip_predict(instances):
  
  serving_instances = []
  for instance in instances:
    serving_instances.append(
        {key: [value] for key, value in instance.items()})
    
  request_body={
      'signature_name': SIGNATURE_NAME,
      'instances': serving_instances}

  response = service.projects().predict(
      name=name,
      body=request_body
  ).execute()

  if 'error' in response:
    raise RuntimeError(response['error'])

  probability_list = [output[MODEL_OUTPUT_KEY] for output in response['predictions']]
  classes = [FEATURE_LABELS[int(np.argmax(probabilities))] for probabilities in probability_list]
  return classes

Predict using AI Platform Prediction

In [0]:
caip_predict(instances)
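`caip_predict` wraps each feature value in a one-element list, presumably to match the `(1,)` shape in the feature spec, and then reads `MODEL_OUTPUT_KEY` from each prediction. The sketch below separates that request and response handling from the API call so it can be checked offline. The helper names are hypothetical, and the response is a hand-written stand-in shaped the way `caip_predict` expects (`{'predictions': [{'probabilities': [...]}, ...]}`), not a captured API response.

```python
import json

SIGNATURE_NAME = 'serving_default'
MODEL_OUTPUT_KEY = 'probabilities'
FEATURE_LABELS = ['0', '1', '2', '3', '4', '5', '6']

def build_request_body(instances):
    # Wrap every feature value in a one-element list, as caip_predict() does.
    serving_instances = [{key: [value] for key, value in instance.items()}
                         for instance in instances]
    return {'signature_name': SIGNATURE_NAME, 'instances': serving_instances}

def flatten(values):
    # np.argmax() in caip_predict() works on the flattened array, so nested
    # lists such as [[p0, ..., p6]] are flattened here as well.
    for value in values:
        if isinstance(value, list):
            yield from flatten(value)
        else:
            yield value

def parse_response(response):
    # Surface service-side errors before reading the predictions.
    if 'error' in response:
        raise RuntimeError(response['error'])
    classes = []
    for output in response['predictions']:
        probabilities = list(flatten(output[MODEL_OUTPUT_KEY]))
        best = max(range(len(probabilities)), key=probabilities.__getitem__)
        classes.append(FEATURE_LABELS[best])
    return classes

body = build_request_body([{'Wilderness_Area': 'Commanche', 'Elevation': 3091}])
print(json.dumps(body))

# Hand-written stand-in for an API response, shaped as caip_predict() expects.
fake_response = {'predictions': [{MODEL_OUTPUT_KEY: [0.1, 0.7, 0.1, 0.05, 0.02, 0.02, 0.01]}]}
print(parse_response(fake_response))  # ['1']
```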

## 5. BigQuery logging dataset preparation

### 5.1. Create the BigQuery dataset

In [0]:
client = bigquery.Client(PROJECT_ID)
dataset_names = [dataset.dataset_id for dataset in client.list_datasets(PROJECT_ID)]

dataset = bigquery.Dataset("{}.{}".format(PROJECT_ID, BQ_DATASET_NAME))
dataset.location = "US"

if BQ_DATASET_NAME not in dataset_names:
  dataset = client.create_dataset(dataset)
  print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

print("BigQuery Dataset is ready.")

### 5.2. Create the BigQuery table to store the logs


#### Table schema

In [0]:
import json

table_schema_json = [
  {
    "name": "model", 
    "type": "STRING", 
    "mode": "REQUIRED"
   },
   {
     "name":"model_version", 
     "type": "STRING", 
     "mode":"REQUIRED"
  },
  {
    "name":"time", 
    "type": "TIMESTAMP", 
    "mode": "REQUIRED"
  },
  {
    "name":"raw_data", 
    "type": "STRING", 
    "mode": "REQUIRED"
  },
  {
    "name":"raw_prediction", 
    "type": "STRING", 
    "mode": "NULLABLE"
  },
  {
    "name":"groundtruth", 
    "type": "STRING", 
    "mode": "NULLABLE"
  },
]

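# Write the schema file used by the `bq mk` command below; `with` closes (and flushes) it.
with open('table_schema.json', 'w') as schema_file:
  json.dump(table_schema_json, schema_file)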

#### Create a table partitioned by the `time` column

In [0]:
table = bigquery.Table(
    "{}.{}.{}".format(PROJECT_ID, BQ_DATASET_NAME, BQ_TABLE_NAME))

table_names = [table.table_id for table in client.list_tables(dataset)]

if BQ_TABLE_NAME in table_names:
  print("Deleting BQ Table: {} ...".format(BQ_TABLE_NAME))
  client.delete_table(table)

In [0]:
# Partition expiration in seconds (7 days).
TIME_PARTITION_EXPIRATION = 60 * 60 * 24 * 7

!bq mk --table \
  --project_id={PROJECT_ID} \
  --time_partitioning_field=time \
  --time_partitioning_type=DAY \
  --time_partitioning_expiration={TIME_PARTITION_EXPIRATION} \
  {PROJECT_ID}:{BQ_DATASET_NAME}.{BQ_TABLE_NAME} \
  'table_schema.json'

### 5.3. Configure the AI Platform model version to enable request-response logging to BigQuery

To enable request-response logging for an existing AI Platform Prediction model version, call the `patch` API and populate the [requestLoggingConfig](https://cloud.google.com/ai-platform/prediction/docs/online-predict#requesting_logs_for_online_prediction_requests) field.

In [0]:
sampling_percentage = 1.0  # Fraction of requests to log, from 0 to 1; 1.0 logs every request.
bq_full_table_name = '{}.{}.{}'.format(PROJECT_ID, BQ_DATASET_NAME, BQ_TABLE_NAME)

In [0]:
logging_config = {
    "requestLoggingConfig":{
        "samplingPercentage": sampling_percentage,
        "bigqueryTableName": bq_full_table_name
        }
    }

service.projects().models().versions().patch(
    name=name,
    body=logging_config,
    updateMask="requestLoggingConfig"
    ).execute()
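The `requestLoggingConfig` body used above has two fields: `samplingPercentage`, which the AI Platform documentation describes as a fraction between 0 and 1 (so `1.0` logs every request), and `bigqueryTableName`, in `project.dataset.table` form. The following hypothetical helper builds the same `patch` body and rejects out-of-range or malformed values before the API call; it does not call the API itself.

```python
def make_logging_config(sampling_percentage, bq_full_table_name):
    """Builds the body for versions().patch(updateMask='requestLoggingConfig')."""
    if not 0.0 <= sampling_percentage <= 1.0:
        raise ValueError('samplingPercentage must be a fraction between 0 and 1')
    # Split from the right so a domain-scoped project ID containing a dot still parses.
    parts = bq_full_table_name.rsplit('.', 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError('expected project.dataset.table, got {!r}'.format(bq_full_table_name))
    return {
        'requestLoggingConfig': {
            'samplingPercentage': sampling_percentage,
            'bigqueryTableName': bq_full_table_name,
        }
    }

# Placeholder project ID; dataset and table names match the notebook defaults.
config = make_logging_config(0.1, 'my-project.prediction_logs.covertype_classifier_logs')
print(config['requestLoggingConfig']['samplingPercentage'])  # 0.1
```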

### 5.4. Test request-response logging

Send sample prediction requests to the model version on AI Platform Prediction

In [0]:
import time

for i in range(10):
  caip_predict(instances)
  print('.', end='')
  time.sleep(0.1)

Query the logged request-response entries in BigQuery

In [0]:
query = '''
  SELECT * FROM 
  `{}.{}` 
  WHERE model_version = '{}'
  ORDER BY time desc
  LIMIT {}
'''.format(BQ_DATASET_NAME, BQ_TABLE_NAME, VERSION_NAME, 3)

pd.io.gbq.read_gbq(
    query, project_id=PROJECT_ID).T


## 6. BigQuery logs parsing

### 6.1. Generate the CREATE VIEW script

In [0]:
view_name = "vw_"+BQ_TABLE_NAME+"_"+VERSION_NAME

column_names = FEATURE_NAMES
input_features = ', \r\n  '.join(column_names)

json_features_extraction = []
for feature_name in column_names:
  s = "JSON_EXTRACT(instance, '$.{}')".format(feature_name) 
  if feature_name in NUMERIC_FEATURE_NAMES:
    s = "CAST({} AS NUMERIC)".format(s)
  s += " AS {}".format(feature_name)
  json_features_extraction.append(s)
json_features_extraction = ', \r\n    '.join(json_features_extraction)

class_probability_pivoting = []
for class_index, class_label in enumerate(FEATURE_LABELS):
  s = "CAST(MAX(IF(class_index = {}, class_probability, NULL)) as FLOAT64) as prob_{}".format(class_index, class_label)
  class_probability_pivoting.append(s)
class_probability_pivoting = ', \r\n  '.join(class_probability_pivoting)

class_prob = []
for class_label in FEATURE_LABELS:
  s = 'prob_{}'.format(class_label)
  class_prob.append(s)

class_prob = ', \r\n  '.join(class_prob)

case_conditions = []
for class_label in FEATURE_LABELS:
  s = 'WHEN prob_max = prob_{} THEN {}'.format(class_label, class_label)
  case_conditions.append(s)
case_conditions = '   \r\n '.join(case_conditions)

In [0]:
sql_script = '''
CREATE OR REPLACE VIEW @dataset_name.@view_name
AS

WITH step1
AS
(
  SELECT 
    model, 
    model_version, 
    time, 
    SPLIT(JSON_EXTRACT(raw_data, '$.instances'), '}],[{') instance_list, 
    SPLIT(JSON_EXTRACT(raw_prediction, '$.predictions'), '}],[{') as prediction_list
  FROM 
  `@project.@dataset_name.@table_name` 
  WHERE 
    model = '@model_name' AND
    model_version = '@version'
),

step2
AS
(
  SELECT
    model, 
    model_version, 
    time, 
    REPLACE(REPLACE(instance, '[', ''),']', '') AS instance,
    REPLACE(REPLACE(prediction, '[{"@model_output_key":[', ''),']}]', '') AS prediction,
  FROM step1
  JOIN UNNEST(step1.instance_list) AS instance
  WITH OFFSET AS f1
  JOIN UNNEST(step1.prediction_list) AS prediction
  WITH OFFSET AS f2
  ON f1=f2
),

step3 AS
(
  SELECT 
    model, 
    model_version, 
    time,
    @json_features_extraction,
    SPLIT(prediction, ',') AS class_probabilities, 
  FROM step2
),

step4
AS
(
  SELECT * EXCEPT(class_probabilities)
  FROM step3
  JOIN UNNEST(step3.class_probabilities) AS class_probability
  WITH OFFSET AS class_index
),

step5
AS
(
  SELECT
    model,
    model_version,
    time,
    @input_features,
    @class_probability_pivoting,
    MAX(CAST(class_probability AS FLOAT64)) as prob_max
  FROM step4
  GROUP BY
    model,
    model_version,
    time,
    @input_features
)

SELECT
  model,
  model_version,
  time,
  @input_features,
  @class_prob,
  CASE
  @case_conditions
  END as predicted_class
FROM
  step5

'''

In [0]:
sql_script = sql_script.replace("@project", PROJECT_ID)
sql_script = sql_script.replace("@dataset_name", BQ_DATASET_NAME)
sql_script = sql_script.replace("@table_name", BQ_TABLE_NAME)
sql_script = sql_script.replace("@view_name", view_name)
sql_script = sql_script.replace("@model_name", MODEL_NAME)
sql_script = sql_script.replace("@version", VERSION_NAME)
sql_script = sql_script.replace("@input_features", input_features)
sql_script = sql_script.replace("@json_features_extraction", json_features_extraction)
sql_script = sql_script.replace("@model_output_key", MODEL_OUTPUT_KEY)
# Replace @class_probability_pivoting before @class_prob, since "@class_prob" is a prefix of it.
sql_script = sql_script.replace("@class_probability_pivoting", class_probability_pivoting)
sql_script = sql_script.replace("@class_prob", class_prob)
sql_script = sql_script.replace("@case_conditions", case_conditions)

Print the generated script

In [0]:
print(sql_script)
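The view parses the logged JSON with string splitting, which depends on the exact serialization of `raw_data` and `raw_prediction`. To check what the view should produce for a given log row, the following sketch performs the same steps with Python's `json` module: pair each instance with its prediction, extract the features, pivot the class probabilities into `prob_<label>` columns, and pick the most probable class. It assumes that `raw_data` holds the request body built by `caip_predict` and that `raw_prediction` has the form `{"predictions": [{"probabilities": [...]}]}`; `parse_log_row` and the sample row are hypothetical, not real log output.

```python
import json

MODEL_OUTPUT_KEY = 'probabilities'
FEATURE_LABELS = ['0', '1', '2', '3', '4', '5', '6']

def parse_log_row(raw_data, raw_prediction):
    """Flattens one logged request/response pair into one record per instance."""
    instances = json.loads(raw_data)['instances']
    predictions = json.loads(raw_prediction)['predictions']
    records = []
    # Pair the i-th instance with the i-th prediction (the view joins on the offset).
    for instance, prediction in zip(instances, predictions):
        # caip_predict() wraps each value in a one-element list; unwrap it.
        record = {key: value[0] if isinstance(value, list) else value
                  for key, value in instance.items()}
        probabilities = prediction[MODEL_OUTPUT_KEY]
        for label, probability in zip(FEATURE_LABELS, probabilities):
            record['prob_' + label] = float(probability)
        # Like the CASE expression, ties resolve to the lowest class index.
        best = max(range(len(probabilities)), key=probabilities.__getitem__)
        record['predicted_class'] = FEATURE_LABELS[best]
        records.append(record)
    return records

raw_data = json.dumps({'signature_name': 'serving_default',
                       'instances': [{'Wilderness_Area': ['Commanche'], 'Elevation': [3091]}]})
raw_prediction = json.dumps(
    {'predictions': [{MODEL_OUTPUT_KEY: [0.1, 0.1, 0.6, 0.1, 0.05, 0.03, 0.02]}]})
rows = parse_log_row(raw_data, raw_prediction)
print(rows[0]['Elevation'], rows[0]['predicted_class'])  # 3091 2
```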

### 6.2. Create a view to parse the logs

In [0]:
# client.query() starts an asynchronous job; result() waits for it to finish.
client.query(sql_script).result()
print("View was created or replaced.")

### 6.3. Query the view

In [0]:
query = '''
  SELECT * FROM 
  `{}.{}` 
  LIMIT {}
'''.format(BQ_DATASET_NAME, view_name, 3)

pd.io.gbq.read_gbq(
    query, project_id=PROJECT_ID).T

# Cleaning up

To clean up all GCP resources used in this project, you can [delete the GCP
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.
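If you want to keep the project, you can instead delete the individual resources that this tutorial created. The following sketch only assembles the corresponding `gcloud`, `bq`, and `gsutil` commands so that you can review them before running them in a terminal (or with the `!` prefix); it does not execute anything. The `cleanup_commands` helper is hypothetical, and the project and bucket values in the example are placeholders for your `PROJECT_ID` and `BUCKET`. The version is deleted before the model because AI Platform does not delete a model that still has versions.

```python
import shlex

def cleanup_commands(project_id, bucket, model_name, version_name, bq_dataset_name):
    """Returns shell commands that delete the resources created in this tutorial."""
    q = shlex.quote
    return [
        # A model cannot be deleted while it still has versions, so delete the version first.
        'gcloud ai-platform versions delete {} --model={} --project={} --quiet'.format(
            q(version_name), q(model_name), q(project_id)),
        'gcloud ai-platform models delete {} --project={} --quiet'.format(
            q(model_name), q(project_id)),
        # -r also removes the dataset's tables and views; -f skips the confirmation prompt.
        'bq rm -r -f -d {}'.format(q('{}:{}'.format(project_id, bq_dataset_name))),
        'gsutil rm -r {}'.format(q('gs://{}/models/{}'.format(bucket, model_name))),
    ]

# Placeholder project and bucket; model, version, and dataset match the notebook defaults.
for command in cleanup_commands('my-project', 'my-bucket', 'covertype_classifier',
                                'v1', 'prediction_logs'):
    print(command)
```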