In [None]:
from models import *
from glob import glob
from tqdm.autonotebook import tqdm
import matplotlib.pyplot as plt
import numpy as np 

plt.rcParams["figure.figsize"] = (15,3)
plt.rcParams['figure.dpi'] = 100 # 200 e.g. is really fine, but slower


epochs = 64

ma = lambda x,N : np.convolve(x, np.ones(N)/N, mode='valid')

def open_dataset_file(filename):
    with gzip.open(filename, 'rb') as f:
        return pickle.load(f)

# !wget https://www.dropbox.com/s/xa8ned0lgvz4nff/cavity_model_64epochs.torch

In [None]:
data08 = open_dataset_file('./data/cavity/h_202210.pickle.gzip')
data10 = open_dataset_file('./data/cavity/h_202210.pickle.gzip')
data = data08+data10
# subsampling 4 more times
data = [d[[0,2,4],::8,:] for d in data]

In [None]:
plt.plot(data[0][0,:,0],'r')
plt.plot(data[0][1,:,0],'g')
plt.plot(data[0][2,:,0],'b')
plt.title('amplitude')
plt.legend(['probe amplitude', 'forward amplitude','reflected amplitude'])
plt.show()

In [None]:
data_faulty = data[-30:]
data_healthly = data[:-30]

def shift(d, shift = 2):
    ''' shift signal'''
    d[:,shift:,:] = d[:,:-shift,:]
    return d

def add_noise(d,noise = 0.5, since = 50):
    ''' adds gaussian noise '''
    return d[:,:,since:] + noise * np.random.randn(*d[...,since:].shape)

def nullify(d,N = 1, since = 50):
    idx = np.random.randint(0,d.shape[1], N)
    d[:,idx,since:] = 0.0
    return d
'''
data_faulty[:10] = [shift(d) for d in data_faulty[:10]]
data_faulty[10:20] = [add_noise(d) for d in data_faulty[10:20]]
data_faulty[20:] = [nullify(d) for d in data_faulty[20:]]
'''
# print(data_faulty[0].shape)
for d in data_faulty:
    d[0,5,30] += 5*torch.rand(1)
    # d[0,10,0] -= 5*torch.rand(1)

for d in data_healthly:
    d += 0.1 * np.random.randn(*d.shape)
    
plt.plot(data_faulty[0][0,:,:],'r')
plt.title('anomaly - notice the bump at x=1, it is around 10th pulse')
plt.show()
plt.plot(data_healthly[0][0,:,:],'b')
plt.title('healthly - notice a small noise')
plt.show()

In [None]:

class RNNCavityAnomalyDetector(nn.Module):
    def __init__(self, input_dim, embedding_dim, optimize_bias = True):
        super(RNNCavityAnomalyDetector, self).__init__()
        self.rnn = nn.LSTM(input_dim, embedding_dim, bias = optimize_bias, batch_first = True)
        self.linear = nn.Linear(embedding_dim, embedding_dim, bias = optimize_bias)
        self.c = nn.Parameter(torch.randn(embedding_dim),requires_grad = optimize_bias)
    def phi(self, x):
        '''
        This function is the encoder which embeds inputs 
        '''
        # rnn returns two tuples, first is output, the second are hidden states
        return self.linear(self.rnn(x)[0])
    
    def forward(self, x):
        phi = self.phi(x)
        return torch.linalg.vector_norm(phi - self.c, dim = -1)

input_dim = 3 * 23 
# the original shape is (6,23,signal length), it must be restructured to 
# fit to the GRU structure, i.e. (1,signal lenght, 6 * 23)
data = [torch.tensor(d.reshape((1,input_dim,-1)).transpose(0,2,1)).float() for d in data_healthly + data_faulty]
data = [((d / d.max()) - 0.5) * 2 for d in data]

# 1 for ok data, -1 for the created faults 
labels = torch.cat((torch.ones((len(data_healthly))),-torch.ones((len(data_faulty)))))

model = RNNCavityAnomalyDetector(input_dim, 16)
train = True
if train:
    optimizer = optim.Adam(model.parameters(), lr = 0.01)
    losses = []

    for epoch in tqdm(range(epochs)):
        # if d is ok, then model(d).mean(), otherwise we take the last 
        y = [(model(d)[0,-1] if l == 1 else (model(d)[0,-1]**(-1)))  for d,l in zip(data, labels)]
        # optimisation 
        loss = torch.stack(y).mean()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        losses.append(loss.item())
        plt.subplot(131)
        plt.plot(losses)
        plt.title('Loss')
        plt.subplot(132)
        plt.plot(model(data[150]).flatten().detach()[5:],'b')
        plt.plot(model(data[155]).flatten().detach()[5:],'b')
        plt.plot(model(data[160]).flatten().detach()[5:],'b')
        plt.plot(model(data[-1]).flatten().detach()[5:],'r')
        plt.plot(model(data[-2]).flatten().detach()[5:],'r')
        plt.plot(model(data[-3]).flatten().detach()[5:],'r')
        plt.title('Anomaly scores')
        plt.subplot(133)
        plt.plot(torch.stack(y).detach())
        plt.title('Loss of each sample')
        plt.show()
else:
    model.load_state_dict(torch.load('cavity_model_64epochs.torch'))

In [None]:
print(input_dim)
print(dict(model.linear.named_parameters()).keys())
print(dict(model.rnn.named_parameters())['weight_hh_l0'].shape)
# plt.imshow(dict(model.rnn.named_parameters())['weight_ih_l0'].detach())
plt.imshow(dict(model.rnn.named_parameters())['weight_ih_l0'].detach())
plt.show()