# CS 440/540 Machine Learning in Finance: Homework 3

Download the data files from LMS. Write your code and explanations in the required cells of this `IPython` notebook, and complete the work locally.

To submit your assignment, upload your solution to LMS as a single notebook with the following file name format:

`lastName_firstName_CourseNumber_HW3.ipynb`

where `CourseNumber` is the course in which you're enrolled (CS 440 or CS 540).

Problems on homework assignments are equally weighted.

Plagiarism of any kind will not be tolerated. Your submitted code will be compared with other submissions and with code available on the internet, and violations will incur a penalty of -100 points. (If you copy from another student, both parties will receive -100.)

Import all libraries here

In [None]:
#Import libraries before starting

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.neural_network import MLPRegressor
import math

import yfinance as yf
import talib  
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, mean_squared_error, mean_absolute_percentage_error



## Problem 1: Predicting AAPL Stock Price with MLP

In this problem, you are provided a single dataset, "AAPL.csv", which includes AAPL price and volume data over a time horizon. After partitioning the data into an 80% train set and a 20% test set, you will implement four MLPs:

a- MLP with 1 hidden layer with 8 units in hidden layer

b- MLP with 1 hidden layer with 4 units in hidden layer

c- MLP with 2 hidden layers with 4 units in both first and second layers.

d- MLP with 2 hidden layers with 8 units in first layer and 4 units in the second layer.

You will predict the close price using the previous 5 days of price and volume data. Report the performance in terms of MAPE and RMSE on the test set.

Note that you need to carefully tune the learning rate and number of epochs. 
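For reference, the two metrics can be written out directly. The following is a minimal NumPy sketch on made-up numbers (the arrays `y_true` and `y_pred` are illustrative, not taken from "AAPL.csv"). MAPE is returned as a fraction rather than a percentage, which is the same convention `sklearn.metrics.mean_absolute_percentage_error` uses.

```python
import numpy as np

def mape(y_true, y_pred):
    """Mean absolute percentage error, returned as a fraction (0.02 means 2%)."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return float(np.mean(np.abs((y_true - y_pred) / y_true)))

def rmse(y_true, y_pred):
    """Root mean squared error, in the same units as the target."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

# Illustrative values only (hypothetical prices in dollars).
y_true = np.array([100.0, 200.0, 400.0])
y_pred = np.array([110.0, 190.0, 400.0])

print(mape(y_true, y_pred))  # (0.10 + 0.05 + 0.00) / 3 = 0.05
print(rmse(y_true, y_pred))  # sqrt((100 + 100 + 0) / 3)
```

Because MAPE divides by the true value, it is scale-free, while RMSE is in dollars and penalizes large misses more heavily.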

In [None]:
# Solution 1
np.random.seed(42)


df = pd.read_csv('AAPL.csv')
df.columns = [c.strip() for c in df.columns]
if 'Close/Last' in df.columns:
    df['Close'] = df['Close/Last'].str.replace('$', '', regex=False).astype(float)
if 'Volume' in df.columns:
    df['Volume'] = df['Volume'].astype(str).str.replace(',', '').astype(float)
if 'Date' in df.columns:
    df['Date'] = pd.to_datetime(df['Date'])

df = df.sort_values('Date').reset_index(drop=True)


required_cols = {'Close', 'Volume'}
missing = required_cols - set(df.columns)
if missing:
    raise ValueError(f"AAPL.csv must contain {missing} columns after cleaning")


n_lags = 5
features = []
targets = []
for i in range(n_lags, len(df)):
    past = []
    for j in range(1, n_lags + 1):
        past.append(df.loc[i - j, 'Close'])
        past.append(df.loc[i - j, 'Volume'])
    features.append(past)
    targets.append(df.loc[i, 'Close'])

X = np.array(features)
y = np.array(targets).reshape(-1, 1)


split = int(0.8 * len(X))
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]


scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()
X_train_scaled = scaler_X.fit_transform(X_train)
X_test_scaled = scaler_X.transform(X_test)
y_train_scaled = scaler_y.fit_transform(y_train).ravel()
y_test_scaled = scaler_y.transform(y_test).ravel()

def build_mlp(hidden, lr=1e-3):
    return MLPRegressor(
        hidden_layer_sizes=hidden,
        activation='relu',
        solver='adam',
        learning_rate_init=lr,
        max_iter=400,
        early_stopping=True,
        n_iter_no_change=15,
        validation_fraction=0.1,
        shuffle=False,
        random_state=42,
        batch_size=32,
        verbose=False,
    )

configs = {
    '1x8': (8,),
    '1x4': (4,),
    '2x4': (4, 4),
    '8_4': (8, 4)
}

results = {}

for name, hidden in configs.items():
    print('Training', name)
    model = build_mlp(hidden, lr=1e-3)
    model.fit(X_train_scaled, y_train_scaled)
    y_pred_scaled = model.predict(X_test_scaled).reshape(-1, 1)
    y_pred = scaler_y.inverse_transform(y_pred_scaled)
    mape = mean_absolute_percentage_error(y_test, y_pred)
    rmse = math.sqrt(mean_squared_error(y_test, y_pred))
    results[name] = {'mape': float(mape), 'rmse': float(rmse)}
    print(f"{name} -> MAPE: {mape:.6f}, RMSE: {rmse:.6f}")

print('Summary results:')
for k, v in results.items():
    print(k, v)


Training 1x8
1x8 -> MAPE: 0.040114, RMSE: 11.953670
Training 1x4
1x4 -> MAPE: 0.019210, RMSE: 5.302461
Training 2x4
2x4 -> MAPE: 0.019108, RMSE: 5.355697
Training 8_4
8_4 -> MAPE: 0.024215, RMSE: 6.732546
Summary results:
1x8 {'mape': 0.04011437379101921, 'rmse': 11.953670060930007}
1x4 {'mape': 0.01921013998902652, 'rmse': 5.302461189928357}
2x4 {'mape': 0.01910765493821237, 'rmse': 5.355697005973141}
8_4 {'mape': 0.024215106952195992, 'rmse': 6.732546127835391}


The 1x4 and 2x4 networks perform best on the test set, with MAPE ≈ 1.9% and RMSE ≈ $5.3; the 1x8 network is the weakest (MAPE ≈ 4.0%, RMSE ≈ $11.95). With only the last 5 days of close and volume as inputs, this is reasonable but not highly precise. It could be improved with a longer lookback, derived features (returns, volatility), slightly wider networks, or alternative models.
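As a rough illustration of the derived features mentioned above, the sketch below computes log returns and a rolling volatility with NumPy. The helper `return_features`, the 5-day volatility window, and the synthetic random-walk prices are all assumptions made for this example; none of them come from the assignment or from "AAPL.csv".

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def return_features(close, vol_window=5):
    """Return (log_returns, rolling_volatility) as two arrays of equal length.

    Entry k of both arrays refers to the same day: the log return into that
    day, and the sample std of the `vol_window` returns ending on that day.
    """
    close = np.asarray(close, dtype=float)
    log_ret = np.diff(np.log(close))  # length n - 1
    vol = sliding_window_view(log_ret, vol_window).std(axis=1, ddof=1)
    # Drop the first vol_window - 1 returns so both arrays cover the same days.
    return log_ret[vol_window - 1:], vol

# Synthetic prices (hypothetical random walk, not real AAPL data).
rng = np.random.default_rng(0)
close = 150.0 * np.exp(np.cumsum(rng.normal(0.0, 0.01, size=50)))

ret, vol = return_features(close, vol_window=5)
print(ret.shape, vol.shape)
```

To avoid look-ahead, features used to predict a given day's close should be built only from days strictly before it, in the same way the lagged close and volume inputs are.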

## Problem 2: Combining Technical Analysis Indicators with 2D CNN on Bitcoin Direction Prediction

In this problem, you will add technical analysis indicators to the original series, which lets you convert it into a 2D image. You will use the following technical indicators from the TA-Lib library in Python: MACD, RSI, CMO, MOM, Bollinger Bands, SMA. In general, technical analysis indicators are financial indicators that give traders guidance about the market. You will use Bitcoin close prices downloaded via the yfinance library. The train period is 2023-2024 and the test period is 2024-2025.

You will use the closing prices of the previous 6 days, build a 6x6 image by calculating the technical indicators, and predict the direction for the next day (whether the price will go up or down). Report the performance in terms of accuracy, precision, recall, and F1 score. Use a single convolutional layer followed by a fully connected layer; the kernel size can be set to (2, 2).
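The size of the fully connected layer's input follows from the convolution arithmetic. The sketch below applies the standard output-size formula (dilation 1) to a 6x6 image with a (2, 2) kernel. The helper name `conv2d_output_shape` and the choice of 16 output channels are assumptions for illustration; the assignment does not fix the number of filters.

```python
def conv2d_output_shape(height, width, kernel_size, stride=1, padding=0):
    """Output (height, width) of a 2D convolution with dilation 1."""
    k_h, k_w = kernel_size
    out_h = (height + 2 * padding - k_h) // stride + 1
    out_w = (width + 2 * padding - k_w) // stride + 1
    return out_h, out_w

out_h, out_w = conv2d_output_shape(6, 6, kernel_size=(2, 2))
out_channels = 16  # assumed number of filters, not specified by the assignment
flat_dim = out_channels * out_h * out_w  # input size of the fully connected layer

print(out_h, out_w, flat_dim)  # 5 5 400
```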

In [None]:
def download_btc(start="2022-01-01", end="2025-12-31"):
    df = yf.download("BTC-USD", start=start, end=end, interval="1d", auto_adjust=False)


    if isinstance(df.columns, pd.MultiIndex):
        close_series = df.xs("Close", axis=1, level=0)
        if isinstance(close_series, pd.DataFrame):
            close_series = close_series.iloc[:, 0]
        df = close_series.to_frame(name="Close")
    else:
        df = df[["Close"]]

    df = df.dropna()
    return df


def add_indicators(df: pd.DataFrame) -> pd.DataFrame:
    close = df["Close"].to_numpy(dtype=np.float64).reshape(-1)


    macd, macdsignal, macdhist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
    rsi = talib.RSI(close, timeperiod=14)
    cmo = talib.CMO(close, timeperiod=14)
    mom = talib.MOM(close, timeperiod=10)

    upper, middle, lower = talib.BBANDS(close, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
    bbp = (close - lower) / (upper - lower)

    sma = talib.SMA(close, timeperiod=14)

    out = df.copy()
    out["MACD"] = macd
    out["RSI"] = rsi
    out["CMO"] = cmo
    out["MOM"] = mom
    out["BBP"] = bbp
    out["SMA"] = sma

    out = out.dropna()
    return out



IND_COLS = ["MACD", "RSI", "CMO", "MOM", "BBP", "SMA"]

def build_images_and_labels(df_ind: pd.DataFrame, window=6):
    data = df_ind.copy()
    closes = data["Close"].to_numpy(dtype=np.float64)
    feats = data[IND_COLS].to_numpy(dtype=np.float32)

    X, y, idx = [], [], []

    for t in range(window - 1, len(data) - 1):
        w = feats[t - window + 1 : t + 1]

        mu = w.mean(axis=0, keepdims=True)
        sigma = w.std(axis=0, keepdims=True) + 1e-8
        w_norm = (w - mu) / sigma

        label = 1 if closes[t + 1] > closes[t] else 0

        X.append(w_norm.astype(np.float32))
        y.append(label)
        idx.append(data.index[t])

    X = np.stack(X)
    y = np.array(y, dtype=np.int64)

    idx = pd.to_datetime(idx).to_numpy(dtype="datetime64[ns]")

    return X, y, idx

def split_by_date(X, y, idx, train_end="2024-12-31"):
    train_mask = idx <= np.datetime64(train_end)
    test_mask  = idx >  np.datetime64(train_end)

    X_train, y_train = X[train_mask], y[train_mask]
    X_test,  y_test  = X[test_mask],  y[test_mask]
    return X_train, y_train, X_test, y_test


class ImageDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X)[:, None, :, :]  
        self.y = torch.from_numpy(y)

    def __len__(self):
        return int(self.y.shape[0])

    def __getitem__(self, i):
        return self.X[i], self.y[i]

class SimpleCNN(nn.Module):
    def __init__(self, out_channels=16, kernel_size=(2, 2)):
        super().__init__()
        self.conv = nn.Conv2d(1, out_channels, kernel_size=kernel_size)
        self.relu = nn.ReLU()

        h_out = 6 - kernel_size[0] + 1
        w_out = 6 - kernel_size[1] + 1
        flat_dim = out_channels * h_out * w_out

        self.fc = nn.Linear(flat_dim, 2)

    def forward(self, x):
        x = self.relu(self.conv(x))
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)
        return x



def train_model(model, train_loader, test_loader, epochs=25, lr=1e-3, device="cpu"):
    model.to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()

    for _ in range(epochs):
        model.train()
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad()
            logits = model(xb)
            loss = loss_fn(logits, yb)
            loss.backward()
            opt.step()

    model.eval()
    all_preds, all_true = [], []
    with torch.no_grad():
        for xb, yb in test_loader:
            xb = xb.to(device)
            logits = model(xb)
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            all_preds.append(preds)
            all_true.append(yb.numpy())

    y_pred = np.concatenate(all_preds)
    y_true = np.concatenate(all_true)

    metrics = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, zero_division=0),
        "recall": recall_score(y_true, y_pred, zero_division=0),
        "f1": f1_score(y_true, y_pred, zero_division=0),
    }
    return metrics


def main():
    df = download_btc(start="2022-01-01", end="2025-12-31")
    df_ind = add_indicators(df)

    X, y, idx = build_images_and_labels(df_ind, window=6)

    mask_after_2023 = idx >= np.datetime64("2023-01-01")
    X, y, idx = X[mask_after_2023], y[mask_after_2023], idx[mask_after_2023]

    X_train, y_train, X_test, y_test = split_by_date(X, y, idx, train_end="2024-12-31")

    print("Train samples:", len(y_train), "Test samples:", len(y_test))
    print("Train pos rate:", y_train.mean() if len(y_train) else None, "Test pos rate:", y_test.mean() if len(y_test) else None)

    train_ds = ImageDataset(X_train, y_train)
    test_ds  = ImageDataset(X_test, y_test)

    train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)
    test_loader  = DataLoader(test_ds, batch_size=256, shuffle=False)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = SimpleCNN(out_channels=16, kernel_size=(2, 2))

    metrics = train_model(model, train_loader, test_loader, epochs=25, lr=1e-3, device=device)

    print("\nTest Metrics:")
    for k, v in metrics.items():
        print(f"{k}: {v:.4f}")


if __name__ == "__main__":
    main()




Train samples: 731 Test samples: 347
Train pos rate: 0.5102599179206566 Test pos rate: 0.4956772334293948

Test Metrics:
accuracy: 0.4409
precision: 0.4276
recall: 0.3779
f1: 0.4012


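Accuracy ≈ 0.44 and F1 ≈ 0.40 on the test set, which is no better than random guessing (the test positive rate is ≈ 0.50). The model is not usable as-is; it needs stronger features and a hyperparameter/model search. Class weighting would only matter if the classes were imbalanced, and here both splits are close to 50/50.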
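One hedged way to put a number on "near random" is to express the gap between the test accuracy and 0.5 in binomial standard errors. The sketch below does this for the accuracy and sample count printed in the test output above. It assumes the test predictions are independent coin flips, which is only approximate here because consecutive 6-day windows overlap; the helper name `accuracy_z_score` is made up for this example.

```python
import math

def accuracy_z_score(accuracy, n_samples, chance=0.5):
    """Number of binomial standard errors between `accuracy` and `chance`."""
    std_err = math.sqrt(chance * (1.0 - chance) / n_samples)
    return (accuracy - chance) / std_err

# Values taken from the test output above: accuracy 0.4409 on 347 samples.
z = accuracy_z_score(0.4409, 347)
print(f"z = {z:.2f}")
```

A magnitude of roughly two standard errors from a single unseeded run is weak evidence either way, so the result is best read as consistent with the model having no predictive signal.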

## Problem 3: Multivariate LSTM for Predicting EPS (Earnings per Share) over Company Fundamentals

In this problem, you will predict Earnings Per Share (EPS) for a given quarter by jointly modeling historical fundamentals. The "fundamentals.csv" file contains the fundamentals of multiple companies for each year. Assume that EPS for a given quarter is affected by the fundamentals of the 4 previous quarters. The LSTM latent dimension can be one of [5, 10, 30], with the best value determined by hyperparameter search. The learning rate and number of epochs should also be carefully tuned. The evaluation metric is MAPE.
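The "4 previous quarters" setup amounts to pairing each period's EPS with the four feature rows that precede it. The sketch below shows that alignment for a single company on made-up numbers. The helper `make_windows` and the toy arrays are assumptions for illustration and are not part of "fundamentals.csv".

```python
import numpy as np

def make_windows(features, target, lookback=4):
    """Pair each period's target with the `lookback` preceding feature rows.

    features: (T, F) array ordered oldest to newest for ONE company.
    target:   (T,) array of EPS values for the same periods.
    Returns X of shape (N, lookback, F) and y of shape (N,), N = T - lookback.
    """
    features = np.asarray(features, dtype=np.float32)
    target = np.asarray(target, dtype=np.float32)
    X, y = [], []
    for t in range(lookback, len(target)):
        X.append(features[t - lookback:t])  # periods t-lookback .. t-1 only
        y.append(target[t])                 # EPS of period t
    if not X:  # too few periods for even one window
        empty_X = np.empty((0, lookback, features.shape[1]), dtype=np.float32)
        return empty_X, np.empty((0,), dtype=np.float32)
    return np.stack(X), np.array(y, dtype=np.float32)

# Toy company with 6 periods and 3 fundamentals (made-up numbers).
feats = np.arange(18, dtype=np.float32).reshape(6, 3)
eps = np.array([1.0, 1.1, 1.2, 1.3, 1.4, 1.5], dtype=np.float32)

X, y = make_windows(feats, eps, lookback=4)
print(X.shape, y.shape)  # (2, 4, 3) (2,)
```

With this alignment a company needs at least `lookback + 1` periods to contribute a sample, so companies with short histories yield none.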

In [None]:
# Solution 3

np.random.seed(42)
torch.manual_seed(42)

df = pd.read_csv('fundamentals.csv')
df = df.drop(columns=['Unnamed: 0'], errors='ignore')
df['Period Ending'] = pd.to_datetime(df['Period Ending'])
df = df.sort_values(['Ticker Symbol', 'Period Ending']).reset_index(drop=True)

numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
target_col = 'Earnings Per Share'
feature_cols = [c for c in numeric_cols if c != target_col]

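# Mean-impute missing values. numeric_cols still contains target_col, so missing
# EPS values are filled here too and the per-ticker dropna below removes nothing.
df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean())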


lookback = 4
X_list, y_list, date_list = [], [], []

for ticker, g in df.groupby('Ticker Symbol'):
    g = g.dropna(subset=[target_col])
    if len(g) < lookback:
        continue
    feats = g[feature_cols].to_numpy(dtype=np.float32)
    targets = g[target_col].to_numpy(dtype=np.float32)
    dates = g['Period Ending'].to_numpy()
    for i in range(lookback - 1, len(g)):
        # Window spans periods i-lookback+1 .. i, so it includes the target period's own row.
        X_list.append(feats[i - lookback + 1 : i + 1])
        y_list.append(targets[i])
        date_list.append(dates[i])

X = np.stack(X_list)
y = np.array(y_list, dtype=np.float32)
dates = np.array(date_list)

a = np.argsort(dates)
X, y, dates = X[a], y[a], dates[a]


n = len(y)
train_size = int(0.8 * n)
X_train_full, y_train_full = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]

val_size = max(1, int(0.2 * train_size))
X_train, y_train = X_train_full[:-val_size], y_train_full[:-val_size]
X_val, y_val = X_train_full[-val_size:], y_train_full[-val_size:]

class SeqDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X)
        self.y = torch.from_numpy(y).unsqueeze(-1)
    def __len__(self):
        return len(self.y)
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

batch_train = 32
train_loader = DataLoader(SeqDataset(X_train, y_train), batch_size=batch_train, shuffle=True)
val_loader = DataLoader(SeqDataset(X_val, y_val), batch_size=batch_train, shuffle=False)

class EPSLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.lstm = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
    def forward(self, x):
        out, _ = self.lstm(x)
        last = out[:, -1, :]
        return self.fc(last)

def train_one_model(hidden_dim, train_loader, val_loader, epochs=60, lr=1e-3, device='cpu'):
    model = EPSLSTM(input_dim=X.shape[2], hidden_dim=hidden_dim).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()
    for _ in range(epochs):
        model.train()
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad()
            preds = model(xb)
            loss = loss_fn(preds, yb)
            loss.backward()
            opt.step()
    model.eval()
    with torch.no_grad():
        val_preds, val_true = [], []
        for xb, yb in val_loader:
            xb = xb.to(device)
            preds = model(xb).cpu().numpy().flatten()
            val_preds.append(preds)
            val_true.append(yb.numpy().flatten())
    val_pred = np.concatenate(val_preds)
    val_true = np.concatenate(val_true)
    val_mape = mean_absolute_percentage_error(val_true, val_pred)
    return model, val_mape

hidden_options = [5, 10, 30]
device = 'cuda' if torch.cuda.is_available() else 'cpu'

best_model = None
best_hidden = None
best_val = float('inf')

for h in hidden_options:
    model_h, val_mape = train_one_model(h, train_loader, val_loader, epochs=60, lr=1e-3, device=device)
    print(f"Hidden {h}: val MAPE={val_mape:.4f}")
    if val_mape < best_val:
        best_val = val_mape
        best_model = model_h
        best_hidden = h

print(f"Best hidden size: {best_hidden} (val MAPE={best_val:.4f})")

full_train_loader = DataLoader(SeqDataset(X_train_full, y_train_full), batch_size=batch_train, shuffle=True)
test_loader = DataLoader(SeqDataset(X_test, y_test), batch_size=batch_train, shuffle=False)

final_model = EPSLSTM(input_dim=X.shape[2], hidden_dim=best_hidden).to(device)
opt = torch.optim.Adam(final_model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()

for _ in range(80):
    final_model.train()
    for xb, yb in full_train_loader:
        xb, yb = xb.to(device), yb.to(device)
        opt.zero_grad()
        preds = final_model(xb)
        loss = loss_fn(preds, yb)
        loss.backward()
        opt.step()

final_model.eval()
with torch.no_grad():
    test_preds, test_true = [], []
    for xb, yb in test_loader:
        xb = xb.to(device)
        preds = final_model(xb).cpu().numpy().flatten()
        test_preds.append(preds)
        test_true.append(yb.numpy().flatten())

test_pred = np.concatenate(test_preds)
test_true = np.concatenate(test_true)
test_mape = mean_absolute_percentage_error(test_true, test_pred)

print(f"Test MAPE: {test_mape:.4f}")


Hidden 5: val MAPE=0.9077
Hidden 10: val MAPE=0.7058
Hidden 30: val MAPE=0.7543
Best hidden size: 10 (val MAPE=0.7058)
Test MAPE: 0.4991


Validation MAPE ≈ 0.71 and test MAPE ≈ 0.50 are high for finance, so the model is not production-ready. It needs feature engineering (ratios/trends), input scaling, longer training, and model tuning.
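As a sketch of the scaling step mentioned above, the example below standardizes a 3D array of shape (samples, time steps, features) per feature, using statistics from the training split only. The helper names `fit_feature_scaler` and `apply_feature_scaler` and the synthetic data are assumptions for illustration.

```python
import numpy as np

def fit_feature_scaler(X_train, eps=1e-8):
    """Per-feature mean and std over all samples and time steps of X_train."""
    mean = X_train.mean(axis=(0, 1), keepdims=True)  # shape (1, 1, F)
    std = X_train.std(axis=(0, 1), keepdims=True) + eps
    return mean, std

def apply_feature_scaler(X, mean, std):
    """Standardize X with previously fitted statistics (broadcast over N and T)."""
    return (X - mean) / std

# Synthetic fundamentals with very different magnitudes per feature (made up).
rng = np.random.default_rng(0)
scales = np.array([1e9, 1e6, 1.0])
X_train = rng.normal(size=(80, 4, 3)) * scales
X_test = rng.normal(size=(20, 4, 3)) * scales

mean, std = fit_feature_scaler(X_train)              # statistics from train only
X_train_s = apply_feature_scaler(X_train, mean, std)
X_test_s = apply_feature_scaler(X_test, mean, std)   # reuse the train statistics

print(X_train_s.mean(axis=(0, 1)).round(3), X_train_s.std(axis=(0, 1)).round(3))
```

Fitting on the training split and reusing those statistics for validation and test keeps information from later periods out of the inputs.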

## Problem 4: CNN-LSTM for Predicting Household Power Consumption

In this problem, you will use a combined architecture, CNN-LSTM, to predict household power consumption from historical power consumption. The data is provided in "household_power_consumption", where only the "Global_active_power" column is of interest. The dataset contains measurements of electric power consumption in one household at a one-minute sampling rate over a period of almost 4 years.

Unlike single-time-step prediction, you are now interested in predicting 60 time points ahead (1 hour) from the previous 600 time points (10 hours). Note that you need to carefully tune the learning rate and number of epochs. Report the performance in terms of RMSE.
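The 600-in, 60-out setup can be sketched as slicing the series into overlapping (input, target) pairs. The helper `make_multistep_windows`, the stride of 30, and the sine-wave stand-in series below are assumptions for illustration, not part of the assignment.

```python
import numpy as np

def make_multistep_windows(values, input_len, horizon, stride=1):
    """Slice a 1D series into (input, target) pairs for multi-step forecasting."""
    values = np.asarray(values, dtype=np.float32)
    last_start = len(values) - input_len - horizon
    if last_start < 0:
        raise ValueError("series is shorter than input_len + horizon")
    starts = range(0, last_start + 1, stride)
    X = np.stack([values[s:s + input_len] for s in starts])
    y = np.stack([values[s + input_len:s + input_len + horizon] for s in starts])
    return X, y

# Toy series: 1000 points of a sine wave (a stand-in, not the household data).
series = np.sin(np.arange(1000) / 50.0)
X, y = make_multistep_windows(series, input_len=600, horizon=60, stride=30)
print(X.shape, y.shape)  # (12, 600) (12, 60)
```

A larger stride gives fewer, less correlated samples. With a stride smaller than the window length, neighbouring samples share most of their inputs, which matters when the windows are later split into train and test sets.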

In [None]:
# Solution 4

np.random.seed(42)
torch.manual_seed(42)

file_path = 'household_power_consumption.txt'
max_rows = 150000
usecols = ['Date', 'Time', 'Global_active_power']

df = pd.read_csv(
    file_path,
    sep=';',
    usecols=usecols,
    nrows=max_rows,
    na_values='?',
    low_memory=False
)

df['datetime'] = pd.to_datetime(df['Date'] + ' ' + df['Time'], format='%d/%m/%Y %H:%M:%S')
df = df.sort_values('datetime').reset_index(drop=True)

series = df['Global_active_power'].astype(np.float32)
series = series.dropna().reset_index(drop=True)


input_len = 600   
horizon = 60     
stride = 30      

values = series.to_numpy()
samples_X = []
samples_y = []
for start in range(0, len(values) - input_len - horizon + 1, stride):
    end_input = start + input_len
    end_output = end_input + horizon
    samples_X.append(values[start:end_input])
    samples_y.append(values[end_input:end_output])

X = np.stack(samples_X).astype(np.float32)
y = np.stack(samples_y).astype(np.float32)


n = len(X)
train_size = int(0.8 * n)
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]

train_mean = X_train.mean()
train_std = X_train.std() + 1e-6

X_train = (X_train - train_mean) / train_std
X_test = (X_test - train_mean) / train_std
y_train = (y_train - train_mean) / train_std
y_test = (y_test - train_mean) / train_std


class SeqDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X)
        self.y = torch.from_numpy(y)
    def __len__(self):
        return len(self.X)
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

batch_size = 128
train_loader = DataLoader(SeqDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
test_loader = DataLoader(SeqDataset(X_test, y_test), batch_size=batch_size, shuffle=False)


class CNNLSTM(nn.Module):
    def __init__(self, horizon):
        super().__init__()
        self.conv = nn.Conv1d(1, 8, kernel_size=5, padding=2)
        self.relu = nn.ReLU()
        self.lstm = nn.LSTM(input_size=8, hidden_size=16, batch_first=True)
        self.fc = nn.Linear(16, horizon)
    def forward(self, x):
        x = x.unsqueeze(1)             
        x = self.relu(self.conv(x))    
        x = x.transpose(1, 2)          
        out, _ = self.lstm(x)          
        last = out[:, -1, :]          
        return self.fc(last)           


device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = CNNLSTM(horizon=horizon).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

epochs = 8
for epoch in range(epochs):
    model.train()
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        preds = model(xb)
        loss = criterion(preds, yb)
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1}/{epochs} done")

model.eval()
all_preds = []
all_true = []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        preds = model(xb).cpu().numpy()
        all_preds.append(preds)
        all_true.append(yb.numpy())

y_pred = np.concatenate(all_preds, axis=0)
y_true = np.concatenate(all_true, axis=0)

y_pred_raw = y_pred * train_std + train_mean
y_true_raw = y_true * train_std + train_mean

rmse = math.sqrt(mean_squared_error(y_true_raw.flatten(), y_pred_raw.flatten()))
print(f"Test RMSE: {rmse:.4f}")


Epoch 1/8 done
Epoch 2/8 done
Epoch 3/8 done
Epoch 4/8 done
Epoch 5/8 done
Epoch 6/8 done
Epoch 7/8 done
Epoch 8/8 done
Test RMSE: 0.9178


Test RMSE ≈ 0.92 kW (on 150k rows, stride=30, 8 epochs). If typical loads are 1–5 kW, the error is sizeable; the model would need more data, longer training, and tuning before practical use.
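An RMSE is easier to judge next to a naive reference. The sketch below computes the RMSE of a persistence forecast, which repeats the last observed value for the whole 60-step horizon. It is run on synthetic data, so the printed number says nothing about the household dataset; the helper names and the synthetic series are assumptions for illustration.

```python
import numpy as np

def persistence_forecast(X, horizon):
    """Repeat the last observed value of each input window `horizon` times."""
    return np.repeat(X[:, -1:], horizon, axis=1)

def rmse(y_true, y_pred):
    """Root mean squared error over all samples and forecast steps."""
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

# Synthetic stand-in for the test windows (made-up data, in kW).
rng = np.random.default_rng(0)
series = 1.0 + 0.5 * np.sin(np.arange(2000) / 100.0) + rng.normal(0.0, 0.1, 2000)
input_len, horizon = 600, 60
starts = range(0, len(series) - input_len - horizon + 1, 30)
X = np.stack([series[s:s + input_len] for s in starts])
y = np.stack([series[s + input_len:s + input_len + horizon] for s in starts])

baseline = persistence_forecast(X, horizon)
print(baseline.shape, rmse(y, baseline))
```

Applying the same function to the real test windows would show whether the CNN-LSTM actually improves on simply carrying the last reading forward.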