In [None]:
import FI_estimation
import torch
from torch import nn
from torch.nn import functional as F
import torch.utils.data as Data
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch import optim
import numpy as np
from matplotlib import pyplot as plt
import os
from scipy import ndimage

torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from torchvision.models.feature_extraction import get_graph_node_names
from torchvision.models.feature_extraction import create_feature_extractor

### training the ANN

In [None]:
# ---------- load data and prepare training and testing data sets
type = np.float64

data = np.load('Data'+os.sep+'ES_log_normal_train_X.npy') 
positions = np.load('Data'+os.sep+'ES_log_normal_train_Y.npy')

x_tr = data.reshape(data.shape[0]*data.shape[1],data.shape[2])
y_tr = positions.reshape(positions.shape[0]*positions.shape[1],positions.shape[2])[:,0]
x_tst = np.load("Data"+os.sep+"log_normal_test_X.npy")
delta = 0.1

def shuffle_split_data(X, y, ratio=0.5):
    arr_rand = np.random.rand(X.shape[0])
    split = arr_rand < np.percentile(arr_rand, ratio*100)

    X_train = X[split]
    y_train = y[split]
    X_test =  X[~split]
    y_test = y[~split]
    return X_train, X_test, y_train, y_test
X_train, X_val, Y_train, Y_val = shuffle_split_data(x_tr, y_tr, ratio=0.8)
#
X_train = X_train.astype(type)
Y_train = Y_train.astype(type)
X_val = X_val.astype(type)
Y_val = Y_val.astype(type)
#
torch.set_default_tensor_type(torch.DoubleTensor)
train_set = TensorDataset(torch.tensor(X_train), torch.tensor(Y_train).T)
val_set = TensorDataset(torch.tensor(X_val), torch.tensor(Y_val).T)

bs = 128
train_loader = Data.DataLoader(train_set, batch_size=bs, shuffle=True)
validation_loader = Data.DataLoader(val_set, batch_size=bs, shuffle=True)

In [3]:
class Model(nn.Module):
    def __init__(self, n):
        super(Model, self).__init__()
        self.linear1 = nn.Linear(n[0], n[1])
        self.linear2 = nn.Linear(n[1], n[2])
        self.linear3 = nn.Linear(n[2], n[3])
        self.linear4 = nn.Linear(n[3], n[4])
        self.linear5 = nn.Linear(n[4], n[5])
    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = self.linear1(x)
        x = F.relu(x)
        x = self.linear2(x)    
        x = F.relu(x)
        x = self.linear3(x)
        x = F.relu(x)
        x = self.linear4(x)       
        x = F.relu(x)
        x = self.linear5(x)  
        return x
n = [10,150,100,50,25,1]

In [None]:
Nruns = 10
starting_epoch = 0
criterion =  nn.MSELoss()
n_epochs = 20
lr = 1e-4
training_loss = 0.
validation_loss = 0.
net = Model(n).to(device)
all_losses = []
all_losses_val = []
all_losses.append(training_loss/len(train_loader))
all_losses_val.append(validation_loss/len(validation_loader))
#torch.save(net,  'Models' +os.sep+ 'epoch' + str(0)+'Lognormal.pth')
for epoch in range(starting_epoch, starting_epoch + n_epochs):
    optimizer = optim.Adam(net.parameters(), lr=lr, weight_decay=0.00)
    net.train()
    training_loss = 0.
    validation_loss = 0.
    batch_count = 0
    for inputs, target in train_loader:
        inputs = inputs.to(device)
        target = target.to(device)
        batch_count += 1
        y_pred = net(inputs)
        loss = criterion(y_pred,  torch.unsqueeze(target,dim=1)  )
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        training_loss += loss.item()
    net.eval()
    with torch.no_grad():
        for inputs, target in validation_loader:
            inputs = inputs.to(device)
            target = target.to(device)
            y_pred = net(inputs)
            loss_val = criterion(y_pred,  torch.unsqueeze(target,dim=1) )
            validation_loss += loss_val.item()
    print("epoch= ", epoch, ", val= ", validation_loss/len(validation_loader),", train = ", training_loss/len(train_loader))
    #torch.save(net,   'Models' +os.sep+ 'epoch' + str(epoch+1)+'Lognormal.pth')
    all_losses_val.append(validation_loss/len(validation_loader))
    all_losses.append(training_loss/len(train_loader))

For FI based early stopping, increase the size of the network and decrease the learning rate
If y_tr is rescaled then the MSE must be rescaled accordingly in order to be comparable with 1/FI

### Calculating the Fisher information

In [None]:
#------- calculating the FI flow at different epochs
#------- parameters entering the algorithm; they should be adjusted if convergence is not reached
fi_in = 10
kappa = 0.1 # ratio of slopes
gamma = 0.05 # stop when the LFI doesn't improve in the first iterations 
noise = 0.01 # noise added
step = 50 # 10 increase of dimensionality
upperLimit = 1000 # maximum increase 
n_epochs = 20
X = x_tst
X = X.reshape(X.shape[0]*X.shape[1], X.shape[2]) 
X = torch.tensor(X)
print("X.shape", X.shape)
fi_array = []
for epoch in range(n_epochs):
    print("epoch: ", epoch)
    net = Model(n)
    net = torch.load('Models'+os.sep + 'epoch'+ str(epoch)+'Lognormal.pth', map_location=torch.device('cpu'))
    net.eval()
    # ------- extract data arrays
    return_nodes = {
        "linear1": "linear1",
        "linear2": "linear2",
        "linear3": "linear3",
        "linear4": "linear4"}
    model2 = create_feature_extractor(net, return_nodes=return_nodes)
    intermediate_outputs = model2(X)
    linear1 = intermediate_outputs['linear1']
    linear2 = intermediate_outputs['linear2']
    linear3 = intermediate_outputs['linear3']
    linear4 = intermediate_outputs['linear4']
    layer_out = net(X)
    # ------- reshape arrays
    linear1 = linear1.reshape(3 , linear1.shape[0]//3, linear1.shape[1])
    linear2 = linear2.reshape(3 , linear2.shape[0]//3, linear2.shape[1])
    linear3 = linear3.reshape(3 , linear3.shape[0]//3, linear3.shape[1])
    linear4 = linear4.reshape(3 , linear4.shape[0]//3, linear4.shape[1])
    layer_out = layer_out.reshape(3 , layer_out.shape[0]//3, layer_out.shape[1])
    # ------- convert to numpy arrays
    linear1 = linear1.detach().numpy()
    linear2 = linear2.detach().numpy()
    linear3 = linear3.detach().numpy()
    linear4 = linear4.detach().numpy()
    layer_out = layer_out.detach().numpy()
    # ------- apply activation function since the data is extracted directly after the linear transformation
    linear1 = (FI_estimation.ReLU(linear1))
    linear2 = (FI_estimation.ReLU(linear2))
    linear3 = (FI_estimation.ReLU(linear3))
    linear4 = (FI_estimation.ReLU(linear4))
    layer_out = (layer_out)
    # ------- calculate the FI
    fi_1 = FI_estimation.get_FI(linear1, delta, upperLimit, step, kappa, constant_threshold=gamma, noise_factor=noise, biasedLFI=True)
    fi_2 = FI_estimation.get_FI(linear2, delta, upperLimit, step, kappa, constant_threshold=gamma, noise_factor=noise, biasedLFI=True)
    fi_3 = FI_estimation.get_FI(linear3, delta, upperLimit, step, kappa, constant_threshold=gamma, noise_factor=noise, biasedLFI=True)
    fi_4 = FI_estimation.get_FI(linear4, delta, upperLimit, step, kappa, constant_threshold=gamma, noise_factor=noise, biasedLFI=True)
    fi_out = FI_estimation.get_FI(layer_out, delta, upperLimit, step, kappa, constant_threshold=gamma, noise_factor=noise, biasedLFI=True)
    if isinstance(fi_out, np.ndarray): fi_out = fi_out.item()
    fi = np.array([fi_in,fi_1,fi_2,fi_3,fi_4,fi_out])
    print(fi)
    fi_array.append(fi)
    print("#------------------------------------------------------#")