**Load the PLY File**

In [7]:
import open3d as o3d
import numpy as np
import os
import h5py

# Parent of the current working directory; the notebook's data paths are built from it.
script_dir = os.path.split(os.getcwd())[0]

In [1]:



sim_pc_path = os.path.join(script_dir, '.', 'docs', 'smartLab_simulated.ply')

# Open3D's reader does not raise on a missing or unreadable file; it returns an
# empty cloud. Fail fast here so the problem is not reported later as an obscure
# NumPy error in the block-splitting step.
if not os.path.isfile(sim_pc_path):
    raise FileNotFoundError(f"Point cloud file not found: {sim_pc_path}")

pcd = o3d.io.read_point_cloud(sim_pc_path)
if not pcd.has_points():
    raise ValueError(f"No points could be read from: {sim_pc_path}")
if not pcd.has_colors():
    # The labels are derived from per-point colours further down.
    raise ValueError(f"Point cloud has no colours to derive labels from: {sim_pc_path}")

points = np.asarray(pcd.points)          # Shape: [N, 3]
colors = np.asarray(pcd.colors)           # Shape: [N, 3]

#o3d.visualization.draw_geometries([pcd])

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


**Convert Colors to Labels**

In [3]:
# (class name, RGB colour) pairs. The position in this list is the integer label.
CLASS_COLORS = [
    ('ceiling',  (1.0, 0.0, 0.0)),  # 0: red
    ('floor',    (0.0, 1.0, 0.0)),  # 1: green
    ('wall',     (0.0, 0.0, 1.0)),  # 2: blue
    ('beam',     (1.0, 1.0, 0.0)),  # 3: yellow
    ('column',   (1.0, 0.0, 1.0)),  # 4: magenta
    ('window',   (0.0, 1.0, 1.0)),  # 5: cyan
    ('door',     (0.5, 0.5, 0.5)),  # 6: gray
    ('chair',    (1.0, 0.5, 0.0)),  # 7: orange
    ('table',    (0.5, 0.0, 1.0)),  # 8: purple
    ('bookcase', (0.5, 1.0, 0.5)),  # 9: light green
    ('sofa',     (0.5, 0.5, 1.0)),  # 10: light blue
    ('board',    (1.0, 0.5, 0.5)),  # 11: pink
    ('clutter',  (0.0, 0.0, 0.0)),  # 12: black
]
color_map = np.array([rgb for _, rgb in CLASS_COLORS])

# Each point gets the index of the palette colour nearest to its own colour
# (Euclidean distance in RGB). The [N, 13, 3] difference array is a temporary.
labels = np.linalg.norm(
    colors[:, np.newaxis, :] - color_map[np.newaxis, :, :], axis=2
).argmin(axis=1)

**Split into Blocks**

In [4]:
def split_into_blocks(points, colors, labels, block_size=3.0, stride=1.5, min_points=100):
    """
    Split a point cloud into overlapping square blocks in the XY plane.

    Blocks are `block_size` x `block_size` (Z is ignored) and their origins
    advance by `stride` along X and Y, starting at the minimum XY coordinate.
    A block is kept only if it holds more than `min_points` points.

    Args:
        points: [N, >=2] array; columns 0 and 1 are X and Y.
        colors: [N, ...] array indexed in parallel with `points`.
        labels: [N] array indexed in parallel with `points`.
        block_size: Edge length of a block (default 3.0). Must be > 0.
        stride: Step between block origins (default 1.5). Must be > 0.
        min_points: Blocks with this many points or fewer are dropped (default 100).

    Returns:
        List of (block_points, block_colors, block_labels) tuples, ordered by
        X origin and then by Y origin. Empty list for an empty cloud.

    Raises:
        ValueError: if `block_size` or `stride` is not positive (a non-positive
            stride would otherwise never terminate).
    """
    if block_size <= 0 or stride <= 0:
        raise ValueError("block_size and stride must be positive")

    points = np.asarray(points)
    colors = np.asarray(colors)
    labels = np.asarray(labels)
    if len(points) == 0:
        return []

    def axis_starts(lo, hi):
        # Same running-sum sequence as `pos = lo; while pos < hi: pos += stride`,
        # except that `lo` is always included, so a cloud with zero extent along
        # an axis still gets one block.
        starts = [lo]
        pos = lo + stride
        while pos < hi:
            starts.append(pos)
            pos += stride
        return starts

    min_coords = np.min(points[:, :2], axis=0)  # Ignore Z-axis for 2D splitting
    max_coords = np.max(points[:, :2], axis=0)
    x_starts = axis_starts(min_coords[0], max_coords[0])
    y_starts = axis_starts(min_coords[1], max_coords[1])

    blocks = []
    for x in x_starts:
        # The X test does not depend on y, so evaluate it once per column.
        in_column = np.flatnonzero((points[:, 0] >= x) & (points[:, 0] < x + block_size))
        if len(in_column) <= min_points:
            continue  # no block in this column can exceed the threshold
        column_y = points[in_column, 1]
        for y in y_starts:
            in_block = in_column[(column_y >= y) & (column_y < y + block_size)]
            if len(in_block) > min_points:  # Ignore sparse blocks
                blocks.append((points[in_block], colors[in_block], labels[in_block]))
    return blocks

**Normalize Coordinates**

In [6]:
def normalize_block(block_points):
    """
    Centre a block on its centroid and scale it to fit the unit sphere.

    Only the first three columns (XYZ) are normalised; any further columns are
    returned unchanged. The input array is not modified.

    Args:
        block_points: [N, >=3] array-like with N >= 1.

    Returns:
        (normalized, centroid, max_dist) where `normalized` is a new floating
        point array, `centroid` is the XYZ mean that was subtracted and
        `max_dist` is the largest distance from the centroid. When every point
        coincides (`max_dist == 0`) the scaling step is skipped, so the
        normalised XYZ is all zeros rather than NaN.

    Raises:
        ValueError: if `block_points` contains no points.
    """
    normalized = np.array(block_points)  # np.array copies, so the caller's data is left alone
    if len(normalized) == 0:
        raise ValueError("normalize_block requires at least one point")
    if not np.issubdtype(normalized.dtype, np.floating):
        # In-place subtraction/division below cannot be stored in an integer array.
        normalized = normalized.astype(np.float64)

    xyz = normalized[:, :3]  # view: edits below write through to `normalized`
    centroid = xyz.mean(axis=0)
    xyz -= centroid
    max_dist = np.linalg.norm(xyz, axis=1).max()
    if max_dist > 0:
        xyz /= max_dist
    return normalized, centroid, max_dist

**Create HDF5 File**

In [7]:


def save_to_h5(blocks, filename):
    """
    Write blocks to an HDF5 file, one group per block.

    Block `i` is stored under group `room_{i}` with two datasets:
      * `data`  - float32, the block's XYZ, its colours and its normalised XYZ
                  stacked column-wise;
      * `label` - int32, the per-point labels.

    Args:
        blocks: iterable of (points, colors, labels) tuples.
        filename: destination path; opened in 'w' mode, so an existing file is replaced.
    """
    with h5py.File(filename, 'w') as h5_out:
        for block_idx, block in enumerate(blocks):
            xyz, rgb, block_labels = block

            # Normalise a copy so the raw coordinates are still available below.
            xyz_normalized = normalize_block(xyz.copy())[0]

            # Column layout: raw XYZ | RGB | normalised XYZ
            stacked = np.hstack([xyz, rgb, xyz_normalized])

            room = h5_out.create_group(f'room_{block_idx}')
            room.create_dataset('data', data=stacked.astype(np.float32))
            room.create_dataset('label', data=block_labels.astype(np.int32))

**Split into Train/Val**

In [8]:
import random

# Fixed seed so the train/val partition is identical after a kernel restart.
SPLIT_SEED = 42
# Fraction of blocks that go to the training split; the rest is validation.
TRAIN_FRACTION = 0.8

blocks = split_into_blocks(points, colors, labels)
# A dedicated Random instance keeps the shuffle reproducible without touching
# the global `random` state.
random.Random(SPLIT_SEED).shuffle(blocks)
# NOTE(review): with the default stride smaller than the block size, neighbouring
# blocks share points, so a random block-level split lets validation points also
# appear in training blocks. Confirm this is acceptable for the reported metrics.
split_idx = int(TRAIN_FRACTION * len(blocks))
train_blocks = blocks[:split_idx]
val_blocks = blocks[split_idx:]

In [8]:
# Destination files for the two splits, both inside the `docs` folder under script_dir.
docs_dir = os.path.join(script_dir, '.', 'docs')
train_pc_path = os.path.join(docs_dir, 'train.h5')
val_pc_path = os.path.join(docs_dir, 'val.h5')

In [None]:
# Write each split to its own HDF5 file (training first, then validation).
for split_blocks, split_path in ((train_blocks, train_pc_path), (val_blocks, val_pc_path)):
    save_to_h5(split_blocks, split_path)

**Dataset Class for Training**

In [9]:
import torch
from torch.utils.data import Dataset, DataLoader

class S3DISDataset(Dataset):
    """
    Map-style dataset over an HDF5 file that holds one group per block.

    Each group is expected to contain a `data` dataset (per-point features) and
    a `label` dataset (per-point class ids); items are returned as
    `(data, label)` tensor pairs.

    Args:
        h5_path: path of the HDF5 file to read.
        num_classes: number of label classes, exposed as `self.num_classes` for
            evaluation code. The default of 13 matches the 13-entry colour
            palette this notebook uses to derive labels.

    The file is opened lazily on first item access rather than in `__init__`,
    and the handle is dropped when the dataset is pickled, so that each
    DataLoader worker process opens its own handle. Call `close()` to release
    the file explicitly.
    """

    def __init__(self, h5_path, num_classes=13):
        self.h5_path = h5_path
        self.num_classes = num_classes
        self._h5_file = None
        # Read the group names with a short-lived handle that is closed right away.
        with h5py.File(h5_path, 'r') as f:
            self.keys = list(f.keys())

    @property
    def h5_file(self):
        """Open read-only file handle, created on first access."""
        if self._h5_file is None:
            self._h5_file = h5py.File(self.h5_path, 'r')
        return self._h5_file

    @h5_file.setter
    def h5_file(self, handle):
        self._h5_file = handle

    def close(self):
        """Close the underlying file if it is open; safe to call repeatedly."""
        handle = getattr(self, '_h5_file', None)
        if handle is not None:
            handle.close()
            self._h5_file = None

    def __del__(self):
        # Best-effort cleanup: errors during garbage collection or interpreter
        # shutdown cannot be handled meaningfully, so they are ignored here.
        try:
            self.close()
        except Exception:
            pass

    def __getstate__(self):
        # Open HDF5 handles cannot be pickled; the copy reopens the file lazily.
        state = self.__dict__.copy()
        state['_h5_file'] = None
        return state

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, idx):
        grp = self.h5_file[self.keys[idx]]
        data = torch.from_numpy(grp['data'][:])  # [N, 9]
        # Labels are stored as int32; class-index losses such as F.nll_loss
        # require int64 targets.
        label = torch.from_numpy(grp['label'][:]).long()  # [N]
        return data, label

# Training split, read back from the HDF5 file written above, in batches of 8.
# NOTE(review): blocks can hold different numbers of points, and the dataset
# returns plain (data, label) tensors while train() reads data.x / data.pos /
# data.batch / data.y. Confirm how batches are meant to be collated.
train_dataset = S3DISDataset(h5_path=train_pc_path)
train_loader = DataLoader(dataset=train_dataset, batch_size=8, shuffle=False)

In [11]:
# Held-out split (the `val.h5` file), iterated in fixed order in batches of 8.
test_dataset = S3DISDataset(h5_path=val_pc_path)
test_loader = DataLoader(dataset=test_dataset, batch_size=8, shuffle=False)

**Fine-Tuning with LoRA**

In [12]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import MLP, PointNetConv, fps, global_max_pool, radius
import torch_geometric.transforms as T
from torch_geometric.loader import DataLoader
from torch_geometric.nn import MLP, knn_interpolate
from torch_geometric.typing import WITH_TORCH_CLUSTER
from torch_geometric.data import Data

from sklearn.linear_model import RANSACRegressor
from scipy.spatial import ConvexHull

if not WITH_TORCH_CLUSTER:
    # Raise instead of calling quit(): inside a Jupyter kernel `quit` is
    # IPython's exit helper, which shuts the kernel down rather than showing
    # this message. An exception stops the run and reports the cause in the cell.
    raise ImportError("This example requires 'torch-cluster'")

In [13]:
class SAModule(torch.nn.Module):
    """
    Set-abstraction layer: sample a subset of points, gather each sample's
    neighbours within a radius, and aggregate them with a PointNetConv.

    Args:
        ratio: sampling ratio passed to `fps`.
        r: neighbourhood radius passed to `radius`.
        nn: network handed to `PointNetConv` as its local network.
    """

    def __init__(self, ratio, r, nn):
        super().__init__()
        self.ratio, self.r = ratio, r
        self.conv = PointNetConv(nn, add_self_loops=False)

    def forward(self, x, pos, batch):
        """Return (features, positions, batch vector) for the sampled points."""
        sampled = fps(pos, batch, ratio=self.ratio)
        sampled_pos = pos[sampled]
        sampled_batch = batch[sampled]

        # `radius` pairs each sampled point (row) with source points (col) that
        # lie within self.r, at most 64 per sampled point.
        row, col = radius(pos, sampled_pos, self.r, batch, sampled_batch,
                          max_num_neighbors=64)
        # Edges run source -> sampled point.
        edge_index = torch.stack([col, row], dim=0)

        x_dst = x[sampled] if x is not None else None
        out = self.conv((x, x_dst), (pos, sampled_pos), edge_index)
        return out, sampled_pos, sampled_batch


class GlobalSAModule(torch.nn.Module):
    """
    Global set-abstraction layer: run `nn` on [features, positions] and
    max-pool the result over each example in the batch.

    Args:
        nn: network applied to the concatenated features and positions.
    """

    def __init__(self, nn):
        super().__init__()
        self.nn = nn

    def forward(self, x, pos, batch):
        """Return one pooled feature row, a zero position and an index per example."""
        point_features = self.nn(torch.cat([x, pos], dim=1))
        pooled = global_max_pool(point_features, batch)
        num_examples = pooled.size(0)
        # One placeholder position (the origin) and one batch id per pooled row.
        pooled_pos = pos.new_zeros((num_examples, 3))
        pooled_batch = torch.arange(num_examples, device=batch.device)
        return pooled, pooled_pos, pooled_batch

In [14]:
class FPModule(torch.nn.Module):
    """
    Feature-propagation layer: interpolate coarse features onto a denser set of
    points, append that set's own features (if any) and apply `nn`.

    Args:
        k: number of neighbours used by `knn_interpolate`.
        nn: network applied to the (concatenated) features.
    """

    def __init__(self, k, nn):
        super().__init__()
        self.k = k
        self.nn = nn

    def forward(self, x, pos, batch, x_skip, pos_skip, batch_skip):
        """Return (features, pos_skip, batch_skip) at the skip-connection points."""
        upsampled = knn_interpolate(x, pos, pos_skip, batch, batch_skip, k=self.k)
        if x_skip is None:
            merged = upsampled
        else:
            merged = torch.cat([upsampled, x_skip], dim=1)
        return self.nn(merged), pos_skip, batch_skip

In [15]:
class Net(torch.nn.Module):
    """
    Point-cloud segmentation network built from three set-abstraction stages
    (SAModule x2, GlobalSAModule) followed by three feature-propagation stages
    (FPModule x3) and a per-point classification MLP.

    Args:
        num_classes: size of the per-point output vector.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Input channels account for both `pos` and node features.
        self.sa1_module = SAModule(0.2, 0.2, MLP([3 + 6, 64, 64, 128])) # 3 (pos) + 6 (x)
        self.sa2_module = SAModule(0.25, 0.4, MLP([128 + 3, 128, 128, 256]))
        self.sa3_module = GlobalSAModule(MLP([256 + 3, 256, 512, 1024]))

        # Each FP stage's input width is the interpolated feature width plus the
        # width of the skip features it is concatenated with (6 = raw data.x).
        self.fp3_module = FPModule(1, MLP([1024 + 256, 256, 256]))
        self.fp2_module = FPModule(3, MLP([256 + 128, 256, 128]))
        self.fp1_module = FPModule(3, MLP([128 + 6, 128, 128, 128]))

        # Classification head used by forward().
        self.mlp = MLP([128, 128, 128, num_classes], dropout=0.5, norm=None)

        # NOTE(review): lin1/lin2/lin3 are never called in forward(). Presumably
        # they are kept so that state_dict keys match existing checkpoints;
        # confirm before removing them.
        self.lin1 = torch.nn.Linear(128, 128)
        self.lin2 = torch.nn.Linear(128, 128)
        self.lin3 = torch.nn.Linear(128, num_classes)

    def forward(self, data):
        """
        Compute per-point log-probabilities.

        Args:
            data: object providing `x`, `pos` and `batch` attributes.

        Returns:
            Log-softmax over the last dimension of the head's output, one row
            per input point.
        """
        # Encoder: each stage consumes the previous stage's (x, pos, batch).
        sa0_out = (data.x, data.pos, data.batch)
        sa1_out = self.sa1_module(*sa0_out)
        sa2_out = self.sa2_module(*sa1_out)
        sa3_out = self.sa3_module(*sa2_out)

        # Decoder: propagate features back, using the matching encoder output
        # as the skip connection at every level.
        fp3_out = self.fp3_module(*sa3_out, *sa2_out)
        fp2_out = self.fp2_module(*fp3_out, *sa1_out)
        x, _, _ = self.fp1_module(*fp2_out, *sa0_out)

        return self.mlp(x).log_softmax(dim=-1)

In [None]:



# Load pre-trained weights
# `device` is needed as map_location below, but the notebook only defines it in
# a later cell; define it here so this cell also runs on a fresh kernel.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Built with os.path.join so the relative path also resolves on non-Windows systems.
pretrained_checkpoint_path = os.path.join(
    '.', 'checkpoints', 'pointnet2_s3dis_seg_x6_30_checkpoint.pth')

# Load the checkpoint dictionary
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
checkpoint = torch.load(pretrained_checkpoint_path, map_location=device)
# Extract the model state dictionary
# NOTE(review): nothing in the visible notebook passes this to
# model.load_state_dict(...); confirm whether the model is meant to start from
# these weights.
model_state_dict = checkpoint['model_state_dict']



In [17]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)

# 13 output classes.
model = Net(num_classes=13)
model = model.to(device)

optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

def train():
    """
    Run one pass over `train_loader`, updating `model` with `optimizer`.

    Uses the module-level `model`, `optimizer`, `train_loader` and `device`.
    Every 10 batches it prints the mean loss and the point-level accuracy
    accumulated since the previous report, then resets the counters.
    """
    model.train()

    running_loss = 0
    running_correct = 0
    running_nodes = 0
    for step, batch in enumerate(train_loader, start=1):
        batch = batch.to(device)

        optimizer.zero_grad()
        log_probs = model(batch)
        loss = F.nll_loss(log_probs, batch.y)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        predictions = log_probs.argmax(dim=1)
        running_correct += (predictions == batch.y).sum().item()
        running_nodes += batch.num_nodes

        if step % 10 != 0:
            continue

        print(f'[{step}/{len(train_loader)}] Loss: {running_loss / 10:.4f} '
              f'Train Acc: {running_correct / running_nodes:.4f}')
        running_loss = running_correct = running_nodes = 0

In [19]:
from torchmetrics.functional import jaccard_index
@torch.no_grad()
def test(loader):
    """
    Evaluate the module-level `model` on `loader` and return the mean IoU.

    One multiclass Jaccard index is computed per example in each batch (the
    batch is split using `data.ptr`), and the values are averaged.

    Args:
        loader: iterable of batches exposing `to(device)`, `ptr` and `y`, with
            a `dataset` attribute. If the dataset has a `num_classes` attribute
            it is used; otherwise the width of the model output is used.

    Returns:
        Mean IoU as a Python float, or NaN when the loader yields no examples.
    """
    model.eval()
    ious = []

    for data in loader:
        data = data.to(device)
        outs = model(data)

        # Not every dataset defines `num_classes`; the model emits one column
        # per class, so its output width is a safe fallback.
        num_classes = getattr(loader.dataset, 'num_classes', outs.size(-1))

        # Number of points belonging to each example in this batch.
        sizes = (data.ptr[1:] - data.ptr[:-1]).tolist()
        for out, y in zip(outs.split(sizes), data.y.split(sizes)):
            iou = jaccard_index(out.argmax(dim=-1), y,
                                num_classes=num_classes,
                                task="multiclass")
            ious.append(iou)

    if not ious:
        return float('nan')

    # The per-example values are already tensors; stack them directly.
    return float(torch.stack(ious).mean())


In [None]:
# Train for a fixed number of epochs, evaluating on the held-out loader after each.
NUM_EPOCHS = 20
for epoch in range(1, NUM_EPOCHS + 1):
    train()
    iou = test(test_loader)
    print(f'Epoch: {epoch:02d}, Test IoU: {iou:.4f}')

In [None]:
# Path to save the checkpoint. Built with os.path.join because the previous
# literal relied on backslashes in a non-raw string (invalid escape sequences)
# and only resolved to the intended location on Windows.
checkpoint_path = os.path.join(
    '.', 'checkpoints', 'pointnet2_s3dis_transform_seg_x6_27_checkpoint.pth')

# torch.save does not create missing directories.
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

# Save model, optimizer state, and any other info needed
# NOTE(review): the stored epoch (27) is hard-coded and matches the file name,
# but the training loop in this notebook runs 20 epochs. Confirm which value is intended.
torch.save({
    'epoch': 27,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    #'loss': loss,
    #'test_accuracy': test_acc
}, checkpoint_path)

print("Checkpoint saved successfully!")

In [None]:
# NOTE(review): this cell is an outline, not runnable code. `lora.Linear(...)`
# passes a literal Ellipsis, `epochs` is not defined anywhere in the visible
# notebook, `lora` is only imported in a later cell, and the loop body is a stub.

# Apply LoRA to segmentation head
model.seg_head = lora.Linear(...)  # See earlier LoRA code

# Train
for epoch in range(epochs):
    for data, labels in train_loader:
        ...

In [None]:
import torch.nn as nn
import loralib as lora

# Load the original model
# NOTE(review): this unpickles a whole model object; only load files from a trusted source.
model = torch.load("original_model.pth")

# Freeze the entire base model
for param in model.parameters():
    param.requires_grad = False

# Replace specific layers with LoRA-enabled layers
# Example: Replace a linear layer in the segmentation head
pretrained_layer = model.seg_layer[0]
lora_layer = lora.Linear(
    in_features=pretrained_layer.in_features,    # Original input dimension
    out_features=pretrained_layer.out_features,  # Original output dimension
    r=8,                   # Rank of the low-rank matrices (hyperparameter)
    lora_alpha=16,         # Scaling factor for LoRA weights
    merge_weights=False,   # Keep LoRA separate during training
    bias=pretrained_layer.bias is not None,
)

# A new lora.Linear starts with freshly initialised base weights. Copy the
# pretrained ones across, otherwise the adapter would be trained on top of
# random weights instead of the layer it replaces.
with torch.no_grad():
    lora_layer.weight.copy_(pretrained_layer.weight)
    if pretrained_layer.bias is not None:
        lora_layer.bias.copy_(pretrained_layer.bias)

model.seg_layer[0] = lora_layer

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define LoRA-compatible optimizer (only train LoRA params).
# loralib names its adapter tensors `lora_A` / `lora_B`, so they are selected by
# name here; the previous `lora.lora_parameters(model)` call is not part of
# loralib's public API as far as I know, so it would fail with AttributeError.
lora_params = [
    param for name, param in model.named_parameters() if 'lora_' in name
]
optimizer = torch.optim.Adam(
    lora_params,                  # Only LoRA parameters
    lr=1e-3,                      # Higher LR for adapter training
    weight_decay=0.01
)

# Training loop (similar to standard fine-tuning)
# NOTE(review): `num_epochs` and `criterion` are not defined anywhere in the
# visible notebook; define them before running this cell.
for epoch in range(num_epochs):
    model.train()
    for points, labels in train_loader:
        points, labels = points.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(points)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

In [None]:
# Save the state dict that loralib extracts from the model (the LoRA entries),
# not the full model.
lora_only_state = lora.lora_state_dict(model)
torch.save(lora_only_state, "pointcloud_lora_weights.pth")

In [None]:
# Load the original model and the saved LoRA weights
model = torch.load("original_model.pth")
lora_weights = torch.load("pointcloud_lora_weights.pth")

# Copy the saved LoRA tensors into the model. strict=False tolerates keys that
# are present on only one side, so this does not fail when names do not match.
# NOTE(review): this loads tensors; it does not merge them into the base weights.
# If "original_model.pth" was saved before its layers were replaced with
# lora.Linear, the LoRA keys have nowhere to go and are presumably skipped
# silently. Confirm the model is rebuilt with LoRA layers before this call.
model.load_state_dict(lora_weights, strict=False)
lora.mark_only_lora_as_trainable(model)  # Only relevant if training continues; not needed for inference

# `model` can now be used for inference or further adapter training

# Now use `model` as a standard fine-tuned model

In [None]:
import torch
from pointnet2 import PointNet2Seg
import loralib as lora

class PointNet2LoRA(PointNet2Seg):
    """
    PointNet2Seg variant whose inherited parameters are frozen and whose
    `seg_layers` attribute is a two-layer LoRA-enabled head.

    Args:
        num_classes: forwarded to PointNet2Seg and used as the head's output width.
        r: LoRA rank for both linear layers.
        alpha: value passed as `lora_alpha` to both linear layers.
    """

    def __init__(self, num_classes, r=8, alpha=16):
        super().__init__(num_classes)

        # Freeze every parameter created by the base class constructor.
        self.requires_grad_(False)

        # Build the replacement head: 256 -> 128 -> num_classes with a ReLU between.
        hidden = lora.Linear(256, 128, r=r, lora_alpha=alpha)
        classifier = lora.Linear(128, num_classes, r=r, lora_alpha=alpha)
        self.seg_layers = nn.Sequential(hidden, nn.ReLU(), classifier)

# Usage
# NOTE(review): this example uses 10 classes, while the rest of the notebook
# works with 13 labels (the colour palette and Net(num_classes=13)). Confirm
# the intended class count.
model = PointNet2LoRA(num_classes=10, r=8)