In [None]:
import os
import time
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.model_selection import train_test_split
from sklearn import preprocessing

from pickle import dump, load

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
mm = preprocessing.MinMaxScaler() # min max scaler
ss = preprocessing.StandardScaler() #standard scaler

def load_data():
    data_dir = '/content/drive/MyDrive/Fibre Laser/'
    files = sorted(os.listdir(data_dir), key=len)
    files.remove('target.npz') 
    files.remove('FibreLaserLSTM.pt')
    files.remove('MinMaxScaler.pkl')

    X = np.empty((0, 16384))
    y = np.empty((0, 5))

    for f in files:
        data = np.load(data_dir + f)
        X = np.append(X, np.array(data.get("output_spectrum")), axis=0).astype(np.float32)
        y = np.append(y, np.array(data.get("laser_parameters")), axis=0).astype(np.float32)

    nan_rows = ~np.isnan(X).any(axis=1)

    X = X[nan_rows]
    y = y[nan_rows]
    
    ss.fit(X)
    mm.fit(y)

    X = ss.transform(X)
    y = mm.transform(y)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.1, random_state=42)

    X_train = torch.from_numpy(X_train)
    y_train = torch.from_numpy(y_train)
    X_test = torch.from_numpy(X_test)
    y_test = torch.from_numpy(y_test)

    return(X_train, X_test, y_train, y_test)

In [None]:
def load_target():
  data_dir = '/content/drive/MyDrive/Fibre Laser/'
  f = 'target.npz'
  target = np.load(data_dir + f)['target_spectrum'].astype(np.float32)
  target = torch.from_numpy(target)
  return target

In [None]:
input_dim = 16384 # represents the size of the input at each time step, e.g. input of dimension 5 will look like this [1, 3, 8, 2, 3]
hidden_dim = 256 # represents the size of the hidden state and cell state at each time step 256
num_layers = 2 # the number of LSTM layers stacked on top of each other 2
num_outputs = 5 # 5 parameters to predict

batch_size = 48
sequence_length = 1

class LSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, num_outputs, dropout=0.3):
        super(LSTM, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        self.lstm1 = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first = True)
        self.fc1 = nn.Linear(hidden_dim, hidden_dim)
        self.dout = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(hidden_dim, num_outputs)

    def forward(self, x):
        hidden_state = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)
        cell_state = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)

        out, hidden = self.lstm1(x, (hidden_state, cell_state))
        #out = out.reshape(out.shape[0], -1)
        out = self.fc1(out[:,-1,:])
        out = self.dout(out)
        out = self.fc2(out)
        return out, hidden

    def init_hidden(self, batch_size):
        weight = next(self.parameters()).data
        hidden = (weight.new(self.num_layers, batch_size, self.hidden_dim).zero_().to(device),
                      weight.new(self.num_layers, batch_size, self.hidden_dim).zero_().to(device))
        return hidden

In [None]:
X, X_test, y, y_test = load_data()

X = X.reshape(-1, sequence_length, input_dim)
X_test = X_test.reshape(-1, sequence_length, input_dim)
print(X.shape)

torch.Size([6428, 1, 16384])


In [None]:
train_data = TensorDataset(X, y)
train_loader = DataLoader(dataset = train_data, batch_size=batch_size, shuffle=True)

num_epochs = 100 # training epochs 100
learning_rate = 0.001 # optimizer lr 0.001

model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim,
             num_layers=num_layers, num_outputs=num_outputs)
model.to(device)

criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    hidden = model.init_hidden(batch_size)
    total_loss = 0.0
    bath_num = 0
    for batch_index, (data, labels) in enumerate(train_loader):
        data, labels = data.to(device), labels.to(device)
        y_pred, hidden = model(data)

        loss = criterion(y_pred, labels)
        total_loss+=loss
        bath_num = batch_index
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print("| Epoch:", epoch, "| Average Loss:", total_loss/bath_num, "|")
    
print("Done")

| Epoch: 0 | Average Loss: tensor(0.0214, grad_fn=<DivBackward0>) |
| Epoch: 1 | Average Loss: tensor(0.0081, grad_fn=<DivBackward0>) |
| Epoch: 2 | Average Loss: tensor(0.0072, grad_fn=<DivBackward0>) |
| Epoch: 3 | Average Loss: tensor(0.0067, grad_fn=<DivBackward0>) |
| Epoch: 4 | Average Loss: tensor(0.0062, grad_fn=<DivBackward0>) |
| Epoch: 5 | Average Loss: tensor(0.0060, grad_fn=<DivBackward0>) |
| Epoch: 6 | Average Loss: tensor(0.0059, grad_fn=<DivBackward0>) |
| Epoch: 7 | Average Loss: tensor(0.0059, grad_fn=<DivBackward0>) |
| Epoch: 8 | Average Loss: tensor(0.0057, grad_fn=<DivBackward0>) |
| Epoch: 9 | Average Loss: tensor(0.0055, grad_fn=<DivBackward0>) |
| Epoch: 10 | Average Loss: tensor(0.0054, grad_fn=<DivBackward0>) |
| Epoch: 11 | Average Loss: tensor(0.0054, grad_fn=<DivBackward0>) |
| Epoch: 12 | Average Loss: tensor(0.0053, grad_fn=<DivBackward0>) |
| Epoch: 13 | Average Loss: tensor(0.0053, grad_fn=<DivBackward0>) |
| Epoch: 14 | Average Loss: tensor(0.0050, g

In [None]:
torch.save(model.state_dict(), '/content/drive/MyDrive/Fibre Laser/FibreLaserLSTM.pt')
dump(mm, open('/content/drive/MyDrive/Fibre Laser/MinMaxScaler.pkl', 'wb'))

In [None]:
model_file = '/content/drive/MyDrive/Fibre Laser/FibreLaserLSTM.pt'
scaler_file = '/content/drive/MyDrive/Fibre Laser/MinMaxScaler.pkl'
if(os.path.exists(model_file) and os.path.exists(scaler_file)):
    model = LSTM(input_dim=16384, hidden_dim=256,
             num_layers=2, num_outputs=5)
    model.to(device)
    model.load_state_dict(torch.load(model_file))
    model.eval()
    criterion = torch.nn.MSELoss()
    mm = load(open(scaler_file, 'rb'))

In [None]:
y_pred, _ = model(X_test.to(device))
loss = criterion(y_pred.to(device), y_test.to(device))
print("Test Loss:", loss)
print(y_pred[:5])
print("---------")
print(y_test[:5])

Test Loss: tensor(0.0046, grad_fn=<MseLossBackward>)
tensor([[0.6194, 0.5978, 0.6248, 0.7605, 0.3688],
        [0.2787, 0.5917, 0.6518, 0.7738, 0.3678],
        [0.2787, 0.5917, 0.6518, 0.7738, 0.3678],
        [0.4985, 0.6017, 0.6196, 0.7650, 0.3691],
        [0.4846, 0.5848, 0.6484, 0.7805, 0.3804]], grad_fn=<SliceBackward>)
---------
tensor([[0.5384, 0.5842, 0.6511, 0.8352, 0.3738],
        [0.2716, 0.6652, 0.6250, 0.7442, 0.3774],
        [0.1658, 0.5176, 0.5884, 0.8496, 0.3296],
        [0.5713, 0.5676, 0.5713, 0.7938, 0.3762],
        [0.3194, 0.4994, 0.6643, 0.7612, 0.3918]])


In [None]:
target = load_target().reshape(-1, sequence_length, input_dim)
target_params, _ = model(target.to(device))
target_params = mm.inverse_transform(target_params.cpu().detach().numpy())
print("Predicted Parameters for Target:", target_params)

Predicted Parameters for Target: [[ 7.6100249e+03  1.1001486e+00 -1.9183685e+00 -1.2154552e+00
   2.1481848e-14]]
