In [1]:
### importing packages

In [2]:
import json
from pathlib import Path, PurePath 
import glob

import betfairlightweight
from betfairlightweight import filters

import datetime

import pandas as pd
import numpy as np

from bz2 import BZ2File # To unzip the Betfair data from its downloaded format

from betfairlightweight import StreamListener
from betfairlightweight.streaming.stream import MarketStream

import re

In [3]:
### 'logging in'

In [4]:
project_dir = Path.cwd().parents[1]
logins_dir = project_dir / 'api_logins.json'

with open(logins_dir) as f:
    login_dict =  json.load(f)
    
trading = betfairlightweight.APIClient(username=login_dict['my_username'],
                                       password=login_dict['my_password'],
                                       app_key=login_dict['my_app_key'],
                                       certs=login_dict['certs_path'])

trading.login()

<LoginResource>

In [5]:
# returns list of 'data dictionaries'
data_dicts = trading.historic.get_my_data()

# calculate range of dates for advanced data
adv_range = [d['forDate'] for d in data_dicts if d['plan'] == 'Advanced Plan']

# find min date for adv_data
adv_min_date = datetime.datetime.strptime(min(adv_range), '%Y-%m-%dT%H:%M:%S')

# find max data for adv data
def last_day_of_month(any_day):
    next_month = any_day.replace(day=28) + datetime.timedelta(days=4)  
    return next_month - datetime.timedelta(days=next_month.day)

adv_max_temp = datetime.datetime.strptime(max(adv_range), '%Y-%m-%dT%H:%M:%S')
adv_max_date = last_day_of_month(adv_max_temp) 

In [6]:
# list files within advanced data range (GB Data)
adv_file_list = trading.historic.get_file_list(
    "Horse Racing",
    "Advanced Plan",
    from_day=adv_min_date.day,
    from_month=adv_min_date.day,
    from_year=adv_min_date.year,
    to_day=adv_max_date.day,
    to_month=adv_max_date.month,
    to_year=adv_max_date.year,
    market_types_collection=["WIN"],
    countries_collection=["GB"],
    file_type_collection=["M"],
)
print("No. items :", len(adv_file_list))

No. items : 1858


In [10]:
# where to store our advanced data
adv_dir = project_dir / 'data' / 'raw' / 'api' / 'advanced'

# check if files have been downloaded already
# adv_all_files = [Path(f).name for f in adv_file_list] # all files to download
adv_downloaded_files = [Path(f).name for f in adv_dir.glob("*.bz2")] # all files downloaded

# download files we dont have and writing uncompressed versions
for file in adv_file_list[1:10]: # remove index for all files
    if Path(file).name not in adv_downloaded_files: 
        print(file)
        download = trading.historic.download_file(file_path = file, store_directory = adv_dir)
        print(download)

/xds_nfs/edp_processed/ADVANCED/2020/Jan/1/29637009/1.166897833.bz2
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897833.bz2
/xds_nfs/edp_processed/ADVANCED/2020/Jan/1/29637009/1.166897838.bz2
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897838.bz2
/xds_nfs/edp_processed/ADVANCED/2020/Jan/1/29637009/1.166897843.bz2
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897843.bz2
/xds_nfs/edp_processed/ADVANCED/2020/Jan/1/29637009/1.166897848.bz2
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897848.bz2
/xds_nfs/edp_processed/ADVANCED/2020/Jan/1/29637009/1.166897853.bz2
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897853.bz2
/xds_nfs/edp_processed/ADVANCED/2020/Jan/1/29637009/1.166897858.bz2
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897

In [11]:
# proceesing bz2 to text
adv_extfile_dirs = []

for file in glob.glob(str(adv_dir)+'/*.bz2'): # change this?
    zipfile = BZ2File(file) # open the file
    data = zipfile.read() # get the decompressed data
    newfilepath = file.split('.bz2')[0] # removing the extension and saving without a filetype
    open(newfilepath, 'wb').write(data) # write an uncompressed file
    adv_extfile_dirs.append(newfilepath)
    zipfile.close()

In [12]:
datadict = {'Time': [],
            'MarketId' : [],
            'Status' : [],
            'Inplay' : [], 
            'SelectionId' : [],
            'LastPriceTraded' : [],
            'TotalMatched' : [],
            'BSP' : [],
            'AdjFactor' :  [],
            'RunnerStatus' : [],
            'MktTotalMatched' : [],
            'RaceInfo' : [],
            'Venue' : [],
            'BackSize': [],
            'BackPrice': [],
            'LayPrice' : [],
            'LaySize' : []
            }

In [13]:
class HistoricalStream(MarketStream):
    # create custom listener and stream

    def _init_(self, listener):
        super(HistoricalStream, self)._init_(listener)


    def on_process(self, market_books):
        for market_book in market_books:
            for runner in market_book.runners:
                datadict['Time'].append(market_book.publish_time)
                datadict['MarketId'].append(float(market_book.market_id))
                datadict['Status'].append(market_book.status)
                datadict['Inplay'].append(market_book.inplay)
                datadict['SelectionId'].append(runner.selection_id)
                datadict['LastPriceTraded'].append(runner.last_price_traded)
                datadict['TotalMatched'].append(runner.total_matched)
                datadict['BSP'].append(runner.sp.actual_sp)
                datadict['AdjFactor'].append(runner.adjustment_factor)
                datadict['RunnerStatus'].append(runner.status)
                datadict['MktTotalMatched'].append(market_book.total_matched)
                datadict['RaceInfo'].append(market_book.market_definition.name)
                datadict['Venue'].append(market_book.market_definition.venue)
                
                atb_size = [x.size for x in runner.ex.available_to_back]
                datadict['BackSize'].append(atb_size)
                atb_price = [x.price for x in runner.ex.available_to_back]
                datadict['BackPrice'].append(atb_price)   
                atl_price = [x.price for x in runner.ex.available_to_lay]
                datadict['LayPrice'].append(atl_price)
                atl_size = [x.size for x in runner.ex.available_to_lay]
                datadict['LaySize'].append(atl_size)

class HistoricalListener(StreamListener):
    def _add_stream(self, unique_id, stream_type):
        if stream_type == "marketSubscription":
            return HistoricalStream(self)

In [14]:
listener = HistoricalListener(max_latency=None)

for file in adv_extfile_dirs:
    stream = trading.streaming.create_historical_stream(directory=file, listener=listener)
    stream.start() 
    print(str(file) + " stream completed.")

/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166898709 stream completed.
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897833 stream completed.
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166898719 stream completed.
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897843 stream completed.
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897853 stream completed.
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897848 stream completed.
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166897858 stream completed.
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.166898714 stream completed.
/Users/tombardrick/Documents/projects/betfair/betfair_project/data/raw/api/advanced/1.16

In [15]:
def dict_to_df(datadict):
    
    df = pd.DataFrame(datadict)
    
    df.sort_values(by = 'Time')
    
    df['MarketId'] = df['MarketId'].astype(str)
    df['SelectionId'] = df['SelectionId'].astype(str)
    
    df['LayPrice'] = df['LayPrice'].apply(lambda x: x[0] if x else np.nan)
    df['LaySize'] = df['LaySize'].apply(lambda x: x[0] if x else np.nan)
    df['BackPrice'] = df['BackPrice'].apply(lambda x: x[0] if x else np.nan)
    df['BackSize'] = df['BackSize'].apply(lambda x: x[0] if x else np.nan)
    
    return df

df = dict_to_df(datadict)

In [16]:
def extract_furlongs(market_name):
    '''
    Assuming distance is always stated 1st within 'MarketName', with space followed after.
    Distance given in format of furlongs, miles or both.
    8 furlongs in a mile.
    '''
    
    distance = market_name.split(' ')[0]
    
    if 'm' in distance:
        m = distance.split('m')[0]
        distance = distance.replace(m + 'm', '')
        
        if 'f' in distance:
            f = distance.split('f')[0]
            
            return (int(m) * 8) + int(f)

        return int(m) * 8
    
    else:
        f = distance.split('f')[0]
        
        return int(f)
    
df['Distance'] = df['RaceInfo'].apply(lambda x: extract_furlongs(x))

In [17]:
def extract_race_type(market_name):
    if 'Hrd' in market_name:
        return 'Hurdle'
    if 'Chs' in market_name:
        return 'Chase'
    if 'NHF' in market_name:
        return 'NHF'
    else:
        return 'Flat'
    
df['RaceType'] = df['RaceInfo'].apply(lambda x: extract_race_type(x))

In [18]:
df['NoRunners'] = df.groupby('MarketId')['SelectionId'].transform('nunique')

In [19]:
# converting to datetime
df['Time'] = pd.to_datetime(df['Time'], format="%Y-%m-%d %H:%M:%S", errors='coerce')

# calculating inplay start for each race (assigning to new columns)
df['StartTime'] = df['Time'].where(df['Inplay'] == True).groupby(df['MarketId']).transform('min')

# calculating difference between each time point and start time
df['TimeDif'] = (df['Time'] - df['StartTime']).astype('timedelta64[s]')

# dropping starttime (Can be inferred by InPlay)
df = df.drop('StartTime', 1)


In [20]:
# keeping only OPEN & ACTIVE markets
df = df.loc[(df['Status'] == 'OPEN') & (df['RunnerStatus'] == 'ACTIVE')] 

# removing prices with size < 10
df = df.loc[(df['BackSize'] > 5) | (df['LaySize'] > 5)] 

# removing timpoints more than on hour before the race
df = df.loc[df['TimeDif'] > -3600]

In [21]:
T_pre = 60 # ~ every minute (3600 seconds pre race)
T_post = 30 # ~ every 1-11 seconds (60 - 580 seconds for race)

# creating time bins pre race
df['T_pre'] = df.where(df['Inplay'] == False).groupby('SelectionId')['TimeDif'].apply(lambda x: pd.qcut(x, T_pre, labels = [i for i in range(-T_pre, 0)])).astype(float)

# cretaing time bins suring race
df['T_post'] = df.where(df['Inplay'] == True).groupby('SelectionId')['TimeDif'].apply(lambda x: pd.qcut(x, T_post, labels = [i for i in range(0, T_post)])).astype(float)

df['T'] = df['T_pre'].fillna(df['T_post']).astype(int)

df.drop(columns = ['T_pre', 'T_post'], inplace = True)

In [72]:
# works on assumption that runners prices active in market for about the same time as market is...
# and if comparing prices between runners you can somewhat assume they are at the same time 
# IF the no. price points (inplay) for each runner ~same

In [22]:
df.head()

Unnamed: 0,Time,MarketId,Status,Inplay,SelectionId,LastPriceTraded,TotalMatched,BSP,AdjFactor,RunnerStatus,...,Venue,BackSize,BackPrice,LayPrice,LaySize,Distance,RaceType,NoRunners,TimeDif,T
63222,2020-01-01 11:30:32.091,1.166898709,OPEN,False,335830,7.4,1008.87,,21.11,ACTIVE,...,Musselburgh,13.7,7.4,7.6,27.89,15,Hurdle,10,-3599.0,-60
63223,2020-01-01 11:30:32.091,1.166898709,OPEN,False,22070336,32.0,218.09,,6.97,ACTIVE,...,Musselburgh,2.0,30.0,32.0,7.8,15,Hurdle,10,-3599.0,-60
63224,2020-01-01 11:30:32.091,1.166898709,OPEN,False,20393848,15.0,553.19,,5.56,ACTIVE,...,Musselburgh,10.71,14.0,15.0,1.03,15,Hurdle,10,-3599.0,-60
63225,2020-01-01 11:30:32.091,1.166898709,OPEN,False,20772411,14.5,433.61,,13.63,ACTIVE,...,Musselburgh,19.76,14.5,15.0,3.4,15,Hurdle,10,-3599.0,-60
63226,2020-01-01 11:30:32.091,1.166898709,OPEN,False,20225147,2.22,8178.81,,18.67,ACTIVE,...,Musselburgh,29.3,2.22,2.24,94.94,15,Hurdle,10,-3599.0,-60


In [23]:
agg1 = {'MarketId' : 'first',
       'Venue' : 'first',
       'Distance' : 'first',
       'RaceType' : 'first',
       'BSP' : 'first',
       'NoRunners' : 'first'
}

df1 = df.groupby(['SelectionId']).agg(agg1).reset_index()

In [24]:
dfs = []
cols = ['BackSize', 'BackPrice', 'LayPrice', 'LaySize']
cols_short = [re.sub('[^A-Z]', '', s) for s in cols]

df2 = df.groupby(['SelectionId', 'T'])[cols].mean().reset_index()

for col, col_short in zip(cols, cols_short):
    x = df2.groupby('SelectionId')[col].apply(list)
    y = pd.DataFrame(x.tolist(), index=x.index,
                 columns = [col_short +':T' + str(x) for x in range(-T_pre, 0)] + \
                           [col_short + ':T+' + str(x) for x in range(T_post)]).reset_index()
    dfs.append(y)
    
z = pd.concat(dfs, axis =1)
z = z.loc[:,~z.columns.duplicated()]

final_df = df1.merge(z, on = 'SelectionId')

In [25]:
final_df.head()

Unnamed: 0,SelectionId,MarketId,Venue,Distance,RaceType,BSP,NoRunners,BS:T-60,BS:T-59,BS:T-58,...,LS:T+20,LS:T+21,LS:T+22,LS:T+23,LS:T+24,LS:T+25,LS:T+26,LS:T+27,LS:T+28,LS:T+29
0,10050919,1.166897848,Southwell,8,Flat,4.92,9,13.378936,5.166596,13.431522,...,4.7,3.156667,3.936667,1.77,1.446667,1.886667,2.715,1.55,1.9375,1.98
1,10444230,1.166898719,Musselburgh,20,Chase,34.0,14,16.571667,14.78,14.78,...,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2
2,1065462,1.166897853,Southwell,11,Flat,3.13,7,33.231818,5.575116,8.033488,...,10.07,19.035,10.8325,21.29,7.1125,5.056,2.3825,6.55,2.3425,2.41
3,11106437,1.166897833,Southwell,5,Flat,14.55,3,3.504,5.6115,8.285,...,3.915,2.72,1.06,1.69,1.69,1.69,1.69,1.69,1.69,
4,11117734,1.166897848,Southwell,8,Flat,4.7,9,32.919796,9.33875,41.04875,...,2.31,2.445,1.86,1.52,1.66,1.186667,1.056667,1.0,1.0,1.0


'BP:T-56'

In [None]:
# use multiindexing for columns for time points / prices/ sizes in future?
# move classes, functions and datadict to data_utils.py file -
# commenting thorughout (e.g. bap - base, time assumptions etc)
# move to src/data/historic