In [1]:
import torch
import torch.nn as nn
import numpy as np
from datasets import Dataset

In [2]:
rng = np.random.default_rng(seed=2022)
train_size = 50
test_size = 10
#train_size = train_size+test_size

X_0 = {"loc":(1,1), "scale":1}
X_1 = {"loc":(-1,1), "scale":2}
eps = {"loc":(0,0), "scale":0.2}

f_transform = np.array([[ -1,  0],
                        [0.5,0.5]])

#device = "cuda" if torch.cuda.is_available() else "cpu"
#device = "cpu"

In [3]:
#Adatgen függvények

def f_bijection(array_of_points):
    return array_of_points @ f_transform - 1

def gen_features(distribution, size=1, transform=False):
    features = rng.normal(**distribution, size=(size,2))
    if transform:
        return f_bijection(features)
    else:
        return features
    
def prep_data(feature_list, label_list):
    task = np.zeros([0,3])
    for idx, features in enumerate(feature_list):
        #print(features, label_list[idx], np.ones([features.shape[0],1]) * label_list[idx])
        datapoints = np.concatenate(
            [features, 
             np.ones([features.shape[0],1]) * label_list[idx]], 
            axis=1)
        #print(datapoints)
        task = np.concatenate([task, datapoints])
        #print(task)
    rng.shuffle(task)
    return torch.from_numpy(task)

In [4]:
device = "cuda" if torch.cuda.is_available() else "cpu"
#device = "cpu"

In [5]:
#Adatgenerálás
noise1 = gen_features(eps, train_size)
noise2 = gen_features(eps, train_size)

task1_0 = gen_features(X_0, train_size)
task1_1 = gen_features(X_1, train_size)
Task1 = prep_data([task1_0, task1_1], [0, 1]).to(device)

taskA_0 = gen_features(X_0, train_size, transform=True) + noise1
taskA_1 = gen_features(X_1, train_size, transform=True) + noise2
TaskA = prep_data([taskA_0, taskA_1], [2, 3]).to(device)

taskB_0 = gen_features(X_0, train_size, transform=True) + 10*noise1
taskB_1 = gen_features(X_1, train_size, transform=True) + 10*noise2
TaskB = prep_data([taskB_0, taskB_1], [2, 3]).to(device)

In [6]:
class MockupNetwork(nn.Module):
    def __init__(self, num_classes):
        super(MockupNetwork, self).__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(2, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU(),
            nn.Linear(32, num_classes),
        )
        self.softmax = nn.Softmax()

    def forward(self, x):
        logits = self.linear_relu_stack(x)
        probs = self.softmax(logits)
        return logits

In [7]:
model = MockupNetwork(4).to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

In [8]:
def train(data, model, loss_fn, optimizer):
    model.train()
    num_datapoints = data.shape[0]
    for i in range(num_datapoints):
        X, y = data[i,:-1], data[i,-1]
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y.long())

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        #if i % 10 == 0:
        #    loss, current = loss.item(), i * len(X)
        #    print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [9]:
def test(test, model, loss_fn):
    num_datapoints = test.shape[0]
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for i in range(num_datapoints):
            X, y = data[i,:-1], data[i,-1]
            X, y = X.to(device), y.to(device)

            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float)
    test_loss /= num_datapoints
    print(test_loss, correct)

In [10]:
model = model.double()
train(Task1, model, loss_fn, optimizer)

  probs = self.softmax(logits)
