In [1]:
import pandas as pd
import numpy as np
import torch
from torch.autograd import Variable
import torch.nn.functional as F
import torch.utils.data as Data
from torch.utils.data import DataLoader, Dataset
from torch import cuda
import math
import torch.optim.lr_scheduler as sched

import matplotlib.pyplot as plt
%matplotlib inline

import imageio

Величины Z в тестовом задании либо близки к 0 либо к 1.0. Неточность я посчитал результатом моделирования нейронной сетью. Поэтому я выбрал рабочую гипотезу, что искомая функция описывается следующем образом:
> ***Если разность между X и Y равняется 0, то и величина Z равняется 0. Иначе Z = 1.***


In [2]:
# Создает gif анимацию процесса тренировки. Необходимо установить make_gif = True, чтобы анимация создалась
# Правда это значительно замедляет работу, включать только по необходимости...:)
def make_train_gif(min_val, max_val, x_minus_y, Z, predictions, samples_rmse):
    plt.cla()
    ax.set_title('Regression Analysis - Batches', fontsize=35)
    ax.set_xlabel('Independent variable', fontsize=24)
    ax.set_ylabel('Dependent variable', fontsize=24)
    ax.set_xlim(min_val - max_val, max_val - min_val)
    ax.set_ylim(-2.0, 2.0)
    ax.scatter(x_minus_y.numpy(), Z.numpy(), color = "blue", alpha=0.3)
    ax.scatter(x_minus_y.numpy(), predictions.numpy(), color='green', alpha=0.3)
    ax.text(0.6*(max_val - min_val), -1.8, 'Epoch = %d' % epoch,
            fontdict={'size': 24, 'color':  'red'})
    ax.text(0.6*(max_val - min_val), -1.5, 'Loss = %.4f' % samples_rmse,
            fontdict={'size': 24, 'color':  'red'})

    fig.canvas.draw()
    image = np.frombuffer(fig.canvas.tostring_rgb(), dtype='uint8')
    image  = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))

    my_images.append(image)      


In [3]:
# Утилитный класс итератор для batch тренировки
class DatasetUnknownFunc(Dataset):
    def __init__(self, min_val = 1000, max_val = 2000, balance_data = True):
        xy = np.mgrid[min_val:max_val, min_val:max_val].reshape(2, -1)
        df = pd.DataFrame({'X' : xy[0], 'Y' : xy[1]})

        df['Z'] = (df['X'] != df['Y']).astype('float')
        df['X_minus_Y'] = df.X - df.Y

        if balance_data == True:
          df1 = df[df.X == df.Y]
          df2 = df[df.X != df.Y].sample(len(df1))
          self.data = pd.concat([df1, df2], ignore_index=True, sort=False).reset_index(drop = True)
        else:
          self.data = df
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        return {
          'X': torch.tensor(self.data.X[index], dtype=torch.float),
          'Y': torch.tensor(self.data.Y[index], dtype=torch.float),
          'Z': torch.tensor(self.data.Z[index], dtype=torch.float),
          'x_minus_y' : torch.tensor([self.data.X_minus_Y[index]], dtype=torch.float),
      } 

# Сама нейронная сеть. Тип 3 не работает, надо доделать
class Net(torch.nn.Module):
    def __init__(self, n_feature, n_hidden, n_output, type = 2):
        super(Net, self).__init__()
        self.type = type
        if type == 1:
          self.hidden = torch.nn.Linear(n_feature, n_hidden)   # hidden layer
          self.relu = torch.nn.ReLU()
          self.predict = torch.nn.Linear(n_hidden, n_output)   # output layer
        elif type == 2:
          self.hidden = torch.nn.Linear(n_feature, n_hidden)   # hidden layer
          self.relu = torch.nn.LeakyReLU()
          self.predict = torch.nn.Linear(n_hidden, n_output)   # output layer
        else:
          self.hidden = torch.nn.Linear(n_feature, n_hidden)
          self.relu = torch.nn.LeakyReLU()
          self.hidden1 = torch.nn.Linear(n_hidden, 100)
          self.relu1 = torch.nn.LeakyReLU(),
          self.predict = torch.nn.Linear(100, n_output)

    def forward(self, x_minus_y):
      if self.type == 1:
        x_minus_y = self.hidden(x_minus_y)
        x_minus_y = self.relu(x_minus_y)
        z = self.predict(x_minus_y)
      elif self.type == 2:
        x_minus_y = self.hidden(x_minus_y)
        x_minus_y = self.relu(x_minus_y)
        z = self.predict(x_minus_y)
      else:
        x_minus_y = self.hidden(x_minus_y)
        x_minus_y = self.relu(x_minus_y)
        x_minus_y = self.hidden1(x_minus_y)
        x_minus_y = self.relu1(x_minus_y)
        z = self.predict(x_minus_y)

      return z


def rmspe_func(y_pred, y_true):
    error = 0
    for val1, val2 in zip(y_pred.cpu().numpy(), y_true.cpu().numpy()):
        error += (val2 - val1)*(val2 - val1)
    return error

In [9]:
device = 'cuda' if cuda.is_available() else 'cpu'

BATCH_SIZE = 30
EPOCH = 500
min_val = 1000
max_val = 2000
n_hidden = 100

train_dataset = DatasetUnknownFunc(min_val = min_val, max_val = max_val, balance_data = True)
training_loader = Data.DataLoader(
    dataset=train_dataset, 
    batch_size=BATCH_SIZE, 
    shuffle=True, num_workers=2)

In [10]:
model = Net(n_feature=1, n_hidden=n_hidden, n_output = 1, type = 2)
model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0002)
scheduler = None
loss_function = torch.nn.MSELoss()

In [11]:
make_gif = False

torch.manual_seed(1)

if make_gif == True:
  my_images = []
  fig, ax = plt.subplots(figsize=(16,10))

used_feature = 'x_minus_y'

model.train()
for epoch in range(EPOCH):
  rmse = 0
  nb_tr_examples = 0

  for step ,data in enumerate(training_loader, 0):
      x = data[used_feature].to(device, dtype = torch.float)
      Z = data['Z'].to(device, dtype = torch.float)
      outputs = model(x).to(device, dtype = torch.float)

      loss = loss_function(outputs.view(-1), Z.view(-1))
      rmse += rmspe_func(outputs.data, Z)
      nb_tr_examples+=x.size(0)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      if scheduler != None:
        scheduler.step()

      samples_rmse = math.sqrt(rmse/nb_tr_examples)

      if (make_gif == True) & (step == 0):
        make_train_gif(min_val, 2*max_val, x.cpu(), Z.cpu(), outputs.data.cpu(), samples_rmse)

  if epoch % 100 == 0: 
    epoch_rmse = math.sqrt(rmse/nb_tr_examples)
    print(f"Training RMSE Epoch({epoch}): {epoch_rmse}")

if make_gif == True:
  imageio.mimsave('./curve_model_batch.gif', my_images, fps=12)

Training RMSE Epoch(0): 13.79825738049125
Training RMSE Epoch(100): 0.32672721733238713
Training RMSE Epoch(200): 0.052751522896606236
Training RMSE Epoch(300): 0.024554378690546736
Training RMSE Epoch(400): 0.07671226868376246


In [12]:
testing_loader = DatasetUnknownFunc(min_val = 1, max_val = 10)
testing_loader = Data.DataLoader(
    dataset=testing_loader, 
    batch_size=1, 
    shuffle=False, num_workers=2)

rmse = 0; nb_tr_examples = 0;

counter = 0
result = {}
with torch.no_grad():
  model.eval()
  for _, data in enumerate(testing_loader, 0):
    x = data[used_feature].to(device, dtype = torch.float)
    Z = data['Z'].to(device, dtype = torch.float)
    outputs = model(x)
            
    rmse += rmspe_func(outputs.data, Z)
    nb_tr_examples+=x.size(0)

    result[counter] = outputs.cpu().numpy()[0]
    counter += 1

  result = pd.DataFrame.from_dict(result, orient = 'index').reset_index()
  result.columns = ['id', 'Z_predicted']
  result.set_index('id', inplace = True)
        
  rmse = math.sqrt(rmse/nb_tr_examples)
    
print(f"Validation RMSE: {rmse}\n")

Validation RMSE: 0.1642911146970802



In [13]:
result = result.merge(testing_loader.dataset.data, left_index = True, right_index = True)[['X', 'Y', 'Z', 'Z_predicted']]
result.sort_values('Z_predicted')

Unnamed: 0_level_0,X,Y,Z,Z_predicted
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,1,1,0.0,0.003807
7,8,8,0.0,0.003807
6,7,7,0.0,0.003807
5,6,6,0.0,0.003807
8,9,9,0.0,0.003807
3,4,4,0.0,0.003807
2,3,3,0.0,0.003807
1,2,2,0.0,0.003807
4,5,5,0.0,0.003807
12,3,4,1.0,0.500547
