A Jupyter Notebook for performing ETL on the Chicago Taxi Dataset and training a model to predict whether customers will pay with cash or credit card.

In [None]:
# Enable the Google Cloud service APIs used by this notebook. No --project flag
# is given, so gcloud applies the change to its currently configured project.
!gcloud services enable ml.googleapis.com
!gcloud services enable compute.googleapis.com
# Optional, left disabled: installs the bq_helper package imported by the next cell.
# !pip install -e git+https://github.com/SohierDane/BigQuery_Helper#egg=bq_helper

We initially query the dataset to view all the fields and decide which fields are useful and what kind of prediction we can make.

Of the 23 fields, we decided to cut the dataset down significantly, since many fields did not have complete data and others carry the same information (e.g., community area and the latitude/longitude fields).

In [None]:
import bq_helper
from bq_helper import BigQueryHelper


# Explore the public Chicago taxi dataset: list its tables, preview a few rows
# and inspect the schema of `taxi_trips` to decide which fields to keep.
# One helper is sufficient; `chicago_taxi` is kept as an alias of it so that
# code referring to the old name keeps working without building a second helper.
bq_assistant = BigQueryHelper(active_project="bigquery-public-data", dataset_name="chicago_taxi_trips")
chicago_taxi = bq_assistant

# A notebook cell only renders its LAST expression automatically, so each result
# is displayed explicitly; otherwise the table list and the row preview are lost.
display(bq_assistant.list_tables())
display(bq_assistant.head("taxi_trips", num_rows=3))
display(bq_assistant.table_schema("taxi_trips"))

After deciding which fields were useful and the use-case of our model, we ran the query to collect and pre-process the data:

In [None]:
from google.cloud import bigquery
# Client bound to the project that owns the destination dataset.
client = bigquery.Client(project='ml-sandbox-1-191918')

dataset_id = 'chicagotaxi'
# Single source of truth for the destination table (project comes from `client`).
table_ref = client.dataset(dataset_id).table('chicago_taxi_processed')

# Write the query result directly into the destination table.
# WRITE_TRUNCATE replaces any existing table contents (and schema) only when the
# job succeeds, so the cell stays re-runnable without deleting the table first.
# Deleting up front would leave no table at all if the query failed.
job_config = bigquery.QueryJobConfig()
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

# Feature engineering performed in SQL:
#   * cash                 - label: 1 for 'Cash', 0 otherwise (only 'Cash' and
#                            'Credit Card' rows pass the WHERE clause)
#   * start_time/end_time  - time of day as a fraction of 86400 seconds
#   * day_of_week, day_of_year, month, year - calendar parts of the trip start
#   * standard_*_lat/long  - min-max scaled coordinates using fixed bounds
# Rows with non-positive distance/duration/fare or missing fields are dropped.
query = '''SELECT
  IF(payment_type='Cash',1,0) cash,
  EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS day_of_week,
  (((EXTRACT(HOUR from trip_start_timestamp)*3600)+(EXTRACT(MINUTE from trip_start_timestamp)*60)+(EXTRACT(SECOND from trip_start_timestamp)))/86400) as start_time,
  (((EXTRACT(HOUR from trip_end_timestamp)*3600)+(EXTRACT(MINUTE from trip_end_timestamp)*60)+(EXTRACT(SECOND from trip_end_timestamp)))/86400) as end_time,
  EXTRACT(DAYOFYEAR FROM trip_start_timestamp) as day_of_year,
  EXTRACT(MONTH FROM trip_start_timestamp) as month,
  EXTRACT(YEAR FROM trip_start_timestamp) as year,
  trip_miles,
  (pickup_latitude - 41.660136051)/(42.021223593 - 41.660136051) AS standard_pickup_lat,
  (pickup_longitude + 87.913624596)/(-87.531386257 + 87.913624596) AS standard_pickup_long,
  (dropoff_latitude - 41.650221676)/(42.021223593 - 41.650221676 ) AS standard_dropoff_lat,
  (dropoff_longitude + 87.913624596)/(-87.531386257 + 87.913624596) AS standard_dropoff_long
FROM
  `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
  trip_miles > 0
  AND trip_seconds > 0
  AND fare > 0
  AND payment_type in ('Cash', 'Credit Card')
  AND trip_start_timestamp IS NOT NULL
  AND trip_end_timestamp IS NOT NULL
  AND trip_miles IS NOT NULL
  AND pickup_latitude IS NOT NULL
  AND pickup_longitude IS NOT NULL
  AND dropoff_latitude IS NOT NULL
  AND dropoff_longitude IS NOT NULL;
'''

query_job = client.query(query, location='US', job_config=job_config)

query_job.result()  # Waits for the query to finish
print('Query results loaded to table {}'.format(table_ref.path))


The pickup and dropoff latitudes and longitudes were normalized using $\frac{x-x_{min}}{x_{max}-x_{min}}$. These minimum and maximum values were found using the following query:

In [None]:
%%bigquery
-- Minimum and maximum pickup/dropoff coordinates over the trips that survive
-- the filters below; one row with eight columns is returned.
SELECT
  MIN(pickup_latitude)   AS min_pick_lat,
  MAX(pickup_latitude)   AS max_pick_lat,
  MIN(pickup_longitude)  AS min_pick_lon,
  MAX(pickup_longitude)  AS max_pick_lon,
  MIN(dropoff_latitude)  AS min_drop_lat,
  MAX(dropoff_latitude)  AS max_drop_lat,
  MIN(dropoff_longitude) AS min_drop_lon,
  MAX(dropoff_longitude) AS max_drop_lon
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
  -- only the two payment classes of interest
  payment_type IN ('Cash', 'Credit Card')
  -- positive distance, duration and fare
  AND trip_miles > 0
  AND trip_seconds > 0
  AND fare > 0
  -- every field used downstream must be present
  AND trip_miles IS NOT NULL
  AND trip_start_timestamp IS NOT NULL
  AND trip_end_timestamp IS NOT NULL
  AND pickup_latitude IS NOT NULL
  AND pickup_longitude IS NOT NULL
  AND dropoff_latitude IS NOT NULL
  AND dropoff_longitude IS NOT NULL;

After pre-processing the data, we computed a correlation matrix to check whether our problem could be solved simply. We found that there was no direct correlation between payment type and any of the other fields, so we moved on to using a linear ML classifier.

In [None]:
%%bigquery
-- Correlation (CORR) between the `cash` label and each feature column of the
-- processed table. Aliases name the column actually being correlated:
-- `start_time` and `year` both derive from the trip START timestamp, and
-- `end_time` is included so no time-of-day feature is left out.
SELECT
  CORR(cash, trip_miles)            AS trip_miles_corr,
  CORR(cash, standard_pickup_lat)   AS pickup_latitude_corr,
  CORR(cash, standard_pickup_long)  AS pickup_longitude_corr,
  CORR(cash, standard_dropoff_lat)  AS dropoff_latitude_corr,
  CORR(cash, standard_dropoff_long) AS dropoff_longitude_corr,
  CORR(cash, start_time)            AS start_time_corr,
  CORR(cash, end_time)              AS end_time_corr,
  CORR(cash, year)                  AS year_corr,
  CORR(cash, month)                 AS month_corr,
  CORR(cash, day_of_year)           AS day_corr,
  CORR(cash, day_of_week)           AS weekday_corr
FROM
  `ml-sandbox-1-191918.chicagotaxi.chicago_taxi_processed`

We then send the processed BigQuery table to a Google Cloud Storage bucket, where it can be accessed by our model for training:

In [None]:
from google.cloud import bigquery
# Export a BigQuery table to Cloud Storage as CSV files without a header row.
# NOTE(review): the ETL cell earlier in this notebook writes the table
# `chicago_taxi_processed`, whereas this cell exports `final_taxi_standardized`
# - confirm which table is meant to be exported.
client = bigquery.Client()

# Source table coordinates.
project = 'ml-sandbox-1-191918'
dataset_id = 'chicagotaxi'
table_id = 'final_taxi_standardized'
dataset_ref = client.dataset(dataset_id, project=project)
table_ref = dataset_ref.table(table_id)

# Destination: wildcard URI inside the bucket.
bucket_name = 'chicago-taxi-data-processed'
destination_uri = 'gs://{}/{}'.format(bucket_name, 'chicago-taxi-*.csv')

# Header rows are switched off for the exported files.
job_config = bigquery.job.ExtractJobConfig()
job_config.print_header = False

# Location must match that of the source table.
extract_job = client.extract_table(
    table_ref, destination_uri, location='US', job_config=job_config
)  # API request
extract_job.result()  # Waits for job to complete.

summary = 'Exported {}:{}.{} to {}'.format(
    project, dataset_id, table_id, destination_uri)
print(summary)

Combine files into one for training:

In [None]:
# Merge the CSV shards of each split (train / test / validate) into one object.
# Each command must sit on its own line: fused together, the shell would receive
# a single malformed `gsutil compose` invocation.
# NOTE: `gsutil compose` accepts at most 32 source objects per call, so a split
# with more shards has to be composed in several passes.
!gsutil compose gs://gcp-cert-demo-1/data/csv/train/*.csv gs://gcp-cert-demo-1/data/csv/train-single.csv
!gsutil compose gs://gcp-cert-demo-1/data/csv/test/*.csv gs://gcp-cert-demo-1/data/csv/test-single.csv
!gsutil compose gs://gcp-cert-demo-1/data/csv/validate/*.csv gs://gcp-cert-demo-1/data/csv/validate-single.csv

Execute training job:

In [None]:
%%writefile config.yaml
# Training job configuration; this cell writes it to config.yaml.
trainingInput:
  # Machine selection for the job.
  scaleTier: CUSTOM
  masterType: large_model_v100
  # Command-line arguments listed for the training program.
  args:
    - "--preprocess"
    - "--training_data_path=gs://chicago-taxi-data-processed/processed-chicago-taxi.csv"
    - "--validation_split=0.2"
    - "--test_split=0.1"
    # NOTE(review): this --job-dir differs from `jobDir` below - confirm which
    # output location is intended.
    - "--job-dir=gs://chicago-taxi-data-processed/tuesday_taxi_2"
    - "--model_type=classification"
    - "--max_steps=10000000"
    - "--learning_rate=0.0002"
    - "--eval_steps=1000"
    - "--batch_size=10"
    - "--eval_frequency_secs=100"
    - "--optimizer_type=ftrl"
  # NOTE(review): the submit cell in this notebook passes `--region us-central1`
  # and a CPU image (linear_learner_cpu), while this file says us-east1 and
  # uses the GPU image below - confirm which settings should apply.
  region: us-east1
  jobDir: gs://chicago-taxi-data-processed/
  masterConfig:
    imageUri: gcr.io/cloud-ml-algos/linear_learner_gpu:latest

In [None]:
import time
# Settings consumed by the `gcloud ... jobs submit training` cell.
BUCKET_NAME = 'chicago-taxi-data-processed'
IMAGE_URI = 'gcr.io/cloud-ml-algos/linear_learner_cpu:latest'

# Job configuration file produced by the `%%writefile config.yaml` cell. The
# submit command references it as `--config $CONFIG`, so the name must be
# defined here; with it undefined the command cannot resolve `$CONFIG`.
CONFIG = 'config.yaml'

# Cloud Storage path of the training input data (derived from BUCKET_NAME so
# the bucket is spelled in exactly one place).
TRAINING_DATA = 'gs://{}/processed-chicago-taxi.csv'.format(BUCKET_NAME)

MODEL_TYPE = 'classification'
# The Unix-timestamp suffix makes the job id different on every run.
JOB_ID = "tuesday_taxi_{}".format(int(time.time()))

# Output directory handed to the training job.
JOB_DIR = 'gs://{}/algorithm_training/'.format(BUCKET_NAME)

In [None]:
# Submit the training job. Flags before the bare `--` are gcloud options;
# everything after it is passed through as arguments of the training program.
!gcloud beta ai-platform jobs submit training $JOB_ID \
  --region=us-central1 \
  --config=$CONFIG \
  --master-image-uri=$IMAGE_URI \
  --job-dir=$JOB_DIR \
  -- \
  --preprocess \
  --model_type=$MODEL_TYPE \
  --training_data_path=$TRAINING_DATA \
  --batch_size=4 \
  --learning_rate=0.001 \
  --max_steps=1000

Deploy the model:

In [None]:
# Names for the deployed model and its version; the Unix-timestamp suffix makes
# them different on every run.
VERSION_NAME = "taxi_version_{}".format(int(time.time()))
MODEL_NAME = "taxi_model_{}".format(int(time.time()))
# Location of the trained model artifacts and the framework they belong to.
MODEL_DIR = "gs://chicago-taxi-data-processed/algorithm_training/model"
FRAMEWORK = "TENSORFLOW"

# Local file that holds the JSON prediction instances. Both the
# `%%writefile $INPUT_FILE` cell and the `gcloud ai-platform predict` cell
# reference this name, so it has to be defined before they run.
INPUT_FILE = "input.json"

In [None]:
# Create the model resource, then create a version of it from the artifacts
# under $MODEL_DIR. Both commands use the `ai-platform` command group;
# `ml-engine` is the deprecated former name of the same group.
!gcloud ai-platform models create $MODEL_NAME --regions us-east1
!gcloud ai-platform versions create $VERSION_NAME \
  --model=$MODEL_NAME \
  --origin=$MODEL_DIR \
  --runtime-version=1.14 \
  --framework $FRAMEWORK \
  --python-version=3.5

Generate a prediction:

In [None]:
%%writefile $INPUT_FILE
{"csv_row": "6,0.927083333,0.9375,116,4,2013,0.1,0.606309877,0.671360099,0.653137315,0.663879711", "key": "dummy-key"}
{"csv_row": "7,0.479166667,0.479166667,82,3,2013,0.1,0.770603177,0.719576369,0.751629483,0.699383324", "key": "dummy-key"}

In [None]:
# Request predictions from the deployed model version for the instances stored
# in the file named by INPUT_FILE.
!gcloud ai-platform predict \
  --model=$MODEL_NAME \
  --version=$VERSION_NAME \
  --json-instances=$INPUT_FILE

In [None]:
# `!export ...` runs in a throwaway subshell, so the variable never reaches the
# kernel process. `%env` sets it in the kernel's own environment, where Python
# code and later `!` commands can see it.
# Replace the placeholder with the real path to the service-account key file.
%env GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json

In [None]:
from gcpdemo1.predictor import Predictor

# Query a deployed model from Python through the project's Predictor class.
project = 'ml-sandbox-1-191918'
model = 'taxi_model_1565105801'

# A single instance: the feature values as one CSV row, plus a key.
instances = [
    {
        "csv_row": "6,0.927083333,0.9375,116,4,2013,0.1,0.606309877,0.671360099,0.653137315,0.663879711",
        "key": "dummy-key",
    },
]

predictor = Predictor(project, model)

predictions = predictor.predict(instances)
print(predictions)

In [None]:
# HP tuning from the command line (testing).
# The leading `!` hands the line to the shell; without it the cell is parsed as
# Python and fails with a SyntaxError.
!python tune.py gs://gcp-cert-demo-1/test/test_results-20191007-193432.csv gs://gcp-cert-demo-1/hp_tune_tets/hp_tuning.csv

In [None]:
from gcpdemo1.tune import HPTuner

import pandas as pd  # needed below to load the tuning log

# Where the tuning log is written and how the tuning jobs are named/placed.
output_path = 'gs://gcp-cert-demo-1/hp_tune_tets/hp_tuning.csv'
project_name = 'ml-sandbox-1-191918'
job_id_prefix = 'gcpdemo1_mlp_tuning'
job_dir_prefix = 'gs://gcp-cert-demo-1/gcpdemo1_mlp_tuning'

# Hyperparameter grid: every key maps to the list of candidate values to try.
# optimizer parameters:
#      "Adam" - for tf.keras.optimizers.Adam
parameters = {
    'dense_neurons_1': [64, 9],
    'dense_neurons_2': [32],
    'dense_neurons_3': [8],
    'activation': ['relu'],
    'dropout_rate_1': [0.5],
    'dropout_rate_2': [0.5],
    'dropout_rate_3': [0.5],
    'optimizer': ['Adam'],
    'learning_rate': [.0001],
    'kernel_initial_1': ['normal'],
    'kernel_initial_2': ['normal'],
    'kernel_initial_3': ['normal']
}

# Todo - change these paths, test
hp_tuner = HPTuner(project_name=project_name,
                   job_id_prefix=job_id_prefix,
                   master_type='large_model_v100',
                   job_dir_prefix=job_dir_prefix,
                   training_data_path='gs://gcp-cert-demo-1/data/csv/train-single.csv')

tuning_log_path = hp_tuner.tune(parameters, output_path)  # Make a prefix

# Load the tuning log with pandas.read_csv (pandas has no `pd.from_csv`).
# NOTE(review): assumes `tuning_log_path` is a CSV location pandas can open; a
# gs:// URI additionally requires the gcsfs package - confirm.
tuning_output = pd.read_csv(tuning_log_path)

# Last expression of the cell, so the first rows are rendered as a table.
tuning_output.head()