In [None]:
import torch as torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
from random import *
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# CNN model

In [None]:
class CNN(nn.Module):
    """1-D convolutional regressor: a Conv1d/AvgPool1d stack plus a linear head.

    The linear head is built in ``__init__`` (not on the first ``forward``
    call), so it is part of ``parameters()`` / ``state_dict()`` from the start,
    is picked up by an optimizer created right after construction, and follows
    the module through ``.to(...)``.

    Args:
        input_features: Number of input channels of the first convolution.
        output_dim: Number of values the linear head produces per sample.
        k_size: Accepted for interface compatibility. The kernel sizes of the
            convolution stack are fixed below and this value is not used.
        seq_len: Length of the input sequences. It is used once, at
            construction time, to size the linear head; inputs of a different
            length will not match the head in ``forward``.
    """

    def __init__(self, input_features, output_dim, k_size, seq_len=80):
        super(CNN, self).__init__()
        self.convs = nn.Sequential(
            nn.Conv1d(in_channels=input_features, out_channels=16, kernel_size=3, stride=3),
            nn.ReLU(),
            nn.AvgPool1d(kernel_size=5, stride=1),
            nn.Conv1d(in_channels=16, out_channels=8, kernel_size=3, stride=3),
            nn.ReLU(),
            nn.AvgPool1d(kernel_size=3, stride=1),
            nn.Conv1d(in_channels=8, out_channels=4, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Conv1d(in_channels=4, out_channels=1, kernel_size=2, stride=1),
        )

        # Push a dummy sequence through the stack to learn how many features
        # reach the head, instead of hand-deriving the length arithmetic.
        with torch.no_grad():
            probe = torch.zeros(1, input_features, seq_len)
            flat_features = self.convs(probe).flatten(1).size(1)

        self.fc = nn.Linear(flat_features, output_dim)

    def forward(self, x):
        """Map a ``(batch, input_features, seq_len)`` tensor to ``(batch, output_dim)``."""
        x = self.convs(x)
        # Collapse the channel and time axes into one feature axis per sample.
        x = x.view(x.size(0), -1)
        return self.fc(x)
    
# Build the single-channel CNN, then move it onto the target device.
model = CNN(input_features=1, output_dim=1, k_size=3)
model = model.to(device)
print(model)
        

In [None]:
# Training configuration and DataLoaders for the train / validation splits

num_epochs = 500
learning_rate = 0.001
batch_size = 64

# NOTE(review): both loaders iterate in dataset order (shuffle=False) —
# confirm that not shuffling the training split is intentional.
train_dataset = TensorDataset(X_train, y_train)
valid_dataset = TensorDataset(X_valid, y_valid)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=False)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=batch_size, shuffle=False)

In [None]:
torch.cuda.empty_cache()

SEQ_LEN = 80               # samples per input sequence fed to the model
EARLY_STOP_PATIENCE = 10   # epochs without validation improvement before stopping

# Run one forward pass before building the optimizer: a layer that the model
# creates inside forward() only appears in model.parameters() (and therefore
# only gets trained) if it already exists when the optimizer is constructed.
with torch.no_grad():
    warmup_inputs, _ = next(iter(train_loader))
    model(warmup_inputs.view(warmup_inputs.size(0), 1, SEQ_LEN).to(device))

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

best_loss = float('inf')
# state_dict() hands back references to the live tensors, so a snapshot has to
# be cloned; otherwise later optimizer steps silently overwrite the "best" copy.
best_weights = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
patience = EARLY_STOP_PATIENCE

loss_vals = []   # mean training MSE per epoch
mse_valid = []   # mean validation MSE per epoch

for epoch in range(num_epochs):
    ### training
    model.train()
    train_loss = 0.0
    for inputs, labels in train_loader:
        inputs = inputs.view(inputs.size(0), 1, SEQ_LEN).to(device)  # batch_size, feature_num, sequence_length
        labels = labels.view(-1, 1).to(device)

        optimizer.zero_grad()
        loss = criterion(model(inputs), labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_loader)  # average over batches
    loss_vals.append(train_loss)
    print(f'Epoch {epoch+1}/{num_epochs}, MSE: {train_loss:.6g}')

    ### validation
    model.eval()
    val_mse_epoch = 0.0
    with torch.no_grad():
        for val_inputs, val_labels in valid_loader:
            val_inputs = val_inputs.view(val_inputs.size(0), 1, SEQ_LEN).to(device)
            val_labels = val_labels.view(-1, 1).to(device)
            val_mse_epoch += criterion(model(val_inputs), val_labels).item()

    val_mse_epoch /= len(valid_loader)  # average over batches
    print(f'Validation MSE: {val_mse_epoch:.6g}')
    mse_valid.append(val_mse_epoch)

    ### early stopping
    if val_mse_epoch < best_loss:
        best_loss = val_mse_epoch
        best_weights = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
        patience = EARLY_STOP_PATIENCE
    else:
        patience -= 1
        print(f'Patience: {patience}')
        if patience <= 0:
            break
    print('----------------------------------------------')

### load the best model
model.load_state_dict(best_weights)

# Transformer model

In [None]:
class Transformer(nn.Module):
    """Conv1d front-end, a two-layer Transformer encoder, and a linear head.

    The single-channel input is convolved once. Because the convolution has one
    output channel and the encoder is ``batch_first``, the encoder receives one
    token per sample whose feature size (``d_model``) is the convolved sequence
    length. The encoder output is squashed with tanh and projected to a single
    value per sample.

    Args:
        seq_len: Length of the input sequences.
        k_size: Kernel size of the convolution.
        stride: Stride of the convolution.
        padding: Zero-padding applied on both sides by the convolution.
        dilation: Dilation of the convolution.

    Raises:
        ValueError: If the settings leave no output positions after the
            convolution.
    """

    def __init__(self, seq_len=80, k_size=3, stride=1, padding=0, dilation=1):
        super().__init__()
        # Conv1d output-length formula; this must agree with the arguments
        # actually passed to the layer below, since it sizes the encoder.
        trans_input_dim = (seq_len + 2 * padding - dilation * (k_size - 1) - 1) // stride + 1
        if trans_input_dim < 1:
            raise ValueError(
                f"Convolution settings leave no output positions for seq_len={seq_len}"
            )

        self.conv1 = nn.Conv1d(
            1,
            out_channels=1,
            kernel_size=k_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
        )

        self.enc_transf = nn.TransformerEncoderLayer(
            d_model=trans_input_dim,
            nhead=1,
            dim_feedforward=20,
            batch_first=True,
            norm_first=True,
        )
        self.relu = torch.nn.ReLU()
        self.enc_stack = nn.TransformerEncoder(self.enc_transf, num_layers=2, enable_nested_tensor=False)
        self.T = torch.nn.Tanh()
        # Not used in forward(); kept so the module keeps exposing the attribute.
        self.G = torch.nn.GELU()
        self.output_layer = nn.Linear(trans_input_dim, 1)

    def forward(self, x):
        """Map a ``(batch, 1, seq_len)`` tensor to a ``(batch, 1)`` prediction."""
        x = self.relu(self.conv1(x))
        x = self.T(self.enc_stack(x))
        # Keep the last token along the encoder's sequence axis.
        x = x[:, -1, :]
        return self.output_layer(x)
    
# Build the transformer regressor, then move it onto the target device.
transformer = Transformer()
transformer = transformer.to(device)


In [None]:
torch.cuda.empty_cache()

SEQ_LEN = 80               # samples per input sequence fed to the model
EARLY_STOP_PATIENCE = 10   # epochs without validation improvement before stopping

criterion = nn.MSELoss()
optimizer = optim.Adam(transformer.parameters(), lr=learning_rate)

best_loss = float('inf')
# state_dict() hands back references to the live tensors, so a snapshot has to
# be cloned; otherwise later optimizer steps silently overwrite the "best" copy.
best_weights = {name: tensor.detach().clone() for name, tensor in transformer.state_dict().items()}
patience = EARLY_STOP_PATIENCE

loss_vals = []   # mean training MSE per epoch
mse_valid = []   # mean validation MSE per epoch

for epoch in range(num_epochs):
    ### training
    transformer.train()
    train_loss = 0.0
    for inputs, labels in train_loader:
        inputs = inputs.view(inputs.size(0), 1, SEQ_LEN).to(device)  # batch_size, feature_num, sequence_length
        labels = labels.view(-1, 1).to(device)

        optimizer.zero_grad()
        loss = criterion(transformer(inputs), labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_loader)  # average over batches
    loss_vals.append(train_loss)
    print(f'Epoch {epoch+1}/{num_epochs}, MSE: {train_loss:.6g}')

    ### validation
    transformer.eval()
    val_mse_epoch = 0.0
    with torch.no_grad():
        for val_inputs, val_labels in valid_loader:
            val_inputs = val_inputs.view(val_inputs.size(0), 1, SEQ_LEN).to(device)
            val_labels = val_labels.view(-1, 1).to(device)
            val_mse_epoch += criterion(transformer(val_inputs), val_labels).item()

    val_mse_epoch /= len(valid_loader)  # average over batches
    print(f'Validation MSE: {val_mse_epoch:.6g}')
    mse_valid.append(val_mse_epoch)

    ### early stopping
    if val_mse_epoch < best_loss:
        best_loss = val_mse_epoch
        best_weights = {name: tensor.detach().clone() for name, tensor in transformer.state_dict().items()}
        patience = EARLY_STOP_PATIENCE
    else:
        patience -= 1
        print(f'Patience: {patience}')
        if patience <= 0:
            break
    print('----------------------------------------------')

### load the best model
transformer.load_state_dict(best_weights)