In [1]:
#--Imports and Configuration--#
import os#--import os module
import numpy as np#--import numpy for numerical operations
import torch#--import torch
import torch.optim as optim#--import optim from torch
import torch.nn.functional as F#--import functional from torch.nn
from pytorch3d.loss import chamfer_distance#--import chamfer_distance loss function
from RGB_model import PointCloudAE#--import PointCloudAE model from RGB_model
from Dataloadersv2 import GetDataLoaders#--import GetDataLoaders function from Dataloadersv2
import RGB_utils as utils#--import RGB_utils as utils
import matplotlib.pyplot as plt#--import matplotlib for plotting
import random#--import random module
import math#--import math module
import time#--import time module

#--Parameters--#
batch_size = 32#--set batch size
output_dir = os.path.expanduser("~/meoutputsRGBpc/")#--set output directory
os.makedirs(output_dir, exist_ok=True)#--create output directory if it doesn't exist
save_results = True#--flag to save results
use_GPU = True#--flag to use GPU
latent_size = 128#--set latent size for the autoencoder
fixed_size = 1028#--set fixed point cloud size
train_path = "data/ModelNet10/alltrain.npy"#--path to training data
test_path = "data/ModelNet10/alltest.npy"#--path to test data
train_processed = "data/alltrain_adjusted.npy"#--path to processed training data
test_processed = "data/alltest_adjusted.npy"#--path to processed test data
model_weights_path = 'General_rgb_1028pc_Encoder.pth'#--path to save/load model weights
num_epochs = 500#--number of training epochs
save_every = 20#--save model every 20 epochs
plot_every = 50#--plot outputs every 50 epochs


In [2]:
#--Utility Functions--#
def generate_random_function():
    functions = [
        lambda x, y, z: (x + y + z) % 1.0,#--sum modulo 1.0
        lambda x, y, z: (math.sin(x) + math.cos(y) + z) % 1.0,#--sine and cosine modulation
        lambda x, y, z: (x * y * z) % 1.0,#--product modulo 1.0
        lambda x, y, z: ((x**2 + y**2 + z**2)**0.5) % 1.0,#--Euclidean norm modulo 1.0
    ]
    return random.choice(functions)#--select a random function

def normalize_point_cloud(pc_array):
    normalized_pc_array = []#--initialize list for normalized point clouds
    for pc in pc_array:
        spatial_coords = pc[:, :3]#--extract spatial coordinates
        centroid = np.mean(spatial_coords, axis=0)#--compute centroid
        spatial_coords -= centroid#--center the point cloud
        d_max = np.max(np.linalg.norm(spatial_coords, axis=1))#--compute maximum distance
        if d_max > 0:
            spatial_coords /= d_max#--normalize to unit scale
        pc[:, :3] = spatial_coords#--update spatial coordinates
        normalized_pc_array.append(pc)#--add to normalized list
    return np.array(normalized_pc_array)#--return as numpy array

def assign_color_to_point_clouds(pc_array):
    colored_pc_array = []#--initialize list for colored point clouds
    for pc in pc_array:
        r_func, g_func, b_func = generate_random_function(), generate_random_function(), generate_random_function()#--generate color functions
        rgb_values = np.array([
            [
                max(0, min(1, r_func(x, y, z))),#--compute red channel
                max(0, min(1, g_func(x, y, z))),#--compute green channel
                max(0, min(1, b_func(x, y, z)))#--compute blue channel
            ]
            for x, y, z in pc#--apply to each point
        ])#--create RGB array
        colored_pc = np.hstack((pc, rgb_values))#--concatenate RGB to point cloud
        colored_pc_array.append(colored_pc)#--add to colored list
    return np.array(colored_pc_array)#--return as numpy array

def adjust_point_clouds(pc_array, fixed_size=1028):
    adjusted_pc_array = []#--initialize list for adjusted point clouds
    for pc in pc_array:
        num_points = pc.shape[0]#--number of points in current point cloud
        if num_points < fixed_size:
            indices = np.random.choice(num_points, fixed_size - num_points, replace=True)#--upsample indices
            upsampled_points = pc[indices, :]#--upsampled points
            adjusted_pc = np.vstack((pc, upsampled_points))#--append upsampled points
        elif num_points > fixed_size:
            indices = np.random.choice(num_points, fixed_size, replace=False)#--downsample indices
            adjusted_pc = pc[indices, :]#--select subset of points
        else:
            adjusted_pc = pc#--no adjustment needed
        adjusted_pc_array.append(adjusted_pc)#--add to adjusted list
    return np.array(adjusted_pc_array)#--return as numpy array


In [3]:
#--Data Preprocessing--#
def preprocess_point_clouds(input_path, output_path, fixed_size=1028):
    if os.path.isfile(output_path):
        print(f"Loading preprocessed data from {output_path}")#--inform about loading preprocessed data
        return np.load(output_path, allow_pickle=True)#--load and return preprocessed data
    else:
        print(f"Preprocessing data from {input_path}")#--inform about preprocessing data
        pc_array = np.load(input_path, allow_pickle=True)#--load raw point cloud data
        colored_pc_array = assign_color_to_point_clouds(pc_array)#--assign colors to point clouds
        adjusted_pc_array = adjust_point_clouds(colored_pc_array, fixed_size=fixed_size)#--adjust point cloud sizes
        normalized_pc_array = normalize_point_cloud(adjusted_pc_array)#--normalize point clouds
        np.save(output_path, normalized_pc_array)#--save preprocessed data
        return normalized_pc_array#--return preprocessed data

#--Preprocess Train and Test Sets--#
adjusted_pc_train_array = preprocess_point_clouds(train_path, train_processed, fixed_size=fixed_size)#--preprocess training data
adjusted_pc_test_array = preprocess_point_clouds(test_path, test_processed, fixed_size=fixed_size)#--preprocess test data


Loading preprocessed data from data/alltrain_adjusted.npy
Loading preprocessed data from data/alltest_adjusted.npy


In [4]:
#--DataLoaders--#
train_loader, test_loader = GetDataLoaders(
    train_source=train_processed,#--source for training data
    test_source=test_processed,#--source for test data
    batch_size=batch_size,#--batch size
    shuffle=True,#--shuffle training data
    num_workers=8,#--number of worker threads
)#--initialize DataLoaders


In [5]:
#--Model Initialization--#
point_size = fixed_size#--set point size
net = PointCloudAE(point_size, latent_size)#--initialize PointCloudAE model

if os.path.isfile(model_weights_path):
    net.load_state_dict(torch.load(model_weights_path))#--load model weights if available
    print("Loaded saved model weights.")#--inform about loaded weights
else:
    print("No saved model weights found. Starting from scratch.")#--inform about starting fresh

device = torch.device("cuda:0" if use_GPU and torch.cuda.is_available() else "cpu")#--set device to GPU if available
if torch.cuda.device_count() > 1:
    net = torch.nn.DataParallel(net)#--use DataParallel if multiple GPUs
net = net.to(device)#--move model to device


Loaded saved model weights.


In [6]:
#--Training Setup--#
optimizer = optim.Adam(net.parameters(), lr=0.0006)#--initialize Adam optimizer with learning rate

def combined_loss(pred, target):
    pred_spatial, pred_color = pred[..., :3], pred[..., 3:]#--split predicted data into spatial and color
    target_spatial, target_color = target[..., :3], target[..., 3:]#--split target data into spatial and color
    spatial_loss, _ = chamfer_distance(pred_spatial, target_spatial)#--compute spatial chamfer loss
    color_loss, _ = chamfer_distance(pred_color, target_color)#--compute color chamfer loss
    return spatial_loss + color_loss#--return combined loss


In [7]:
#--Training and Testing Functions--#
def train_epoch():
    net.train()#--set model to training mode
    epoch_loss = 0#--initialize epoch loss
    for batch in train_loader:
        data = batch[0] if isinstance(batch, (tuple, list)) else batch#--extract data from batch
        data = data.to(device)#--move data to device
        optimizer.zero_grad()#--zero the optimizer gradients
        output = net(data.permute(0, 2, 1))#--forward pass (transpose for N, C, L format)
        loss = combined_loss(output, data)#--compute loss
        loss.backward()#--backpropagate
        optimizer.step()#--update weights
        epoch_loss += loss.item()#--accumulate loss
    return epoch_loss / len(train_loader)#--return average loss

def test_batch(data):
    with torch.no_grad():#--disable gradient computation
        data = data.to(device)#--move data to device
        output = net(data.permute(0, 2, 1))#--forward pass
        loss = combined_loss(output, data)#--compute loss
    return loss.item(), output.cpu()#--return loss and output

def test_epoch():
    net.eval()#--set model to evaluation mode
    epoch_loss = 0#--initialize epoch loss
    with torch.no_grad():#--disable gradient computation
        for data in test_loader:
            loss, _ = test_batch(data)#--compute loss for batch
            epoch_loss += loss#--accumulate loss
    return epoch_loss / len(test_loader)#--return average loss


In [8]:
#--Training Loop--#
train_loss_list = []#--initialize list to store training losses
test_loss_list = []#--initialize list to store test losses
counter = 0#--initialize counter

for epoch in range(1, num_epochs + 1):
    start_time = time.time()#--record start time
    
    train_loss = train_epoch()#--train for one epoch and get average loss
    train_loss_list.append(train_loss)#--append training loss
    
    test_loss = test_epoch()#--test on test set and get average loss
    test_loss_list.append(test_loss)#--append test loss
    
    epoch_time = time.time() - start_time#--compute epoch duration
    
    log_str = f"epoch {epoch} train loss:{train_loss} test loss:{test_loss} epoch time:{epoch_time}\n"#--create log string
    
    counter += 1#--increment counter
    
    if epoch % save_every == 0:
        print(f"Iteration {epoch}, Loss:{train_loss}")#--print loss every save_every epochs
        torch.save(net.state_dict(), model_weights_path)#--save model weights
    
    #--Plotting--#
    plt.figure()#--create new figure
    plt.plot(train_loss_list, label="Train")#--plot training loss
    plt.plot(test_loss_list, label="Test")#--plot test loss
    plt.legend()#--add legend
    
    if save_results:#--check if results should be saved
        with open(os.path.join(output_dir, "prints.txt"), "a") as file:
            file.write(log_str)#--append log to file
        plt.savefig(os.path.join(output_dir, "loss.png"))#--save loss plot
        plt.close()#--close plot
        
        if epoch % plot_every == 0:
            test_samples = next(iter(test_loader))#--get a batch of test samples
            loss, test_output = test_batch(test_samples)#--compute loss and output
            utils.plotPCbatch(test_samples, test_output, show=False, save=True, name=os.path.join(output_dir, f"epoch_{epoch}_test_set"))#--plot and save test samples
    else:
        test_samples = next(iter(test_loader))#--get a batch of test samples
        loss, test_output = test_batch(test_samples)#--compute loss and output
        utils.plotPCbatch(test_samples, test_output, show=False, save=True, name=os.path.join(output_dir, f"epoch_{epoch}_test_set"))#--plot and save test samples
        
        train_samples = next(iter(train_loader))#--get a batch of training samples
        loss, train_output = test_batch(train_samples)#--compute loss and output
        utils.plotPCbatch(train_samples, train_output, show=False, save=True, name=os.path.join(output_dir, f"epoch_{epoch}_train_set"))#--plot and save training samples
        
        print(log_str)#--print log string
        plt.show()#--display plot
    
    torch.cuda.empty_cache()#--clear GPU cache


Iteration 20, Loss:0.008933065481483937
Iteration 40, Loss:0.008868347983807326
Iteration 60, Loss:0.008871182285249234
Iteration 80, Loss:0.008771081674844026
Iteration 100, Loss:0.008819755848497153
Iteration 120, Loss:0.008662331372499466
Iteration 140, Loss:0.008604264426976442
Iteration 160, Loss:0.008567607823759317
Iteration 180, Loss:0.008598636712878943
Iteration 200, Loss:0.008546384241431951
Iteration 220, Loss:0.008451450351625681
Iteration 240, Loss:0.008394523493945599
Iteration 260, Loss:0.008425404407083989
Iteration 280, Loss:0.008444365706294775
Iteration 300, Loss:0.008317923326045275
Iteration 320, Loss:0.008314042780548335
Iteration 340, Loss:0.0082446807064116
Iteration 360, Loss:0.008253087144345046
Iteration 380, Loss:0.008232076652348042
Iteration 400, Loss:0.008220853757113218
Iteration 420, Loss:0.008151164323091507
Iteration 440, Loss:0.008174640260636806
Iteration 460, Loss:0.00815116873383522
Iteration 480, Loss:0.008150010999292136
Iteration 500, Loss:0.0

In [9]:
#--Evaluation and Visualization--#
def evaluate_model():
    net.eval()#--set model to evaluation mode
    total_loss = 0#--initialize total loss
    all_outputs = []#--initialize list to store all outputs
    all_targets = []#--initialize list to store all targets
    
    with torch.no_grad():#--disable gradient computation
        for data in test_loader:
            loss, output = test_batch(data)#--compute loss and output
            total_loss += loss#--accumulate loss
            all_outputs.append(output)#--store outputs
            target = data[0] if isinstance(data, (tuple, list)) else data#--extract target data
            all_targets.append(target.cpu())#--store targets
    
    avg_loss = total_loss / len(test_loader)#--compute average loss
    print(f"Average Test Loss:{avg_loss}")#--print average test loss
    
    #--Visualize Some Results--#
    num_visuals = 5#--number of samples to visualize
    for i in range(num_visuals):
        utils.plotPCbatch(all_targets[i], all_outputs[i], show=True, save=False, title=f"Sample {i+1}")#--plot each sample

#--Run Evaluation--#
evaluate_model()#--execute evaluation function


Average Test Loss:0.018942401680196154


TypeError: plotPCbatch() got an unexpected keyword argument 'title'