In [1]:
%%capture
!pip install open3d pytorch-lightning

Download and prepare a (very) small dataset

In [None]:
!wget 'https://versaweb.dl.sourceforge.net/project/pointclouds/PCD%20datasets/hand_gestures.zip'

In [3]:
!unzip -q hand_gestures.zip
!cp -r hand_gestures/ test/
!cp -r test/ train/ 
!rm test/hand_{0..7}/image_{0002..0009}.pcd
!rm train/hand_{0..7}/image_{0000..0001}.pcd

Inspect an example

In [9]:
import numpy as np
import open3d as o3d
import plotly.graph_objects as go

def show_points(points, colors=None):

    if colors is None:
        colors = np.full((len(points),3),[0, 1, 0]) # fixed color

    fig = go.Figure(
        data=[
            go.Scatter3d(
                x=points[:,0], y=points[:,1], z=points[:,2], 
                mode='markers',
                marker=dict(size=1, color=colors)
            )
        ],
        layout=dict(
            scene=dict(
                xaxis=dict(visible=False),
                yaxis=dict(visible=False),
                zaxis=dict(visible=False)
            )
        )
    )
    fig.show()

In [None]:
cloud = o3d.io.read_point_cloud("test/hand_1/image_0001.pcd")

show_points(np.asarray(cloud.points), colors=np.asarray(cloud.colors))

Create dataset module

In [66]:
import torch
import os
from glob import glob
import random

class Dataset(torch.utils.data.Dataset):
    def __init__(self, root_dir, transform):
        self.samples = sorted([y for y in glob(os.path.join(root_dir, '*', '*.pcd'))])
        if not len(self.samples) > 0:
            print("did not find any files")
        self.transform = transform

    def load_sample(self, path):
        cloud = o3d.io.read_point_cloud(path)
        points = np.asarray(cloud.points)
        cls = int(path.split('/')[1].split('_')[1])
        cls = torch.nn.functional.one_hot(torch.as_tensor(cls), 8)
        return points, cls

    def sample_N_random(self,x,N=1024):
        candiate_ids = [i for i in range(x.shape[0])]
        sel = []
        for _ in range(N):
            # select idx
            idx = random.randint(0,len(candiate_ids)-1)
            sel.append(candiate_ids[idx])
            # remove that idx from point_idx_options
            del candiate_ids[idx]
        return np.array(x[sel])

    def __getitem__(self, idx):
        x, y = self.load_sample(self.samples[idx])
        x = self.sample_N_random(x)
        #sample = {'input':x}
        #if self.transform:
        #    sample = self.transform(**sample)
        #    x = sample['input']
        return torch.as_tensor(x).float(), y.float()

    def __len__(self):
        return len(self.samples)


Verify the output of the dataset module

In [None]:
data_train = Dataset('train/',transform=None)
x ,y = data_train[15]
print(y)
show_points(x.numpy())

Define PointNet and ClassHead

In [68]:
import torch
import torch.nn as nn

class PointNet(nn.Module):
    def __init__(self, input_dims=3):
        super(PointNet, self).__init__()
        self.conv1 = torch.nn.Conv1d(input_dims, 64, 1)
        self.conv2 = torch.nn.Conv1d(64, 128, 1)
        self.conv3 = torch.nn.Conv1d(128, 1024, 1)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)

    def forward(self, x):
        x = x.transpose(2, 1)
        x = F.leaky_relu(self.bn1(self.conv1(x)),0.2)
        x = F.leaky_relu(self.bn2(self.conv2(x)),0.2)
        x = self.bn3(self.conv3(x))
        x = torch.max(x, 2, keepdim=True)[0]
        feat = x.view(-1, 1024)
        return feat

class ClassHead(nn.Module):
    def __init__(self, input_dims=1024, output_dims=3, dropout_prob=0.5):
        super(ClassHead, self).__init__()
        self.fc1 = nn.Linear(input_dims, 256)
        self.bn1 = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 64)
        self.bn2 = nn.BatchNorm1d(64)
        self.fc3 = nn.Linear(64, output_dims)
        self.do = nn.Dropout(dropout_prob)

    def forward(self, x):
        x = F.leaky_relu(self.bn1(self.do(self.fc1(x))),0.2)
        x = F.leaky_relu(self.bn2(self.do(self.fc2(x))),0.2)
        x = self.fc3(x)
        return x

Build training module

In [69]:
import torch
import torch.nn.functional as F
from torch.optim import Adam
import pytorch_lightning as pl

class Classifier(pl.LightningModule):
    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.encoder = PointNet()
        self.decoder = ClassHead(output_dims=cfg['n_classes'])

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

    def configure_optimizers(self):
        return Adam(self.parameters(), 
                    lr=self.cfg['lr'])

    def training_step(self, batch, batch_idx):
        x, y = batch
        output = self(x)
        loss = F.mse_loss(output, y)
        self.log('loss', loss, on_step=True, prog_bar=False)
        return {"loss": loss}

    def validation_step(self, batch, batch_idx):
        x, y = batch
        output = self(x)
        loss = F.mse_loss(output, y)
        return {"val_loss": loss}

    def validation_epoch_end(self, outputs):
        avg_loss = torch.stack([x["val_loss"] for x in outputs]).mean()
        self.log('avg_val_loss', avg_loss, on_epoch=True, prog_bar=True)

Config

In [85]:
cfg = {
        'experiment': 'hands',
        'train_folder': 'train/',
        'test_folder': 'test/',
        'n_points': 1024,
        'n_classes': 8,
        'max_epoch': 300,
        'gpus': 1,
        'lr': 0.0005,
        'batch_size': 16,
        'device': 'cuda',
      }

Train a model

In [None]:
import pytorch_lightning as pl
from pytorch_lightning import Trainer, loggers

pl.seed_everything(42)
logger = pl.loggers.TensorBoardLogger(os.path.join('lightning_logs/',cfg['experiment']))

from torch.utils.data import DataLoader
train_dataloader = DataLoader(Dataset('train/',transform=None), batch_size=cfg['batch_size'], shuffle=True)
val_dataloader = DataLoader(Dataset('test/',transform=None), batch_size=cfg['batch_size'], shuffle=False)

model = Classifier(cfg)
model.train()
trainer = Trainer(gpus=cfg['gpus'], max_epochs=cfg['max_epoch'], logger=logger, deterministic=True)
trainer.fit(model, train_dataloader=train_dataloader, val_dataloaders=val_dataloader)

model_dir = os.path.join("trained_models/",cfg['experiment'])
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
torch.save(model.encoder, os.path.join(model_dir,"encoder.pt"))
torch.save(model.decoder, os.path.join(model_dir,"decoder.pt"))
torch.save(model, os.path.join(model_dir,"model.pt"))

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard
%tensorboard --logdir='lightning_logs/'

Try the model. Given the small dataset and the lack of serious data augmentation performance is poor. (Check if the model has learned the training set?)

In [94]:
with torch.no_grad():
    model.eval()
    data_test = Dataset('test/',transform=None)
    for sample in data_test:
        x, y = sample
        pred = model(torch.unsqueeze(x, 0))[0]
        print("GT: {}, Pred {}".format(torch.argmax(y, dim=0),torch.argmax(pred, dim=0)))
        #show_points(x.numpy())

GT: 0, Pred 6
GT: 0, Pred 0
GT: 1, Pred 6
GT: 1, Pred 1
GT: 2, Pred 6
GT: 2, Pred 2
GT: 3, Pred 6
GT: 3, Pred 7
GT: 4, Pred 6
GT: 4, Pred 0
GT: 5, Pred 6
GT: 5, Pred 6
GT: 6, Pred 6
GT: 6, Pred 6
GT: 7, Pred 6
GT: 7, Pred 7
