# Training a model

---

We have now formalized our feature engineering logic and have a good idea of the optimal hyperparameters for the XGBoost algorithm.  During hyperparameter tuning we were still using a subset of the data in order to speed up our training times.  Now it's time to train on the entire data set and produce a working model.  The steps below execute the feature engineering logic on a year's worth of data.  This takes a long time to process, so I recommend running `aws s3 sync` on `s3://jasbarto-forecast-lab/train` to skip having to run this yourself.

After you have the data set on the local notebook instance, upload it to S3 for training on Amazon SageMaker.

## Prepare the entire data set for training

In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import glob
from datetime import datetime
from time import gmtime, strftime
import boto3

**S3 bucket for training data**

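The next cell defines the variables that name the S3 bucket used to store the training and validation data, and the SageMaker training job.  Specify a bucket name below.  Note that the code in this notebook does not appear to create the bucket itself, so make sure the bucket exists before staging data to it.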

In [None]:
# Name of the S3 bucket that receives the staged training data and the model output.
# Replace the whole placeholder (including its surrounding spaces) before running later cells.
YOUR_BUCKET_NAME = ' < YOUR S3 BUCKET NAME > '
# Training job name: a fixed prefix plus the current UTC time at second resolution.
YOUR_JOB_NAME = 'xgboost-forecast-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

In [None]:
def read_s3_csv (dates):
    """Load the CSV objects stored under each date prefix of the Xetra bucket.

    Parameters
    ----------
    dates : iterable of str
        Key prefixes to list in the 'deutsche-boerse-xetra-pds' bucket
        (the notebook passes 'YYYY-MM-DD' strings).

    Returns
    -------
    pandas.DataFrame
        All loaded CSV files concatenated in listing order. Each file keeps
        its own row index, so index labels repeat across files.
    """
    xetra_bucket = boto3.resource('s3').Bucket('deutsche-boerse-xetra-pds')

    frames = []
    for date in dates:
        loaded_before = len(frames)
        for s3_object in xetra_bucket.objects.filter(Prefix=date):
            # Only objects whose key ends in '.csv' are data files.
            if not s3_object.key.endswith('.csv'):
                continue
            frames.append(pd.read_csv(s3_object.get()['Body']))

        print ("Loaded {} data objects for {}".format (len(frames) - loaded_before, date))
    return pd.concat(frames)

In [None]:
def build_index(non_empty_days, from_time, to_time):
    """Build a minute-by-minute timestamp frame covering each given day.

    Parameters
    ----------
    non_empty_days : iterable of str
        Days formatted as 'YYYY-MM-DD'.
    from_time, to_time : str
        Inclusive start and end of the daily window, formatted as 'HH:MM'.

    Returns
    -------
    pandas.DataFrame
        One column, "OrganizedDateTime", holding every minute between
        from_time and to_time of each day; the same values form the index.
    """
    def _moment(day, clock):
        # Combine a 'YYYY-MM-DD' day and an 'HH:MM' clock time into a datetime.
        year, month, day_of_month = (int(part) for part in day.split('-'))
        hour, minute = (int(part) for part in clock.split(':'))
        return datetime(year, month, day_of_month, hour, minute, 0)

    per_day = []
    for day in non_empty_days:
        minutes = pd.date_range(_moment(day, from_time), _moment(day, to_time), freq='1Min')
        per_day.append(pd.DataFrame({"OrganizedDateTime": minutes.values}))

    combined = pd.concat(per_day, axis=0)
    combined.index = combined["OrganizedDateTime"]
    return combined

In [None]:
def basic_stock_features(input_df, mnemonic, new_time_index, inplace=False):
    """Build the per-minute feature frame for a single stock.

    Parameters
    ----------
    input_df : pandas.DataFrame
        Trade data whose first index level is the stock mnemonic; the
        remaining level must be timestamps comparable with new_time_index.
    mnemonic : str
        Stock to select with ``input_df.loc[mnemonic]``.
    new_time_index : array-like of datetime64
        Timestamps the stock is re-indexed onto; timestamps without a trade
        become new rows.
    inplace : bool, default False
        When False the selected rows are copied before processing. The
        subsequent ``reindex`` builds a new frame either way.

    Returns
    -------
    pandas.DataFrame
        The re-indexed stock with prices carried forward, volume and trade
        counts zero-filled, calendar features added and the descriptive
        columns dropped.

    Raises
    ------
    KeyError
        If mnemonic is not in the index or one of the dropped columns is
        missing.
    """
    stock = input_df.loc[mnemonic]
    if not inplace:
        stock = stock.copy()

    stock = stock.reindex(new_time_index)

    # Minutes without trades keep the last known prices ...
    for price_column in ['MinPrice', 'MaxPrice', 'EndPrice', 'StartPrice']:
        stock[price_column] = stock[price_column].ffill()

    # ... and count as zero traded volume / zero trades.
    for count_column in ['TradedVolume', 'NumberOfTrades']:
        stock[count_column] = stock[count_column].fillna(0.0)

    timestamps = stock.index
    stock['HourOfDay'] = timestamps.hour
    stock['MinOfHour'] = timestamps.minute
    stock['MinOfDay'] = timestamps.hour*60 + timestamps.minute

    stock['DayOfWeek'] = timestamps.dayofweek
    stock['DayOfYear'] = timestamps.dayofyear
    stock['MonthOfYear'] = timestamps.month
    # DatetimeIndex.weekofyear was removed in pandas 2.0; isocalendar() is
    # its replacement (pandas >= 1.1). Keep the old accessor as a fallback
    # for older installations and keep the column a plain int64.
    if hasattr(timestamps, 'isocalendar'):
        stock['WeekOfYear'] = timestamps.isocalendar().week.astype('int64').values
    else:
        stock['WeekOfYear'] = timestamps.weekofyear

    stock['Mnemonic'] = mnemonic
    unwanted_features = ['ISIN', 'SecurityDesc', 'SecurityType', 'Currency', 'SecurityID', 'Date', 'Time', 'CalcTime']
    return stock.drop (columns=unwanted_features)

In [None]:
import sys
# Holds the frame that failed date/time parsing in clean_data, for inspection.
error_df = None
def clean_data (df, inplace = False):
    """Clean the raw trade data and expand it to one row per stock and minute.

    Steps: keep the known columns, remove repeated CSV header rows, parse
    date/time fields, keep common stock traded between 08:00 and 20:00 with
    a positive volume, restrict to the 100 stocks with the highest total
    traded volume and build per-minute features for each of them.

    Parameters
    ----------
    df : pandas.DataFrame
        Concatenated raw CSV data; must contain every column listed in
        ``column_filter``.
    inplace : bool, default False
        Kept for backward compatibility. The function always works on a
        filtered copy, so the caller's frame is never modified.

    Returns
    -------
    pandas.DataFrame
        Rows for every minute from 08:00 until 20:00 of every day with
        trades, for each selected stock. Prices are carried forward from the
        last traded minute, volume and trade counts are zero for minutes
        without trades, rows that still contain missing values are dropped
        and 'Mnemonic' is categorical. An empty frame is returned when the
        date/time fields cannot be processed; the offending frame is then
        stored in the module-level ``error_df``.
    """
    global error_df
    column_filter = ['ISIN', 'Mnemonic', 'SecurityDesc', 'SecurityType', 'Currency', 'SecurityID', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume', 'NumberOfTrades']
    numeric_columns = ['StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume', 'NumberOfTrades']

    # Some records repeat the CSV header as values. Filter them with a
    # boolean mask: DataFrame.drop expects index labels, not a mask.
    n_df = df.loc[df['Time'] != 'Time', column_filter].copy()

    # A header row inside a file makes pandas read its columns as text;
    # this is a no-op for columns that are already numeric.
    for column in numeric_columns:
        n_df[column] = pd.to_numeric(n_df[column], errors='coerce')

    # we want the dates to be comparable to datetime.strptime()
    try:
        n_df["CalcTime"] = pd.to_datetime("1900-01-01 " + n_df["Time"], errors='coerce')
        n_df["CalcDateTime"] = pd.to_datetime(n_df["Date"] + " " + n_df["Time"], errors='coerce')
    except Exception:
        print ("Error processing date / time fields in dataframe:")
        print (sys.exc_info ())
        # sample() raises when asked for more rows than the frame holds.
        print (n_df.sample(min(10, len(n_df))))
        error_df = n_df
        return pd.DataFrame ()

    # Filter common stock
    # Filter between trading hours 08:00 and 20:00
    # Exclude auctions (those are with TradeVolume == 0)
    only_common_stock = n_df[n_df.SecurityType == 'Common stock']
    time_fmt = "%H:%M"
    opening_hours_str = "08:00"
    closing_hours_str = "20:00"
    # strptime without a date yields 1900-01-01, matching CalcTime's date part.
    opening_hours = datetime.strptime(opening_hours_str, time_fmt)
    closing_hours = datetime.strptime(closing_hours_str, time_fmt)

    cleaned_common_stock = only_common_stock[(only_common_stock.TradedVolume > 0) &
                      (only_common_stock.CalcTime >= opening_hours) &
                      (only_common_stock.CalcTime <= closing_hours)]

    bymnemonic = cleaned_common_stock[['Mnemonic', 'TradedVolume']].groupby(['Mnemonic']).sum()
    number_of_stocks = 100
    top = bymnemonic.sort_values(['TradedVolume'], ascending=False).head(number_of_stocks)
    top_k_stocks = list(top.index.values)
    cleaned_common_stock = cleaned_common_stock[cleaned_common_stock.Mnemonic.isin(top_k_stocks)]
    sorted_by_index = cleaned_common_stock.set_index(['Mnemonic', 'CalcDateTime']).sort_index()
    non_empty_days = sorted(cleaned_common_stock['Date'].unique())
    new_datetime_index = build_index(non_empty_days, opening_hours_str, closing_hours_str)["OrganizedDateTime"].values

    stocks = [
        basic_stock_features(sorted_by_index, mnemonic, new_datetime_index, inplace=True)
        for mnemonic in top_k_stocks
    ]
    # prepared contains the numeric features for all top k stocks,
    # for all days in the interval for which there were trades (that means excluding weekends and holidays),
    # for all minutes from 08:00 until 20:00
    prepared = pd.concat(stocks, axis=0)
    prepared = prepared.dropna(how='any')
    prepared['Mnemonic'] = prepared['Mnemonic'].astype('category')
    return prepared

In [None]:
def create_xgb_target (df):
    """Return the next row's MaxPrice as the regression target.

    When the frame has a 'Mnemonic' column the shift is done per stock, so
    the last row of one stock no longer receives the first price of the
    stock stacked after it. Without that column the whole frame is treated
    as a single series.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'MaxPrice'; 'Mnemonic' is optional.

    Returns
    -------
    pandas.Series
        MaxPrice shifted up by one row, indexed like df. Rows without a
        successor repeat the previous target (forward fill).
    """
    max_price = df['MaxPrice']
    if 'Mnemonic' not in df.columns:
        # Series.ffill() replaces the deprecated fillna(method='ffill').
        return max_price.shift(-1).ffill()

    # Group by positional integer codes rather than by the column itself so
    # that repeated index labels (one block per stock) need no alignment.
    stock_codes = pd.factorize(df['Mnemonic'])[0]
    next_max = max_price.groupby(stock_codes, sort=False).shift(-1)
    return next_max.groupby(stock_codes, sort=False).ffill()

In [None]:
def create_xgb_features (df, horizon, inplace = False):
    """Add lagged price and volume columns for the previous `horizon` rows.

    For every offset from 1 to horizon, six columns named 'h<offset>_<name>'
    are added, holding the value from `offset` rows earlier. When the frame
    has a 'Mnemonic' column the lag is taken per stock, so the first rows of
    one stock are not filled with values of the stock stacked before it.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain MinPrice, MaxPrice, StartPrice, EndPrice, TradedVolume
        and NumberOfTrades; 'Mnemonic' is optional.
    horizon : int
        Number of lags to create.
    inplace : bool, default False
        When True the columns are added to df itself, otherwise to a copy.

    Returns
    -------
    pandas.DataFrame
        The frame carrying the new columns (df itself when inplace is True).
        Rows with no value `offset` rows earlier are back-filled.
    """
    n_df = df if inplace else df.copy ()

    # (source column, suffix of the generated column) in output order.
    # The volume lag is named 'TradeVolume', without the 'd'.
    lagged_columns = [
        ('MinPrice', 'MinPrice'),
        ('MaxPrice', 'MaxPrice'),
        ('StartPrice', 'StartPrice'),
        ('EndPrice', 'EndPrice'),
        ('TradedVolume', 'TradeVolume'),
        ('NumberOfTrades', 'NumberOfTrades'),
    ]

    # Positional integer codes per stock; grouping by an array instead of the
    # column avoids index alignment when index labels repeat across stocks.
    stock_codes = None
    if 'Mnemonic' in n_df.columns:
        stock_codes = pd.factorize(n_df['Mnemonic'])[0]

    for offset in range(1, horizon+1):
        for source, suffix in lagged_columns:
            if stock_codes is None:
                # bfill() replaces the deprecated fillna(method='bfill').
                lagged = n_df[source].shift (offset).bfill()
            else:
                lagged = n_df[source].groupby(stock_codes, sort=False).shift (offset)
                lagged = lagged.groupby(stock_codes, sort=False).bfill()
            n_df["h{}_{}".format (offset, suffix)] = lagged

    return n_df

In [None]:
def engineer_date_range (dates, horizon=5):
    """Load, clean and feature-engineer the given dates and split the result.

    Parameters
    ----------
    dates : iterable of str
        Date prefixes handed to read_s3_csv.
    horizon : int, default 5
        Number of lagged rows passed to create_xgb_features.

    Returns
    -------
    tuple of pandas.DataFrame
        (train_data, validate_data): a shuffled 80/20 split. The label
        'NextMaxPrice' is the first column and categorical columns are
        one-hot encoded as 0/1 integers.

    Raises
    ------
    ValueError
        If clean_data returns no rows.
    """
    unprocessed_df = read_s3_csv (dates)
    print ("Loaded CSV data set")

    cleaned_df = clean_data (unprocessed_df, inplace = True)
    if cleaned_df.empty:
        # clean_data signals a date/time parsing failure with an empty frame;
        # stop here instead of failing later on a missing column.
        raise ValueError ("clean_data returned no rows for the requested dates")
    print ("Cleaned CSV data set")

    xgb_data = create_xgb_features (cleaned_df, horizon, inplace=True)
    xgb_data['NextMaxPrice'] = create_xgb_target (xgb_data)
    print ("Engineered CSV data set")

    # The label goes first; the CSV files are written without a header.
    feature_cols = [col for col in xgb_data.columns.values if col != 'NextMaxPrice']
    ordered = xgb_data[['NextMaxPrice'] + feature_cols]

    # Encode once, before splitting, so both splits share identical columns.
    # dtype='uint8' keeps the indicators as 0/1: recent pandas versions
    # default to bool, which to_csv would write as True/False.
    encoded = pd.get_dummies (ordered, dtype='uint8')

    train_data, validate_data = train_test_split (encoded, train_size=0.8, test_size=0.2, shuffle=True)
    print ("Data split for training purposes")

    return (train_data, validate_data)

In [None]:
# Local folder layout: data/train/train holds the training CSVs and
# data/train/validate the validation CSVs.
train_data_folder = 'data/train'
train_output_folder = train_data_folder +'/train'
validate_output_folder = train_data_folder +'/validate'
# Create both output folders (and missing parents) via the shell.
! mkdir -p {train_output_folder}
! mkdir -p {validate_output_folder}

# Earliest possible date is 2017-06-17
# NOTE(review): the surrounding text speaks of a year's worth of data, but this
# range covers roughly three months - widen it if a full year is intended.
from_date = '2018-01-01'
until_date = '2018-03-30'
# One 'YYYY-MM-DD' string per calendar day, both endpoints included.
dates = list(pd.date_range(from_date, until_date, freq='D').strftime('%Y-%m-%d'))

print ("Reading data for dates {} to {}".format (from_date, until_date))
train_df, validate_df = engineer_date_range (dates)

In [None]:
print ("Writing CSV data for dates {} to {}".format (from_date, until_date))

# Each data set is written as header-less, index-less CSV files of at most
# chunk_size rows; the file name carries the date range and the row window.
chunk_size = 100000
export_jobs = (
    (train_df, train_output_folder, "Writing {} training records", "Wrote training records {} to {}"),
    (validate_df, validate_output_folder, "Writing {} validation records", "Wrote validation records {} to {}"),
)
for frame, output_folder, start_message, chunk_message in export_jobs:
    print (start_message.format (frame.shape[0]))
    for chunk_index in range (0, frame.shape[0], chunk_size):
        chunk_end = chunk_index + chunk_size
        chunk_path = output_folder + '/{}-{}_{}-{}.csv'.format (from_date, until_date, chunk_index, chunk_end)
        frame[chunk_index:chunk_end].to_csv(chunk_path, header=False, index=False)
        print (chunk_message.format (chunk_index, chunk_end))
print ("Export to CSV complete")

## OR...

Run the following sync command to skip having to engineer your own feature set.  After executing the cell below execute the next sync command to upload the data to your own S3 bucket.  If you're savvy with your S3 commands you could also just sync from the source folder to the target folder, cutting out the notebook middleman.

In [None]:
# Copy the pre-engineered data set from the lab bucket into the local training folder.
# NOTE(review): train_data_folder is assigned in an earlier cell; presumably that
# cell has to be run first for the substitution to work - confirm.
! aws s3 sync 's3://jasbarto-forecast-lab/training/' {train_data_folder}

---

## Stage data to S3 for training

Upload the year's worth of training data to S3 in preparation for training.

In [None]:
# Key prefix inside the bucket; the training job cell reuses it to build its S3 paths.
prefix = 'training'
aws_s3_training_uri = 's3://{}/{}/'.format (YOUR_BUCKET_NAME, prefix)

# Mirror the local data/train folder to the bucket under the prefix above.
! aws s3 sync data/train {aws_s3_training_uri}

## Create a SageMaker training job

The code below will programmatically create a training job for the SageMaker XGBoost algorithm.  Alternatively you can create the training job manually via the AWS web console.

### To train via the console:

Submit your data to the XGBoost algorithm via the Amazon SageMaker web console.  Select `Training Jobs` and click the `Create training job` button.

Give the training job a name such as 'xgboost-stock-forecast' or similar.

For `Algorithm` select `XGBoost` from the drop down.

Accept the defaults for the remainder of the fields and move down to `Hyperparameters`.  Set the values for the Hyperparameters as determined by the earlier hyperparameter tuning job, accepting the default values for all other parameters.

For Input data configure two channels.  The first should be given the name 'train' and have the following settings:
- `Content-type` -> 'csv'
- `Compression type` -> None
- `Record wrapper` -> None
- `S3 data type` -> S3Prefix
- `S3 data distribution type` -> Fully replicated
- `S3 location` -> 's3:// < your-bucket-name > / < your-model-prefix > /train'

For the second channel give it a name of 'validation' and set its parameters the same as the 'train' channel.  Give the 'validation' channel a different `S3 location` however, setting it to 's3:// < your-bucket-name > / < your-model-prefix > /validate' (the folder the validation CSV files were staged to).

For the Output data configuration set the `S3 output path` to 's3:// < your-bucket-name > / < your-model-prefix > /output'.

### To use the API:

In [None]:
%%time

from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri
import time

role = get_execution_role()

container = get_image_uri(boto3.Session().region_name, 'xgboost')

job_name = YOUR_JOB_NAME
print("Training job", job_name)

bucket_path = 's3://{}'.format (YOUR_BUCKET_NAME)

#Ensure that the training and validation data folders generated above are reflected in the "InputDataConfig" parameter below.
#Ensure the hyperparameter settings are in line with the parameters selected by the tuning job
create_training_params = \
{
    "AlgorithmSpecification": {
        "TrainingImage": container,
        "TrainingInputMode": "File"
    },
    "RoleArn": role,
    "OutputDataConfig": {
        "S3OutputPath": bucket_path + "/" + prefix + "/output"
    },
    "ResourceConfig": {
        "InstanceCount": 1,
        "InstanceType": "ml.m4.4xlarge",
        "VolumeSizeInGB": 20
    },
    "TrainingJobName": job_name,
    "HyperParameters": {
        "max_depth":"5",
        "eta":"0.2",
        "gamma":"4",
        "min_child_weight":"6",
        "subsample":"0.7",
        "silent":"0",
        "objective":"reg:linear",
        "num_round":"50"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 3600
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": bucket_path + "/" + prefix + '/train',
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "csv",
            "CompressionType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": bucket_path + "/" + prefix + '/validate',
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "csv",
            "CompressionType": "None"
        }
    ]
}

client = boto3.client('sagemaker')
client.create_training_job(**create_training_params)

status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print(status)
while status !='Completed' and status!='Failed':
    time.sleep(60)
    status_resp = client.describe_training_job(TrainingJobName=job_name)
    status = status_resp['TrainingJobStatus']
    print(status)
print ("Model exported to {}".format (status_resp['ModelArtifacts']['S3ModelArtifacts']))

## Explore the model output

Let's download and explore the trained model that has been exported to Amazon S3.  The tarball is registered as an attribute of the completed training job.  We can retrieve it, download the tarball and untar it.

In [None]:
# Re-create the client so this cell can run on its own after a kernel restart.
# NOTE(review): YOUR_JOB_NAME is rebuilt from the current time whenever its cell
# is re-run, so it presumably must still hold the name of the submitted job - confirm.
import boto3
client = boto3.client('sagemaker')

job_name = YOUR_JOB_NAME
job_desc = client.describe_training_job(TrainingJobName=job_name)
status = job_desc['TrainingJobStatus']
print(status)
# S3 location of the model tarball recorded on the training job description.
model_artifact = job_desc['ModelArtifacts']['S3ModelArtifacts']
print ("Model exported to {}".format (job_desc['ModelArtifacts']['S3ModelArtifacts']))

# Download the tarball into the notebook's current working directory.
!aws s3 cp {model_artifact} .

In [None]:
## Now untar the exported model and explore its contents
## You can do this using '! cmd' notation in the notebook or open a Terminal window from the Jupyter interface
