In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import dataset_utils
import importlib
import matplotlib
import numpy as np

In [6]:
class PointRNNCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(PointRNNCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Linear transformations for the input and hidden state
        self.W_i = nn.Linear(input_size, hidden_size)
        self.W_h = nn.Linear(hidden_size, hidden_size)

    def forward(self, x, h_prev):
        # Compute the new hidden state
        h_new = torch.tanh(self.W_i(x) + self.W_h(h_prev))
        return h_new

class PointRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(PointRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Create RNN layers
        self.rnn_cells = nn.ModuleList(
            [PointRNNCell(input_size if i == 0 else hidden_size, hidden_size) for i in range(num_layers)]
        )

        # Fully connected layer for classification
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, point_cloud_seq):
        # Initialize hidden states
        h = [torch.zeros(point_cloud_seq.size(0), self.hidden_size).to(point_cloud_seq.device) for _ in range(self.num_layers)]

        # Process each point cloud in the sequence
        for t in range(point_cloud_seq.size(1)):
            x_t = point_cloud_seq[:, t]  # Extract point cloud at time step t
            for i, cell in enumerate(self.rnn_cells):
                h[i] = cell(x_t, h[i])

        # Use the final hidden state for classification
        out = self.fc(h[-1])
        return out


In [2]:
class PointCloudDataset(Dataset):
    def __init__(self, num_samples, seq_length, num_points, num_classes):
        self.num_samples = num_samples
        self.seq_length = seq_length
        self.num_points = num_points
        self.num_classes = num_classes

        # Generate random point cloud sequences and labels
        self.data = torch.randn(num_samples, seq_length, num_points, 3)  # Random 3D points
        self.labels = torch.randint(0, num_classes, (num_samples,))  # Random labels

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        point_cloud_seq = self.data[idx]
        label = self.labels[idx]
        # Flatten the points in each point cloud for RNN input
        point_cloud_seq = point_cloud_seq.view(self.seq_length, -1)
        return point_cloud_seq, label


In [7]:
def train(model, dataloader, criterion, optimizer, num_epochs):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        for batch_idx, (point_cloud_seq, labels) in enumerate(dataloader):
            point_cloud_seq, labels = point_cloud_seq.to(device), labels.to(device)
            
            # Forward pass
            outputs = model(point_cloud_seq)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        avg_loss = total_loss / len(dataloader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")


In [9]:
# Hyperparameters
num_samples = 1000
seq_length = 10  # Number of point clouds in a sequence
num_points = 100  # Number of points in each point cloud
input_size = 5  # Each point has x, y, z coordinates
hidden_size = 128
num_layers = 2
num_classes = 3  # Number of output classes
batch_size = 8
num_epochs = 10
learning_rate = 0.001

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')



In [None]:
# Dataset and DataLoader
dataset = PointCloudDataset(num_samples, seq_length, num_points, num_classes)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize the model, loss function, and optimizer
model = PointRNN(input_size * num_points, hidden_size, num_layers, num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [11]:
DATA_DIR = "data/entire_field_raw_3_class/"
NUM_POINTS = 100
NUM_CLASSES = 2
BATCH_SIZE = 32
train_points, test_points, train_labels, test_labels, CLASS_MAP = dataset_utils.parse_dataset(NUM_POINTS, DATA_DIR)

processing class: jumping
processing class: walking
