# Structured data prediction using Cloud ML Engine
This notebook illustrates:

1. Exploring a BigQuery dataset using Datalab
2. Creating datasets for Machine Learning using Dataflow
3. Creating a model using the high-level Estimator API 
4. Training on Cloud ML Engine
5. Deploying the model
6. Predicting with the model


We will create a toy binary classifier to predict whether the Standard & Poor's 500 index will close positively or negatively. We will build our features from the close values of these indexes:

|Index|Country|Closing Time (EST)|Hours Before S&P Close|
|---|---|---|---|
|[All Ords](https://en.wikipedia.org/wiki/All_Ordinaries)|Australia|0100|15|
|[Nikkei 225](https://en.wikipedia.org/wiki/Nikkei_225)|Japan|0200|14|
|[Hang Seng](https://en.wikipedia.org/wiki/Hang_Seng_Index)|Hong Kong|0400|12|
|[DAX](https://en.wikipedia.org/wiki/DAX)|Germany|1130|4.5|
|[FTSE 100](https://en.wikipedia.org/wiki/FTSE_100_Index)|UK|1130|4.5|
|[NYSE Composite](https://en.wikipedia.org/wiki/NYSE_Composite)|US|1600|0|
|[Dow Jones Industrial Average](https://en.wikipedia.org/wiki/Dow_Jones_Industrial_Average)|US|1600|0|
|[S&P 500](https://en.wikipedia.org/wiki/S%26P_500_Index)|US|1600|0|

## Housekeeping 
Let us start by installing a few packages that we are going to need in the following sections.

In [None]:
%%bash
pip uninstall -y google-cloud-dataflow
pip install --upgrade --force apache-beam[gcp]==2.2.0

After the installation is done, please restart the session. Now we are ready to set a few environment variables.

In [None]:
BUCKET = '<your-bucket-name>'  # insert your bucket name (no underscores)
PROJECT = '<your-project-name>'  # insert your project name
REGION = '<your-region>'  # insert your region (e.g. us-central1, europe-west1)

In [None]:
BUCKET = 'lf-gcp-demo-eu-w2'
PROJECT = 'lf-gcp-demo'
REGION = 'europe-west1'

In [None]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [None]:
%%bash
# create a bucket
gsutil mb -l ${REGION} -p ${PROJECT} gs://${BUCKET}
# copy file from the vm to the newly created bucket
gsutil -m cp data/financialtimeseries/all_data.csv gs://${BUCKET}/data/financialtimeseries/

In [None]:
gcs_data_dir = 'gs://{0}/data/financialtimeseries/'.format(BUCKET)
gcs_model_dir = 'gs://{0}/ml-models/financialtimeseries/'.format(BUCKET)

If you are rerunning the notebook and want to restart from scratch, uncomment the statements in the following block.

In [None]:
%%bash
#gsutil -m rm -rf gs://${BUCKET}/ml-models/financialtimeseries/*
#gsutil -m rm -rf gs://${BUCKET}/data/financialtimeseries/big_data/*

Create a dataset and a table by importing the data in gs://${BUCKET}/data/financialtimeseries/all_data.csv. Instructions are [here](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv).

## Exploring a BigQuery dataset using Datalab

Now that we have the data in BigQuery, we can start exploring it. Let us start by writing a query to find the maximum close value for the S&P 500 index.

[Here](https://cloud.google.com/bigquery/docs/reference/standard-sql/) is the reference for standard SQL in BigQuery, and [here](http://googledatalab.github.io/pydatalab/google.datalab%20Commands.html) is the information about the `%%bq` magic.

Use the Google Charts API to plot the evolution of the S&P 500. You can get inspiration [here](https://cloud.google.com/bigquery/docs/visualize-datalab).

Use the Google Charts API to plot the rolling average of the S&P 500's close value.
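
As a reminder of what a rolling average computes, here is a minimal pure-Python sketch of a trailing moving average. The function name `rolling_mean` and the sample values are made up for illustration. In BigQuery standard SQL the same quantity can be obtained with an analytic function of the form `AVG(...) OVER (ORDER BY ... ROWS BETWEEN n PRECEDING AND CURRENT ROW)`; the column names depend on how you created your table.

```python
def rolling_mean(values, window):
    """Trailing moving average; the first window-1 entries are None."""
    result = []
    for i in range(len(values)):
        if i + 1 < window:
            result.append(None)  # not enough history yet
        else:
            chunk = values[i + 1 - window:i + 1]
            result.append(sum(chunk) / float(window))
    return result


# Made-up close values, for illustration only.
closes = [1.0, 2.0, 3.0, 4.0]
smoothed = rolling_mean(closes, window=2)
print(smoothed)
```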

Using BigQuery to perform complex calculations and aggregations is very powerful because it lets you work with petabytes of data. However, it does not make it easy to slice and dice the data to find interesting patterns. The usual approach is to sample the data and load the sample into [pandas](https://pandas.pydata.org/) dataframes.

In our case, we do not need to sample the data, as our dataset fits in memory. You can get inspiration [here](https://cloud.google.com/bigquery/docs/visualize-datalab).

Once you have loaded the data, plot:
1. the time evolution of all the indexes, normalized by their maximum close value
2. their [autocorrelations](https://pandas.pydata.org/pandas-docs/stable/visualization.html#visualization-autocorrelation)
3. a [pair plot](https://seaborn.pydata.org/generated/seaborn.pairplot.html)
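
Before plotting, it can help to see what the first two quantities look like numerically. The sketch below is only an illustration on made-up values: `normalize_by_max` and `autocorrelation` are hypothetical helper names, and the autocorrelation uses one common sample estimator (lagged products of deviations from the mean, divided by the sum of squared deviations).

```python
def normalize_by_max(values):
    """Scale a series so that its maximum value becomes 1."""
    peak = max(values)
    return [v / peak for v in values]


def autocorrelation(values, lag):
    """Sample autocorrelation of a series at the given lag."""
    n = len(values)
    mean = sum(values) / float(n)
    deviations = [v - mean for v in values]
    variance_sum = sum(d * d for d in deviations)
    covariance_sum = sum(deviations[t] * deviations[t + lag]
                         for t in range(n - lag))
    return covariance_sum / variance_sum


# Made-up close values, for illustration only.
closes = [1.0, 2.0, 3.0, 4.0, 5.0]
normalized = normalize_by_max(closes)
print(normalized)
print(autocorrelation(closes, 1))
```

A steadily trending series such as this one keeps a positive autocorrelation at small lags, which is worth keeping in mind when you look at the plots for the raw close values.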

Usually, when dealing with financial time series, the value at time $t$, $V_t$, is replaced by the log return $r_t = \log \big ( \frac{V_t}{V_{t-1}} \big )$. What does this change in our case? Recreate the previous plots using the log returns.
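
The log-return formula can be sketched in a few lines of pure Python. The helper name `log_returns` and the sample values are made up for illustration; with pandas you would typically apply the same formula to a whole dataframe at once.

```python
import math


def log_returns(values):
    """Return r_t = log(V_t / V_{t-1}) for each consecutive pair of values."""
    return [math.log(curr / prev)
            for prev, curr in zip(values[:-1], values[1:])]


# Made-up close values, for illustration only.
closes = [100.0, 110.0, 99.0]
returns = log_returns(closes)

# Log returns are additive: their sum equals the log of the overall change.
total = sum(returns)
print(returns)
print(total)
```

Note that a series of $n$ values yields $n-1$ log returns, so the first day has no return.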

## Creating datasets for Machine Learning using Dataflow

To build our toy binary classifier, which predicts whether the Standard & Poor's 500 index will close positively or negatively, we are going to use the following features:
1. the close value of S&P 500 of the day before
1. the close value of S&P 500 of two days before
1. the close value of Dow Jones Industrial Average of the day before
1. the close value of Dow Jones Industrial Average of two days before
1. the close value of NYSE Composite of the day before
1. the close value of NYSE Composite of two days before
1. the close value of FTSE 100 of the same day
1. the close value of FTSE 100 of the day before
1. the close value of DAX of the same day
1. the close value of DAX of the day before
1. the close value of Hang Seng of the same day
1. the close value of Hang Seng of the day before
1. the close value of Nikkei 225 of the same day
1. the close value of Nikkei 225 of the day before
1. the close value of All Ords of the same day
1. the close value of All Ords of the day before
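
The list above follows one rule: indexes that close together with the S&P 500 contribute their values from one and two days before, while indexes that close earlier contribute their values from the same day and the day before. The sketch below illustrates that rule on in-memory rows. Everything in it is an assumption made for illustration: the column names (`snp`, `ftse`), the suffixes `_0`, `_1`, `_2`, and the label definition (1 if the S&P 500 closes above the previous day's close).

```python
def build_examples(rows, us_indexes, early_indexes, target='snp'):
    """Build (date, features, label) tuples from rows sorted by date."""
    examples = []
    for i in range(2, len(rows)):
        today, prev1, prev2 = rows[i], rows[i - 1], rows[i - 2]
        features = {}
        for name in us_indexes:
            # Same closing time as the target: only past values are usable.
            features[name + '_1'] = prev1[name]
            features[name + '_2'] = prev2[name]
        for name in early_indexes:
            # Closes before the S&P 500: the same-day value is already known.
            features[name + '_0'] = today[name]
            features[name + '_1'] = prev1[name]
        # Assumed label: 1 if the target closed higher than the day before.
        label = 1 if today[target] > prev1[target] else 0
        examples.append((today['date'], features, label))
    return examples


# Made-up rows, for illustration only.
rows = [
    {'date': '2017-01-02', 'snp': 100.0, 'ftse': 50.0},
    {'date': '2017-01-03', 'snp': 101.0, 'ftse': 51.0},
    {'date': '2017-01-04', 'snp': 99.0, 'ftse': 52.0},
]
examples = build_examples(rows, us_indexes=['snp'], early_indexes=['ftse'])
print(examples)
```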

The first step in training our model is to prepare the training data. We are going to use [Cloud Dataflow](https://cloud.google.com/dataflow/), the GCP managed service for running [Apache Beam](https://beam.apache.org/) pipelines, to read the data from BigQuery and write it out as CSV files in [Google Cloud Storage](https://cloud.google.com/storage/).


The skeleton of our pipeline is going to be the following:

```python
def run_pipeline():
    """Function that runs the pipeline in Dataflow"""
    job_name = 'preprocess-financialtimeseries-data' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    print('Launching Dataflow job {} ... hang on'.format(job_name))

    options = {
        'staging_location': os.path.join(out_dir, 'tmp', 'staging'),
        'temp_location': os.path.join(out_dir, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'region' : REGION,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True
    }
  
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    RUNNER = 'DataflowRunner'
  
    with beam.Pipeline(RUNNER, options=opts) as pipeline:  
      
      for step in ['train', 'eval']:
          if step == 'train':
              source_query = 'SELECT * FROM ({}) WHERE MOD(hashdate,5) < 4'.format(query)
          else:
              source_query = 'SELECT * FROM ({}) WHERE MOD(hashdate,5) = 4'.format(query)

          sink_location = os.path.join(out_dir, '{}-data'.format(step))

          (pipeline 
             | '{} - Read from BigQuery'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=source_query, use_standard_sql=True))
             | '{} - Process single row'.format(step) >> beam.ParDo(EmitShiftedValues())
             | '{} - Group by date'.format(step) >> beam.GroupByKey()
             | '{} - Create output row'.format(step) >> beam.FlatMap(create_output_row)
             | '{} - Write to GCS '.format(step) >> beam.io.Write(beam.io.WriteToText(sink_location,
                                                                  file_name_suffix='.csv',
                                                                  num_shards=5))
          )
    
   
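    # Leaving the `with` block runs the pipeline and waits for it to finish,
    # so no explicit pipeline.run() call is needed here.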
```


The starting point is a standard SQL query saved in the string variable ```query```. The field ```hashdate``` is used to split the data into training and evaluation sets (see [here](https://www.oreilly.com/learning/repeatable-sampling-of-data-sets-in-bigquery-for-machine-learning)).
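
The idea behind a hash-based split is that the same date always lands in the same set, no matter how often the pipeline is rerun. The sketch below mimics the `MOD(hashdate, 5)` logic in pure Python. It uses `hashlib.md5` purely as a stand-in hash, so the bucket of a given date will differ from whatever hash function you use in your query; the helper names are hypothetical.

```python
import hashlib


def hash_bucket(date_string, buckets=5):
    """Map a date string to a stable bucket number in [0, buckets)."""
    digest = hashlib.md5(date_string.encode('utf-8')).hexdigest()
    return int(digest, 16) % buckets


def assign_split(date_string):
    """Buckets 0-3 go to training (about 80%), bucket 4 to evaluation."""
    return 'train' if hash_bucket(date_string) < 4 else 'eval'


# Made-up dates, for illustration only.
dates = ['2017-01-%02d' % day for day in range(1, 31)]
splits = [assign_split(d) for d in dates]
print(splits.count('train'), splits.count('eval'))
```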

The output of this first step is a [PCollection](https://beam.apache.org/documentation/programming-guide/#pcollection-characteristics) of dictionaries whose keys are the fields of ```query```.

```EmitShiftedValues``` is a subclass of [```beam.DoFn```](https://beam.apache.org/documentation/programming-guide/#pardo) which implements the Map phase of a MapReduce algorithm.

[```beam.GroupByKey()```](https://beam.apache.org/documentation/programming-guide/#groupbykey) groups the emitted pairs by key, which corresponds to the Shuffle phase of a MapReduce algorithm, and creates a new PCollection.

[```beam.FlatMap```](https://beam.apache.org/documentation/programming-guide) applies the function ```create_output_row``` to each element of the PCollection, creating a new PCollection. The purpose of ```create_output_row``` is to create a line of the final CSV files.

The last step writes the data out to GCS.
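
To make the data flow concrete, the three middle steps can be imitated in pure Python, without Beam. This is only a sketch of the pattern, not the solution to the exercise: the function names, the feature names `ftse_0` and `ftse_1`, and the `next_date` field (assumed to be computed in the query) are all hypothetical.

```python
import collections

# Hypothetical feature names, in the column order of the output CSV.
EXPECTED = ['ftse_0', 'ftse_1']


def emit_shifted(row):
    """Map step: emit (date, (feature_name, value)) pairs for one row."""
    # The close value is the "same day" feature for its own date ...
    yield row['date'], ('ftse_0', row['ftse'])
    # ... and the "day before" feature for the next trading date.
    if row['next_date'] is not None:
        yield row['next_date'], ('ftse_1', row['ftse'])


def group_by_key(pairs):
    """Group step: collect all values emitted for the same key."""
    grouped = collections.defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())


def make_csv_row(pair):
    """Output step: yield one CSV line per date that has every feature."""
    key, values = pair
    features = dict(values)
    if all(name in features for name in EXPECTED):
        yield ','.join([key] + [str(features[name]) for name in EXPECTED])


# Made-up rows, for illustration only.
rows = [
    {'date': '2017-01-02', 'next_date': '2017-01-03', 'ftse': 50.0},
    {'date': '2017-01-03', 'next_date': None, 'ftse': 51.0},
]
pairs = [p for row in rows for p in emit_shifted(row)]
lines = [line for pair in group_by_key(pairs) for line in make_csv_row(pair)]
print(lines)
```

The first date is dropped because it has no "day before" value, which is why the output step is a generator that may yield nothing for a given key.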

Now that you know the plan, implement the different pieces!

In [None]:
import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    import apache_beam as beam
    print(beam.__version__)

import datetime


# Write here your query. You can test it in the BigQuery UI.
query = """

"""

out_dir = gcs_data_dir + "big_data"

  
  
class EmitShiftedValues(beam.DoFn):

  def process(self, element):
    output = []
    # Add to output the tuples that you want to emit
    
    for pair in output:
      yield pair

      
def create_output_row(pair):
  """Function that creates a row of the output
     Args:
        pair : (key, values)
     Yields:
        a line of a csv file
  """
  # Write here code to create the csv row
  output_row = ''
  
  yield output_row
  
      
def run_pipeline():
    """Function that runs the pipeline in Dataflow"""
    job_name = 'preprocess-financialtimeseries-data' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    print('Launching Dataflow job {} ... hang on'.format(job_name))

    options = {
        'staging_location': os.path.join(out_dir, 'tmp', 'staging'),
        'temp_location': os.path.join(out_dir, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'region' : REGION,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True
    }
  
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    RUNNER = 'DataflowRunner'
  
    with beam.Pipeline(RUNNER, options=opts) as pipeline:  
      
      for step in ['train', 'eval']:
          if step == 'train':
              source_query = 'SELECT * FROM ({}) WHERE MOD(hashdate,5) < 4'.format(query)
          else:
              source_query = 'SELECT * FROM ({}) WHERE MOD(hashdate,5) = 4'.format(query)

          sink_location = os.path.join(out_dir, '{}-data'.format(step))

          (pipeline 
             | '{} - Read from BigQuery'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=source_query, use_standard_sql=True))
             | '{} - Process single row'.format(step) >> beam.ParDo(EmitShiftedValues())
             | '{} - Group by date'.format(step) >> beam.GroupByKey()
             | '{} - Create output row'.format(step) >> beam.FlatMap(create_output_row)
             | '{} - Write to GCS '.format(step) >> beam.io.Write(beam.io.WriteToText(sink_location,
                                                                  file_name_suffix='.csv',
                                                                  num_shards=5))
          )
    
   
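    # Leaving the `with` block runs the pipeline and waits for it to finish,
    # so no explicit pipeline.run() call is needed here.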

Now that we have our pipeline, we just need to submit our job to Cloud Dataflow.

In [None]:
run_pipeline()

Let us check the data created by our pipeline

In [None]:
%%bash
gsutil ls gs://${BUCKET}/data/financialtimeseries/big_data

## Creating a model using the high-level Estimator API

Before we start implementing our model, let us define a baseline. We need a baseline to judge how well we are doing. So what is your baseline?
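
One common choice, sketched below as a suggestion rather than the expected answer, is a majority-class baseline: always predict the label that is most frequent in the training set and measure its accuracy on the evaluation set. The helper name and the labels are made up for illustration.

```python
import collections


def majority_baseline(train_labels, eval_labels):
    """Return the majority training label and its accuracy on eval labels."""
    majority = collections.Counter(train_labels).most_common(1)[0][0]
    correct = sum(1 for label in eval_labels if label == majority)
    return majority, correct / float(len(eval_labels))


# Made-up labels (1 = positive close, 0 = negative close).
train_labels = [1, 1, 0, 1]
eval_labels = [1, 0, 1, 1]
majority, accuracy = majority_baseline(train_labels, eval_labels)
print(majority, accuracy)
```

A model is only interesting if its evaluation accuracy is clearly above this number.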

Now let us define our model. You can find inspiration [here](https://github.com/GoogleCloudPlatform/tf-estimator-tutorials/blob/master/02%20-%20Classification/05.0%20-%20Classification%20Example%20-%20%20Census%20Income%20Prediction.ipynb).

In [None]:
import tensorflow as tf
from tensorflow import data

### Define Dataset Metadata

### Define Data Input Function

### Define Feature Columns

### Define Your Estimator

### Run Experiment

Note that you have local files for training and evaluation at `data/financialtimeseries/train-data.csv` and `data/financialtimeseries/eval-data.csv`.

### Evaluate the Model

### Prediction

## Training on Cloud ML Engine

Submitting a training job to CMLE requires creating a Python package, as CMLE will pip install it. [Here](https://cloud.google.com/ml-engine/docs/packaging-trainer) are the instructions.

To make your task easier, I have copied into the directory `ml-packages/trainer` the example taken from the official [GoogleCloudPlatform](https://github.com/GoogleCloudPlatform/cloudml-samples) GitHub repository.
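
The entry point of such a package (here `trainer/task.py`) usually starts by parsing command-line flags, since everything placed after the bare `--` in the `gcloud` commands is forwarded to it. The sketch below shows that pattern with `argparse`. The flags `--train-files`, `--eval-files` and `--train-steps`, as well as the default values, are assumptions for illustration; use whatever flags your own code needs. `--job-dir` is included because the value given to `gcloud` is also passed on to the trainer.

```python
import argparse


def parse_args(argv=None):
    """Parse the flags passed to the trainer module."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--job-dir', default='/tmp/sp500_classifier',
                        help='Where to write checkpoints and exported models')
    # Hypothetical flags, adapt them to your own trainer.
    parser.add_argument('--train-files', nargs='+', required=True,
                        help='CSV files used for training')
    parser.add_argument('--eval-files', nargs='+', required=True,
                        help='CSV files used for evaluation')
    parser.add_argument('--train-steps', type=int, default=1000,
                        help='Number of training steps')
    return parser.parse_args(argv)


# Example invocation with made-up file names.
args = parse_args(['--job-dir', '/tmp/out',
                   '--train-files', 'train-1.csv', 'train-2.csv',
                   '--eval-files', 'eval-1.csv'])
print(args.job_dir, args.train_files, args.eval_files, args.train_steps)
```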

Once you have finished writing your code, the first step is to test the package [locally](https://cloud.google.com/sdk/gcloud/reference/ml-engine/local/train).

In [None]:
%%bash
#to adapt according to what you have achieved

MODULE_NAME=trainer.task 
PACKAGE_PATH=ml-packages/trainer

gcloud ml-engine local train --module-name=$MODULE_NAME \
                             --package-path=$PACKAGE_PATH \
                             -- \
                             --your-flags

Now that your code runs locally, we can submit our training job to [CMLE](https://cloud.google.com/sdk/gcloud/reference/ml-engine/jobs/submit/training).

In [None]:
%%bash

echo "Submitting a Cloud ML Engine job..."

REGION=${REGION}
TIER=BASIC # BASIC | BASIC_GPU | STANDARD_1 | PREMIUM_1
BUCKET=${BUCKET}

MODEL_NAME="sp500_classifier"

PACKAGE_PATH=ml-packages/trainer
MODEL_DIR=gs://${BUCKET}/models/${MODEL_NAME}

#remove model directory, if you don't want to resume training, or if you have changed the model structure
#gsutil -m rm -r ${MODEL_DIR}

CURRENT_DATE=`date +%Y%m%d_%H%M%S`
JOB_NAME=train_${MODEL_NAME}_${TIER}_${CURRENT_DATE}

gcloud ml-engine jobs submit training ${JOB_NAME} \
       --job-dir=${MODEL_DIR} \
       --runtime-version=1.4 \
       --region=${REGION} \
       --scale-tier=${TIER} \
       --module-name=trainer.task \
       --package-path=${PACKAGE_PATH} \
       -- \
       --your-flags

## Deploying the model

In [None]:
%%bash

REGION=${REGION}
BUCKET=${BUCKET}

MODEL_NAME="sp500_classifier"
MODEL_VERSION="v1"

MODEL_BINARIES=<directory on gcs containing the binaries>

# deploy model to GCP
gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

#deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

echo  ${MODEL_NAME} ${MODEL_VERSION} 

## Predicting with the model

Once we have created our version, we can use it to get predictions [online](https://cloud.google.com/ml-engine/docs/online-predict) or in [batch mode](https://cloud.google.com/ml-engine/docs/batch-predict).

We will limit ourselves to testing online prediction with gcloud.
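
The file passed to `--json-instances` contains one JSON object per line, each describing one instance. The sketch below builds such content in Python. The feature names and values are made up; the keys must match the inputs expected by the serving function of your exported model.

```python
import json


def to_json_lines(instances):
    """Serialize instances as newline-delimited JSON, one object per line."""
    return '\n'.join(json.dumps(instance, sort_keys=True)
                     for instance in instances)


# Hypothetical feature names and made-up values, for illustration only.
instances = [
    {'snp_1': 2700.5, 'snp_2': 2695.1, 'ftse_0': 7600.0, 'ftse_1': 7590.3},
    {'snp_1': 2710.2, 'snp_2': 2700.5, 'ftse_0': 7612.4, 'ftse_1': 7600.0},
]
content = to_json_lines(instances)
print(content)
```

Writing `content` to `data/financialtimeseries/new-data.json` gives a file in the shape the command below expects.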

In [None]:
%%bash

MODEL_NAME="sp500_classifier"
MODEL_VERSION="v1"

# invoke deployed model to make prediction given new data instances
gcloud ml-engine predict --model=${MODEL_NAME} --version=${MODEL_VERSION} --json-instances=data/financialtimeseries/new-data.json