In [8]:
from data_pipeline.environments.cubby_environment import CubbyEnvironment 
from robofin.collision import FrankaSelfCollisionChecker

env = CubbyEnvironment()
selfcc = FrankaSelfCollisionChecker()

In [9]:
obstacles, qs_free, poses_free, qs_collision, poses_collision = env.sample_q_pose(selfcc=selfcc, how_many=50, margin=0)
print("len(qs_free):", len(qs_free))
print("len(qs_collision):", len(qs_collision))

len(qs_free): 50
len(qs_collision): 429


In [10]:
import torch
from robofin.pointcloud.torch import FrankaSampler
fk_sampler = FrankaSampler("cpu", use_cache=True)

qs = qs_collision
q = torch.tensor(qs[5], dtype=torch.float32, requires_grad=False)
robot_points = fk_sampler.sample(q, 2048)
print("robot_points:", robot_points.shape)

robot_points: torch.Size([1, 2048, 3])


In [11]:
from data_pipeline.geometry import construct_mixed_point_cloud
obstacle_points = construct_mixed_point_cloud(obstacles, num_points=4096)

In [12]:
import open3d as o3d

# Create an Open3D PointCloud object for the robot points
robot_pcd = o3d.geometry.PointCloud()
robot_points = robot_points.squeeze().cpu().numpy()  # Convert to numpy array
robot_points = robot_points.reshape(-1, 3)  # Ensure correct shape
robot_pcd.points = o3d.utility.Vector3dVector(robot_points)  # Use precomputed robot_points
robot_pcd.paint_uniform_color([1, 0, 0])  # Red for robot points

# Create an Open3D PointCloud object for the obstacle points
obstacle_pcd = o3d.geometry.PointCloud()
obstacle_pcd.points = o3d.utility.Vector3dVector(obstacle_points[:, :3])  # Use precomputed obstacle_points
obstacle_pcd.paint_uniform_color([0, 1, 0])  # Green for obstacle points

# Combine the two point clouds into a single list
pcd = [robot_pcd, obstacle_pcd]

# Visualize both point clouds together
o3d.visualization.draw_geometries(pcd)


In [13]:
# Check the boundaries of the point clouds
robot_points = robot_points.reshape(-1, 3)  # Ensure correct shape
print("robot_points min:", robot_points.min(axis=0))
print("robot_points max:", robot_points.max(axis=0))
obstacle_points = obstacle_points[:, :3]
print("obstacle_points min:", obstacle_points.min(axis=0))
print("obstacle_points max:", obstacle_points.max(axis=0))

robot_points min: [-0.15131235 -0.10666256  0.        ]
robot_points max: [0.79742277 0.44996527 0.5832137 ]
obstacle_points min: [ 5.22638759e-01 -6.67001633e-01  5.55111512e-17]
obstacle_points max: [1.22289878 0.82198416 0.73705289]


In [7]:
# Import necessary libraries
import torch  # For loading and processing tensor data
import open3d as o3d  # For 3D visualization
import matplotlib.pyplot as plt  # For plotting and visualization in 2D

# Ensure matplotlib inline mode for Jupyter Notebook
%matplotlib inline

In [14]:
# Load Dataset

# Define the path to the tensor file
tensor_file = "./collision_bool/processed.pt"

# Load the dataset using the provided function
def load_full_data_tensor(tensor_file):
    """
    Load the full dataset from a tensor file.
    
    Args:
        tensor_file (str): Path to the tensor file
        
    Returns:
        List of samples
    """
    samples = torch.load(tensor_file)
    print(f"Full dataset loaded from {tensor_file}")
    return samples

# Load the dataset
dataset_samples = load_full_data_tensor(tensor_file)

# Display the number of samples loaded
print(f"Number of samples loaded: {len(dataset_samples)}")

  samples = torch.load(tensor_file)


Full dataset loaded from ./collision_bool/processed.pt
Number of samples loaded: 50000


In [16]:
import numpy as np
# Iterate through the dataset samples to check the boundaries of point clouds
robot_min, robot_max = [], []
obstacle_min, obstacle_max = [], []

for sample in dataset_samples:
    pointcloud_data = sample['pointcloud'].numpy()
    robot_points = pointcloud_data[pointcloud_data[:, 3] == 0][:, :3]
    obstacle_points = pointcloud_data[pointcloud_data[:, 3] == 1][:, :3]
    
    robot_min.append(robot_points.min(axis=0))
    robot_max.append(robot_points.max(axis=0))
    obstacle_min.append(obstacle_points.min(axis=0))
    obstacle_max.append(obstacle_points.max(axis=0))

# Compute overall min and max for robot and obstacle points
robot_min = np.min(robot_min, axis=0)
robot_max = np.max(robot_max, axis=0)
obstacle_min = np.min(obstacle_min, axis=0)
obstacle_max = np.max(obstacle_max, axis=0)

print("Robot Points Min:", robot_min)
print("Robot Points Max:", robot_max)
print("Obstacle Points Min:", obstacle_min)
print("Obstacle Points Max:", obstacle_max)

Robot Points Min: [-0.24984953 -0.834266    0.        ]
Robot Points Max: [0.9602809 0.8245288 0.8998966]
Obstacle Points Min: [ 3.2435596e-01 -8.4170550e-01 -1.6653345e-16]
Obstacle Points Max: [1.3019081  0.84264076 0.81443304]


In [None]:
# Visualize a Sample Point Cloud

# Select a sample from the dataset
sample_index = 100  # Change this index to visualize a different sample
sample = dataset_samples[sample_index]

# Extract the point cloud data
pointcloud_data = sample['pointcloud'].numpy()

print("Collison:", sample['collision_flag'])

# The first dim is the feature dimension, the rest are the point coordinates
# 0 for robot points, 1 for obstacle points

# Separate robot and obstacle points based on the feature dimension
robot_points = pointcloud_data[pointcloud_data[:, 3] == 0][:, :3]
obstacle_points = pointcloud_data[pointcloud_data[:, 3] == 1][:, :3]

# Create Open3D point clouds
robot_pcd = o3d.geometry.PointCloud()
robot_pcd.points = o3d.utility.Vector3dVector(robot_points)
robot_pcd.paint_uniform_color([1, 0, 0])  # Red for robot points

obstacle_pcd = o3d.geometry.PointCloud()
obstacle_pcd.points = o3d.utility.Vector3dVector(obstacle_points)
obstacle_pcd.paint_uniform_color([0, 1, 0])  # Green for obstacle points

# Visualize both point clouds
o3d.visualization.draw_geometries([robot_pcd, obstacle_pcd])





NameError: name 'np' is not defined

In [18]:
import torch
from models.ptv3 import PointTransformerV3

# Load the processed dataset from the tensor file
tensor_file = "./collision_bool/processed.pt"  # Path to your processed tensor file
samples = torch.load(tensor_file)

# Initialize the PointTransformerV3 model
model = PointTransformerV3(
    in_channels=4,  # Number of input features per point (e.g., x, y, z, feature flag)
    enc_depths=(2, 2, 2, 6, 2),
    enc_channels=(32, 64, 128, 256, 512),
    enc_num_head=(2, 4, 8, 16, 32),
    enc_patch_size=(1024, 1024, 1024, 1024, 1024),
    cls_mode=True  # Set to True if you only want the encoded features
)

# Move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Prepare a batch of samples from the loaded dataset
batch_size = 1  # Number of samples in the batch
batch_samples = samples[:batch_size]

# Prepare the input dictionary
pointclouds = torch.cat([sample["pointcloud"] for sample in batch_samples], dim=0).to(device)  # Concatenate point clouds
batch_indices = torch.cat([
    torch.zeros(len(sample["pointcloud"][:2048]), dtype=torch.long) for sample in batch_samples  # Robot points
] + [
    torch.ones(len(sample["pointcloud"][2048:]), dtype=torch.long) for sample in batch_samples  # Obstacle points
]).to(device)

input_data = {
    "coord": pointclouds[:, :3],  # Extract x, y, z coordinates
    "feat": pointclouds,         # Use the full point cloud as features (x, y, z, feature flag)
    "batch": batch_indices,      # Batch indices (0 for robot points, 1 for obstacle points)
    "grid_size": torch.tensor(0.02).to(device)  # Grid size for voxelization
}

# Forward pass through the model
model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    encoded_features = model(input_data)
    # Perform global max pooling to aggregate features into a single vector
    global_feature_vector = torch.max(encoded_features["feat"], dim=0).values
    



  samples = torch.load(tensor_file)


In [19]:
# Output the encoded features
print("Encoded Features keys:", encoded_features.keys())
print("Encoded Features Shape:", encoded_features["feat"].shape)
print(global_feature_vector.shape)

Encoded Features keys: dict_keys(['feat', 'coord', 'grid_coord', 'serialized_code', 'serialized_order', 'serialized_inverse', 'serialized_depth', 'batch', 'pooling_inverse', 'pooling_parent', 'offset', 'sparse_shape', 'sparse_conv_feat', 'pad', 'unpad', 'cu_seqlens_key'])
Encoded Features Shape: torch.Size([41, 512])
torch.Size([512])


In [5]:
import torch
from models.ptv3 import PointTransformerNet
# Load the processed dataset from the tensor file
tensor_file = "./collision_bool/processed.pt"  # Path to your processed tensor file
samples = torch.load(tensor_file)
# Prepare a batch of samples from the loaded dataset
batch_size = 8  # Define the batch size
batch_samples = samples[:batch_size]  # Select the first 'batch_size' samples

# Concatenate point clouds from all samples in the batch
pointcloud = torch.cat([sample["pointcloud"] for sample in batch_samples], dim=0)

# Initialize the PointTransformerV3 model
model = PointTransformerNet()

# Move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
pointcloud = pointcloud.to(device)  # Move the point cloud to the same device as the model



  samples = torch.load(tensor_file)


In [6]:
# Forward pass through the model
with torch.no_grad():
    feature = model(pointcloud)  # Pass the point cloud through the model
    
print("Feature shape:", feature.shape)  # Print the shape of the output feature

RuntimeError: The size of tensor a (6144) must match the size of tensor b (49152) at non-singleton dimension 0

In [None]:
import torch
import pytorch_lightning as pl
from train import CollisionNetPL
from data_loader import get_data_loaders_from_tensor

# Path to the trained model checkpoint
checkpoint_path = 'checkpoints_pointnet/collisionnet-epoch=58-val_loss=0.14.ckpt' 
# checkpoint_path = 'checkpoints_pointnet2/collisionnet-epoch=58-val_loss=0.28.ckpt'

# Load the trained model
model = CollisionNetPL.load_from_checkpoint(checkpoint_path=checkpoint_path)

# Load the test dataset
_, test_loader = get_data_loaders_from_tensor(
    "./collision_bool/processed_test.pt",  # Update with the actual test dataset path
    batch_size=64,
    train_ratio=0.01  # No training split, purely test data
)

# Initialize the PyTorch Lightning Trainer
trainer = pl.Trainer(
    accelerator='gpu' if torch.cuda.is_available() else 'cpu',
    devices=1
)

# Run the test
results = trainer.test(model, dataloaders=test_loader)

# Print the test results
print(f"Test results: {results}")

  _TORCH_CUSTOM_FWD = amp.custom_fwd(cast_inputs=torch.float16)
  def backward(ctx, grad_output):
  def backward(ctx, grad_output):
  def backward(ctx, grad_output):
  def backward(ctx, grad_output):
  def backward(ctx, grad_output):
  def backward(ctx, grad_output):
  def backward(ctx, grad_output):
  samples = torch.load(tensor_file)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 3080 Laptop GPU') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Full dataset loaded from ./collision_bool/processed_test.pt


Testing: |          | 0/? [00:00<?, ?it/s]

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
        test_acc            0.9404040575027466
        test_loss           0.1574736088514328
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
Test results: [{'test_loss': 0.1574736088514328, 'test_acc': 0.9404040575027466}]


In [2]:
from models.collisionnet import CollisionNet
# Initialize the CollisionNet model
collision_net = CollisionNet()

# Print the structure of the CollisionNet model
print(collision_net)

CollisionNet(
  (point_cloud_encoder): PointNet2(
    (SA_modules): ModuleList(
      (0): PointnetSAModule(
        (groupers): ModuleList(
          (0): QueryAndGroup()
        )
        (mlps): ModuleList(
          (0): Sequential(
            (0): Conv2d(4, 64, kernel_size=(1, 1), stride=(1, 1))
            (1): LeakyReLU(negative_slope=0.01, inplace=True)
            (2): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1))
            (3): LeakyReLU(negative_slope=0.01, inplace=True)
            (4): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1))
            (5): LeakyReLU(negative_slope=0.01, inplace=True)
          )
        )
      )
      (1): PointnetSAModule(
        (groupers): ModuleList(
          (0): QueryAndGroup()
        )
        (mlps): ModuleList(
          (0): Sequential(
            (0): Conv2d(67, 128, kernel_size=(1, 1), stride=(1, 1))
            (1): LeakyReLU(negative_slope=0.01, inplace=True)
            (2): Conv2d(128, 128, kernel_size=(1, 1), stride