In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import pickle
import yfinance as yf
import os 

In [2]:
def get_Data(ticker, start_date = "2005-01-01", end_date = "2025-01-19"):
    """
    Download daily price history for one ticker through yfinance.

    Parameters:
        ticker (str): Symbol passed to ``yf.Ticker``.
        start_date (str): Start of the requested range (default: "2005-01-01").
        end_date (str): End of the requested range (default: "2025-01-19").

    Returns:
        pd.DataFrame | None: The frame returned by ``Ticker.history`` or None when
        the download raised. Callers must check for None (and for an empty frame)
        before using the result.
    """
    try:
        historical_data = yf.Ticker(ticker).history(interval="1d", start=start_date, end=end_date)
    except Exception as e:
        # Best-effort download: report the problem and hand back an explicit None
        # instead of falling off the end of the function.
        print(f"Error fetching data for {ticker}: {e}")
        return None

    # A download that succeeds but yields no rows would otherwise go unnoticed
    # until a later cell fails on missing columns.
    if historical_data is not None and historical_data.empty:
        print(f"No data returned for {ticker} between {start_date} and {end_date}")
    return historical_data

In [3]:
# base model data
# Download the raw daily history for every base-model ticker and keep it,
# with the date index turned into a regular column, in `data_base`.
tickers = ["GDXJ", "GDX", "GLDM"]
data_base = {}
for ticker in tickers:
    print(f"Fetching data for {ticker}...")
    data = get_Data(ticker, start_date="2005-01-01", end_date="2025-01-19")
    # get_Data returns None when the download raised; stop here with a clear
    # message instead of an AttributeError on `.reset_index()`.
    if data is None or data.empty:
        raise RuntimeError(f"No price history downloaded for {ticker}; cannot build base-model data")
    data = data.reset_index()
    data_base[ticker] = data

Fetching data for GDXJ...
Fetching data for GDX...
Fetching data for GLDM...


In [4]:
# target data (ensemble)
# Download the raw daily history for the ensemble target ticker(s) and keep it,
# with the date index turned into a regular column, in `data_ensemble`.
target_stock = ["GLD"]
data_ensemble = {}
for ticker in target_stock:
    target_stock_data = get_Data(ticker, start_date="2005-01-01", end_date="2025-01-19")
    # get_Data returns None when the download raised; stop here with a clear
    # message instead of an AttributeError on `.reset_index()`.
    if target_stock_data is None or target_stock_data.empty:
        raise RuntimeError(f"No price history downloaded for {ticker}; cannot build ensemble data")
    target_stock_data = target_stock_data.reset_index()
    data_ensemble[ticker] = target_stock_data

In [5]:
def calculate_rsi(df: pd.DataFrame, period: int = 14) -> pd.Series:
    """
    Relative Strength Index of the 'Close' column, using simple rolling means
    of the gains and losses.

    Parameters:
        df (pd.DataFrame): DataFrame containing at least a 'Close' column.
        period (int): Look-back period for the rolling averages (default: 14).

    Returns:
        pd.Series: RSI values; NaN until `period` rows are available.
    """
    price_change = df['Close'].diff()

    # Anything that is not a strict gain (including the leading NaN) counts as 0
    # on the gain side; the loss side is handled symmetrically.
    upward = price_change.where(price_change > 0, 0)
    downward = price_change.where(price_change < 0, 0)

    avg_gain = upward.rolling(window=period).mean()
    avg_loss = -downward.rolling(window=period).mean()

    relative_strength = avg_gain / avg_loss
    return 100 - (100 / (1 + relative_strength))


def calculate_bollinger_band_width(df: pd.DataFrame, window: int = 20, num_std: float = 2) -> pd.Series:
    """
    Width of the Bollinger Bands of the 'Close' column, relative to the middle band.

    Parameters:
        df (pd.DataFrame): DataFrame containing at least a 'Close' column.
        window (int): Rolling window period (default: 20).
        num_std (float): Number of standard deviations for the bands (default: 2).

    Returns:
        pd.Series: (upper band - lower band) / rolling mean.
    """
    rolling_close = df['Close'].rolling(window=window)
    middle = rolling_close.mean()
    spread = num_std * rolling_close.std()

    upper = middle + spread
    lower = middle - spread
    return (upper - lower) / middle


def calculate_adx(df: pd.DataFrame, window: int = 14) -> pd.Series:
    """
    Average Directional Index computed from 'High', 'Low' and 'Close', with every
    smoothing step done as an exponential moving average with alpha = 1/window.

    Parameters:
        df (pd.DataFrame): DataFrame containing 'High', 'Low', and 'Close' columns.
        window (int): Look-back period (default: 14).

    Returns:
        pd.Series: ADX values.
    """
    def _wilder_smooth(series: pd.Series) -> pd.Series:
        # Wilder-style smoothing expressed as a recursive EMA.
        return series.ewm(alpha=1/window, min_periods=window, adjust=False).mean()

    highs, lows = df['High'], df['Low']
    last_close = df['Close'].shift(1)

    # True range: the largest of the three candidate ranges on each row.
    range_candidates = pd.concat(
        [highs - lows, (highs - last_close).abs(), (lows - last_close).abs()],
        axis=1,
    )
    true_range = range_candidates.max(axis=1)

    # Directional movement: only the dominant, strictly positive move counts.
    rise = highs - highs.shift(1)
    fall = lows.shift(1) - lows
    plus_dm = rise.where((rise > fall) & (rise > 0), 0)
    minus_dm = fall.where((fall > rise) & (fall > 0), 0)

    smoothed_tr = _wilder_smooth(true_range)
    plus_di = 100 * (_wilder_smooth(plus_dm) / smoothed_tr)
    minus_di = 100 * (_wilder_smooth(minus_dm) / smoothed_tr)

    # DX measures how one-sided the directional indicators are; ADX smooths it.
    dx = 100 * ((plus_di - minus_di).abs() / (plus_di + minus_di))
    return _wilder_smooth(dx)


def calculate_volume_roc(df: pd.DataFrame, period: int = 20) -> pd.Series:
    """
    Calculate the Volume Rate of Change (ROC) using the 'Volume' column.

    Parameters:
        df (pd.DataFrame): DataFrame containing a 'Volume' column.
        period (int): Look-back period (default: 20).

    Returns:
        pd.Series: Volume ROC values (in percent). The value is NaN for the first
        `period` rows and wherever the volume `period` rows earlier was 0,
        because the rate of change is undefined there.
    """
    volume = df['Volume']
    base_volume = volume.shift(period)
    # A zero base would produce +/-inf, which is not removed by dropna() and is
    # rejected by scalers downstream; mark those rows as missing instead.
    base_volume = base_volume.where(base_volume != 0)
    roc = ((volume - base_volume) / base_volume) * 100
    return roc


def calculate_price_zscore(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """
    Rolling Z-score of the 'Close' column: distance of each close from its
    rolling mean, measured in rolling standard deviations.

    Parameters:
        df (pd.DataFrame): DataFrame containing a 'Close' column.
        window (int): Rolling window period (default: 20).

    Returns:
        pd.Series: Z-score values.
    """
    prices = df['Close']
    rolling_prices = prices.rolling(window=window)
    centre = rolling_prices.mean()
    scale = rolling_prices.std()
    return (prices - centre) / scale


def calculate_skewness(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """
    Rolling skewness of the 'Close' column.

    Parameters:
        df (pd.DataFrame): DataFrame containing a 'Close' column.
        window (int): Rolling window period (default: 20).

    Returns:
        pd.Series: Skewness values.
    """
    rolling_close = df['Close'].rolling(window=window)
    skewness = rolling_close.skew()
    return skewness


def calculate_macd(df: pd.DataFrame, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9) -> pd.DataFrame:
    """
    Moving Average Convergence Divergence (MACD) of the 'Close' column.

    Parameters:
        df (pd.DataFrame): DataFrame containing a 'Close' column.
        fast_period (int): Span of the fast EMA (default: 12).
        slow_period (int): Span of the slow EMA (default: 26).
        signal_period (int): Span of the signal-line EMA (default: 9).

    Returns:
        pd.DataFrame: Columns 'MACD' (fast EMA minus slow EMA), 'MACD_Signal'
        (EMA of the MACD line) and 'MACD_Hist' (MACD minus signal).
    """
    def _ema(series: pd.Series, span: int) -> pd.Series:
        # Recursive (adjust=False) exponential moving average.
        return series.ewm(span=span, adjust=False).mean()

    prices = df['Close']
    macd = _ema(prices, fast_period) - _ema(prices, slow_period)
    signal = _ema(macd, signal_period)

    result = pd.DataFrame({
        'MACD': macd,
        'MACD_Signal': signal,
        'MACD_Hist': macd - signal
    })
    return result


def calculate_stochastic(df: pd.DataFrame, k_period: int = 14, d_period: int = 3) -> pd.DataFrame:
    """
    Stochastic Oscillator computed from the 'High', 'Low', and 'Close' columns.

    Parameters:
        df (pd.DataFrame): DataFrame containing 'High', 'Low', and 'Close' columns.
        k_period (int): Look-back period for %K (default: 14).
        d_period (int): Rolling-mean period applied to %K to obtain %D (default: 3).

    Returns:
        pd.DataFrame: Columns 'Stochastic_%K' and 'Stochastic_%D'.
    """
    floor = df['Low'].rolling(window=k_period).min()
    ceiling = df['High'].rolling(window=k_period).max()

    # %K: position of the close inside the look-back high/low range, in percent.
    fast_line = 100 * (df['Close'] - floor) / (ceiling - floor)
    slow_line = fast_line.rolling(window=d_period).mean()

    result = pd.DataFrame({
        'Stochastic_%K': fast_line,
        'Stochastic_%D': slow_line
    })
    return result


def calculate_std(df: pd.DataFrame, window: int = 20) -> pd.Series:
    """
    Rolling standard deviation of the 'Close' column.

    Parameters:
        df (pd.DataFrame): DataFrame containing a 'Close' column.
        window (int): Rolling window period (default: 20).

    Returns:
        pd.Series: Rolling standard deviation.
    """
    rolling_close = df['Close'].rolling(window=window)
    volatility = rolling_close.std()
    return volatility


def calculate_fibonacci_retracement(df: pd.DataFrame,
                                    fib_levels: list = [0.236, 0.382, 0.5, 0.618, 0.786]) -> pd.DataFrame:
    """
    Fibonacci retracement levels between the highest 'High' and the lowest 'Low'
    found in the whole DataFrame.

    Parameters:
        df (pd.DataFrame): DataFrame containing 'High' and 'Low' columns.
        fib_levels (list): Retracement ratios (default: [0.236, 0.382, 0.5, 0.618, 0.786]).

    Returns:
        pd.DataFrame: One column per ratio, named 'Fib_<int(ratio*100)>', holding
        the same value on every row and sharing the index of `df`.
    """
    top = df['High'].max()
    bottom = df['Low'].min()
    price_range = top - bottom
    n_rows = len(df)

    columns = {}
    for ratio in fib_levels:
        # Each level is measured downwards from the overall high.
        columns[f'Fib_{int(ratio*100)}'] = [top - price_range * ratio] * n_rows

    return pd.DataFrame(columns, index=df.index)


In [6]:
def calculate_all_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build the feature frame: 'Date', the raw price/volume columns that are present,
    and every technical indicator computed from `df`.

    Parameters:
        df (pd.DataFrame): DataFrame containing 'Date' and the columns the
                           indicator functions need.

    Returns:
        pd.DataFrame: A new DataFrame on the index of `df` holding the copied
        columns followed by the calculated indicators.
    """
    features = pd.DataFrame(index=df.index)
    features['Date'] = df['Date']

    # Raw columns are copied through only when the source frame has them.
    passthrough = ['Open', 'High', 'Low', 'Close', 'Volume',
                   'Dividends', 'Stock Splits', 'Capital Gains']
    for name in passthrough:
        if name in df.columns:
            features[name] = df[name]

    # Period-dependent indicators, one set of columns per look-back length.
    for period in (14, 20, 50, 200):
        features[f'RSI_{period}'] = calculate_rsi(df, period=period)
        features[f'BB_Width_{period}'] = calculate_bollinger_band_width(df, window=period, num_std=2)
        features[f'ADX_{period}'] = calculate_adx(df, window=period)
        features[f'Volume_ROC_{period}'] = calculate_volume_roc(df, period=period)
        features[f'Price_Z_Score_{period}'] = calculate_price_zscore(df, window=period)
        features[f'Skewness_{period}'] = calculate_skewness(df, window=period)
        features[f'Std_{period}'] = calculate_std(df, window=period)

    # Indicators returned as whole DataFrames are joined on the index, in this
    # order: MACD, Stochastic, Fibonacci retracement levels (constant per column).
    multi_column_indicators = (
        calculate_macd(df),
        calculate_stochastic(df),
        calculate_fibonacci_retracement(df),
    )
    for indicator_frame in multi_column_indicators:
        features = features.join(indicator_frame)

    return features

In [7]:
def calendar_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add calendar-derived columns computed from the 'Date' column.

    The input frame is left untouched; a copy carrying the extra columns is
    returned. 'Date' is parsed with ``pd.to_datetime(..., utc=True)`` in the copy.

    Parameters:
        df (pd.DataFrame): DataFrame containing a 'Date' column.

    Returns:
        pd.DataFrame: Copy of `df` with 'is_trading_day', 'day_of_week',
        'day_of_month', 'week_of_year', 'month', 'quarter', 'is_month_end',
        'is_quarter_end', 'is_year_end' and 'days_to_month_end' appended.
    """
    # Work on a copy so the caller's frame is not modified as a side effect.
    out = df.copy()
    dates = pd.to_datetime(out['Date'], utc=True)
    out['Date'] = dates

    day_of_week = dates.dt.dayofweek
    # Monday-Friday (0-4) are flagged 1; vectorised instead of a per-row lambda.
    out['is_trading_day'] = (day_of_week < 5).astype(int)
    out['day_of_week'] = day_of_week
    out['day_of_month'] = dates.dt.day

    # isocalendar() yields a nullable UInt32 column. Store a plain NumPy dtype so
    # that converting the frame to an array does not fall back to object dtype;
    # use float only when missing dates make an integer cast impossible.
    iso_week = dates.dt.isocalendar().week
    out['week_of_year'] = iso_week.astype('float64') if iso_week.isna().any() else iso_week.astype('int64')

    out['month'] = dates.dt.month
    out['quarter'] = dates.dt.quarter

    # Period End Indicators
    out['is_month_end'] = dates.dt.is_month_end.astype(int)
    out['is_quarter_end'] = dates.dt.is_quarter_end.astype(int)
    out['is_year_end'] = dates.dt.is_year_end.astype(int)
    out['days_to_month_end'] = dates.dt.days_in_month - dates.dt.day

    return out

In [8]:
# Remove features that carry no information, are unused, or overlap with the raw columns (overlapping columns cause an error when joined)
def feature_selection(df, remove_features = ["Date","Dividends", "Stock Splits", "Capital Gains", 'Open', 'High', 'Low', 'Close', 'Volume']):
    """
    Return a copy of `df` without the columns listed in `remove_features`.

    Parameters:
        df (pd.DataFrame): Feature frame to trim.
        remove_features (list): Column names to drop. Names that are not present
                                in `df` are skipped.

    Returns:
        pd.DataFrame: `df` without the listed columns; `df` itself is not modified.
    """
    # errors="ignore": optional columns such as 'Capital Gains' are only copied
    # into the feature frame when the source has them, so their absence here
    # must not raise.
    return df.drop(columns=remove_features, errors="ignore")

In [9]:
def indicator_building(data: pd.DataFrame) -> pd.DataFrame:
    """
    Append technical-indicator and calendar features to the original dataset.

    Parameters:
        data (pd.DataFrame): Original data. It must contain 'Date' plus the columns
                             the indicator functions read (e.g. 'Open', 'High',
                             'Low', 'Close', 'Volume').

    Returns:
        pd.DataFrame: The original columns (without 'Date') followed by the feature
        columns, restricted to rows that contain no NaN.
    """
    # Indicators -> calendar columns -> drop the columns that duplicate `data`.
    features_df = feature_selection(calendar_features(calculate_all_indicators(data)))

    # Rolling indicators are NaN for their warm-up rows; those rows are dropped
    # rather than filled. 'Date' is removed last.
    return data.join(features_df).dropna().drop(columns=["Date"])

In [10]:
#####################################################################
#               individual-prediction-model design data             #
#####################################################################
def create_sliding_window_data(data, window_size, target_days, target_columns=("Low", "High")):
    """
    Build (X, y) sliding-window samples for the individual prediction models.

    Sample i uses rows [i, i + window_size - 1] of every non-target column as
    input and rows [i + window_size, i + window_size + target_days - 1] of the
    target columns as output.

    The targets are selected by name. The previous positional slice
    ``columns[2:4]`` picked 'Low' and 'Close' once 'Date' had been dropped from
    the frame, which did not match the intended "Low and High" target.
    Pass ``target_columns=("Low", "Close")`` to reproduce the old arrays.

    Parameters:
        data (pd.DataFrame): Feature frame, one row per time step.
        window_size (int): Number of consecutive rows in each input window.
        target_days (int): Number of consecutive rows in each target block.
        target_columns (sequence of str): Columns used as targets and excluded
                                          from the inputs (default: ("Low", "High")).

    Returns:
        tuple[np.ndarray, np.ndarray]: X with shape
        (n_samples, window_size, n_features) and y with shape
        (n_samples, target_days, len(target_columns)). Both are empty 1-D arrays
        when `data` is too short to form a single sample.

    Raises:
        KeyError: If a target column is missing from `data`.
    """
    target_columns = list(target_columns)
    missing = [name for name in target_columns if name not in data.columns]
    if missing:
        raise KeyError(f"Target column(s) not found in data: {missing}")

    # Convert once and slice the arrays; slicing and dropping columns on the
    # DataFrame for every window is needlessly slow.
    feature_values = data.drop(columns=target_columns).to_numpy()
    target_values = data[target_columns].to_numpy()

    n_samples = len(data) - window_size - target_days + 1
    X = [feature_values[i:i + window_size] for i in range(n_samples)]
    y = [target_values[i + window_size:i + window_size + target_days] for i in range(n_samples)]

    return np.array(X), np.array(y)

In [11]:
########################################################
#               All-in-1-model design data             #
########################################################
def create_sliding_window_data_a(i_from_window_size_loop, data, window_size, max_window_size, target_days,
                                 target_columns=("Low", "High")):
    """
    Build (X, y) sliding-window samples for the all-in-one model.

    Every window size shares the same sample positions: sample k ends just
    before row i = max_window_size + k, so its input is rows
    [i - window_size, i - 1] and its target is rows [i, i + target_days - 1].
    The targets are therefore identical for every window size and are only
    built when `i_from_window_size_loop` is 0.

    The targets are selected by name. The previous positional slice
    ``columns[2:4]`` picked 'Low' and 'Close' once 'Date' had been dropped from
    the frame. Pass ``target_columns=("Low", "Close")`` to reproduce the old arrays.

    Parameters:
        i_from_window_size_loop (int): Index of the current window size in the
                                       caller's loop; y is built only for 0.
        data (pd.DataFrame): Feature frame, one row per time step.
        window_size (int): Number of consecutive rows in each input window.
        max_window_size (int): Largest window size in use; fixes the first sample.
        target_days (int): Number of consecutive rows in each target block.
        target_columns (sequence of str): Columns used as targets and excluded
                                          from the inputs (default: ("Low", "High")).

    Returns:
        tuple[np.ndarray, np.ndarray]: X with shape
        (n_samples, window_size, n_features); y with shape
        (n_samples, target_days, len(target_columns)) when
        `i_from_window_size_loop` is 0, otherwise an empty array.

    Raises:
        KeyError: If a target column is missing from `data`.
        ValueError: If `window_size` exceeds `max_window_size`.
    """
    if window_size > max_window_size:
        # A larger window would start at a negative row offset and silently
        # wrap around to the end of the data.
        raise ValueError(
            f"window_size ({window_size}) must not exceed max_window_size ({max_window_size})"
        )

    target_columns = list(target_columns)
    missing = [name for name in target_columns if name not in data.columns]
    if missing:
        raise KeyError(f"Target column(s) not found in data: {missing}")

    # Convert once and slice the arrays instead of re-slicing the DataFrame per window.
    feature_values = data.drop(columns=target_columns).to_numpy()
    sample_ends = range(max_window_size, len(data) - target_days + 1)

    X = [feature_values[i - window_size:i] for i in sample_ends]

    y = []
    if i_from_window_size_loop == 0:
        target_values = data[target_columns].to_numpy()
        y = [target_values[i:i + target_days] for i in sample_ends]

    return np.array(X), np.array(y)

In [12]:
# base model data indicators
window_sizes = [14, 30, 60, 90, 180]
max_window_size = max(window_sizes)
target_days = 7
base_X = {}
base_y = {}
# Featured frames go into their own dict so the raw downloads in `data_base`
# stay intact and this cell can be re-run without restarting the kernel.
data_base_featured = {}
for ticker, raw_frame in data_base.items():
    featured = indicator_building(raw_frame)
    data_base_featured[ticker] = featured
    print(f"Data shape for {ticker}: {featured.shape}")
    for window_size in window_sizes:
        X, y = create_sliding_window_data(featured,
                                          window_size=window_size,
                                          target_days=target_days)
        print(f"Data shape for {ticker} with window size {window_size}:")
        print(f"X: {X.shape}, y: {y.shape}")
        base_X[(ticker, window_size)] = X
        base_y[(ticker, window_size)] = y

# ensemble model data indicators
ensemble_X = {}
ensemble_y = {}
# `data_ensemble` is no longer rebound to a DataFrame while it is being
# iterated, which broke on re-run and with more than one target ticker.
data_ensemble_featured = {}
for ticker, raw_frame in data_ensemble.items():
    featured = indicator_building(raw_frame)
    data_ensemble_featured[ticker] = featured
    for i, window_size in enumerate(window_sizes):
        # The loop index is passed through because the targets are only built
        # for the first window size (index 0).
        X, y = create_sliding_window_data_a(i,
                                            featured,
                                            window_size=window_size,
                                            max_window_size=max_window_size,
                                            target_days=target_days)
        print(f"Data shape for {ticker} with window size {window_size}:")
        print(f"X: {X.shape}, y: {y.shape}")
        ensemble_X[(ticker, window_size)] = X
        ensemble_y[(ticker, window_size)] = y

Data shape for GDXJ: (3422, 56)
Data shape for GDXJ with window size 14:
X: (3402, 14, 54), y: (3402, 7, 2)
Data shape for GDXJ with window size 30:
X: (3386, 30, 54), y: (3386, 7, 2)
Data shape for GDXJ with window size 60:
X: (3356, 60, 54), y: (3356, 7, 2)
Data shape for GDXJ with window size 90:
X: (3326, 90, 54), y: (3326, 7, 2)
Data shape for GDXJ with window size 180:
X: (3236, 180, 54), y: (3236, 7, 2)
Data shape for GDX: (4298, 56)
Data shape for GDX with window size 14:
X: (4278, 14, 54), y: (4278, 7, 2)
Data shape for GDX with window size 30:
X: (4262, 30, 54), y: (4262, 7, 2)
Data shape for GDX with window size 60:
X: (4232, 60, 54), y: (4232, 7, 2)
Data shape for GDX with window size 90:
X: (4202, 90, 54), y: (4202, 7, 2)
Data shape for GDX with window size 180:
X: (4112, 180, 54), y: (4112, 7, 2)
Data shape for GLDM: (1253, 56)
Data shape for GLDM with window size 14:
X: (1233, 14, 54), y: (1233, 7, 2)
Data shape for GLDM with window size 30:
X: (1217, 30, 54), y: (1217, 

In [13]:
#normalize the data
def normalize_data(data):
    """
    Min-max scale every array stored in `data`, one independent scaler per key.

    Each array is flattened to 2-D on its last axis, scaled column by column with
    a freshly fitted MinMaxScaler, and reshaped back to its original shape. The
    entries of `data` are replaced in place and the same dict is returned.

    Parameters:
        data (dict): Mapping from key to NumPy array.

    Returns:
        dict: The input dict, now holding the scaled arrays.
    """
    # Only values of existing keys are replaced, so iterating items() is safe.
    for key, values in data.items():
        original_shape = values.shape
        flattened = values.reshape(-1, original_shape[-1])
        scaled = MinMaxScaler().fit_transform(flattened)
        data[key] = scaled.reshape(original_shape)
    return data

In [14]:
# normalize_data replaces the arrays inside the dict it is given and returns that
# same dict, so base_X_normalized / ensemble_X_normalized are the same objects as
# base_X / ensemble_X (which no longer hold the unscaled arrays afterwards).
base_X_normalized = normalize_data(base_X)
ensemble_X_normalized = normalize_data(ensemble_X)


In [15]:
def marging(base_X_normalized, base_y, window_sizes):
    """
    Merge the per-ticker arrays into one X and one y array per window size.

    X and y are paired through their shared (ticker, window_size) key, so the
    rows of the merged arrays stay aligned even when the two dicts were filled
    in a different order.

    Parameters:
        base_X_normalized (dict): Maps (ticker, window_size) to an input array.
        base_y (dict): Maps (ticker, window_size) to the matching target array.
        window_sizes (iterable): Window sizes to merge.

    Returns:
        tuple[dict, dict]: (merged_X, merged_y), each mapping a window size to the
        arrays of all tickers concatenated along the first axis.

    Raises:
        ValueError: If `base_X_normalized` holds no entry for a window size.
        KeyError: If `base_y` lacks a key that is present in `base_X_normalized`.
    """
    merged_X = {}
    merged_y = {}

    for window_size in window_sizes:
        # Keys of every ticker that has data for this window size, in the
        # insertion order of base_X_normalized.
        matching_keys = [(ticker, size) for (ticker, size) in base_X_normalized if size == window_size]
        if not matching_keys:
            raise ValueError(f"No data available for window size {window_size}")

        # Look both arrays up by the same key so sample k of X always lines up
        # with sample k of y.
        merged_X[window_size] = np.concatenate([base_X_normalized[key] for key in matching_keys], axis=0)
        merged_y[window_size] = np.concatenate([base_y[key] for key in matching_keys], axis=0)
    return merged_X, merged_y

In [16]:
# Combine the per-ticker base-model arrays into one X and one y array per window size.
merged_X, merged_y = marging(base_X_normalized, base_y, window_sizes)


In [17]:
def save_data(data, filename, folder_path):
    """
    Pickle `data` to `<folder_path>/<filename>`, creating the folder if needed.

    Parameters:
        data: Any picklable object.
        filename (str): Name of the file to write inside `folder_path`.
        folder_path (str): Destination folder; missing parent folders are created.
    """
    # exist_ok=True replaces the separate exists() check, which could race with
    # another process creating the folder between the check and makedirs().
    os.makedirs(folder_path, exist_ok=True)
    with open(os.path.join(folder_path, filename), "wb") as file:
        pickle.dump(data, file)

In [18]:
# Persist the merged base-model inputs and targets, one pair of pickle files per
# window size, in the "data" folder (the "_i" suffix marks the individual-model data).
for window_size in window_sizes:
    save_data(merged_X[window_size], f"X_{window_size}days_i.pkl", "data")
    save_data(merged_y[window_size], f"y_{window_size}days_i.pkl", "data")

In [20]:
# Persist the ensemble (all-in-one) inputs, one pickle file per window size.
for window_size in window_sizes:
    save_data(ensemble_X_normalized[(target_stock[0], window_size)], f"X_{window_size}days_a.pkl", "data")

# The targets are only built for the first window size (the other entries are
# empty) and are shared by every window size, so they are written once instead
# of being rewritten on every loop iteration.
save_data(ensemble_y[(target_stock[0], window_sizes[0])], "y_a.pkl", "data")

In [21]:
def difference (data):
    """
    Placeholder for differencing the raw data; currently returns `data` unchanged.

    Parameters:
        data: Expected to be a DataFrame built from the raw data.

    Returns:
        The input object, unmodified (the differencing step is not implemented yet).
    """
    # TODO: difference the data with the shift function and return the
    # differenced data; until then this is an identity function.
    return data