In [None]:
%%capture
import wandb
import pandas as pd
import numpy as np
import torch
from torch.autograd import Variable
import torch.nn.functional as F
import torch.utils.data as Data
from torch.utils.data import DataLoader, Dataset
from torch import cuda
import math
import torch.optim.lr_scheduler as sched
import os
import random
from captum.attr import IntegratedGradients, DeepLift, NoiseTunnel, GradientShap, FeatureAblation, LayerConductance

In [None]:
use_seed = True

# One fixed seed shared by every RNG seeded below.
# A literal integer is used on purpose: Python salts str hashes per interpreter
# process (see PYTHONHASHSEED), so seeds derived from hash("...") change on
# every kernel restart and do not make runs repeatable.
SEED = 42

if use_seed:
    # Ask cuDNN for deterministic kernels and turn off its autotuner, which
    # may otherwise pick different algorithms from run to run.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)

In [None]:
# --- Data --------------------------------------------------------------
# Half-open integer range [min_val, max_val) used for both X and Y of the
# training grid.
min_val, max_val = 1000, 2000
BATCH_SIZE = 34

# --- Model -------------------------------------------------------------
n_hidden = 64             # width of the first hidden layer
nw_type = 'best model'    # architecture selector handed to Net(type=...)

# --- Optimisation ------------------------------------------------------
EPOCH = 501               # number of training epochs
LR = 0.01                 # initial learning rate
step_size = 5             # StepLR period, in epochs

In [None]:
# Hyperparameters stored with the W&B run so that runs can be compared later.
config = {
  "learning_rate": LR,
  "epochs": EPOCH,
  "batch_size": BATCH_SIZE,
  "n_hidden": n_hidden,
  "step_size": step_size,
  "network_type": nw_type,
}

# SECURITY: never commit an API key to the notebook. The key is read from the
# WANDB_API_KEY environment variable; when it is not set, wandb.login() asks for
# it interactively (or reuses credentials already stored on this machine).
# NOTE(review): a key was previously hard-coded in this cell - revoke/rotate it.
if not os.environ.get("WANDB_API_KEY"):
    wandb.login()

run = wandb.init(project="XOR_modeling", entity="roman", config = config)

In [None]:
# Utility Dataset class that serves individual samples for batch training
class DatasetUnknownFunc(Dataset):
    """Integer grid of (X, Y) pairs labelled with Z = 1.0 if X != Y else 0.0.

    The backing table ``self.data`` is a DataFrame with the columns
    ``X``, ``Y``, ``Z`` and ``X_minus_Y`` (= X - Y).

    Args:
        min_val: inclusive lower bound of the grid on both axes.
        max_val: exclusive upper bound of the grid on both axes.
        balance_data: when equal to ``True`` keep every X == Y row plus an
            equally sized random sample of X != Y rows; otherwise keep the
            whole grid.
    """

    def __init__(self, min_val = 1000, max_val = 2000, balance_data = True):
        grid = np.mgrid[min_val:max_val, min_val:max_val].reshape(2, -1)
        frame = pd.DataFrame({'X' : grid[0], 'Y' : grid[1]})

        frame['Z'] = (frame['X'] != frame['Y']).astype('float')
        frame['X_minus_Y'] = frame.X - frame.Y

        # Same test as the flag comparison `balance_data == True`, inverted
        # so the unbalanced case can return early.
        if not (balance_data == True):
            self.data = frame
            return

        equal_rows = frame[frame.X == frame.Y]
        # Draw as many "different" rows as there are "equal" rows.
        different_rows = frame[frame.X != frame.Y].sample(len(equal_rows))
        self.data = pd.concat([equal_rows, different_rows], ignore_index=True, sort=False)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return one sample as a dict of float tensors.

        ``X``, ``Y`` and ``Z`` are 0-d tensors; ``x_minus_y`` is a 1-element
        tensor.
        """
        table = self.data
        sample = {
            key: torch.tensor(table[key][index], dtype=torch.float)
            for key in ('X', 'Y', 'Z')
        }
        sample['x_minus_y'] = torch.tensor([table['X_minus_Y'][index]], dtype=torch.float)
        return sample


  #x = layers.Dense(128, activation=tf.keras.activations.relu, name="dense_1")(inputs)
  #x = layers.Dense(64, activation=tf.keras.activations.relu, name="dense_2")(x)

# The neural network itself. Only the architectures listed in the class
# docstring are implemented; any other type is rejected.
class Net(torch.nn.Module):
    """Small fully connected regressor with a selectable architecture.

    Args:
        n_feature: number of input features.
        n_hidden: width of the first hidden layer (the second hidden layer,
            when present, has ``n_hidden // 2`` units).
        n_output: number of outputs.
        type: architecture selector, compared as a string (so ``2`` and
            ``'2'`` are equivalent):
            ``'1'``  - Linear -> ReLU -> Linear;
            ``'2'``  - Linear -> LeakyReLU -> Linear;
            ``'best model'`` - Linear -> ReLU -> Linear -> ReLU -> Linear;
            ``'layer normalization'`` - as 'best model' with LayerNorm over
            the feature axis before each ReLU;
            ``'use dropout'`` - as 'best model' with Dropout(p=0.2) before
            each ReLU.

    Raises:
        ValueError: if ``type`` is not one of the architectures above.
    """

    _SHALLOW_TYPES = ('1', '2')
    _DEEP_TYPES = ('best model', 'layer normalization', 'use dropout')

    def __init__(self, n_feature, n_hidden, n_output, type = 2):
        super(Net, self).__init__()
        # Normalise to str: the default is the int 2, while every branch is
        # keyed by a string. Without this the default never matched a branch
        # in forward().
        self.type = str(type)

        print(f'used type: {type}')

        if self.type == '1':
            self.hidden = torch.nn.Linear(n_feature, n_hidden)   # hidden layer
            self.relu = torch.nn.ReLU()
            self.predict = torch.nn.Linear(n_hidden, n_output)   # output layer
        elif self.type == '2':
            self.hidden = torch.nn.Linear(n_feature, n_hidden)   # hidden layer
            self.relu = torch.nn.LeakyReLU()
            self.predict = torch.nn.Linear(n_hidden, n_output)   # output layer
        elif self.type in self._DEEP_TYPES:
            self.hidden = torch.nn.Linear(n_feature, n_hidden)
            self.hidden1 = torch.nn.Linear(n_hidden, n_hidden // 2)
            self.predict = torch.nn.Linear(n_hidden // 2, n_output)
            # Auxiliary layers are registered here (not created in forward)
            # so that their parameters are trained, follow .to(device), and
            # respect train()/eval(). They are only added for the variant
            # that needs them, leaving the other variants' state_dict as is.
            if self.type == 'layer normalization':
                self.norm = torch.nn.LayerNorm(n_hidden)
                self.norm1 = torch.nn.LayerNorm(n_hidden // 2)
            elif self.type == 'use dropout':
                self.dropout = torch.nn.Dropout(p=0.2)
        else:
            raise ValueError(
                f"Unknown network type {type!r}; expected one of "
                f"{self._SHALLOW_TYPES + self._DEEP_TYPES}"
            )

    def forward(self, x_minus_y):
        """Map the input features to the prediction ``z``."""
        if self.type in self._SHALLOW_TYPES:
            return self.predict(self.relu(self.hidden(x_minus_y)))

        if self.type == 'best model':
            h = F.relu(self.hidden(x_minus_y))
            h = F.relu(self.hidden1(h))
        elif self.type == 'layer normalization':
            h = F.relu(self.norm(self.hidden(x_minus_y)))
            h = F.relu(self.norm1(self.hidden1(h)))
        elif self.type == 'use dropout':
            # Order kept from the original design: linear -> dropout -> ReLU.
            h = F.relu(self.dropout(self.hidden(x_minus_y)))
            h = F.relu(self.dropout(self.hidden1(h)))
        else:
            # Only reachable if self.type was changed after construction.
            raise ValueError(f"Unknown network type {self.type!r}")

        return self.predict(h)

def rmspe_func(y_pred, y_true):
    """Return the sum of squared errors between predictions and targets.

    Despite its name this is neither a root nor a percentage error: callers
    accumulate the returned sum over batches, divide by the sample count and
    take the square root themselves.

    Args:
        y_pred: tensor of predictions; the first axis indexes samples.
        y_true: tensor of targets; the first axis indexes samples.

    Returns:
        A Python float with the total squared error. Samples are paired along
        the first axis (extra samples in the longer input are ignored), and
        trailing axes are flattened so that ``(B, 1)`` predictions can be
        compared with ``(B,)`` targets.
    """
    # detach() lets the function accept tensors that still require grad.
    pred = y_pred.detach().cpu().numpy().astype(np.float64)
    true = y_true.detach().cpu().numpy().astype(np.float64)

    n_pairs = min(len(pred), len(true))
    if n_pairs == 0:
        return 0.0

    # One row per sample, so the subtraction pairs samples instead of
    # broadcasting a (B, 1) array against a (B,) array into (B, B).
    pred = pred[:n_pairs].reshape(n_pairs, -1)
    true = true[:n_pairs].reshape(n_pairs, -1)
    return float(np.sum((true - pred) ** 2))

def validate():
    """Evaluate the global ``model`` on a small held-out grid and log its RMSE.

    A fresh balanced ``DatasetUnknownFunc`` over the range [1, 10) is built on
    every call (its "different" rows are randomly sampled), and one extra
    sample (X=1, Y=101) is appended to it.

    Relies on the notebook globals ``model``, ``device`` and ``used_feature``.
    Puts the model into eval mode and logs ``'validate loss'`` to W&B with
    ``commit=False``.

    Returns:
        (result, data): ``result`` is a DataFrame indexed by ``id`` with one
        column ``Z_predicted``; ``data`` is the DataFrame of evaluated
        samples, in the same row order.
    """
    test_dataset = DatasetUnknownFunc(min_val = 1, max_val = 10)
    # DataFrame.append was removed in pandas 2.0; concat is the replacement.
    # X_minus_Y is X - Y = 1 - 101, matching how the dataset derives the column.
    extra_row = pd.DataFrame([{'X' : 1, 'Y': 101, 'Z': 1, 'X_minus_Y' : -100}])
    test_dataset.data = pd.concat([test_dataset.data, extra_row], ignore_index=True)
    testing_loader = Data.DataLoader(
        dataset=test_dataset,
        batch_size=1,
        shuffle=False, num_workers=2)

    squared_error = 0.0
    nb_examples = 0
    predictions = []

    model.eval()
    with torch.no_grad():
        for data in testing_loader:
            x = data[used_feature].to(device, dtype = torch.float)
            Z = data['Z'].to(device, dtype = torch.float)
            outputs = model(x)

            # float(np.sum(...)) accepts both a scalar and a 1-element array.
            squared_error += float(np.sum(rmspe_func(outputs.data, Z)))
            nb_examples += x.size(0)

            # batch_size is 1, so each batch holds exactly one prediction.
            predictions.append(outputs.cpu().numpy().reshape(-1)[0])

    result = pd.DataFrame(
        {'Z_predicted': predictions},
        index=pd.RangeIndex(len(predictions), name='id'))

    rmse = math.sqrt(squared_error / nb_examples)
    wandb.log({'validate loss': rmse}, commit=False)
    return result, testing_loader.dataset.data

In [None]:
# Train on the GPU when one is visible to torch, otherwise on the CPU.
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Balanced training set over the configured [min_val, max_val) grid.
train_dataset = DatasetUnknownFunc(min_val, max_val, balance_data=True)

# Shuffled mini-batches, loaded by two worker processes.
training_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
)

In [None]:
# Single-input, single-output network of the configured architecture,
# placed on the selected device.
model = Net(1, n_hidden, 1, type=nw_type).to(device)

# Mean squared error between the prediction and the 0/1 target Z.
loss_function = torch.nn.MSELoss()

# AdamW whose learning rate is multiplied by 0.999 every `step_size`
# scheduler steps.
optimizer = torch.optim.AdamW(params=model.parameters(), lr=LR)
scheduler = sched.StepLR(optimizer, step_size, gamma=0.999)

In [None]:
# Register the model with the W&B run; log='all' is the logging mode requested
# from wandb's watch().
run.watch(model, log='all')

In [None]:
# Key of the dataset sample dict that is fed to the model (also read by validate()).
used_feature = 'x_minus_y'

for epoch in range(EPOCH):
    model.train()

    rmse = 0.0            # running sum of squared errors for this epoch
    nb_tr_examples = 0    # number of samples seen in this epoch

    for step, data in enumerate(training_loader, 0):
        x = data[used_feature].to(device, dtype = torch.float)
        Z = data['Z'].to(device, dtype = torch.float)
        outputs = model(x)

        loss = loss_function(outputs.view(-1), Z.view(-1))
        # float(np.sum(...)) accepts both a scalar and a 1-element array.
        rmse += float(np.sum(rmspe_func(outputs.detach(), Z)))
        nb_tr_examples += x.size(0)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Computed once per epoch; NaN instead of a crash if the loader was empty.
    samples_rmse = math.sqrt(rmse / nb_tr_examples) if nb_tr_examples else float('nan')

    if scheduler is not None:
        scheduler.step()
        wandb.log({'lr last': scheduler.get_last_lr()[0]}, step=epoch)

    # validate() logs 'validate loss' with commit=False, so it has to run
    # BEFORE the committing log call below; otherwise the validation loss of
    # this epoch is attached to the next epoch's step.
    validate()
    wandb.log({'train loss': samples_rmse})

    if epoch % 100 == 0:
        epoch_rmse = samples_rmse
        print(f"Training RMSE Epoch({epoch}): {epoch_rmse}")

In [None]:
# Final evaluation of the trained model (this call also logs 'validate loss').
result, validate_data = validate()

# Show the predictions next to the samples they belong to; rows are matched
# on the index of both frames.
shown_columns = ['X', 'Y', 'Z', 'Z_predicted']
pd.merge(result, validate_data, left_index = True, right_index = True)[shown_columns]

In [None]:
use_captum = True


def _log_attribution_bar(values, id_column, log_key, title):
    """Log one attribution score per item to W&B as a bar chart.

    Args:
        values: sequence of plain Python numbers, one per bar.
        id_column: name of the table column holding the 0-based item id.
        log_key: key under which the chart is logged.
        title: chart title.
    """
    rows = [[item_id, value] for item_id, value in enumerate(values)]
    table = wandb.Table(data=rows, columns = [id_column, "value"])
    wandb.log({log_key : wandb.plot.bar(table, id_column, "value", title=title)})


if use_captum:
    model.eval()

    # Attribute over exactly the feature column the model was evaluated on,
    # and keep the input on the same device as the model.
    d = torch.tensor(validate_data['X_minus_Y'].values.reshape(-1, 1)).float().to(device)

    attr_algo = IntegratedGradients(model)
    ig_attributions, delta = attr_algo.attribute(d, return_convergence_delta=True)
    # .tolist() turns the tensor into plain floats that a W&B table can store.
    _log_attribution_bar(
        ig_attributions.detach().cpu().reshape(-1).tolist(),
        "test sample id",
        "IntegratedGradients",
        "IntegratedGradients for test samples",
    )

    # Per-neuron conductance, summed over the test samples, for each hidden
    # layer the current architecture actually has (types '1'/'2' lack hidden1).
    for layer_name in ("hidden", "hidden1"):
        layer = getattr(model, layer_name, None)
        if layer is None:
            continue
        lc_attributions = LayerConductance(model, layer).attribute(d)
        _log_attribution_bar(
            lc_attributions.detach().cpu().sum(axis = 0).tolist(),
            "neuron id",
            f"LayerConductance for {layer_name} layer",
            f"LayerConductance for neurons of the {layer_name}",
        )