In [7]:
import os
import pytorch_lightning
import pytorchvideo.data
import torch.utils.data

  from .autonotebook import tqdm as notebook_tqdm


In [1]:
class ASLDataModule(pytorch_lightning.LightningDataModule):
  """
  Lightning data module that serves ASL video clips found under _DATA_PATH.

  Both partitions are built with pytorchvideo.data.Kinetics, pointed at the
  "train" and "val" sub-directories of _DATA_PATH, with audio decoding
  switched off. The two partitions differ only in the clip sampler type.
  """

  # Dataset configuration
  _DATA_PATH = "../WLASL/start_kit/videos/"
  _CLIP_DURATION = 2  # Duration of sampled clip for each video
  _BATCH_SIZE = 8
  _NUM_WORKERS = 8  # Number of parallel processes fetching data

  def _make_dataloader(self, partition, sampling_type):
    """
    Build the DataLoader for one partition of the ASL dataset.

    Args:
      partition: Sub-directory of _DATA_PATH to read ("train" or "val").
      sampling_type: Clip sampler type handed to
        pytorchvideo.data.make_clip_sampler (e.g. "random", "uniform").

    Returns:
      A torch.utils.data.DataLoader over the partition, configured with
      _BATCH_SIZE and _NUM_WORKERS.
    """
    dataset = pytorchvideo.data.Kinetics(
        data_path=os.path.join(self._DATA_PATH, partition),
        clip_sampler=pytorchvideo.data.make_clip_sampler(sampling_type, self._CLIP_DURATION),
        decode_audio=False,
    )
    return torch.utils.data.DataLoader(
        dataset,
        batch_size=self._BATCH_SIZE,
        num_workers=self._NUM_WORKERS,
    )

  def train_dataloader(self):
    """
    Create the ASL train partition from the videos under
    _DATA_PATH/train, sampling clips with the "random" sampler.
    """
    return self._make_dataloader("train", "random")

  def val_dataloader(self):
    """
    Create the ASL validation partition from the videos under
    _DATA_PATH/val, sampling clips with the "uniform" sampler.
    """
    return self._make_dataloader("val", "uniform")

In [8]:
# Inspect the Kinetics dataset factory: as the cell's last (bare) expression,
# its repr — including the full signature and defaults — is echoed below.
pytorchvideo.data.Kinetics

<function pytorchvideo.data.kinetics.Kinetics(data_path: str, clip_sampler: pytorchvideo.data.clip_sampling.ClipSampler, video_sampler: Type[torch.utils.data.sampler.Sampler] = <class 'torch.utils.data.sampler.RandomSampler'>, transform: Union[Callable[[Dict[str, Any]], Dict[str, Any]], NoneType] = None, video_path_prefix: str = '', decode_audio: bool = True, decoder: str = 'pyav') -> pytorchvideo.data.labeled_video_dataset.LabeledVideoDataset>

In [4]:
# Collect the entry names found directly inside `video_dir`
# (os.listdir returns them in arbitrary order).
# NOTE(review): `video_dir` is not defined in any cell visible here — it must
# already exist in the kernel; confirm it is assigned before this cell so the
# notebook survives Restart & Run All.
labels = os.listdir(video_dir)


In [1]:
# Record the packages installed in the kernel's environment. The explicit
# %pip magic is used instead of relying on IPython's automagic for bare `pip`.
%pip list

Package                       Version
----------------------------- -----------
anyio                         3.6.2
appnope                       0.1.3
argon2-cffi                   21.1.0
asttokens                     2.2.1
attrs                         22.2.0
Babel                         2.12.1
backcall                      0.2.0
backports.functools-lru-cache 1.6.4
beautifulsoup4                4.12.2
bleach                        6.0.0
brotlipy                      0.7.0
certifi                       2022.12.7
cffi                          1.15.1
charset-normalizer            3.1.0
colorama                      0.4.6
cryptography                  39.0.1
decorator                     5.1.1
defusedxml                    0.7.1
entrypoints                   0.4
executing                     1.2.0
fastjsonschema                2.16.3
fsspec                        2023.4.0
future                        0.18.3
idna                          3.4
importlib-metadata            6.6.0
importlib

In [26]:
import torch
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split

class VideoDataset(Dataset):
    """Map-style dataset over a directory tree laid out as ``data_dir/<label>/<video>``.

    Every sub-directory of ``data_dir`` is treated as one class. The videos of
    each class are divided into train / val / test subsets with
    ``train_test_split``, and only the subset named by ``split`` is kept.

    Directory listings are sorted before use so that the label -> index
    mapping and the per-class splits are identical on every run; this is what
    keeps separately constructed "train", "val" and "test" instances (same
    ``data_dir``, sizes and ``random_state``) disjoint from one another.

    Args:
        data_dir: Root directory containing one sub-directory per label.
        split: Which subset to expose: "train", "val" or "test".
        test_size: Fraction of each class reserved for the test subset.
        val_size: Fraction of each class (relative to the whole class)
            reserved for the validation subset.
        random_state: Seed forwarded to both ``train_test_split`` calls.

    Raises:
        ValueError: If ``split`` is not one of "train", "val", "test".
            ``train_test_split`` may also raise (presumably ``ValueError``)
            when a class has too few videos to split — verify on the data.
    """

    _SPLITS = ("train", "val", "test")

    def __init__(self, data_dir, split="train", test_size=0.1, val_size=0.1, random_state=429):
        # Fail fast: an unknown split would otherwise produce an empty dataset.
        if split not in self._SPLITS:
            raise ValueError(f"split must be one of {self._SPLITS}, got {split!r}")

        # Only directories are labels; stray files (e.g. ".DS_Store") are
        # skipped. Sorting makes the label indices reproducible.
        self.labels = sorted(
            entry
            for entry in os.listdir(data_dir)
            if os.path.isdir(os.path.join(data_dir, entry))
        )
        self.label_to_idx = {label: idx for idx, label in enumerate(self.labels)}
        self.data = []
        for label in self.labels:
            label_dir = os.path.join(data_dir, label)
            # Sorted so a fixed random_state always yields the same split;
            # hidden files are not videos and are ignored.
            video_paths = [
                os.path.join(label_dir, video_file)
                for video_file in sorted(os.listdir(label_dir))
                if not video_file.startswith(".")
            ]

            train_val_paths, test_paths = train_test_split(
                video_paths, test_size=test_size, random_state=random_state
            )
            # val_size is relative to the whole class, so rescale it to the
            # portion that is left once the test subset has been removed.
            train_paths, val_paths = train_test_split(
                train_val_paths, test_size=val_size / (1 - test_size), random_state=random_state
            )

            selected = {"train": train_paths, "val": val_paths, "test": test_paths}[split]
            self.data += [(video_path, label) for video_path in selected]

    def __len__(self):
        """Return the number of (video_path, label) pairs in this split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Decode the video at position ``idx`` into a tensor of frames.

        Each frame is converted from BGR to RGB, resized to 224x224 and
        passed through ``transforms.ToTensor()``. The frames are stacked along
        a new leading dimension, so the result has shape
        ``(num_frames, 3, 224, 224)``; ``num_frames`` depends on the video,
        so samples from different videos may differ in length.

        Returns:
            Tuple ``(video_tensor, label_idx)``.

        Raises:
            RuntimeError: If no frame could be decoded from the video file.
        """
        video_path, label = self.data[idx]
        to_tensor = transforms.ToTensor()  # built once, not once per frame
        capture = cv2.VideoCapture(video_path)
        frames = []
        try:
            while True:
                ok, frame = capture.read()
                if not ok:
                    break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = cv2.resize(frame, (224, 224))
                frames.append(to_tensor(frame))
        finally:
            # Always free the decoder handle, even if a frame op raised.
            capture.release()

        if not frames:
            # torch.stack on an empty list fails with an opaque message;
            # name the offending file instead (same exception type).
            raise RuntimeError(f"No frames could be decoded from {video_path!r}")

        video_tensor = torch.stack(frames, dim=0)
        label_idx = self.label_to_idx[label]
        return video_tensor, label_idx


In [27]:
# One batch size shared by all three loaders (was a repeated literal 4).
BATCH_SIZE = 4

# NOTE(review): `video_dir` is not defined in any cell visible here — confirm
# it is assigned earlier so this cell works on a fresh kernel.
train_data = VideoDataset(video_dir, split="train")
val_data = VideoDataset(video_dir, split="val")
test_data = VideoDataset(video_dir, split="test")

# Only the training loader shuffles; validation and test keep a fixed order
# so evaluation runs iterate the samples identically every time.
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

In [34]:
# Echo the loader's repr to confirm it was constructed; as a bare expression
# this only displays the object and does not iterate over it.
train_loader

<torch.utils.data.dataloader.DataLoader at 0x13d8d0550>