# Part 1 - Experimentation



In [None]:
import os
import numpy as np
import pandas as pd
import uuid
import time
import tempfile

from googleapiclient import discovery
from googleapiclient import errors

from google.cloud import bigquery
from jinja2 import Template
from kfp.components import func_to_container_op
from typing import NamedTuple

from sklearn.metrics import accuracy_score
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer

## Configure environment settings
Make sure to update the constants to reflect your environment settings.

In [None]:
# --- GCP project and staging bucket ------------------------------------------
PROJECT_ID = 'mlops-dev-4'
LAB_GCS_BUCKET = 'gs://mlops-dev-4-lab-11'

# --- Locations ---------------------------------------------------------------
DATASET_LOCATION = 'US'          # location passed to the BigQuery client
CLUSTER_ZONE = 'us-central1-a'
REGION = 'us-central1'           # region used when submitting AI Platform jobs

# --- BigQuery dataset and tables ---------------------------------------------
DATASET_ID = 'lab_11'
SOURCE_TABLE_ID = 'covertype'
TRAINING_TABLE_ID = 'training_split'
VALIDATION_TABLE_ID = 'validation_split'
TESTING_TABLE_ID = 'testing_split'

# --- GCS destinations of the exported splits ---------------------------------
TRAINING_FILE_PATH = f'{LAB_GCS_BUCKET}/datasets/training/data.csv'
VALIDATION_FILE_PATH = f'{LAB_GCS_BUCKET}/datasets/validation/data.csv'
TESTING_FILE_PATH = f'{LAB_GCS_BUCKET}/datasets/testing/data.csv'

## Explore the source dataset 
Retrieve a few rows from the source dataset to inspect its schema.

In [None]:
# One BigQuery client is shared by every query and extract cell in the notebook.
client = bigquery.Client(project=PROJECT_ID, location=DATASET_LOCATION)

query_template = """
SELECT *
FROM `{{ source_table }}`
LIMIT 10
"""

query = Template(query_template).render(
    source_table=f'{PROJECT_ID}.{DATASET_ID}.{SOURCE_TABLE_ID}')
df = client.query(query).to_dataframe()
# Remember the width of the table; it is reported together with the row count below.
num_of_columns = df.shape[1]
df

Count the number of rows and columns in the source.

In [None]:
# Total number of rows in the source table, reported as "<rows> x <columns>".
query_template = """
SELECT count(*)
FROM `{{ source_table }}`
"""

query = Template(query_template).render(
    source_table=f'{PROJECT_ID}.{DATASET_ID}.{SOURCE_TABLE_ID}')
df = client.query(query).to_dataframe()
number_of_rows_in_full_dataset = df.iat[0, 0]
print(f'{number_of_rows_in_full_dataset} x {num_of_columns}')

## Create the training, validation and testing splits
Define the sampling query template.

In [None]:
# Jinja template for a hash-based split of the source table. Each row is serialized
# with TO_JSON_STRING, hashed with FARM_FINGERPRINT, and assigned to one of
# `num_lots` lots via the hash modulo; only rows whose lot is listed in
# `lots_to_select` (a SQL tuple literal such as '(1, 2, 3)') are returned.
# The lot depends only on row content, so disjoint lot lists give disjoint splits.
sampling_query_template = """
SELECT *
FROM 
  `{{ source_table }}` AS cover
WHERE 
  MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(cover))), {{ num_lots }}) in {{ lots_to_select }}
"""

Configure the sampling query job settings.

In [None]:
# Query job settings reused by the three split cells below. Each of them sets
# `job_config.destination` before running. CREATE_IF_NEEDED + WRITE_TRUNCATE make
# re-running a split cell overwrite the previous table instead of appending to it.
job_config = bigquery.QueryJobConfig()
job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE
# `Client.dataset()` is deprecated in google-cloud-bigquery; build the reference
# explicitly (same project the client was created with).
dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)

Create the training split table.

In [None]:
# Training split: rows hashed into lots 1, 2 or 3 (out of 10).
query = Template(sampling_query_template).render(
    source_table=f'{PROJECT_ID}.{DATASET_ID}.{SOURCE_TABLE_ID}',
    num_lots=10,
    lots_to_select='(1, 2, 3)',
)

job_config.destination = dataset_ref.table(TRAINING_TABLE_ID)
client.query(query, job_config=job_config).result()

Extract the training split table to GCS.

In [None]:
# Export the training split table to GCS and block until the extract job finishes.
extract_job = client.extract_table(dataset_ref.table(TRAINING_TABLE_ID), TRAINING_FILE_PATH)
extract_job.result()

Inspect the extracted file.

In [None]:
!gsutil cat -r 0-500 {TRAINING_FILE_PATH}

Create the validation split table.

In [None]:
# Validation split: rows hashed into lot 8 (out of 10).
query = Template(sampling_query_template).render(
    source_table=f'{PROJECT_ID}.{DATASET_ID}.{SOURCE_TABLE_ID}',
    num_lots=10,
    lots_to_select='(8)',
)

job_config.destination = dataset_ref.table(VALIDATION_TABLE_ID)
client.query(query, job_config=job_config).result()

Extract the validation split table to GCS.

In [None]:
# Export the validation split table to GCS and block until the extract job finishes.
extract_job = client.extract_table(dataset_ref.table(VALIDATION_TABLE_ID), VALIDATION_FILE_PATH)
extract_job.result()

Create the testing split table.

In [None]:
# Testing split: rows hashed into lot 9 (out of 10).
query = Template(sampling_query_template).render(
    source_table=f'{PROJECT_ID}.{DATASET_ID}.{SOURCE_TABLE_ID}',
    num_lots=10,
    lots_to_select='(9)',
)

job_config.destination = dataset_ref.table(TESTING_TABLE_ID)
client.query(query, job_config=job_config).result()

Extract the testing split table to GCS.

In [None]:
# Export the testing split table to GCS and block until the extract job finishes.
extract_job = client.extract_table(dataset_ref.table(TESTING_TABLE_ID), TESTING_FILE_PATH)
extract_job.result()

## Develop the training script

Configure the `sklearn` training pipeline.

In [None]:
numeric_features = [
    'Elevation', 'Aspect', 'Slope', 'Horizontal_Distance_To_Hydrology',
    'Vertical_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
    'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm',
    'Horizontal_Distance_To_Fire_Points',
]
categorical_features = ['Wilderness_Area', 'Soil_Type']

# Standardize numeric columns and one-hot encode categorical ones.
# handle_unknown='ignore' encodes a category never seen during fit as all zeros,
# instead of raising when scoring a split that contains a rare category.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_features),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features),
    ])

# The logistic loss was renamed from 'log' to 'log_loss' in scikit-learn 1.1 and
# 'log' was removed in 1.3. Use whichever name this installation supports.
_sgd_losses = getattr(SGDClassifier, 'loss_functions', {'log_loss': None})
LOGISTIC_LOSS = 'log_loss' if 'log_loss' in _sgd_losses else 'log'

pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', SGDClassifier(loss=LOGISTIC_LOSS)),
])

Run the pipeline locally.

In [None]:
df_train = pd.read_csv(TRAINING_FILE_PATH)
df_validation = pd.read_csv(VALIDATION_FILE_PATH)

# Split each frame into features and the 'Cover_Type' label.
X_train, y_train = df_train.drop(columns='Cover_Type'), df_train['Cover_Type']
X_validation, y_validation = df_validation.drop(columns='Cover_Type'), df_validation['Cover_Type']

# Fit once with fixed hyperparameters, then report the score on the validation split.
pipeline.set_params(classifier__alpha=0.001, classifier__max_iter=200)
pipeline.fit(X_train, y_train)
pipeline.score(X_validation, y_validation)

#### Prepare the hyperparameter tuning application.
Since the training run on this dataset is computationally expensive, you can benefit from running a distributed hyperparameter tuning job on AI Platform Training.

In [None]:
# Local folder that receives train.py, the Dockerfile and the tuning config.
TRAINING_APP_FOLDER = 'training_app'
os.makedirs(name=TRAINING_APP_FOLDER, exist_ok=True)

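Write the training script. With `--hptune`, it trains on the training split and reports validation accuracy to the tuning service. Without it, it trains on the training and validation splits combined and saves the model to the job directory.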

In [None]:
%%writefile {TRAINING_APP_FOLDER}/train.py

import os
import subprocess
import sys

import fire
import pickle
import numpy as np
import pandas as pd

import hypertune

from sklearn.compose import ColumnTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


def train_evaluate(job_dir, training_dataset_path, validation_dataset_path, alpha, max_iter, hptune=False):
  """Train the Covertype classifier, then report its accuracy or save it.

  Args:
    job_dir: GCS directory the pickled model is copied to (used only when
      `hptune` is False).
    training_dataset_path: CSV of the training split, including 'Cover_Type'.
    validation_dataset_path: CSV of the validation split, including 'Cover_Type'.
    alpha: SGDClassifier regularization strength.
    max_iter: maximum number of SGD passes over the data.
    hptune: if True, train on the training split only and report validation
      accuracy to the hyperparameter tuning service. Otherwise train on the
      training and validation splits combined and upload the model to `job_dir`.
  """
  df_train = pd.read_csv(training_dataset_path)
  df_validation = pd.read_csv(validation_dataset_path)
  if not hptune:
    # The final model is trained on all labelled data available to it.
    df_train = pd.concat([df_train, df_validation], ignore_index=True)

  numeric_features = ['Elevation', 'Aspect', 'Slope', 'Horizontal_Distance_To_Hydrology',
    'Vertical_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
    'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm',
    'Horizontal_Distance_To_Fire_Points']
  categorical_features = ['Wilderness_Area', 'Soil_Type']

  # handle_unknown='ignore': a category absent from the training data is encoded
  # as all zeros instead of making pipeline.score raise.
  preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_features),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
    ])

  # loss='log' was renamed to 'log_loss' in scikit-learn 1.1 and removed in 1.3.
  supported_losses = getattr(SGDClassifier, 'loss_functions', {'log_loss': None})
  loss = 'log_loss' if 'log_loss' in supported_losses else 'log'

  pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', SGDClassifier(loss=loss))
  ])

  # Values arrive through fire's command-line parsing. A DISCRETE hyperparameter
  # may come through as a float (e.g. 200.0), which newer scikit-learn rejects
  # for max_iter.
  alpha = float(alpha)
  max_iter = int(max_iter)

  print('Starting training: alpha={}, max_iter={}'.format(alpha, max_iter))
  X_train = df_train.drop('Cover_Type', axis=1)
  y_train = df_train['Cover_Type']

  pipeline.set_params(classifier__alpha=alpha, classifier__max_iter=max_iter)
  pipeline.fit(X_train, y_train)

  if hptune:
    X_validation = df_validation.drop('Cover_Type', axis=1)
    y_validation = df_validation['Cover_Type']
    accuracy = pipeline.score(X_validation, y_validation)
    print('Model accuracy: {}'.format(accuracy))
    # The tag must match hyperparameterMetricTag in hptuning_config.yaml.
    hpt = hypertune.HyperTune()
    hpt.report_hyperparameter_tuning_metric(
      hyperparameter_metric_tag='accuracy',
      metric_value=accuracy
    )
  else:
    # Pickle the fitted pipeline locally, then copy it into the job directory.
    model_filename = 'model.pkl'
    with open(model_filename, 'wb') as model_file:
      pickle.dump(pipeline, model_file)
    # Strip a trailing '/' so the object path does not contain '//'.
    gcs_model_path = "{}/{}".format(str(job_dir).rstrip('/'), model_filename)
    subprocess.check_call(['gsutil', 'cp', model_filename, gcs_model_path], stderr=sys.stdout)
    print("Saved model in: {}".format(gcs_model_path))


if __name__ == "__main__":
  # Expose train_evaluate's parameters as command-line flags.
  fire.Fire(train_evaluate)

Package the script into a Docker image.

In [None]:
%%writefile {TRAINING_APP_FOLDER}/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cpu

# Extra Python packages imported by train.py: fire (CLI parsing) and
# cloudml-hypertune (reporting the tuning metric).
RUN pip install --upgrade fire cloudml-hypertune

WORKDIR /app
COPY train.py ./

# Container arguments are forwarded to train.py as its command-line flags.
ENTRYPOINT ["python", "train.py"]

Build the Docker image with Cloud Build and push it to Container Registry.

In [None]:
IMAGE_NAME='trainer_image'
IMAGE_TAG='latest'
IMAGE_URI='gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, IMAGE_TAG)

!gcloud builds submit --tag $IMAGE_URI $TRAINING_APP_FOLDER

Create the hyperparameter configuration file.

In [None]:
%%writefile {TRAINING_APP_FOLDER}/hptuning_config.yaml

# Hyperparameter tuning spec for the AI Platform Training job.
# hyperparameterMetricTag must match the tag train.py reports ('accuracy').
trainingInput:
  hyperparameters:
    goal: MAXIMIZE
    maxTrials: 6
    maxParallelTrials: 3
    hyperparameterMetricTag: accuracy
    enableTrialEarlyStopping: true
    params:
    - parameterName: max_iter
      type: DISCRETE
      discreteValues: [200, 500]
    - parameterName: alpha
      type: DOUBLE
      minValue: 0.00001
      maxValue: 0.001
      # alpha spans two orders of magnitude. On a linear scale ~90% of sampled
      # values would fall above 1e-4, so search it on a log scale instead.
      scaleType: UNIT_LOG_SCALE

#### Submit the hyperparameter tuning job.

In [None]:
JOB_NAME = "JOB_{}".format(time.strftime("%Y%m%d_%H%M%S"))
JOB_DIR = "{}/{}".format(LAB_GCS_BUCKET, JOB_NAME)
SCALE_TIER = "BASIC"

!gcloud ai-platform jobs submit training $JOB_NAME \
--region=$REGION \
--job-dir=$LAB_GCS_BUCKET/$JOB_NAME \
--master-image-uri=$IMAGE_URI \
--scale-tier=$SCALE_TIER \
--config $TRAINING_APP_FOLDER/hptuning_config.yaml \
-- \
--training_dataset_path=$TRAINING_FILE_PATH \
--validation_dataset_path=$VALIDATION_FILE_PATH \
--hptune

#### Monitor the job.

In [None]:
!gcloud ai-platform jobs describe $JOB_NAME

In [None]:
!gcloud ai-platform jobs stream-logs $JOB_NAME

### Retrieve HP-tuning results.

Call the AI Platform Training API to retrieve the job details.

In [None]:
# Fetch the job resource, including per-trial results, from the AI Platform Training API.
ml = discovery.build('ml', 'v1')

job_id = 'projects/{}/jobs/{}'.format(PROJECT_ID, JOB_NAME)
request = ml.projects().jobs().get(name=job_id)

try:
    response = request.execute()
except errors.HttpError as err:
    # Show the API error, then re-raise. Swallowing it would leave `response`
    # undefined, or stale from an earlier run, and the cells below would
    # silently use it.
    print(err)
    raise

response

Retrieve the best run.

In [None]:
# NOTE(review): taking the first trial as the best one assumes AI Platform returns
# trials ordered by the optimization metric — confirm before relying on it.
trials = response['trainingOutput']['trials']
trials[0]