In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
cd drive/My\ Drive

In [None]:
# Imports
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split

In [None]:
# Load the data
''' path_to_data = '/content/drive/My Drive/MG_time_series.csv' for Google Colab 
and 'RNN_Course/Data/MG_time_series.csv' for running the codes on the server
'''
path_to_data = 'Data/MG_time_series.csv'
data = pd.read_csv(path_to_data)

In [None]:
# Normalize data
# data = (data - np.mean(data)) / np.std(data)
print(data.shape)

# Create sequences
def create_dataset(series, time_steps):
    X, y = [], []
    for i in range(len(series) - time_steps):
        X.append(series[i:i + time_steps])
        y.append(series[i + time_steps])
    return np.array(X), np.array(y)

# Convert to Numpy if it's a Pandas object
if isinstance(data, pd.Series) or isinstance(data, pd.DataFrame):
    data = data.values

time_steps = 50  # Number of time steps (sliding window)
X, y = create_dataset(data, time_steps)


# Convert data to PyTorch tensors and reshape for RNN input (samples, time steps, features)
X = np.reshape(X, (X.shape[0], X.shape[1], 1))
X_tensor = torch.from_numpy(X).float()
y_tensor = torch.from_numpy(y).float()


In [None]:
# Splitting the dataset into training, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(X_tensor, y_tensor, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

print(f"Training set: {X_train.shape}, {y_train.shape}")
print(f"Validation set: {X_val.shape}, {y_val.shape}")
print(f"Test set: {X_test.shape}, {y_test.shape}")


In [None]:
# Define the vanilla RNN model
class VanillaRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(VanillaRNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(1, x.size(0), self.hidden_size).to(x.device)  # Initial hidden state
        out, _ = self.rnn(x, h0)
        out = self.fc(out[:, -1, :])  # Take the output from the last time step
        return out

In [None]:
# Set initial parameters
input_size = 1
output_size = 1
hidden_sizes = [16, 32, 64, 128]  # Hidden layer sizes to tune

In [None]:
# Initialize a dictionary to store the losses for each hidden layer size
losses_per_hidden_size = {}

# Set initial parameters
best_model = None
best_val_loss = float('inf')

# Loop over different hidden sizes
for hidden_size in hidden_sizes:
    model = VanillaRNN(input_size, hidden_size, output_size)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Initialize lists for storing epoch losses
    train_loss_epoch = []
    val_loss_epoch = []

    # Training loop
    epochs = 100
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output, y_train)
        loss.backward()
        optimizer.step()

        # Append training loss for this epoch
        train_loss_epoch.append(loss.item())

        # Validation
        model.eval()
        val_output = model(X_val)
        val_loss = criterion(val_output, y_val)
        val_loss_epoch.append(val_loss.item())

        # Save the best model
        if val_loss.item() < best_val_loss:
            best_val_loss = val_loss.item()
            best_model = model  # Save the best model

        if epoch % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Hidden Size: {hidden_size}, Training Loss: {loss.item()}, Validation Loss: {val_loss.item()}")

    # Store the training and validation losses for this hidden size
    losses_per_hidden_size[hidden_size] = (train_loss_epoch, val_loss_epoch)

# Plot training and validation loss for each hidden size
save_path = '/content/drive/My Drive/training.pdf'
my_color = ['#FF662A', '#FFA22A', '#82AC26', '#4F3F84']
plt.figure(figsize=(12, 8))
for i, hidden_size in enumerate(hidden_sizes):
    train_loss, val_loss = losses_per_hidden_size[hidden_size]
    plt.plot(train_loss, color=my_color[i], label=f'Training Loss (Hidden Size {hidden_size})', linestyle='solid')
    plt.plot(val_loss, color=my_color[i], label=f'Validation Loss (Hidden Size {hidden_size})', linestyle='dashed')

plt.title('Training and Validation Loss for Different Hidden Sizes')
plt.xlabel('Epochs', fontsize=22)  # X-axis label font size
plt.ylabel('Loss', fontsize=22)  # Y-axis label font size
plt.xticks(fontsize=18)  # X-axis ticks font size
plt.yticks(fontsize=18)

# Increase font size for the legend
plt.legend(fontsize=18)
plt.savefig(save_path, format='pdf')
plt.show()

In [None]:
best_model.eval()
test_output = best_model(X_test)
test_loss = criterion(test_output, y_test)

print(f"Test Loss: {test_loss.item()}")

# Convert tensors to numpy arrays for plotting
y_test_np = y_test.detach().numpy()
test_output_np = test_output.detach().numpy()

# Plot the true vs predicted time series
save_path = '/content/drive/My Drive/predicted_vs_true_timeseries.pdf'
plt.figure(figsize=(10, 6))
plt.plot(y_test_np, label='True Time Series', color='#696969')
plt.plot(test_output_np, label='Predicted Time Series', color='#FF5500', linestyle='dashed')
plt.title('True vs Predicted Time Series', fontsize=22)
plt.xlabel('Time Steps', fontsize=22)
plt.ylabel('Value', fontsize=22)
plt.legend(fontsize=18)

# Save the figure as a PDF file
plt.savefig(save_path, format='pdf')

print(f"Figure saved as PDF to: {save_path}")

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(1, x.size(0), self.hidden_size).to(x.device)  # Initial hidden state
        c0 = torch.zeros(1, x.size(0), self.hidden_size).to(x.device)  # Initial cell state
        out, _ = self.lstm(x, (h0, c0))  # LSTM forward
        out = self.fc(out[:, -1, :])  # Fully connected layer applied to the last time step
        return out

In [None]:
# Initialize a dictionary to store the losses for each hidden layer size
losses_per_hidden_size = {}

# Set initial parameters
best_model = None
best_val_loss = float('inf')

# Loop over different hidden sizes
for hidden_size in hidden_sizes:
    model = LSTMModel(input_size, hidden_size, output_size)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Initialize lists for storing epoch losses
    train_loss_epoch = []
    val_loss_epoch = []

    # Training loop
    epochs = 100
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output, y_train)
        loss.backward()
        optimizer.step()

        # Append training loss for this epoch
        train_loss_epoch.append(loss.item())

        # Validation
        model.eval()
        val_output = model(X_val)
        val_loss = criterion(val_output, y_val)
        val_loss_epoch.append(val_loss.item())

        # Save the best model
        if val_loss.item() < best_val_loss:
            best_val_loss = val_loss.item()
            best_model = model  # Save the best model

        if epoch % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Hidden Size: {hidden_size}, Training Loss: {loss.item()}, Validation Loss: {val_loss.item()}")

    # Store the training and validation losses for this hidden size
    losses_per_hidden_size[hidden_size] = (train_loss_epoch, val_loss_epoch)

# Plot training and validation loss for each hidden size
my_color = ['#FF662A', '#FFA22A', '#82AC26', '#4F3F84']
plt.figure(figsize=(12, 8))
for i, hidden_size in enumerate(hidden_sizes):
    train_loss, val_loss = losses_per_hidden_size[hidden_size]
    plt.plot(train_loss, color=my_color[i], label=f'Training Loss (Hidden Size {hidden_size})', linestyle='solid')
    plt.plot(val_loss, color=my_color[i], label=f'Validation Loss (Hidden Size {hidden_size})', linestyle='dashed')

plt.title('Training and Validation Loss for Different Hidden Sizes')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
best_model.eval()
test_output = best_model(X_test)
test_loss = criterion(test_output, y_test)

print(f"Test Loss: {test_loss.item()}")

# Convert tensors to numpy arrays for plotting
y_test_np = y_test.detach().numpy()
test_output_np = test_output.detach().numpy()

# Plot the true vs predicted time series
save_path = '/content/drive/My Drive/predicted_vs_true_timeseries.pdf'
plt.figure(figsize=(10, 6))
plt.plot(y_test_np, label='True Time Series', color='#696969')
plt.plot(test_output_np, label='Predicted Time Series', color='#FF5500', linestyle='dashed')
plt.title('True vs Predicted Time Series')
plt.xlabel('Time Steps')
plt.ylabel('Value')
plt.legend()

# Save the figure as a PDF file
plt.savefig(save_path, format='pdf')

print(f"Figure saved as PDF to: {save_path}")

In [None]:
class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(GRUModel, self).__init__()
        self.hidden_size = hidden_size
        self.gru = nn.GRU(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(1, x.size(0), self.hidden_size).to(x.device)  # Initial hidden state
        out, _ = self.gru(x, h0)  # GRU forward
        out = self.fc(out[:, -1, :])  # Fully connected layer applied to the last time step
        return out

In [None]:
# Initialize a dictionary to store the losses for each hidden layer size
losses_per_hidden_size = {}

# Set initial parameters
best_model = None
best_val_loss = float('inf')

# Loop over different hidden sizes
for hidden_size in hidden_sizes:
    model = GRUModel(input_size, hidden_size, output_size)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Initialize lists for storing epoch losses
    train_loss_epoch = []
    val_loss_epoch = []

    # Training loop
    epochs = 100
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output, y_train)
        loss.backward()
        optimizer.step()

        # Append training loss for this epoch
        train_loss_epoch.append(loss.item())

        # Validation
        model.eval()
        val_output = model(X_val)
        val_loss = criterion(val_output, y_val)
        val_loss_epoch.append(val_loss.item())

        # Save the best model
        if val_loss.item() < best_val_loss:
            best_val_loss = val_loss.item()
            best_model = model  # Save the best model

        if epoch % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Hidden Size: {hidden_size}, Training Loss: {loss.item()}, Validation Loss: {val_loss.item()}")

    # Store the training and validation losses for this hidden size
    losses_per_hidden_size[hidden_size] = (train_loss_epoch, val_loss_epoch)

# Plot training and validation loss for each hidden size
my_color = ['#FF662A', '#FFA22A', '#82AC26', '#4F3F84']
plt.figure(figsize=(12, 8))
for i, hidden_size in enumerate(hidden_sizes):
    train_loss, val_loss = losses_per_hidden_size[hidden_size]
    plt.plot(train_loss, color=my_color[i], label=f'Training Loss (Hidden Size {hidden_size})', linestyle='solid')
    plt.plot(val_loss, color=my_color[i], label=f'Validation Loss (Hidden Size {hidden_size})', linestyle='dashed')

plt.title('Training and Validation Loss for Different Hidden Sizes')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
best_model.eval()
test_output = best_model(X_test)
test_loss = criterion(test_output, y_test)

print(f"Test Loss: {test_loss.item()}")

# Convert tensors to numpy arrays for plotting
y_test_np = y_test.detach().numpy()
test_output_np = test_output.detach().numpy()

# Plot the true vs predicted time series
save_path = '/content/drive/My Drive/predicted_vs_true_timeseries.pdf'
plt.figure(figsize=(10, 6))
plt.plot(y_test_np, label='True Time Series', color='#696969')
plt.plot(test_output_np, label='Predicted Time Series', color='#FF5500', linestyle='dashed')
plt.title('True vs Predicted Time Series')
plt.xlabel('Time Steps')
plt.ylabel('Value')
plt.legend()

# Save the figure as a PDF file
plt.savefig(save_path, format='pdf')

print(f"Figure saved as PDF to: {save_path}")