# Introduction
The goal of our project is to build a Convolutional Neural Network (CNN) model capable of achieving over 95% accuracy to classify voxelized point cloud pairs as either "similar" (1) or "dissimilar" (0). The data consists of pairs of voxelized point clouds along with corresponding labels.

## Import Required Libraries

In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import time
from tqdm import tqdm
from sklearn.metrics import accuracy_score
from torchsummary import summary


## Voxelization
The following function takes a point cloud and voxelizes it into a 32x32x32 resolution voxel grid.

In [2]:
def voxelize(pc):
    # Normalize point cloud data
    point_cloud = (pc - pc.min(axis=0)) / (pc.max(axis=0) - pc.min(axis=0))
    voxel_size = 32
    voxelized = np.zeros((voxel_size, voxel_size, voxel_size))
    point_cloud = ((voxel_size - 1) * point_cloud).astype(int)
    voxelized[point_cloud[:, 0], point_cloud[:, 1], point_cloud[:, 2]] = 1
    return voxelized


## Data Preparation
The following class does the following tasks:
- Loads pairs of voxelized point clouds from `.npy` files.
- Uses the voxelization function to convert the point clouds.
- Converts the voxelized data to PyTorch tensors.

The dataset is loaded from the `curated_data` directory as provided in class.

In [3]:
class VoxelDataset(Dataset):
    def __init__(self, file_list, data_directory, subset_size=None): 
        self.data_directory = data_directory
        self.file_list = file_list
        if subset_size is not None:
            self.file_list = self.file_list[:subset_size]  # Use only a subset of files 

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        file_path = os.path.join(self.data_directory, self.file_list[idx])
        data = np.load(file_path, allow_pickle=True)
        part1, part2, label = data

        # Voxelizing the point clouds
        voxel1 = voxelize(part1)
        voxel2 = voxelize(part2)

        # Convert to tensors
        voxel1 = torch.tensor(voxel1, dtype=torch.float32).unsqueeze(0)  # Add channel dimension
        voxel2 = torch.tensor(voxel2, dtype=torch.float32).unsqueeze(0)  # Add channel dimension
        label = torch.tensor(label, dtype=torch.float32)

        return voxel1, voxel2, label

## Data Split, DataLoader and Making a Subset
We split the dataset into **70% training**, **15% validation**, and **15% test** to evaluate model performance effectively.
We also use a dataloader and pass a subset of data through it. This is useful when parameter tuning because passing all of the data through the model takes a long time.

In [None]:
# Define data directory
data_directory = os.path.join(os.getcwd(), "curated_data/")

# Get all files in the directory
all_files = [f for f in os.listdir(data_directory) if f.endswith('.npy')]

# Split data into train, validation, and test sets
train_files, test_files = train_test_split(all_files, test_size=0.3, random_state=42)
val_files, test_files = train_test_split(test_files, test_size=0.5, random_state=42)  # Further split to validation and test

# Create Dataset instances
train_dataset = VoxelDataset(train_files, data_directory) 
val_dataset = VoxelDataset(val_files, data_directory)     
test_dataset = VoxelDataset(test_files, data_directory)   

# Create DataLoaders for each dataset
batch_size = 512
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Training on a subset
total_num_files = len(train_dataset)
subset_size = int(total_num_files * 0.1) # Tunable: for subset size. put 1 for 100% of data
def get_subset_sampler(dataset_size, subset_size):
    indices = np.arange(dataset_size)
    np.random.shuffle(indices)
    subset_indices = indices[:subset_size]
    return SubsetRandomSampler(subset_indices)



## 3D Encoder Class
We define the 3D Encoder model that:
- Uses 3D convolutional layers to extract features from the voxelized input.
- Contains two convolutional layers with ReLU activation.
- The feature maps are flattened, followed by a fully connected (fc) layer that produces a feature vector.
- Prints the shape after each layer for debugging purposes.

### Finetuning
Some of the comments in this class show code we wrote for finetuning. It has been left as comments to showcase some approaches.

In [5]:
class Encoder3D(nn.Module):
    def __init__(self):
        super(Encoder3D, self).__init__()
        self.conv1 = nn.Conv3d(1, 8, kernel_size=3, stride=1, padding=1) # Tunable: Number of filters (4). More filters may capture more features but are computationally expensive. Make changes in progressions like 2, 4, 8, 16, 32,...
        #self.pool = nn.MaxPool3d(kernel_size=2, stride=2)  # Pooling layer to reduce spatial dimensions by half
        self.conv2 = nn.Conv3d(8, 16, kernel_size=3, stride=2, padding=1)  # Tunable: Number of filters (8). Increasing may improve feature extraction at the cost of increased computation.
        #self.dropout = nn.Dropout(p=0.1)
        #self.conv3 = nn.Conv3d(16, 32, kernel_size=3, stride=2, padding=1)
        self.fc = nn.Linear(16 * 16 * 16 * 16, 32)  # Tunable: Output features (32). Increasing this can capture more complex relationships but increases model complexity.

        self.first_forward = True  # Flag to track the first forward pass. We don't want the model to print these values after every epoch.
    def forward(self, x):
        x = F.relu(self.conv1(x))
        if self.first_forward:
            print(f"Shape after conv1: {x.shape}")
        #x = self.pool(x)  # Apply max pooling to reduce spatial dimensions
        #if self.first_forward:
            #print(f"Shape after pooling: {x.shape}")
        x = F.relu(self.conv2(x))
        if self.first_forward:
            print(f"Shape after conv2: {x.shape}")
        #x = self.pool(x)  # Apply max pooling again to reduce spatial dimensions
        #if self.first_forward:
            #print(f"Shape after pooling: {x.shape}")
        x = x.view(x.size(0), -1)  # Flatten for the fully connected layer
        #if self.first_forward:
            #print(f"Shape after dropout: {x.shape}")
        #x = self.dropout(x)  # Drop some of the features to prevent overfitting
        if self.first_forward:
            print(f"Shape after flattening: {x.shape}")
        x = self.fc(x)
        if self.first_forward:
            print(f"Shape after fc: {x.shape}")
            self.first_forward = False  # Disable further shape printing after the first forward pass
        return x



## Siamese Network
The Siamese Network is built using two instances of the shared encoder. It compares the output of both encoders using L1 distance and computes a similarity score. The subnets both have the same weights and architectures. A sigmoid activation function is used.

In [6]:
class Siamese3DCNN(nn.Module):
    def __init__(self):
        super(Siamese3DCNN, self).__init__()
        self.encoder = Encoder3D()
        self.fc_out = nn.Linear(32, 1)  # Tunable: Number of input features (32). Match this to the output of the encoder.
        

    def forward(self, voxel1, voxel2):
        encoded1 = self.encoder(voxel1)
        encoded2 = self.encoder(voxel2)

        # Compute the L1 distance between the two encoded vectors
        combined = torch.abs(encoded1 - encoded2)

        # Pass through fully connected layer to get a single value
        output = self.fc_out(combined)

        # Apply sigmoid to produce a probability output
        output = torch.sigmoid(output)

        return output



### Trainer Class

This class encapsulates the entire training and evaluation process for the Siamese 3D CNN. It includes methods for training, validating, saving the best model, and plotting metrics.

In [7]:
class Siamese3DTrainer:
    def __init__(self, model, train_dataset, val_dataset, test_dataset, learning_rate=0.001, batch_size=128):
        self.model = model
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.test_dataset = test_dataset
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.device = torch.device('cpu')
        self.model.to(self.device)
        self.criterion = nn.BCELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate, weight_decay=1e-5)
        self.train_loader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)
        self.val_loader = DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False)
        self.test_loader = DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False)
        self.best_model_path = "best_siamese_3dcnn_model.pth"
        #self.dropout = nn.Dropout(p=0.3)

    def train(self, num_epochs):
        training_losses, validation_losses = [], []
        training_accuracies, validation_accuracies = [], []
        best_train_accuracy = 0.0
        best_validation_accuracy = 0.0

        for epoch in tqdm(range(num_epochs), desc="Training Epochs"):
            #for loading subset of data
            train_sampler = get_subset_sampler(len(train_dataset), subset_size)
            train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
            # Training Phase
            self.model.train()
            train_loss, correct_preds, total_preds = 0.0, 0, 0
            start_time = time.time()

            for voxel1, voxel2, labels in tqdm(self.train_loader, leave=False, desc=f"Epoch {epoch+1} Train Batches"):
                # Move data to appropriate device
                voxel1, voxel2, labels = voxel1.to(self.device), voxel2.to(self.device), labels.to(self.device)

                self.optimizer.zero_grad()
                outputs = self.model(voxel1, voxel2)
                labels = labels.view(-1, 1)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()

                train_loss += loss.item()

                # Calculate training accuracy
                predictions = (outputs > 0.5).float()
                correct_preds += (predictions == labels).sum().item()
                total_preds += labels.size(0)

            avg_train_loss = train_loss / len(self.train_loader)
            train_accuracy = correct_preds / total_preds
            training_losses.append(avg_train_loss)
            training_accuracies.append(train_accuracy)

            print(f"\nEpoch [{epoch + 1}/{num_epochs}], Training Loss: {avg_train_loss:.6f}, Training Accuracy: {train_accuracy:.6f}, Time Taken: {time.time() - start_time:.2f}s")

            # Validation Phase
            self.model.eval()
            val_loss, correct_preds, total_preds = 0.0, 0, 0

            with torch.no_grad():
                for voxel1, voxel2, labels in tqdm(self.val_loader, leave=False, desc=f"Epoch {epoch+1} Validation Batches"):
                    # Move data to appropriate device
                    voxel1, voxel2, labels = voxel1.to(self.device), voxel2.to(self.device), labels.to(self.device)
                    outputs = self.model(voxel1, voxel2)
                    labels = labels.view(-1, 1)
                    loss = self.criterion(outputs, labels)
                    val_loss += loss.item()

                    # Calculate validation accuracy
                    predictions = (outputs > 0.5).float()
                    correct_preds += (predictions == labels).sum().item()
                    total_preds += labels.size(0)

            avg_val_loss = val_loss / len(self.val_loader)
            val_accuracy = correct_preds / total_preds
            validation_losses.append(avg_val_loss)
            validation_accuracies.append(val_accuracy)

            print(f"Epoch [{epoch + 1}/{num_epochs}], Validation Loss: {avg_val_loss:.6f}, Validation Accuracy: {val_accuracy:.6f}")

            # Early Stopping and Save Best Model (validation)
            if max(validation_accuracies) > best_validation_accuracy:
                best_validation_accuracy = max(validation_accuracies)
                torch.save(self.model.state_dict(), self.best_model_path)
                print(f"Best model saved at epoch {epoch + 1} with Validation Accuracy: {best_validation_accuracy:.6f}")
            else:
                print(f"Validation accuracy decreased. Stopping early at epoch {epoch + 1}.")
                break

        # Plot training/validation loss and accuracy
        self.plot_metrics(training_losses, validation_losses, training_accuracies, validation_accuracies)

    def evaluate(self):
        # Load the best saved model
        self.model.load_state_dict(torch.load(self.best_model_path))
        self.model.eval()

        all_labels, all_predictions = [], []

        with torch.no_grad():
            for voxel1, voxel2, labels in self.test_loader:
                voxel1, voxel2, labels = voxel1.to(self.device), voxel2.to(self.device), labels.to(self.device)
                outputs = self.model(voxel1, voxel2)
                predictions = (outputs > 0.5).float()

                all_labels.extend(labels.cpu().numpy())
                all_predictions.extend(predictions.cpu().numpy())

        accuracy = accuracy_score(all_labels, all_predictions)
        print(f"Test Accuracy: {accuracy * 100:.2f}%")

    def plot_metrics(self, training_losses, validation_losses, training_accuracies, validation_accuracies):
        epochs = range(1, len(training_losses) + 1)

        plt.figure(figsize=(14, 7))

        plt.figure(figsize=(14, 10))

        # Plot Training & Validation Loss
        plt.subplot(2, 1, 1)
        plt.plot(epochs, training_losses, 'b', label='Training Loss')
        plt.plot(epochs, validation_losses, 'r', label='Validation Loss')
        plt.title('Training and Validation Loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()

        # Plot Training & Validation Accuracy
        plt.subplot(2, 1, 2)
        plt.plot(epochs, training_accuracies, 'b', label='Training Accuracy')
        plt.plot(epochs, validation_accuracies, 'r', label='Validation Accuracy')
        plt.title('Training and Validation Accuracy')
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy')
        plt.legend()

        plt.tight_layout()
        plt.show()


## Visualization of Model Architecture to understand parameters

In [8]:
model = Siamese3DCNN()
summary(model, input_size=[(1, 32, 32, 32), (1, 32, 32, 32)])

Layer (type:depth-idx)                   Param #
├─Encoder3D: 1-1                         --
|    └─Conv3d: 2-1                       224
|    └─Conv3d: 2-2                       3,472
|    └─Linear: 2-3                       2,097,184
├─Linear: 1-2                            33
Total params: 2,100,913
Trainable params: 2,100,913
Non-trainable params: 0


Layer (type:depth-idx)                   Param #
├─Encoder3D: 1-1                         --
|    └─Conv3d: 2-1                       224
|    └─Conv3d: 2-2                       3,472
|    └─Linear: 2-3                       2,097,184
├─Linear: 1-2                            33
Total params: 2,100,913
Trainable params: 2,100,913
Non-trainable params: 0

### Initialize the Model and Trainer, Train the Model and Validate

Here we are training and validating the model on the dataset. 

In [9]:
# Tunable parameters:
    # `learning_rate`: A smaller learning rate (0.001) helps the model converge slowly and stably. Higher rates (e.g., 0.01) may lead to faster but unstable convergence.
    # `batch_size`: Larger batch sizes help with convergence and are faster but need more memory.

# Create the model and trainer
model = Siamese3DCNN()
trainer = Siamese3DTrainer(model=model,
                           train_dataset=train_dataset,
                           val_dataset=val_dataset,
                           test_dataset=test_dataset,
                           learning_rate=0.001,
                           batch_size=512)

# Train the model for 15 epochs
trainer.train(num_epochs=15)

Training Epochs:   0%|          | 0/15 [00:00<?, ?it/s]

Shape after conv1: torch.Size([512, 8, 32, 32, 32])
Shape after conv2: torch.Size([512, 16, 16, 16, 16])
Shape after flattening: torch.Size([512, 65536])
Shape after fc: torch.Size([512, 32])





Epoch [1/15], Training Loss: 0.367198, Training Accuracy: 0.789318, Time Taken: 167.17s


Training Epochs:   7%|▋         | 1/15 [03:00<42:00, 180.03s/it]

Epoch [1/15], Validation Loss: 0.216300, Validation Accuracy: 0.907171
Best model saved at epoch 1 with Validation Accuracy: 0.907171





Epoch [2/15], Training Loss: 0.150920, Training Accuracy: 0.932924, Time Taken: 152.43s


Training Epochs:  13%|█▎        | 2/15 [05:45<37:08, 171.42s/it]

Epoch [2/15], Validation Loss: 0.164234, Validation Accuracy: 0.931842
Best model saved at epoch 2 with Validation Accuracy: 0.931842





Epoch [3/15], Training Loss: 0.081762, Training Accuracy: 0.957254, Time Taken: 169.90s


Training Epochs:  20%|██        | 3/15 [08:50<35:32, 177.72s/it]

Epoch [3/15], Validation Loss: 0.205121, Validation Accuracy: 0.941041
Best model saved at epoch 3 with Validation Accuracy: 0.941041





Epoch [4/15], Training Loss: 0.056201, Training Accuracy: 0.966574, Time Taken: 161.86s


Training Epochs:  27%|██▋       | 4/15 [11:48<32:34, 177.69s/it]

Epoch [4/15], Validation Loss: 0.179997, Validation Accuracy: 0.947941
Best model saved at epoch 4 with Validation Accuracy: 0.947941




## Model Evaluation

In [16]:
# Evaluate the best saved model
trainer.evaluate()

  self.model.load_state_dict(torch.load(self.best_model_path))


Test Accuracy: 97.47%
