In [None]:
from qcm_data import QCMData, WindowDataset, AutoregressiveLSTM
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sklearn.model_selection import train_test_split
from scipy.ndimage import uniform_filter, gaussian_filter
from scipy.signal import find_peaks
from scipy.interpolate import interp1d

In [None]:
data = QCMData.import_hdf('data/Z-230203B.h5', 'data')

In [None]:
qcm = data.extract_rotations('s1', limits=(np.deg2rad(270), np.deg2rad(390)))

dqcm = np.diff(qcm)
mins = find_peaks(-dqcm)[0]
bgd = interp1d(mins, dqcm[mins], 'linear', fill_value='extrapolate')(np.arange(dqcm.size))

maxs = find_peaks(dqcm)[0]
maxs = maxs[dqcm[maxs] > 0.8 * (np.max(dqcm) - np.min(dqcm)) + np.min(dqcm)]
period = (maxs[1:] - maxs[:-1]).mean()

sgn = period * uniform_filter(dqcm - bgd, period, mode='nearest')

In [None]:
%matplotlib widget
fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(np.arange(dqcm.size), dqcm)
ax.plot(np.arange(bgd.size), bgd)
ax.plot(np.arange(sgn.size), sgn)
ax.grid(True)
plt.show()

# LSTM

In [None]:
import torch
from torch.utils.data import DataLoader

# torch.cuda.is_available() checks and returns a Boolean True if a GPU is available, else it'll return False
is_cuda = torch.cuda.is_available()

# If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
if is_cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
test_data.shape

In [None]:
data = np.stack([sgn, gaussian_filter(sgn, 50.0)])[:, 50:-50]

batch_size, input_size, output_size, stride = 8, 200, 1, 1
train_data, test_data, val_data = (data[:, :int(0.7 * data.shape[1])],
                                   data[:, int(0.7 * data.shape[1]):int(0.85 * data.shape[1])],
                                   data[:, int(0.85 * data.shape[1]):])

train_dset = WindowDataset(train_data, input_size, output_size, stride)
test_dset = WindowDataset(test_data, input_size, output_size, stride)
val_dset = WindowDataset(val_data, input_size, output_size, stride)

train_loader = DataLoader(train_dset, batch_size=batch_size, shuffle=True, drop_last=True)
test_loader = DataLoader(test_dset, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_dset, batch_size=batch_size, shuffle=True, drop_last=True)

In [None]:
i = np.random.randint(len(train_dset))
x, y = train_dset[i]

%matplotlib widget
fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(np.arange(input_size), x)
ax.scatter(np.arange(input_size, input_size + output_size), y)
# ax.plot(data)
ax.grid(True)
plt.show()

In [None]:
lr = 0.001
epochs = 50
n_layers, hidden_size = 2, 128
out_path = 'Z-230203B_2x128.pt'

criterion = torch.nn.MSELoss()
model = AutoregressiveLSTM(input_size=1, hidden_size=hidden_size,
                           n_layers=n_layers, dropout=0.3)

# model.load_state_dict(torch.load('Z-230203B_256.pt'))

optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=lr, epochs=epochs,
                                                steps_per_epoch=len(train_loader))

In [None]:
val_losses = []

model.eval()
h_t, c_t = model.init_hidden(batch_size)
for x, y in val_loader:
    y_hat, (h_t, c_t) = model.forecast(x, (h_t.data, c_t.data), output_size)

    val_loss = criterion(y_hat, y)
    val_losses.append(val_loss.item())
    
valid_loss_min = np.mean(val_losses)
print(f"Validation loss: {np.mean(valid_loss_min):.26}")

In [None]:
verbose = True

model.train()
for epoch in range(epochs):
    h_t, c_t = model.init_hidden(batch_size)
    
    for counter, (x, y) in enumerate(train_loader):

        optimizer.zero_grad()
        y_hat, (h_t, c_t) = model.forecast(x, (h_t.data, c_t.data), output_size)

        loss = criterion(y_hat, y)
        loss.backward()

        optimizer.step()
        scheduler.step()
        
    h_t, c_t = model.init_hidden(batch_size)
    val_losses = []

    model.eval()
    for x, y in val_loader:
        y_hat, (h_t, c_t) = model.forecast(x, (h_t.data, c_t.data), output_size)

        val_loss = criterion(y_hat, y)
        val_losses.append(val_loss.item())
        
    model.train()
    if verbose:
        print(f"Epoch: {epoch + 1} / {epochs}, Step: {counter}, "\
              f"Loss: {loss.item():.6f}, Val Loss: {np.mean(val_losses):.6f}")

    if np.mean(val_losses) <= valid_loss_min:
        torch.save(model.state_dict(), out_path)
        if verbose:
            print(f"Validation loss decreased ({valid_loss_min:.6f} --> {np.mean(val_losses):.6f}). "\
                    "Saving model ...")
        valid_loss_min = np.mean(val_losses)

In [None]:
# Loading the best model
out_path = 'Z-230203B_256_10pt.pt'
model = AutoregressiveLSTM(input_size=1, hidden_size=256,
                           n_layers=2, dropout=0.3)
model.load_state_dict(torch.load(out_path))

test_losses = []
h_t, c_t = model.init_hidden(batch_size)

model.eval()
for x, y in test_loader:

    y_hat, (h_t, c_t) = model.forecast(x, (h_t.data, c_t.data), output_size)
    test_loss = criterion(y_hat, y)
    test_losses.append(test_loss.item())

print(f"Test loss: {np.mean(test_losses):.26}")

In [None]:
output_size = 10
test_dset = WindowDataset(test_data, input_size, output_size, stride)
test_loader = DataLoader(test_dset, batch_size=batch_size, shuffle=True, drop_last=True)

idx = np.random.randint(len(test_dset))
x, y = test_dset[[idx]]
h = model.init_hidden(1)
y_hat, h = model.forecast(x, h, output_size)

%matplotlib widget
fig, ax = plt.subplots()
ax.plot(np.arange(input_size), x.squeeze())
ax.scatter(np.arange(input_size , input_size + output_size), y, s=5)
ax.scatter(np.arange(input_size , input_size + output_size),
           y_hat.squeeze().detach().numpy(), s=5)
ax.grid(True)
plt.show()

In [None]:
dset = WindowDataset(data, input_size, output_size, stride)

In [None]:
from scipy.signal import lfilter

alpha = 0.01
hidden = model.init_hidden(1)
y_hat, hidden = model.forecast(dset[[0]][0], hidden, output_size - 1)
window = torch.concat((dset[[0]][0], y_hat), dim=1)

curve, curve_hat = [], []
for i in range(1000):
    x, y = dset[[i]]
    y0_hat = model(window[:, -input_size:], hidden)[0]
    window = torch.concat((window, y0_hat), dim=1)[:, -(output_size + input_size):]
    y_hat, hidden = model.forecast(x, hidden, output_size)
    new_window = torch.concat((x, y_hat), dim=1)
    residual = (new_window - window).detach().numpy()
    update = torch.from_numpy(lfilter([1.0 - alpha], [1.0, -alpha], residual)).float()
    window = window + update
    ground_truth = torch.concat((x, y), dim=1)
    curve.append(ground_truth.squeeze()[-1].detach().numpy())
    curve_hat.append(window.squeeze()[-1].detach().numpy())

In [None]:
%matplotlib widget
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(np.arange(input_size + output_size), ground_truth.squeeze().detach().numpy())
ax.plot(np.arange(input_size + output_size), window.squeeze().detach().numpy())
ax.plot(np.arange(input_size + output_size), residual.squeeze())
ax.plot()
plt.show()