# Beta computed in parallel
* First denormalize prices and fill in every second (0-599) for each time id and stock
* Compute beta and beta2 over each time id, from wap and wap2 respectively
* Compute the same values using only the first 300 seconds of each time id (no leakage)

In [None]:
import os   
import glob
import json
import numpy as np
import pandas as pd
import multiprocessing
from joblib import Parallel, delayed

pd.options.display.max_rows = 300

In [None]:
# Lookup table merged into each order book by denormalize(); it supplies the
# 'multiplier' column used to rescale the four price columns.
DENORM = pd.read_csv("denormalize_multipliers.csv")
# NOTE(review): BETA is not referenced by any code visible in this notebook;
# denorm_fill_stocks() re-reads "all_times.csv" itself -- confirm it is still needed.
BETA = pd.read_csv("all_times.csv")

def fill_seconds(df):
    """Expand a frame so it has exactly one row per second 0..599.

    Args:
        df: frame with a 'seconds_in_bucket' column (one row per observed
            second).

    Returns:
        A 600-row frame whose first column is 'seconds_in_bucket' (0..599).
        Seconds missing from the input take the values of the closest earlier
        row; seconds before the first observed row take the first row's values.
    """
    every_second = pd.Index(range(600), name='seconds_in_bucket')
    by_second = (
        df.reset_index(drop=True)
        .set_index('seconds_in_bucket')
        .reindex(every_second)
    )

    # Carry the last known row forward, then back-fill whatever gap is left
    # at the start of the bucket.
    dense = by_second.ffill().reset_index().bfill()
    return dense.iloc[:600]

def denormalize(df):
    """Rescale the four order-book price columns by the DENORM multiplier.

    DENORM is left-merged onto ``df`` using whatever columns the two frames
    share (no explicit ``on=`` is given). Each of ask_price1/2 and
    bid_price1/2 is then multiplied by the merged 'multiplier' column, which
    is dropped before returning.

    Args:
        df: order-book frame with the four price columns.

    Returns:
        The merged frame with rescaled prices and without 'multiplier'.
    """
    scaled = df.merge(DENORM, how="left")
    for price_column in ('ask_price1', 'ask_price2', 'bid_price1', 'bid_price2'):
        scaled[price_column] *= scaled['multiplier']

    return scaled.drop(columns=['multiplier'])

def extract_waps(df):
    """Add weighted average price columns 'wap' and 'wap2' to ``df`` in place.

    For each book level the bid price is weighted by the ask size and the ask
    price by the bid size, then divided by the total size at that level.

    Args:
        df: frame with bid/ask price and size columns for levels 1 and 2.

    Returns:
        The same ``df`` object, with 'wap' (level 1) and 'wap2' (level 2) set.
    """
    level1_weighted = df["bid_price1"] * df["ask_size1"] + df["ask_price1"] * df["bid_size1"]
    level1_depth = df["bid_size1"] + df["ask_size1"]
    df["wap"] = level1_weighted / level1_depth

    level2_weighted = df["bid_price2"] * df["ask_size2"] + df["ask_price2"] * df["bid_size2"]
    level2_depth = df["bid_size2"] + df["ask_size2"]
    df["wap2"] = level2_weighted / level2_depth

    return df

### Parallelize for All Stocks

In [None]:
def denorm_fill(sid, df):
    """Load one stock's order book, denormalize it and fill every second.

    Args:
        sid: stock id; the book is read from
            individual_book_train/stock_<sid>.csv.
        df: frame the freshly loaded book is appended to (the visible caller
            passes an empty DataFrame).

    Returns:
        A frame with fill_seconds() applied to each time_id group, integer
        'time_id' / 'stock_id' columns, and the 'wap' / 'wap2' columns added
        by extract_waps().
    """
    book_path = f"individual_book_train/stock_{sid}.csv"
    book = denormalize(pd.read_csv(book_path))
    stacked = pd.concat([df, book])

    # Fill each time_id independently so values never bleed across buckets.
    per_second = stacked.groupby("time_id").apply(fill_seconds)
    per_second = per_second.reset_index(drop=True)
    per_second = per_second.astype({'time_id': int, 'stock_id': int})

    return extract_waps(per_second)

def denorm_fill_stocks():
    """Run denorm_fill() for every stock in parallel and stack the results.

    Stock ids are the unique 'stock_id' values in all_times.csv. Each job is
    handed an empty DataFrame as its starting frame.

    Returns:
        One DataFrame with all per-stock results concatenated under a fresh
        integer index.
    """
    empty_seed = pd.DataFrame()
    stock_ids = pd.read_csv("all_times.csv").stock_id.unique()
    jobs = (delayed(denorm_fill)(sid, empty_seed) for sid in stock_ids)
    per_stock = Parallel(n_jobs=-1, verbose=1)(jobs)
    return pd.concat(per_stock, ignore_index=True)

# Build the filled per-second order book (with wap / wap2) for all stocks.
# Despite its name, `beta` holds that table here -- no beta values exist yet.
beta = denorm_fill_stocks()

In [None]:
beta

In [None]:
beta.memory_usage()

In [None]:
beta.memory_usage()

In [None]:
# Store seconds_in_bucket as int32.
beta['seconds_in_bucket'] = beta['seconds_in_bucket'].astype('int32')

In [None]:
# Store the price and WAP columns as float32.
beta[['bid_price1','bid_price2','ask_price1','ask_price2','wap','wap2'
    ]] = beta[['bid_price1','bid_price2','ask_price1','ask_price2','wap','wap2']].astype('float32')

In [None]:
# Store the order-book size columns as int32.
beta[['bid_size1','bid_size2','ask_size1','ask_size2'
    ]] = beta[['bid_size1','bid_size2','ask_size1','ask_size2']].astype('int32')

In [None]:
# NOTE(review): identical to the seconds_in_bucket cast a few cells earlier;
# re-running it changes nothing once the column is already int32.
beta['seconds_in_bucket'] = beta['seconds_in_bucket'].astype('int32')

In [None]:
beta.memory_usage()

In [None]:
# Persist the filled table; the "Start Here!" section reads this file back.
beta.to_feather("everything.fth")

In [None]:
# Sum of per-column memory usage in bytes, scaled by 1e-9 and printed as GB.
print(f"{beta.memory_usage().sum() * 1e-9:.2f}GB used in total")

## Start Here! 
* Read and compute beta

In [3]:
import os   
import glob
import json
import numpy as np
import pandas as pd
import multiprocessing
from joblib import Parallel, delayed

pd.options.display.max_rows = 300

# Load the filled per-second order book (with wap / wap2) saved as
# "everything.fth" by the preprocessing section.
df = pd.read_feather("everything.fth")
df

Unnamed: 0,seconds_in_bucket,time_id,bid_price1,ask_price1,bid_price2,ask_price2,bid_size1,ask_size1,bid_size2,ask_size2,stock_id,wap,wap2
0,0,5,194.074066,194.244431,194.064026,194.254471,3,226,2,100,0,194.076309,194.067764
1,1,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,194.079025,194.067764
2,2,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,194.079025,194.067764
3,3,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,194.079025,194.067764
4,4,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,194.079025,194.067764
...,...,...,...,...,...,...,...,...,...,...,...,...,...
257359195,595,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,194.826736,194.772461
257359196,596,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,194.826736,194.772461
257359197,597,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,194.826736,194.772461
257359198,598,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,194.826736,194.772461


In [4]:
def compute_beta(df, mid=False):
    """Add a beta column computed from 'wap' against the market mean.

    Beta is c / v, where c averages
    (wap - stock mean) * (per-second market mean - overall market mean)
    over the rows of ``df`` and v averages the squared market deviation.
    The single resulting value is written to every row as float32.

    Args:
        df: rows to compute over (callers pass one stock's rows). It must
            already hold the mean columns: 'wap_mean', 'market_mean_sec' and
            'market_mean', or their '-300' variants when ``mid`` is true.
        mid: when true, read the '-300' columns and write 'beta-300' instead
            of 'beta'.

    Returns:
        The same ``df`` object with the beta column added.
    """
    if mid:
        stock_mean_col = 'wap_mean-300'
        market_sec_col = 'market_mean_sec-300'
        market_col = 'market_mean-300'
        beta_col = 'beta-300'
    else:
        stock_mean_col = 'wap_mean'
        market_sec_col = 'market_mean_sec'
        market_col = 'market_mean'
        beta_col = 'beta'

    n_rows = len(df)
    stock_dev = df['wap'] - df[stock_mean_col]
    market_dev = df[market_sec_col] - df[market_col]

    # Each term is divided by the row count before summing, as in a
    # population covariance / variance.
    covariance = ((stock_dev * market_dev) / n_rows).sum()
    variance = ((market_dev ** 2) / n_rows).sum()

    df[beta_col] = (covariance / variance).astype('float32')
    return df

def compute_beta2(df, mid=False):
    """Add a beta2 column computed from 'wap2' against the wap2 market mean.

    Same calculation as compute_beta(), but reading 'wap2' and the
    '...2' mean columns. The single resulting value is written to every row
    as float32.

    Args:
        df: rows to compute over (callers pass one stock's rows). It must
            already hold 'wap_mean2', 'market_mean_sec2' and 'market_mean2',
            or their '-300' variants when ``mid`` is true.
        mid: when true, read the '-300' columns and write 'beta2-300' instead
            of 'beta2'.

    Returns:
        The same ``df`` object with the beta2 column added.
    """
    if mid:
        stock_mean_col = 'wap_mean2-300'
        market_sec_col = 'market_mean_sec2-300'
        market_col = 'market_mean2-300'
        beta_col = 'beta2-300'
    else:
        stock_mean_col = 'wap_mean2'
        market_sec_col = 'market_mean_sec2'
        market_col = 'market_mean2'
        beta_col = 'beta2'

    n_rows = len(df)
    stock_dev = df['wap2'] - df[stock_mean_col]
    market_dev = df[market_sec_col] - df[market_col]

    # Each term is divided by the row count before summing.
    covariance = ((stock_dev * market_dev) / n_rows).sum()
    variance = ((market_dev ** 2) / n_rows).sum()

    df[beta_col] = (covariance / variance).astype('float32')
    return df

def _apply_per_stock(frame, func, *args):
    """Run ``func`` on each stock's rows and return the pieces in ``frame``'s row order."""
    # Iterating the groups explicitly (instead of groupby.apply) always hands
    # ``func`` the full rows, 'stock_id' included, and never adds the group
    # key to the index, whatever the installed pandas version does by default.
    pieces = [func(rows.copy(), *args) for _, rows in frame.groupby('stock_id')]
    return pd.concat(pieces).loc[frame.index]


def extract_beta(data):
    """Attach per-stock beta columns to the rows in ``data``.

    The "market" is the mean over all rows in ``data``; the visible caller
    passes the rows of a single time_id. Four float32 columns are added:

    * 'beta' / 'beta2': from 'wap' / 'wap2' over all rows.
    * 'beta-300' / 'beta2-300': the same calculation restricted to rows with
      seconds_in_bucket < 300, with the result copied to every row of the
      stock.

    Args:
        data: frame with at least 'stock_id', 'seconds_in_bucket', 'wap' and
            'wap2' columns.

    Returns:
        A new frame with 'wap' / 'wap2' removed, the four beta columns
        appended, and a leading 'index' column (0..n-1) produced by
        ``reset_index()``.
    """
    df = data.reset_index(drop=True)

    # --- Full window -------------------------------------------------------
    df['wap_mean'] = df.groupby('stock_id')['wap'].transform('mean').astype('float32')
    df['wap_mean2'] = df.groupby('stock_id')['wap2'].transform('mean').astype('float32')
    df['market_mean_sec'] = df.groupby('seconds_in_bucket')['wap'].transform('mean').astype('float32')
    df['market_mean_sec2'] = df.groupby('seconds_in_bucket')['wap2'].transform('mean').astype('float32')
    df['market_mean'] = df['wap'].mean()
    df['market_mean2'] = df['wap2'].mean()
    df = df.astype({'market_mean': 'float32', 'market_mean2': 'float32'})
    df = _apply_per_stock(df, compute_beta)
    df = df.drop(['wap_mean', 'market_mean_sec', 'market_mean'], axis=1)
    df = _apply_per_stock(df, compute_beta2)
    df = df.drop(['wap_mean2', 'market_mean_sec2', 'market_mean2'], axis=1)

    # --- First 300 seconds only -------------------------------------------
    df2 = df.query("seconds_in_bucket < 300").copy()
    df2['wap_mean-300'] = df2.groupby('stock_id')['wap'].transform('mean').astype('float32')
    df2['wap_mean2-300'] = df2.groupby('stock_id')['wap2'].transform('mean').astype('float32')
    df2['market_mean_sec-300'] = df2.groupby('seconds_in_bucket')['wap'].transform('mean').astype('float32')
    df2['market_mean_sec2-300'] = df2.groupby('seconds_in_bucket')['wap2'].transform('mean').astype('float32')
    df2['market_mean-300'] = df2['wap'].mean()
    df2['market_mean2-300'] = df2['wap2'].mean()
    df2 = df2.reset_index(drop=True).astype({'market_mean-300': 'float32', 'market_mean2-300': 'float32'})
    df2 = _apply_per_stock(df2, compute_beta, True)
    df2 = df2.drop(['wap_mean-300', 'market_mean_sec-300', 'market_mean-300'], axis=1)
    df2 = _apply_per_stock(df2, compute_beta2, True)

    # The '-300' betas are constant within a stock, so reduce them to one row
    # per stock and join on stock_id. Unlike a frame-wide forward fill, this
    # can never hand one stock's value to another stock, and it does not
    # depend on the rows being sorted by stock and second.
    first_half = df2.groupby('stock_id')[['beta-300', 'beta2-300']].first()
    result = df.join(first_half, on='stock_id')
    result = result.astype({'beta-300': 'float32', 'beta2-300': 'float32'})
    result = result.drop(["wap", "wap2"], axis=1)

    # Keep the old positional index as an 'index' column; the merge cells
    # further down drop that column explicitly.
    return result.reset_index()

def parallel_beta(df, tids, f="beta/beta_parallel_1000.fth"):
    """Run extract_beta() for each time id in parallel and save the result.

    Args:
        df: frame with a 'time_id' column; each job receives the rows whose
            time_id equals one entry of ``tids``.
        tids: iterable of time ids to process.
        f: feather file the concatenated result is written to.

    Returns:
        None. The combined frame is only written to ``f``.
    """
    jobs = (delayed(extract_beta)(df[df['time_id'] == tid]) for tid in tids)
    per_time_id = Parallel(n_jobs=-1, verbose=1)(jobs)

    # Drop this function's reference to the input before building the
    # combined frame.
    del df

    combined = pd.concat(per_time_id, axis=0, ignore_index=True)
    combined.reset_index(drop=True).to_feather(f)


# First 1000 time ids; written to the default file beta/beta_parallel_1000.fth.
parallel_beta(df, df.time_id.unique()[:1000])

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done  18 tasks      | elapsed:   13.7s
[Parallel(n_jobs=-1)]: Done 168 tasks      | elapsed:   41.7s
[Parallel(n_jobs=-1)]: Done 418 tasks      | elapsed:  1.5min
[Parallel(n_jobs=-1)]: Done 768 tasks      | elapsed:  2.5min
[Parallel(n_jobs=-1)]: Done 1000 out of 1000 | elapsed:  3.2min finished


In [5]:
# Remaining time ids, in chunks of up to 1000, each written to its own file.
# NOTE(review): "3830" in the last file name is presumably the total number of
# time ids -- confirm.
parallel_beta(df, df.time_id.unique()[1000:2000], f="beta/beta_parallel_2000.fth")
parallel_beta(df, df.time_id.unique()[2000:3000], f="beta/beta_parallel_3000.fth")
parallel_beta(df, df.time_id.unique()[3000:], f="beta/beta_parallel_3830.fth")

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done  18 tasks      | elapsed:   10.5s
[Parallel(n_jobs=-1)]: Done 168 tasks      | elapsed:   37.3s
[Parallel(n_jobs=-1)]: Done 418 tasks      | elapsed:  1.4min
[Parallel(n_jobs=-1)]: Done 768 tasks      | elapsed:  2.5min
[Parallel(n_jobs=-1)]: Done 1000 out of 1000 | elapsed:  3.2min finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done  18 tasks      | elapsed:    9.6s
[Parallel(n_jobs=-1)]: Done 168 tasks      | elapsed:   37.7s
[Parallel(n_jobs=-1)]: Done 418 tasks      | elapsed:  1.5min
[Parallel(n_jobs=-1)]: Done 768 tasks      | elapsed:  2.5min
[Parallel(n_jobs=-1)]: Done 1000 out of 1000 | elapsed:  3.2min finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done  18 tasks      | elapsed:    9.6s
[Parallel(n_jobs=-1)]: Done 168 tasks      | elapsed:   35.4s
[Pa

### Merge data from files 

In [2]:
import pandas as pd

# Start the merged table from the first chunk.
df = pd.read_feather("beta/beta_parallel_1000.fth")
df = df.reset_index(drop=True)

In [3]:
# Remove the 'index' column that extract_beta()'s reset_index() stored in each chunk.
df = df.drop(['index'],axis=1)

In [4]:
# Second chunk, without its stored 'index' column.
df2 = pd.read_feather("beta/beta_parallel_2000.fth").drop(['index'],axis=1)

In [5]:
# Append the second chunk under a fresh integer index, then drop the name df2
# so the chunk is no longer referenced separately.
df = pd.concat([df, df2], axis=0, ignore_index=True).reset_index(drop=True)
del df2

In [6]:
df

Unnamed: 0,seconds_in_bucket,time_id,bid_price1,ask_price1,bid_price2,ask_price2,bid_size1,ask_size1,bid_size2,ask_size2,stock_id,beta,beta2,beta-300,beta2-300
0,0,5,194.074066,194.244431,194.064026,194.254471,3,226,2,100,0,0.617274,0.651555,0.737558,0.778640
1,1,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
2,2,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
3,3,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
4,4,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
134390995,595,16629,194.241882,194.362045,194.141754,194.372055,115,200,20,1,126,2.100011,1.987291,2.105483,2.074515
134390996,596,16629,194.241882,194.362045,194.141754,194.472183,115,200,20,100,126,2.100011,1.987291,2.105483,2.074515
134390997,597,16629,194.241882,194.362045,194.141754,194.462173,115,201,20,100,126,2.100011,1.987291,2.105483,2.074515
134390998,598,16629,194.241882,194.362045,194.141754,194.462173,115,201,20,100,126,2.100011,1.987291,2.105483,2.074515


In [7]:
# Third chunk: load without its 'index' column, append, drop the name df3.
df3 = pd.read_feather("beta/beta_parallel_3000.fth").drop(['index'],axis=1)
df = pd.concat([df, df3], axis=0, ignore_index=True).reset_index(drop=True)
del df3

In [8]:
df

Unnamed: 0,seconds_in_bucket,time_id,bid_price1,ask_price1,bid_price2,ask_price2,bid_size1,ask_size1,bid_size2,ask_size2,stock_id,beta,beta2,beta-300,beta2-300
0,0,5,194.074066,194.244431,194.064026,194.254471,3,226,2,100,0,0.617274,0.651555,0.737558,0.778640
1,1,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
2,2,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
3,3,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
4,4,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
201585595,595,25250,184.177078,184.387161,184.137054,184.407166,101,100,100,59,126,0.852605,0.931619,-2.318214,-1.716217
201585596,596,25250,184.177078,184.387161,184.137054,184.407166,101,100,100,59,126,0.852605,0.931619,-2.318214,-1.716217
201585597,597,25250,184.147049,184.387161,184.137054,184.407166,1,150,100,9,126,0.852605,0.931619,-2.318214,-1.716217
201585598,598,25250,184.167068,184.387161,184.137054,184.407166,102,150,100,9,126,0.852605,0.931619,-2.318214,-1.716217


In [9]:
# Final chunk: load without its 'index' column, append, drop the name df4.
df4 = pd.read_feather("beta/beta_parallel_3830.fth").drop(['index'],axis=1)
df = pd.concat([df, df4], axis=0, ignore_index=True).reset_index(drop=True)
del df4

In [10]:
df  #should be 257359200 rows x 15 columns

Unnamed: 0,seconds_in_bucket,time_id,bid_price1,ask_price1,bid_price2,ask_price2,bid_size1,ask_size1,bid_size2,ask_size2,stock_id,beta,beta2,beta-300,beta2-300
0,0,5,194.074066,194.244431,194.064026,194.254471,3,226,2,100,0,0.617274,0.651555,0.737558,0.778640
1,1,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
2,2,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
3,3,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
4,4,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
257359195,595,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,-0.520259,-0.371785,-1.238651,-2.130582
257359196,596,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,-0.520259,-0.371785,-1.238651,-2.130582
257359197,597,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,-0.520259,-0.371785,-1.238651,-2.130582
257359198,598,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,-0.520259,-0.371785,-1.238651,-2.130582


In [11]:
# Save the full merged table (all four chunks) as one feather file.
df.to_feather("beta/beta_parallel_all.fth")

In [21]:
df

Unnamed: 0,seconds_in_bucket,time_id,bid_price1,ask_price1,bid_price2,ask_price2,bid_size1,ask_size1,bid_size2,ask_size2,stock_id,beta,beta2,beta-300,beta2-300
0,0,5,194.074066,194.244431,194.064026,194.254471,3,226,2,100,0,0.617274,0.651555,0.737558,0.778640
1,1,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
2,2,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
3,3,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
4,4,5,194.074066,194.244431,194.064026,194.254471,3,100,2,100,0,0.617274,0.651555,0.737558,0.778640
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
257359195,595,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,-0.520259,-0.371785,-1.238651,-2.130582
257359196,596,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,-0.520259,-0.371785,-1.238651,-2.130582
257359197,597,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,-0.520259,-0.371785,-1.238651,-2.130582
257359198,598,32767,194.781387,194.871613,194.771362,194.881653,101,100,1,100,126,-0.520259,-0.371785,-1.238651,-2.130582


In [22]:
# Keep only the identifiers (time_id, stock_id) and the four beta columns;
# the per-second book columns are dropped.
betas = df.drop(["seconds_in_bucket","bid_price1","bid_price2","ask_price1","ask_price2",
                 "bid_size1","bid_size2","ask_size1","ask_size2"], axis=1)
del df
betas

Unnamed: 0,time_id,stock_id,beta,beta2,beta-300,beta2-300
0,5,0,0.617274,0.651555,0.737558,0.778640
1,5,0,0.617274,0.651555,0.737558,0.778640
2,5,0,0.617274,0.651555,0.737558,0.778640
3,5,0,0.617274,0.651555,0.737558,0.778640
4,5,0,0.617274,0.651555,0.737558,0.778640
...,...,...,...,...,...,...
257359195,32767,126,-0.520259,-0.371785,-1.238651,-2.130582
257359196,32767,126,-0.520259,-0.371785,-1.238651,-2.130582
257359197,32767,126,-0.520259,-0.371785,-1.238651,-2.130582
257359198,32767,126,-0.520259,-0.371785,-1.238651,-2.130582


In [26]:
betas.memory_usage()  

Index               128
time_id      1029436800
stock_id     1029436800
beta         1029436800
beta2        1029436800
beta-300     1029436800
beta2-300    1029436800
dtype: int64

In [32]:
# Collapse to one row per (stock_id, time_id). compute_beta()/compute_beta2()
# write one value to every row of a stock within a time id, so taking the
# first row of each pair is enough.
betas_condensed = betas.groupby(["stock_id","time_id"], as_index=False).first()
betas_condensed

Unnamed: 0,stock_id,time_id,beta,beta2,beta-300,beta2-300
0,0,5,0.617274,0.651555,0.737558,0.778640
1,0,11,0.792696,0.719935,0.412667,0.450700
2,0,16,6.263979,6.132343,4.284855,1.519335
3,0,31,1.331730,1.513052,3.499825,2.378868
4,0,62,0.097950,0.064668,0.250785,0.973303
...,...,...,...,...,...,...
428927,126,32751,1.175407,1.165029,3.484390,1.049953
428928,126,32753,-2.039227,-1.317102,3.307661,2.160123
428929,126,32758,0.395725,0.591536,-0.570789,-0.046489
428930,126,32763,0.111783,0.218478,0.959953,1.315896


In [33]:
# Save the condensed one-row-per-(stock_id, time_id) beta table.
betas_condensed.to_feather("beta/betas_condensed.fth")