In [1]:
# import standard libraries
import os
from glob import glob
import matplotlib.pyplot as plt
import multiprocessing
import numpy as np
# third-party libraries
import dask
import dask.multiprocessing
from dask.diagnostics import ProgressBar
import pandas as pd
from numba import jit
from sklearn.linear_model import LinearRegression
import seaborn as sns
from tqdm.notebook import tqdm
# local libraries
from fbd import get_monthly_returns, ols_fast
from fbd import START, END

In [2]:
# raw per-file returns stored as parquet
filenames = glob('data/returns/raw/*.parquet')

# month-start timestamps spanning START..END, plus the same months as
# 'YYYY-MM' string labels
dates_start = pd.date_range(start=START, end=END, freq='MS')
dates_dt = dates_start.strftime("%Y-%m")

# Data

## Fama-French

In [3]:
# factor table from the gzipped CSV; the first CSV column becomes the index
fama = pd.read_csv(
    'data/fama/fama.csv.gz',
    index_col=0,
    compression='gzip',
)

In [4]:
# the file stores returns in percent; rescale every column to fractions
fama = fama.div(100)

In [5]:
# split the riskless rate (kept as a one-column frame) from the factor columns
rf = fama.loc[:, ['RF']]
fama = fama.drop(columns=['RF'])

In [6]:
# restrict the factor table to the three columns used below, in this order
fama = fama.loc[:, ['Mkt-RF', 'SMB', 'HML']]
fama.columns

Index(['Mkt-RF', 'SMB', 'HML'], dtype='object')

In [7]:
# number of factor columns; also used to name the loadings output folder
num_feat = len(fama.columns)
num_feat

3

## Constituents

In [8]:
# constituents table from the gzipped CSV; first CSV column becomes the index
const_mat = pd.read_csv(
    'data/russell3000.csv.gz',
    index_col=0,
    compression='gzip',
)

# Compute loadings

In [9]:
# month-end dates between START and END, rendered as 'YYYY-MM-DD' strings
dates = pd.date_range(start=START, end=END, freq='M').strftime("%Y-%m-%d")

In [10]:
def write_monthly_loadings(date, di, const_mat, fama, rf):
    """Compute rolling factor loadings for one month and write them to parquet.

    For every row of the target month, the factors are regressed (via
    ``ols_fast``) on the excess returns of each column over the preceding
    ``w`` rows, where ``w`` is the number of rows found in the previous
    calendar month. One parquet file per row label is written to
    ``data/loadings/<n_factors>_factor/<label>.parquet``; labels whose file
    already exists are skipped, so the function can be re-run to resume.

    Parameters
    ----------
    date : str
        Date string whose first seven characters are the target month
        (``'YYYY-MM'``).
    di : int
        Position of ``date`` in the caller's date list. No longer read: the
        previous month is derived from ``date`` itself. Kept so existing
        callers keep working.
    const_mat : pandas.DataFrame
        Passed through unchanged to ``get_monthly_returns``.
    fama : pandas.DataFrame
        Factor returns; must contain every index label of the returns frame.
    rf : pandas.DataFrame
        Riskless rate; its first column is subtracted from the returns.

    Raises
    ------
    KeyError
        If ``fama`` or ``rf`` lack a label present in the returns index.

    Notes
    -----
    Assumes ``get_monthly_returns(..., months=2)`` returns a frame indexed by
    date strings covering the previous and the target month -- TODO confirm
    against ``fbd``.
    """
    date_b = date[:7]
    # previous calendar month as 'YYYY-MM', computed locally instead of
    # looking it up in a notebook-level list
    date_a = (pd.Period(date_b, freq='M') - 1).strftime('%Y-%m')
    d_ret = get_monthly_returns(date_b, const_mat, months=2)

    # filter fama factors and riskless rate to the dates of the returns
    d_fama = fama.loc[d_ret.index]
    d_rf = rf.loc[d_ret.index].iloc[:, 0]

    # compute excess returns: subtract the riskless rate row by row from
    # every return column (explicit axis avoids column-label alignment)
    d_exret = d_ret.sub(d_rf, axis=0)

    # get number of rows of previous month as window size
    w = d_ret.index.str.startswith(date_a).sum()

    # the factor count names the output folder; create it if it is missing
    out_dir = f'data/loadings/{fama.shape[1]}_factor'
    os.makedirs(out_dir, exist_ok=True)

    # roll the window: row t is estimated from the w rows strictly before it
    for t in range(w, d_exret.shape[0]):
        d_t_day = d_exret.index[t]

        filename = f'{out_dir}/{d_t_day}.parquet'
        if os.path.exists(filename):
            # already written by an earlier run
            continue

        # the regression window uses the excess returns, not the raw returns
        d_t_exret = d_exret.iloc[t-w:t, :]
        d_t_fama = d_fama.iloc[t-w:t, :]

        # perform fast OLS on each column, then concatenate column-wise
        d_t_loadings = pd.concat(
            [ols_fast(d_t_fama, d_t_exret.loc[:, p], p) for p in d_t_exret],
            axis=1,
        )

        # cast index labels to str before saving
        d_t_loadings.index = d_t_loadings.index.astype(str)

        # save loadings to parquet
        d_t_loadings.to_parquet(filename)

In [11]:
# make the process-based scheduler dask's default for later compute calls
dask.config.set({'scheduler': 'processes'})

<dask.config.set at 0x7f891ae15c40>

In [12]:
%%time
# build one lazy task per month; the first month is skipped because it has
# no preceding month in `dates`
promises = []
for di, date in enumerate(dates):
    if di > 0:
        task = dask.delayed(write_monthly_loadings)(date, di, const_mat, fama, rf)
        promises.append(task)

# run every task under a progress bar; the tasks only write files, so the
# returned values are not used
with ProgressBar():
    dask.compute(promises)

[########################################] | 100% Completed | 19min 55.1s
CPU times: user 8.56 s, sys: 1.47 s, total: 10 s
Wall time: 19min 55s
