In [46]:
from datetime import datetime
import sys
from os.path import abspath, exists
from os import makedirs
import pandas as pd
from pandas_datareader import data as pdr
import yfinance as yf
from statsmodels.tsa.stattools import adfuller
from hurst import compute_Hc
yf.pdr_override()

import import_ipynb
from a_data_download import download_data

In [47]:
# Project root: the parent of the current working directory, as an absolute path.
ROOT_DIR = abspath('../')
# Make modules located in the project root importable from this notebook.
sys.path.append(ROOT_DIR)
# CSV listing the tickers; load_tickers() reads its 'Ticker' column.
TICKER_FILE_PATH = f"{ROOT_DIR}/data/sp500_tickers.csv"
# Parquet file with the stock data; the filter functions look tickers up in it.
PARQUET_FILE_PATH = f"{ROOT_DIR}/data/sp500_stock_data.parquet"

# Date window: used to slice each ticker's rows and to derive YEARS.
START_DATE = datetime(2013, 1, 1)
END_DATE = datetime(2023, 1, 1)

def calculate_years(start: "datetime | None" = None, end: "datetime | None" = None) -> int:
    """Return the number of whole 365-day years between two dates.

    Args:
        start: Beginning of the window. Defaults to the module-level
            ``START_DATE``.
        end: End of the window. Defaults to the module-level ``END_DATE``.

    Returns:
        The day count between ``start`` and ``end`` divided by 365 and
        truncated to an integer (leap days are not compensated for).
    """
    # Resolve the defaults at call time (not at definition time) so that
    # editing the config cell and re-running it is picked up without having
    # to re-define this function.
    if start is None:
        start = START_DATE
    if end is None:
        end = END_DATE
    return int((end - start).days / 365)

# Whole-year length of the START_DATE..END_DATE window; embedded in the
# cluster output file names.
YEARS = calculate_years()

In [48]:
def load_tickers():
    """Read the ticker list and the stock data from disk.

    Returns:
        A ``(tickers, stock_data)`` tuple: the values of the ``'Ticker'``
        column of the CSV at ``TICKER_FILE_PATH`` as a list, and the
        DataFrame read from the parquet file at ``PARQUET_FILE_PATH``.
    """
    ticker_table = pd.read_csv(TICKER_FILE_PATH)
    stock_data = pd.read_parquet(PARQUET_FILE_PATH)
    tickers = ticker_table['Ticker'].tolist()
    return tickers, stock_data



def is_not_null(close_data: pd.Series) -> bool:
    """Tell whether a close-price series has enough rows for the date window.

    Args:
        close_data: Series of closing prices (callers pass it with missing
            values already dropped).

    Returns:
        True when the series holds at least 251.5 rows per year of the
        window reported by ``calculate_years()``, False otherwise.
    """
    # 251.5 rows per year is the coverage threshold; presumably it
    # approximates the number of trading days in a year.
    minimum_rows = 251.5 * calculate_years()
    return len(close_data) >= minimum_rows
    
def perform_adf_test(close_data: pd.Series) -> float:
    """Run ``adfuller`` on a price series and return its p-value.

    Args:
        close_data: Series of closing prices to test.

    Returns:
        The second element of the ``adfuller`` result (the p-value) as a
        Python float.

    Raises:
        ValueError: If that element is not a ``float`` instance.
    """
    adf_output = adfuller(close_data)
    pvalue = adf_output[1]
    # Guard first: anything that is not a float is rejected.
    if not isinstance(pvalue, float):
        raise ValueError("Unexpected type for p-value in ADF test result")
    return float(pvalue)
    
def calculate_hurst_exponent(close_data: pd.Series) -> float:
    """Compute the Hurst exponent of a price series via ``compute_Hc``.

    Args:
        close_data: Series of closing prices.

    Returns:
        The first element (``H``) of the three-element result returned by
        ``compute_Hc(close_data, kind="price", simplified=True)``.
    """
    # Only H is needed; the other two returned values are discarded.
    hurst_exponent, _constant, _rs_data = compute_Hc(
        close_data, kind="price", simplified=True
    )
    return hurst_exponent
    
    
def _select_and_save_top_tickers(score_fn, cutoff: float, file_prefix: str, top_n: int = 10):
    """Score every ticker, keep the lowest-scoring ones, and save them.

    Shared implementation behind ``adf_filter`` and ``hurst_filter``.

    Args:
        score_fn: Callable taking a close-price Series and returning a number.
        cutoff: Tickers whose score is greater than this value are discarded.
        file_prefix: Leading part of the output file name
            (``<file_prefix>_cluster_<YEARS>y.parquet``).
        top_n: Maximum number of tickers written, taken from the lowest
            scores upward.
    """
    sp500_tickers, sp500_stock_data = load_tickers()

    scored_tickers = []
    for ticker in sp500_tickers:
        # Tickers listed in the CSV but absent from the stock data are skipped.
        if ticker not in sp500_stock_data:
            continue
        close_data = sp500_stock_data[ticker][START_DATE:END_DATE]["close"].dropna()
        # Skip series that do not cover enough of the date window.
        if not is_not_null(close_data):
            continue
        score = score_fn(close_data)
        if score <= cutoff:
            scored_tickers.append((ticker, score))

    # Lowest score first, then keep only the ticker symbols of the best top_n.
    scored_tickers.sort(key=lambda pair: pair[1])
    top_n_tickers = pd.DataFrame([ticker for ticker, _ in scored_tickers[:top_n]])

    # Create the output directory if needed so the write does not fail on a
    # fresh checkout where data/clusters does not exist yet.
    output_dir = f"{ROOT_DIR}/data/clusters"
    makedirs(output_dir, exist_ok=True)
    parquet_file_path = f"{output_dir}/{file_prefix}_cluster_{YEARS}y.parquet"
    top_n_tickers.to_parquet(parquet_file_path, engine="pyarrow")


def adf_filter(significance_level: float = 0.05, top_n: int = 10):
    """Save the tickers with the lowest ADF p-values.

    Each ticker's close series is scored with ``perform_adf_test``; tickers
    whose p-value is at or below ``significance_level`` are kept, sorted by
    ascending p-value, and the first ``top_n`` are written to
    ``{ROOT_DIR}/data/clusters/adf_cluster_{YEARS}y.parquet``.

    Args:
        significance_level: Largest p-value a ticker may have to be kept.
        top_n: Maximum number of tickers written.
    """
    _select_and_save_top_tickers(perform_adf_test, significance_level, "adf", top_n)


def hurst_filter(threshold: float = 0.5, top_n: int = 10):
    """Save the tickers with the lowest Hurst exponents.

    Each ticker's close series is scored with ``calculate_hurst_exponent``;
    tickers whose exponent is at or below ``threshold`` are kept, sorted by
    ascending exponent, and the first ``top_n`` are written to
    ``{ROOT_DIR}/data/clusters/hurst_cluster_{YEARS}y.parquet``.

    Args:
        threshold: Largest Hurst exponent a ticker may have to be kept.
        top_n: Maximum number of tickers written.
    """
    _select_and_save_top_tickers(calculate_hurst_exponent, threshold, "hurst", top_n)
    
    
    

In [49]:
def cluster_data(type: str):
    """Run the ticker filter selected by ``type``.

    Args:
        type: ``"adf"`` runs ``adf_filter()``; ``"hurst"`` runs
            ``hurst_filter()``. Any other value does nothing.
    """
    if type == "adf":
        adf_filter()
        return
    if type == "hurst":
        hurst_filter()

In [50]:
# Entry point: runs only when __name__ is "__main__" and builds the Hurst-based cluster.
if __name__ == "__main__":
    cluster_data("hurst")