In [1]:
import os
import sys
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

torch.set_printoptions(threshold=torch.inf)
sys.path.append(os.path.dirname(os.getcwd())) #Add project dir to path



In [2]:
fulldatasetpath = '/Volumes/Data_Drive/datasets/VIDIMU'

In [3]:
subjects = ["S40","S41","S42",  "S44",  "S46","S47","S48","S49",
            "S50","S51","S52","S53","S54","S55","S56","S57"]

lower_activities = ["A01","A02","A03","A04"]
upper_activities = ["A05","A06","A07","A08","A09","A10","A11","A12","A13"]

dataset_activities = lower_activities + upper_activities

activities_legend = ["walk_forward", "walk_backward", "walk_along","sit_to_stand",
"move_right_arm","move_left_arm","drink_right_arm","drink_left_arm", "assemble_both_arms","throw_both_arms",
"reachup_right_arm","reachup_left_arm","tear_both_arms"]

dpath = os.path.join(fulldatasetpath,'dataset','videoandimusyncrop')

In [4]:
csv_path = os.path.join(dpath, 'S40', 'S40_A02_T01.csv')
print(csv_path)

df_csv = pd.read_csv(csv_path)
print(f'Shape: {df_csv.shape}')
df_csv.head()

/Volumes/Data_Drive/datasets/VIDIMU/dataset/videoandimusyncrop/S40/S40_A02_T01.csv
Shape: (1116, 103)


Unnamed: 0.1,Unnamed: 0,pelvis_x,pelvis_y,pelvis_z,left_hip_x,left_hip_y,left_hip_z,right_hip_x,right_hip_y,right_hip_z,...,left_index_knuckle_z,right_index_knuckle_x,right_index_knuckle_y,right_index_knuckle_z,left_thumb_tip_x,left_thumb_tip_y,left_thumb_tip_z,right_thumb_tip_x,right_thumb_tip_y,right_thumb_tip_z
0,0,1434.0,214.9,6528.5,1432.1,202.9,6602.1,1435.6,205.6,6453.3,...,6762.0,1490.1,308.1,6261.4,1566.4,276.9,6750.4,1499.0,327.7,6270.9
1,1,1445.2,190.8,6582.0,1436.1,177.2,6655.9,1447.5,181.7,6506.6,...,6815.8,1503.6,308.3,6317.4,1570.4,277.0,6804.0,1512.5,327.2,6326.9
2,2,1446.4,170.0,6591.3,1429.4,155.0,6665.1,1449.7,161.2,6516.3,...,6823.7,1506.9,306.0,6331.1,1560.7,275.3,6811.8,1515.9,323.7,6340.6
3,3,1441.8,158.8,6574.3,1419.2,143.0,6647.7,1446.0,150.0,6499.8,...,6804.2,1504.0,303.0,6319.0,1547.0,273.8,6792.3,1512.9,319.8,6328.5
4,4,1436.9,153.1,6556.1,1411.3,136.7,6629.2,1442.1,144.1,6482.0,...,6784.1,1500.8,300.7,6304.8,1533.3,272.6,6772.1,1509.6,317.1,6314.3


In [5]:
# Define the file path
csv_path = os.path.join(dpath, 'S40', 'ik_S40_A02_T01.mot')
print(csv_path)

with open(csv_path, 'r') as file:
    # Read all lines
    lines = file.readlines()

for i, line in enumerate(lines):
    if 'endheader' in line:
        start_idx = i + 1  # Data starts after the 'endheader'
        break

from io import StringIO

data = ''.join(lines[start_idx:])
df_mot = pd.read_csv(StringIO(data), sep='\s+')

print(f'Shape: {df_mot.shape}')
df_mot.head()


/Volumes/Data_Drive/datasets/VIDIMU/dataset/videoandimusyncrop/S40/ik_S40_A02_T01.mot
Shape: (1860, 40)


Unnamed: 0,time,pelvis_tilt,pelvis_list,pelvis_rotation,pelvis_tx,pelvis_ty,pelvis_tz,hip_flexion_r,hip_adduction_r,hip_rotation_r,...,pro_sup_r,wrist_flex_r,wrist_dev_r,arm_flex_l,arm_add_l,arm_rot_l,elbow_flex_l,pro_sup_l,wrist_flex_l,wrist_dev_l
0,0.0,0.0,0.0,0.0,0,0.94,0,0.0,0.0,0.0,...,90.0,0,0,0.0,0.0,0.0,0.0,90.0,0,0
1,0.02,0.793786,-1.966724,89.740813,0,0.94,0,1.41367,-3.92351,-4.895766,...,89.674155,0,0,1.828323,-1.828323,0.325845,9.111087,89.674155,0,0
2,0.04,0.802144,-1.964312,89.737013,0,0.94,0,1.420787,-3.923891,-4.900815,...,89.674061,0,0,1.828826,-1.828826,0.325939,9.114137,89.674061,0,0
3,0.06,0.794658,-1.980499,89.717127,0,0.94,0,1.430931,-3.940055,-4.88869,...,89.673854,0,0,1.829894,-1.829894,0.326146,9.122819,89.673854,0,0
4,0.08,0.741252,-2.008496,89.707798,0,0.94,0,1.475868,-4.013878,-4.871714,...,89.673049,0,0,1.834054,-1.834054,0.326951,9.155096,89.673049,0,0


In [6]:
class IMUDataset(Dataset):
    def __init__(self, data_dir, window_size, num_windows=1, sampling_rate=30, random_windows=True, sequential=False):
        """
        Args:
            data_dir (string): Path to the directory with all the subject folders.
            window_size (int): The length of the window to sample from each file.
            num_windows (int): Number of windows to extract per file.
            sampling_rate (int): The sampling rate of the IMU data.
            random_windows (bool): Whether to randomly sample windows (default True).
            sequential (bool): Whether to sample sequential windows (default False).
        """
        self.data_dir = data_dir
        self.window_size = window_size
        self.num_windows = num_windows
        self.sampling_rate = sampling_rate
        self.random_windows = random_windows
        self.sequential = sequential
        self.data_files = self._get_data_files(extension=".mot")
        
        # Activity mapping (you can adjust this list according to your activity codes)
        self.activities = ["A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", "A10", "A11", "A12", "A13"]
        self.num_activities = len(self.activities)

    def _get_data_files(self, extension=".mot"):
        """Fetch only 'ik_' prefixed .mot file paths based on the extension."""
        data_files = []
        for subject in os.listdir(self.data_dir):
            subject_path = os.path.join(self.data_dir, subject)
            if os.path.isdir(subject_path):
                for activity_file in os.listdir(subject_path):
                    if activity_file.startswith('ik_') and activity_file.endswith(extension):
                        data_files.append(os.path.join(subject_path, activity_file))
        return data_files

    def _get_activity_from_filename(self, filename):
        """Extract activity code (e.g., 'A01') from the filename."""
        base_filename = os.path.basename(filename)  # Get just the filename without the full path
        activity_code = base_filename.split('_')[2]  # 'ik_SXX_A02_TXX.mot' -> extracts 'A02'
        return activity_code

    def _one_hot_encode(self, activity_code):
        """Generate a one-hot encoded vector for the activity."""
        one_hot_vector = np.zeros(self.num_activities)
        index = self.activities.index(activity_code)
        one_hot_vector[index] = 1
        return torch.tensor(one_hot_vector, dtype=torch.float32)

    def __len__(self):
        return len(self.data_files) * self.num_windows  # Reflect total windows in the dataset

    def __getitem__(self, idx):
        """Load and return multiple windows from the dataset along with the one-hot encoded activity label."""
        file_idx = idx // self.num_windows  # Map to the original file index
        mot_file = self.data_files[file_idx]
        
        # Read the data and remove the first column
        data = pd.read_csv(mot_file, skiprows=self._find_start_of_data(mot_file), sep='\s+')
        data = data.iloc[:, 1:]  # Remove first column
        imu_data = torch.tensor(data.values, dtype=torch.float32)

        # Extract the activity from the filename and generate the one-hot encoded vector
        activity_code = self._get_activity_from_filename(mot_file)
        one_hot_label = self._one_hot_encode(activity_code)
        
        # Duplicate the one-hot label to match the window size
        label_windows = one_hot_label.repeat(self.window_size, 1)

        data_length = imu_data.shape[0]
        imu_windows = []

        # Sample the required number of windows
        for _ in range(self.num_windows):
            if data_length < self.window_size:
                padded_data = torch.zeros((self.window_size, imu_data.shape[1]))
                padded_data[:data_length] = imu_data
                imu_windows.append(padded_data)
            else:
                start_idx = np.random.randint(0, data_length - self.window_size + 1) if self.random_windows else 0
                imu_windows.append(imu_data[start_idx:start_idx + self.window_size])

        imu_windows = torch.stack(imu_windows)

        return imu_windows, label_windows

    def _find_start_of_data(self, filepath):
        """Find where the data starts in the .mot file (after 'endheader')."""
        with open(filepath, 'r') as file:
            for i, line in enumerate(file):
                if 'endheader' in line:
                    return i + 1
        return 0

In [7]:
# Example usage
imu_dataset = IMUDataset(data_dir=dpath, window_size=100)
imu_loader = DataLoader(imu_dataset, batch_size=32, shuffle=True)

# Fetch one batch and check its shape
imu_batch, label_batch = next(iter(imu_loader))  # Get one batch

print(f'IMU batch shape: {imu_batch.shape}')  # Shape: (batch_size, num_windows, window_size, num_features)
print(f'Label batch shape: {label_batch.shape}')  # Shape: (batch_size, window_size, num_activities)
# print(f'Example one-hot label: {label_batch[0]}')  # Example of a one-hot label for the first sample

IMU batch shape: torch.Size([32, 1, 100, 39])
Label batch shape: torch.Size([32, 100, 13])


In [8]:
class VideoDataset(Dataset):
    def __init__(self, data_dir, window_size, num_windows=1, sampling_rate=50, random_windows=True, sequential=False):
        """
        Args:
            data_dir (string): Path to the directory with all the subject folders.
            window_size (int): The length of the window to sample from each file.
            num_windows (int): Number of windows to extract per file.
            sampling_rate (int): The sampling rate of the video data.
            random_windows (bool): Whether to randomly sample windows (default True).
            sequential (bool): Whether to sample sequential windows (default False).
        """
        self.data_dir = data_dir
        self.window_size = window_size
        self.num_windows = num_windows
        self.sampling_rate = sampling_rate
        self.random_windows = random_windows
        self.sequential = sequential
        self.data_files = self._get_data_files(extension=".csv")
        
        # Activity mapping (same as IMU dataset)
        self.activities = ["A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", "A10", "A11", "A12", "A13"]
        self.num_activities = len(self.activities)

    def _get_data_files(self, extension=".csv"):
        """Fetch all the video .csv file paths."""
        data_files = []
        for subject in os.listdir(self.data_dir):
            subject_path = os.path.join(self.data_dir, subject)
            if os.path.isdir(subject_path):
                for activity_file in os.listdir(subject_path):
                    if activity_file.endswith(extension):
                        data_files.append(os.path.join(subject_path, activity_file))
        return data_files

    def _get_activity_from_filename(self, filename):
        """Extract activity code (e.g., 'A01') from the filename."""
        base_filename = os.path.basename(filename)  # Get just the filename without the full path
        activity_code = base_filename.split('_')[1]  # Assuming format is 'SXX_A02_TXX.csv'
        return activity_code

    def _one_hot_encode(self, activity_code):
        """Generate a one-hot encoded vector for the activity."""
        one_hot_vector = np.zeros(self.num_activities)
        index = self.activities.index(activity_code)
        one_hot_vector[index] = 1
        return torch.tensor(one_hot_vector, dtype=torch.float32)

    def __len__(self):
        return len(self.data_files) * self.num_windows  # Reflect total windows in the dataset

    def __getitem__(self, idx):
        """Load and return multiple windows from the video dataset along with the one-hot encoded activity label."""
        file_idx = idx // self.num_windows  # Map to the original file index
        csv_file = self.data_files[file_idx]
        
        # Load video data from the CSV file
        data = pd.read_csv(csv_file)
        data = data.iloc[:, 1:]  # Remove first column (if needed)
        video_data = torch.tensor(data.values, dtype=torch.float32)

        # Extract the activity from the filename and generate the one-hot encoded vector
        activity_code = self._get_activity_from_filename(csv_file)
        one_hot_label = self._one_hot_encode(activity_code)
        
        # Duplicate the one-hot label to match the window size
        label_windows = one_hot_label.repeat(self.window_size, 1)

        data_length = video_data.shape[0]
        video_windows = []

        # Sample the required number of windows
        for _ in range(self.num_windows):
            if data_length < self.window_size:
                padded_data = torch.zeros((self.window_size, video_data.shape[1]))
                padded_data[:data_length] = video_data
                video_windows.append(padded_data)
            else:
                start_idx = np.random.randint(0, data_length - self.window_size + 1) if self.random_windows else 0
                video_windows.append(video_data[start_idx:start_idx + self.window_size])

        video_windows = torch.stack(video_windows)

        return video_windows, label_windows

In [9]:
# Example usage for the VideoDataset
video_dataset = VideoDataset(data_dir=dpath, window_size=100)
video_loader = DataLoader(video_dataset, batch_size=32, shuffle=True)

# Fetch one batch and check its shape
video_batch, label_batch = next(iter(video_loader))  # Get one batch

print(f'Video batch shape: {video_batch.shape}')  # Shape: (batch_size, num_windows, window_size, num_features)
print(f'Label batch shape: {label_batch.shape}')  # Shape: (batch_size, window_size, num_activities)
# print(f'Example one-hot label: {label_batch[0]}')  # Example of a one-hot label for the first sample

Video batch shape: torch.Size([32, 1, 100, 102])
Label batch shape: torch.Size([32, 100, 13])


In [10]:
class IMUVideoDataset(Dataset):
    def __init__(self, data_dir, time_in_seconds, imu_sampling_rate=30, video_sampling_rate=50, num_windows=1, random_windows=True):
        """
        Args:
            data_dir (string): Path to the directory with all the subject folders.
            time_in_seconds (float): The length of the time window in seconds.
            imu_sampling_rate (int): The sampling rate of the IMU data.
            video_sampling_rate (int): The sampling rate of the video data.
            num_windows (int): Number of windows to extract per file.
            random_windows (bool): Whether to randomly sample windows (default True).
        """
        self.data_dir = data_dir
        self.time_in_seconds = time_in_seconds
        self.imu_sampling_rate = imu_sampling_rate
        self.video_sampling_rate = video_sampling_rate
        self.num_windows = num_windows
        self.random_windows = random_windows

        # Compute the window sizes for IMU and video based on the time input
        self.imu_window_size = int(self.time_in_seconds * self.imu_sampling_rate)
        self.video_window_size = int(self.time_in_seconds * self.video_sampling_rate)
        
        # Fetch the IMU and video data files
        self.imu_files = self._get_data_files(extension=".mot", prefix="ik_")
        self.video_files = self._get_data_files(extension=".csv", prefix="S")
        
        # Activity mapping (same as IMU and Video datasets)
        self.activities = ["A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", "A10", "A11", "A12", "A13"]
        self.num_activities = len(self.activities)

    def _get_data_files(self, extension, prefix):
        """Fetch the file paths based on extension and prefix."""
        data_files = []
        for subject in os.listdir(self.data_dir):
            subject_path = os.path.join(self.data_dir, subject)
            if os.path.isdir(subject_path):
                for activity_file in os.listdir(subject_path):
                    if activity_file.startswith(prefix) and activity_file.endswith(extension):
                        data_files.append(os.path.join(subject_path, activity_file))
        return data_files

    def _get_activity_from_filename(self, filename):
        """Extract activity code (e.g., 'A01') from the filename."""
        base_filename = os.path.basename(filename)  # Get just the filename without the full path
        activity_code = base_filename.split('_')[2]  # 'ik_SXX_A02_TXX.mot' -> extracts 'A02'
        return activity_code

    def _one_hot_encode(self, activity_code):
        """Generate a one-hot encoded vector for the activity."""
        one_hot_vector = np.zeros(self.num_activities)
        index = self.activities.index(activity_code)
        one_hot_vector[index] = 1
        return torch.tensor(one_hot_vector, dtype=torch.float32)

    def __len__(self):
        return len(self.imu_files) * self.num_windows  # Reflect total windows in the dataset

    def __getitem__(self, idx):
        """Load and return IMU and video data along with the one-hot encoded activity label."""
        file_idx = idx // self.num_windows  # Map to the original file index

        # Load IMU data
        imu_file = self.imu_files[file_idx]
        imu_data = pd.read_csv(imu_file, skiprows=self._find_start_of_data(imu_file), sep='\s+').iloc[:, 1:]
        imu_data = torch.tensor(imu_data.values, dtype=torch.float32)

        # Load video data
        video_file = self.video_files[file_idx]
        video_data = pd.read_csv(video_file).iloc[:, 1:]
        video_data = torch.tensor(video_data.values, dtype=torch.float32)

        # Extract the activity from the filename and generate the one-hot encoded vector
        activity_code = self._get_activity_from_filename(imu_file)
        one_hot_label = self._one_hot_encode(activity_code)
        
        # Duplicate the one-hot label to match the window size
        label_windows = one_hot_label.repeat(self.imu_window_size, 1)  # Same label for IMU and video
        
        # Sample windows from IMU and video data
        imu_length = imu_data.shape[0]
        video_length = video_data.shape[0]

        imu_windows = []
        video_windows = []

        # IMU Data
        for _ in range(self.num_windows):
            if imu_length < self.imu_window_size:
                padded_data = torch.zeros((self.imu_window_size, imu_data.shape[1]))
                padded_data[:imu_length] = imu_data
                imu_windows.append(padded_data)
            else:
                start_idx = np.random.randint(0, imu_length - self.imu_window_size + 1) if self.random_windows else 0
                imu_windows.append(imu_data[start_idx:start_idx + self.imu_window_size])

        # Video Data
        for _ in range(self.num_windows):
            if video_length < self.video_window_size:
                padded_data = torch.zeros((self.video_window_size, video_data.shape[1]))
                padded_data[:video_length] = video_data
                video_windows.append(padded_data)
            else:
                start_idx = np.random.randint(0, video_length - self.video_window_size + 1) if self.random_windows else 0
                video_windows.append(video_data[start_idx:start_idx + self.video_window_size])

        imu_windows = torch.stack(imu_windows)
        video_windows = torch.stack(video_windows)

        return imu_windows, video_windows, label_windows

    def _find_start_of_data(self, filepath):
        """Find where the data starts in the .mot file (after 'endheader')."""
        with open(filepath, 'r') as file:
            for i, line in enumerate(file):
                if 'endheader' in line:
                    return i + 1
        return 0

In [11]:
# Example usage
imu_video_dataset = IMUVideoDataset(data_dir=dpath, time_in_seconds=2)  # 2 seconds window
imu_video_loader = DataLoader(imu_video_dataset, batch_size=32, shuffle=True)

# Fetch one batch and check its shape
imu_batch, video_batch, label_batch = next(iter(imu_video_loader))  # Get one batch

print(f'IMU batch shape: {imu_batch.shape}')  # Shape: (batch_size, num_windows, imu_window_size, num_imu_features)
print(f'Video batch shape: {video_batch.shape}')  # Shape: (batch_size, num_windows, video_window_size, num_video_features)
print(f'Label batch shape: {label_batch.shape}')  # Shape: (batch_size, imu_window_size, num_activities)
# print(f'Example one-hot label: {label_batch[0]}')  # Example of a one-hot label for the first sample

IMU batch shape: torch.Size([32, 1, 60, 39])
Video batch shape: torch.Size([32, 1, 100, 102])
Label batch shape: torch.Size([32, 60, 13])
