In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import yfinance as yf
import numpy as np
import pandas as pd

## Create features 

In [None]:
def continuous_data_features(df):
    indicators = pd.DataFrame(index=df.index)

    close = df["Close"]
    high = df["High"]
    low = df["Low"]
    vol = df["Volume"]

    # Simple Moving Average 10 day
    indicators["SMA10"] = close.rolling(window=10).mean()

    # Weighted Moving Average 10 day
    weights = np.arange(1, 11)
    indicators["WMA10"] = close.rolling(10).apply(lambda x: np.dot(x, weights)/weights.sum(), raw=True)

    # Momentum 10 day
    indicators["MOM10"] = close - close.shift(10)

    # Stochastic Oscillator %K 14 day
    indicators["STOCHK"] = 100 * (close - low.rolling(14).min()) / (high.rolling(14).max() - low.rolling(14).min())

    # Stochastic Oscillator %D 3 day SMA of %K
    indicators["STOCHD"] = indicators["STOCHK"].rolling(3).mean()

    # Relative Strength Index 14 day 
    delta = close.diff()
    up = delta.clip(lower=0)
    down = -1 * delta.clip(upper=0)
    rs = up.rolling(14).mean() / down.rolling(14).mean()
    indicators["RSI14"] = 100 - (100 / (1 + rs))

    # MACD 12-day EMA - 26 day EMA
    ema12 = close.ewm(span=12, adjust=False).mean()
    ema26 = close.ewm(span=26, adjust=False).mean()
    indicators["MACD"] = ema12 - ema26

    # Williams %R 14 day
    highest14 = high.rolling(14).max()
    lowest14 = low.rolling(14).min()
    indicators["WILLR"] = -100 * (highest14 - close) / (highest14 - lowest14)

    # 9. Accumulation/Distribution Oscillator (ADOSC)
    clv = np.where((high - low) == 0, 0, ((close - low) - (high - close)) / (high - low))
    adl = (clv * vol).cumsum()

    ema3_adl = adl.ewm(span=3, adjust=False).mean()
    ema10_adl = adl.ewm(span=10, adjust=False).mean()

    adosc = ema3_adl - ema10_adl

    # Scale ADOSC to 0–100 like in papers
    min_val = adosc.min()
    max_val = adosc.max()
    indicators["ADOSC"] = 100 * (adosc - min_val) / (max_val - min_val)


    # 10. Commodity Channel Index 20 day
    tp = (high + low + close) / 3
    sma = tp.rolling(20).mean()
    mad = (tp - sma).abs().rolling(20).mean()
    indicators["CCI20"] = (tp - sma) / (0.015 * mad)

    indicators["Close"] = close

    return indicators

In [None]:
def trend_deterministic_data(indicators, df):
    trend = pd.DataFrame(index=indicators.index)

    close = indicators["Close"]
    high = df["High"]
    low = df["Low"]
    vol = df["Volume"]

    #SMA
    trend["SMA10_T"] = np.where(close > indicators["SMA10"], 1, -1)

    #WMA
    trend["WMA10_T"] = np.where(close > indicators["WMA10"], 1, -1)

    #Momentum
    trend["MOM10_T"] = np.where(indicators["MOM10"] > 0, 1, -1)

    # Sophisticated Oscillator trends
    trend["STOCHK_T"] = np.where(indicators["STOCHK"] > indicators["STOCHK"].shift(1), 1, -1)
    trend["STOCHD_T"] = np.where(indicators["STOCHD"] > indicators["STOCHD"].shift(1), 1, -1)

    #RSI trend
    rsi = indicators["RSI14"]
    trend["RSI14_T"] = np.select([rsi > 70, rsi < 30, rsi > rsi.shift(1)],
                                 [-1, 1, 1], default=-1)
    #MACD trend
    trend["MACD_T"] = np.where(indicators["MACD"] > indicators["MACD"].shift(1), 1, -1)

    #Williams %R trend
    trend["WILLR_T"] = np.where(indicators["WILLR"] > indicators["WILLR"].shift(1), 1, -1)

    #ADOSC trend
    trend["ADOSC_T"] = np.where(indicators["ADOSC"] > indicators["ADOSC"].shift(1), 1, -1)

    #CCI trend
    cci = indicators["CCI20"]
    trend["CCI20_T"] = np.select([cci > 200, cci < -200, cci > cci.shift(1)],
                                 [-1, 1, 1], default=-1)
    return trend
    

In [None]:
tickers = ["MSFT", "AMZN", "^GSPC", "^DJI"]
results = []

In [None]:
class StockANN(nn.Module):
    def __init__(self, input_size, hidden_neurons):
        super(StockANN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_neurons)
        self.tanh = nn.Tanh()
        self.fc2 = nn.Linear(hidden_neurons, 1)
        self.output_act = nn.Tanh()

    def forward(self, x):
        x = self.tanh(self.fc1(x))
        x = self.output_act(self.fc2(x))
        return x

In [None]:
def continuous_ann_pytorch(X, y):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=42, shuffle=True)

    scaler = MinMaxScaler(feature_range=(-1, 1))
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    X_train_t = torch.tensor(X_train_scaled, dtype=torch.float32)
    y_train_t = torch.tensor(y_train.values.reshape(-1, 1), dtype=torch.float32)
    X_test_t = torch.tensor(X_test_scaled, dtype=torch.float32)
    y_test_t = torch.tensor(y_test.values.reshape(-1, 1), dtype=torch.float32)

    dataset = TensorDataset(X_train_t, y_train_t)
    loader = DataLoader(dataset, batch_size=32, shuffle=True)

    hidden_neurons = [10, 20, 50, 100]
    epochs_list = [1000, 2000, 5000]
    momentum_values = [0.1, 0.3, 0.5, 0.9]
    learning_rate = 0.1

    results = []

    for n in hidden_neurons:
        for ep in epochs_list:
            for mc in momentum_values:
                model = StockANN(X_train.shape[1], n)
                criterion = nn.MSELoss()
                optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=mc)

                for epoch in range(ep):
                    for inputs, labels in loader:
                        optimizer.zero_grad()
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)
                        loss.backward()
                        optimizer.step()

                with torch.no_grad():
                    preds = model(X_test_t).flatten()
                    preds = torch.where(preds > 0, 1, -1).numpy()
                    acc = accuracy_score(y_test, preds)
                    f1 = f1_score(y_test, preds)

                results.append({
                    'Hidden Neurons': n,
                    'Epochs': ep,
                    'Momentum': mc,
                    'Accuracy': acc,
                    'F1': f1
                })

    df_results = pd.DataFrame(results)
    best = df_results.loc[df_results['Accuracy'].idxmax()]
    print("✅ Best Continuous ANN Parameters:")
    print(best)
    return df_results, best


In [None]:
def trend_ann_pytorch(X, y):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=42, shuffle=True)

    scaler = MinMaxScaler(feature_range=(-1, 1))
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    X_train_t = torch.tensor(X_train_scaled, dtype=torch.float32)
    y_train_t = torch.tensor(y_train.values.reshape(-1, 1), dtype=torch.float32)
    X_test_t = torch.tensor(X_test_scaled, dtype=torch.float32)
    y_test_t = torch.tensor(y_test.values.reshape(-1, 1), dtype=torch.float32)

    dataset = TensorDataset(X_train_t, y_train_t)
    loader = DataLoader(dataset, batch_size=32, shuffle=True)

    hidden_neurons = [10, 20, 50, 100]
    epochs_list = [1000, 2000, 5000]
    momentum_values = [0.1, 0.3, 0.5, 0.9]
    learning_rate = 0.1

    results = []

    for n in hidden_neurons:
        for ep in epochs_list:
            for mc in momentum_values:
                model = StockANN(X_train.shape[1], n)
                criterion = nn.BCELoss()  # Binary classification
                optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=mc)

                for epoch in range(ep):
                    for inputs, labels in loader:
                        optimizer.zero_grad()
                        outputs = model(inputs)
                        loss = criterion((outputs + 1)/2, (labels + 1)/2)  # convert -1,1 -> 0,1
                        loss.backward()
                        optimizer.step()

                with torch.no_grad():
                    preds = model(X_test_t).flatten()
                    preds = torch.where(preds > 0, 1, -1).numpy()
                    acc = accuracy_score(y_test, preds)
                    f1 = f1_score(y_test, preds)

                results.append({
                    'Hidden Neurons': n,
                    'Epochs': ep,
                    'Momentum': mc,
                    'Accuracy': acc,
                    'F1': f1
                })

    df_results = pd.DataFrame(results)
    best = df_results.loc[df_results['Accuracy'].idxmax()]
    print("✅ Best Trend ANN Parameters:")
    print(best)
    return df_results, best

In [None]:
for ticker in tickers:
    print(f"Processing {ticker}")
    
    df = yf.download(ticker, start="2015-01-01", end="2024-12-31", auto_adjust=True)
    df.dropna(inplace=True)
    df["Target"] = np.where(df["Close"] > df["Close"].shift(1), 1, -1)
    df.dropna(inplace=True)

    y = df["Target"]
    X_cont_full = continuous_data_features(df)
    X_cont = X_cont_full.drop(columns=["Close"])

    cont_aligned = pd.concat([X_cont, y], axis=1)
    cont_aligned.dropna(inplace=True)
    X_cont_clean = cont_aligned.drop(columns=['Target'])
    y_cont_clean = cont_aligned['Target']

    summary_results, _, _ = continuous_ann_pytorch(X_cont_clean, y_cont_clean)
    print("\n--- Continuous ANN Results ---")
    print(summary_results.to_string(index=False))

In [None]:
for ticker in tickers:
    print(f"Processing {ticker}")
    
    df = yf.download(ticker, start="2015-01-01", end="2024-12-31", auto_adjust=True)
    df.dropna(inplace=True)
    df["Target"] = np.where(df["Close"] > df["Close"].shift(1), 1, -1)
    df.dropna(inplace=True)

    y = df["Target"]
    
    X_cont_full = continuous_data_features(df)
    X_trend_full = trend_deterministic_data(X_cont_full, df)

    trend_aligned = pd.concat([X_trend_full, y], axis=1)
    trend_aligned.dropna(inplace=True)
    X_trend_clean = trend_aligned.drop(columns=['Target'])
    y_trend_clean = trend_aligned['Target']

    summary_results, _, _ = trend_ann_pytorch(X_trend_clean, y_trend_clean)
    
    print("\n--- Trend ANN Results ---")
    print(summary_results.to_string(index=False))