In [33]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np

In [34]:
# Load the three raw dataset variants, in the same order as the names on the left.
mini_df, mini_df_fill, nn_df = (
    pd.read_csv(csv_path)
    for csv_path in (
        'data/nn_data/mini_df.csv',
        'data/nn_data/mini_df_fill.csv',
        'data/nn_data/nn_df.csv',
    )
)

In [35]:
# Order matters: the processed results are later addressed by position
# (0 -> mini_df, 1 -> nn_df, 2 -> mini_df_fill).
dataframes = [
    mini_df,
    nn_df,
    mini_df_fill,
]

In [36]:
# OneHotEncoding
# Each frame is encoded independently with pd.get_dummies, so the resulting
# column sets are specific to each frame.
encoded_dataframes = [pd.get_dummies(raw_df) for raw_df in dataframes]

In [37]:
# Scaling

# One StandardScaler per frame, stored in `scalers` in the same order as
# `encoded_dataframes`. Re-fitting a single shared scaler would discard the
# mean/scale of every frame but the last, making it impossible to map values
# back to original units later (scalers[i].inverse_transform(...)).
#
# NOTE(review): every column of each frame is standardized here, including the
# 'Market value' target, and the statistics are computed on all rows before the
# train/test split. Downstream losses and metrics are therefore in standardized
# units, and the test rows influence the scaling.
scalers = []
scaled_dataframes = []
for encoded_df in encoded_dataframes:
    scaler = StandardScaler()  # fresh instance so earlier fits are not overwritten
    scaled_array = scaler.fit_transform(encoded_df)
    scaled_df = pd.DataFrame(scaled_array, columns=encoded_df.columns)
    scaled_dataframes.append(scaled_df)
    scalers.append(scaler)

In [38]:
mini_df_scaled_path = "data/ready_dataframes/mini_df_scaled.csv"
nn_df_scaled_path = "data/ready_dataframes/nn_df_scaled.csv"
mini_df_fill_scaled_path = "data/ready_dataframes/mini_df_fill_scaled.csv"

# scaled_dataframes is in the order mini_df, nn_df, mini_df_fill, so each
# position is written to the matching destination below.
scaled_output_paths = (mini_df_scaled_path, nn_df_scaled_path, mini_df_fill_scaled_path)
for position, output_path in enumerate(scaled_output_paths):
    scaled_dataframes[position].to_csv(output_path, index=False)

In [39]:
# Model-ready files
# From here on nn_df / mini_df / mini_df_fill refer to the scaled, encoded
# frames read back from disk, not to the raw inputs loaded earlier.
nn_df, mini_df, mini_df_fill = (
    pd.read_csv(scaled_path)
    for scaled_path in (nn_df_scaled_path, mini_df_scaled_path, mini_df_fill_scaled_path)
)

In [40]:
# Data Split

target_column = 'Market value'

# Frame to model. Swap in mini_df or mini_df_fill to train on another variant;
# all of them share the same target column.
model_df = nn_df

X = model_df.drop(columns=[target_column])
y = model_df[target_column]

# 80/20 split with a fixed seed so the partition is reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [41]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

Using device: cpu


In [42]:
# data conversion - to tensor
def _to_sequence_tensor(frame):
    """Return `frame.values` as a float32 tensor on `device` with a size-1 axis inserted at dim 1."""
    as_tensor = torch.tensor(frame.values, dtype=torch.float32)
    return as_tensor.unsqueeze(1).to(device)


# The 2-D feature frames become (rows, 1, features); the 1-D targets become (rows, 1).
# NOTE: this rebinds X_train/X_test/y_train/y_test from pandas objects to tensors.
X_train, X_test = _to_sequence_tensor(X_train), _to_sequence_tensor(X_test)
y_train, y_test = _to_sequence_tensor(y_train), _to_sequence_tensor(y_test)

# dataloader creation
train_dataset, test_dataset = TensorDataset(X_train, y_train), TensorDataset(X_test, y_test)
# Only the training batches are shuffled; the test order stays fixed.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

# LSTM

In [ ]:
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer producing one value per sample.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=1)

    def forward(self, x):
        """Run the batch-first input through the LSTM and regress from its last time step.

        Args:
            x: Tensor laid out as (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, 1).
        """
        # Explicit all-zero initial hidden and cell states, created on x's device.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        initial_hidden = torch.zeros(state_shape, device=x.device)
        initial_cell = torch.zeros(state_shape, device=x.device)

        sequence_output, _final_state = self.lstm(x, (initial_hidden, initial_cell))
        # Only the output of the final time step feeds the regression head.
        last_step = sequence_output[:, -1, :]
        return self.fc(last_step)

# LSTM hyperparameters: the feature count is read from the last axis of the
# 3-D training tensor.
input_size = X_train.size(2)
hidden_size, num_layers = 50, 2

# Build the network on the selected device, with MSE loss and Adam.
model = LSTMModel(input_size, hidden_size, num_layers)
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

In [ ]:
# model training
def train_model(model, train_loader, test_loader, criterion, optimizer, epochs=50,
                results_path="lstm_training_results.csv"):
    """Train `model` and record the per-epoch train and test loss.

    Each epoch runs one optimisation pass over `train_loader`, then evaluates
    on `test_loader` without gradients. Both losses are sample-weighted means,
    printed per epoch and finally written to a CSV file.

    Args:
        model: The torch module to optimise (updated in place).
        train_loader: DataLoader yielding (inputs, targets) training batches.
        test_loader: DataLoader yielding (inputs, targets) evaluation batches.
        criterion: Loss callable applied as criterion(outputs, targets).
        optimizer: Optimizer bound to the model's parameters.
        epochs: Number of passes over `train_loader`.
        results_path: Destination of the loss-history CSV. The default keeps
            the original file name; pass another path to reuse this function
            for a different model without redefining it.
    """
    train_losses = []
    test_losses = []

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            # Weight by batch size so a smaller final batch is not over-counted.
            running_loss += loss.item() * inputs.size(0)
        epoch_loss = running_loss / len(train_loader.dataset)

        train_losses.append(epoch_loss)

        # Evaluation pass: eval mode and no gradient tracking.
        model.eval()
        test_loss = 0.0
        with torch.no_grad():
            for inputs, targets in test_loader:
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                test_loss += loss.item() * inputs.size(0)
        test_loss = test_loss / len(test_loader.dataset)
        test_losses.append(test_loss)

        print(f'Epoch {epoch+1}/{epochs}, Train Loss: {epoch_loss:.4f}, Test Loss: {test_loss:.4f}')

    loss_df = pd.DataFrame({'Epoch': list(range(1, epochs + 1)), 'Train Loss': train_losses, 'Test Loss': test_losses})
    loss_df.to_csv(results_path, index=False)

In [ ]:
# Fit the current model for 50 epochs; train_model prints the per-epoch losses
# and writes the loss history to a CSV file.
train_model(
    model,
    train_loader,
    test_loader,
    criterion,
    optimizer,
    epochs=50,
)

In [ ]:
# model evaluation
def evaluate_model(model, test_loader):
    """Run `model` over `test_loader` and collect predictions and targets.

    Args:
        model: Torch module called as model(inputs).
        test_loader: DataLoader yielding (inputs, targets) batches.

    Returns:
        Tuple (predictions, actuals) of NumPy arrays, one row per sample, in
        the order the loader yields them.
    """
    model.eval()
    predictions, actuals = [], []
    with torch.no_grad():
        for inputs, targets in test_loader:
            outputs = model(inputs)
            # Tensor.numpy() only works on CPU tensors; .cpu() is a no-op for
            # tensors already on the CPU and copies CUDA tensors to the host.
            predictions.extend(outputs.cpu().numpy())
            actuals.extend(targets.cpu().numpy())
    return np.array(predictions), np.array(actuals)

# Score the current model on the held-out split.
# NOTE: the 'Market value' target was standardized together with the features
# upstream, so RMSE and MAE below are in standardized units.
y_pred, y_true = evaluate_model(model, test_loader)

mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

print(f'RMSE: {rmse}', f'MAE: {mae}', f'R²: {r2}', sep='\n')

# CNN

In [28]:
class CNNModel(nn.Module):
    """1-D CNN regressor: Conv1d -> ReLU -> Dropout -> Flatten -> Linear -> ReLU -> Linear.

    Args:
        num_features: Length of the last axis of the input. When omitted it is
            read from the last axis of the notebook-level ``X_train``, so the
            existing ``CNNModel()`` call keeps working.

    Raises:
        ValueError: If ``num_features`` is smaller than the convolution kernel.
    """

    def __init__(self, num_features=None):
        super(CNNModel, self).__init__()
        if num_features is None:
            # Size from the LAST axis. X_train is (samples, 1, features) after
            # unsqueeze(1), so shape[1] is the channel axis (always 1) and
            # using it would size fc1 with zero input features.
            num_features = X_train.shape[-1]

        kernel_size = 2
        # Conv1d with stride 1 and no padding shortens the axis by kernel_size - 1.
        conv_out_length = num_features - kernel_size + 1
        if conv_out_length < 1:
            raise ValueError(
                f'num_features must be at least {kernel_size}, got {num_features}'
            )

        self.conv1 = nn.Conv1d(in_channels=1, out_channels=64, kernel_size=kernel_size)
        self.dropout = nn.Dropout(0.2)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(64 * conv_out_length, 50)
        self.fc2 = nn.Linear(50, 1)

    def forward(self, x):
        """Map an input of shape (batch, 1, num_features) to predictions of shape (batch, 1)."""
        x = torch.relu(self.conv1(x))
        x = self.dropout(x)
        x = self.flatten(x)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Build the CNN on the selected device. `model`, `criterion` and `optimizer`
# are rebound here and replace the objects used for the previous model.
model = CNNModel()
model = model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

In [28]:
# model training
def train_model(model, train_loader, test_loader, criterion, optimizer, epochs=50,
                results_path="cnn_training_results.csv"):
    """Train `model` and record the per-epoch train and test loss.

    Each epoch runs one optimisation pass over `train_loader`, then evaluates
    on `test_loader` without gradients. Both losses are sample-weighted means,
    printed per epoch and finally written to a CSV file.

    Args:
        model: The torch module to optimise (updated in place).
        train_loader: DataLoader yielding (inputs, targets) training batches.
        test_loader: DataLoader yielding (inputs, targets) evaluation batches.
        criterion: Loss callable applied as criterion(outputs, targets).
        optimizer: Optimizer bound to the model's parameters.
        epochs: Number of passes over `train_loader`.
        results_path: Destination of the loss-history CSV. The default keeps
            the original file name; pass another path to reuse this function
            for a different model without redefining it.
    """
    train_losses = []
    test_losses = []

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            # Weight by batch size so a smaller final batch is not over-counted.
            running_loss += loss.item() * inputs.size(0)
        epoch_loss = running_loss / len(train_loader.dataset)

        train_losses.append(epoch_loss)

        # evaluation on test
        model.eval()
        test_loss = 0.0
        with torch.no_grad():
            for inputs, targets in test_loader:
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                test_loss += loss.item() * inputs.size(0)
        test_loss = test_loss / len(test_loader.dataset)
        test_losses.append(test_loss)

        print(f'Epoch {epoch+1}/{epochs}, Train Loss: {epoch_loss:.4f}, Test Loss: {test_loss:.4f}')

    loss_df = pd.DataFrame({'Epoch': list(range(1, epochs + 1)), 'Train Loss': train_losses, 'Test Loss': test_losses})
    loss_df.to_csv(results_path, index=False)

In [ ]:
# Fit the current model for 50 epochs; train_model prints the per-epoch losses
# and writes the loss history to a CSV file.
train_model(
    model,
    train_loader,
    test_loader,
    criterion,
    optimizer,
    epochs=50,
)

In [ ]:
def evaluate_model(model, test_loader):
    """Run `model` over `test_loader` and collect predictions and targets.

    Args:
        model: Torch module called as model(inputs).
        test_loader: DataLoader yielding (inputs, targets) batches.

    Returns:
        Tuple (predictions, actuals) of NumPy arrays, one row per sample, in
        the order the loader yields them.
    """
    model.eval()
    predictions, actuals = [], []
    with torch.no_grad():
        for inputs, targets in test_loader:
            outputs = model(inputs)
            # Tensor.numpy() only works on CPU tensors; .cpu() is a no-op for
            # tensors already on the CPU and copies CUDA tensors to the host.
            predictions.extend(outputs.cpu().numpy())
            actuals.extend(targets.cpu().numpy())
    return np.array(predictions), np.array(actuals)

# model evaluation
# NOTE: the 'Market value' target was standardized together with the features
# upstream, so RMSE and MAE below are in standardized units.
y_pred, y_true = evaluate_model(model, test_loader)

mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

print(f'RMSE: {rmse}', f'MAE: {mae}', f'R²: {r2}', sep='\n')

# Results Summary and Comparison