# Import necesary packages

In [1]:
import math
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib tk
from IPython.display import Image

from lime import explanation
from lime import lime_base

import pickle5 as pickle
import onnx

from sklearn.model_selection import train_test_split


In [2]:
from lime_timeseries import LimeTimeSeriesExplainer

# Dataset

### Load dataset

In [3]:
class a_dataset(object):
    def __init__(self):
        self.X = list()
        self.Y = list()
        self.parents = list()
dataset = a_dataset()

In [4]:
path_to = './data/dataset.pkl'
with open(path_to, 'rb') as inp:
    dataset.__dict__ = pickle.load(inp)

In [5]:
dataset.X = np.array(dataset.X)
dataset.Y = np.array(dataset.Y)

In [6]:
zeroT = True
zeroE = True

plt.figure()
for idx in range(len(dataset.Y)):
    if dataset.Y[idx,0] == 0 and dataset.Y[idx,1] != 0 and zeroT:
        plt.plot(dataset.X[idx,:],label=r'$\Delta T = 0 $',color='tab:blue'); zeroT = False
    elif dataset.Y[idx,1] == 0 and zeroE:
        plt.plot(dataset.X[idx,:], label=r'$\Delta \mu \varepsilon = 0$',color='tab:orange'); zeroE = False
plt.legend()
plt.grid()
plt.show()

### Split dataset


In [7]:
test_per = 0.6
train_per = 0.2
val_per = 0.2

In [8]:
# Array with idxs
idx_compleate = np.linspace( 0, dataset.X.shape[0]-1, dataset.X.shape[0] )

# Generate training index
idx_train, idx_tv = train_test_split(
    idx_compleate, 
    test_size=(1-train_per),
    random_state=1
)
# Generate test and validation index
idx_val, idx_test = train_test_split(
    idx_tv, 
    test_size = test_per/(test_per+val_per),
    random_state=1
)

idx_train = idx_train.astype(int)
idx_val = idx_val.astype(int)
idx_test = idx_test.astype(int)

Xdict = dict.fromkeys(['train', 'val', 'test'])
Ydict = dict.fromkeys(['train', 'val', 'test'])

Xdict['train'] = dataset.X[idx_train,:]
Ydict['train'] = dataset.Y[idx_train,:]
Xdict['val'] = dataset.X[idx_val,:]
Ydict['val'] = dataset.Y[idx_val,:]
Xdict['test'] = dataset.X[idx_test,:]
Ydict['test'] = dataset.Y[idx_test,:]

dataset.X = Xdict
dataset.Y = Ydict

### Set up scalers

In [9]:
class a_scaler(object):
    def __init__(self,X=None):
        if not X is None:
            self.max = np.zeros(X.shape[1])
            self.min = np.zeros(X.shape[1])
            print('Creating scaler for',X.shape[1],'items and',X.shape[0],'samples')
            for i in range(X.shape[1]):
                self.max[i] = np.amax(X[:,i])
                self.min[i] = np.amin(X[:,i])
                if self.max[i] == self.min[i]:
                    print('Column',i,'with single value')
                    self.max[i] = np.amax(X[:,i])*2
                    self.min[i] = 0
                    
    def transform(self,x):

        if isinstance(x,list):
            x = np.array(x)
        if len(x.shape) == 1:
            x = x.reshape(1,-1)

        out = np.empty_like(x)
        for i in range(x.shape[1]):
            out[:,i] = (x[:,i]-self.min[i])/(self.max[i]-self.min[i])
        return out

    def inverse_transform(self,z):

        if isinstance(z,list):
            z = np.array(z)
        if len(z.shape) == 1:
            z = z.reshape(1,-1)

        out = np.empty_like(z)
        for i in range(z.shape[1]):
            out[:,i] = (self.max[i]-self.min[i]) * z[:,i] + self.min[i]
        return out
                    

In [10]:
LoadScalers = True
if LoadScalers:
    scalerX = a_scaler()
    scalerY = a_scaler()
    
    path_to = './data/scalerY.pkl'
    with open(path_to, 'rb') as inp:
        scalerY.__dict__ = pickle.load(inp)
    path_to = './data/scalerX.pkl'
    with open(path_to, 'rb') as inp:
        scalerX.__dict__ = pickle.load(inp)
else:
    scalerX = a_scaler(dataset.X['train'])
    scalerY = a_scaler(dataset.Y['train'])

In [11]:
for key in dataset.X.keys():
    dataset.X[key] = scalerX.transform(dataset.X[key])
    dataset.Y[key] = scalerX.transform(dataset.Y[key])

In [12]:
plt.figure()
plt.plot(dataset.X['test'][0,:],color='tab:blue');
plt.grid()
plt.show()

# Train model

### Define model

In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

# Weights initialization
def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)

# Compress sensor layer
class First_Stage_Layer(nn.Module):
    """ Custom Linear layer but mimics a standard linear layer """
    def __init__(self):
        super().__init__()

        # Layer secuence
        self.FC = nn.Sequential(
            nn.Linear(12001, 6000),
            nn.Tanh(),
            nn.Linear(6000, 3000),
            nn.Tanh(),
            # nn.Dropout(p=0.2),
            nn.Linear(3000, 1500),
            nn.Tanh(),
            # nn.Dropout(p=0.2),
            nn.Linear(1500, 750),
            nn.Tanh(),
            # nn.Dropout(p=0.2),
            nn.Linear(750, 375)
        ).apply(init_weights)
    def forward(self, x):
        x = self.FC(x)
        return x

class Second_Stage_Layer(nn.Module):
    """ Custom Linear layer but mimics a standard linear layer """
    def __init__(self):
        super().__init__()
        # Layer secuence
        self.FC = nn.Sequential(
            nn.Linear(375, 350),
            nn.Tanh(),
            nn.Linear(350, 300),
            nn.Tanh(),
            nn.Linear(300, 250),
            nn.Tanh(),
            nn.Linear(250, 200),
            nn.Tanh(),
            nn.Linear(200, 150),
            nn.Tanh(),
            nn.Linear(150, 100),
            nn.Tanh(),
            nn.Linear(100, 50),
            nn.Tanh(),
            nn.Linear(50, 25),
            nn.Tanh(),
            nn.Linear(25, 16),
            nn.Tanh(),
            nn.Linear(16, 4),
            nn.Tanh(),
            nn.Linear(4, 1),
            nn.ReLU()
        ).apply(init_weights)
    def forward(self, x):
        x = self.FC(x)
        return x

class TE(nn.Module):
    def __init__(self):
        super(TE, self).__init__()
        self.cs0 = First_Stage_Layer()
        self.fc_T = Second_Stage_Layer()
        self.fc_E = Second_Stage_Layer()
    def forward(self, x):
        bs = x.shape[0]
        x0 = self.cs0(x.reshape(bs,-1))

        T = self.fc_T(x0).flatten()
        E = self.fc_E(x0).flatten()

        return T, E

class TE_single(nn.Module):
    def __init__(self,TE,n):
        super(TE_single, self).__init__()
        self.TE = TE
        self.n = n
        for p in self.TE.parameters():
            p.requires_grad=False

    def forward(self, x):
        bs = x.shape[0]
        y = self.TE(x.reshape(bs,-1))
        y = y[self.n]
        return y
    
    def proba(self,x):
        bs = x.shape[0]
        x = torch.from_numpy(x.reshape(bs,-1)).float()
        self.TE.eval()
        y = self.TE(x)
        y = y[self.n]
        y = y.numpy()
        y = y.reshape(bs,-1)
        return y


### Load model

In [14]:
pre_model = TE()
pre_model.load_state_dict(torch.load("./data/model.pkl"))

<All keys matched successfully>

In [52]:
model = TE_single(pre_model,0)

In [16]:
input = torch.from_numpy( np.array([dataset.X['test'][-1]]) ).float()
model.eval()
y = model(input)
print(y)

tensor([0.9990])


In [53]:
y = model.proba(np.array([dataset.X['test'][11,:],dataset.X['test'][13,:],dataset.X['test'][12,:]]))
print(y)

[[0.7293267 ]
 [0.98490435]
 [0.25388497]]


# Explanation

In [18]:
idx = 11 # explained instance
num_features = 12 # how many feature contained in explanation
num_slices = 120 # split time series
series = dataset.X['test'][idx,:]
plt.plot(series)
plt.show()

In [54]:
explainer = LimeTimeSeriesExplainer()
exp = explainer.explain_instance(series, model.proba,labels=(0,), num_features=num_features, num_samples=500, num_slices=num_slices)
exp.as_pyplot_figure(label=0)
plt.title('')
plt.xlabel(r'$\Delta T$ (normalized)')
plt.ylabel('Slice\nindex',labelpad = 20).set_rotation(0)

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


### Split signal in its parts for a pretty plot

In [31]:
sp = scalerX.inverse_transform(series.reshape(1,-1))[0]
sk = [r'$P_1 \star P_1$',
        r'$P_1 \star P_2$',
        r'$P_2 \star P_2$',
        r'$S_1 \star S_1$',
        r'$S_1 \star S_2$',
        r'$S_2 \star S_2$',
        r'$\Delta \nu$']
signal = dict.fromkeys(sk)

signal[sk[0]] = [np.linspace(0,2000,2000) ,sp[0:2000]]
signal[sk[1]] = [np.linspace(2000,4000,2000) ,sp[2000:4000]]
signal[sk[2]] = [np.linspace(4000,6000,2000) ,sp[4000:6000]]
signal[sk[3]] = [np.linspace(6000,8000,2000) ,sp[6000:8000]]
signal[sk[4]] = [np.linspace(8000,10000,2000) ,sp[8000:10000]]
signal[sk[5]] = [np.linspace(10000,12000,2000) ,sp[10000:12000]]
signal[sk[6]] = [12001 ,sp[12000]]


In [44]:
plt.figure(figsize=(16,8))
for key, val in signal.items():
    plt.plot(val[0],val[1],label=key) if 'nu' not in key else None
plt.scatter(signal[sk[6]][0],signal[sk[6]][1],label=sk[6], color='tab:pink')
plt.legend()
plt.grid()
plt.show()

In [51]:
values_per_slice = math.ceil(len(series) / num_slices)

plt.figure(figsize=(16,8))
for key, val in signal.items():
    plt.plot(val[0],val[1],label=key)
plt.legend()
plt.show()

max_weight = 0
for i in range(num_features):
    feature, weight = exp.as_list(label=0)[i]
    if weight > max_weight:
        max_weight = weight

for i in range(num_features):
    feature, weight = exp.as_list(label=0)[i]
    start = feature * values_per_slice
    end = start + values_per_slice
    color = 'red' if weight < 0 else 'green' 
    plt.axvspan(start , end, color=color, alpha=abs(weight*0.6/max_weight))
plt.grid()
plt.show()