In [None]:
import numpy as np
import pickle as pkl
import torch
import torch.nn as nn
import torch.nn.functional as F

This cell generates the data; do not change it. Training data are pairs of neighboring numbers. Validation data are pairs of random numbers.  
The purpose of this setup is to test generalization: if the model can learn the relationship between numbers from simple training data, then numerical reasoning is possible even when training covers only a limited combination of numbers. 

In [None]:
import itertools
import random
# Every ordered pair (a, b) of distinct integers taken from 0..299,
# rearranged in place into a random order.
int_pair_list = list(itertools.permutations(range(300), 2))
random.shuffle(int_pair_list)

In [None]:
# Separate the shuffled pairs into two parallel integer arrays:
# x1 holds the first element of every pair, x2 the second.
x1 = np.array([first for first, _ in int_pair_list])
x2 = np.array([second for _, second in int_pair_list])
# Label encoding: 0 when x1 < x2, 1 when x1 == x2, 2 when x1 > x2.
y = (x1 >= x2) * 1 + (x1 > x2) * 1
# The first 70000 pairs form the training split; the remainder is validation.
# NOTE(review): both splits are slices of the same shuffled pair list.
train_data = [column[:70000] for column in (x1, x2, y)]
val_data = [column[70000:] for column in (x1, x2, y)]

In [None]:
# Run on the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Mini-batch size used by the data loaders.
BSZ = 32

These are the training and validation functions. You do not need to change them unless you want to try variations of the training paradigm, such as using a multiclass SVM loss.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import Counter

def train(model, train_loader, val_loader, fail_tol, learning_rate=3e-4, label=""):
    """Train ``model`` with Adam and cross-entropy, early-stopping on validation accuracy.

    Epoch 0 performs no parameter updates: it only evaluates the model as
    passed in, so the first entry of each returned list is the untrained
    baseline (with a recorded loss of 0). Whenever the validation accuracy
    strictly improves on the best seen so far, the model's ``state_dict`` is
    saved to ``'model-' + label + '.ckpt'`` in the current directory.

    Args:
        model: Module mapping a batch ``x`` to per-class scores.
        train_loader: Iterable of ``(x, labels)`` batches; must support ``len()``.
        val_loader: Iterable of ``(x, labels)`` batches, passed to ``test_model``.
        fail_tol: Training stops once the number of consecutive epochs
            without a validation improvement exceeds this value.
        learning_rate: Step size for the Adam optimizer.
        label: Suffix used in the checkpoint file name.

    Returns:
        A ``(loss_list, val_acc_list)`` tuple with one entry per completed
        epoch (including epoch 0): the mean training loss and the validation
        accuracy in percent.
    """
    num_epochs = 100

    # For the multiclass SVM variant, swap in torch.nn.MultiMarginLoss() here.
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    num_batches = len(train_loader)
    loss_list, val_acc_list = [], []

    fail_cnt, cur_best = 0, 0
    for epoch in range(num_epochs + 1):
        avg_loss = 0.
        correct = 0
        total = 0
        train_acc = 0.0

        if epoch > 0:
            # test_model() leaves the model in eval mode, so switch back once
            # per epoch rather than once per batch.
            model.train()
            for x, labels in train_loader:
                optimizer.zero_grad()

                outputs = model(x)
                # Index of the highest score per row, kept as a column vector.
                predicted = outputs.max(1, keepdim=True)[1]
                total += labels.size(0)
                correct += predicted.eq(labels.view_as(predicted)).sum().item()

                obj = criterion(outputs, labels)
                obj.backward()
                # Clip the global gradient norm to 3 before the update.
                nn.utils.clip_grad_norm_(model.parameters(), 3)
                optimizer.step()

                avg_loss += obj.item() / num_batches

            # An empty training loader leaves total at 0; keep train_acc at
            # 0.0 instead of dividing by zero.
            if total:
                train_acc = 100 * correct / total

        val_acc = test_model(val_loader, model)
        val_acc_list.append(val_acc)
        loss_list.append(avg_loss)

        if val_acc > cur_best:
            print('Epoch: [{}/{}], Loss: {:.4}, Train acc: {:.4}, Val acc: {:.4}'.format(
                epoch, num_epochs, avg_loss, train_acc, val_acc))
            print("found best! save model...")
            torch.save(model.state_dict(), 'model' + "-" + label + '.ckpt')
            cur_best = val_acc
            fail_cnt = 0
        else:
            fail_cnt += 1
            print('Epoch: [{}/{}], Loss: {:.4}, Train acc: {:.4}, Val acc: {:.4} ({}/{})'.format(
                epoch, num_epochs, avg_loss, train_acc, val_acc, fail_cnt, fail_tol))

        # Early stopping: too many consecutive epochs without improvement.
        if fail_cnt > fail_tol:
            break

    return loss_list, val_acc_list

def test_model(loader, model):
    from collections import Counter
    res_cnt = Counter()
    correct = 0
    total = 0
    model.eval()
    for x, labels in loader:
        outputs = model(x)
        predicted = outputs.max(1, keepdim=True)[1]
#         labels = labels.max(1)[1]
#         res_cnt.update(list(predicted.squeeze().cpu().numpy()))
        
        total += labels.size(0)
        correct += predicted.eq(labels.view_as(predicted)).sum().item()
#     print("test", res_cnt)
    return (100 * correct / total)

There are two ways to input the pairs: concatenation ($N\times 2$) and subtraction ($N\times 1$). Although the model should presumably be able to learn the subtraction itself from the concatenated input, it doesn't seem to work that way.

In [None]:
from torch.utils.data import Dataset
class numDataset(Dataset):
    """Dataset of number pairs with one label per pair.

    Args:
        data_list: Three equally long sequences ``(s1_list, s2_list,
            target_list)``: first numbers, second numbers and labels.
        device: Device identifier returned with every item so that the
            collate function knows where to place the batch.
    """

    def __init__(self, data_list, device=DEVICE):
        self.s1_list, self.s2_list, self.target_list = data_list
        self.device = device
        # All three sequences are indexed with the same key in __getitem__,
        # so every one of them (not just s1 and the targets) must line up.
        assert len(self.s1_list) == len(self.s2_list) == len(self.target_list), \
            "s1_list, s2_list and target_list must have the same length"

    def __len__(self):
        """Return the number of pairs."""
        return len(self.target_list)

    def __getitem__(self, key):
        """Return ``[first, second, label, device]`` for the pair at ``key``."""
        s1_idx = self.s1_list[key]
        s2_idx = self.s2_list[key]
        label = self.target_list[key]

        return [s1_idx, s2_idx, label, self.device]
    
def collate_func(batch):
    """Assemble dataset items into an ``[inputs, labels]`` pair of tensors.

    Each item of ``batch`` is indexed as ``(first, second, label, device)``.
    The two numbers of every item become one row of a float tensor, the
    labels become a long tensor, and both are moved to the device carried by
    the first item.
    """
    target_device = batch[0][3]

    # To feed the difference instead of the pair, change the comma between
    # item[0] and item[1] to a minus (or back again).
    features = [[item[0] , item[1]] for item in batch]
    targets = [item[2] for item in batch]

    inputs = torch.FloatTensor(np.array(features)).to(target_device)
    labels = torch.LongTensor(targets).to(target_device)
    return [inputs, labels]

In [None]:
# Wrap both splits as datasets, then batch them with the custom collate
# function. Both loaders use the same batch size and shuffle their samples.
train_dataset, val_dataset = numDataset(train_data), numDataset(val_data)
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=BSZ, shuffle=True, collate_fn=collate_func)
val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=BSZ, shuffle=True, collate_fn=collate_func)

If using subtraction, the input is 1-dimensional; with concatenation, it is 2-dimensional.

In [None]:
class fcNet(nn.Module):
    """Fully connected ReLU network producing 3 class scores per input row.

    The stack is one input block (``Linear(2, fc_hid_dim)`` + ReLU),
    ``n_layers - 2`` hidden blocks (``Linear(fc_hid_dim, fc_hid_dim)`` + ReLU)
    and a final ``Linear(fc_hid_dim, 3)``.

    Args:
        n_layers: Total number of linear layers, counting input and output.
        fc_hid_dim: Width of every hidden layer.
        device: Stored on the instance; the module itself is not moved here.
    """

    def __init__(self, n_layers, fc_hid_dim, device=DEVICE):
        super(fcNet, self).__init__()
        self.device = device
        self.fc_hid_dim = fc_hid_dim
        self.n_layers = n_layers

        # Specify the input dimension here
        input_dim = 2

        # Build every hidden block separately. Multiplying a one-element list
        # would repeat the SAME module object, so all hidden layers would
        # share a single set of weights.
        hidden_blocks = [
            nn.Sequential(nn.Linear(fc_hid_dim, fc_hid_dim), nn.ReLU())
            for _ in range(n_layers - 2)
        ]
        self.linears = nn.ModuleList(
            [nn.Sequential(nn.Linear(input_dim, fc_hid_dim), nn.ReLU())]
            + hidden_blocks
            + [nn.Linear(fc_hid_dim, 3)]
        )
        self.init_weights()

    def forward(self, x):
        """Apply every block in order and return the final class scores."""
        for linear in self.linears:
            x = linear(x)
        return x

    def init_weights(self):
        """Set weights to uniform values in [-0.1, 0.1] and biases to zero."""
        initrange = 0.1
        for block in self.linears:
            # The last entry is a bare Linear; the others wrap their Linear
            # as the first element of a Sequential.
            layer = block if isinstance(block, nn.Linear) else block[0]
            layer.weight.data.uniform_(-initrange, initrange)
            layer.bias.data.fill_(0)

Training is here. Feel free to change the hyperparameters.

In [None]:
# Hyperparameters: total number of linear layers and hidden width.
n_layers, fc_hid_dim = 4, 32
model = fcNet(n_layers, fc_hid_dim).to(DEVICE)
# Stop once more than 30 consecutive epochs bring no validation improvement.
res = train(
    model,
    train_loader,
    val_loader,
    fail_tol=30,
    learning_rate=3e-5,
    label="",
)