# IMPORTS

In [None]:
import pandas as pd
import pandas_ta as ta
import numpy as np
from matplotlib import pyplot as plt

# GRAPHERS

In [None]:
def plot_signals(df, source_col, signal_col, figsize=(10, 6), hold_value=999):
    """
    Plot the non-signal columns of ``df`` as lines and overlay buy/sell/hold markers.

    Args:
        df (pd.DataFrame): DataFrame containing the data.
        source_col (str): Column whose values give the y-position of the markers (e.g., close prices).
        signal_col (list | str): Signal column name(s) in the DataFrame; a single name may be given as a string.
        figsize (tuple): Figure size (width, height).
        hold_value: Value in a signal column that is drawn as a "hold" marker. The default 999 is the
            sentinel the original implementation hard-coded; pass e.g. ``hold_value=0`` to draw
            a marker on every neutral row.
    """
    # A bare string would otherwise be treated as an iterable of characters
    signal_cols = [signal_col] if isinstance(signal_col, str) else list(signal_col)

    # Keep the DataFrame's own column order so line colours and legend order are
    # identical between runs (a set difference has no defined order)
    line_cols = [col for col in df.columns if col not in signal_cols]
    ax = df[line_cols].plot(figsize=figsize, label="Price")

    # (signal value, colour, marker, legend prefix) for each kind of marker
    marker_specs = (
        (1, 'green', '^', 'Buy Signal'),
        (-1, 'red', 'v', 'Sell Signal'),
        (hold_value, 'blue', 'o', 'Hold Signal'),
    )

    for col in signal_cols:
        for value, color, marker, prefix in marker_specs:
            # Boolean mask instead of index labels: stays aligned even when the index has duplicates
            mask = df[col] == value
            if mask.any():
                ax.scatter(df.index[mask], df.loc[mask, source_col], color=color, marker=marker, label=f'{prefix} - {col}', s=80)

    plt.xlabel("Date")
    plt.ylabel("Price")
    plt.title("Price with Buy/Sell/Hold Signals")
    plt.legend()
    plt.grid(True)
    plt.show()


# SIGNAL METHODS

In [None]:
def compute_target_variables(open_series, high_series, low_series, close_series, volume_series):
    """
    Build per-candle change features and +1/-1 direction labels from OHLCV series.

    Args:
        open_series (pd.Series): Opening prices.
        high_series (pd.Series): Highest prices.
        low_series (pd.Series): Lowest prices.
        close_series (pd.Series): Closing prices.
        volume_series (pd.Series): Traded volume.

    Returns:
        pd.DataFrame: Indexed like ``open_series``. ``*_intermediate`` columns hold raw changes;
        ``*_signal`` columns hold 1 / -1 labels (rows whose comparison involves NaN get -1).
    """
    price_series = {
        "open": open_series,
        "high": high_series,
        "low": low_series,
        "close": close_series,
    }
    targets = pd.DataFrame(index=open_series.index)

    # Row-over-row percentage change of each price
    for label, series in price_series.items():
        targets[f"{label}_change_intermediate"] = series.pct_change()

    # +1 when the candle closed above its open, otherwise -1
    closed_above_open = close_series > open_series
    targets["close_bullish_bearish_signal"] = np.where(closed_above_open, 1, -1)

    # Absolute close-to-close move
    previous_close = close_series.shift(1)
    targets["price_change_intermediate"] = close_series - previous_close

    # The same percentage changes, lagged by one row
    for label, series in price_series.items():
        targets[f"prev_{label}_change_intermediate"] = series.shift(1).pct_change()

    # +1 only when close, high and low all exceed the previous row's values
    higher_close = close_series > previous_close
    higher_high = high_series > high_series.shift(1)
    higher_low = low_series > low_series.shift(1)
    targets["price_increase_and_higher_highs_lows_signal"] = np.where(
        higher_close & higher_high & higher_low, 1, -1
    )

    # Volume: percentage change and +1/-1 for rising/not-rising volume
    targets["volume_change_intermediate"] = volume_series.pct_change()
    volume_rose = volume_series > volume_series.shift(1)
    targets["volume_bullish_bearish_signal"] = np.where(volume_rose, 1, -1)

    return targets


In [None]:
def calculate_k_majority(src: pd.Series, k: int):
    """
    Label each row 1 or -1 by whether up-moves make up at least half of the last k rows.

    Args:
        src (pd.Series): Source values (e.g., close prices).
        k (int): Rolling window length, counted in rows and including the current row.

    Returns:
        pd.Series: 1 where the window holds at least k / 2 up-moves, otherwise -1.
        Rows without a full window of k values are labelled -1.
    """
    # 1 where the value rose versus the previous row, else 0 (the first row has no predecessor -> 0)
    rose = src.diff().gt(0).astype(int)

    # Number of rises inside each k-row window; NaN until the window is full
    rises_in_window = rose.rolling(window=k).sum()

    # NaN compares False, so incomplete windows fall through to -1; True/False -> 1/-1
    return rises_in_window.ge(k / 2).astype(int) * 2 - 1


In [None]:
def compute_ma_kpi(source: str, kind: str, ma_short: pd.Series, ma_long: pd.Series, short_window: int, long_window: int) -> pd.DataFrame:
    """
    Flag golden/death crossovers between a short and a long moving average.

    Args:
        source (str): What the averages were computed on (e.g., "close", "volume"); used in column names.
        kind (str): Moving-average type label (e.g., "sma"); used in column names.
        ma_short (pd.Series): Short-term MA values.
        ma_long (pd.Series): Long-term MA values.
        short_window (int): Window length of the short-term MA; used in column names.
        long_window (int): Window length of the long-term MA; used in column names.

    Returns:
        pd.DataFrame: Three columns prefixed ``{source}_{kind}_{short_window}_{long_window}``:
        golden cross (1/0), death cross (-1/0) and their combination (1/-1/0).
    """
    prefix = f'{source}_{kind}_{short_window}_{long_window}'
    previous_short = ma_short.shift(1)
    previous_long = ma_long.shift(1)

    # Golden cross: short MA is above the long MA now and was strictly below it one row earlier
    crossed_up = (ma_short > ma_long) & (previous_short < previous_long)
    # Death cross: the mirror image
    crossed_down = (ma_short < ma_long) & (previous_short > previous_long)

    golden_cross = np.where(crossed_up, 1, 0)
    death_cross = np.where(crossed_down, -1, 0)

    # The two events cannot occur on the same row, so their sum is the combined 1 / -1 / 0 signal
    return pd.DataFrame(
        {
            f'{prefix}_golden_cross_signal_intermediate': golden_cross,
            f'{prefix}_death_cross_signal_intermediate': death_cross,
            f'{prefix}_combined_signal': golden_cross + death_cross,
        },
        index=ma_short.index,
    )


In [None]:
def generate_rsi_signals(rsi_series: pd.Series, length: str|int) -> pd.DataFrame:
    """
    Generate boolean signals for overbought/oversold conditions based on RSI indicator.

    Args:
        rsi_series (pd.Series): Series containing the RSI values.

    Returns:
        pd.DataFrame: DataFrame with boolean signals for buy (entry) and sell (exit) conditions.
    """
    # Initialize DataFrame to store signals
    signals_df = pd.DataFrame(index=rsi_series.index)

    # Generate signals for RSI: Overbought (> 70) and Oversold (< 30)
    signals_df['rsi_overbought'] = np.where(rsi_series > 70, -1, 0)
    signals_df['rsi_oversold'] = np.where(rsi_series < 30, 1, 0)

    # Combine signals into a single column
    signals_df[f"rsi_{length}_combined_signal"] = np.where(signals_df['rsi_overbought'] == -1, -1, np.where(signals_df['rsi_oversold'] == 1, 1, 0))

    return signals_df


def generate_macd_signals(macd_series: pd.Series, macd_signal_series: pd.Series, macd_histogram_series: pd.Series) -> pd.DataFrame:
    """
    Derive integer flag columns from the MACD line, signal line and histogram.

    Args:
        macd_series (pd.Series): MACD line values.
        macd_signal_series (pd.Series): MACD signal line values.
        macd_histogram_series (pd.Series): MACD histogram values.

    Returns:
        pd.DataFrame: Indexed like ``macd_series`` with twelve columns:
        MACD-vs-signal level flags and their combination, histogram zero-cross flags and
        their combination, MACD-line zero-cross flags and their combination, two
        "strength" flags and one "convergence" flag.
    """
    def flag(condition, value):
        # value where the condition holds, 0 elsewhere (NaN comparisons count as not holding)
        return np.where(condition, value, 0)

    def zero_cross_flags(series):
        # 1 where the series moved from below zero to above zero, -1 for the opposite move
        previous = series.shift(1)
        upward = flag((series > 0) & (previous < 0), 1)
        downward = flag((series < 0) & (previous > 0), -1)
        return upward, downward

    # Level comparison (not an event): MACD currently above / below its signal line
    macd_above = flag(macd_series > macd_signal_series, 1)
    macd_below = flag(macd_series < macd_signal_series, -1)

    # Zero crossings of the histogram and of the MACD line itself
    hist_up, hist_down = zero_cross_flags(macd_histogram_series)
    line_up, line_down = zero_cross_flags(macd_series)

    macd_step = macd_series.diff()
    signal_step = macd_signal_series.diff()

    # Each up/down pair is mutually exclusive, so the pair's sum is its combined 1 / -1 / 0 signal
    columns = {
        'macd_cross_above_signal': macd_above,
        'macd_cross_below_signal': macd_below,
        'macd_signal_cross_combined_signal': macd_above + macd_below,
        'bullish_divergence': hist_up,
        'bearish_divergence': hist_down,
        'macd_hist_cross_combined_signal': hist_up + hist_down,
        'bullish_price_macd_divergence': line_up,
        'bearish_price_macd_divergence': line_down,
        'macd_divergence_cross_combined_signal': line_up + line_down,
        # 1 when the one-row move of the line exceeds 100 in absolute terms
        'macd_line_strength': flag(macd_step.abs() > 100, 1),
        'macd_signal_strength': flag(signal_step.abs() > 100, 1),
        # 1 when, on the previous row, the two lines moved in opposite directions
        'macd_convergence': flag((macd_step * signal_step).shift(1) < 0, 1),
    }
    return pd.DataFrame(columns, index=macd_series.index)

def generate_kdj_signals(k_series: pd.Series, d_series: pd.Series, j_series: pd.Series) -> pd.DataFrame:
    """
    Flag rows where the KDJ K line sits above (buy) or below (sell) both D and J.

    Args:
        k_series (pd.Series): K line values of the KDJ indicator.
        d_series (pd.Series): D line values of the KDJ indicator.
        j_series (pd.Series): J line values of the KDJ indicator.

    Returns:
        pd.DataFrame: Columns ``kdj_buy_signal`` (1/0), ``kdj_sell_signal`` (-1/0) and
        ``kdj_combined_signal`` (1/-1/0), indexed like ``k_series``.
    """
    # Level comparisons on the current row (no crossover detection)
    k_on_top = (k_series > d_series) & (k_series > j_series)
    k_at_bottom = (k_series < d_series) & (k_series < j_series)

    buy = np.where(k_on_top, 1, 0)
    sell = np.where(k_at_bottom, -1, 0)

    # K cannot be above and below D at once, so the sum is the combined signal
    return pd.DataFrame(
        {
            'kdj_buy_signal': buy,
            'kdj_sell_signal': sell,
            'kdj_combined_signal': buy + sell,
        },
        index=k_series.index,
    )


def generate_vwap_signals(vwap_series: pd.Series, close_series: pd.Series) -> pd.DataFrame:
    """
    Compare the close with VWAP (Volume Weighted Average Price) row by row.

    Args:
        vwap_series (pd.Series): VWAP values.
        close_series (pd.Series): Closing prices.

    Returns:
        pd.DataFrame: Columns ``vwap_buy_signal`` (1 when close > VWAP, else 0),
        ``vwap_sell_signal`` (-1 when close < VWAP, else 0) and ``vwap_combined_signal``
        (1/-1/0), indexed like ``close_series``.
    """
    closed_above = (close_series > vwap_series).to_numpy()
    closed_below = (close_series < vwap_series).to_numpy()

    buy = np.where(closed_above, 1, 0)
    sell = np.where(closed_below, -1, 0)

    # Above and below are mutually exclusive; equal or NaN rows stay 0
    return pd.DataFrame(
        {
            'vwap_buy_signal': buy,
            'vwap_sell_signal': sell,
            'vwap_combined_signal': buy + sell,
        },
        index=close_series.index,
    )


def generate_lr_signals(lr_series: pd.Series, close_series: pd.Series) -> pd.DataFrame:
    """
    Compare the close with the Linear Regression (LR) value row by row.

    Args:
        lr_series (pd.Series): LR values.
        close_series (pd.Series): Closing prices.

    Returns:
        pd.DataFrame: Columns ``lr_buy_signal`` (1 when close > LR, else 0),
        ``lr_sell_signal`` (-1 when close < LR, else 0) and ``lr_combined_signal``
        (1/-1/0), indexed like ``close_series``.
    """
    price_over_lr = (close_series > lr_series).to_numpy()
    price_under_lr = (close_series < lr_series).to_numpy()

    result = pd.DataFrame(index=close_series.index)
    result['lr_buy_signal'] = np.where(price_over_lr, 1, 0)
    result['lr_sell_signal'] = np.where(price_under_lr, -1, 0)
    # Sell is checked first, then buy; rows matching neither (equal or NaN) get 0
    result['lr_combined_signal'] = np.select([price_under_lr, price_over_lr], [-1, 1], default=0)
    return result


def generate_candlestick_signal(ohlc4_series: pd.Series) -> pd.DataFrame:
    """
    Label each row by whether its OHLC4 value rose versus the previous row.

    Args:
        ohlc4_series (pd.Series): OHLC4 values.

    Returns:
        pd.DataFrame: Single column ``candlestick_signal``: 1 where OHLC4 is higher than on
        the previous row, -1 otherwise (including the first row, which has no predecessor).
    """
    # Row-over-row change of OHLC4; NaN on the first row compares False and so maps to -1
    rose = ohlc4_series.diff() > 0
    return pd.DataFrame(
        {'candlestick_signal': np.where(rose, 1, -1)},
        index=ohlc4_series.index,
    )


def generate_adx_signals(adx_series: pd.Series, dmp_series: pd.Series, dmn_series: pd.Series, entry_threshold: float = 25, exit_threshold: float = 20) -> pd.DataFrame:
    """
    Generate signals for entry and exit based on the ADX (Average Directional Index) indicator.

    Args:
        adx_series (pd.Series): Series containing the ADX values.
        dmp_series (pd.Series): Series containing the DMP (Directional Movement Plus) values.
        dmn_series (pd.Series): Series containing the DMN (Directional Movement Minus) values.
        entry_threshold (float): A rising ADX above this level flags an entry. Defaults to 25.
        exit_threshold (float): A falling ADX below this level flags an exit. Defaults to 20.

    Returns:
        pd.DataFrame: Columns ``entry_signal`` (1/0), ``exit_signal`` (-1/0),
        ``combined_signal`` (their sum) and ``di_crossover_signal`` (1/0),
        indexed like ``adx_series``.
    """
    # Initialize DataFrame to store signals
    signals_df = pd.DataFrame(index=adx_series.index)
    previous_adx = adx_series.shift(1)

    # Entry: ADX is rising and above the entry threshold
    entry_condition = (adx_series > previous_adx) & (adx_series > entry_threshold)
    signals_df['entry_signal'] = np.where(entry_condition, 1, 0)

    # Exit: ADX is falling and below the exit threshold
    exit_condition = (adx_series < previous_adx) & (adx_series < exit_threshold)
    signals_df['exit_signal'] = np.where(exit_condition, -1, 0)

    # ADX cannot rise and fall on the same row, so the sum is always 1, -1 or 0
    signals_df['combined_signal'] = signals_df['entry_signal'] + signals_df['exit_signal']

    # DI crossover: DMP is above DMN now and was strictly below it one row earlier
    di_crossover_condition = (dmp_series > dmn_series) & (dmp_series.shift(1) < dmn_series.shift(1))
    signals_df['di_crossover_signal'] = np.where(di_crossover_condition, 1, 0)

    return signals_df

In [None]:
# # if closeprice went bullish or bearish
# data = pd.concat([df], axis=1)
# data_ = data[["close", "CDL_INSIDE"]]["2022-01":"2023-01"]
# # data_ = data[["close", "CDL_INSIDE"]]
# plot_signals(data_, source_col="close", signal_col=["CDL_INSIDE"])

# adx_signals = generate_adx_signals(df.ADX_14, df.DMP_14, df.DMN_14)
# # # if closeprice went bullish or bearish
# # data = pd.concat([df, adx_signals], axis=1)
# # data_ = data[["close", "combined_signal"]]["2021-08":"2021-10"]
# # # data_ = data[["close", "combined_signal"]]
# # plot_signals(data_, source_col="close", signal_col=["combined_signal"])

# ohlc4_signal = generate_candlestick_signal(ohlc4_series=df.OHLC4)
# ohlc4_signal
# # # if closeprice went bullish or bearish
# # data = pd.concat([df, ohlc4_signal], axis=1)
# # data_ = data[["close", "candlestick_signal"]]["2021-08":"2021-10"]
# # # data_ = data[["close", "candlestick_signal"]]
# # plot_signals(data_, source_col="close", signal_col=["candlestick_signal"])


# lr_signals = generate_lr_signals(lr_series=df.LR_14, close_series=df.close)
# lr_signals

# vwap_signals = generate_vwap_signals(vwap_series=df.VWAP_D, close_series=df.close)
# # if closeprice went bullish or bearish
# data = pd.concat([df, vwap_signals], axis=1)
# data_ = data[["close", "vwap_combined_signal"]]["2021-08":"2021-10"]
# # data_ = data[["close", "rsi_14_combined_signal"]]
# plot_signals(data_, source_col="close", signal_col=["vwap_combined_signal"])


# macd_signals = generate_macd_signals(macd_series=df.MACD_12_26_9, macd_histogram_series=df.MACDh_12_26_9, macd_signal_series=df.MACDs_12_26_9)
# macd_signals
# # if closeprice went bullish or bearish
# data = pd.concat([df, macd_signals], axis=1)
# data_ = data[["close", "macd_signal_cross_combined_signal"]]["2021-08":"2021-10"]
# # data_ = data[["close", "rsi_14_combined_signal"]]
# plot_signals(data_, source_col="close", signal_col=["macd_signal_cross_combined_signal"])

# # if closeprice went bullish or bearish
# data = pd.concat([df, macd_signals], axis=1)
# data_ = data[["close", "macd_hist_cross_combined_signal"]]["2021-08":"2021-10"]
# # data_ = data[["close", "rsi_14_combined_signal"]]
# plot_signals(data_, source_col="close", signal_col=["macd_hist_cross_combined_signal"])

# # if closeprice went bullish or bearish
# data = pd.concat([df, macd_signals], axis=1)
# data_ = data[["close", "macd_divergence_cross_combined_signal"]]["2021-08":"2021-10"]
# # data_ = data[["close", "rsi_14_combined_signal"]]
# plot_signals(data_, source_col="close", signal_col=["macd_divergence_cross_combined_signal"])

# # if closeprice went bullish or bearish
# data = pd.concat([df, macd_signals], axis=1)
# data_ = data[["close", "macd_convergence"]]["2021-08":"2021-10"]
# # data_ = data[["close", "rsi_14_combined_signal"]]
# plot_signals(data_, source_col="close", signal_col=["macd_convergence"])

# # if closeprice went bullish or bearish
# data = pd.concat([df, macd_signals], axis=1)
# data_ = data[["close", "macd_signal_strength"]]["2021-08":"2021-10"]
# # data_ = data[["close", "rsi_14_combined_signal"]]
# plot_signals(data_, source_col="close", signal_col=["macd_signal_strength"])

# # if closeprice went bullish or bearish
# data = pd.concat([df, macd_signals], axis=1)
# data_ = data[["close", "macd_line_strength"]]["2021-08":"2021-10"]
# # data_ = data[["close", "rsi_14_combined_signal"]]
# plot_signals(data_, source_col="close", signal_col=["macd_line_strength"])

# rsi_base_signal = generate_rsi_signals(df.RSI_14, 14)
# rsi_base_signal[rsi_base_signal.rsi_14_combined_signal==-1]
# # if closeprice went bullish or bearish
# data = pd.concat([df, rsi_base_signal], axis=1)
# data_ = data[["close", "rsi_14_combined_signal"]]["2021-08":"2022-10"]
# # data_ = data[["close", "rsi_14_combined_signal"]]
# plot_signals(data_, source_col="close", signal_col=["rsi_14_combined_signal"])

# DATA LOADING

In [None]:
column_names = ["timestamp", "open", "high", "low", "close", "volume", "trades"]

In [None]:
# Load the ETH/USD candle file; column names are supplied via `names=`, so the CSV is
# presumably header-less — TODO confirm against the file
df = pd.read_csv("Kraken_OHLCVT/ETHUSD_720.csv", names=column_names, index_col="timestamp")
# Interpret the timestamp index as Unix epoch seconds
df.index = pd.to_datetime(df.index, unit='s')

In [None]:
df.tail()

# INDICATORS

In [None]:
CustomStrategy = ta.Strategy(
    name="Trend, Volume, and Momentum Analysis",
    description="Calculates SMA, EMA, RSI, MACD, and KDJ for trend, volume, and momentum analysis.",
    ta=[
        # Simple Moving Averages (SMA) for close prices
        {"kind": "sma", "length": 200, "prefix": "CLOSE"},
        {"kind": "sma", "length": 5, "prefix": "CLOSE"},
        {"kind": "sma", "length": 9, "prefix": "CLOSE"},
        {"kind": "sma", "length": 13, "prefix": "CLOSE"},
        {"kind": "sma", "length": 21, "prefix": "CLOSE"},
        # Exponential Moving Averages (EMA) for close prices
        {"kind": "ema", "length": 200, "prefix": "CLOSE"},
        {"kind": "ema", "length": 5, "prefix": "CLOSE"},
        {"kind": "ema", "length": 9, "prefix": "CLOSE"},
        {"kind": "ema", "length": 13, "prefix": "CLOSE"},
        {"kind": "ema", "length": 21, "prefix": "CLOSE"},
        # Simple Moving Averages (SMA) for volume
        {"kind": "sma", "close": "volume", "length": 200, "prefix": "VOLUME"},
        {"kind": "sma", "close": "volume", "length": 5, "prefix": "VOLUME"},
        {"kind": "sma", "close": "volume", "length": 9, "prefix": "VOLUME"},
        {"kind": "sma", "close": "volume", "length": 13, "prefix": "VOLUME"},
        {"kind": "sma", "close": "volume", "length": 21, "prefix": "VOLUME"},
        # Exponential Moving Averages (EMA) for volume
        {"kind": "ema", "close": "volume", "length": 200, "prefix": "VOLUME"},
        {"kind": "ema", "close": "volume", "length": 5, "prefix": "VOLUME"},
        {"kind": "ema", "close": "volume", "length": 9, "prefix": "VOLUME"},
        {"kind": "ema", "close": "volume", "length": 13, "prefix": "VOLUME"},
        {"kind": "ema", "close": "volume", "length": 21, "prefix": "VOLUME"},
        # Relative Strength Index (RSI)
        # pandas_ta names the window `length`; the former `"period": "14"` was not a
        # recognised argument, so the window is now set explicitly
        {"kind": "rsi", "length": 14},
        # Moving Average Convergence Divergence (MACD)
        {"kind": "macd", "fast": 12, "slow": 26, "signal": 9},
        # Stochastic Oscillator (KDJ) — currently disabled
        # {"kind": "kdj"},
        {"kind": "vwap"},
        {"kind": "linreg"},
        {"kind": "ohlc4"},
        {"kind": "adx"},
        {"kind": "cdl_pattern"},
    ]
)
# Run the strategy; the indicator columns are appended to df in place
df.ta.strategy(CustomStrategy)


In [None]:
df.columns

In [None]:
# Derive per-candle change features and direction labels from the raw OHLCV columns
target_kpis = compute_target_variables(df.open, df.high, df.low, df.close, df.volume)
# Model target: +1 when a candle closed above its open, -1 otherwise
df["target"] = target_kpis.close_bullish_bearish_signal

# STRATEGIES

#### Targets Labels

In [None]:
target_kpis

In [None]:
data = pd.concat([df, target_kpis], axis=1)

In [None]:
# if closeprice went bullish or bearish
data_ = data[["close", "close_bullish_bearish_signal"]]["2021-08":"2021-10"]
# data_ = data[["close", "close_bullish_bearish_signal"]]
plot_signals(data_, source_col="close", signal_col=["close_bullish_bearish_signal"])

In [None]:
# if volume went bullish or bearish
data_ = data[["volume", "volume_bullish_bearish_signal"]]["2021-08":"2021-10"]
# data_ = data[["volume", "volume_bullish_bearish_signal"]]
plot_signals(data_, source_col="volume", signal_col=["volume_bullish_bearish_signal"])

In [None]:
# whether the candle made a higher close, higher high and higher low than the previous one
data_ = data[["close", "price_increase_and_higher_highs_lows_signal"]]["2021-08":"2021-10"]
# data_ = data[["close", "price_increase_and_higher_highs_lows_signal"]]
plot_signals(data_, source_col="close", signal_col=["price_increase_and_higher_highs_lows_signal"])

In [None]:
df = pd.concat([df, target_kpis], axis=1)

#### MA Golden, Death & Combined Signal

In [None]:
# MA crossover golden, death and combined signal
close_ma_kpis = compute_ma_kpi(source="close", kind="sma", short_window=13, long_window=200, ma_short=df.CLOSE_SMA_13, ma_long=df.CLOSE_SMA_200)
close_ma_kpis

In [None]:
df = pd.concat([df, close_ma_kpis], axis=1)

In [None]:
# The MA KPI columns were concatenated onto `df`, not onto the earlier `data` snapshot,
# so select from `df` (selecting from `data` raises KeyError on a top-to-bottom run)
# data_ = df[["close", "CLOSE_SMA_13", "CLOSE_SMA_200", "close_sma_13_200_combined_signal"]]["2021-06":"2022-02"]
data_ = df[["close", "CLOSE_SMA_13", "CLOSE_SMA_200", "close_sma_13_200_combined_signal"]]
plot_signals(data_, source_col="close", signal_col=["close_sma_13_200_combined_signal"])

In [None]:
# The MA KPI columns were concatenated onto `df`, not onto the earlier `data` snapshot,
# so select from `df` (selecting from `data` raises KeyError on a top-to-bottom run)
# data_ = df[["close", "CLOSE_SMA_13", "CLOSE_SMA_200", "close_sma_13_200_golden_cross_signal_intermediate"]]["2021-06":"2022-02"]
data_ = df[["close", "CLOSE_SMA_13", "CLOSE_SMA_200", "close_sma_13_200_golden_cross_signal_intermediate"]]
plot_signals(data_, source_col="close", signal_col=["close_sma_13_200_golden_cross_signal_intermediate"])

In [None]:
# The MA KPI columns were concatenated onto `df`, not onto the earlier `data` snapshot,
# so select from `df` (selecting from `data` raises KeyError on a top-to-bottom run)
# data_ = df[["close", "CLOSE_SMA_13", "CLOSE_SMA_200", "close_sma_13_200_death_cross_signal_intermediate"]]["2021-06":"2022-02"]
data_ = df[["close", "CLOSE_SMA_13", "CLOSE_SMA_200", "close_sma_13_200_death_cross_signal_intermediate"]]
plot_signals(data_, source_col="close", signal_col=["close_sma_13_200_death_cross_signal_intermediate"])

#### K_MAJORITY

In [None]:
# KNN-like
k=5

# past k up candles to classify current
df["close_k_past_up_majority_signal"] = calculate_k_majority(df["close"], k)
df["volume_k_past_up_majority_signal"] = calculate_k_majority(df["volume"], k)


In [None]:
data_ = df[["close", "close_k_past_up_majority_signal"]]["2021-06":"2022-02"]
# data_ = df[["close", "close_k_past_up_majority_signal"]]
plot_signals(data_, source_col="close", signal_col=["close_k_past_up_majority_signal"])

#### RANDOM FOREST

In [None]:
# Import necessary libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix


In [None]:
# Load OHLCV data with target
data = df.copy().dropna()
data.head(5)

In [None]:
# Split data into features (X) and target variable (y)
# `close_bullish_bearish_signal` is the column `target` was copied from, so keeping it
# in X would hand the model the label itself.
# NOTE(review): open/close of the same candle are still features and fully determine the
# label; consider predicting the next candle instead (e.g. a target shifted by -1).
X = data.drop(columns=['target', 'close_bullish_bearish_signal'])  # Features
y = data['target']  # Target variable

In [None]:
# Split data into training and testing sets
# Chronological hold-out (last 20% of rows) instead of a random shuffle: shuffling puts
# candles adjacent to the test candles into the training set. Assumes rows are in time order.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

In [None]:
# Initialize Random Forest classifier
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=0)

In [None]:
rf_classifier

In [None]:
# Train the classifier
rf_classifier.fit(X_train, y_train)

In [None]:
X_train.shape

In [None]:
# save model
import os
import pickle

# open() does not create missing directories, so make sure the target folder exists
os.makedirs('models', exist_ok=True)
with open('models/random_forest_up_down_classifier_data_720.pkl', 'wb') as f:
    pickle.dump(rf_classifier, f)

# load model from file 
# with open('models/random_forest_up_down_classifier.pkl', 'rb') as f:
#     rf = pickle.load(f)

In [None]:
# Make predictions on the testing set
y_pred = rf_classifier.predict(X_test)

In [None]:
# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

In [None]:
# Print classification report
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

In [None]:
# Print confusion matrix
print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))

#### XGBOOST

In [None]:
# Import necessary libraries
import pandas as pd
from sklearn.model_selection import train_test_split
import xgboost as xgb
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

In [None]:
# Load OHLCV data with target
data = df.copy().dropna()
data["target"] = np.where(data["target"]==1, 1, 0)
data.head(5)

In [None]:
# Split data into features (X) and target variable (y)
# `close_bullish_bearish_signal` is the column `target` was derived from, so keeping it
# in X would hand the model the label itself.
# NOTE(review): open/close of the same candle are still features and fully determine the
# label; consider predicting the next candle instead (e.g. a target shifted by -1).
X = data.drop(columns=['target', 'close_bullish_bearish_signal'])  # Features
y = data['target']  # Target variable

In [None]:
# Split data into training and testing sets
# Chronological hold-out (last 20% of rows) instead of a random shuffle: shuffling puts
# candles adjacent to the test candles into the training set. Assumes rows are in time order.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

In [None]:
# Initialize XGBoost classifier
xgb_classifier = xgb.XGBClassifier(objective='binary:logistic', random_state=0)

In [None]:
# Train the classifier
xgb_classifier.fit(X_train, y_train)

In [None]:
# save model
import os
import pickle

# open() does not create missing directories, so make sure the target folder exists
os.makedirs('models', exist_ok=True)
with open('models/xgb_classifier_up_down_classifier_data_720.pkl', 'wb') as f:
    pickle.dump(xgb_classifier, f)

# load model from file 
# with open('models/xgb_classifier_up_down_classifier.pkl', 'rb') as f:
#     rf = pickle.load(f)

In [None]:
# Make predictions on the testing set
y_pred = xgb_classifier.predict(X_test)

In [None]:
# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

In [None]:
# Print classification report
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

In [None]:
# Print confusion matrix
print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))

#### LSTM

In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
from keras.optimizers import Adam
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

In [None]:
# Load OHLCV data with target
data = df.copy().dropna()
data["target"] = np.where(data["target"]==1, 1, 0)
data.head(5)

In [None]:
# Split data into features (X) and target variable (y)
# `close_bullish_bearish_signal` is the column `target` was derived from, so keeping it
# in X would hand the model the label itself.
# NOTE(review): open/close of the same candle are still features and fully determine the
# label; consider predicting the next candle instead (e.g. a target shifted by -1).
X = data.drop(columns=['target', 'close_bullish_bearish_signal'])  # Features
y = data['target']  # Target variable

In [None]:
# Scale every feature column into the [0, 1] range
# NOTE(review): the scaler is fitted on all rows before the train/test split, so the
# test rows' min/max influence the scaled training features — consider fitting on the
# training rows only and applying transform() to the test rows.
scaler = MinMaxScaler(feature_range=(0, 1))
X_scaled = scaler.fit_transform(X)

In [None]:
# Split data into training and testing sets
# Chronological hold-out (last 20% of rows) instead of a random shuffle: shuffling puts
# candles adjacent to the test candles into the training set. Assumes rows are in time order.
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, shuffle=False)

In [None]:
# Reshape features for LSTM input (samples, timesteps, features)
X_train = np.reshape(X_train, (X_train.shape[0], 1, X_train.shape[1]))
X_test = np.reshape(X_test, (X_test.shape[0], 1, X_test.shape[1]))

In [None]:
# Initialize LSTM model: three stacked 50-unit LSTM layers, each followed by 20% dropout,
# ending in a single sigmoid unit for the binary up/down output
model = Sequential()
# Input shape is (timesteps, features) taken from the reshaped training array
model.add(LSTM(units=50, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dropout(0.2))
# return_sequences=True keeps the full sequence so the next LSTM layer can consume it
model.add(LSTM(units=50, return_sequences=True))
model.add(Dropout(0.2))
# Last LSTM layer returns only its final output
model.add(LSTM(units=50))
model.add(Dropout(0.2))
model.add(Dense(units=1, activation='sigmoid'))


In [None]:
# Compile model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# Train model
model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_test, y_test))

In [None]:
# save model

model.save('models/lstm_up_down_classifier_data_720.h5')

In [None]:
# Evaluate model
y_pred_prob = model.predict(X_test)
y_pred = (y_pred_prob > 0.5).astype(int)  # Convert probabilities to binary predictions
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

In [None]:
# Print classification report
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

In [None]:
# Print confusion matrix
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Compute confusion matrix
cm = confusion_matrix(y_test, y_pred)

# Plot confusion matrix
plt.figure()
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Bearish', 'Bullish'], yticklabels=['Bearish', 'Bullish'])
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

# THE END