In [1]:
from utilities import database as db
import pandas as pd
import numpy as np
import logging
import os
from tqdm import tqdm

In [2]:
### Additional settings ###

# Logging: records at INFO level and above are written to a log file whose
# path is relative to the current working directory.
# Note: logging.basicConfig is a no-op when the root logger already has
# handlers attached.
LOG_SETTINGS = dict(
    filename="datasets_construction.log",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logging.basicConfig(**LOG_SETTINGS)

# Queries

In [3]:
# SQL returning the distinct `ticker` values found in
# sp500_constituents_hist_prices_proc (one row per ticker).
tickers_query = '''
select 
distinct ticker 
from sp500_constituents_hist_prices_proc
'''

# Per-ticker SQL template. The literal '__TICKER__' is a placeholder that has
# to be replaced with a ticker symbol before the query is executed.
#
# The query takes that ticker's rows dated 2001-01-01 or later from
# sp500_constituents_hist_prices_proc and left-joins, by calendar date:
#   - spy_ohlcv_raw                 -> spy (close)
#   - us_fin_indicators_raw         -> usd_index ('DX-Y.NYB'), gold ('GC=F'),
#                                      vix ('^VIX') closes
#   - us_fed_funds_rate_raw         -> fed_funds_rate
#   - us_treasury_yield_curve_raw   -> yc_1_mo ... yc_30_yr
# in_sp500 is 1 when the row's date falls inside a [date_added, date_removed]
# range from sp500_hist_constituents for the ticker, otherwise 0.
# Rows are ordered by ticker_data.date ascending; the result carries both
# `date_1` (date cast) and every column of ticker_data.
#
# NOTE(review): a NULL date_removed never satisfies the `>=` comparison, so an
# open-ended membership would come out as in_sp500 = 0 -- confirm how current
# constituents are stored.
# NOTE(review): overlapping membership periods (or duplicate dates in any of
# the joined tables) would duplicate output rows -- verify uniqueness upstream.
joint_data_query = '''
with ticker_data as (
	select *
	from public.sp500_constituents_hist_prices_proc s
	where true
	and s.date >= cast('2001-01-01' as date)
	and s.ticker = '__TICKER__'
)
, sp500_periods as (
	select 
		hc.*, 
		1 as in_sp500 
	from public.sp500_hist_constituents hc
	where hc.ticker = '__TICKER__'
)
select 
	cast(ticker_data.date as date) as date_1,
	ticker_data.*,
	usd_index.close as usd_index,
	gold.close as gold,
	vix.close as vix,
	fedfunds.fed_funds_rate,
	yield_curve."1_mo" as yc_1_mo,
	yield_curve."3_mo" as yc_3_mo,
	yield_curve."6_mo" as yc_6_mo,
	yield_curve."1_yr" as yc_1_yr,
	yield_curve."2_yr" as yc_2_yr,
	yield_curve."3_yr" as yc_3_yr,
	yield_curve."5_yr" as yc_5_yr,
	yield_curve."10_yr" as yc_10_yr,
	yield_curve."20_yr" as yc_20_yr,
	yield_curve."30_yr" as yc_30_yr,
	coalesce(sp500_periods.in_sp500, 0) as in_sp500,
	spy.close as spy
	
from ticker_data

left join public.spy_ohlcv_raw spy
on cast(spy.date as date) = cast(ticker_data.date as date)

left join public.us_fin_indicators_raw usd_index
on cast(usd_index.date as date) = cast(ticker_data.date as date)
and usd_index.ticker = 'DX-Y.NYB'

left join public.us_fin_indicators_raw gold
on cast(gold.date as date) = cast(ticker_data.date as date)
and gold.ticker = 'GC=F'

left join public.us_fin_indicators_raw vix
on cast(vix.date as date) = cast(ticker_data.date as date)
and vix.ticker = '^VIX'

left join public.us_fed_funds_rate_raw fedfunds
on cast(fedfunds.date as date) = cast(ticker_data.date as date)

left join public.us_treasury_yield_curve_raw yield_curve
on cast(yield_curve.date as date) = cast(ticker_data.date as date)

left join sp500_periods
on cast(sp500_periods.date_added as date) <= cast(ticker_data.date as date)
and cast(sp500_periods.date_removed as date) >= cast(ticker_data.date as date)

where true
order by ticker_data.date asc
'''

In [4]:
# Get Unique Tickers
def get_tickers():
    """Return the unique ticker symbols produced by ``tickers_query``.

    The query is executed through ``pd.read_sql`` on the engine obtained from
    ``db.get_engine()``; the unique values of the resulting ``ticker`` column
    are returned.
    """
    engine = db.get_engine()
    ticker_column = pd.read_sql(tickers_query, engine)['ticker']
    return ticker_column.unique()

# Get Joint Data
def get_joint_data(ticker):
    """Load the joined dataset for one ticker as a DataFrame.

    Every ``__TICKER__`` placeholder in ``joint_data_query`` is replaced with
    ``ticker`` by plain string substitution, and the resulting SQL is run via
    ``pd.read_sql`` on the engine from ``db.get_engine()``.
    """
    # NOTE(review): the ticker is spliced into the SQL text unescaped; this
    # presumably relies on tickers coming from our own table -- confirm.
    sql = joint_data_query.replace('__TICKER__', ticker)
    return pd.read_sql(sql, db.get_engine())

# Indicator families as (column prefix, window sizes in ascending order).
# Columns are named f"{prefix}_{window}", e.g. "sma_7".
_INDICATOR_WINDOWS = (
    ('sma', (7, 14, 20, 50, 100, 200)),
    ('ema', (7, 14, 20, 50, 100, 200)),
    ('mom', (10, 20, 50)),
    ('atr', (14, 30)),
)


def _fill_indicator_family(data, prefix, windows):
    """Fill gaps in one indicator family, cascading from the shortest window.

    The shortest-window column is forward- then backward-filled; each longer
    window then takes its missing values from the next shorter (already
    filled) window of the same family.
    """
    previous = f'{prefix}_{windows[0]}'
    # Assign the result back instead of calling ffill/bfill(inplace=True) on
    # data[col]: an in-place call on the selected Series is chained assignment,
    # which warns in pandas 2.2 and leaves the frame untouched under
    # Copy-on-Write.
    data[previous] = data[previous].ffill().bfill()
    for window in windows[1:]:
        current = f'{prefix}_{window}'
        data[current] = data[current].fillna(data[previous])
        previous = current


def process_data(data):
    """Clean one ticker's joined frame and add daily return columns.

    Steps, in order:
      1. ``date`` is (re)written from ``date_1`` as ``datetime.date`` values.
      2. Gaps in the sma/ema/mom/atr columns are filled per family (see
         ``_fill_indicator_family``).
      3. Every remaining gap is forward-filled, then backward-filled.
      4. ``daily_return`` and ``spy_daily_return`` are computed as the
         percentage change of ``close`` and ``spy``.
      5. The helper column ``date_1`` is dropped.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain ``date_1``, ``close``, ``spy`` and every indicator column
        listed in ``_INDICATOR_WINDOWS``.

    Returns
    -------
    pandas.DataFrame
        The same object that was passed in; it is modified in place.

    Raises
    ------
    KeyError
        If one of the required columns is missing.
    """
    # Convert date to datetime.date
    data['date'] = pd.to_datetime(data['date_1']).dt.date

    # Fill missing indicator values family by family
    for prefix, windows in _INDICATOR_WINDOWS:
        _fill_indicator_family(data, prefix, windows)

    # Frame-level in-place fills act on `data` itself (no intermediate
    # selection), so the caller's frame is updated as before.
    data.ffill(inplace=True)
    data.bfill(inplace=True)

    # Calculate daily returns (first row has no predecessor and stays NaN)
    data['daily_return'] = data['close'].pct_change()
    data['spy_daily_return'] = data['spy'].pct_change()

    data.drop(columns=['date_1'], inplace=True)
    return data

def collection_loop(tickers, output_dir=None):
    """Build and save one CSV per ticker, collecting the tickers that failed.

    For each ticker the joined data is fetched with ``get_joint_data``,
    cleaned with ``process_data`` and written to ``<output_dir>/<ticker>.csv``
    without the index.

    Parameters
    ----------
    tickers : iterable of str
        Ticker symbols to process.
    output_dir : str, optional
        Destination directory. When omitted, the notebook-level ``data_dir``
        variable is used, which keeps existing ``collection_loop(tickers)``
        calls working.

    Returns
    -------
    list
        Tickers whose processing raised an exception, in encounter order.

    Raises
    ------
    NameError
        If ``output_dir`` is omitted and ``data_dir`` has not been defined yet.
    """
    # Resolve the destination once, outside the try block: a missing
    # `data_dir` should stop the run immediately rather than being logged as
    # a failure of every single ticker.
    target_dir = data_dir if output_dir is None else output_dir

    issue_tickers = []
    for ticker in tqdm(tickers):
        try:
            data = process_data(get_joint_data(ticker))
            data.to_csv(os.path.join(target_dir, f"{ticker}.csv"), index=False)
        except Exception as e:
            # Deliberately best-effort: one bad ticker must not abort the
            # batch. logging.exception keeps the traceback in the log so the
            # failure can be diagnosed afterwards.
            logging.exception(f"Error processing {ticker}: {e}")
            issue_tickers.append(ticker)
    return issue_tickers

# CSV files generation

In [None]:
# Output directory for the per-ticker CSV files (read by collection_loop).
data_dir = "data"
os.makedirs(data_dir, exist_ok=True)

# Upper bound on full passes over the failing tickers. Without a bound, a
# ticker that fails every time would keep this cell retrying forever.
MAX_ATTEMPTS = 3

tickers = get_tickers()
issue_tickers = []
attempt = 0

while len(tickers) > 0 and attempt < MAX_ATTEMPTS:
    attempt += 1
    issue_tickers = collection_loop(tickers)
    if len(issue_tickers) == 0:
        break
    # Only the tickers that failed are retried on the next pass.
    tickers = issue_tickers
    if attempt < MAX_ATTEMPTS:
        logging.info(f"Retrying {len(tickers)} tickers")

if len(issue_tickers) > 0:
    # Leave a record of what could not be built instead of looping forever.
    logging.error(
        "Giving up on %d tickers after %d attempts: %s",
        len(issue_tickers), attempt, list(issue_tickers),
    )



  0%|          | 0/802 [00:00<?, ?it/s]