# Read data
Read both affine params and points, the normal datagen pipeline for training should do. 

In [56]:
from utils.utils0 import *
from utils.utils1 import *
from utils.utils1 import ModelParams, model_loader, print_summary#, test_repeat
import matplotlib.pyplot as plt
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Device: {device}')

Device: cuda


In [57]:
model_params = ModelParams(dataset=2, sup=1)
train_dataset = datagen(model_params.dataset, True, model_params.sup)
test_dataset = datagen(model_params.dataset, False, model_params.sup)

Model name:  dataset2_sup1_image1_points0_loss_image0
Model code:  21100_0.001_0_10_1
Model params:  {'dataset': 2, 'sup': 1, 'image': 1, 'points': 0, 'loss_image_case': 0, 'loss_image': MSELoss(), 'loss_affine': <utils.utils1.loss_affine object at 0x7f62b407db80>, 'learning_rate': 0.001, 'decay_rate': 0.96, 'start_epoch': 0, 'num_epochs': 10, 'batch_size': 1, 'model_name': 'dataset2_sup1_image1_points0_loss_image0'}


In [58]:
# draw a pair of images from the test set
outputs = list(train_dataset)[0]

In [59]:
for output in outputs:
    print(output.shape)

torch.Size([1, 1, 256, 256])
torch.Size([1, 1, 256, 256])
torch.Size([1, 2, 3])
torch.Size([1, 81, 2])
torch.Size([1, 81, 2])
torch.Size([1, 81, 2])


# Define NN model
- Input: affine parms (2x3), original points (2x1)
- Output: predicted transformed points (2x1)
- Groundtruth: true transformed points (2x1)

In [60]:
import torch
import torch.nn as nn
import torch.optim as optim

In [61]:
class AffineTransformationNetwork(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(AffineTransformationNetwork, self).__init__()

        # Check if hidden_sizes is empty
        if not hidden_sizes:
            self.hidden_layers = nn.ModuleList([])
            self.fc = nn.Linear(input_size, output_size)
        else:
            layers = [nn.Linear(input_size, hidden_sizes[0]), nn.ReLU(), nn.Dropout(0.1)]
            for i in range(1, len(hidden_sizes)):
                layers += [nn.Linear(hidden_sizes[i - 1], hidden_sizes[i]), nn.ReLU(), nn.Dropout(0.1)]
            layers += [nn.Linear(hidden_sizes[-1], output_size)]
            self.hidden_layers = nn.ModuleList(layers)

    def forward(self, affine_matrix, point):
        # Flatten and concatenate the input matrices
        input_data = torch.cat((affine_matrix, point))

        # Pass the input through hidden layers
        for layer in self.hidden_layers:
            input_data = layer(input_data)

        # Reshape the output to (2x1)
        transformed_point = input_data#.view(2)

        return transformed_point

# Example: Create an instance of the neural network with 2 hidden layers and 64 neurons in each hidden layer
input_size = 8  # Affine matrix (2x3) + Point (2)
num_hidden_layers = 2
num_perceptrons = 64
hidden_sizes = num_hidden_layers*[num_perceptrons]
output_size = 2  # Transformed point (2)

model = AffineTransformationNetwork(input_size, hidden_sizes, output_size)
print(model)
model.to(device)
# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
parameters = list(model.parameters())
print(f'Number of parameters: {len(parameters)}')
# print(f'Parameters: {parameters}')
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1400, gamma=0.01)


AffineTransformationNetwork(
  (hidden_layers): ModuleList(
    (0): Linear(in_features=8, out_features=64, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.1, inplace=False)
    (3): Linear(in_features=64, out_features=64, bias=True)
    (4): ReLU()
    (5): Dropout(p=0.1, inplace=False)
    (6): Linear(in_features=64, out_features=2, bias=True)
  )
)
Number of parameters: 6


## Training

In [62]:
train_dataset_new = []
# loop through all items in the train_dataset 
# and modify the input and output as described above
train_bar = tqdm(train_dataset, desc='Training')
# Loop over all training examples
for i, (source_img, target_img, affine_params, \
        matches1, matches2, matches1_2) in enumerate(train_bar):
    for j in range(matches1.shape[1]):
        train_dataset_new.append((torch.cat((affine_params.view(-1), matches1[0][j])), matches2[0][j]))
        if j > 3:
            break   

test_dataset_new = []
# loop through all items in the train_dataset 
# and modify the input and output as described above
test_bar = tqdm(test_dataset, desc='Testing')
# Loop over all training examples
for i, (source_img, target_img, affine_params, \
        matches1, matches2, matches1_2) in enumerate(test_bar):
    for j in range(matches1.shape[1]):
        test_dataset_new.append((torch.cat((affine_params.view(-1), matches1[0][j])), matches2[0][j]))
        if j > 1:
            break   

Training:   7%|▋         | 14/200 [00:00<00:03, 54.27it/s]

Training: 100%|██████████| 200/200 [00:03<00:00, 62.31it/s]
Testing: 100%|██████████| 100/100 [00:01<00:00, 76.70it/s]


In [63]:
# # Create empty list to store epoch number, train loss and validation loss
# epoch_loss_list = []
# running_loss_list = []
# val_loss_list = []
# print('Seed:', torch.seed())

# # Training loop
# epochs = 1000
# for epoch in range(epochs):
#     model.train()
    
#     # optimizer.zero_grad()
#     running_loss = 0.0
    
#     train_bar = tqdm(train_dataset_new, desc='Training Epoch %d/%d' % (epoch+1, epochs))
#     # Loop over all training examples
#     for i, (input, matches1_2) in enumerate(train_bar):
#         optimizer.zero_grad()
#         # Set the inputs and labels to the training device
#         # source_img = source_img.to(device)
#         # target_img = target_img.to(device)
#         affine_params = input[:6].to(device)
#         matches1 = input[6:].to(device)
#         # matches2 = matches2.to(device)
#         matches1_2 = matches1_2.to(device)

#         # optimizer.zero_grad()
#         # for j in range(matches1.shape[1]):
#         # Forward pass
#         predicted_points = model(affine_params, matches1)

#         # Compute the loss
#         loss = criterion(predicted_points, matches1_2)

#         running_loss += loss.item()
        
#         train_bar.set_postfix({'loss': running_loss / (i+1)})

#         # Backward pass and optimization
#         loss.backward()
#         optimizer.step()

#     scheduler.step()
#     epoch_loss_list.append(running_loss / len(train_dataset_new))
#     # if epoch % 10 == 0:
#     #     print(f'Epoch [{epoch + 1}/{epochs}], Loss: {running_loss / len(train_dataset_new)}')

#     # Validation loop
#     model.eval()

#     val_running_loss = 0.0

#     test_bar = tqdm(test_dataset_new, desc='Testing Epoch %d/%d' % (epoch+1, epochs))
#     # Loop over all validation examples
#     for i, (input, matches1_2) in enumerate(test_bar):
#         # Set the inputs and labels to the validation device
#         # source_img = source_img.to(device)
#         # target_img = target_img.to(device)
#         affine_params = input[:6].to(device)
#         matches1 = input[6:].to(device)
#         # matches2 = matches2.to(device)
#         matches1_2 = matches1_2.to(device)

#         # Forward pass
#         predicted_points = model(affine_params, matches1)

#         # Compute the loss
#         loss = criterion(predicted_points, matches1_2)

#         val_running_loss += loss.item()
#         test_bar.set_postfix({'loss': val_running_loss / (i+1)})

#     val_loss_list.append(val_running_loss / len(test_dataset_new))
#     # if epoch % 10 == 0:
#     #     print(f'Epoch [{epoch + 1}/{epochs}], Val Loss: {val_running_loss / len(test_dataset_new)}')

#     # Save the model if the validation loss is the lowest we've seen so far
#     # if val_loss_list[-1] == min(val_loss_list):
#     #     torch.save(model.state_dict(), f'trained_models/affine_NN_{epoch:03d}.pth')
#     #     print('Model saved')

    

In [64]:
# # save model to file train_model/affine_transformation_network.pt
# torch.save(model.state_dict(), f'trained_models/affine_NN_{num_hidden_layers}_{num_perceptrons}.pth')

In [65]:
#  # Plot train loss and validation loss against epoch number
# fig = plt.figure(figsize=(10, 5))
# plt.plot(range(1, epochs+1), epoch_loss_list, label='Running Train Loss', linewidth=3)
# plt.plot(range(1, epochs+1), val_loss_list, label='Running Validation Loss', linewidth=3)
# plt.title('Train Loss')
# plt.legend()
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.grid(True)
# # plt.yscale('log')
# plt.tight_layout()

# # Save plot
# # signaturebar_gray(fig, f'{model_params.get_model_code()} - epoch{model_params.num_epochs} - {timestamp}')
# # fig.savefig(save_plot_name)

# Test

In [69]:
# load model
input_size = 8  # Affine matrix (2x3) + Point (2)
num_hidden_layers = 2
num_perceptrons = 64
hidden_sizes = num_hidden_layers*[num_perceptrons]
model = AffineTransformationNetwork(input_size, hidden_sizes, output_size)
# model.load_state_dict(torch.load(f'trained_models/affine_NN_{num_hidden_layers}_{num_perceptrons}.pth'))
model.load_state_dict(torch.load(f'trained_models/affine_NN_2_64_2000_20231220_200233.pth'))
model.to(device)

AffineTransformationNetwork(
  (hidden_layers): ModuleList(
    (0): Linear(in_features=8, out_features=64, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.1, inplace=False)
    (3): Linear(in_features=64, out_features=64, bias=True)
    (4): ReLU()
    (5): Dropout(p=0.1, inplace=False)
    (6): Linear(in_features=64, out_features=2, bias=True)
  )
)

In [70]:
model.eval()

test_loss_list = []
test_bar = tqdm(test_dataset_new, desc='Testing')
for i, (input, matches1_2) in enumerate(test_bar):
    # Set the inputs and labels to the training device
    # source_img = source_img.to(device)
    # target_img = target_img.to(device)
    affine_params = input[:6].to(device)
    matches1 = input[6:].to(device)
    # matches2 = matches2.to(device)
    matches1_2 = matches1_2.to(device)

    # optimizer.zero_grad()
    # for j in range(matches1.shape[1]):
    # Forward pass
    predicted_points = model(affine_params, matches1)

    # Compute the loss
    loss = criterion(predicted_points, matches1_2)

    test_loss_list.append(loss.item())

    test_bar.set_postfix({'loss': loss.item() / (i+1)})
    print(f'True: {matches1_2.detach().cpu().numpy()}, Predicted: {predicted_points.detach().cpu().numpy()}')
    if i > 10:
        break

# mean and std of the test loss
mean_test_loss = np.mean(test_loss_list)
std_test_loss = np.std(test_loss_list)
print(f'Mean test loss: {mean_test_loss}, Std test loss: {std_test_loss}')

Testing:   4%|▎         | 11/300 [00:00<00:02, 131.63it/s, loss=5.77]

True: [ 79. 140.], Predicted: [ 68.33421 145.55869]
True: [98. 68.], Predicted: [90.15383 84.63431]
True: [221. 132.], Predicted: [182.98907 135.83365]
True: [ 23. 170.], Predicted: [ 43.54608 138.3555 ]
True: [212. 211.], Predicted: [214.14929 165.19345]
True: [ 10. 105.], Predicted: [32.144115 78.57881 ]
True: [113. 118.], Predicted: [130.6988 122.4299]
True: [165.  46.], Predicted: [177.16812  73.19113]
True: [36. 50.], Predicted: [50.301846 78.6088  ]
True: [175.  35.], Predicted: [167.07      55.677673]
True: [128.  20.], Predicted: [130.94019  44.48124]
True: [120. 106.], Predicted: [123.69107 117.1729 ]
Mean test loss: 422.3936487833659, Std test loss: 293.5627552739249



