In [1]:
import pandas as pd
import os
from pathlib import Path
import gc
import multiprocessing
from joblib import Parallel, delayed
from tqdm import tqdm

import cudf
import numba
from numba import cuda

from src.data_utils import load_recent_data_from_file, save_daily_data, save_in_folders, get_latest_date
from src.config import DAILY_DATA_DIR, DATA_DIR, DAILY_PRIMARY_FEATURES_DIR

In [2]:
# reload modules
import importlib
import src.data_utils
importlib.reload(src.data_utils)

<module 'src.data_utils' from '/mnt/d/nmr/signals_prod/src/data_utils.py'>

In [3]:
recent_data = load_recent_data_from_file(DAILY_DATA_DIR, n_days=-1).reset_index().sort_values(
    by=["bloomberg_ticker", "date"]
)

recent_data[
    ["open", "close", "high", "low", "adjusted_close", "dividend_amount", "split_ratio"]
] = recent_data[
    ["open", "close", "high", "low", "adjusted_close", "dividend_amount", "split_ratio"]
].astype(
    "float16"
)

# filter out tickers with less than 100 days of data
recent_data = recent_data.groupby("bloomberg_ticker").filter(lambda x: len(x) > 100)
recent_data = recent_data.groupby("date").filter(lambda x: len(x) > 500)
gc.collect()

recent_data.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 42528901 entries, 42579060 to 1709
Data columns (total 13 columns):
 #   Column            Dtype         
---  ------            -----         
 0   date              datetime64[ns]
 1   open              float16       
 2   high              float16       
 3   low               float16       
 4   close             float16       
 5   adjusted_close    float16       
 6   volume            float64       
 7   data_provider     object        
 8   bloomberg_ticker  object        
 9   dividend_amount   float16       
 10  split_ratio       float16       
 11  date_str          object        
 12  split_factor      object        
dtypes: datetime64[ns](1), float16(7), float64(1), object(4)
memory usage: 2.8+ GB


In [4]:
def simple_moving_average(df, window):
    res = df["close"].rolling(window).mean()
    res.name = f"sma_{window}"
    return res


def exponential_moving_average(df, window):
    cum_sum = df["close"].rolling(window).sum()
    cum_count = df["close"].rolling(window).count()
    ema = cum_sum / cum_count
    ema.name = f"ema_{window}"
    return ema


def bollinger_bands(df, window):
    sma = simple_moving_average(df, window)
    std = df["close"].rolling(window).std()
    upper = sma + 2 * std
    lower = sma - 2 * std
    upper.name = f"bbupper_{window}"
    lower.name = f"bblower_{window}"
    return upper, lower


def rsi(df, window):
    delta = df["close"].diff()
    up_days = delta.copy()
    up_days[delta <= 0] = 0.0
    down_days = abs(delta.copy())
    down_days[delta > 0] = 0.0
    RS_up = up_days.rolling(window).mean()
    RS_down = down_days.rolling(window).mean()
    rsi = 100 - 100 / (1 + RS_up / RS_down)
    rsi.name = f"rsi_{window}"
    return rsi


def macd(df, window_fast, window_slow):
    ema_fast = exponential_moving_average(df, window_fast)
    ema_slow = exponential_moving_average(df, window_slow)
    macd = ema_fast - ema_slow
    macd.name = f"macd_{window_fast}_{window_slow}"
    return macd


# ATR
def average_true_range(df, window):
    tr = df[["high", "low", "close"]].max(axis=1) - df[["high", "low", "close"]].min(
        axis=1
    )
    atr = tr.rolling(window).mean()
    atr.name = f"atr_{window}"
    return atr


function_to_window: dict = {
    simple_moving_average: [5, 10, 20, 50, 100, 200],
    exponential_moving_average: [5, 10, 20, 50, 100, 200],
    bollinger_bands: [5, 10, 20, 50, 100, 200],
    rsi: [5, 10, 20, 50, 100, 200],
    average_true_range: [5, 10, 20, 50, 100, 200],
    macd: [(12, 26), (20, 50)],
}


def compute_features(df):
    features = []
    for func, windows in function_to_window.items():
        for window in windows:
            # pass windows as a tuple if the function takes more than one window
            if isinstance(window, tuple):
                _feat = func(df, *window)
            else:
                _feat = func(df, window)

            if isinstance(_feat, tuple):
                features.extend(_feat)
            else:
                features.append(_feat)

    # print type of features
    cated = cudf.concat(features, axis=1).astype("float32").add_prefix("feature_1_")
    return cated


In [5]:
tickers_list = recent_data["bloomberg_ticker"].unique().tolist()

# iterate over ticker chunks in 500
res = []
for i in tqdm(range(0, len(tickers_list), 1000)):
    tickers = tickers_list[i : i + 1000]
    #print(tickers)
    tickers_data = recent_data[recent_data["bloomberg_ticker"].isin(tickers)]
    
    _df_gpu = cudf.from_pandas(tickers_data)
    _res = compute_features(_df_gpu)
    _res = _res.to_pandas().astype("float16")
    _res["date"] = _df_gpu["date"].to_pandas()
    _res["bloomberg_ticker"] = _df_gpu["bloomberg_ticker"].to_pandas()
    _res["close"] = _df_gpu["close"].to_pandas()
    _res["volume"] = _df_gpu["volume"].to_pandas()
    _res["open"] = _df_gpu["open"].to_pandas()
    _res["high"] = _df_gpu["high"].to_pandas()
    _res["low"] = _df_gpu["low"].to_pandas()


    res.append(_res)

    del _df_gpu, _res

100%|██████████| 12/12 [02:20<00:00, 11.73s/it]


In [6]:
gc.collect()
res = pd.concat(res, axis=0)
res = res.dropna(axis=0)

# convert float 16 to float 32 in a loop
for col in res.columns:
    if res[col].dtype == "float16":
        res[col] = res[col].astype("float32")
    gc.collect()

gc.collect()

del recent_data
gc.collect()

0

In [7]:
# loop over all unique dates in chunks of 100; save each chunk to a separate file
res["date_str"] = res["date"].dt.strftime("%Y-%m-%d")
dates = res["date_str"].unique()

for i in tqdm(range(0, len(dates), 100)):
    # use save_in_folders function to save each chunk to a separate folder
    _tmp = res[res["date_str"].isin(dates[i : i + 100])]
    save_in_folders(_tmp, DAILY_PRIMARY_FEATURES_DIR)

    del _tmp
    gc.collect()

100%|██████████| 100/100 [00:04<00:00, 23.41it/s]
100%|██████████| 100/100 [00:01<00:00, 65.25it/s]
100%|██████████| 100/100 [00:01<00:00, 73.56it/s]
100%|██████████| 100/100 [00:01<00:00, 99.34it/s]
100%|██████████| 100/100 [00:01<00:00, 97.88it/s]
100%|██████████| 100/100 [00:01<00:00, 66.79it/s]
100%|██████████| 100/100 [00:01<00:00, 65.60it/s]
100%|██████████| 100/100 [00:00<00:00, 111.32it/s]
100%|██████████| 100/100 [00:00<00:00, 108.54it/s]
100%|██████████| 100/100 [00:01<00:00, 95.13it/s]
100%|██████████| 100/100 [00:00<00:00, 102.17it/s]
100%|██████████| 100/100 [00:01<00:00, 87.02it/s]
100%|██████████| 100/100 [00:01<00:00, 70.21it/s]
100%|██████████| 100/100 [00:01<00:00, 97.84it/s]
100%|██████████| 100/100 [00:01<00:00, 97.65it/s]
100%|██████████| 100/100 [00:00<00:00, 100.03it/s]
100%|██████████| 100/100 [00:00<00:00, 113.31it/s]
100%|██████████| 100/100 [00:01<00:00, 81.70it/s]
100%|██████████| 100/100 [00:01<00:00, 59.76it/s]
100%|██████████| 100/100 [00:01<00:00, 83.18i