In [25]:
import os

import torch
import torch.nn as nn

def confidence_loss(output):
    """Placeholder for a confidence-based loss; not implemented yet.

    The body is only ``...``, so calling this currently returns ``None``
    whatever ``output`` is.

    TODO(review): presumably meant to score the early-exit head of
    ``TwoLayerNetDynamic`` -- confirm the intent before implementing.
    """
    ...
    
class TwoLayerNet(nn.Module):
    """Small feed-forward regressor: Linear -> ReLU -> Linear.

    Args:
        input_size: Number of features in each input sample.
        hidden_size: Width of the hidden layer.
        output_size: Number of values produced for each sample.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Layers are built in forward order so parameter initialisation
        # draws from the RNG in the same sequence as before.
        self.fully_connected_1 = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.fully_connected_2 = nn.Linear(in_features=hidden_size, out_features=output_size)

        print("TwoLayerNet initialized")

    def forward(self, x):
        """Map ``x`` (last dimension ``input_size``) to the network output."""
        hidden = self.fully_connected_1(x).relu()
        return self.fully_connected_2(hidden)

class TwoLayerNetDynamic(nn.Module):
    """Two-layer regressor with an early-exit head after the hidden layer.

    The hidden activation feeds two linear heads: ``early_exit_1`` and
    ``fully_connected_2``. ``forward`` returns the early-exit prediction when,
    after rounding, it equals the sum of the first two input features for
    every sample in the input; otherwise it returns the output of
    ``fully_connected_2``.

    Args:
        input_size: Number of features per sample. ``forward`` reads features
            0 and 1, so this must be at least 2.
        hidden_size: Width of the hidden layer.
        output_size: Number of values produced per sample by either head.
    """

    def __init__(self, input_size, hidden_size, output_size):
        # Zero-argument super(): naming ``TwoLayerNet`` here raised TypeError
        # because this class does not derive from it.
        super().__init__()
        self.fully_connected_1 = nn.Linear(input_size, hidden_size)
        self.early_exit_1 = nn.Linear(hidden_size, output_size)

        self.fully_connected_2 = nn.Linear(hidden_size, output_size)

        print("TwoLayerNetDynamic initialized")

    def forward(self, x):
        """Return the early-exit or the full prediction for ``x``.

        Args:
            x: Tensor whose last dimension holds the features, e.g. a single
                sample of shape ``(input_size,)`` or a batch of shape
                ``(batch, input_size)``.

        Returns:
            Tensor with the same leading dimensions as ``x`` and a last
            dimension of ``output_size``.
        """
        # Ellipsis indexing works for a lone sample as well as for a batch.
        # The trailing unsqueeze lines the sum up with the heads' output axis,
        # so the comparison below is per sample instead of broadcasting a
        # (batch,) vector against a (batch, output_size) matrix.
        target_sum = (x[..., 0] + x[..., 1]).unsqueeze(-1)
        hidden = torch.relu(self.fully_connected_1(x))

        # Evaluate the early-exit head once and reuse the result.
        early_output = self.early_exit_1(hidden)

        # Reduce to one boolean: a multi-element tensor has no truth value,
        # so the decision is taken for the input as a whole.
        if bool(torch.all(early_output.round() == target_sum)):
            print("Exiting early")
            return early_output
        return self.fully_connected_2(hidden)

In [None]:
# Use a CUDA GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

def train_model(model, x_train, y_train, batch_size=5, epochs=100, learning_rate=0.01):
    """Fit ``model`` to ``(x_train, y_train)`` with mini-batch SGD on an MSE loss.

    Every 10th epoch the loss of that epoch's last mini-batch is printed
    together with the accuracy on the full training set, where a prediction
    counts as correct when its rounded value equals the target. Training
    stops early once that accuracy exceeds 90%.

    Args:
        model: Module to train; it is moved to ``DEVICE`` in place.
        x_train: Input tensor, samples along dimension 0.
        y_train: Target tensor, samples along dimension 0.
        batch_size: Number of samples per optimisation step.
        epochs: Maximum number of passes over the data.
        learning_rate: Step size handed to ``torch.optim.SGD``.

    Returns:
        The same ``model`` object after training.
    """
    # Data and model have to live on the same device.
    x_train, y_train = x_train.to(DEVICE), y_train.to(DEVICE)
    model.to(DEVICE)

    loss_fn = nn.MSELoss()
    sgd = torch.optim.SGD(model.parameters(), lr=learning_rate)
    n_samples = x_train.size(0)

    for epoch in range(epochs):
        model.train()
        # Fresh shuffle every epoch so mini-batches differ between passes.
        shuffled = torch.randperm(n_samples)

        for start in range(0, n_samples, batch_size):
            batch_idx = shuffled[start:start + batch_size]

            sgd.zero_grad()
            loss = loss_fn(model(x_train[batch_idx]), y_train[batch_idx])
            loss.backward()
            sgd.step()

        # Only report (and test the stopping rule) on every 10th epoch.
        if (epoch + 1) % 10 != 0:
            continue

        # ``loss`` is whatever the last mini-batch of this epoch produced.
        epoch_print = f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}"

        model.eval()
        with torch.no_grad():
            n_correct = (model(x_train).round() == y_train).sum().item()
            accuracy = n_correct / y_train.size(0)
        epoch_print += f', Accuracy: {accuracy * 100:.2f}%'

        print(epoch_print)
        if accuracy > 0.90:
            print('Accuracy is above 90%, stopping training')
            break

    return model

def eval_model(model, x_test, y_test):
    """Print the accuracy of ``model`` on ``(x_test, y_test)``.

    A prediction counts as correct when its rounded value equals the target.
    The count of matching elements is divided by the number of rows in
    ``y_test``.

    Args:
        model: Module to evaluate; it is switched to eval mode and moved to
            ``DEVICE`` in place.
        x_test: Input tensor, samples along dimension 0.
        y_test: Target tensor, samples along dimension 0.

    Returns:
        None. The accuracy is only printed.

    Raises:
        ZeroDivisionError: If ``y_test`` has no rows.
    """
    x_test = x_test.to(DEVICE)
    y_test = y_test.to(DEVICE)
    # Keep the model on the same device as the data, as train_model does.
    # Without this, a model that was never passed through train_model could
    # sit on a different device than the inputs and the forward pass fails.
    model.to(DEVICE)

    # Calculate accuracy
    model.eval()
    with torch.no_grad():
        outputs = model(x_test)
        predicted = outputs.round()
        accuracy = (predicted == y_test).sum().item() / y_test.size(0)
        print(f'Evaluation accuracy: {accuracy * 100:.2f}%')

# Prepare data
# Training set: every ordered pair (i, j) with 0 <= i, j < train_range,
# with the sum i + j as the regression target.
train_range = 11
num_classes = 21 # 0-20. max is 10+10=20

# Rows are ordered with i as the slow index and j as the fast index.
x_train = torch.cartesian_prod(
    torch.arange(train_range, dtype=torch.float32),
    torch.arange(train_range, dtype=torch.float32),
)
# Column vector of targets, one row per input pair.
y_train = x_train.sum(dim=1, keepdim=True)
# y_one_hot = torch.nn.functional.one_hot(y_train.to(torch.int64), num_classes=21).to(torch.float32)

# Train the model, then report its accuracy on the same data it was fitted to.
model = train_model(
    TwoLayerNet(input_size=2, hidden_size=3, output_size=1),
    x_train=x_train,
    y_train=y_train,
    epochs=500,
)
eval_model(model, x_test=x_train, y_test=y_train)


TwoLayerNet initialized
Epoch [10/500], Loss: 15.7539, Accuracy: 9.09%
Epoch [20/500], Loss: 34.6432, Accuracy: 9.09%
Epoch [30/500], Loss: 0.6336, Accuracy: 43.80%
Epoch [40/500], Loss: 0.8069, Accuracy: 2.48%
Epoch [50/500], Loss: 0.0211, Accuracy: 9.92%
Epoch [60/500], Loss: 0.0531, Accuracy: 28.10%
Epoch [70/500], Loss: 0.0154, Accuracy: 69.42%
Epoch [80/500], Loss: 1.1580, Accuracy: 60.33%
Epoch [90/500], Loss: 0.0002, Accuracy: 87.60%
Epoch [100/500], Loss: 2.5832, Accuracy: 14.05%
Epoch [110/500], Loss: 0.1360, Accuracy: 24.79%
Epoch [120/500], Loss: 0.0000, Accuracy: 91.74%
Accuracy is above 90%, stopping training


In [21]:
# Scratch cell: replays the first mini-batch step of train_model's inner loop
# by hand, to inspect the tensors and their shapes.
# Stand-in for train_model's `batch_size` argument.
batch_size = 5
# Same shuffling as at the start of each epoch in train_model.
_permutation = torch.randperm(x_train.size(0))
# print(_permutation)
# for i in range(0, x_train.size(0), batch_size):
# Only the first iteration of the loop above is reproduced here.
i = 0
_indices = _permutation[i:i + batch_size]
_batch_x, _batch_y = x_train[_indices], y_train[_indices]
print(_batch_x)
# Split the batch into its two operand columns.
batch_i, batch_j = _batch_x[:, 0], _batch_x[:, 1]
print(batch_i)
print(batch_j)

# Element-wise sum of the operand columns; 1-D, one entry per sample.
_sum = batch_i + batch_j
print(_sum)
# Transposed only so the column of targets prints on one line.
print(_batch_y.T)
# _batch_y has a trailing dimension of size 1; drop it before comparing with the 1-D sum.
print(torch.equal(_batch_y.squeeze(dim=1), _sum))
print(_batch_y.squeeze(dim=1).shape)
print(_sum.shape)
# _outputs = model(_batch_x)

tensor([[8., 1.],
        [0., 5.],
        [2., 7.],
        [8., 8.],
        [9., 4.]])
tensor([8., 0., 2., 8., 9.])
tensor([1., 5., 7., 8., 4.])
tensor([ 9.,  5.,  9., 16., 13.])
tensor([[ 9.,  5.,  9., 16., 13.]])
True
torch.Size([5])
torch.Size([5])


## Trying the model

In [7]:
def sum_numbers(x, model):
    """Return the model's single-value prediction for ``x`` as a Python number.

    Args:
        x: Input tensor passed straight to ``model``. It is not moved to
            another device here, so it must already be on the model's device.
        model: Module to query; it is switched to eval mode in place.

    Returns:
        The one element of the model output, via ``Tensor.item()``.

    Raises:
        RuntimeError: From ``Tensor.item()`` if the model output does not
            contain exactly one element.
    """
    model.eval()
    # Pure inference: skip autograd bookkeeping, as train_model and
    # eval_model do for their evaluation passes.
    with torch.no_grad():
        y_pred = model(x)
    return y_pred.item()

# Draw two random operands in 0..9 (randint's upper bound is exclusive)
# and ask the trained model for their sum.
i_rand = int(torch.randint(0, 10, (1,)))
j_rand = int(torch.randint(0, 10, (1,)))
x = torch.tensor([i_rand, j_rand], dtype=torch.float32, device=DEVICE)
result = sum_numbers(x, model)

print(f'i: {i_rand}, j: {j_rand}')
print(f'Expected: {i_rand + j_rand}, got: {result}')


i: 5, j: 6
Expected: 11, got: 11.012231826782227


## ONNX export

In [22]:
# Example input for the exporter: one random operand pair on the model's device.
i_rand = torch.randint(0, 10, (1,)).item()
j_rand = torch.randint(0, 10, (1,)).item()
x = torch.tensor([i_rand, j_rand], dtype=torch.float32).to(DEVICE)

model_name = "two_layer_net"
onnx_path = f"./models/onnx/{model_name}.onnx"
# The exporter writes straight to this path, so make sure the target
# directory exists; on a fresh checkout it may not.
os.makedirs(os.path.dirname(onnx_path), exist_ok=True)
# Pass the example input as an explicit tuple of positional arguments.
torch.onnx.export(model=model, args=(x,), f=onnx_path, input_names=["input"], output_names=["prediction"])
