In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.functional as F
from tqdm import tqdm
import pickle

## Load data and create dataloaders

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)

# Show the selected device as the cell output.
device

In [None]:
def _load_pickle(path):
    """Return the object stored in the pickle file at `path`."""
    # NOTE(security): pickle.load can execute arbitrary code; only use it on
    # files from a trusted source.
    with open(path, "rb") as handle:
        return pickle.load(handle)


# Pre-computed training features.
all_train_data = _load_pickle("/kaggle/input/modelnet-minimal/consolidated_feat_train.pkl")
print(f"Number of train examples: {len(all_train_data)}")

# Pre-computed test features.
all_test_data = _load_pickle("/kaggle/input/modelnet-minimal/consolidated_feat_test.pkl")
print(f"Number of test examples: {len(all_test_data)}")

In [None]:
from torch.utils.data import DataLoader

# Reshuffle the training examples at every epoch so the optimiser does not see
# the same batches in the same order each time. The test loader keeps a fixed
# order because it is only used for evaluation.
train_dataloader = DataLoader(all_train_data, batch_size = 256, shuffle = True)
test_dataloader = DataLoader(all_test_data, batch_size = 128, shuffle = False)

## Define model architecture

In [None]:
class LinearProjectionHeadBase(nn.Module):
    """Fully connected head that maps an input embedding to an output embedding.

    The input is widened by two PReLU stages (``up1``, ``up2``), passed through
    a Tanh stage of width ``bottle_size`` (``bottleneck``; with the default
    sizes this is the widest layer), narrowed by two Tanh stages (``down1``,
    ``down2``) and finally projected by a plain linear layer (``fc``). Every
    stage except ``fc`` ends with dropout.

    Args:
        input_emb_size: Size of the last dimension of the input.
        output_emb_size: Size of the last dimension of the output.
        inter_size_1: Width after ``up1`` and after ``down2``.
        inter_size_2: Width after ``up2`` and after ``down1``.
        bottle_size: Width of the ``bottleneck`` stage.
        dropout_rate: Dropout probability used in every stage except ``fc``.
        device: Stored as ``self.device``. The constructor does not move any
            parameters; call ``.to(device)`` on the instance for that.
    """

    def __init__(self, input_emb_size = 1024, output_emb_size = 1024, inter_size_1 = 2048, inter_size_2 = 4096, bottle_size = 8192, dropout_rate = 0.20, device = device):
        super().__init__()
        # Keep the configuration on the instance
        self.input_emb_size = input_emb_size
        self.output_emb_size = output_emb_size
        self.inter_size_1 = inter_size_1
        self.inter_size_2 = inter_size_2
        self.bottle_size = bottle_size
        self.dropout_rate = dropout_rate
        self.device = device

        # Up projections
        self.up1 = self._stage(self.input_emb_size, self.inter_size_1, nn.PReLU)
        self.up2 = self._stage(self.inter_size_1, self.inter_size_2, nn.PReLU)

        # Bottleneck layer
        self.bottleneck = self._stage(self.inter_size_2, self.bottle_size, nn.Tanh)

        # Down projections
        self.down1 = self._stage(self.bottle_size, self.inter_size_2, nn.Tanh)
        self.down2 = self._stage(self.inter_size_2, self.inter_size_1, nn.Tanh)

        # Final projection to output space (no activation, no dropout)
        self.fc = nn.Sequential(
            nn.Linear(self.inter_size_1, self.output_emb_size),
        )

    def _stage(self, in_features, out_features, activation):
        """Return one Linear -> activation -> Dropout stage as an nn.Sequential."""
        return nn.Sequential(
            nn.Linear(in_features, out_features),
            activation(),
            nn.Dropout(p = self.dropout_rate),
        )

    def forward(self, x):
        """Project ``x`` through all stages and return the result.

        Args:
            x: Tensor whose last dimension is ``input_emb_size``.

        Returns:
            Tensor whose last dimension is ``output_emb_size``, on the same
            device as the module's weights.
        """
        # Follow the module's own weights instead of the notebook-level
        # `device` global, so the head keeps working when it is built on or
        # moved to a different device.
        x = x.to(self.up1[0].weight.device)
        for stage in (self.up1, self.up2, self.bottleneck, self.down1, self.down2, self.fc):
            x = stage(x)
        return x

In [None]:
class LinearProjectionHeadLN(nn.Module):
    """Fully connected projection head with LayerNorm in every hidden stage.

    The input is widened by two PReLU stages (``up1``, ``up2``), passed through
    a Tanh stage of width ``bottle_size`` (``bottleneck``; with the default
    sizes this is the widest layer), narrowed by two Tanh stages (``down1``,
    ``down2``) and finally projected by a plain linear layer (``fc``). Every
    stage except ``fc`` is Linear -> activation -> LayerNorm -> Dropout.

    Args:
        input_emb_size: Size of the last dimension of the input.
        output_emb_size: Size of the last dimension of the output.
        inter_size_1: Width after ``up1`` and after ``down2``.
        inter_size_2: Width after ``up2`` and after ``down1``.
        bottle_size: Width of the ``bottleneck`` stage.
        dropout_rate: Dropout probability used in every stage except ``fc``.
        device: Stored as ``self.device``. The constructor does not move any
            parameters; call ``.to(device)`` on the instance for that.
    """

    def __init__(self, input_emb_size = 1024, output_emb_size = 1024, inter_size_1 = 2048, inter_size_2 = 4096, bottle_size = 8192, dropout_rate = 0.20, device = device):
        super().__init__()
        # Keep the configuration on the instance
        self.input_emb_size = input_emb_size
        self.output_emb_size = output_emb_size
        self.inter_size_1 = inter_size_1
        self.inter_size_2 = inter_size_2
        self.bottle_size = bottle_size
        self.dropout_rate = dropout_rate
        self.device = device

        # Up projections
        self.up1 = self._stage(self.input_emb_size, self.inter_size_1, nn.PReLU)
        self.up2 = self._stage(self.inter_size_1, self.inter_size_2, nn.PReLU)

        # Bottleneck layer
        self.bottleneck = self._stage(self.inter_size_2, self.bottle_size, nn.Tanh)

        # Down projections
        self.down1 = self._stage(self.bottle_size, self.inter_size_2, nn.Tanh)
        self.down2 = self._stage(self.inter_size_2, self.inter_size_1, nn.Tanh)

        # Final projection to output space (no activation, norm or dropout)
        self.fc = nn.Sequential(
            nn.Linear(self.inter_size_1, self.output_emb_size),
        )

    def _stage(self, in_features, out_features, activation):
        """Return one Linear -> activation -> LayerNorm -> Dropout stage."""
        return nn.Sequential(
            nn.Linear(in_features, out_features),
            activation(),
            nn.LayerNorm(out_features),
            nn.Dropout(p = self.dropout_rate),
        )

    def forward(self, x):
        """Project ``x`` through all stages and return the result.

        Args:
            x: Tensor whose last dimension is ``input_emb_size``.

        Returns:
            Tensor whose last dimension is ``output_emb_size``, on the same
            device as the module's weights.
        """
        # Follow the module's own weights instead of the notebook-level
        # `device` global, so the head keeps working when it is built on or
        # moved to a different device.
        x = x.to(self.up1[0].weight.device)
        for stage in (self.up1, self.up2, self.bottleneck, self.down1, self.down2, self.fc):
            x = stage(x)
        return x

In [None]:
class LinearProjectionHeadRN(nn.Module):
    """Fully connected projection head with LocalResponseNorm in every hidden stage.

    The input is widened by two PReLU stages (``up1``, ``up2``), passed through
    a Tanh stage of width ``bottle_size`` (``bottleneck``; with the default
    sizes this is the widest layer), narrowed by two Tanh stages (``down1``,
    ``down2``) and finally projected by a plain linear layer (``fc``). Every
    stage except ``fc`` is Linear -> activation -> LocalResponseNorm(1) ->
    Dropout.

    NOTE(review): ``nn.LocalResponseNorm`` is documented for inputs shaped
    (N, C, *). Confirm that the tensors fed to this head have at least three
    dimensions; plain (batch, features) input is expected to be rejected by
    the norm layers.

    Args:
        input_emb_size: Size of the last dimension of the input.
        output_emb_size: Size of the last dimension of the output.
        inter_size_1: Width after ``up1`` and after ``down2``.
        inter_size_2: Width after ``up2`` and after ``down1``.
        bottle_size: Width of the ``bottleneck`` stage.
        dropout_rate: Dropout probability used in every stage except ``fc``.
        device: Stored as ``self.device``. The constructor does not move any
            parameters; call ``.to(device)`` on the instance for that.
    """

    def __init__(self, input_emb_size = 1024, output_emb_size = 1024, inter_size_1 = 2048, inter_size_2 = 4096, bottle_size = 8192, dropout_rate = 0.20, device = device):
        super().__init__()
        # Keep the configuration on the instance
        self.input_emb_size = input_emb_size
        self.output_emb_size = output_emb_size
        self.inter_size_1 = inter_size_1
        self.inter_size_2 = inter_size_2
        self.bottle_size = bottle_size
        self.dropout_rate = dropout_rate
        self.device = device

        # Up projections
        self.up1 = self._stage(self.input_emb_size, self.inter_size_1, nn.PReLU)
        self.up2 = self._stage(self.inter_size_1, self.inter_size_2, nn.PReLU)

        # Bottleneck layer
        self.bottleneck = self._stage(self.inter_size_2, self.bottle_size, nn.Tanh)

        # Down projections
        self.down1 = self._stage(self.bottle_size, self.inter_size_2, nn.Tanh)
        self.down2 = self._stage(self.inter_size_2, self.inter_size_1, nn.Tanh)

        # Final projection to output space (no activation, norm or dropout)
        self.fc = nn.Sequential(
            nn.Linear(self.inter_size_1, self.output_emb_size),
        )

    def _stage(self, in_features, out_features, activation):
        """Return one Linear -> activation -> LocalResponseNorm(1) -> Dropout stage."""
        return nn.Sequential(
            nn.Linear(in_features, out_features),
            activation(),
            nn.LocalResponseNorm(1),
            nn.Dropout(p = self.dropout_rate),
        )

    def forward(self, x):
        """Project ``x`` through all stages and return the result.

        Args:
            x: Tensor whose last dimension is ``input_emb_size``.

        Returns:
            Tensor whose last dimension is ``output_emb_size``, on the same
            device as the module's weights.
        """
        # Follow the module's own weights instead of the notebook-level
        # `device` global, so the head keeps working when it is built on or
        # moved to a different device.
        x = x.to(self.up1[0].weight.device)
        for stage in (self.up1, self.up2, self.bottleneck, self.down1, self.down2, self.fc):
            x = stage(x)
        return x

In [None]:
# Build the baseline projection head, then move its parameters to the selected device.
net = LinearProjectionHeadBase(device = device)
net = net.to(device)

In [None]:
# Switch the model to training mode (enables dropout); the returned module is shown as the cell output.
net.train()

## Define loss function and optimiser

In [None]:
# L1 loss used to compare the model output with the target in the training loop.
criterion = nn.L1Loss()
criterion = criterion.to(device)

In [None]:
import torch.optim as optim

# Step size for Adam.
LEARNING_RATE = 0.005

# Adam over every parameter of `net`.
optimiser = optim.Adam(net.parameters(), lr = LEARNING_RATE)

## Training loop

In [None]:
NUM_EPOCHS = 1_000
all_scores = []       # mean training loss of each epoch
all_test_scores = []  # mean test loss of each epoch
import torch.nn.functional as F  # kept from the original cell; not used in this cell


def _run_epoch(model, dataloader, loss_fn, optimiser = None):
    """Run one pass over `dataloader` and return the mean per-batch loss.

    Args:
        model: Module that maps the first element of each batch to a prediction.
        dataloader: Iterable yielding ``(pcl, img)`` batches.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        optimiser: When given, gradients are zeroed, back-propagated and a
            step is taken for every batch. When ``None`` the pass is
            forward-only.

    Returns:
        The unweighted mean of the per-batch losses as a Python float, or NaN
        when the dataloader yields no batches.
    """
    batch_losses = []
    for pcl, img in dataloader:
        # Input features and regression targets as float32 tensors on the device.
        X = torch.as_tensor(pcl, dtype = torch.float32, device = device)
        y = torch.as_tensor(img, dtype = torch.float32, device = device)

        if optimiser is not None:
            optimiser.zero_grad()

        loss = loss_fn(model(X), y)

        if optimiser is not None:
            loss.backward()
            optimiser.step()

        # Store a plain float so no autograd state is kept alive across batches.
        batch_losses.append(loss.item())

    if not batch_losses:
        return float("nan")
    return sum(batch_losses) / len(batch_losses)


for epoch in tqdm(range(NUM_EPOCHS)):
    # Training pass
    net.train()
    all_scores.append(_run_epoch(net, train_dataloader, criterion, optimiser))

    # Evaluation pass: eval mode, no gradients, no optimiser
    net.eval()
    with torch.no_grad():
        all_test_scores.append(_run_epoch(net, test_dataloader, criterion))

In [None]:
import matplotlib.pyplot as plt
all_train_scores = all_scores

# Give each curve a label so that plt.legend() has entries to display.
plt.plot(all_train_scores, label = 'Train')
plt.plot(all_test_scores, label = 'Test')
plt.xlabel('Epoch')
plt.ylabel('L1Loss Scores')
plt.title('LinearProjectionHead Scores')
plt.legend()
plt.show()

In [None]:
# Mean training loss recorded for the last epoch.
all_train_scores[-1]

In [None]:
# Mean test loss recorded for the last epoch.
all_test_scores[-1]

## Save the model

In [None]:
# The file name embeds the epoch count; the path is relative to the working directory.
PATH = f"mapping_base_Modelnet_{NUM_EPOCHS}.pth"
# Only the state_dict (parameters and buffers) is saved, not the module object itself.
torch.save(net.state_dict(), PATH)

[Download model](./mapping_base_Modelnet_1000.pth)