In [None]:
import torch
import optuna
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pathlib

In [None]:
def df_to_X_y_vector(df, lags, future):
    """Build sliding-window samples for multi-step forecasting.

    Each sample pairs ``lags`` consecutive rows of ``df`` (all columns) with
    the ``'Detections'`` values of the ``future`` rows that follow them.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature table containing a ``'Detections'`` column. Rows are consumed
        in their existing order.
    lags : int
        Number of consecutive rows in each input window.
    future : int
        Number of subsequent ``'Detections'`` values in each target vector.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        ``X`` with shape ``(n_samples, lags, n_columns)`` and ``y`` with shape
        ``(n_samples, future)``, where
        ``n_samples = len(df) - lags - future + 1``. Both arrays are empty
        (shape ``(0,)``) when ``df`` is too short for a single window.

    Raises
    ------
    KeyError
        If ``df`` has no ``'Detections'`` column.
    """
    # Convert once instead of calling ``iloc`` for every row of every window.
    rows = df.to_numpy().tolist()
    target_idx = df.columns.get_loc('Detections')

    # The last valid window starts at ``len(df) - lags - future``, hence the +1.
    # Without it the final window (whose targets end on the last row) is lost.
    n_samples = len(rows) - lags - future + 1

    X = []
    y = []
    for start in range(n_samples):
        split = start + lags
        X.append(rows[start:split])
        y.append([row[target_idx] for row in rows[split:split + future]])
    return np.array(X), np.array(y)


def df_to_X_y_standard(df, lags):
    """Build sliding-window samples for one-step-ahead forecasting.

    Every sample consists of ``lags`` consecutive rows of ``df`` (all columns)
    and, as its label, the ``'Detections'`` value of the row right after them.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature table containing a ``'Detections'`` column.
    lags : int
        Number of consecutive rows in each input window.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        The stacked windows and the matching labels; ``len(df) - lags``
        samples in total (none when ``df`` is not longer than ``lags``).
    """
    n_samples = len(df) - lags
    features = [
        [list(df.iloc[start + offset]) for offset in range(lags)]
        for start in range(n_samples)
    ]
    # The label is read from the row immediately following each window.
    targets = [df.iloc[start + lags]['Detections'] for start in range(n_samples)]
    return np.array(features), np.array(targets)


def get_df(path, lags=10, future=5):
    """Load the CSV at ``path`` and return sliding-window arrays.

    Despite its name, this returns the ``(X, y)`` pair produced by
    ``df_to_X_y_vector`` rather than a DataFrame.

    Preprocessing steps:
      * drop the ``WeatherDescription``, ``Unnamed: 0`` and ``index`` columns;
      * one-hot encode ``WeatherMain``;
      * derive ``Month``, ``Day`` and ``Hour`` from ``Timestamp``;
      * drop ``Timestamp`` and the ``WeatherMain_Snow`` dummy column.

    Parameters
    ----------
    path : str or path-like
        Location of the CSV file.
    lags : int, default 10
        Number of consecutive rows in each input window.
    future : int, default 5
        Number of future ``Detections`` values in each target vector.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        The ``X`` and ``y`` arrays from ``df_to_X_y_vector(df, lags, future)``.

    Raises
    ------
    KeyError
        If any of the columns dropped above is missing from the file (for
        example when the data contains no ``Snow`` category).
    """
    df = pd.read_csv(path)
    df = df.drop(['WeatherDescription', 'Unnamed: 0', 'index'], axis=1)
    df = pd.get_dummies(df, columns=['WeatherMain'], prefix='WeatherMain')

    # Only the calendar parts below are kept as features; the raw timestamp
    # itself is dropped afterwards.
    timestamps = pd.to_datetime(df['Timestamp'])
    df['Month'] = timestamps.dt.month
    df['Day'] = timestamps.dt.day
    df['Hour'] = timestamps.dt.hour

    # NOTE(review): WeatherMain_Snow is presumably dropped as the reference
    # category of the one-hot encoding - confirm.
    df = df.drop(['Timestamp', 'WeatherMain_Snow'], axis=1)
    return df_to_X_y_vector(df, lags, future)

In [None]:
path = '../../train_belgrade.csv'
X, y = get_df(path)

# Fail early with a readable message if no windows could be built.
assert X.ndim == 3, f'expected (samples, lags, features) windows, got shape {X.shape}'

# Consecutive windows overlap in all but one row, so a shuffled split would put
# near-identical samples into train, validation and test at the same time.
# Split in the existing row order instead: first 80% train, next 10% validation,
# last 10% test.
# NOTE(review): assumes the CSV rows are already in chronological order - confirm.
X_train_raw, X_temp, y_train_raw, y_temp = train_test_split(X, y, test_size=0.2, shuffle=False)
X_val_raw, X_test_raw, y_val_raw, y_test_raw = train_test_split(X_temp, y_temp, test_size=0.5, shuffle=False)

In [None]:
# Convert the NumPy splits to float32 tensors. The windows are already laid out
# as (sample, time step, feature), which is what the batch_first=True recurrent
# layers consume, so no axis permutation is required.
X_train = torch.tensor(X_train_raw, dtype=torch.float32)
y_train = torch.tensor(y_train_raw, dtype=torch.float32)
X_val = torch.tensor(X_val_raw, dtype=torch.float32)
y_val = torch.tensor(y_val_raw, dtype=torch.float32)

print(X_train.shape, y_train.shape)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import optuna

class CustomLSTM(nn.Module):
    """Recurrent layer wrapper that can be chained inside ``nn.Sequential``.

    ``nn.GRU`` returns an ``(output, hidden)`` tuple, which ``nn.Sequential``
    cannot pass on to the next module. This wrapper returns only the output
    tensor and can optionally keep just the final time step.

    NOTE(review): the class and attribute are named "LSTM" but the wrapped
    module is ``nn.GRU``. Left unchanged because previously saved weights
    depend on it - confirm which cell type is intended.

    Parameters
    ----------
    input_dim : int
        Number of features per time step.
    hidden_size : int
        Size of the recurrent hidden state.
    last_layer : bool, keyword-only, default False
        If true, ``forward`` returns only ``output[:, -1, :]``.
    dropout : float, keyword-only, default 0.0
        Dropout probability. For a single recurrent layer it is applied to
        this module's output; with ``num_layers > 1`` it is forwarded to
        ``nn.GRU`` unchanged.
    **kwargs
        Remaining keyword arguments are forwarded to ``nn.GRU``.
    """

    def __init__(self, input_dim, hidden_size, **kwargs):
        super().__init__()
        self.last_layer = kwargs.pop('last_layer', False)

        # nn.GRU only applies dropout *between* its internal layers; with a
        # single layer it warns and ignores the value. Handle that case here so
        # that the requested dropout actually takes effect.
        dropout = kwargs.pop('dropout', 0.0)
        if kwargs.get('num_layers', 1) > 1:
            kwargs['dropout'] = dropout
            dropout = 0.0

        self.lstm = nn.GRU(input_dim, hidden_size, **kwargs)
        # nn.Dropout has no parameters, so the state_dict keys are unchanged.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Run the recurrent layer on ``x`` and return its (optionally sliced) output."""
        output, _ = self.lstm(x)
        if self.last_layer:
            # Indexing axis 1 as time assumes the layer was built with batch_first=True.
            output = output[:, -1, :]
        return self.dropout(output)


def create_model(trial, input_size, output_size):
    """Assemble a stacked recurrent regressor from trial-suggested hyperparameters.

    Parameters
    ----------
    trial
        Object exposing ``suggest_int`` and ``suggest_float``; it is asked for
        ``hidden_size`` (50-200), ``num_layers`` (1-3) and ``dropout`` (0.1-0.5).
    input_size : int
        Number of features per time step fed to the first recurrent layer.
    output_size : int
        Number of values produced by the final linear layer.

    Returns
    -------
    torch.nn.Sequential
        ``num_layers`` ``CustomLSTM`` modules followed by one ``nn.Linear``.
    """
    hidden_size = trial.suggest_int('hidden_size', 50, 200)
    num_layers = trial.suggest_int('num_layers', 1, 3)
    dropout = trial.suggest_float('dropout', 0.1, 0.5)

    recurrent_stack = []
    in_features = input_size
    for depth in range(1, num_layers + 1):
        is_final = depth == num_layers
        recurrent_stack.append(
            CustomLSTM(
                in_features,
                hidden_size,
                batch_first=True,
                # The final recurrent layer gets no dropout and emits only its
                # last time step, which is what the linear head consumes.
                dropout=0 if is_final else dropout,
                last_layer=is_final,
            )
        )
        in_features = hidden_size

    head = nn.Linear(hidden_size, output_size)
    return nn.Sequential(*recurrent_stack, head)


In [None]:
def objective(trial):
    """Optuna objective: train one candidate model and return its validation MSE.

    Reads the notebook-level tensors ``X_train``, ``y_train``, ``X_val`` and
    ``y_val``. The model is built by ``create_model`` and trained for a fixed
    number of epochs with Adam on sequential mini-batches; the learning rate is
    suggested by ``trial``. After training, the weights are written to
    ``./lstm_models/model_<trial.number>.pth``.

    Parameters
    ----------
    trial : optuna.trial.Trial
        Trial used to sample hyperparameters.

    Returns
    -------
    float
        Validation MSE after the final epoch.
    """
    # Derive the layer sizes from the data instead of hard-coding them, so the
    # model keeps matching the tensors if the feature set or horizon changes.
    model = create_model(trial, input_size=X_train.shape[-1], output_size=y_train.shape[-1])

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=trial.suggest_float('lr', 1e-5, 1e-2, log=True))

    epochs = 10
    mini_batch_size = 32
    for epoch in range(epochs):
        # Validation below switches to eval mode; switch back every epoch so
        # that dropout is active during training.
        model.train()
        for i in range(0, len(X_train), mini_batch_size):
            optimizer.zero_grad()
            output = model(X_train[i:i+mini_batch_size])
            loss = criterion(output, y_train[i:i+mini_batch_size])
            loss.backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            val_output = model(X_val)
            val_loss = criterion(val_output, y_val)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {val_loss.item()}')

    # Persist the weights so the best trial can be reloaded for evaluation.
    output_path = pathlib.Path('./lstm_models')
    output_path.mkdir(exist_ok=True)
    torch.save(model.state_dict(), output_path/f'model_{trial.number}.pth')

    return val_loss.item()


In [None]:
# Seed both the hyperparameter sampler and PyTorch so that a fresh
# Restart & Run All reproduces the same trials and weight initialisations.
torch.manual_seed(42)
study = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler(seed=42))
study.optimize(objective, n_trials=10)

best_trial = study.best_trial
print(f'Best trial score: {best_trial.value}')
for key, value in best_trial.params.items():
    print(f'{key}: {value}')


In [None]:
# evaluate on test set
# Evaluate on the held-out test set. The windows are already laid out as
# (sample, time step, feature), so no axis permutation is required.
X_test = torch.tensor(X_test_raw, dtype=torch.float32)
y_test = torch.tensor(y_test_raw, dtype=torch.float32)

criterion = nn.MSELoss()

# Rebuild the architecture of the best trial (its stored hyperparameters are
# replayed through the suggest_* calls), then load the weights saved for that trial.
model = create_model(best_trial, input_size=X_test.shape[-1], output_size=y_test.shape[-1])
model.load_state_dict(torch.load(f'./lstm_models/model_{best_trial.number}.pth'))
# Switch to inference mode only once the trained weights are in place.
model.eval()


In [None]:

# Score the reloaded model on the test tensors without tracking gradients.
with torch.no_grad():
    test_output = model(X_test)
    test_loss = criterion(test_output, y_test)
    # Show predictions next to the ground truth for a quick visual check.
    print(test_output, y_test)

print(f'Test loss: {test_loss.item()}')

    