In [74]:
import numpy as np
import torch
import os
import json
from torch.utils.data import Dataset, DataLoader

In [97]:
class PedestrianDataset(Dataset):
    def __init__(self,dir_path: str, device='cpu', horizon=20):
        """
        A dataset class for pedestrian data, designed for training motion prediction models.
    
        Parameters:
        -----------
        dir_path : str
            Path to the directory containing the dataset. The directory is expected to include 
            files with pedestrian positions and potentially other necessary data for training.
        
        device : str, optional (default='cpu')
            The device where tensors will be created and stored (e.g., 'cpu' or 'cuda').
        
        horizon : int, optional (default=20)
            Number of past steps used to create the motion history for each pedestrian.
            Determines the size of the historical data tensor: (N, T, 2), where:
            - N — number of pedestrians (or batch size);
            - T — length of the temporal history (equal to the horizon);
            - 2 — coordinates (x, y).
        
        Example Usage:
        --------------
        dataset = PedestrianDataset(dir_path='./data', device='cuda', horizon=30)
        """
        super().__init__()
        self._horizon = horizon
        self._path = dir_path
        if not os.path.exists(self._path):
            raise FileExistsError()
        self._device = self._check_device(device=device)
        self.scenes = []
        self.frame_indices = []  # Map global indices to scene/frame

        for scene_file in sorted(os.listdir(self._path),key=lambda x: int(x.split('_')[1].split(".")[0])):
            if scene_file.endswith('.json'):
                scene_path = os.path.join(self._path, scene_file)
                with open(scene_path, 'r') as f:
                    scene_data = json.load(f)
                    num_steps = len(scene_data['robot']['action'])
                    self.scenes.append(scene_data)
                    self.frame_indices.extend([(len(self.scenes) - 1, num_steps)])
    def __len__(self):
        return len(self.frame_indices)
    def __getitem__(self, idx):
        scene_idx, frame_idx = self._find_idx(global_idx=idx)
        print(f"Scene id is {scene_idx}, local index {frame_idx}")
        scene_data = self.scenes[scene_idx]
        pedestrians = scene_data['pedestrians']
        poses = np.array([ped['pose'] for ped in pedestrians])
        robot_goal = scene_data['robot']['goal']
        if frame_idx < self._horizon:
            add = poses[:, [0] * (self._horizon - frame_idx), :]
            history = np.concatenate((add,poses[:,:frame_idx]),axis=1)
        else:
            history = poses[:, frame_idx - self._horizon:frame_idx, :]
        return torch.tensor(history,dtype=torch.float32,device=self._device)

    
    def _find_idx(self,global_idx):
        total_length = 0
        for scene_id, frame_length in self.frame_indices:
            if (total_length <= global_idx) and (global_idx < total_length + frame_length):
                local_idx = global_idx - total_length
                return scene_id, local_idx
            else:
                total_length+=frame_length

    def _check_device(self, device: str) -> str:
        """
        Validates the chosen device and falls back to available alternatives.

        Parameters:
        -----------
        device : str
            Desired device ('cuda', 'mps', or 'cpu').

        Returns:
        --------
        str
            The validated device to be used.
        """
        if device == 'cuda' and torch.cuda.is_available():
            return 'cuda'
        elif device == 'mps' and torch.backends.mps.is_available():
            return 'mps'
        elif device == 'cpu':
            return 'cpu'
        else:
            # Fall back to the first available device, or CPU if none are available
            print(f"Warning: Device '{device}' is not available. Falling back to an available device.")
            if torch.cuda.is_available():
                return 'cuda'
            elif torch.backends.mps.is_available():
                return 'mps'
            else:
                return 'cpu'
            


In [104]:
dataset = PedestrianDataset(dir_path="../dataset",device="cpu",horizon=10)
test = dataset[100]
test.shape

Scene id is 2, local index 7


torch.Size([7, 10, 3])