# Using custom containers with AI Platform Training

**Learning Objectives:**
1. Learn how to create training and validation splits with BigQuery
1. Learn how to wrap a machine learning model into a Docker container and train it on CAIP
1. Learn how to use the hyperparameter tuning engine on GCP to find the best hyperparameters
1. Learn how to deploy a trained machine learning model on GCP as a REST API and query it

In this lab, you develop a training application, package it as a Docker image, and run it on **AI Platform Training**. The application trains a multi-class classification model that predicts the type of forest cover from cartographic data. The [dataset](../../../datasets/covertype/README.md) used in the lab is based on the **Covertype Data Set** from the UCI Machine Learning Repository.

The training code uses `scikit-learn` for data pre-processing and modeling. The code has been instrumented using the `hypertune` package so it can be used with **AI Platform** hyperparameter tuning.


In [1]:
import json
import os
import numpy as np
import pandas as pd
import pickle
import uuid
import time
import tempfile

from googleapiclient import discovery
from googleapiclient import errors

from google.cloud import bigquery
from jinja2 import Template
from kfp.components import func_to_container_op
from typing import NamedTuple

from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer

## Configure environment settings

Set location paths, connection strings, and other environment settings. Make sure to update `REGION` and `ARTIFACT_STORE` with the settings reflecting your lab environment.

- `REGION` - the compute region for AI Platform Training and Prediction
- `ARTIFACT_STORE` - the GCS bucket created during installation of AI Platform Pipelines. The bucket name starts with the `hostedkfp-default-` prefix.

In [2]:
!gsutil ls

gs://artifacts.mlops-dev-env.appspot.com/
gs://hostedkfp-default-36un4wco1q/
gs://jk-mlops-dev-sandbox/
gs://mlops-dev-env-staging/
gs://mlops-dev-env_cloudbuild/
gs://mlops-dev-workspace/


In [66]:
REGION = 'us-central1'
ARTIFACT_STORE = 'gs://mlops-dev-env-staging'

PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]
DATA_ROOT='{}/data'.format(ARTIFACT_STORE)
JOB_DIR_ROOT='{}/jobs'.format(ARTIFACT_STORE)
TRAINING_FILE_PATH='{}/{}/{}'.format(DATA_ROOT, 'training', 'dataset.csv')
VALIDATION_FILE_PATH='{}/{}/{}'.format(DATA_ROOT, 'validation', 'dataset.csv')

BQ_DATASET_NAME = 'data_validation'
BQ_TABLE_NAME = 'covertype_classifier_logs'

## Explore the Covertype dataset 

In [19]:
%%bigquery
SELECT *
FROM `covertype_dataset.covertype`

Unnamed: 0,Elevation,Aspect,Slope,Horizontal_Distance_To_Hydrology,Vertical_Distance_To_Hydrology,Horizontal_Distance_To_Roadways,Hillshade_9am,Hillshade_Noon,Hillshade_3pm,Horizontal_Distance_To_Fire_Points,Wilderness_Area,Soil_Type,Cover_Type
0,2085,256,18,150,27,738,176,248,208,914,Cache,C2702,5
1,2125,256,20,30,12,871,169,248,215,300,Cache,C2702,2
2,2146,256,34,150,62,1253,122,237,239,511,Cache,C2702,2
3,2186,256,38,210,102,1294,109,232,244,552,Cache,C2702,2
4,2831,256,25,277,183,1706,153,246,225,1485,Commanche,C2705,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,3136,254,12,319,60,5734,193,248,193,2467,Rawah,C7746,1
99996,3242,254,12,636,148,3551,193,248,193,2010,Commanche,C7757,0
99997,2071,255,12,234,63,342,192,247,193,247,Cache,C2706,2
99998,3248,255,12,730,113,725,192,247,193,2724,Commanche,C7756,1


## Create training and validation splits

Use BigQuery to sample training and validation splits and save them to Cloud Storage (GCS).

### Create a training split

In [5]:
!bq query \
-n 0 \
--destination_table covertype_dataset.training \
--replace \
--use_legacy_sql=false \
'SELECT * \
FROM `covertype_dataset.covertype` AS cover \
WHERE \
MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(cover))), 10) IN (1, 2, 3, 4)' 

Waiting on bqjob_r4cfe05c956bb50eb_00000171cd9aa1be_1 ... (2s) Current status: DONE   


In [6]:
!bq extract \
--destination_format CSV \
covertype_dataset.training \
$TRAINING_FILE_PATH

Waiting on bqjob_r1c9da0974cc46a07_00000171cd9ab497_1 ... (0s) Current status: DONE   


### Create a validation split

In [7]:
!bq query \
-n 0 \
--destination_table covertype_dataset.validation \
--replace \
--use_legacy_sql=false \
'SELECT * \
FROM `covertype_dataset.covertype` AS cover \
WHERE \
MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(cover))), 10) IN (8)' 

Waiting on bqjob_r22cb74d9fb203c2a_00000171cd9abeb8_1 ... (1s) Current status: DONE   


In [8]:
!bq extract \
--destination_format CSV \
covertype_dataset.validation \
$VALIDATION_FILE_PATH

Waiting on bqjob_r137c248044b38604_00000171cd9acc6e_1 ... (1s) Current status: DONE   


In [9]:
df_train = pd.read_csv(TRAINING_FILE_PATH)
df_validation = pd.read_csv(VALIDATION_FILE_PATH)
print(df_train.shape)
print(df_validation.shape)

(40009, 13)
(9836, 13)
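
The two queries above split rows deterministically: each row is serialized, hashed with `FARM_FINGERPRINT`, and assigned to one of ten buckets by `MOD(ABS(hash), 10)`. Buckets 1 to 4 go to training (roughly 40% of rows) and bucket 8 goes to validation (roughly 10%), which is in line with the 40,009 and 9,836 rows reported above if the source table holds about 100,000 rows. A minimal Python sketch of the same idea follows. It uses SHA-256 from the standard library as a stand-in for FarmHash, so its bucket assignments will not match BigQuery's; `bucket_of` and `split` are illustrative names, not part of the lab.

```python
import hashlib
import json

TRAIN_BUCKETS = {1, 2, 3, 4}   # mirrors IN (1, 2, 3, 4)
VALIDATION_BUCKETS = {8}       # mirrors IN (8)


def bucket_of(row, num_buckets=10):
    """Map a row (dict) to a stable bucket in [0, num_buckets)."""
    # Serialize with sorted keys so equal rows always hash identically.
    payload = json.dumps(row, sort_keys=True).encode("utf-8")
    # Assumption: SHA-256 stands in for BigQuery's FARM_FINGERPRINT.
    digest = hashlib.sha256(payload).digest()
    return int.from_bytes(digest[:8], "big") % num_buckets


def split(rows):
    """Return (train, validation) lists; rows in other buckets are left out."""
    train, validation = [], []
    for row in rows:
        bucket = bucket_of(row)
        if bucket in TRAIN_BUCKETS:
            train.append(row)
        elif bucket in VALIDATION_BUCKETS:
            validation.append(row)
    return train, validation


# Toy rows standing in for the Covertype table.
rows = [{"Elevation": 2000 + i, "Slope": i % 40} for i in range(10000)]
train, validation = split(rows)
print(len(train), len(validation))
```

Because the bucket depends only on the row's content, rerunning the split yields the same partition, and the training and validation sets cannot overlap.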


## Develop a training application

### Configure the `sklearn` training pipeline.

The training pipeline preprocesses data by standardizing all numeric features with `sklearn.preprocessing.StandardScaler` and encoding all categorical features with `sklearn.preprocessing.OneHotEncoder`. For modeling, it uses a linear classifier trained with stochastic gradient descent (`SGDClassifier`).

In [10]:
numeric_feature_indexes = slice(0, 10)
categorical_feature_indexes = slice(10, 12)

preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_feature_indexes),
        ('cat', OneHotEncoder(), categorical_feature_indexes) 
    ])

pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', SGDClassifier(loss='log', tol=1e-3))
])
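
To make the two preprocessing steps concrete, here is a small pure-Python sketch of what they compute on toy data: standardization subtracts a column's mean and divides by its population standard deviation, and one-hot encoding turns each category into a 0/1 indicator vector. The helpers `standardize` and `one_hot` are illustrative only; the lab relies on scikit-learn's `StandardScaler` and `OneHotEncoder`, which also handle details this sketch ignores (for example, categories unseen during fitting).

```python
import math


def standardize(column):
    """Scale values to zero mean and unit variance (population std)."""
    mean = sum(column) / len(column)
    variance = sum((x - mean) ** 2 for x in column) / len(column)
    std = math.sqrt(variance) or 1.0  # avoid dividing by zero for constant columns
    return [(x - mean) / std for x in column]


def one_hot(column):
    """Encode categories as indicator vectors, one position per sorted category."""
    categories = sorted(set(column))
    index = {category: i for i, category in enumerate(categories)}
    encoded = [[1.0 if index[value] == i else 0.0 for i in range(len(categories))]
               for value in column]
    return categories, encoded


# Toy columns using values that appear in the dataset preview.
elevation = [2085.0, 2125.0, 2146.0, 2186.0]
wilderness_area = ["Cache", "Commanche", "Rawah", "Cache"]

scaled = standardize(elevation)
categories, encoded = one_hot(wilderness_area)

print(scaled)
print(categories)
print(encoded)
```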

### Convert all numeric features to `float64`

To avoid warning messages from `StandardScaler`, all numeric features are converted to `float64`.

In [11]:
num_features_type_map = {feature: 'float64' for feature in df_train.columns[numeric_feature_indexes]}

df_train = df_train.astype(num_features_type_map)
df_validation = df_validation.astype(num_features_type_map)

### Run the pipeline locally.

In [12]:
X_train = df_train.drop('Cover_Type', axis=1)
y_train = df_train['Cover_Type']
X_validation = df_validation.drop('Cover_Type', axis=1)
y_validation = df_validation['Cover_Type']

pipeline.set_params(classifier__alpha=0.001, classifier__max_iter=200)
pipeline.fit(X_train, y_train)

Pipeline(memory=None,
         steps=[('preprocessor',
                 ColumnTransformer(n_jobs=None, remainder='drop',
                                   sparse_threshold=0.3,
                                   transformer_weights=None,
                                   transformers=[('num',
                                                  StandardScaler(copy=True,
                                                                 with_mean=True,
                                                                 with_std=True),
                                                  slice(0, 10, None)),
                                                 ('cat',
                                                  OneHotEncoder(categorical_features=None,
                                                                categories=None,
                                                                drop=None,
                                                                dtype=<class 'numpy.float64

### Calculate the trained model's accuracy.

In [13]:
accuracy = pipeline.score(X_validation, y_validation)
print(accuracy)

0.6968279788531924


In [27]:
gcs_model_path = '%s/model/' % ARTIFACT_STORE
local_model_path = '/tmp/model.pkl'

In [28]:
with open(local_model_path, 'wb') as model_file:
    pickle.dump(pipeline, model_file)

!gsutil cp {local_model_path} {gcs_model_path}
!gsutil ls {gcs_model_path}

Copying file:///tmp/model.pkl [Content-Type=application/octet-stream]...
/ [1 files][  6.2 KiB/  6.2 KiB]                                                
Operation completed over 1 objects/6.2 KiB.                                      
gs://mlops-dev-env-staging/model/model.pkl


## Deploy the model to AI Platform Prediction

### Create a model resource

In [29]:
model_name = 'forest_cover_classifier'
labels = "task=classifier,domain=forestry"
filter = 'name:{}'.format(model_name)
models = !(gcloud ai-platform models list --filter={filter} --format='value(name)')

if not models:
    !gcloud ai-platform models create  $model_name \
    --regions=$REGION \
    --labels=$labels
else:
    print("Model: {} already exists.".format(models[0]))

Model: forest_cover_classifier already exists.


### Create a model version

In [30]:
model_version = 'v01'
filter = 'name:{}'.format(model_version)
versions = !(gcloud ai-platform versions list --model={model_name} --format='value(name)' --filter={filter})

if not versions:
    !gcloud ai-platform versions create {model_version} \
    --model={model_name} \
    --origin={gcs_model_path} \
    --runtime-version=1.15 \
    --framework=scikit-learn \
    --python-version=3.7
else:
    print("Model version: {} already exists.".format(versions[0]))

Creating version (this might take a few minutes)......done.                    


## Predict

### Define predict function

In [54]:
service = discovery.build('ml', 'v1')
name = 'projects/{}/models/{}/versions/{}'.format(PROJECT_ID, model_name, model_version)
print("Service name: {}".format(name))

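def caip_predict(instances):
    """Send a batch of instances to the deployed model version and return the response."""
    request_body = {'instances': instances}

    response = service.projects().predict(
        name=name,
        body=request_body
    ).execute()

    # The service reports prediction failures in the response body.
    if 'error' in response:
        raise RuntimeError(response['error'])

    return response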

Service name: projects/mlops-dev-env/models/forest_cover_classifier/versions/v01


### Invoke the service

In [65]:
instance_iterator = X_validation.iterrows()

for num_calls in range(5):
    instances = []
    for num_instances in range(3):
        instances.append(list(next(instance_iterator)[1].values))
    response = caip_predict(instances)
    print(response)
    print("***")
    time.sleep(0.25)

{'predictions': [1, 1, 0]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [1, 1, 0]}
***
{'predictions': [0, 0, 0]}
***
{'predictions': [2, 1, 1]}
***
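
The loop above sends the validation rows in small batches, three instances per request. One way to express that batching generically is a helper built on `itertools.islice`. `batches` is a hypothetical name, and the stand-in rows below replace `X_validation` so the sketch runs without a deployed model.

```python
from itertools import islice


def batches(iterable, size):
    """Yield successive lists of up to `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in feature rows: 10 numeric values followed by 2 categorical values.
rows = [[float(i)] * 10 + ["Cache", "C2702"] for i in range(7)]

# Each batch becomes the body of one prediction request.
request_bodies = [{"instances": batch} for batch in batches(rows, 3)]
print([len(body["instances"]) for body in request_bodies])  # prints [3, 3, 1]
```

Unlike calling `next()` on the row iterator directly, this helper ends cleanly when the rows run out, with a shorter final batch.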


## 5. BigQuery logging dataset preparation

### 5.1. Create BQ Dataset

In [68]:
from google.cloud import bigquery

client = bigquery.Client(PROJECT_ID)
dataset_names = [dataset.dataset_id for dataset in client.list_datasets(PROJECT_ID)]

dataset = bigquery.Dataset("{}.{}".format(PROJECT_ID, BQ_DATASET_NAME))
dataset.location = "US"

if BQ_DATASET_NAME not in dataset_names:
  dataset = client.create_dataset(dataset)
  print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

print("BigQuery Dataset is ready.")

BigQuery Dataset is ready.


### 5.2. Create BQ Table


#### Table schema

In [69]:
import json

table_schema_json = [
    {"name": "model", "type": "STRING", "mode": "REQUIRED"},
    {"name": "model_version", "type": "STRING", "mode": "REQUIRED"},
    {"name": "time", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "raw_data", "type": "STRING", "mode": "REQUIRED"},
    {"name": "raw_prediction", "type": "STRING", "mode": "NULLABLE"},
    {"name": "groundtruth", "type": "STRING", "mode": "NULLABLE"},
]

# Write the schema to a file and close it so the `bq mk` command can read it.
with open('table_schema.json', 'w') as schema_file:
    json.dump(table_schema_json, schema_file)
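
For orientation, a row in this table carries the model name, the version, a timestamp, and the request and response serialized as JSON strings. The sketch below builds such a row and checks it against the `REQUIRED` and `NULLABLE` modes of the schema. The `make_log_row` and `missing_required` helpers and the sample values are illustrative assumptions; in the lab, AI Platform Prediction writes these rows itself once logging is enabled.

```python
import json
from datetime import datetime, timezone

# Field name -> mode, copied from the schema above.
SCHEMA_MODES = {
    "model": "REQUIRED",
    "model_version": "REQUIRED",
    "time": "REQUIRED",
    "raw_data": "REQUIRED",
    "raw_prediction": "NULLABLE",
    "groundtruth": "NULLABLE",
}


def make_log_row(model, version, instances, predictions=None):
    """Build a dict shaped like one row of the logging table."""
    return {
        "model": model,
        "model_version": version,
        "time": datetime.now(timezone.utc).isoformat(),
        "raw_data": json.dumps({"instances": instances}),
        "raw_prediction": (json.dumps({"predictions": predictions})
                           if predictions is not None else None),
        "groundtruth": None,
    }


def missing_required(row):
    """Return the REQUIRED fields that are absent or None in `row`."""
    return [name for name, mode in SCHEMA_MODES.items()
            if mode == "REQUIRED" and row.get(name) is None]


# Shortened, made-up instance; real instances have 12 feature values.
row = make_log_row("forest_cover_classifier", "v01",
                   [[2085.0, 256.0, 18.0]], predictions=[1])
print(missing_required(row))
```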

#### Creating an ingestion-time partitioned table

In [70]:
table = bigquery.Table(
    "{}.{}.{}".format(PROJECT_ID, BQ_DATASET_NAME, BQ_TABLE_NAME))

table_names = [table.table_id for table in client.list_tables(dataset)]

if BQ_TABLE_NAME in table_names:
  print("Deleting BQ Table: {} ...".format(BQ_TABLE_NAME))
  client.delete_table(table)

# table = client.create_table(table)
# table.partition_expiration = 60 * 60 * 24 * 7
# print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


Deleting BQ Table: covertype_classifier_logs ...


In [73]:
TIME_PARTITION_EXPIRATION = 60 * 60 * 24 * 7  # one week, in seconds

!bq mk --table \
  --project_id={PROJECT_ID} \
  --time_partitioning_type=DAY \
  --time_partitioning_expiration {TIME_PARTITION_EXPIRATION} \
  {PROJECT_ID}:{BQ_DATASET_NAME}.{BQ_TABLE_NAME} \
  'table_schema.json'

Table 'mlops-dev-env:data_validation.covertype_classifier_logs' successfully created.


### 5.3. Configure the AI Platform model version to enable request-response logging to BigQuery

In [74]:
sampling_percentage = 1.0
bq_full_table_name = '{}.{}.{}'.format(PROJECT_ID, BQ_DATASET_NAME, BQ_TABLE_NAME)

In [76]:
logging_config = {
    "requestLoggingConfig":{
        "samplingPercentage": sampling_percentage,
        "bigqueryTableName": bq_full_table_name
        }
    }

response = service.projects().models().versions().patch(
    name=name,
    body=logging_config,
    updateMask="requestLoggingConfig"
    ).execute()

response

{'name': 'projects/mlops-dev-env/operations/update_forest_cover_classifier_v01_1588297334182',
 'metadata': {'@type': 'type.googleapis.com/google.cloud.ml.v1.OperationMetadata',
  'createTime': '2020-05-01T01:42:14Z',
  'operationType': 'UPDATE_VERSION',
  'modelName': 'projects/mlops-dev-env/models/forest_cover_classifier',
  'version': {'name': 'projects/mlops-dev-env/models/forest_cover_classifier/versions/v01',
   'deploymentUri': 'gs://mlops-dev-env-staging/model/',
   'createTime': '2020-05-01T00:28:16Z',
   'runtimeVersion': '1.15',
   'state': 'READY',
   'etag': 'UStD2oUPU7E=',
   'framework': 'SCIKIT_LEARN',
   'machineType': 'mls1-c1-m2',
   'pythonVersion': '3.7',
   'requestLoggingConfig': {'samplingPercentage': 1,
    'bigqueryTableName': 'mlops-dev-env.data_validation.covertype_classifier_logs'}}}}
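
In the `requestLoggingConfig` body, `samplingPercentage` is sent as 1.0 and echoed back as 1 in the response. Assuming the field is a fraction between 0 and 1 (so 1.0 logs every request and 0.1 about one in ten), a small helper can validate the value before the `patch` call. `build_logging_config` is a hypothetical helper, and the table name in the usage line is a placeholder.

```python
def build_logging_config(table_name, sampling_fraction=1.0):
    """Return a request body for enabling request-response logging."""
    # Assumption: the API expects a fraction in [0, 1], not a 0-100 percentage.
    if not 0.0 <= sampling_fraction <= 1.0:
        raise ValueError("sampling_fraction must be between 0 and 1")
    # Expect a fully qualified name: project.dataset.table
    if len(table_name.split(".")) != 3:
        raise ValueError("table_name must look like project.dataset.table")
    return {
        "requestLoggingConfig": {
            "samplingPercentage": sampling_fraction,
            "bigqueryTableName": table_name,
        }
    }


config = build_logging_config(
    "my-project.data_validation.covertype_classifier_logs", 0.1)
print(config)
```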

### 5.4. Test request-response logging

In [77]:
instance_iterator = X_validation.iterrows()

for num_calls in range(100):
    instances = []
    for num_instances in range(3):
        instances.append(list(next(instance_iterator)[1].values))
    response = caip_predict(instances)
    print(response)
    print("***")
    time.sleep(0.25)

{'predictions': [1, 1, 0]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [1, 1, 0]}
***
{'predictions': [0, 0, 0]}
***
{'predictions': [2, 1, 1]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [0, 1, 1]}
***
{'predictions': [0, 0, 1]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [1, 0, 0]}
***
{'predictions': [0, 0, 0]}
***
{'predictions': [0, 0, 1]}
***
{'predictions': [0, 1, 0]}
***
{'predictions': [0, 0, 0]}
***
{'predictions': [1, 0, 0]}
***
{'predictions': [0, 0, 1]}
***
{'predictions': [1, 0, 1]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [1, 1, 0]}
***
{'predictions': [0, 0, 1]}
***
{'predictions': [0, 1, 1]}
***
{'predictions': [0, 1, 1]}
***
{'predictions': [1, 0, 1]}
***
{'predictions': [1, 0, 1]}
***
{'predictions': [1, 0, 1]}
***
{'predictions': [0, 5, 1]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [1, 1, 1]}
***
{'predictions': [1, 1, 1]}
***
{'predic

In [79]:
query = '''
  SELECT * FROM 
  `{}.{}` 
  WHERE model_version = '{}'
  ORDER BY time desc
  LIMIT {}
'''.format(BQ_DATASET_NAME, BQ_TABLE_NAME, model_version, 3)

pd.io.gbq.read_gbq(
    query, project_id=PROJECT_ID).T


Downloading: 100%|██████████| 3/3 [00:00<00:00, 18.63rows/s]


Unnamed: 0,0,1,2
model,forest_cover_classifier,forest_cover_classifier,forest_cover_classifier
model_version,v01,v01,v01
time,2020-05-01 01:44:53+00:00,2020-05-01 01:44:52+00:00,2020-05-01 01:44:52+00:00
raw_data,"{""instances"": [[3086.0, 308.0, 3.0, 295.0, 43....","{""instances"": [[3192.0, 21.0, 3.0, 210.0, 33.0...","{""instances"": [[2788.0, 22.0, 3.0, 30.0, -1.0,..."
raw_prediction,"{""predictions"": [0, 0, 0]}","{""predictions"": [0, 0, 1]}","{""predictions"": [1, 0, 0]}"
groundtruth,,,
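
Because `raw_data` and `raw_prediction` are stored as JSON strings, downstream analysis usually starts by parsing them and pairing each instance with its prediction. A minimal sketch, using a hand-written and shortened record in place of a real query result (the `pair_instances` helper and the sample values are illustrative):

```python
import json


def pair_instances(raw_data, raw_prediction):
    """Parse one logged request/response and zip instances with predictions."""
    instances = json.loads(raw_data)["instances"]
    predictions = json.loads(raw_prediction)["predictions"]
    if len(instances) != len(predictions):
        raise ValueError("instance and prediction counts differ")
    return list(zip(instances, predictions))


# Hand-written stand-in for one row of the logging table.
record = {
    "raw_data": '{"instances": [[3086.0, 308.0, "Rawah"], [3192.0, 21.0, "Cache"]]}',
    "raw_prediction": '{"predictions": [0, 1]}',
}

pairs = pair_instances(record["raw_data"], record["raw_prediction"])
for features, label in pairs:
    print(label, features)
```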


<font size=-1>Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at [https://www.apache.org/licenses/LICENSE-2.0](https://www.apache.org/licenses/LICENSE-2.0)

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.</font>