In [14]:
import argparse
import random
from typing import Any, Dict
import tqdm

import numpy as np
import torch
import torch.utils.data
from torch import nn
from torch.cuda import amp

import torchsparse
from torchsparse import SparseTensor
from torchsparse import nn as spnn
from torchsparse.utils.collate import sparse_collate_fn
from torchsparse.utils.quantize import sparse_quantize

In [17]:
# Build ONE synthetic point cloud (10000 random points, 4 values each) with a
# random class id in [0, 10) per point, then wrap it as sparse tensors.
inputs = np.random.uniform(-100, 100, size=(10000, 4))
labels = np.random.choice(10, size=10000)

# `coords` is a view of the first three columns of `inputs`, so the in-place
# shift below (making every coordinate non-negative) is also visible in `feats`.
coords, feats = inputs[:, :3], inputs
coords -= np.min(coords, axis=0, keepdims=True)
# Voxelise with a voxel size of 0.2; `indices` is used below to pick the
# feature / label rows that go with the returned coordinates.
coords, indices = sparse_quantize(coords,
                                  0.2,
                                  return_index=True)

# FIX: `batches` was used here without ever being defined in the notebook, so
# this cell raised NameError on a fresh kernel (Restart & Run All). There is a
# single sample, so every row gets batch index 0; prepending that one column
# reproduces the recorded (N, 4) coords shape.
# NOTE(review): all-zero is an assumption about the lost kernel state — confirm.
batches = np.zeros(coords.shape[0], dtype=coords.dtype)
coords = np.column_stack((batches, coords))

coords = torch.tensor(coords, dtype=torch.int)
feats = torch.tensor(feats[indices], dtype=torch.float)
labels = torch.tensor(labels[indices], dtype=torch.long)

print(coords.shape)
print(feats.shape)
print(labels.shape)

# NOTE: `input` shadows the Python builtin; the name is kept because the
# following cell builds its dataset from `input` and `label`.
input = SparseTensor(coords=coords, feats=feats)
label = SparseTensor(coords=coords, feats=labels)

torch.Size([10000, 4])
torch.Size([10000, 4])
torch.Size([10000])


In [18]:
# Execution settings, hard-coded here in place of args.device / args.amp_enabled.
device = 'cuda'  # or 'cpu' depending on your preference
amp_enabled = True  # Set to True or False based on your preference

# The "dataset" is just the (input, label) pair built in the previous cell;
# the loader iterates over that two-element tuple.
dataset = (input, label)
dataflow = torch.utils.data.DataLoader(dataset,
                                       batch_size=32,
                                       collate_fn=sparse_collate_fn)


def _conv_bn_relu(in_channels, out_channels, kernel_size, **conv_kwargs):
    """Return one (Conv3d, BatchNorm, ReLU) stage, constructed in that order."""
    return (
        spnn.Conv3d(in_channels, out_channels, kernel_size, **conv_kwargs),
        spnn.BatchNorm(out_channels),
        spnn.ReLU(True),
    )


# Four conv/BN/ReLU stages (the second strided, the third transposed) followed
# by a kernel-size-1 convolution down to 10 output channels.
model = nn.Sequential(
    *_conv_bn_relu(4, 32, 3),
    *_conv_bn_relu(32, 64, 2, stride=2),
    *_conv_bn_relu(64, 64, 2, stride=2, transposed=True),
    *_conv_bn_relu(64, 32, 3),
    spnn.Conv3d(32, 10, 1),
).to(device)



In [19]:
# Training setup: cross-entropy loss, Adam with lr=1e-3, and a gradient scaler
# that is only active when `amp_enabled` is true.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scaler = amp.GradScaler(enabled=amp_enabled)

# ---- training pass over `dataflow` ----
for k, feed_dict in enumerate(dataflow):
    # Each batch is indexed by position: element 0 is fed to the model,
    # element 1 supplies the targets. Both are moved to `device`.
    inputs = feed_dict[0].to(device=device)
    labels = feed_dict[1].to(device=device)
    # NOTE(review): prints the zero-based batch index; looks like leftover
    # debugging since the step is printed again below — confirm before removing.
    print(k)
    # Forward pass and loss run under autocast (when enabled); the loss is
    # computed on the `.feats` of the model output and of the label tensor.
    with amp.autocast(enabled=amp_enabled):
        outputs = model(inputs)
        loss = criterion(outputs.feats, labels.feats)
    
    print(f'[step {k + 1}] loss = {loss.item()}')

    # Clear old gradients, backpropagate the scaled loss, then let the scaler
    # drive the optimizer step and update its scale factor.
    optimizer.zero_grad()
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()

# ---- inference pass over the same `dataflow` ----
# enable torchsparse 2.0 inference
model.eval()
# enable fused and locality-aware memory access optimization
torchsparse.backends.benchmark = True  # type: ignore

# Gradients are disabled for the whole evaluation loop.
with torch.no_grad():
    for k, feed_dict in enumerate(dataflow):
        # Unlike the training loop, the inputs are cast with .half() here.
        inputs = feed_dict[0].to(device=device).half()
        labels = feed_dict[1].to(device=device)
        print(k)
        # NOTE(review): autocast is forced on here regardless of `amp_enabled`,
        # presumably because the inputs were cast to half above — confirm.
        with amp.autocast(enabled=True):
            outputs = model(inputs)
            loss = criterion(outputs.feats, labels.feats)

        print(f'[inference step {k + 1}] loss = {loss.item()}')

0
[step 1] loss = 2.359276533126831
0
[inference step 1] loss = 2.3025295734405518


In [None]:
class RandomDataset:
    """Dataset stand-in that fabricates a fresh random point cloud on every access."""

    def __init__(self, input_size: int, voxel_size: float) -> None:
        """Store the number of raw points per sample and the voxel size.

        Nothing is loaded here; this is where real data loading would go.
        """
        self.input_size = input_size
        self.voxel_size = voxel_size

    def __getitem__(self, _: int) -> Dict[str, Any]:
        """Generate one random sample; the index argument is ignored.

        Returns:
            A dict with keys ``'input'`` and ``'label'``, each a ``SparseTensor``
            built on the same quantized coordinates.
        """
        # Placeholder data: swap these two draws for real points and labels.
        points = np.random.uniform(-100, 100, size=(self.input_size, 4))
        classes = np.random.choice(10, size=self.input_size)

        # `xyz` is a view into `points`, so the in-place shift to non-negative
        # coordinates also changes the first three feature columns.
        xyz = points[:, :3]
        xyz -= np.min(xyz, axis=0, keepdims=True)
        voxels, kept = sparse_quantize(xyz, self.voxel_size, return_index=True)

        coord_tensor = torch.tensor(voxels, dtype=torch.int)
        feat_tensor = torch.tensor(points[kept], dtype=torch.float)
        label_tensor = torch.tensor(classes[kept], dtype=torch.long)

        # Shapes are printed in the order coords, feats, labels.
        for tensor in (coord_tensor, feat_tensor, label_tensor):
            print(tensor.shape)

        return {
            'input': SparseTensor(coords=coord_tensor, feats=feat_tensor),
            'label': SparseTensor(coords=coord_tensor, feats=label_tensor),
        }

    def __len__(self):
        """Report a fixed length of 100 samples."""
        return 100

In [23]:
import torch
from torch.utils.data import Dataset, DataLoader

# Step 1: Create a custom dataset class
class CustomDataset(Dataset):
    """Pairs an indexable collection of samples with an indexable collection of labels."""

    def __init__(self, data, labels):
        """Keep references to the samples and their labels; nothing is copied."""
        self.labels = labels
        self.data = data

    def __len__(self):
        """Number of samples, taken from ``data`` only."""
        sample_count = len(self.data)
        return sample_count

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at position ``idx``."""
        sample = self.data[idx]
        target = self.labels[idx]
        return sample, target

# Step 2: Generate the demonstration tensors that the dataset will wrap:
# 100 random samples of shape (3, 64, 64) and 100 random integer labels in [0, 10).
data = torch.randn((100, 3, 64, 64))
labels = torch.randint(low=0, high=10, size=(100,))

# Show both shapes, one per line.
print(data.shape, labels.shape, sep="\n")

torch.Size([100, 3, 64, 64])
torch.Size([100])


In [24]:
# Wrap the `data` / `labels` tensors in the map-style dataset.
custom_dataset = CustomDataset(data, labels)

# Step 3: Create a DataLoader
batch_size = 16
shuffle = True  # Shuffle the data during each epoch
num_workers = 4  # Number of CPU processes to use for data loading (for parallelism)

data_loader = DataLoader(dataset=custom_dataset,
                         batch_size=batch_size,
                         shuffle=shuffle,
                         num_workers=num_workers)

# Step 4: Iterate over the DataLoader
# Only the first batch is inspected. A real training loop would run its
# forward and backward passes on `batch_data` / `batch_labels` here.
for batch_idx, (batch_data, batch_labels) in enumerate(data_loader):
    print(f"Batch {batch_idx + 1}:")
    print("Data shape:", batch_data.shape)
    print("Labels:", batch_labels)

    # For simplicity, stop after this first iteration (batch_idx is 0 here).
    break


Batch 1:
Data shape: torch.Size([16, 3, 64, 64])
Labels: tensor([7, 4, 4, 0, 4, 8, 6, 6, 3, 7, 2, 0, 5, 1, 4, 6])
