# Load Data from Deutsche Börse Public Dataset into S3 Bucket

This notebook is a modified version of https://github.com/Originate/dbg-pds-tensorflow-demo/blob/master/notebooks/02-load-multiple-days-and-prepare-ds.ipynb (MIT License).

## Obtaining Data

We obtain data for multiple days from the AWS PDS S3 bucket and output a single dataset for intraday price data.

https://registry.opendata.aws/deutsche-boerse-pds/

### Output dataset 

- Contains the top 100 stocks by trade volume for the days from 2020-01-01 to 2020-03-31 excluding days with no
trades. 
- Minutes without trades are filled in with volume 0, and their prices are forward-filled from the last minute that had a trade.
- The data is saved into the specified S3 bucket as CSV.

```
hist_data_intraday/db-pds-2020-q1.csv (columns: dt,sym,open,high,low,close,vol)
```

In [None]:
# get S3 bucket
# Capture the third column of `aws s3 ls` for every line containing "algotrading-";
# the shell capture yields a list with one entry per matching line.
s3bucket=!(aws s3 ls | grep algotrading- | awk  '{print $3}')
# Keep only the first match. NOTE(review): with no matching bucket the list is
# empty and this indexing fails - confirm a bucket exists before running.
s3bucket=s3bucket[0]
s3bucket

In [None]:
import pandas as pd
import os.path

# Not referenced anywhere else in this notebook; kept so external references still resolve.
enable_assert = False

# Edit the start/end date and the output folders
from_date = '2020-01-01'
until_date = '2020-03-31'

local_data_folder = 'db-pds/input' # do not end in /
output_folder = 'db-pds/output' # do not end in /

# Derived from local_data_folder so that changing the folder above keeps the
# script inside the directory that is actually created before it is written.
download_script = os.path.join(local_data_folder, 'download.sh')

# One 'YYYY-MM-DD' string per calendar day in [from_date, until_date], both inclusive.
dates = list(pd.date_range(from_date, until_date, freq='D').strftime('%Y-%m-%d'))

# Create the local download folder (no error if it already exists).
! mkdir -p {local_data_folder}

# We found it was more reliable to generate a bash script and run it, rather than
# run the commands in a python for-loop

# Write one guarded section per date into the script: each section syncs that
# date's prefix from the public bucket unless its marker file already exists.
with open(download_script, 'w') as f:
    f.write("#!/bin/bash\n")
    # Make the generated script stop on the first failing command, unset
    # variable, or failing pipeline stage.
    f.write("\nset -euo pipefail\n")
    f.write("\n# This script was generated to download data for multiple days\n")
    for date in dates:
        # Marker file touched only after the sync for this date has run, so a
        # later run of the script skips dates that were already fetched.
        success_file =  os.path.join(local_data_folder, date, 'success')

        f.write("""
if [ ! -f {success_file} ]; then

    echo "Getting PDS dataset for date {date}"        
    mkdir -p {local_data_folder}/{date}
    aws s3 sync s3://deutsche-boerse-xetra-pds/{date} {local_data_folder}/{date} --no-sign-request
    touch {success_file}            
else
    echo "PDS dataset for date {date} already exists"
fi\n""".format(success_file=success_file, date=date, local_data_folder=local_data_folder))

        
# Make the script executable and show its first lines as a sanity check.
! chmod +x {download_script}     
! head -n 15 {download_script} 

In [None]:
# Run the generated download script; dates whose success marker already exists are skipped.
!  {download_script}

## 2. Data Preprocessing

We need to ensure we have a data frame of 'common stock' in a suitable form. To ensure consistency, we also take care to filter out any data outside of trading hours.

In [None]:
import pandas as pd
import numpy as np
import glob, os
from datetime import datetime
import statsmodels.api as sm

import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Use bigger graphs: default figure size of 15 x 10 inches for every plot below.
mpl.rc('figure', figsize=(15, 10))

In [None]:
def load_csv_dirs(data_dirs):
    """Read every ``*.csv`` file directly inside the given directories into one frame.

    Parameters
    ----------
    data_dirs : iterable of str
        Directory paths to scan (non-recursively). Directories that do not
        exist or contain no CSV files simply contribute nothing.

    Returns
    -------
    pandas.DataFrame
        All files concatenated row-wise, in directory order and, within a
        directory, in sorted file-name order. The row index of each file is
        kept as read (it is not renumbered).

    Raises
    ------
    ValueError
        If no CSV file was found in any of the directories.
    """
    data_dirs = list(data_dirs)
    files = []
    for data_dir in data_dirs:
        # glob returns matches in arbitrary order; sort so that the row order
        # of the result is reproducible between runs and machines.
        files.extend(sorted(glob.glob(os.path.join(data_dir, '*.csv'))))
    if not files:
        raise ValueError(
            "No CSV files found in any of the %d given directories" % len(data_dirs)
        )
    return pd.concat(map(pd.read_csv, files))

data_dir = local_data_folder + '/'
# A list (not a one-shot map iterator) so data_subdirs can still be inspected
# or reused after the load below has consumed it.
data_subdirs = [data_dir + day for day in dates]
unprocessed_df = load_csv_dirs(data_subdirs)
unprocessed_df.head(2)

In [None]:
# Number of non-null values per column of the raw, unfiltered data.
unprocessed_df.count()

In [None]:
# we want the dates to be comparable to datetime.strptime()
unprocessed_df["CalcTime"] = pd.to_datetime("1900-01-01 " + unprocessed_df["Time"])

unprocessed_df["CalcDateTime"] = pd.to_datetime(unprocessed_df["Date"] + " " + unprocessed_df["Time"])
unprocessed_df.head()

In [None]:
# Spot check: raw BMW rows for the minute 09:01 on 2020-01-02.
test1 = unprocessed_df[
    unprocessed_df['Mnemonic'].eq('BMW')
    & unprocessed_df['Date'].eq('2020-01-02')
    & unprocessed_df['Time'].eq('09:01')
]
test1

In [None]:
# Spot check: raw BMW rows for the minute 09:02 on 2020-01-02.
test2 = unprocessed_df[
    unprocessed_df['Mnemonic'].eq('BMW')
    & unprocessed_df['Date'].eq('2020-01-02')
    & unprocessed_df['Time'].eq('09:02')
]
test2

In [None]:
# Filter common stock
# Filter between trading hours 08:00 and 20:00
# Exclude auctions (those are with TradeVolume == 0)
only_common_stock = unprocessed_df[unprocessed_df.SecurityType == 'Common stock']
time_fmt = "%H:%M"
opening_hours_str = "08:00"
closing_hours_str = "20:00"
# Parsed with pandas rather than datetime.strptime: the name `datetime` is
# rebound from the class to the module further down in this notebook, so a
# strptime call here would fail when the cell is re-run. A time-only format
# yields the same 1900-01-01 anchored value that CalcTime uses.
opening_hours = pd.to_datetime(opening_hours_str, format=time_fmt)
closing_hours = pd.to_datetime(closing_hours_str, format=time_fmt)

# Both bounds are inclusive, so trades stamped exactly 08:00 or 20:00 are kept.
cleaned_common_stock = only_common_stock[
    (only_common_stock.TradedVolume > 0)
    & (only_common_stock.CalcTime >= opening_hours)
    & (only_common_stock.CalcTime <= closing_hours)
]
cleaned_common_stock.head(2)

In [None]:
# How many of the most traded stocks to keep.
number_of_stocks = 100
# Total traded volume per mnemonic over the whole period.
bymnemonic = cleaned_common_stock.groupby('Mnemonic')[['TradedVolume']].sum()
# Largest totals first, truncated to the requested number of stocks.
top = bymnemonic.sort_values('TradedVolume', ascending=False).head(number_of_stocks)
top.head(10)

In [None]:
# Mnemonics of the selected stocks, in descending order of traded volume.
top_k_stocks = top.index.tolist()
# Keep only the rows that belong to one of those stocks.
cleaned_common_stock = cleaned_common_stock.loc[
    cleaned_common_stock['Mnemonic'].isin(top_k_stocks)
]
cleaned_common_stock.head()

Some notebooks use only a subset of this data, so we first index it by mnemonic and timestamp here; the export itself happens at the end of this notebook.

In [None]:
# Index by (mnemonic, timestamp) and sort, so one stock's rows can be selected by label.
sorted_by_index = (
    cleaned_common_stock
    .set_index(['Mnemonic', 'CalcDateTime'])
    .sort_index()
)
sorted_by_index.head()

In [None]:
# Distinct dates that still have at least one row after filtering, in ascending order.
non_empty_days = sorted(cleaned_common_stock['Date'].unique())
# Preview: count, the first two days and the last two days ([-2:] includes the final day).
len(non_empty_days), non_empty_days[:2], '...', non_empty_days[-2:]

In [None]:
import datetime
def build_index(non_empty_days, from_time, to_time):
    """Build a one-minute time grid covering the same daily window on each given day.

    Parameters
    ----------
    non_empty_days : iterable of str
        Dates formatted as 'YYYY-MM-DD'.
    from_time, to_time : str
        Start and end of the daily window as 'HH:MM'; both ends are included.

    Returns
    -------
    pandas.DataFrame
        A single column "OrganizedDateTime" holding every minute of every
        day's window, in the order the days were given; the same values are
        also used as the frame's index.

    Raises
    ------
    ValueError
        If a date or time string cannot be split/parsed as described, or if
        non_empty_days is empty (nothing to concatenate).
    """
    # The window bounds are the same for every day, so parse them only once.
    from_hour, from_min = (int(part) for part in from_time.split(':'))
    to_hour, to_min = (int(part) for part in to_time.split(':'))

    date_ranges = []
    for date in non_empty_days:
        yyyy, mm, dd = (int(part) for part in date.split('-'))
        # pd.Timestamp instead of datetime.datetime: the notebook binds the name
        # `datetime` to both the class and the module in different cells, so
        # relying on either binding here would make this function order-dependent.
        t1 = pd.Timestamp(year=yyyy, month=mm, day=dd, hour=from_hour, minute=from_min)
        t2 = pd.Timestamp(year=yyyy, month=mm, day=dd, hour=to_hour, minute=to_min)
        date_ranges.append(
            pd.DataFrame({"OrganizedDateTime": pd.date_range(t1, t2, freq='1Min').values})
        )
    agg = pd.concat(date_ranges, axis=0)
    agg.index = agg["OrganizedDateTime"]
    return agg
# Flat array with every trading minute of every non-empty day.
new_datetime_index = build_index(
    non_empty_days, opening_hours_str, closing_hours_str
)["OrganizedDateTime"].to_numpy()
new_datetime_index

In [None]:
def basic_stock_features(input_df, mnemonic, new_time_index):
    """Return one stock's per-minute features aligned to a complete time grid.

    Parameters
    ----------
    input_df : pandas.DataFrame
        Frame whose first index level is the mnemonic and whose remaining
        index is the timestamp; it must provide the columns MinPrice,
        MaxPrice, StartPrice, EndPrice, TradedVolume and NumberOfTrades.
    mnemonic : str
        Label of the stock to select from the first index level.
    new_time_index : array-like of timestamps
        Target index; the result has exactly one row per entry.

    Returns
    -------
    pandas.DataFrame
        Columns Mnemonic, MinPrice, MaxPrice, StartPrice, EndPrice, HasTrade,
        TradedVolume, NumberOfTrades. Grid entries without a source row get
        HasTrade / TradedVolume / NumberOfTrades of 0 and prices carried
        forward from the previous row; prices before the first source row
        stay NaN because there is nothing to carry forward.

    Raises
    ------
    KeyError
        If `mnemonic` is not present in `input_df`.
    """
    # Select from the frame that was passed in (not a notebook-level global),
    # and copy so the caller's data is never modified.
    stock = input_df.loc[mnemonic].copy()

    # Mark the original rows before reindexing introduces the empty ones.
    stock['HasTrade'] = 1.0

    stock = stock.reindex(new_time_index)

    # Prices of minutes without a trade repeat the last known value.
    price_columns = ['MinPrice', 'MaxPrice', 'EndPrice', 'StartPrice']
    stock[price_columns] = stock[price_columns].ffill()

    # Activity measures of minutes without a trade are zero.
    activity_columns = ['HasTrade', 'TradedVolume', 'NumberOfTrades']
    stock[activity_columns] = stock[activity_columns].fillna(0.0)

    stock['Mnemonic'] = mnemonic
    selected_features = ['Mnemonic', 'MinPrice', 'MaxPrice', 'StartPrice', 'EndPrice', 'HasTrade', 'TradedVolume', 'NumberOfTrades']
    return stock[selected_features]


# Example: features for BMW on the full minute grid, plus its traded volume over time.
bmw = basic_stock_features(
    input_df=sorted_by_index,
    mnemonic='BMW',
    new_time_index=new_datetime_index,
)
bmw.loc[:, ['TradedVolume']].plot()

In [None]:
# Preview the first rows of the reindexed BMW frame.
bmw.head()

In [None]:
# One feature frame per selected stock. Built with a comprehension so that no
# loop variable is left behind holding first a mnemonic and then a DataFrame.
stocks = [
    basic_stock_features(sorted_by_index, mnemonic, new_datetime_index)
    for mnemonic in top_k_stocks
]
# prepared should contain the numeric features for all top k stocks,
# for all days in the interval, for which there were trades (that means excluding weekends and holidays)
# for all minutes from 08:00 until 20:00
# in minutes without trades the prices from the last available minute are carried forward
# trades are filled with zero for such minutes
# a new column called HasTrade is introduced to denote the presence of trades
prepared = pd.concat(stocks, axis=0)

In [None]:
# Sanity check: which mnemonics ended up in the combined frame, and how many.
prepared['Mnemonic'].unique(), len(prepared['Mnemonic'].unique())

In [None]:
# Switch to the short column names of the export format, name the index 'dt',
# and keep only the exported columns (this drops NumberOfTrades and HasTrade).
# Written as one non-mutating chain so that re-running the cell is harmless:
# rename() ignores names that are no longer present, whereas the previous
# `del` statements raised KeyError on a second run.
prepared = (
    prepared
    .rename(columns={
        'Mnemonic': 'sym',
        'StartPrice': 'open',
        'EndPrice': 'close',
        'TradedVolume': 'vol',
        'MinPrice': 'low',
        'MaxPrice': 'high',
    })
    .rename_axis('dt')
)[['sym', 'open', 'high', 'low', 'close', 'vol']]
prepared.head()

In [None]:
# Create the output folder from Python so that a failure raises here instead of
# surfacing later as a confusing error from to_csv.
os.makedirs(output_folder, exist_ok=True)
# The index (named 'dt') is written as the first CSV column.
prepared.to_csv(os.path.join(output_folder, 'db-pds-2020-q1.csv'))

In [None]:
# Copy the exported CSV into the hist_data_intraday/ prefix of the bucket stored in s3bucket.
! aws s3 cp {output_folder}/db-pds-2020-q1.csv s3://{s3bucket}/hist_data_intraday/

In [None]:
# Print the current date so the saved output records when the notebook was last executed.
! echo "Last run on $(date)"