In [17]:
import torch
import torch.nn as nn
import os
import numpy as np

In [18]:
import trimesh
import tensorflow as tf

DATA_DIR = tf.keras.utils.get_file(
    "modelnet.zip",
    "http://3dvision.princeton.edu/projects/2014/3DShapeNets/ModelNet10.zip",
    extract=True,
)
DATA_DIR = os.path.join(os.path.dirname(DATA_DIR), "ModelNet10")

In [19]:
import glob
from torch.utils.data import DataLoader, TensorDataset, RandomSampler

batch_size = 8

# bathtub 0, bed 1, chair 2, desk 3, dresser 4, monitor 5, night_stand 6, sofa 7, table 8, toliet 9
item_names = ["bathtub", "bed", "chair", "desk", "dresser", "monitor", "night_stand", "sofa", "table", "toliet"]
train_labels = np.array([])
train_inputs = []
test_labels = np.array([])
test_inputs = []


for i in range(len(item_names)):
# for i in range(2):
    item_name = item_names[i]
    
    train_files = glob.glob(os.path.join(DATA_DIR, item_name, "train/*.off"))
    train_labels = np.concatenate([train_labels, np.ones(len(train_files)) * i])
    for file in train_files:
        train_inputs.append(trimesh.load(file).sample(2048))
        
    test_files = glob.glob(os.path.join(DATA_DIR, item_name, "test/*.off"))
    test_labels = np.concatenate([test_labels, np.ones(len(test_files)) * i])
    for file in test_files:
        test_inputs.append(trimesh.load(file).sample(2048))


train_inputs = torch.tensor(train_inputs)
train_labels = torch.tensor(train_labels).flatten()

test_inputs = torch.tensor(test_inputs)
test_labels = torch.tensor(test_labels).flatten()

# normalization
train_inputs = train_inputs - train_inputs.mean(dim=1).unsqueeze(1)
train_inputs = train_inputs / torch.max(torch.sqrt(torch.sum(train_inputs ** 2, dim=2)), dim=1).values.view(-1, 1, 1)
test_inputs = test_inputs - test_inputs.mean(dim=1).unsqueeze(1)
test_inputs = test_inputs / torch.max(torch.sqrt(torch.sum(test_inputs ** 2, dim=2)), dim=1).values.view(-1, 1, 1)


train_dataset = TensorDataset(train_inputs, train_labels)
random_sampler = RandomSampler(train_dataset)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, sampler=random_sampler)

test_dataset = TensorDataset(test_inputs, test_labels)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)


In [20]:
import datetime
import time
import torch.nn.functional as F
from transformers import AdamW
from sklearn.metrics import f1_score
from transformers import AdamW, get_linear_schedule_with_warmup
import sys

EPOCHS = 10
LEARNING_RATE = 0.001
EPS = 1e-8
WARMUP = 100
num_classes = 10

def update_progress(progress):
    sys.stdout.write('\r%d%%' % progress)
    # sys.stdout.write(f'{progress}%  {msg}')
    sys.stdout.flush()
    
def format_time(time):
    time_rounded = int(round((time)))
    return str(datetime.timedelta(seconds=time_rounded))


def train_model(model, epochs, datalodaer):
    optimizer = AdamW(model.parameters())
    num_training_steps = len(datalodaer) * epochs
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=WARMUP, num_training_steps=num_training_steps)
    
    model.to('cuda')
    model.train()

    batch_size = datalodaer.batch_size
    num_data = len(datalodaer) * batch_size
    start_time = time.time()

    print(" --- training model")
    for epoch in range(epochs):
        total_loss = 0
        epoch_start_time = time.time()
        
        for step, batch in enumerate(datalodaer):
            batch_inputs = tuple(t.to('cuda') for t in batch)
            inputs = batch_inputs[0].type(torch.float32)
            labels = batch_inputs[1]

            output = model(inputs)
            loss = F.cross_entropy(output, labels.long())
            total_loss += loss.item()
            
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            optimizer.step()
            scheduler.step()

            update_progress((step+1)*batch_size / num_data * 100)

        avg_train_loss = total_loss / len(datalodaer)
        
        
        print(f' {epoch+1}/{epochs} - elapsed: {format_time(time.time() - epoch_start_time)}, average train loss: {avg_train_loss}')

    print(f' --- train finished, elapsed: {format_time(time.time() - start_time)}')

In [21]:

from model import PointTransformer

class PointTransformerCls(nn.Module):
    def __init__(self, n_classes) -> None:
        super().__init__()
        self.transformer = PointTransformer(3)
        self.cls = nn.Sequential(
            nn.Linear(512, 256),
            # nn.BatchNorm1d(256),
            nn.GELU(),
            # nn.Dropout(),
            nn.Linear(256, 128),
            # nn.BatchNorm1d(128),
            nn.GELU(),
            # nn.Dropout(),
            nn.Linear(128, n_classes)
        )
    
    def forward(self, x):
        x = self.transformer(x) # b, n, 512
        x = torch.mean(x, dim=1)

        return self.cls(x)

In [22]:
# model = PointTransformerCls(10)
# model.train()
# train_model(model, 10, train_dataloader)
# torch.save(model, "./p_transformer.pt")
model = torch.load("p_transformer.pt")
model.cuda()

PointTransformerCls(
  (transformer): PointTransformer(
    (mlp): SingleMLP(
      (fc): Linear(in_features=3, out_features=32, bias=True)
      (bn): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (blocks): ModuleList(
      (0): PointTransformerBlock(
        (transformerLayer): PointTransformerLayer(
          (query): Linear(in_features=32, out_features=32, bias=True)
          (key): Linear(in_features=32, out_features=32, bias=True)
          (value): Linear(in_features=32, out_features=32, bias=True)
          (fc_delta1): Linear(in_features=3, out_features=3, bias=True)
          (fc_delta2): Linear(in_features=3, out_features=32, bias=True)
          (bn_delta): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (fc_gamma1): Linear(in_features=32, out_features=32, bias=True)
          (fc_gamma2): Linear(in_features=32, out_features=32, bias=True)
          (bn_gamma1): BatchNorm2d(32, eps=1e-05,

In [23]:
def test_model(model, dataloader, draw = False):
    test_loss = 0
    labels = np.array([])
    predictions = np.array([])

    model.to('cuda')
    model.eval()

    batch_size = dataloader.batch_size
    num_data = len(dataloader) * batch_size
    start_time = time.time()

    with torch.no_grad():
        for step, batch in enumerate(dataloader):
            batch_inputs = tuple(t.to('cuda') for t in batch)
            inputs = batch_inputs[0].type(torch.float32)
            label = batch_inputs[1]

            output = model(inputs)
            loss = F.cross_entropy(output, label.long())
            test_loss += loss.item()
            
            softmaxed_output = F.softmax(output, dim=1)
            prediction = softmaxed_output.argmax(dim=1).detach().cpu().numpy()
            predictions = np.concatenate([predictions, prediction])
            labels = np.concatenate([labels, label.cpu().long().numpy()])
            

            update_progress((step * batch_size) / num_data * 100)
            
    n_rights = predictions[predictions == labels].shape[0]
    

    test_loss /= num_data
    print(f'\nloss: {test_loss}, {n_rights}/{num_data}, f1_score : {f1_score(labels, predictions, average="micro")}')
    print(f' --- evaluation finished {format_time(time.time() - start_time)}')


In [24]:
model.eval()
test_model(model, test_dataloader)

99%
loss: 0.032046578397723964, 741/808, f1_score : 0.9170792079207921
 --- evaluation finished 0:00:25
