In [1]:
import matplotlib.pylab as plt
import numpy as np

from hik.data.kitchen import Kitchen
from hik.data import PersonSequences
from hik.vis import plot_pose
from networkx.classes import nodes

In [2]:
# Identifier of the dataset to work with.
dataset = "A"

# Scene geometry belonging to that dataset.
kitchen = Kitchen.load_for_dataset(dataset=dataset, data_location="data/scenes")

# Pose sequences, read from the pose directory.
person_seqs = PersonSequences(person_path="data/poses")

# Directory of the body-model files (presumably SMPL-X, going by the name).
smplx_path = "data/body_models"


100%|██████████| 319/319 [00:14<00:00, 21.96it/s]


In [3]:
import numpy as np

# Placeholder skeleton for the 29-joint layout.
# An entry (i, j) is an undirected bone between joint i and joint j.
# NOTE: this is a dummy list that only touches joints 0-7; replace it with the
# real skeleton edges.
SKELETON_29 = [
    # a chain: 0 - 1 - 2 - 3 - 4
    (0, 1),
    (1, 2),
    (2, 3),
    (3, 4),
    # a branch leaving joint 2: 2 - 5 - 6 - 7
    (2, 5),
    (5, 6),
    (6, 7),
    # ... remaining edges go here
]

def create_adjacency_29(num_joints=29, edges=None):
    """
    Build a symmetric 0/1 adjacency matrix for a skeleton graph.

    :param num_joints: number of graph nodes; the result is (num_joints, num_joints)
    :param edges: iterable of (i, j) joint-index pairs. Every pair is written in
        both directions. When omitted, the module-level SKELETON_29 list is used.
    :return: float32 array of shape (num_joints, num_joints) with 1.0 at every
        connected pair and 0.0 elsewhere (the diagonal is only set if an edge
        (i, i) is listed)
    """
    if edges is None:
        # Fall back to the notebook-wide skeleton definition.
        edges = SKELETON_29
    A = np.zeros((num_joints, num_joints), dtype=np.float32)
    for i, j in edges:
        # Undirected edge: mirror the entry across the diagonal.
        A[i, j] = 1.0
        A[j, i] = 1.0
    return A

# Fine-scale (s1) adjacency over all 29 joints.
A1 = create_adjacency_29(num_joints=29)


<hik.data.person_sequence.PersonSequences at 0x78c6dcbbeca0>

In [None]:
# Joint grouping for the coarse scale (s2): the k-th sub-list holds the s1 joint
# indices that are averaged into s2 node k.
group_s2_indices = [
    [0, 1],     # s2 node 0 <- mean of s1 joints 0, 1
    [2, 3, 4],  # s2 node 1 <- mean of s1 joints 2, 3, 4
    [5, 6, 7],  # s2 node 2 <- mean of s1 joints 5, 6, 7
    # ... remaining groups go here
]

def group_poses(poses3d: np.ndarray, grouping: list) -> np.ndarray:
    """
    Average groups of joints into coarser nodes.

    The joint axis is taken to be the second-to-last axis, so any number of
    leading dimensions is accepted: (T, J, D), (B, T, J, D), or a single pose
    (J, D). For the usual (T, 29, 3) input the result is (T, #groups, 3).

    :param poses3d: array of shape (..., J, D)
    :param grouping: list of lists; each sub-list is the joint indices to average
    :return: float32 array of shape (..., #groups, D)
    """
    lead_shape = poses3d.shape[:-2]
    coord_dim = poses3d.shape[-1]
    out = np.zeros(lead_shape + (len(grouping), coord_dim), dtype=np.float32)
    for g_idx, grp in enumerate(grouping):
        # Select the group's joints on the joint axis and collapse that axis.
        out[..., g_idx, :] = np.mean(poses3d[..., grp, :], axis=-2)
    return out

def create_adjacency_for_s2(grouping):
    """
    Adjacency matrix for the coarse (s2) scale.

    Only the number of groups is used. As a demonstration the parts are linked
    in a simple chain (part k <-> part k+1); swap in an anatomically meaningful
    structure if you have one.

    :param grouping: list of joint groups; its length is the number of s2 nodes
    :return: float32 array of shape (#groups, #groups)
    """
    n_nodes = len(grouping)
    # Ones on the first super-diagonal encode k -> k+1; adding the transpose
    # mirrors them to k+1 -> k.
    forward_links = np.eye(n_nodes, k=1, dtype=np.float32)
    return forward_links + forward_links.T

# Coarse-scale (s2) adjacency, one node per joint group.
A2 = create_adjacency_for_s2(grouping=group_s2_indices)


In [None]:
def slice_sequence(
        poses3d: np.ndarray,
        window_in: int = 25,
        window_out: int = 10
):
    """
    Cut a pose sequence into overlapping (past, future) training pairs.

    A window of window_in + window_out consecutive frames is slid over the
    sequence one frame at a time; its first part is the input, the rest the
    target. Sequences shorter than one full window yield an empty list.

    :param poses3d: shape (T, 29, 3)
    :param window_in: number of past frames used as input
    :param window_out: number of future frames to predict
    :return: list of (input_poses, target_poses) tuples, where input_poses is
        poses3d[s : s + window_in] and target_poses the window_out frames after it
    """
    span = window_in + window_out
    n_windows = poses3d.shape[0] - span + 1  # <= 0 when the sequence is too short
    return [
        (poses3d[s : s + window_in], poses3d[s + window_in : s + span])
        for s in range(n_windows)
    ]


In [None]:
import torch
import torch.utils.data as data

class MotionPredictionDataset(data.Dataset):
    """
    Sliding-window motion-prediction samples built from a PersonSequences object.

    Every sequence of the chosen dataset is cut with slice_sequence into
    (past, future) pairs; one item of this dataset is one such pair, plus a
    coarse-scale version of the past frames obtained with group_poses and the
    module-level group_s2_indices.
    """

    def __init__(
            self,
            person_seqs: PersonSequences,  # your loaded PersonSequences
            dataset_name: str,
            window_in=25,
            window_out=10,
            transform=None,
    ):
        """
        :param person_seqs: the PersonSequences object
        :param dataset_name: which dataset key to fetch from person_seqs
        :param window_in: how many frames to use as 'past'
        :param window_out: how many frames to predict
        :param transform: optional callable (in_poses, out_poses) -> (in_poses, out_poses)
            for data augmentation, etc.; it must return numpy arrays
        """
        super().__init__()
        self.person_seqs = person_seqs
        self.dataset_name = dataset_name
        self.window_in = window_in
        self.window_out = window_out
        self.transform = transform

        # gather all PersonSequence objects for the chosen dataset
        self.sequences = person_seqs.get_sequences(dataset_name)

        # flat list of (in_poses, out_poses) pairs across all sequences
        self.samples = []
        for seq in self.sequences:
            # seq.poses3d is expected to be (T, 29, 3)
            self.samples.extend(
                slice_sequence(seq.poses3d, window_in=window_in, window_out=window_out)
            )

    def __len__(self):
        """Number of (past, future) windows over all sequences."""
        return len(self.samples)

    def __getitem__(self, index):
        """
        :param index: position in the flat sample list
        :return: dict with float tensors
            "in_s1"  (window_in, 29, 3)         - past frames, all joints
            "in_s2"  (window_in, #groups_s2, 3) - past frames, grouped joints
            "target" (window_out, 29, 3)        - future frames
        """
        in_poses, out_poses = self.samples[index]  # (window_in, 29, 3), (window_out, 29, 3)

        # Augment first so that the coarse scale below is derived from the same
        # (possibly transformed) poses that are returned as "in_s1".
        if self.transform is not None:
            in_poses, out_poses = self.transform(in_poses, out_poses)

        # Multi-scale input: the window's frame axis plays the role of T in
        # group_poses, so the (window_in, 29, 3) array is passed as-is. Adding a
        # leading axis here would make group_poses index frames instead of joints.
        in_poses_s2 = group_poses(in_poses, group_s2_indices)
        #   => shape (window_in, #groups_s2, 3)

        # convert to torch tensors (batch dimension is added in collate)
        in_poses = torch.from_numpy(in_poses).float()
        in_poses_s2 = torch.from_numpy(in_poses_s2).float()
        out_poses = torch.from_numpy(out_poses).float()

        return {
            "in_s1": in_poses,      # (window_in, 29, 3)
            "in_s2": in_poses_s2,   # (window_in, #groups_s2, 3)
            "target": out_poses,    # (window_out, 29, 3)
        }


In [None]:
def motion_collate_fn(batch):
    """
    Collate per-sample dicts into batched tensors.

    :param batch: list of dicts as returned by the dataset's __getitem__, each
        holding the keys "in_s1", "in_s2" and "target"
    :return: tuple (in_s1, in_s2, target); every entry is the per-sample tensors
        stacked along a new leading batch axis, e.g. [B, window_in, 29, 3]
    """
    keys = ("in_s1", "in_s2", "target")
    # Collect every field first, then stack each one along dim 0.
    per_key = {key: [item[key] for item in batch] for key in keys}
    return tuple(torch.stack(per_key[key], dim=0) for key in keys)


In [None]:
from torch.utils.data import DataLoader

# `person_seqs` and `dataset` come from the data-loading cell at the top of the
# notebook. They are reused here on purpose: constructing PersonSequences again
# would re-read every pose file from disk.

# Instantiate your dataset:
train_dataset = MotionPredictionDataset(
    person_seqs=person_seqs,
    dataset_name=dataset,  # "A" above; change it there to switch to "B" / "C" / ...
    window_in=25,
    window_out=10
)

# Create a DataLoader:
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0,   # or >0 if you want multiprocessing
    collate_fn=motion_collate_fn
)

# Now you can iterate:
for batch_idx, (in_s1, in_s2, target) in enumerate(train_loader):
    # in_s1: [B, 25, 29, 3]
    # in_s2: [B, 25, #groups_s2, 3]
    # target: [B, 10, 29, 3]

    # possibly rearrange if your model wants [B, 3, T, N]
    in_s1 = in_s1.permute(0, 3, 1, 2)  # => [B, 3, 25, 29]
    # likewise for in_s2

    # pass to your model, e.g. model(in_s1, in_s2)
    # ...
