In [1]:
# -*- coding: utf-8 -*-
# """
# PointNetSeg.ipynb
# Created on Oct Sept 02, 2024
# """

In [2]:
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
root_dir = "/content/gdrive/MyDrive/ColabNotebooks/IECO/PointNet_Classification_Segmentation/pointnet"

Mounted at /content/gdrive


In [3]:
root_dir

'/content/gdrive/MyDrive/ColabNotebooks/IECO/PointNet_Classification_Segmentation/pointnet'

In [4]:
!pip install path.py -q;
!pip install gdown -q
from path import Path
import sys
import os
import shutil
sys.path.append(root_dir)

In [5]:
import plotly.graph_objects as go
import numpy as np
import scipy.spatial.distance
import math
import random
import os

In [6]:
%pwd

'/content'

In [7]:
def cent_norm(verts):
    """
    Center and normalize the point cloud data.
    """
    centroid = np.mean(verts, axis=0)
    verts = verts - centroid
    furthest_distance = np.max(np.sqrt(np.sum(verts**2, axis=1)))
    verts = verts / furthest_distance
    return verts

def rotation_z(verts, theta):
    """
    Rotate the point cloud around the z-axis by theta degrees.
    """
    theta = np.radians(theta)
    rotation_matrix = np.array([
        [np.cos(theta), -np.sin(theta), 0],
        [np.sin(theta), np.cos(theta), 0],
        [0, 0, 1]
    ])
    return np.dot(verts, rotation_matrix)

def add_noise(verts, noise_level=0.01):
    """
    Add random noise to the point cloud data.
    """
    noise = np.random.normal(0, noise_level, verts.shape)
    return verts + noise

In [8]:
import random

def read_pts(file):
    verts = np.genfromtxt(file)
    return cent_norm(verts)
    # return utils.cent_norm(verts)
    #return verts

def read_seg(file):
    verts = np.genfromtxt(file, dtype= (int))
    return verts

def sample_2000(pts, pts_cat):
    res1 = np.concatenate((pts,np.reshape(pts_cat, (pts_cat.shape[0], 1))), axis= 1)
    res = np.asarray(random.choices(res1, weights=None, cum_weights=None, k=2000))
    images = res[:, 0:3]
    categories = res[:, 3]
    categories-=np.ones(categories.shape)

    return images, categories

## Model

In [9]:
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F

class Tnet(nn.Module):
   def __init__(self, k=3):
      super().__init__()
      self.k=k
      self.conv1 = nn.Conv1d(k,64,1)
      self.conv2 = nn.Conv1d(64,128,1)
      self.conv3 = nn.Conv1d(128,1024,1)
      self.fc1 = nn.Linear(1024,512)
      self.fc2 = nn.Linear(512,256)
      self.fc3 = nn.Linear(256,k*k)

      self.bn1 = nn.BatchNorm1d(64)
      self.bn2 = nn.BatchNorm1d(128)
      self.bn3 = nn.BatchNorm1d(1024)
      self.bn4 = nn.BatchNorm1d(512)
      self.bn5 = nn.BatchNorm1d(256)


   def forward(self, input):
      # input.shape == (bs,n,3)
      bs = input.size(0)
      xb = F.relu(self.bn1(self.conv1(input)))
      xb = F.relu(self.bn2(self.conv2(xb)))
      xb = F.relu(self.bn3(self.conv3(xb)))
      pool = nn.MaxPool1d(xb.size(-1))(xb)
      flat = nn.Flatten(1)(pool)
      xb = F.relu(self.bn4(self.fc1(flat)))
      xb = F.relu(self.bn5(self.fc2(xb)))

      #initialize as identity
      init = torch.eye(self.k, requires_grad=True).repeat(bs,1,1)
      if xb.is_cuda:
        init=init.cuda()
      matrix = self.fc3(xb).view(-1,self.k,self.k) + init
      return matrix


class Transform(nn.Module):
   def __init__(self):
        super().__init__()
        self.input_transform = Tnet(k=3)
        self.feature_transform = Tnet(k=128)
        self.fc1 = nn.Conv1d(3,64,1)
        self.fc2 = nn.Conv1d(64,128,1)
        self.fc3 = nn.Conv1d(128,128,1)
        self.fc4 = nn.Conv1d(128,512,1)
        self.fc5 = nn.Conv1d(512,2048,1)


        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(128)
        self.bn4 = nn.BatchNorm1d(512)
        self.bn5 = nn.BatchNorm1d(2048)

   def forward(self, input):
        n_pts = input.size()[2]
        matrix3x3 = self.input_transform(input)
        xb = torch.bmm(torch.transpose(input,1,2), matrix3x3).transpose(1,2)
        outs = []

        out1 = F.relu(self.bn1(self.fc1(xb)))
        outs.append(out1)
        out2 = F.relu(self.bn2(self.fc2(out1)))
        outs.append(out2)
        out3 = F.relu(self.bn3(self.fc3(out2)))
        outs.append(out3)
        matrix128x128 = self.feature_transform(out3)

        out4 = torch.bmm(torch.transpose(out3,1,2), matrix128x128).transpose(1,2)
        outs.append(out4)
        out5 = F.relu(self.bn4(self.fc4(out4)))
        outs.append(out5)

        xb = self.bn5(self.fc5(out5))

        xb = nn.MaxPool1d(xb.size(-1))(xb)
        out6 = nn.Flatten(1)(xb).repeat(n_pts,1,1).transpose(0,2).transpose(0,1)#.repeat(1, 1, n_pts)
        outs.append(out6)


        return outs, matrix3x3, matrix128x128


class PointNetSeg(nn.Module):
    def __init__(self, classes=10):  # Default to 10 classes, but can be set dynamically
        super().__init__()
        self.transform = Transform()

        self.fc1 = nn.Conv1d(3008, 256, 1)
        self.fc2 = nn.Conv1d(256, 256, 1)
        self.fc3 = nn.Conv1d(256, 128, 1)
        self.fc4 = nn.Conv1d(128, classes, 1)  # Use the `classes` parameter here

        self.bn1 = nn.BatchNorm1d(256)
        self.bn2 = nn.BatchNorm1d(256)
        self.bn3 = nn.BatchNorm1d(128)
        self.bn4 = nn.BatchNorm1d(classes)  # Adjust batch norm to the number of classes

        self.logsoftmax = nn.LogSoftmax(dim=1)

    def forward(self, input):
        inputs, matrix3x3, matrix128x128 = self.transform(input)
        stack = torch.cat(inputs, 1)

        xb = F.relu(self.bn1(self.fc1(stack)))
        xb = F.relu(self.bn2(self.fc2(xb)))
        xb = F.relu(self.bn3(self.fc3(xb)))

        output = F.relu(self.bn4(self.fc4(xb)))

        return self.logsoftmax(output), matrix3x3, matrix128x128

## Dataset

In [10]:
from __future__ import print_function, division
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
from torch.utils.data.dataset import random_split
# import utils

class Data(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, root_dir, valid=False, transform=None):

        self.root_dir = root_dir
        self.files = []
        self.valid=valid

        newdir = root_dir + '/data/expert_verified/points_label/'

        for file in os.listdir(newdir):
            o = {}
            o['category'] = newdir + file
            # print(o['category'])
            o['img_path'] = root_dir + '/data/points/'+ file.replace('.seg', '.pts')
            self.files.append(o)



    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        img_path = self.files[idx]['img_path']
        category = self.files[idx]['category']
        with open(img_path, 'r') as f:
            image1 = read_pts(f)
        with open(category, 'r') as f:
            category1 = read_seg(f)
        image2, category2 = sample_2000(image1, category1)
        if not self.valid:
            theta = random.random()*360
            # image2 = utils.rotation_z(utils.add_noise(image2), theta)
            image2 = rotation_z(add_noise(image2), theta)

        return {'image': np.array(image2, dtype="float32"), 'category': category2.astype(int)}


In [11]:
dset = Data(root_dir , transform=None)

In [12]:
dset[0]

{'image': array([[-0.29141837, -0.09733603,  0.06703932],
        [-0.04618602,  0.47226623, -0.0262938 ],
        [ 0.5259993 , -0.5028936 , -0.03009269],
        ...,
        [ 0.08609722,  0.43893936,  0.03292134],
        [-0.01806649, -0.29052722,  0.05339945],
        [-0.28360838, -0.66734713, -0.03541172]], dtype=float32),
 'category': array([1, 1, 0, ..., 1, 1, 0])}

In [13]:
train_num = int(len(dset) * 0.95)
val_num = int(len(dset) *0.05)
if int(len(dset)) - train_num -  val_num >0 :
    train_num = train_num + 1
elif int(len(dset)) - train_num -  val_num < 0:
    train_num = train_num -1
#train_dataset, val_dataset = random_split(dset, [3000, 118])
train_dataset, val_dataset = random_split(dset, [train_num, val_num])
val_dataset.valid=True

print('######### Dataset class created #########')
print('Number of images: ', len(dset))
print('Sample image shape: ', dset[0]['image'].shape)
#print('Sample image points categories', dset[0]['category'], end='\n\n')

train_loader = DataLoader(dataset=train_dataset, batch_size=64)
val_loader = DataLoader(dataset=val_dataset, batch_size=64)

#dataloader = torch.utils.data.DataLoader(dset, batch_size=4, shuffle=True, num_workers=4)

######### Dataset class created #########
Number of images:  233
Sample image shape:  (2000, 3)


## Training loop

In [14]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
print(device)

cuda:0


In [15]:
pointnet = PointNetSeg(classes=2)  # For 2 classes

In [16]:
pointnet.to(device);

In [17]:
optimizer = torch.optim.Adam(pointnet.parameters(), lr=0.001)

In [18]:
def pointnetloss(outputs, labels, m3x3, m128x128, alpha = 0.0001):
    criterion = torch.nn.NLLLoss()
    bs=outputs.size(0)
    id3x3 = torch.eye(3, requires_grad=True).repeat(bs,1,1)
    id128x128 = torch.eye(128, requires_grad=True).repeat(bs,1,1)
    if outputs.is_cuda:
        id3x3=id3x3.cuda()
        id128x128=id128x128.cuda()
    diff3x3 = id3x3-torch.bmm(m3x3,m3x3.transpose(1,2))
    diff128x128 = id128x128-torch.bmm(m128x128,m128x128.transpose(1,2))
    return criterion(outputs, labels) + alpha * (torch.norm(diff3x3)+torch.norm(diff128x128)) / float(bs)

In [19]:
# # Remove Previous Models
# if os.path.exists(root_dir+"/modelsSeg_v0/"):
#     shutil.rmtree(root_dir+"/modelsSeg_v0/")

In [20]:
# from os.path import exists
# def train(model, train_loader, val_loader=None,  epochs=15, save=True):
#     for epoch in range(epochs):
#         pointnet.train()
#         running_loss = 0.0
#         for i, data in enumerate(train_loader, 0):
#             inputs, labels = data['image'].to(device), data['category'].to(device)
#             optimizer.zero_grad()
#             outputs, m3x3, m64x64 = pointnet(inputs.transpose(1,2))

#             # Debugging prints for labels and outputs
#             # print("Unique labels:", torch.unique(labels))
#             # print("Outputs shape:", outputs.shape)

#             # print("Shape of m64x64:", m64x64.shape)
#             # print("Outputs shape:", outputs.shape)
#             # print("Labels shape:", labels.shape)
#             # print("Outputs min/max:", outputs.min(), outputs.max())
#             # print("Labels min/max:", labels.min(), labels.max())

#             loss = pointnetloss(outputs, labels, m3x3, m64x64)
#             loss.backward()
#             optimizer.step()

#             # print statistics
#             running_loss += loss.item()
#             if i % 10 == 9:    # print every 10 mini-batches
#                     print('[%d, %5d] loss: %.3f' %
#                         (epoch + 1, i + 1, running_loss / 10))
#                     running_loss = 0.0

#         pointnet.eval()
#         correct = total = 0

#         # validation
#         if val_loader:
#             with torch.no_grad():
#                 for data in val_loader:
#                     inputs, labels = data['image'].to(device), data['category'].to(device)
#                     outputs, __, __ = pointnet(inputs.transpose(1,2))
#                     _, predicted = torch.max(outputs.data, 1)
#                     total += labels.size(0) * labels.size(1) ##
#                     correct += (predicted == labels).sum().item()
#             val_acc = 100 * correct / total
#             print('Valid accuracy: %d %%' % val_acc)

#         # save the model
#         if save:
#             if not os.path.exists(root_dir+"/modelsSeg_v0/"):
#                 os.mkdir(root_dir+"/modelsSeg_v0")
#             torch.save(pointnet.state_dict(), root_dir+"/modelsSeg_v0/"+str(epoch)+"_"+str(val_acc))


In [21]:
# train(pointnet, train_loader, val_loader,  save=True)

## test

In [22]:
pointnet = PointNetSeg(classes=2)  # For 2 classes
# MODEL_PATH = "/content/gdrive/MyDrive/ColabNotebooks/IECO/PointNet_Classification_Segmentation/pointnet/modelsSeg/14_88.45"
MODEL_PATH = os.path.join(root_dir, "modelsSeg_v0/14_88.45")
# pointnet.load_state_dict(torch.load(root_dir+"/modelsSeg/"+"14_88.98358208955224"))
pointnet.load_state_dict(torch.load(MODEL_PATH))

pointnet.eval()

  pointnet.load_state_dict(torch.load(MODEL_PATH))


PointNetSeg(
  (transform): Transform(
    (input_transform): Tnet(
      (conv1): Conv1d(3, 64, kernel_size=(1,), stride=(1,))
      (conv2): Conv1d(64, 128, kernel_size=(1,), stride=(1,))
      (conv3): Conv1d(128, 1024, kernel_size=(1,), stride=(1,))
      (fc1): Linear(in_features=1024, out_features=512, bias=True)
      (fc2): Linear(in_features=512, out_features=256, bias=True)
      (fc3): Linear(in_features=256, out_features=9, bias=True)
      (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn3): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn4): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn5): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (feature_transform): Tnet(
      (conv1): Conv1d(128, 64, kernel_size

In [23]:
batch = next(iter(val_loader))
pred = pointnet(batch['image'].transpose(1,2))
pred_np = np.array(torch.argmax(pred[0],1));
pred_np

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 1, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 1, 0],
       ...,
       [0, 0, 1, ..., 0, 0, 0],
       [1, 0, 0, ..., 0, 1, 0],
       [0, 1, 0, ..., 0, 0, 0]])

In [24]:
len(batch['image'])

11

In [25]:
batch['image'][-1].shape

torch.Size([2000, 3])

In [26]:
batch['image'][-1]

tensor([[-0.5776,  0.6678, -0.0140],
        [-0.0317,  0.1609,  0.0669],
        [ 0.3663,  0.5228,  0.0155],
        ...,
        [-0.4952, -0.1351, -0.0574],
        [-0.1348,  0.6788, -0.0608],
        [-0.1832,  0.3579, -0.0737]])

In [27]:
len(batch['category'])

11

In [28]:
batch['category'][0]

tensor([0, 0, 0,  ..., 0, 0, 0])

In [29]:
pred_np==np.array(batch['category'])

array([[ True,  True,  True, ...,  True,  True,  True],
       [ True, False,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       ...,
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True, False, ...,  True,  True,  True],
       [ True, False,  True, ...,  True,  True, False]])

In [30]:
acc = (pred_np==np.array(batch['category']))

In [31]:
resulting_acc = np.sum(acc, axis=1) / 2000

In [32]:
resulting_acc

array([0.936 , 0.886 , 0.854 , 0.877 , 0.7985, 0.924 , 0.8475, 0.9685,
       0.894 , 0.888 , 0.7075])

In [33]:
pred_np

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 1, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 1, 0],
       ...,
       [0, 0, 1, ..., 0, 0, 0],
       [1, 0, 0, ..., 0, 1, 0],
       [0, 1, 0, ..., 0, 0, 0]])

In [35]:
i = 0
x,y,z=np.array(batch['image'][i]).T
# c = np.array(batch['category'][0]).T
c = pred_np[i].tolist()

fig = go.Figure(data=[go.Scatter3d(x=x, y=y, z=z,
                                   mode='markers',
                                   marker=dict(
        size=30,
        color=c,                # set color to an array/list of desired values
        colorscale='Viridis',   # choose a colorscale
        opacity=1.0
    ))])
fig.update_traces(marker=dict(size=2,
                              line=dict(width=2,
                                        color='DarkSlateGrey')),
                  selector=dict(mode='markers'))
fig.show()