##### Copyright Deloitte

# Chicago Taxi Geographic zones prediction (Evaluation)


## Overview

This notebook showcases the steps taken to train and tune a model for the Chicago Taxi Geographic zones prediction. Before we jump into the detailed steps, let us take a bird's-eye view of the entire process. Model Development has the following life cycle:

1. Data Preprocessing
2. Model Design and Training 
3. Model Evaluation and HyperParameter Tuning **[Here]**
4. Model Deployment and Inference

Our current hypothesis is that High Demand Geographic zones can be predicted from several factors (more details are provided in the technical document and white paper). This notebook deals primarily with the Model Evaluation and HyperParameter Tuning aspect of the Modeling Life Cycle and showcases the steps required to run a hyperparameter tuning job using AI Platform.



## Setup

Set up your GCP project

**The following steps are required, regardless of your notebook environment.**

#### 1. Define your configuration variables

This step provides static configuration details, such as the Project Name and Bucket Name, along with variable details, such as the Number of Epochs. The static details can be changed to work with any other GCP project, while the variable details can be used to run HyperParameter Tuning jobs.

In [1]:
# Use project ID from console page
!gcloud config set project us-gcp-ame-con-01e-npd-1

#project name
PROJECT_NAME = "us-gcp-ame-con-01e-npd-1"

# Shows up in models pane of AI platform, helps you track your key project
MODEL_NAME = "Evaluation_One_Final"

# This identifies each experiment; try to keep it meaningful
MODEL_VERSION = "v3"

#region
REGION = "us-east4"

# JOB_NAME is the base job name. The current timestamp is appended when the training job is submitted, so every submission gets a unique name.
JOB_NAME = MODEL_NAME + '_' + MODEL_VERSION

#Google Storage Bucket
BUCKET_NAME = "us-gcp-ame-con-01e-npd-1-modelartifacts"

#sub folder name 
SUB_FOLDER_NAME = MODEL_NAME + "/" + MODEL_VERSION

# Creates gs directory with this name - persists transient files during AI Platform job runs
JOB_DIR = 'gs://' + BUCKET_NAME + '/' + SUB_FOLDER_NAME

# Should be same as JOB_DIR with model_save as folder added
MODEL_DIR = JOB_DIR + "/model_save/"

#The number of times the data is shown to the Model. This is a hyper parameter and can be tuned
NUM_EPOCHS = 100

#The restriction placed on the dataset to retrieve our train data.
TRAIN_RESTRICTION = "'trip_start_timestamp <= 1546300800'"

#The restriction placed on the dataset to retrieve our validation data
EVAL_RESTRICTION = "'trip_start_timestamp > 1546300800 and trip_start_timestamp<=1567276140'"

#Batch size for training
TRAIN_BATCH_SIZE = 5120

#Batch size for evaluation
EVAL_BATCH_SIZE = 5120


Updated property [core/project].
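
The two restriction strings split the data by time. As a minimal sketch, assuming `trip_start_timestamp` holds Unix time in seconds (UTC), the boundaries can be converted to calendar dates to check which period each split covers. The helper names `to_utc` and `split_for` are hypothetical and are not part of the trainer.

```python
from datetime import datetime, timezone

# Boundaries copied from TRAIN_RESTRICTION and EVAL_RESTRICTION.
TRAIN_END = 1546300800
EVAL_END = 1567276140


def to_utc(epoch_seconds: int) -> str:
    """Render a Unix timestamp (seconds) as an ISO 8601 UTC string."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()


def split_for(epoch_seconds: int) -> str:
    """Return the split a trip falls into under the two restrictions."""
    if epoch_seconds <= TRAIN_END:
        return "train"
    if epoch_seconds <= EVAL_END:
        return "eval"
    return "unused"


print(to_utc(TRAIN_END))  # 2019-01-01T00:00:00+00:00
print(to_utc(EVAL_END))   # 2019-08-31T18:29:00+00:00
```

Under that assumption, training uses trips up to the start of 2019 and evaluation uses trips from the following eight months, so the validation data is strictly later than the training data.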


#### 2. Create a skeleton for the Trainer
According to the AI Platform documentation the training code has to be modularised into a Trainer package with a defined layout. The following steps create that layout.

In [2]:
### Delete the folder if it already exists, or change the MODEL_VERSION to keep it unique
import os, shutil

PACKAGE_NAME = 'evaluation1'

MODEL_FILE_PATH = '{}'.format(PACKAGE_NAME)

if os.path.isdir(MODEL_FILE_PATH):
    
    shutil.rmtree(MODEL_FILE_PATH)
    
!mkdir $MODEL_FILE_PATH

trainer_path = MODEL_FILE_PATH + '/trainer.py'

init_path = MODEL_FILE_PATH + '/__init__.py'
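
The layout step can also be written without shell commands. The following is a rough pure-Python sketch, not the notebook's own method: the function `make_package_skeleton` is a hypothetical helper, and it runs inside a temporary directory so that it does not touch the real `evaluation1` folder.

```python
import shutil
import tempfile
from pathlib import Path


def make_package_skeleton(root: Path, package_name: str) -> Path:
    """Create an empty package directory holding __init__.py and trainer.py."""
    package_dir = root / package_name
    # Start from a clean directory, mirroring the rmtree + mkdir pair above.
    if package_dir.is_dir():
        shutil.rmtree(package_dir)
    package_dir.mkdir(parents=True)
    (package_dir / "__init__.py").touch()
    (package_dir / "trainer.py").touch()
    return package_dir


with tempfile.TemporaryDirectory() as tmp:
    pkg = make_package_skeleton(Path(tmp), "evaluation1")
    print(sorted(p.name for p in pkg.iterdir()))  # ['__init__.py', 'trainer.py']
```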


#### 3. Create the Setup File

The Setup File is required to install any additional packages that were used during the Modeling process. It is placed outside the Trainer package.

In [3]:
SETUP_FILENAME =  "setup.py"

print("setup filename: " + SETUP_FILENAME)


setup filename: setup.py


In [4]:
%%writefile $SETUP_FILENAME

# Setup file to install necessary packages
from setuptools import find_packages
from setuptools import setup

# You can define all the packages you need for your job below along with the versions

REQUIRED_PACKAGES = ['tensorflow-io==0.11.0','fastavro','cloudml-hypertune']

setup(
    name='trainer',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    description='My training application package.'
)

Overwriting setup.py


#### 4. Create the Training Module

The following cell is the Driver program of the entire module. It contains the functions that generate the Train and Evaluation Datasets, along with the model architecture and the Model Training steps.

In [5]:
MODEL_FILE = MODEL_FILE_PATH + "/trainer.py"
print("model filename: " + MODEL_FILE)

model filename: evaluation1/trainer.py


In [6]:
%%writefile $MODEL_FILE
import os
import hypertune
import argparse
from io import BytesIO
import tensorflow as tf
from google.cloud import storage
from google.cloud import bigquery
from tensorflow import feature_column
from tensorflow.keras.optimizers import SGD
from tensorflow.python.lib.io import file_io
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession


#Defining Static Params
params = \
    {
        "use_seed": True,
        "num_gpus": 0,
        "use_tpu": False,
        "tpu": None,
        "tpu_zone": None,
        "tpu_gcp_project": None,
        "beta1": 0.9,
        "beta2": 0.999,
        "epsilon": 0.1,
        "use_xla_for_gpu": False,
        "learning_rate" : 0.09
    }

def get_max_min(column, restriction_rows='trip_start_timestamp <= 1546300800'):
    
    PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID,  = 'us-gcp-ame-con-01e-npd-1.chcg_dtst.tx_trps_undrsmpld'.split('.')
    
    query = 'SELECT Max({}) as max, Min({}) as min FROM `{}`.{}.{} where {}'.format(column, column,PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID,
                                                                                    restriction_rows)
    
    client = bigquery.Client(project=PROJECT_ID)
    
    dataset_ref = client.dataset(DATASET_ID)
    
    job_config = bigquery.QueryJobConfig()
    
    query_job = client.query(query, job_config=job_config)
    
    result = query_job.to_dataframe()
    
    return result.values[0]

def get_categorical_feature_values(column):
    
    PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID,  = 'us-gcp-ame-con-01e-npd-1.chcg_dtst.tx_trps_undrsmpld'.split('.')
    
    query = 'SELECT DISTINCT {} FROM `{}`.{}.{}'.format(column, PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
    
    client = bigquery.Client(project=PROJECT_ID)
    
    dataset_ref = client.dataset(DATASET_ID)
    
    job_config = bigquery.QueryJobConfig()
    
    query_job = client.query(query, job_config=job_config)
    
    result = query_job.to_dataframe()
    
    return result.values[:,0]

def features_and_labels(features):
    
    label = features.pop('side')
    
    drop_column = features.pop('trip_start_timestamp')
    
    return features, label

def read_dataset( row_restriction, batch_size=2084):
    
    client = BigQueryClient()
    
    GCP_PROJECT_ID='us-gcp-ame-con-01e-npd-1'  
    
    COL_NAMES = ['payment_type','side','trip_start_timestamp','hour','day_of_week','month','weekend','special_days',
                 'trip_total','trip_miles','crime_rating','avg_prcp','avg_snow','avg_tavg']
    
    COL_TYPES = [dtypes.string, dtypes.int64, dtypes.int64] + [dtypes.int64 for i in range(0,5)] + [dtypes.float64 for i in range(0,6)]
    
    DATASET_GCP_PROJECT_ID, DATASET_ID, TABLE_ID,  = 'us-gcp-ame-con-01e-npd-1.chcg_dtst.tx_trps_undrsmpld'.split('.')
    
    bqsession = client.read_session(
        "projects/" + GCP_PROJECT_ID,
        DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
        COL_NAMES, COL_TYPES,
        requested_streams=2,
        row_restriction=row_restriction)
    
    dataset = bqsession.parallel_read_rows(num_parallel_calls=tf.data.experimental.AUTOTUNE)
    
    return dataset.prefetch(1).map(features_and_labels).shuffle(batch_size*100).batch(batch_size)

def copy_file_to_gcs(job_dir, file_path):
    
    with file_io.FileIO(file_path, mode='rb') as input_f:
        with file_io.FileIO(os.path.join(job_dir, file_path), mode='w+') as fp:
            fp.write(input_f.read())

def train_and_evaluate(params):
    
    job_dir = params['job_dir']
    MODEL_DIR = params['SUB_MODEL_DIRECTORY']
    MODEL = params['MODEL']
    train_batch_size = params['train_batch_size']
    eval_batch_size = params['eval_batch_size']
    train_epochs = params['num_epochs']
    train_restriction = params['train_restriction']
    eval_restriction = params['eval_restriction']
    train_df = read_dataset(train_restriction, train_batch_size)
    eval_df = read_dataset(eval_restriction, eval_batch_size)
    
    print("Data Ingested from the source")

    feature_columns = []

    # numeric cols
    def get_scal(feature):
        
        value = get_max_min(feature, params['train_restriction'])
        
        max_value, min_value = value[0], value[1]
        
        def minmax(x):
            
            # Min-max scaling: maps min_value to 0 and max_value to 1
            mini = min_value
            
            maxi = max_value
            
            return (x - mini)/(maxi-mini)
        
        return(minmax)

    for header in ['trip_total','trip_miles','crime_rating','avg_prcp','avg_snow','avg_tavg']:
        
        scal_input_fn = get_scal(header)
        
        feature_columns.append(feature_column.numeric_column(header, normalizer_fn = scal_input_fn ))

    hpt = hypertune.HyperTune()

        
    class MyMetricCallback(tf.keras.callbacks.Callback):

        def on_epoch_end(self, epoch, logs=None):
            hpt.report_hyperparameter_tuning_metric(
                hyperparameter_metric_tag='val_loss',
                metric_value=logs['val_loss'],
                global_step=epoch)
            
           
    
    tb_log = tf.keras.callbacks.TensorBoard(
        log_dir=os.path.join(job_dir, '{}/{}'.format(MODEL_DIR,'logs')),
        histogram_freq=0,
        write_graph=True,
        update_freq='epoch',
        embeddings_freq=0)


    # `learning_rate` is the current argument name; `lr` is a deprecated alias
    optimizer = tf.keras.optimizers.Adam(
            learning_rate=params["learning_rate"],
            beta_1=params["beta1"],
            beta_2=params["beta2"],
            epsilon=params["epsilon"])

    crossed_feature = feature_column.crossed_column(['hour','month','day_of_week'], hash_bucket_size=1000)
    
    crossed_feature = feature_column.indicator_column(crossed_feature)
    
    feature_columns.append(crossed_feature)
    
    # categorical cols
    for header in ['payment_type','weekend','special_days']:
        
      categorical_feature = feature_column.categorical_column_with_vocabulary_list(
            header, get_categorical_feature_values(header))
    
      categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
        
      feature_columns.append(categorical_feature_one_hot)

    feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

    print("Model initialised")

    Dense = tf.keras.layers.Dense
    
    model = tf.keras.Sequential(
      [
        feature_layer,
          Dense(45, activation=tf.nn.relu),
          Dense(36, activation=tf.nn.relu),
          Dense(27, activation=tf.nn.relu),
          Dense(18, activation=tf.nn.relu),
          Dense(9, activation="softmax")
      ])
  
    model.compile(
       loss='sparse_categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

    print("Model training")
    
    model.fit(train_df,
              epochs=train_epochs,
              validation_data=eval_df,
              verbose=1,
              callbacks=[MyMetricCallback()])
    
    if job_dir.startswith('gs://'):
        
        model.save(params['MODEL'],save_format='h5')
        
        copy_file_to_gcs(job_dir+'/{}'.format(MODEL_DIR), MODEL)
    
    else:
        
        model.save(os.path.join(job_dir, '{}/{}'.format(MODEL_DIR, MODEL)), save_format='h5')

    tf.saved_model.save(model, os.path.join(job_dir, '{}/{}'.format(MODEL_DIR,"export")))
    
    return 1

if __name__ == '__main__':
    
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--job-dir',
        type=str,
        help='GCS or local dir to write checkpoints and export model',
        default='/tmp/chicago-keras')

    parser.add_argument(
        '--train-batch-size',
        type=int,
        default=512,
        help='Batch size for training steps')
    
    parser.add_argument(
        '--eval-batch-size',
        type=int,
        default=512,
        help='Batch size for evaluation steps')
    
    parser.add_argument(
        '--learning-rate',
        type=float,
        default=0.00714,
        help='Learning rate for the Adam optimizer')
    
    parser.add_argument(
        '--beta1',
        type=float,
        default=0.9,
        help='Exponential decay rate for the first-moment estimates (Adam beta_1)')
    
    parser.add_argument(
        '--beta2',
        type=float,
        default=0.99,
        help='Exponential decay rate for the second-moment estimates (Adam beta_2)')

    parser.add_argument(
        '--epsilon',
        type=float,
        default=0.99,
        help='Small constant for numerical stability (Adam epsilon)')
    
    parser.add_argument(
        '--sub-model-directory',
        type=str,
        default='Experiment_One',
        help='Sub Directory structure for the experiment')
    
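    # The default carries no inner quotes: those are only needed when the
    # value passes through a shell, which strips them before argparse sees it.
    parser.add_argument(
        '--train-restriction',
        type=str,
        default="trip_start_timestamp <= 1546300800",
        help='Restriction for Train Set')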
    
    parser.add_argument(
        '--num-epochs',
        type=int,
        default=114,
        help='Maximum number of epochs on which to train')
    
    # As with the train restriction, the default is the bare filter expression.
    parser.add_argument(
        '--eval-restriction',
        type=str,
        default="trip_start_timestamp > 1546300800 and trip_start_timestamp<=1567276140",
        help='Restriction for Evaluation Set')

    args, _ = parser.parse_known_args()
    print("Args available {}".format(args))
    params['SUB_MODEL_DIRECTORY'] = args.sub_model_directory
    params['MODEL'] = params['SUB_MODEL_DIRECTORY']+".h5"
    params['learning_rate'] = args.learning_rate
    params['beta1'] = args.beta1
    params['beta2'] = args.beta2
    params['epsilon'] = args.epsilon
    params['num_epochs'] = args.num_epochs
    params['train_batch_size'] = args.train_batch_size
    params['job_dir'] = args.job_dir
    params['train_restriction'] = args.train_restriction
    params['eval_restriction'] = args.eval_restriction
    params['eval_batch_size'] = args.eval_batch_size
    train_and_evaluate(params)

Writing evaluation1/trainer.py
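
The numeric features in the trainer are normalized through a closure that captures the column's bounds once and then scales every value. Below is a minimal standalone sketch of standard min-max scaling built the same way. The name `make_minmax` and the bounds `0.0` and `50.0` are made up for illustration; in the trainer the bounds come from a BigQuery query over the training rows.

```python
from typing import Callable


def make_minmax(min_value: float, max_value: float) -> Callable[[float], float]:
    """Return a function that maps min_value to 0.0 and max_value to 1.0."""
    span = max_value - min_value
    if span == 0:
        # A constant column cannot be min-max scaled.
        raise ValueError("min_value and max_value must differ")

    def minmax(x: float) -> float:
        return (x - min_value) / span

    return minmax


# Hypothetical bounds, chosen only for illustration.
scale_miles = make_minmax(0.0, 50.0)
print(scale_miles(12.5))  # 0.25
```

Computing the bounds from the training rows only, as the trainer does through `train_restriction`, keeps information from the evaluation period out of the preprocessing.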


#### 5. HyperParameters Configuration File
The hyperparameters to tune, their ranges, and the tuning goal are defined in a configuration file, which AI Platform uses to run the experiments.

In [7]:

CONFIG_FILE = 'config.yaml'

print("Configuration File : "+ CONFIG_FILE )


Configuration File : config.yaml


In [8]:
%%writefile $CONFIG_FILE
trainingInput:
  hyperparameters:
    goal: MINIMIZE
    hyperparameterMetricTag: val_loss
    enableTrialEarlyStopping: True
    maxTrials: 48
    maxParallelTrials: 4
    params:
      # parameterName must match the trainer's command-line flag (--learning-rate)
      - parameterName: learning-rate
        type: DOUBLE
        minValue: 0.0001
        maxValue: 0.01
        scaleType: UNIT_REVERSE_LOG_SCALE
      - parameterName: beta1
        type: DOUBLE
        minValue: 0.0009
        maxValue: 0.9
        scaleType: UNIT_REVERSE_LOG_SCALE
      - parameterName: beta2
        type: DOUBLE
        minValue: 0.0009
        maxValue: 0.9
        scaleType: UNIT_REVERSE_LOG_SCALE
      - parameterName: epsilon
        type: DOUBLE
        minValue: 0.0001
        maxValue: 0.01
        scaleType: UNIT_REVERSE_LOG_SCALE
      # parameterName must match the trainer's command-line flag (--train-batch-size)
      - parameterName: train-batch-size
        type: INTEGER
        minValue: 1000
        maxValue: 20000
        scaleType: UNIT_LOG_SCALE
      - parameterName: num-epochs
        type: INTEGER
        minValue: 512
        maxValue: 1000
        scaleType: UNIT_LINEAR_SCALE

Overwriting config.yaml
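
AI Platform hands each tuned value to the trainer as a command-line flag named after its `parameterName`, so the spelling has to match the flag that `argparse` defines. The sketch below shows, on a cut-down hypothetical parser, what happens when they differ: `argparse` does not treat underscores and hyphens as equivalent, and because the trainer uses `parse_known_args`, the mismatched flag is dropped silently and the default is used instead.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """A cut-down parser with two of the trainer's flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--learning-rate", type=float, default=0.00714)
    parser.add_argument("--num-epochs", type=int, default=114)
    return parser


# Underscore spelling: not recognized, so the default learning rate survives.
args, unknown = build_parser().parse_known_args(
    ["--learning_rate=0.005", "--num-epochs=600"])
print(args.learning_rate, args.num_epochs, unknown)
# 0.00714 600 ['--learning_rate=0.005']

# Hyphen spelling: the tuned value reaches the trainer.
args, unknown = build_parser().parse_known_args(["--learning-rate=0.005"])
print(args.learning_rate, unknown)
# 0.005 []
```

Checking the unrecognized list returned by `parse_known_args`, or switching to `parse_args` during development, is one way to surface such mismatches early.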


#### 6. Create placeholders for Python Package Structure
To turn a directory of Python files into a package, it must contain an `__init__.py` file. This step creates that file.

In [9]:
INIT_FILE = MODEL_FILE_PATH + "/__init__.py"
print("INIT file: " + INIT_FILE)

INIT file: evaluation1/__init__.py


In [10]:
%%writefile $INIT_FILE
# __init__.py

Writing evaluation1/__init__.py


In [11]:
## setting the 'module-name' variable dynamically here.
trainer_module = PACKAGE_NAME + '.trainer'
trainer_module

'evaluation1.trainer'

## AI Platform Job Trigger with HyperParameter Tuning

This cell submits the AI Platform job and streams its logs to the kernel. You can also check the logs in the Jobs section of AI Platform. Furthermore, to keep job names unique, a timestamp is appended to the Job Name.

In [None]:
!gcloud ai-platform jobs submit training $JOB_NAME$(date +"%Y%m%d_%H%M%S") \
  --package-path $MODEL_FILE_PATH/ \
  --module-name $trainer_module \
  --region $REGION \
  --job-dir $JOB_DIR \
  --python-version 3.7 \
  --stream-logs \
  --runtime-version 2.2 \
  --config $CONFIG_FILE \
  -- \
  --sub-model-directory $MODEL_NAME \
  --train-restriction $TRAIN_RESTRICTION \
  --eval-restriction $EVAL_RESTRICTION 


#Uncomment the steps below to enable distributed training

#   --scale-tier custom \
#   --master-machine-type n1-highmem-8 \
#   --worker-machine-type n1-highmem-4 \
#   --parameter-server-machine-type n1-highmem-4 \
#   --parameter-server-count 1 \
#   --worker-count 1


Job [Evaluation_One_Final_v320210226_235328] submitted successfully.
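
The job name in the output above is `JOB_NAME` followed by the output of `date +"%Y%m%d_%H%M%S"`. A Python equivalent is sketched below. `build_job_name` and `is_valid_job_name` are hypothetical helpers, and the naming rule checked here (start with a letter; then only letters, digits and underscores) is stated as an assumption about AI Platform rather than taken from this notebook.

```python
import re
from datetime import datetime


def build_job_name(model_name: str, model_version: str, now: datetime) -> str:
    """Join model name, version and a timestamp the way the gcloud cell does."""
    return "{}_{}{}".format(model_name, model_version, now.strftime("%Y%m%d_%H%M%S"))


def is_valid_job_name(name: str) -> bool:
    """Assumed rule: a letter first, then letters, digits or underscores."""
    return re.fullmatch(r"[A-Za-z][A-Za-z0-9_]*", name) is not None


name = build_job_name("Evaluation_One_Final", "v3", datetime(2021, 2, 26, 23, 53, 28))
print(name)                     # Evaluation_One_Final_v320210226_235328
print(is_valid_job_name(name))  # True
```

Because the timestamp has one-second resolution, two submissions within the same second would still collide.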


Since the model architecture remains the same for all the models, we run only one HyperParameter Tuning job and use its final parameters for all the Experiments.