## Structured data prediction using Cloud ML Engine 

This notebook illustrates:

1. Exploring a BigQuery dataset using Datalab
2. Creating datasets for Machine Learning using Dataflow
3. Creating a model using the high-level Estimator API 
4. Training on Cloud ML Engine
5. Deploying the model
6. Predicting with the model


We will create a toy binary classifier that predicts whether the Standard & Poor's 500 (S&P 500) index will close higher or lower than on the previous trading day. The features are built from the closing values of the following indexes:

|Index|Country|Closing Time (EST)|Hours Before S&P Close|
|---|---|---|---|
|[All Ords](https://en.wikipedia.org/wiki/All_Ordinaries)|Australia|0100|15|
|[Nikkei 225](https://en.wikipedia.org/wiki/Nikkei_225)|Japan|0200|14|
|[Hang Seng](https://en.wikipedia.org/wiki/Hang_Seng_Index)|Hong Kong|0400|12|
|[DAX](https://en.wikipedia.org/wiki/DAX)|Germany|1130|4.5|
|[FTSE 100](https://en.wikipedia.org/wiki/FTSE_100_Index)|UK|1130|4.5|
|[NYSE Composite](https://en.wikipedia.org/wiki/NYSE_Composite)|US|1600|0|
|[Dow Jones Industrial Average](https://en.wikipedia.org/wiki/Dow_Jones_Industrial_Average)|US|1600|0|
|[S&P 500](https://en.wikipedia.org/wiki/S%26P_500_Index)|US|1600|0|
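
As a minimal sketch of how the label is defined: a day is labelled `+` when the log return of the close relative to the previous close is positive, and `-` otherwise (a zero return counts as `-`, matching the Dataflow code later in this notebook). The closing values and the helper names `log_returns` and `to_label` are made up for illustration.

```python
import math

# Hypothetical closing values for four consecutive trading days.
closes = [100.0, 101.0, 100.5, 100.5]

def log_returns(values):
    """Return log(close_t / close_{t-1}) for each day after the first."""
    return [math.log(curr / prev) for prev, curr in zip(values, values[1:])]

def to_label(log_return):
    """Map a log return to the binary label used in this notebook."""
    return '+' if log_return > 0 else '-'

sketch_labels = [to_label(r) for r in log_returns(closes)]
print(sketch_labels)
```

The first day has no previous close, so it produces no label.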

### Housekeeping 

In [None]:
%%bash
pip uninstall -y google-cloud-dataflow
pip install --upgrade --force apache-beam[gcp]==2.2.0

Restart the session so that the newly installed Apache Beam version is picked up.

In [None]:
BUCKET = 'lf-gcp-demo-eu-w1'
PROJECT = 'lf-gcp-demo'
REGION = 'europe-west1'

In [None]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [None]:
gcs_data_dir = 'gs://{0}/data/financialtimeseries/'.format(BUCKET)
gcs_model_dir = 'gs://{0}/ml-models/financialtimeseries/'.format(BUCKET)

In [None]:
%%bash

gsutil -m rm -rf gs://${BUCKET}/ml-models/financialtimeseries/*
gsutil -m rm -rf gs://${BUCKET}/data/financialtimeseries/big_data/*
gsutil -m cp data/financialtimeseries/all_data.csv gs://${BUCKET}/data/financialtimeseries/

## Query data in BigQuery


In [None]:
%%bq query --name data

SELECT
  Date as date,
  Close as close,
  Market as market
FROM
  `lf-gcp-demo.demo.financialtimeseries`
WHERE
  market = 'snp'
ORDER BY
  market, date 

## Visualise with Datalab commands

See the [Datalab commands reference](http://googledatalab.github.io/pydatalab/google.datalab%20Commands.html).

In [None]:
%chart line --data data --fields date,close
title: Close Value
height: 400
width: 900
hAxis:
  title: Date
vAxis:
  title: Close Value of S&P

### Fetch data from BigQuery as a pandas dataframe

In [None]:
%sql --module query 

SELECT
  Date as date,
  Close as close,
  Market as market
FROM
  `lf-gcp-demo.demo.financialtimeseries`
WHERE
  Close > 0

In [None]:
import datalab.bigquery as bq
import sys
data = bq.Query(query).to_dataframe(dialect='standard')
data = data.pivot_table(index='date', columns='market', aggfunc='sum')
print('Row count:{}'.format(data.shape[0]))
data.head()

In [None]:
data.describe()

In [None]:
data.fillna(method='ffill', inplace=True)
data.fillna(method='bfill', inplace=True)

In [None]:
data.head()

### Explore & Visualise

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from pandas.plotting import autocorrelation_plot
from pandas.plotting import scatter_matrix
import numpy as np
%matplotlib inline

In [None]:
_ = data.plot(figsize=(20, 15))

In [None]:
data_scaled = data.apply(lambda x: x/x.max(), axis=0)

In [None]:
_ = data_scaled.plot(figsize=(20, 15))

In [None]:
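fig = plt.figure(figsize=(20, 15))
# add_subplot creates the axes object to draw on
ax = fig.add_subplot(111)

for col in data_scaled.columns.tolist():
  # col is a (value, market) tuple, so the label reads e.g. 'snp_close'
  ax = autocorrelation_plot(
          data_scaled[col],
          label='{1}_{0}'.format(*col),
          ax=ax)
  
_ = ax.legend(loc='upper right')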

In [None]:
_ = sns.pairplot(data_scaled)

In [None]:
data.head()

In [None]:
log_return_data = data.apply(lambda x : x/x.shift(), axis=0).apply(np.log)
log_return_data.dropna(inplace=True)

In [None]:
log_return_data.describe()

In [None]:
_ = log_return_data.plot(figsize=(20, 15))

In [None]:
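fig = plt.figure(figsize=(20, 15))
# add_subplot creates the axes object to draw on
ax = fig.add_subplot(111)

for col in log_return_data.columns.tolist():
  # col is a (value, market) tuple, so the label reads e.g. 'snp_close'
  ax = autocorrelation_plot(
          log_return_data[col],
          label='{1}_{0}'.format(*col),
          ax=ax)
  
_ = ax.legend(loc='upper right')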

In [None]:
_ = sns.pairplot(log_return_data)

In [None]:
log_return_data.corr()[(u'close', u'snp')]

In [None]:
# shift(n) moves values n rows down, so each row holds the log return
# observed n trading days earlier for the named index
log_return_data[(u'close', u'nyse_1')] = log_return_data[(u'close', u'nyse')].shift(1)
log_return_data[(u'close', u'djia_1')] = log_return_data[(u'close', u'djia')].shift(1)
log_return_data[(u'close', u'nyse_2')] = log_return_data[(u'close', u'nyse')].shift(2)
log_return_data[(u'close', u'djia_2')] = log_return_data[(u'close', u'djia')].shift(2)
log_return_data[(u'close', u'nyse_3')] = log_return_data[(u'close', u'nyse')].shift(3)
log_return_data[(u'close', u'djia_3')] = log_return_data[(u'close', u'djia')].shift(3)
log_return_data[(u'close', u'ftse_2')] = log_return_data[(u'close', u'ftse')].shift(2)
log_return_data[(u'close', u'dax_2')] = log_return_data[(u'close', u'dax')].shift(2)
log_return_data[(u'close', u'hangseng_2')] = log_return_data[(u'close', u'hangseng')].shift(2)
log_return_data[(u'close', u'nikkei_2')] = log_return_data[(u'close', u'nikkei')].shift(2)
log_return_data[(u'close', u'aord_2')] = log_return_data[(u'close', u'aord')].shift(2)

In [None]:
log_return_data.corr()[(u'close', u'snp')]

We will use the following log returns of the closing values as features:
1. the log return of the S&P 500 on the previous day
1. the log return of the S&P 500 two days before
1. the log return of the Dow Jones Industrial Average on the previous day
1. the log return of the Dow Jones Industrial Average two days before
1. the log return of the NYSE Composite on the previous day
1. the log return of the NYSE Composite two days before
1. the log return of the FTSE 100 on the same day
1. the log return of the FTSE 100 on the previous day
1. the log return of the DAX on the same day
1. the log return of the DAX on the previous day
1. the log return of the Hang Seng on the same day
1. the log return of the Hang Seng on the previous day
1. the log return of the Nikkei 225 on the same day
1. the log return of the Nikkei 225 on the previous day
1. the log return of the All Ords on the same day
1. the log return of the All Ords on the previous day
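
The Dataflow code later in this notebook names these features with the suffixes `_m0` (same day), `_m1` (one trading day earlier) and `_m2` (two trading days earlier). Same-day values are only used for the indexes that close before the S&P 500, as listed in the table at the top. A minimal sketch of the alignment, using made-up values and a hypothetical helper `lagged`:

```python
# Hypothetical daily log returns keyed by date (illustration only).
sketch_snp = {'2018-01-02': 0.010, '2018-01-03': -0.020, '2018-01-04': 0.005}
sketch_dates = sorted(sketch_snp)

def lagged(series, ordered_dates, lag):
    """Map each date to the value observed `lag` trading days earlier."""
    return {
        date: series[ordered_dates[i - lag]]
        for i, date in enumerate(ordered_dates)
        if i - lag >= 0
    }

sketch_snp_m1 = lagged(sketch_snp, sketch_dates, 1)  # value from the day before
sketch_snp_m2 = lagged(sketch_snp, sketch_dates, 2)  # value from two days before
print(sketch_snp_m1)
print(sketch_snp_m2)
```

Dates without enough history are simply absent, which is why the first rows of the series cannot be used for training.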

### BEST PRACTICE: Class Frequency as a Baseline Estimator

In [None]:
import numpy as np

positive_log_returns = log_return_data[(u'close',u'snp')].apply(lambda x : 1 if x > 0 else 0).sum()
total_log_returns = log_return_data[(u'close',u'snp')].dropna().shape[0]

print("Fraction of positive: {}\nFraction of negative: {}".format(
         round(1.0*positive_log_returns/total_log_returns,3),
         round(1.0-1.0*positive_log_returns/total_log_returns,3)
       )
     )
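
The fractions printed above give a simple baseline: a model that always predicts the more frequent class is right that fraction of the time, so a trained classifier should beat it. A minimal sketch with made-up labels (the helper `majority_baseline` is hypothetical):

```python
from collections import Counter

# Hypothetical labels (illustration only).
baseline_labels = ['+', '+', '-', '+', '-']

def majority_baseline(observed):
    """Return the most frequent label and the accuracy of always predicting it."""
    label, count = Counter(observed).most_common(1)[0]
    return label, count / float(len(observed))

baseline_label, baseline_accuracy = majority_baseline(baseline_labels)
print(baseline_label, baseline_accuracy)
```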

### Create ML dataset using Dataflow

Let's use Cloud Dataflow to read in the BigQuery data and write it out as CSV files. 
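
The query in the next cell adds a `hashdate` column, and the pipeline keeps rows with `MOD(hashdate, 5) < 4` for training and rows with `MOD(hashdate, 5) = 4` for evaluation. Because the hash depends only on the date, every market's row for a given date lands in the same split. The sketch below illustrates the idea with `hashlib.md5` as a stand-in for BigQuery's `FARM_FINGERPRINT`. This is an assumption for illustration, so the actual assignment of dates will differ.

```python
import hashlib

def split_for(date_string, buckets=5):
    """Assign a date to 'train' or 'eval' from a deterministic hash of the date."""
    digest = hashlib.md5(date_string.encode('utf-8')).hexdigest()
    bucket = int(digest, 16) % buckets
    # The last bucket goes to evaluation, the others to training.
    return 'train' if bucket < buckets - 1 else 'eval'

sketch_split_dates = ['2018-01-0{}'.format(day) for day in range(1, 10)]
sketch_assignment = {date: split_for(date) for date in sketch_split_dates}
print(sketch_assignment)
```

Hashing rather than random sampling keeps the split repeatable across pipeline runs.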


In [None]:
import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    import apache_beam as beam
    print(beam.__version__)

import datetime


query = """
SELECT
  date,
  next_date,
  nnext_date,
  market,
  IFNULL(LOG(close/previous_close),0) log_return,
  ABS(FARM_FINGERPRINT(CAST(date AS STRING))) AS hashdate
FROM
(
  SELECT
    Date as date,
    Close as close,
    Market as market,
    LAG(close) OVER (PARTITION BY market ORDER BY date) as previous_close,
    LEAD(date) OVER (PARTITION BY market ORDER BY date) as next_date,
    LEAD(date, 2) OVER (PARTITION BY market ORDER BY date) as nnext_date
  FROM
    `lf-gcp-demo.demo.financialtimeseries`
)
    """

out_dir = gcs_data_dir + "big_data"

  
  
class EmitShiftedValues(beam.DoFn):
  
  def process(self, element):
    output = []
    
    if (
      element['date'] is not None and 
      element['next_date'] is not None and 
      element['nnext_date'] is not None):
      if element['market'] in ['snp']:
        output.append(
          (element['date'],('snp_label', '+' if element['log_return'] > 0 else '-'))
        )
        output += [
          (element['next_date'], ('snp_m1', element['log_return'])),
          (element['nnext_date'], ('snp_m2', element['log_return']))
        ]
      elif element['market'] in ['nyse', 'djia']:
        output += [
          (element['next_date'], ('{0}_m1'.format(element['market']), element['log_return'])),
          (element['nnext_date'], ('{0}_m2'.format(element['market']), element['log_return']))
        ]
      elif element['market'] in ['nikkei', 'hangseng', 'ftse', 'dax', 'aord']:
        output += [
          (element['date'], ('{0}_m0'.format(element['market']), element['log_return'])),
          (element['next_date'], ('{0}_m1'.format(element['market']), element['log_return']))
        ]
    for pair in output:
      yield pair

def create_output_row(pair):
  (date, data) = pair
  data = dict(data)
  OUTPUT_ROWS=[
    'snp_label', 'snp_m1', 'snp_m2', 
    'nyse_m1', 'nyse_m2',  
    'djia_m1', 'djia_m2',  
    'nikkei_m0', 'nikkei_m1', 
    'hangseng_m0', 'hangseng_m1',  
    'ftse_m0', 'ftse_m1', 
    'dax_m0', 'dax_m1',  
    'aord_m0', 'aord_m1']
  
  
  #check that all values are present
  if all(k in data for k in OUTPUT_ROWS):
    output_row = ','.join([str(data[k]) for k in OUTPUT_ROWS])
    yield output_row
  
      
def run_pipeline():
    
    job_name = 'preprocess-financialtimeseries-data' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    print('Launching Dataflow job {} ... hang on'.format(job_name))

    options = {
        'staging_location': os.path.join(out_dir, 'tmp', 'staging'),
        'temp_location': os.path.join(out_dir, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'region' : REGION,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True
    }
  
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    RUNNER = 'DataflowRunner'
  
    with beam.Pipeline(RUNNER, options=opts) as pipeline:  
      
      for step in ['train', 'eval']:
          if step == 'train':
              source_query = 'SELECT * FROM ({}) WHERE MOD(hashdate,5) < 4'.format(query)
          else:
              source_query = 'SELECT * FROM ({}) WHERE MOD(hashdate,5) = 4'.format(query)

          sink_location = os.path.join(out_dir, '{}-data'.format(step))

          (pipeline 
             | '{} - Read from BigQuery'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=source_query, use_standard_sql=True))
             | '{} - Process single row'.format(step) >> beam.ParDo(EmitShiftedValues())
             | '{} - Group by date'.format(step) >> beam.GroupByKey()
             | '{} - Create output row'.format(step) >> beam.FlatMap(create_output_row)
             | '{} - Write to GCS '.format(step) >> beam.io.Write(beam.io.WriteToText(sink_location,
                                                                  file_name_suffix='.csv',
                                                                  num_shards=5))
          )
    
   
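    # Leaving the 'with' block runs the pipeline and waits for it to finish,
    # so no separate pipeline.run() call is needed here.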

## Run Dataflow Preprocessing Pipeline

In [None]:
run_pipeline()
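
To see how `EmitShiftedValues`, `GroupByKey` and `create_output_row` fit together, the following plain-Python sketch mimics the three steps for the S&P 500 rows only. The rows are made up, a `defaultdict` stands in for `beam.GroupByKey`, and `emit_shifted` is a simplified, hypothetical version of the `DoFn`.

```python
from collections import defaultdict

# Hypothetical rows in the shape produced by the query (illustration only).
sketch_rows = [
    {'date': 'd1', 'next_date': 'd2', 'nnext_date': 'd3', 'market': 'snp', 'log_return': 0.01},
    {'date': 'd2', 'next_date': 'd3', 'nnext_date': 'd4', 'market': 'snp', 'log_return': -0.02},
    {'date': 'd3', 'next_date': 'd4', 'nnext_date': 'd5', 'market': 'snp', 'log_return': 0.03},
]

def emit_shifted(row):
    """Emit (target_date, (feature_name, value)) pairs for one S&P 500 row."""
    value = row['log_return']
    yield row['date'], ('snp_label', '+' if value > 0 else '-')
    yield row['next_date'], ('snp_m1', value)
    yield row['nnext_date'], ('snp_m2', value)

# Stand-in for beam.GroupByKey: collect all pairs that share a date.
sketch_grouped = defaultdict(dict)
for row in sketch_rows:
    for date, (name, value) in emit_shifted(row):
        sketch_grouped[date][name] = value

# Keep only dates for which every column is present, as create_output_row does.
sketch_columns = ['snp_label', 'snp_m1', 'snp_m2']
sketch_complete = {
    date: [values[c] for c in sketch_columns]
    for date, values in sketch_grouped.items()
    if all(c in values for c in sketch_columns)
}
print(sketch_complete)
```

Only `d3` has a label plus both lagged values, so it is the only date that yields an output row.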

In [None]:
%%bash

gsutil ls gs://${BUCKET}/data/financialtimeseries/big_data

## Create TensorFlow Models using Estimator API

In [None]:
import tensorflow as tf
from tensorflow import data

print(tf.__version__)

## Train a Linear Classification Model

1. Define dataset metadata + input function (to read and parse the data files)

2. Create feature columns based on metadata

3. Instantiate the model with feature columns 

4. Train, evaluate, and predict using the model and the data input function


### 1 - Define Metadata & Input Function

In [None]:
HEADER = [
  'snp_label', 'snp_m1', 'snp_m2', 'nyse_m1', 'nyse_m2',  
  'djia_m1', 'djia_m2', 'nikkei_m0', 'nikkei_m1', 
  'hangseng_m0', 'hangseng_m1', 'ftse_m0', 'ftse_m1', 
  'dax_m0', 'dax_m1', 'aord_m0', 'aord_m1'
]
TARGET_NAME = 'snp_label'
TARGET_VALUES = ['+','-']
DEFAULTS = [
  ['null'], [0.0], [0.0], [0.0], [0.0],
  [0.0], [0.0], [0.0], [0.0], 
  [0.0], [0.0], [0.0], [0.0], 
  [0.0], [0.0], [0.0], [0.0]
]

In [None]:
def parse_csv_row(csv_row):
  columns = tf.decode_csv(tf.expand_dims(csv_row, -1), record_defaults=DEFAULTS)
  features = dict(zip(HEADER, columns))
  target = features.pop(TARGET_NAME)
  return features, target
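
`tf.decode_csv` turns a batch of CSV lines into one tensor per column and substitutes the entry from `DEFAULTS` for any empty field. The plain-Python sketch below mirrors that behaviour for a single line and a shortened header. The names `SKETCH_HEADER`, `SKETCH_DEFAULTS` and `parse_line` are hypothetical and not part of the TensorFlow API.

```python
SKETCH_HEADER = ['snp_label', 'snp_m1', 'snp_m2']   # shortened for illustration
SKETCH_DEFAULTS = ['null', 0.0, 0.0]

def parse_line(line):
    """Split one CSV line into a feature dict and a label, filling empty fields."""
    fields = line.strip().split(',')
    values = []
    for raw, default in zip(fields, SKETCH_DEFAULTS):
        if raw == '':
            values.append(default)       # missing value: fall back to the default
        elif isinstance(default, float):
            values.append(float(raw))    # numeric column
        else:
            values.append(raw)           # string column (the label)
    features = dict(zip(SKETCH_HEADER, values))
    label = features.pop('snp_label')
    return features, label

print(parse_line('+,0.01,-0.02'))
print(parse_line('-,,0.005'))
```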

In [None]:
def csv_input_fn(file_name, mode=tf.estimator.ModeKeys.EVAL, 
                 skip_header_lines=0, 
                 num_epochs=1, 
                 batch_size=500):
    
  shuffle = True if mode == tf.estimator.ModeKeys.TRAIN else False

  file_names = tf.matching_files(file_name)

  dataset = data.TextLineDataset(filenames=file_names)
  dataset = dataset.skip(skip_header_lines)

  if shuffle:
      dataset = dataset.shuffle(buffer_size=2 * batch_size + 1)

  dataset = dataset.batch(batch_size)
  dataset = dataset.map(lambda csv_row: parse_csv_row(csv_row))
  dataset = dataset.repeat(num_epochs)
  iterator = dataset.make_one_shot_iterator()

  features, target = iterator.get_next()
  return features, target

### 2 - Create Feature Columns

In [None]:
def create_feature_columns():
  
  snp_m1=tf.feature_column.numeric_column('snp_m1')
  snp_m2=tf.feature_column.numeric_column('snp_m2')
  nyse_m1=tf.feature_column.numeric_column('nyse_m1')
  nyse_m2=tf.feature_column.numeric_column('nyse_m2')
  djia_m1=tf.feature_column.numeric_column('djia_m1')
  djia_m2=tf.feature_column.numeric_column('djia_m2')
  nikkei_m0=tf.feature_column.numeric_column('nikkei_m0')
  nikkei_m1=tf.feature_column.numeric_column('nikkei_m1')    
  hangseng_m0=tf.feature_column.numeric_column('hangseng_m0')
  hangseng_m1=tf.feature_column.numeric_column('hangseng_m1')
  ftse_m0=tf.feature_column.numeric_column('ftse_m0')
  ftse_m1=tf.feature_column.numeric_column('ftse_m1') 
  dax_m0=tf.feature_column.numeric_column('dax_m0')
  dax_m1=tf.feature_column.numeric_column('dax_m1') 
  aord_m0=tf.feature_column.numeric_column('aord_m0')
  aord_m1=tf.feature_column.numeric_column('aord_m1')   

  features_columns = [
     snp_m1, snp_m2, nyse_m1, nyse_m2,
     djia_m1, djia_m2, nikkei_m0, nikkei_m1,
     hangseng_m0, hangseng_m1, ftse_m0, ftse_m1,
     dax_m0, dax_m1, aord_m0, aord_m1
  ]
  
  return features_columns

### 3 - Instantiate a Linear Classifier Estimator

In [None]:
local_model_dir = "trained_models/financialtimeseries_lc"

feature_columns = create_feature_columns()

lc_estimator = tf.estimator.LinearClassifier(
  feature_columns=feature_columns,
  model_dir=local_model_dir,
  label_vocabulary=TARGET_VALUES
  )


### 4 - Train, Evaluate, and Predict

In [None]:
%%bash

ls data/financialtimeseries

### a. train the model

In [None]:
import shutil

train_data_files = "data/financialtimeseries/train-data.csv"

train_input_fn = lambda: csv_input_fn(train_data_files, 
                                              mode=tf.estimator.ModeKeys.TRAIN, 
                                              num_epochs=None,
                                              batch_size = 200
                                         )

# remove the following line of code to resume training
shutil.rmtree(local_model_dir, ignore_errors=True)

lc_estimator.train(train_input_fn, max_steps=2000)

In [None]:
%%bash

ls trained_models/financialtimeseries_lc

### b. evaluate the trained model

In [None]:
eval_data_files = "data/financialtimeseries/eval-data.csv"

eval_input_fn =lambda: csv_input_fn(eval_data_files)

lc_estimator.evaluate(eval_input_fn)

### c. predict using the trained model

In [None]:
import itertools

predictions = lc_estimator.predict(eval_input_fn)

values = [item['classes'][0] for item in itertools.islice(predictions, 5)]

print("Predicted Classes: {}".format(values))

## Train a DNN Linear Combined Classification Model + Feature Engineering

1. Define dataset metadata + input function (to read and parse the data files, + **process features**) 

2. Create feature columns based on metadata + **Extended Feature Columns**

3. Initialise the Estimator + **Wide & Deep Columns for the combined DNN model**

4. Run **train_and_evaluate** experiment: Supply TrainSpec, EvalSpec, config, and params


### 1. Define the input function with feature processing

In [None]:
HEADER = [
  'snp_label', 'snp_m1', 'snp_m2', 'nyse_m1', 'nyse_m2',  
  'djia_m1', 'djia_m2', 'nikkei_m0', 'nikkei_m1', 
  'hangseng_m0', 'hangseng_m1', 'ftse_m0', 'ftse_m1', 
  'dax_m0', 'dax_m1', 'aord_m0', 'aord_m1'
]
TARGET_NAME = 'snp_label'
TARGET_VALUES = ['+', '-']
DEFAULTS = [
  ['null'], [0.0], [0.0], [0.0], [0.0],
  [0.0], [0.0], [0.0], [0.0], 
  [0.0], [0.0], [0.0], [0.0], 
  [0.0], [0.0], [0.0], [0.0]
]

In [None]:
def parse_csv_row(csv_row):
  columns = tf.decode_csv(tf.expand_dims(csv_row, -1), record_defaults=DEFAULTS)
  features = dict(zip(HEADER, columns))
  target = features.pop(TARGET_NAME)
  return features, target

In [None]:
# to be applied in training and serving
def process_features(features):
    return features

In [None]:
def csv_input_fn(file_name, mode=tf.estimator.ModeKeys.EVAL, 
                 skip_header_lines=0, 
                 num_epochs=1, 
                 batch_size=500):
    
  shuffle = True if mode == tf.estimator.ModeKeys.TRAIN else False

  file_names = tf.matching_files(file_name)

  dataset = data.TextLineDataset(filenames=file_names)
  dataset = dataset.skip(skip_header_lines)

  if shuffle:
      dataset = dataset.shuffle(buffer_size=2 * batch_size + 1)

  dataset = dataset.batch(batch_size)
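  dataset = dataset.map(lambda csv_row: parse_csv_row(csv_row))
  # apply the shared feature processing (currently the identity)
  dataset = dataset.map(lambda features, target: (process_features(features), target))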
  dataset = dataset.repeat(num_epochs)
  iterator = dataset.make_one_shot_iterator()

  features, target = iterator.get_next()
  return features, target

### 2. Create Feature Columns with Extensions

In [None]:
def get_deep_and_wide_columns():
  snp_m1=tf.feature_column.numeric_column('snp_m1')
  snp_m2=tf.feature_column.numeric_column('snp_m2')
  nyse_m1=tf.feature_column.numeric_column('nyse_m1')
  nyse_m2=tf.feature_column.numeric_column('nyse_m2')
  djia_m1=tf.feature_column.numeric_column('djia_m1')
  djia_m2=tf.feature_column.numeric_column('djia_m2')
  nikkei_m0=tf.feature_column.numeric_column('nikkei_m0')
  nikkei_m1=tf.feature_column.numeric_column('nikkei_m1')    
  hangseng_m0=tf.feature_column.numeric_column('hangseng_m0')
  hangseng_m1=tf.feature_column.numeric_column('hangseng_m1')
  ftse_m0=tf.feature_column.numeric_column('ftse_m0')
  ftse_m1=tf.feature_column.numeric_column('ftse_m1') 
  dax_m0=tf.feature_column.numeric_column('dax_m0')
  dax_m1=tf.feature_column.numeric_column('dax_m1') 
  aord_m0=tf.feature_column.numeric_column('aord_m0')
  aord_m1=tf.feature_column.numeric_column('aord_m1')   
  
  snp_m1_bucketized = tf.feature_column.bucketized_column(
      snp_m1, 
      boundaries=[-.05, -.04, -.03, -.02, -.01, .00, .01, .02, .03, .04, .05]
  )
  
  nyse_m1_bucketized = tf.feature_column.bucketized_column(
      nyse_m1, 
      boundaries=[-.05, -.04, -.03, -.02, -.01, .00, .01, .02, .03, .04, .05]
  )
  
  snp_m1_x_nyse_m1 = tf.feature_column.crossed_column(
      [snp_m1_bucketized, nyse_m1_bucketized],
      hash_bucket_size=10
  )
  
  wide_columns = [snp_m1_bucketized, nyse_m1_bucketized, snp_m1_x_nyse_m1]
  
  deep_columns = [
     snp_m1, snp_m2, nyse_m1, nyse_m2,
    djia_m1, djia_m2, nikkei_m0, nikkei_m1,
    hangseng_m0, hangseng_m1, ftse_m0, ftse_m1,
    dax_m0, dax_m1, aord_m0, aord_m1
  ]
  
  return wide_columns, deep_columns
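
A bucketized column replaces a numeric value with the index of the interval it falls into (intervals are closed on the left), and a crossed column hashes a combination of such indices into a fixed number of buckets. The plain-Python sketch below shows both ideas with a shortened boundary list. The `hashlib.md5` hash is a stand-in for TensorFlow's internal fingerprint, so the crossed bucket ids will not match those produced by `crossed_column`.

```python
import bisect
import hashlib

SKETCH_BOUNDARIES = [-0.02, -0.01, 0.0, 0.01, 0.02]   # shortened for illustration

def bucket_index(value, boundaries=SKETCH_BOUNDARIES):
    """Return the bucket id; each bucket includes its left boundary."""
    return bisect.bisect_right(boundaries, value)

def crossed_bucket(index_a, index_b, hash_bucket_size=10):
    """Hash a pair of bucket ids into a fixed number of buckets."""
    key = '{}_X_{}'.format(index_a, index_b).encode('utf-8')
    return int(hashlib.md5(key).hexdigest(), 16) % hash_bucket_size

sketch_snp_bucket = bucket_index(-0.015)   # falls in [-0.02, -0.01)
sketch_nyse_bucket = bucket_index(0.0)     # falls in [0.0, 0.01)
sketch_cross = crossed_bucket(sketch_snp_bucket, sketch_nyse_bucket)
print(sketch_snp_bucket, sketch_nyse_bucket, sketch_cross)
```

With 11 boundaries per column there are 12 x 12 = 144 possible combinations, so a `hash_bucket_size` of 10 means many combinations share a weight.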



### 3 - Create a DNN Linear Combined Classifier

In [None]:
def create_DNNLinearCombinedClassifier(run_config, hparams):
  
    wide_columns, deep_columns = get_deep_and_wide_columns()

    dnn_optimizer = tf.train.AdamOptimizer(learning_rate=hparams.learning_rate)
    
    estimator = tf.estimator.DNNLinearCombinedClassifier(
        linear_feature_columns = wide_columns,
        dnn_feature_columns = deep_columns,
        dnn_optimizer=dnn_optimizer,
        dnn_hidden_units=hparams.hidden_units,
        label_vocabulary=TARGET_VALUES,
        config = run_config
                )
    
    return estimator

### 4. Run Local Experiment

### a. RunConfig and Hyper-params

In [None]:
# Hyper-parameters
hparams  = tf.contrib.training.HParams(num_epochs = 10,
                                       batch_size = 500,
                                       hidden_units=[32, 16],
                                       max_steps = 2000,
                                       learning_rate = 0.1,
                                       evaluate_after_sec=10)

# RunConfig
local_model_dir = "trained_models/financialtimeseries_dnn"
run_config = tf.estimator.RunConfig(
    tf_random_seed=19830610,
    model_dir=local_model_dir
)

### b. Serving Function

In [None]:
def csv_serving_input_fn():
  
  SERVING_HEADER = [
    'snp_m1', 'snp_m2', 'nyse_m1', 'nyse_m2',  
    'djia_m1', 'djia_m2', 'nikkei_m0', 'nikkei_m1', 
    'hangseng_m0', 'hangseng_m1', 'ftse_m0', 'ftse_m1', 
    'dax_m0', 'dax_m1', 'aord_m0', 'aord_m1'
  ]
  SERVING_HEADER_DEFAULTS = [
    [0.0], [0.0], [0.0], [0.0], 
    [0.0], [0.0], [0.0], [0.0], 
    [0.0], [0.0], [0.0], [0.0], 
    [0.0], [0.0], [0.0], [0.0]
  ]

  rows_string_tensor = tf.placeholder(dtype=tf.string,
                                         shape=[None],
                                         name='csv_rows')
    
  receiver_tensor = {'csv_rows': rows_string_tensor}

  row_columns = tf.expand_dims(rows_string_tensor, -1)
  columns = tf.decode_csv(row_columns, record_defaults=SERVING_HEADER_DEFAULTS)
  features = dict(zip(SERVING_HEADER, columns))
  
  # apply the same feature preprocessing that is used in the input_fn
  features = process_features(features)
  
  return tf.estimator.export.ServingInputReceiver(
        features, receiver_tensor)

### c. TrainSpec and EvalSpec

In [None]:
train_data_files = "data/financialtimeseries/train-data.csv"
eval_data_files = "data/financialtimeseries/eval-data.csv"

# TrainSpec
train_spec = tf.estimator.TrainSpec(
  input_fn = lambda: csv_input_fn(
    train_data_files,
    mode=tf.estimator.ModeKeys.TRAIN,
    num_epochs= hparams.num_epochs,
    batch_size = hparams.batch_size
  ),
  max_steps=hparams.max_steps,
)

# EvalSpec
eval_spec = tf.estimator.EvalSpec(
  input_fn =lambda: csv_input_fn(eval_data_files),
  exporters=[tf.estimator.LatestExporter(
      name="estimate",  # name of the sub-folder under export/ that receives the exported model
      serving_input_receiver_fn=csv_serving_input_fn,
      exports_to_keep=1,
      as_text=True)],
  steps = None,
  throttle_secs = hparams.evaluate_after_sec # evaluate at most once every 10 seconds of training
)

### d. Run train_and_evaluate

In [None]:
import shutil

# remove the following line of code to resume training
shutil.rmtree(local_model_dir, ignore_errors=True)

dnn_estimator = create_DNNLinearCombinedClassifier(run_config, hparams)

# run train and evaluate experiment
tf.estimator.train_and_evaluate(
  dnn_estimator,
  train_spec,
  eval_spec
)



In [None]:
%%bash

ls trained_models/financialtimeseries_dnn/

### >> TensorBoard

In [None]:
from google.datalab.ml import TensorBoard
TensorBoard().start("trained_models/financialtimeseries_dnn")
TensorBoard().list()

In [None]:
# to stop TensorBoard, replace 3843 with the pid reported by TensorBoard().list()
TensorBoard().stop(3843)
print('stopped TensorBoard')

## Train the Model on Cloud ML Engine

In [None]:
%%bash

echo "Submitting a Cloud ML Engine job..."

REGION=${REGION}
TIER=BASIC # BASIC | BASIC_GPU | STANDARD_1 | PREMIUM_1
BUCKET=${BUCKET}

MODEL_NAME="financialtimeseries"

PACKAGE_PATH=ml-packages/financialtimeseries-tf1.4/trainer
TRAIN_FILES=gs://${BUCKET}/data/financialtimeseries/big_data/train-*csv
VALID_FILES=gs://${BUCKET}/data/financialtimeseries/big_data/eval-*csv
MODEL_DIR=gs://${BUCKET}/models/${MODEL_NAME}

#remove model directory, if you don't want to resume training, or if you have changed the model structure
#gsutil -m rm -r ${MODEL_DIR}

CURRENT_DATE=`date +%Y%m%d_%H%M%S`
JOB_NAME=train_${MODEL_NAME}_${TIER}_${CURRENT_DATE}

gcloud ml-engine jobs submit training ${JOB_NAME} \
       --job-dir=${MODEL_DIR} \
       --runtime-version=1.4 \
       --region=${REGION} \
       --scale-tier=${TIER} \
       --module-name=trainer.task \
       --package-path=${PACKAGE_PATH} \
       -- \
       --train-files=${TRAIN_FILES} \
       --max-steps=5000 \
       --train-batch-size=500 \
       --eval-files=${VALID_FILES} \
       --eval-batch-size=500 \
       --learning-rate=0.01 \
       --layer-sizes-scale-factor=0.5 \
       --num-layers=3 \
       --job-dir=${MODEL_DIR}
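
The flags after the bare `--` are passed through to `trainer.task`. The trainer package itself is not shown in this notebook, so the following is only a hypothetical sketch of how such a module might parse those flags with `argparse`. The defaults and the function name `build_parser` are assumptions.

```python
import argparse

def build_parser():
    """Build a parser for the flags passed after `--` in the job submission."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-files', required=True)
    parser.add_argument('--eval-files', required=True)
    parser.add_argument('--job-dir', required=True)
    parser.add_argument('--max-steps', type=int, default=5000)
    parser.add_argument('--train-batch-size', type=int, default=500)
    parser.add_argument('--eval-batch-size', type=int, default=500)
    parser.add_argument('--learning-rate', type=float, default=0.01)
    parser.add_argument('--layer-sizes-scale-factor', type=float, default=0.5)
    parser.add_argument('--num-layers', type=int, default=3)
    return parser

sketch_args = build_parser().parse_args([
    '--train-files=gs://my-bucket/train-*csv',   # hypothetical paths
    '--eval-files=gs://my-bucket/eval-*csv',
    '--job-dir=gs://my-bucket/models/demo',
    '--learning-rate=0.01',
])
print(sketch_args.learning_rate, sketch_args.num_layers)
```

`argparse` converts dashes in flag names to underscores, so `--train-files` becomes `sketch_args.train_files`.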

## Train the Model on Cloud ML Engine + GPUs

In [None]:
%%bash

echo "Submitting a Cloud ML Engine job..."

REGION=${REGION}
TIER=BASIC_GPU # BASIC | BASIC_GPU | STANDARD_1 | PREMIUM_1
BUCKET=${BUCKET}

MODEL_NAME="financialtimeseries"

PACKAGE_PATH=ml-packages/financialtimeseries-tf1.4/trainer
TRAIN_FILES=gs://${BUCKET}/data/financialtimeseries/big_data/train-*csv
VALID_FILES=gs://${BUCKET}/data/financialtimeseries/big_data/eval-*csv
MODEL_DIR=gs://${BUCKET}/models/${MODEL_NAME}

#remove model directory, if you don't want to resume training, or if you have changed the model structure
#gsutil -m rm -r ${MODEL_DIR}

CURRENT_DATE=`date +%Y%m%d_%H%M%S`
JOB_NAME=train_${MODEL_NAME}_${TIER}_${CURRENT_DATE}

gcloud ml-engine jobs submit training ${JOB_NAME} \
       --job-dir=${MODEL_DIR} \
       --runtime-version=1.4 \
       --region=${REGION} \
       --scale-tier=${TIER} \
       --module-name=trainer.task \
       --package-path=${PACKAGE_PATH} \
       -- \
       --train-files=${TRAIN_FILES} \
       --max-steps=5000 \
       --train-batch-size=500 \
       --eval-files=${VALID_FILES} \
       --eval-batch-size=500 \
       --learning-rate=0.01 \
       --layer-sizes-scale-factor=0.5 \
       --num-layers=3 \
       --job-dir=${MODEL_DIR}

## Train the Model on Cloud ML Engine + Custom GPU Cluster

In [None]:
%%bash

echo "Submitting a Cloud ML Engine job..."

REGION=${REGION}
TIER=CUSTOM # used in the job name only; the machine configuration comes from custom.yaml
BUCKET=${BUCKET}

MODEL_NAME="financialtimeseries"

PACKAGE_PATH=ml-packages/financialtimeseries-tf1.4/trainer
TRAIN_FILES=gs://${BUCKET}/data/financialtimeseries/big_data/train-*csv
VALID_FILES=gs://${BUCKET}/data/financialtimeseries/big_data/eval-*csv
MODEL_DIR=gs://${BUCKET}/models/${MODEL_NAME}

#remove model directory, if you don't want to resume training, or if you have changed the model structure
#gsutil -m rm -r ${MODEL_DIR}

CURRENT_DATE=`date +%Y%m%d_%H%M%S`
JOB_NAME=train_${MODEL_NAME}_${TIER}_${CURRENT_DATE}

gcloud ml-engine jobs submit training ${JOB_NAME} \
       --job-dir=${MODEL_DIR} \
       --runtime-version=1.4 \
       --region=${REGION} \
       --module-name=trainer.task \
       --package-path=${PACKAGE_PATH} \
       --config=ml-packages/financialtimeseries-tf1.4/custom.yaml \
       -- \
       --train-files=${TRAIN_FILES} \
       --max-steps=5000 \
       --train-batch-size=500 \
       --eval-files=${VALID_FILES} \
       --eval-batch-size=500 \
       --learning-rate=0.01 \
       --layer-sizes-scale-factor=0.5 \
       --num-layers=3 \
       --job-dir=${MODEL_DIR}

## Hyperparameter Tuning on Cloud ML Engine

In [None]:
%%bash

echo "Submitting a Cloud ML Engine job..."

REGION=${REGION}
BUCKET=${BUCKET}

MODEL_NAME="financialtimeseries"

PACKAGE_PATH=ml-packages/financialtimeseries-tf1.4/trainer
TRAIN_FILES=gs://${BUCKET}/data/financialtimeseries/big_data/train-*csv
VALID_FILES=gs://${BUCKET}/data/financialtimeseries/big_data/eval-*csv
MODEL_DIR=gs://${BUCKET}/models/${MODEL_NAME}_tune

CURRENT_DATE=`date +%Y%m%d_%H%M%S`
JOB_NAME=tune_${MODEL_NAME}_TUNE_${CURRENT_DATE}

gcloud ml-engine jobs submit training ${JOB_NAME} \
        --job-dir=${MODEL_DIR} \
        --runtime-version=1.4 \
        --region=${REGION} \
        --module-name=trainer.task \
        --package-path=${PACKAGE_PATH} \
        --config=ml-packages/financialtimeseries-tf1.4/hyperparams.yaml \
        -- \
        --train-files=${TRAIN_FILES} \
        --max-steps=5000 \
        --train-batch-size=1000 \
        --eval-files=${VALID_FILES} \
        --eval-batch-size=1000 \
        --job-dir=${MODEL_DIR}

## Deploy the Model

In [None]:
%%bash

REGION=${REGION}
BUCKET=${BUCKET}

MODEL_NAME="financialtimeseries"
MODEL_VERSION="v1"

MODEL_BINARIES=$(gsutil ls gs://${BUCKET}/models/${MODEL_NAME}/export/classifier | tail -1)


# delete model version
#gcloud ml-engine versions delete ${MODEL_VERSION} --model=${MODEL_NAME}

# delete model
#gcloud ml-engine models delete ${MODEL_NAME}

# create the model resource (only needed before the first version is deployed)
#gcloud ml-engine models create ${MODEL_NAME} --regions=${REGION}

#deploy model version
gcloud ml-engine versions create ${MODEL_VERSION} --model=${MODEL_NAME} --origin=${MODEL_BINARIES} --runtime-version=1.4

echo  ${MODEL_NAME} ${MODEL_VERSION} 

In [None]:
%%bash

MODEL_NAME="financialtimeseries"
MODEL_VERSION="v1"

# invoke deployed model to make prediction given new data instances
gcloud ml-engine predict --model=${MODEL_NAME} --version=${MODEL_VERSION} --json-instances=data/financialtimeseries/new-data.json
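
The `--json-instances` flag expects a file with one JSON object per line, each object being one instance. The sketch below writes such a file with the standard library. The two instances are shortened, made-up examples (a real request needs all 16 features), and `write_json_instances` and the temporary path are hypothetical.

```python
import json
import os
import tempfile

# Hypothetical, shortened instances (illustration only).
sketch_instances = [
    {'snp_m1': -0.0114, 'snp_m2': -0.0032},
    {'snp_m1': 0.0043, 'snp_m2': 0.0078},
]

def write_json_instances(instances, path):
    """Write one JSON object per line, the layout --json-instances expects."""
    with open(path, 'w') as handle:
        for instance in instances:
            handle.write(json.dumps(instance) + '\n')

sketch_path = os.path.join(tempfile.mkdtemp(), 'new-data.json')
write_json_instances(sketch_instances, sketch_path)

with open(sketch_path) as handle:
    print(handle.read())
```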

## Consume the Model as an API

In [None]:
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

def estimate(project, model_name, version, instances):

    credentials = GoogleCredentials.get_application_default()
    api = discovery.build('ml', 'v1', credentials=credentials,
                discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json')

    request_data = {'instances': instances}

    model_url = 'projects/{}/models/{}/versions/{}'.format(project, model_name, version)
    response = api.projects().predict(body=request_data, name=model_url).execute()
    
    estimates = [item[u'classes'] for item in response[u'predictions']]


    return estimates

In [None]:
MODEL_NAME='financialtimeseries'
MODEL_VERSION="v1"

instances =  [
      {
        'snp_m1' : -0.011381397443,
        'snp_m2' : -0.00318472489586,
        'nyse_m1' : -0.0122561131386,
        'nyse_m2' : -0.00311830464655,
        'djia_m1' : -0.00814669586107,
        'djia_m2' : -0.00327516443468,
        'nikkei_m0' : -0.0112286842277,
        'nikkei_m1' : -0.00618448662269,
        'hangseng_m0' : -0.00516025557739,
        'hangseng_m1' : -0.0172446189673,
        'ftse_m0' : -0.0096349683829,
        'ftse_m1' : -0.0023942318244,
        'dax_m0' : -0.00664421427352,
        'dax_m1' : -0.00410411743843,
        'aord_m0' : -0.00791849937675,
        'aord_m1' : -0.0071570640642
      },
      {
        'snp_m1' : -0.00262898268279,
        'snp_m2' : 0.00519789154606,
        'nyse_m1' : -0.00238490313864,
        'nyse_m2' : 0.00336377324257,
        'djia_m1' : -0.00193058871584,
        'djia_m2' : 0.00387061058697,
        'nikkei_m0' : -0.0162761526936,
        'nikkei_m1' : -0.00319560688,
        'hangseng_m0' : -0.0131692520561,
        'hangseng_m1' : -0.0143661769443,
        'ftse_m0' : -0.0189954213177,
        'ftse_m1' : -0.00367068315366,
        'dax_m0' : -0.021648106156,
        'dax_m1' : -0.00223370158645,
        'aord_m0' : 0.0131723464534,
        'aord_m1' : -0.0110883690885
      },
      {
        'snp_m1' : 0.00426055324048,
        'snp_m2' : 0.00782533125554,
        'nyse_m1' : 0.00591736960864,
        'nyse_m2' : 0.00691895179585,
        'djia_m1' : 0.00282309241802,
        'djia_m2' : 0.00827282886371,
        'nikkei_m0' : 0.00478911588037,
        'nikkei_m1' : 0.00989148467924,
        'hangseng_m0' : 0.00489156719444,
        'hangseng_m1' : 0.00980530371273,
        'ftse_m0' : 0.000596399488808,
        'ftse_m1' : 0.0114790189729,
        'dax_m0' : 0.000613251128797,
        'dax_m1' : 0.00780638261224,
        'aord_m0' : 0.00619442299894,
        'aord_m1' : 0.00614341726831
      }
  ]

estimates = estimate(
    instances=instances,
    project=PROJECT,
    model_name=MODEL_NAME,
    version=MODEL_VERSION)

print(estimates)

### the end ...