## ContiNet - ModelNet40 Classification

In [None]:
import os
import random
import re
from glob import glob
import time
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics
from torchmetrics.classification import MulticlassMatthewsCorrCoef
import open3d as o3

from open3d.web_visualizer import draw # for non Colab

import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline


In [None]:
# Seed every random number generator used here so each run starts from the
# same point (order: torch, numpy, stdlib random)
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    _seed_fn(42)

# Dataset location; the empty placeholder is overridden right away
ROOT = r""
ROOT = r'C:\Users\DIAT-YSD-DS\Desktop\PointNet_Shashi\PointNet_Dataset\ModelNet40_point_cloud_data'

# Point budgets passed to the datasets as `npoints`
NUM_TRAIN_POINTS = 2500  # 4096
NUM_TEST_POINTS = 10000

# Model / loader settings
NUM_CLASSES = 40
GLOBAL_FEATS = 1600
BATCH_SIZE = 32

In [None]:
# Class names listed in label order: a name's position is its integer class id
_CLASS_NAMES = (
    'airplane', 'bathtub', 'bed', 'bench', 'bookshelf', 'bottle', 'bowl',
    'car', 'chair', 'cone', 'cup', 'curtain', 'desk', 'door',
    'dresser', 'flower_pot', 'glass_box', 'guitar', 'keyboard',
    'lamp', 'laptop', 'mantel', 'monitor', 'night_stand', 'person',
    'piano', 'plant', 'radio', 'range_hood', 'sink', 'sofa',
    'stairs', 'stool', 'table', 'tent', 'toilet', 'tv_stand',
    'vase', 'wardrobe', 'xbox',
)

# name -> class id mapping
CATEGORIES = {name: class_id for class_id, name in enumerate(_CLASS_NAMES)}

In [None]:
from torch.utils.data import DataLoader
from Augmentation_modelnet40_dataloader import Modelnet40Dataset

# All three loaders use the same batch size and shuffle their samples
_loader_kwargs = dict(batch_size=BATCH_SIZE, shuffle=True)

# training split
train_dataset = Modelnet40Dataset(ROOT, split='train', npoints=NUM_TRAIN_POINTS)
train_dataloader = DataLoader(train_dataset, **_loader_kwargs)

# validation split (same point budget as training)
valid_dataset = Modelnet40Dataset(ROOT, split='valid', npoints=NUM_TRAIN_POINTS)
valid_dataloader = DataLoader(valid_dataset, **_loader_kwargs)

# test split (built with the larger NUM_TEST_POINTS budget)
test_dataset = Modelnet40Dataset(ROOT, split='test', npoints=NUM_TEST_POINTS)
test_dataloader = DataLoader(test_dataset, **_loader_kwargs)

In [None]:
# Extra handle on the training split, built with npoints=5000, for inspecting samples
sample_dataset = Modelnet40Dataset(ROOT, split='train', npoints=5000)

In [None]:
# Take one sample (500th from the end) and wrap its points in an Open3D cloud
points, target = sample_dataset[-500]
pcd = o3.geometry.PointCloud()
pcd.points = o3.utility.Vector3dVector(points)

# Reverse lookup: first category name whose id equals the sample's target
sample_label = next(name for name, class_id in CATEGORIES.items() if class_id == target)
print("Label: ", sample_label)

o3.visualization.draw_plotly([pcd])

### Data Visualization
#### 1. Training Data

In [None]:
# Collect the class id of every training sample as plain Python ints.
# Each batch of targets is flattened first, so this works whether the loader
# yields targets shaped (B,) or (B, 1), and np.bincount later receives a flat
# integer list instead of a list of tensor elements.
total_train_targets = []
for _, targets in train_dataloader:
    total_train_targets.extend(targets.reshape(-1).tolist())

In [None]:
# Samples per class. minlength guarantees one bin per class even when the
# highest-numbered classes have no samples, so the bar chart and the
# per-class lookup below never run out of bins.
train_class_bins = np.bincount(total_train_targets, minlength=NUM_CLASSES)

class_names = list(CATEGORIES)

# plt.get_cmap is used because plt.cm.get_cmap was removed from newer
# Matplotlib releases. One colour is generated per class.
cmap = plt.get_cmap('tab20', NUM_CLASSES)
colors = [cmap(i) for i in range(NUM_CLASSES)]

plt.figure(figsize=(20, 6))
plt.bar(class_names, train_class_bins, color=colors, width=0.7)
plt.xticks(class_names, class_names, size=13, rotation=90)
plt.ylabel('Counts', size=13)
plt.title('Train Class Frequencies', size=16, pad=20)

# class name -> number of training samples
train_data_dict = {name: train_class_bins[class_id]
                   for name, class_id in CATEGORIES.items()}
print("Train Class Count:-", train_data_dict, sep='\n')
print("Total train instances :", np.sum(train_class_bins))


#### 2. Validation Data

In [None]:
# Collect the class id of every validation sample as plain Python ints.
# Each batch of targets is flattened first, so this works whether the loader
# yields targets shaped (B,) or (B, 1), and np.bincount later receives a flat
# integer list instead of a list of tensor elements.
total_valid_targets = []
for _, targets in valid_dataloader:
    total_valid_targets.extend(targets.reshape(-1).tolist())

In [None]:
# Samples per class. minlength guarantees one bin per class even when the
# highest-numbered classes have no samples, so the bar chart and the
# per-class lookup below never run out of bins.
valid_class_bins = np.bincount(total_valid_targets, minlength=NUM_CLASSES)

class_names = list(CATEGORIES)

# plt.get_cmap is used because plt.cm.get_cmap was removed from newer
# Matplotlib releases. One colour is generated per class.
cmap = plt.get_cmap('tab20', NUM_CLASSES)
colors = [cmap(i) for i in range(NUM_CLASSES)]

plt.figure(figsize=(20, 6))
plt.bar(class_names, valid_class_bins, color=colors, width=0.7)
plt.xticks(class_names, class_names, size=13, rotation=90)
plt.ylabel('Counts', size=13)
plt.title('Valid Class Frequencies', size=16, pad=20)

# class name -> number of validation samples
valid_data_dict = {name: valid_class_bins[class_id]
                   for name, class_id in CATEGORIES.items()}
print("Valid Class Count:-", valid_data_dict, sep='\n')
print("Total valid instances :", np.sum(valid_class_bins))


#### 3. Test Data

In [None]:
# Collect the class id of every test sample as plain Python ints.
# Each batch of targets is flattened first, so this works whether the loader
# yields targets shaped (B,) or (B, 1), and np.bincount later receives a flat
# integer list instead of a list of tensor elements.
_total_test_targets = []
for _, targets in test_dataloader:
    _total_test_targets.extend(targets.reshape(-1).tolist())

In [None]:
# Samples per class. minlength guarantees one bin per class even when the
# highest-numbered classes have no samples, so the bar chart and the
# per-class lookup below never run out of bins.
test_class_bins = np.bincount(_total_test_targets, minlength=NUM_CLASSES)

class_names = list(CATEGORIES)

# plt.get_cmap is used because plt.cm.get_cmap was removed from newer
# Matplotlib releases. One colour is generated per class.
cmap = plt.get_cmap('tab20', NUM_CLASSES)
colors = [cmap(i) for i in range(NUM_CLASSES)]

plt.figure(figsize=(20, 6))
plt.bar(class_names, test_class_bins, color=colors, width=0.7)
plt.xticks(class_names, class_names, size=13, rotation=90)
plt.ylabel('Counts', size=13)
plt.title('Test Class Frequencies', size=16, pad=20)

# class name -> number of test samples
test_data_dict = {name: test_class_bins[class_id]
                  for name, class_id in CATEGORIES.items()}
print("Test Class Count:-", test_data_dict, sep='\n')
print("Total test instances :", np.sum(test_class_bins))

#### Training Scripts

In [None]:
from continet import ContiNetClassification

# Smoke test: push a single training batch through a freshly built classifier.
# The batch is drawn before the model is built, matching the original order.
points, targets = next(iter(train_dataloader))

classifier = ContiNetClassification(
    k=NUM_CLASSES,
    num_global_feats=GLOBAL_FEATS,
    num_points=NUM_TRAIN_POINTS,
)

# Axes 1 and 2 are swapped before the forward pass
# (presumably (B, N, 3) -> (B, 3, N); confirm against the dataset)
swapped_points = points.transpose(2, 1)
out, _, _ = classifier(swapped_points)
print(f'Class out shape: {out.shape}')

#### GET DEVICE

In [None]:
# Default to the CPU and switch to the GPU only when PyTorch can see one
DEVICE = 'cpu'
if torch.cuda.is_available():
    DEVICE = 'cuda'
DEVICE

In [None]:
import torch.optim as optim
from point_net_loss import PointNetLoss

# Training hyper-parameters
EPOCHS = 150
LR = 0.00004
REG_WEIGHT = 0.001  #0.0005

# Inverse class-frequency weights, rescaled so the largest weight is 1.
# Counts are clamped to at least 1: a class with zero training samples would
# otherwise give 1/0 = inf, and the rescaling below would then turn every
# weight into 0 or NaN.
alpha = 1 / np.maximum(train_class_bins, 1)
alpha = alpha / alpha.max()

# Passed to PointNetLoss as `gamma` (presumably a focal-loss exponent;
# confirm in point_net_loss)
gamma = 1
optimizer = optim.Adam(classifier.parameters(), lr=LR)

#  This scheduler for GPU training only, else it would be very slow.
#scheduler = torch.optim.lr_scheduler.CyclicLR(optimizer, base_lr=0.0001, max_lr=0.01, 
#                                              step_size_up=3800, cycle_momentum=False)

# Move both the loss module and the model to the selected device
criterion = PointNetLoss(alpha=alpha, gamma=gamma, reg_weight=REG_WEIGHT, size_average=False).to(DEVICE)
classifier = classifier.to(DEVICE)

In [None]:
# Matthews correlation coefficient over NUM_CLASSES classes, kept on the
# same device as the model outputs
mcc_metric = MulticlassMatthewsCorrCoef(num_classes=NUM_CLASSES)
mcc_metric = mcc_metric.to(DEVICE)

#### Begin Training Model
First define a helper function to train, validate, and test

In [None]:
def train_test(classifier, dataloader, num_batch, epoch, split='train'):
    """Run one pass over ``dataloader`` and report loss, accuracy and MCC.

    Uses the module-level ``optimizer``, ``criterion``, ``mcc_metric`` and
    ``DEVICE``. Gradient tracking is not disabled here; callers that only
    evaluate should wrap the call in ``torch.no_grad()``.

    Args:
        classifier: model called as ``classifier(points)``. It must return a
            3-tuple whose first item is the class logits and whose third item
            is forwarded to ``criterion``.
        dataloader: iterable yielding ``(points, targets)`` batches.
        num_batch: total number of batches; only used in progress messages.
        epoch: epoch number; only used in progress messages.
        split: ``'train'`` back-propagates and steps the optimizer; ``'test'``
            additionally returns all targets and predictions; any other value
            (e.g. ``'valid'``) only evaluates.

    Returns:
        ``(epoch_loss, epoch_accuracy, epoch_mcc)``, followed by
        ``total_test_targets, total_test_preds`` (flat lists of ints) when
        ``split == 'test'``. Loss and MCC are means of the per-batch values;
        accuracy is correct predictions divided by samples seen.
    """
    is_training = split == 'train'

    # Select train/eval behaviour explicitly. Without this the model stays in
    # whatever mode the previous call left it in, e.g. eval mode after the
    # first validation pass.
    classifier.train(is_training)

    _loss = []
    _mcc = []
    total_correct = 0
    total_samples = 0

    ## return total targets and predictions for test case only
    total_test_targets = []
    total_test_preds = []

    for i, (points, targets) in enumerate(dataloader):
        points = points.transpose(2, 1).to(DEVICE)
        # reshape(-1) instead of squeeze(): squeeze() would collapse a batch
        # holding a single sample into a 0-d tensor
        targets = targets.reshape(-1).to(DEVICE)

        if is_training:
            # zero gradients
            optimizer.zero_grad()

        # get predicted class logits
        preds, _, A = classifier(points)

        # get loss and perform backprop
        loss = criterion(preds, targets, A)

        if is_training:
            loss.backward()
            optimizer.step()
            # scheduler.step()

        # softmax is monotonic, so the argmax of the logits is the prediction
        pred_choice = preds.argmax(dim=1)
        batch_size = targets.size(0)
        batch_correct = pred_choice.eq(targets).sum().item()
        # divide by the real batch size: the last batch may be shorter than
        # BATCH_SIZE
        accuracy = batch_correct / batch_size
        mcc = mcc_metric(preds, targets).item()

        # Update epoch loss, accuracy and mcc accumulators
        _loss.append(loss.item())
        _mcc.append(mcc)
        total_correct += batch_correct
        total_samples += batch_size

        # add to total targets/preds
        if split == 'test':
            total_test_targets += targets.cpu().tolist()
            total_test_preds += pred_choice.reshape(-1).cpu().tolist()

        if i % 100 == 0:
            print(f'\t [{epoch}: {i}/{num_batch}] '
                  f'{split} loss: {loss.item():.4f} '
                  f'accuracy: {accuracy:.4f} mcc: {mcc:.4f}')

    epoch_loss = np.mean(_loss)
    # sample-weighted accuracy, so a short final batch is not over-weighted
    epoch_accuracy = total_correct / total_samples if total_samples else float('nan')
    epoch_mcc = np.mean(_mcc)

    print(f'Epoch: {epoch} - {split} Loss: {epoch_loss:.4f} '
          f' - {split} Accuracy: {epoch_accuracy:.4f} '
          f' - {split} MCC: {epoch_mcc:.4f}')

    if split == 'test':
        return epoch_loss, epoch_accuracy, epoch_mcc, total_test_targets, total_test_preds
    return epoch_loss, epoch_accuracy, epoch_mcc


In [None]:
# stuff for training
from tqdm import tqdm

import copy  # local import: needed to snapshot the best weights below

num_train_batch = int(np.ceil(len(train_dataset)/BATCH_SIZE))
num_valid_batch = int(np.ceil(len(valid_dataset)/BATCH_SIZE))

# lists to store metrics (loss, accuracy, mcc), one tuple per epoch
train_metrics = []
valid_metrics = []

# Best validation scores so far and the weights that produced them.
# Scores start at -inf so the first epoch always records a snapshot; MCC can
# legitimately be <= 0, which a starting value of 0 would never beat.
best_accuracy = 0  # not read in this cell; kept for any code that refers to it
best_valid_mcc = float('-inf')
best_valid_accuracy = float('-inf')
best_acc_model_state = None
best_mcc_model_state = None

# Train on EPOCHS
for epoch in tqdm(range(1, EPOCHS+1)):
    ## train loop
    # the validation pass below leaves the model in eval mode, so switch back
    classifier.train()
    _train_metrics = train_test(classifier, train_dataloader, num_train_batch, epoch, split='train')
    train_metrics.append(_train_metrics)

    ## pause to cool down
    time.sleep(4)

    ## validation loop
    with torch.no_grad():
        # place model in evaluation mode
        classifier.eval()
        # validate
        _valid_metrics = train_test(classifier, valid_dataloader, num_valid_batch, epoch, split='valid')
        valid_metrics.append(_valid_metrics)

        # train_test returns (loss, accuracy, mcc) for the 'valid' split
        _, current_valid_accuracy, current_valid_mcc = _valid_metrics

        # state_dict() returns references to the live parameter tensors, so a
        # deep copy is required; otherwise later epochs overwrite the "best"
        # weights in place.
        if current_valid_accuracy > best_valid_accuracy:
            best_valid_accuracy = current_valid_accuracy
            best_acc_model_state = copy.deepcopy(classifier.state_dict())

        if current_valid_mcc > best_valid_mcc:
            best_valid_mcc = current_valid_mcc
            best_mcc_model_state = copy.deepcopy(classifier.state_dict())

# pause to cool down
time.sleep(4)

# Fall back to the final weights if no snapshot was taken (no epochs ran, or
# every validation score was NaN)
if best_acc_model_state is None:
    best_acc_model_state = copy.deepcopy(classifier.state_dict())
if best_mcc_model_state is None:
    best_mcc_model_state = copy.deepcopy(classifier.state_dict())

# Save model with highest valid accuracy
path = os.getcwd()
filename_1 = "continet_acc_cls_model_01.pth"
full_path_1 = os.path.join(path, filename_1)
torch.save(best_acc_model_state, full_path_1)

# Save model with highest valid MCC
filename_2 = "cointnet_mcc_cls_model_01.pth"
full_path_2 = os.path.join(path, filename_2)
torch.save(best_mcc_model_state, full_path_2)

In [None]:
# One subplot per metric; position k in each per-epoch tuple holds metric k
metric_names = ['loss', 'accuracy', 'mcc']
fig, ax = plt.subplots(nrows=len(metric_names), ncols=1, figsize=(8, 6))

for column, (axis, name) in enumerate(zip(ax, metric_names)):
    axis.set_title(name)
    train_curve = [epoch_row[column] for epoch_row in train_metrics]
    valid_curve = [epoch_row[column] for epoch_row in valid_metrics]
    axis.plot(train_curve, label='train')
    axis.plot(valid_curve, label='valid')
    axis.set_xlabel('Epochs')
    axis.set_ylabel(name)
    axis.legend()

# leave vertical room between the stacked subplots
plt.subplots_adjust(wspace=0., hspace=0.35)
plt.show()

## Test Model Performance

In [None]:
# Pick which checkpoint to evaluate
MODEL_PATH = full_path_1  #best valid acc model
#MODEL_PATH = full_path_2  # best valid mcc model

# Rebuild the network for the test point budget and restore the saved weights
classifier = ContiNetClassification(num_points=NUM_TEST_POINTS, num_global_feats=GLOBAL_FEATS, k=NUM_CLASSES).to(DEVICE)
# map_location lets a checkpoint saved from a GPU run load on a CPU-only
# machine (and vice versa)
classifier.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
classifier.eval();

### Run test loop and get the confusion matrix

In [None]:
# Number of test batches: ceiling division of dataset size by batch size
num_test_batch = -(-len(test_dataset) // BATCH_SIZE)

# Single evaluation pass over the test split with gradients disabled
with torch.no_grad():
    test_results = train_test(classifier, test_dataloader, num_test_batch,
                              epoch=1, split='test')

# For split='test', train_test returns metrics plus all targets/predictions
(epoch_loss,
 epoch_accuracy,
 epoch_mcc,
 total_test_targets,
 total_test_preds) = test_results

In [None]:
# One-line summary of the test metrics
test_summary = (
    f'Test Loss: {epoch_loss:.4f} '
    f'- Test Accuracy: {epoch_accuracy:.4f} '
    f'- Test MCC: {epoch_mcc:.4f}'
)
print(test_summary)

In [None]:
from sklearn.metrics import classification_report

# Calculate the classification report.
# labels= pins the report to all class ids in CATEGORIES order; without it
# scikit-learn infers the labels from the data and rejects target_names when
# some class never appears in the targets or predictions.
report = classification_report(
    total_test_targets,
    total_test_preds,
    labels=list(CATEGORIES.values()),
    target_names=list(CATEGORIES.keys()),
)
print(report)

In [None]:
from sklearn.metrics import confusion_matrix
import pandas as pd

# labels= forces one row and column per class in CATEGORIES order; without it
# a class missing from both targets and predictions would shrink the matrix
# and the DataFrame construction would fail.
test_confusion = pd.DataFrame(
    confusion_matrix(total_test_targets, total_test_preds,
                     labels=list(CATEGORIES.values())),
    columns=list(CATEGORIES.keys()),
    index=list(CATEGORIES.keys()),
)
test_confusion.to_csv('confusion_matrix_for_mcc_model_00.csv')
test_confusion
# Columns represent predictions and rows represent true labels

In [None]:
# Heat Map Analysis
#import matplotlib.pyplot as plt
import seaborn as sns

# Annotated heat map of the confusion matrix
plt.figure(figsize=(15, 15))
# fmt='d' prints the integer counts in full; the default '.2g' would show
# counts of 100 or more in scientific notation (e.g. 1e+02)
sns.heatmap(test_confusion, annot=True, fmt='d', cmap='YlOrRd')
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.show()

In [None]:
# Calculate per-class accuracy: diagonal entry (correct predictions) divided
# by the row total (number of true instances of that class)
per_class_acc = {}
total_example = 0.
correct_classified = 0.
for category in CATEGORIES:
    true_positives = test_confusion.loc[category, category]
    total_instances = test_confusion.loc[category].sum()
    # A class with no test instances has no defined accuracy: store NaN
    # explicitly instead of evaluating 0/0
    if total_instances:
        per_class_acc[category] = true_positives / total_instances
    else:
        per_class_acc[category] = float('nan')
    total_example += total_instances
    correct_classified += true_positives

overall_accuracy = correct_classified/total_example

# Print per-class accuracy
print("Per-class Accuracy:")
scored_accuracies = []
for category, accuracy in per_class_acc.items():
    accuracy = float(accuracy)
    print(f"{category}: {accuracy:.4f}")
    if not np.isnan(accuracy):
        scored_accuracies.append(accuracy)

# Mean over the classes that actually occur in the test set (all of them in
# the normal case), so one empty class does not turn the average into NaN
if scored_accuracies:
    average_accuracy = sum(scored_accuracies) / len(scored_accuracies)
else:
    average_accuracy = float('nan')
print("\nAverage Accuracy accros all classes :", round(average_accuracy, 4))
print("Overall Accuracy: ", round(overall_accuracy, 4))

In [None]:
from random import randrange

torch.cuda.empty_cache()    # release GPU memory

# Test dataset used for displaying a single sample (requested with
# normalize=True)
test_sample_dataset = Modelnet40Dataset(ROOT, npoints=NUM_TEST_POINTS, split='test',
                                       normalize=True)

# get random sample from test data; `seg` holds the sample's class label
random_idx = randrange(len(test_sample_dataset))
points, seg = test_sample_dataset[random_idx]

# the dataset was asked to normalize, so the points are used as returned
norm_points = points

with torch.no_grad():
    # add a batch axis and swap axes 1 and 2 before the forward pass
    norm_points = norm_points.unsqueeze(0).transpose(2, 1).to(DEVICE)

    # second output: critical point indices, used in the last cell
    preds, crit_idxs, _ = classifier(norm_points)
    preds = torch.softmax(preds, dim=1)
    pred_choice = preds.squeeze().argmax()

In [None]:
# Show the label that the dataset returned together with the sampled points
print(seg)

In [None]:
# Convert the predicted index to a plain int before using it as a list index,
# and the probability to a plain float so it prints as a number rather than
# as a tensor repr
pred_index = pred_choice.item()
pred_class = list(CATEGORIES.keys())[pred_index]
pred_prob = preds[0, pred_index].item()
print(f'The predicted class is: {pred_class}, with probability: {pred_prob}')

In [None]:
# Line plot of the predicted probability for every class
class_ids = list(CATEGORIES.values())
class_labels = list(CATEGORIES.keys())
probabilities = preds.cpu().numpy()[0]

plt.plot(class_ids, probabilities)
plt.xticks(class_ids, class_labels, rotation=90)
plt.title('Predicted Classes')
plt.xlabel('Classes')
plt.ylabel('Probabilities');

In [None]:
# Display the sampled test cloud, built from the points as returned by the
# dataset
pcd = o3.geometry.PointCloud()
pcd.points = o3.utility.Vector3dVector(points.cpu().numpy())

# Name of the category whose id equals the sample's label; falls back to
# 'unknown' so `label` is always defined
label = next((name for name, class_id in CATEGORIES.items() if seg == class_id),
             'unknown')
print("Original class: ",label.title() )
o3.visualization.draw_plotly([pcd])
#draw(pcd, point_size=5)

### Inspecting the critical sets

The critical sets are the points that make up the basic underlying structure of the point cloud. Now we will see how well the model has learned these.

See draw_plotly() source here: https://github.com/isl-org/Open3D/blob/master/python/open3d/visualization/draw_plotly.py

In [None]:
# Select the rows of `points` indexed by the critical set returned by the model
crit_rows = crit_idxs.squeeze().to(points.device)
critical_points = points[crit_rows, :]
#critical_point_colors = read_pointnet_colors(seg.numpy())[crit_idxs.cpu().squeeze(), :]

# Show only the critical points
pcd = o3.geometry.PointCloud()
pcd.points = o3.utility.Vector3dVector(critical_points)
#pcd.colors = o3.utility.Vector3dVector(critical_point_colors)

o3.visualization.draw_plotly([pcd])
#draw(pcd, point_size=5) # does not work in Colab