In [None]:
from combo_dataloader import ComboDataLoader, ComboDLTransform, DataLoaderType
import torchvision
import time
import torch.nn as nn
import pytorch_lightning
import torch
import json
from typing import List, Tuple
import os
import torchmetrics
import random

### Setting up video inputs and model

**Load in video paths and labels**

In [None]:
with open("kinetics_classnames.json", "r") as f:
    kinetics_classnames_json = json.load(f)

In [None]:
kinetics_classnames_to_id = {}
for k, v in kinetics_classnames_json.items():
    kinetics_classnames_to_id[str(k).replace('"', "")] = v
# Create an id to label name mapping
kinetics_id_to_classname = {}
for k, v in kinetics_classnames_to_id.items():
    kinetics_id_to_classname[v] = k

In [None]:
null_videos = {
    "/home/maureen/kinetics/kinetics400_10classes/train/xxUezLcXkDs_000256_000266.mp4",
    "/home/maureen/kinetics/kinetics400_10classes/train/CUxsn4YXksI_000119_000129.mp4"
}

In [None]:
def load_video_paths(annotation_file_path, video_base_path, shuffle=True) -> Tuple[List[str], List[int]]:
	video_paths = []
	labels = []
	with open(annotation_file_path, 'r') as annotation_file:
		for i, line in enumerate(annotation_file):
			if i != 0: # skip column headers
				line = annotation_file.readline()
				if line:
					label, youtube_id, time_start, time_end, split, is_cc = line.strip().split(',')
					label_id = kinetics_classnames_to_id.get(label)
					vpath = f'{video_base_path}/{split}/{youtube_id}_{int(time_start):06d}_{int(time_end):06d}.mp4'

					if os.path.exists(vpath) and vpath not in null_videos:
						video_paths.append(vpath)
						labels.append(label_id)

	if shuffle:
		combined = list(zip(video_paths, labels))
		random.shuffle(combined)
		video_paths, labels = zip(*combined)

	return video_paths, labels

In [None]:
val_paths, val_labels = load_video_paths(
    '/home/maureen/kinetics/kinetics400_10classes/annotations/val.csv',
    '/home/maureen/kinetics/kinetics400_10classes'
)
train_paths, train_labels = load_video_paths(
    '/home/maureen/kinetics/kinetics400_10classes/annotations/train.csv',
    '/home/maureen/kinetics/kinetics400_10classes'
)

**Set up transform**

In [None]:
transform = ComboDLTransform(
		crop=112,
		mean=[0.43216, 0.394666, 0.37645],
		std=[0.22803 , 0.22145 , 0.216989],
		short_side_scale=128
)


**Using only a DALI dataloader**

In [None]:
# dl = ComboDataLoader(
# 		dataloaders=[DataLoaderType.DALI],
# 		dataloader_portions=[1],
# 		video_paths=train_paths[:50],
# 		transform=transform,
# 		stride=2,
# 		step=32,
# 		sequence_length=16,
# 		fps=32,
# 		batch_size=8,
# 		dali_pipeline_kwargs={"num_threads": 10}
# )

In [None]:
# start = time.perf_counter()
# for batch in dl:
#     pass
# dali_time = time.perf_counter() - start
# dali_time

**Using PyTorch with a Decord backend**

Using decord, we can push the resize down to the decoding step to get over 2x speedup.

In [None]:
# # Create the dataloader
# dl = ComboDataLoader(
# 		dataloaders=[DataLoaderType.PYTORCH],
# 		dataloader_portions=[1],
# 		video_paths=train_paths[:50],
# 		transform=transform,
# 		stride=2,
# 		step=32,
# 		sequence_length=16,
# 		fps=32,
# 		batch_size=8,
# 		pytorch_dataloader_kwargs={"num_workers": 10},
# 		pytorch_dataset_kwargs=dict(decoder="decord", short_side_scale=128),
# )

In [None]:
# start = time.perf_counter()
# for batch in dl:
#     pass
# pytorch_decord_time = time.perf_counter() - start
# pytorch_decord_time

**Using the optimal combination of DALI and PyTorch with a Decord backend**

In [None]:
# dali_portion = int(round(pytorch_decord_time / (pytorch_decord_time + dali_time) * 100))
# pytorch_portion = int(round(dali_time / (pytorch_decord_time + dali_time) * 100))

# # Expected time with these portions
# dali_portion / 100 * dali_time

In [None]:
dali_portion = 30
pytorch_portion = 70

In [None]:
# Create the dataloader
train_dl = ComboDataLoader(
    dataloaders=[DataLoaderType.PYTORCH, DataLoaderType.DALI],
    dataloader_portions=[pytorch_portion, dali_portion],
    video_paths=train_paths,
    labels=train_labels,
    transform=transform,
    stride=2,
    step=32,
    sequence_length=16,
    fps=32,
    batch_size=8,
    pytorch_dataloader_kwargs={"num_workers": 10},
    pytorch_dataset_kwargs=dict(decoder="decord", short_side_scale=128),
    dali_pipeline_kwargs={"num_threads": 10},
)
test_dl = ComboDataLoader(
    dataloaders=[DataLoaderType.PYTORCH, DataLoaderType.DALI],
    dataloader_portions=[pytorch_portion, dali_portion],
    video_paths=val_paths,
    labels=val_labels,
    transform=transform,
    stride=2,
    step=32,
    sequence_length=16,
    fps=32,
    batch_size=8,
    pytorch_dataloader_kwargs={"num_workers": 10},
    pytorch_dataset_kwargs=dict(decoder="decord", short_side_scale=128),
    dali_pipeline_kwargs={"num_threads": 10},
)

### Train loop

In [None]:
class VideoClassificationLightningModule(pytorch_lightning.LightningModule):
    def __init__(self):
        super().__init__()
        model = torchvision.models.video.r3d_18(weights=torchvision.models.video.R3D_18_Weights.KINETICS400_V1)
        model = model.to("cuda")

        # Identify the fully connected layer whose weights you want to randomize
        fc_layer = model.fc

        # Randomly initialize the weights of the fc_layer
        nn.init.xavier_uniform_(fc_layer.weight)
        nn.init.zeros_(fc_layer.bias)

        # Freeze all but last fully-connected layer
        for name, param in model.named_parameters():
            if not name.startswith("fc"):
                param.requires_grad = False
        self.model = model

        self.macro_accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=400, average='macro')
        self.micro_accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=400, average='micro')
        self.macro_F1 = torchmetrics.classification.MulticlassF1Score(num_classes=400, average='macro')
        self.micro_F1 = torchmetrics.classification.MulticlassF1Score(num_classes=400, average='micro')

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        # The model expects a video tensor of shape (B, C, T, H, W), which is the
        # format provided by the dataset
        # pred = self.model(batch["frames"])
        pred = self.model(batch["video"].to("cuda"))

        labels = batch["label"].to(torch.long)

        # Compute cross entropy loss, loss.backwards will be called behind the scenes
        # by PyTorchLightning after being returned from this method.
        loss = torch.nn.functional.cross_entropy(pred, labels)

        self.log("train_loss", loss)
        return loss

    def test_step(self, batch, batch_idx):
        # pred = self.model(batch["frames"])
        pred = self.model(batch["video"].to("cuda"))
        labels = batch["label"].to(torch.long)

        loss = torch.nn.functional.cross_entropy(pred, labels)
        pred_labels = torch.argmax(pred, dim=1)
        micro_acc = self.micro_accuracy(pred_labels, labels)
        macro_acc = self.macro_accuracy(pred_labels, labels)
        micro_f1 = self.micro_F1(pred_labels, labels)
        macro_f1 = self.macro_F1(pred_labels, labels)

        self.log("test_loss", loss)
        self.log("test_micro_accuracy", micro_acc, on_epoch=True)
        self.log("test_macro_accuracy", macro_acc, on_epoch=True)
        self.log("test_micro_f1", micro_f1)
        self.log("test_macro_f1", macro_f1)

        return loss

    def validation_step(self, batch, batch_idx):
        pred = self.model(batch["video"].to("cuda"))
        labels = batch["label"].to(torch.long)
        
        loss = torch.nn.functional.cross_entropy(pred, labels)

        self.log("val_loss", loss)
        return loss

    def configure_optimizers(self):
        """
        Setup the Adam optimizer. Note, that this function also can return a lr scheduler, which is
        usually useful for training video models.
        """
        return torch.optim.Adam(self.parameters(), lr=1e-4)

In [None]:
import pytorchvideo.data
import torch
from torchvision.transforms import Compose, CenterCrop
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    ShortSideScale,
    UniformTemporalSubsample,
    Normalize,
)
from combo_dataloader._combo_dataloader import DataLoaderParams


In [None]:
def pytorch_dl(params):
	transforms = [
		ShortSideScale(
				size=params.transform.short_side_scale
		),
		UniformTemporalSubsample(params.sequence_length),
		Normalize(params.transform.mean, params.transform.std),
		CenterCrop(params.transform.crop)
	]

	dataset_transform =  ApplyTransformToKey(
			key="video",
			transform=Compose(transforms),
	)

	reformatted_video_paths = [(path, {"label": label, "video_path": path}) for path, label in zip(params.video_paths, params.labels)]

	dataset = pytorchvideo.data.LabeledVideoDataset(
			labeled_video_paths=reformatted_video_paths,
			clip_sampler=pytorchvideo.data.make_clip_sampler(
					"uniform",
					params.stride * params.sequence_length / params.fps,
					params.step / params.fps,
			),
			video_sampler=torch.utils.data.SequentialSampler,
			transform=dataset_transform,
			decode_audio=False,
			**params.pytorch_dataset_kwargs
	)

	dataloader = torch.utils.data.DataLoader(
			dataset,
			batch_size=params.batch_size,
			**params.pytorch_dataloader_kwargs
	)

	return dataloader

		

In [None]:
params = DataLoaderParams(
    video_paths=train_paths,
    labels=train_labels,
    transform=transform,
    stride=2,
    step=32,
    sequence_length=16,
    fps=32,
    batch_size=8,
    pytorch_dataloader_kwargs={"num_workers": 10},
    dali_pipeline_kwargs={"num_threads": 10},
    pytorch_dataset_kwargs=dict(),
    pytorch_additional_transform=None,
    dali_additional_transform=None,
    dali_reader_kwargs=dict(),
)

In [None]:
test_params = DataLoaderParams(
    video_paths=val_paths,
    labels=val_labels,
    transform=transform,
    stride=2,
    step=32,
    sequence_length=16,
    fps=32,
    batch_size=8,
    pytorch_dataloader_kwargs={"num_workers": 10},
    dali_pipeline_kwargs={"num_threads": 10},
    pytorch_dataset_kwargs=dict(),
    pytorch_additional_transform=None,
    dali_additional_transform=None,
    dali_reader_kwargs=dict(),
)

In [None]:
pytorch_train_dl = pytorch_dl(params)


In [None]:
pytorch_test_dl = pytorch_dl(test_params)

In [23]:
tainer = pytorch_lightning.Trainer(accelerator='gpu', devices=1, max_epochs=5)
model = VideoClassificationLightningModule()
trainer.fit(model=model, train_dataloaders=train_dl)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name           | Type               | Params
------------------------------------------------------
0 | model          | VideoResNet        | 33.4 M
1 | macro_accuracy | MulticlassAccuracy | 0     
2 | micro_accuracy | MulticlassAccuracy | 0     
3 | macro_F1       | MulticlassF1Score  | 0     
4 | micro_F1       | MulticlassF1Score  | 0     
------------------------------------------------------
205 K     Trainable params
33.2 M    Non-trainable params
33.4 M    Total params
133.486   Total estimated model params size (MB)


Epoch 0: : 0it [00:00, ?it/s]

[/opt/dali/dali/operators/reader/loader/video_loader.h:180] ``file_list_include_preceding_frame`` uses the default value False. In future releases, the default value will be changed to True.


Epoch 0: : 136it [02:11,  1.03it/s, loss=2.62, v_num=103]


Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7fea69474320>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 1481, in __del__
    self._shutdown_workers()
  File "/opt/conda/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 1445, in _shutdown_workers
    w.join(timeout=_utils.MP_STATUS_CHECK_INTERVAL)
  File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 140, in join
    res = self._popen.wait(timeout)
  File "/opt/conda/lib/python3.7/multiprocessing/popen_fork.py", line 45, in wait
    if not wait([self.sentinel], timeout):
  File "/opt/conda/lib/python3.7/multiprocessing/connection.py", line 921, in wait
    ready = selector.select(timeout)
  File "/opt/conda/lib/python3.7/selectors.py", line 415, in select
    fd_event_list = self._selector.poll(timeout)
KeyboardInterrupt: 


KeyError: 'video'

In [None]:
trainer.test(model=model, dataloaders=pytorch_test_dl)