In [17]:
## Instalaciones

%pip install torch
%pip install open3d


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Collecting open3d
  Downloading open3d-0.19.0-cp311-cp311-manylinux_2_31_x86_64.whl.metadata (4.3 kB)
Collecting dash>=2.6.0 (from open3d)
  Downloading dash-3.0.4-py3-none-any.whl.metadata (10 kB)
Collecting werkzeug>=3.0.0 (from open3d)
  Downloading werkzeug-3.1.3-py3-none-any.whl.metadata (3.7 kB)
Collecting flask>=3.0.0 (from open3d)
  Downloading flask-3.1.1-py3-none-any.whl.metadata (3.0 kB)
Collecting configargparse (from open3d)
  Downloading ConfigArgParse-1.7-py3-none-any.whl.metadata (23 kB)
Collecting addict (from open3d)
  Downloading addict-2.4.0-py3-none-any.whl.metadata (1.0 kB)
Collecting pyquaternion (from open3d)
  Downloading pyquaternion-0.9.9-py3-non

In [42]:
## Dependencias

import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import open3d as o3d
import numpy as np
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

print(torch.cuda.is_available())

False


In [14]:
## T-net
"""
T-net es una 'mini-red' que aprende una matriz de transformación de tamaño
dimxdim que transforma la entrada a una representación 'canónica', la cuál
es invariante a transformaciones rigidas (rotación, translación, reflexión).
"""

class Tnet(nn.Module):
    def __init__(self, dim, num_points):
        super(Tnet, self).__init__()

        self.dim = dim

        # Función de activación
        self.act = F.relu

        # Conv1d es una implementación sencilla de una 'MLP compartida'
        self.shared_mlp1 = nn.Conv1d(dim, 64, kernel_size=1)
        self.shared_mlp2 = nn.Conv1d(64, 128, kernel_size=1)
        self.shared_mlp3 = nn.Conv1d(128, 1024, kernel_size=1)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)

        self.max_pool = nn.MaxPool1d(kernel_size=num_points)

        # MLPs no compartidas
        self.linear1 = nn.Linear(1024, 512)
        self.linear2 = nn.Linear(512, 256)
        self.linear3 = nn.Linear(256, dim**2)
        self.bn4 = nn.BatchNorm1d(512)
        self.bn5 = nn.BatchNorm1d(256)
    
    def forward(self, x):
        bs = x.shape[0]

        # Paso a través de las MLPs compartidas
        x = self.bn1(self.act(self.shared_mlp1(x)))
        x = self.bn2(self.act(self.shared_mlp2(x)))
        x = self.bn3(self.act(self.shared_mlp3(x)))

        # Max pool
        x = self.max_pool(x).view(bs, -1)
        
        # Paso a través de las MLPs no compartidas
        x = self.bn4(self.act(self.linear1(x)))
        x = self.bn5(self.act(self.linear2(x)))
        x = self.linear3(x)
        
        # Reshape de 'T-Net(x)' a una matriz
        x = x.view(-1, self.dim, self.dim)
        # Le sumamos la matriz identidad para mayor estabilidad
        iden = torch.eye(self.dim, requires_grad=True).repeat(bs, 1, 1)
        if x.is_cuda:
            iden = iden.cuda()
        x += iden

        return x

In [15]:
## Point-net classifier

class PointnetClassifier(nn.Module):
    def __init__(self, dim, num_points, num_global_feats, num_classes):
        super(PointnetClassifier, self).__init__()

        # Función de activación
        self.act = F.relu

        # T-Net en los puntos de la entrada
        self.input_transform = Tnet(dim, num_points)

        # Primera MLP compartida, transforma los puntos de la entrada en features
        self.shared_mlp1 = nn.Conv1d(3, 64, kernel_size=1)
        self.shared_mlp2 = nn.Conv1d(64, 64, kernel_size=1)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(64)

        # T-Net en las features
        self.feature_transform = Tnet(64, num_points)

        # Segunda MLP compartida, determina las features globales
        self.shared_mlp3 = nn.Conv1d(64, 64, kernel_size=1)
        self.shared_mlp4 = nn.Conv1d(64, 128, kernel_size=1)
        self.shared_mlp5 = nn.Conv1d(128, num_global_feats, kernel_size=1)
        self.bn3 = nn.BatchNorm1d(64)
        self.bn4 = nn.BatchNorm1d(128)
        self.bn5 = nn.BatchNorm1d(num_global_feats)
        # Max pool para extraer las features globales
        # Devolver los indices nos permite ver los indices críticos que determinan las features globales
        self.max_pool = nn.MaxPool1d(kernel_size=num_points, return_indices=True)

        # MLP para clasificación
        self.linear1 = nn.Linear(num_global_feats, 512)
        self.linear2 = nn.Linear(512, 256)
        self.bn_linear1 = nn.BatchNorm1d(512)
        self.bn_linear2 = nn.BatchNorm1d(256)
        self.dropout = nn.Dropout(p=0.3)

        # Output layer
        self.linear3 = nn.Linear(256, num_classes)
    
    def forward(self, x):

        # Tamaño del batch, es decir cuantos ejemplos hay en el batch
        bs = x.shape[0]

        # Transformación del input
        input_matrix = self.input_transform(x)
        # x = torch.bmm(x.tranpose(2, 1), input_matrix).tranpose(2, 1)
        x = torch.transpose(torch.bmm(torch.transpose(x, 2, 1), input_matrix), 2, 1)

        # Paso a través de las primeras MLPs compartidas
        x = self.bn1(self.act(self.shared_mlp1(x)))
        x = self.bn2(self.act(self.shared_mlp2(x)))

        # Transformación de features
        feature_matrix = self.feature_transform(x)
        # x = torch.bmm(x.tranpose(2, 1), feature_matrix).tranpose(2, 1)
        x = torch.transpose(torch.bmm(torch.transpose(x, 2, 1), feature_matrix), 2, 1)

        # Paso a través de las segundas MLPs compartidas
        x = self.bn3(self.act(self.shared_mlp3(x)))
        x = self.bn4(self.act(self.shared_mlp4(x)))
        x = self.bn5(self.act(self.shared_mlp5(x)))

        global_features, critical_indexes = self.max_pool(x)
        global_features = global_features.view(bs, -1)
        critical_indexes = critical_indexes.view(bs, -1)

        # Clasificación
        x = self.bn_linear1(self.act(self.linear1(global_features)))
        x = self.bn_linear2(self.act(self.linear2(x)))
        x = self.dropout(x)
        x = self.linear3(x)

        # Devolver logits
        return x, critical_indexes, feature_matrix

            

In [43]:
# Clase para el cómputo de la pérdida

class PointNetLoss(nn.Module):
    """
    Atributos:
        alpha            El peso de las clases para pérdida CrossEntropy.
        reg_weight       Peso de regularización.
        size_average     Booleano que define si es que la pérdida final se computa como promedio o no.
    """
    def __init__(self, alpha=None, gamma=0, reg_weight=0, size_average=True):
        super(PointNetLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reg_weight = reg_weight
        self.size_average = size_average

        # Convertimos en tensores
        if isinstance(alpha, (float, int)):
            self.alpha = torch.Tensor([alpha, 1 - alpha])
        if isinstance(alpha, (list, np.ndarray)):
            self.alpha = torch.Tensor(alpha)

        self.cross_entropy_loss = nn.CrossEntropyLoss(weight=self.alpha)

    def forward(self, predictions, targets, A):
        # tamaño de batch
        batch_size = predictions.size(0)

        # computamos pérdida CE
        ce_loss = self.cross_entropy_loss(predictions, targets)

        # Probabilidades predichas
        pn = F.softmax(predictions, dim=1)
        pn.gather(1, targets.view(-1, 1)).view(-1)

        if self.reg_weight > 0:
            I = torch.eye(64).unsqueeze(0).repeat(A.shape[0], 1, 1)
            if A.is_cuda:
                I = I.cuda()
            reg = torch.linalg.norm(I - torch.bmm(A, A.transpose(2, 1)))
            reg = self.reg_weight * reg/batch_size

        loss = ((1 - pn)**self.gamma * ce_loss)
        if self.size_average:
            return loss.mean() + reg
        else:
            return loss.sum() + reg

In [44]:
class ModelNet(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return  len(self.X)
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [45]:
ROOT_DIR = os.getcwd()
DATASET_DIR = os.path.join(ROOT_DIR, "ModelNet10")
classes = {
    "toilet": 0,
    "monitor": 1
}



x_train = list()
y_train = list()
for label, value in classes.items():
    CLASS_DIR = os.path.join(DATASET_DIR, label, "train")    
    
    for file in os.scandir(CLASS_DIR):  
        if ".pcd" in str(file):
            pcd = o3d.io.read_point_cloud(file.path)
            points = np.asarray(pcd.points, dtype=float)
            x_train.append(points)
            y_train.append(value)

x_train = np.transpose(x_train, (0, 2, 1))

train_data = ModelNet(x_train, y_train)

In [46]:
# parametros del dataset
batch_size = 16
dim = x_train.shape[1]
num_points = 1024
num_classes = 2

# hiperparametros
num_global_feats = 1024     # número de features globales calculadas
epochs = 2
learning_rate = 0.01
reg_weight = 0.001



train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

classifier = PointnetClassifier(dim, num_points, num_global_feats, num_classes)

optimizer = optim.Adam(classifier.parameters(), lr=learning_rate)
criterion = PointNetLoss(alpha=None, gamma=1, reg_weight=reg_weight, size_average=True)

loss_dict = {"train": list(), "val": list()}

for epoch in range(1, epochs + 1):
    running_train_loss = 0
    # Modo entrenamiento
    classifier = classifier.train()

    for pcds, labels in train_loader:
        # optimizer.zero_grad()
    
        # Hacer predicciones
        out, _, A = classifier(pcds)
        loss = criterion(out, labels, A)
    
        # Calcular gradiente y optimizar
        loss.backward()
        optimizer.step()

        # Calculamos las elecciones
        pred_choice = torch.softmax(out, dim=1).argmax(dim=1)

        # Running train_loss (es promedio del batch por size_average, asi que la multiplicamos)
        running_train_loss += loss.item() * pcds.size(0)
    
    epoch_train_loss = running_train_loss / len(train_loader.dataset)

    print(epoch_train_loss)
    loss_dict["train"].append(epoch_train_loss)


26.944007826381913
50.29292091775147


In [49]:


# parametros del dataset
batch_size = 32             # número de point clouds
dim = 3                     # número de dimensiones por cada punto
num_points = 1024           # número de puntos por point cloud
num_classes = 2             # número de clases de clasificación

# hiperparametros
num_global_feats = 1024     # número de features globales calculadas

test_data = torch.rand(batch_size, dim, num_points)
print(test_data.shape)

classifier = PointnetClassifier(dim, num_points, num_global_feats, num_classes)
out, _, _ = classifier(test_data)
print(f'Class output shape: {out.shape}')
print(f'Class output: {out}')

train_test(50, 0.01, 0.001, num_classes)

torch.Size([32, 3, 1024])
Class output shape: torch.Size([32, 2])
Class output: tensor([[ 0.1882,  0.8044],
        [-0.6484,  1.7559],
        [-0.2891,  0.2091],
        [ 1.1318,  0.7276],
        [-1.4173, -0.3386],
        [ 0.0773, -0.3860],
        [ 0.1999, -0.6367],
        [ 0.1684, -1.1633],
        [ 0.8659, -0.4611],
        [ 0.3282, -0.3753],
        [ 1.2252, -0.9516],
        [ 0.2915,  0.0900],
        [-1.1991, -0.3843],
        [-0.9253, -0.0640],
        [ 0.4907,  0.1421],
        [ 0.3034, -0.1705],
        [-0.3783, -0.9598],
        [-0.2214, -0.3284],
        [ 0.5004, -0.7704],
        [-0.0100,  0.4549],
        [-0.2003, -0.6737],
        [-0.3351, -0.1048],
        [ 1.7228, -0.0268],
        [-0.0713, -0.6172],
        [ 0.2940, -0.3063],
        [-0.7988, -0.4807],
        [ 0.1332, -0.2903],
        [ 0.1220,  0.5724],
        [ 0.4042,  0.1653],
        [ 0.5742,  0.1928],
        [-0.5718, -0.1549],
        [ 0.3011, -0.1173]], grad_fn=<AddmmBackward0

NameError: name 'train_test' is not defined