In [47]:
from pathlib import Path
import random

import av
import cv2
import numpy as np

import albumentations as A
from vidaug import augmentors as va

from IPython.display import Video

%matplotlib inline
from matplotlib import pyplot as plt
from matplotlib import animation
from IPython.display import HTML

In [48]:
def read_video_pyav(container, indices):
    """Decode the frames of ``container`` whose positions are listed in ``indices``.

    The first video stream is decoded from the beginning and the scan stops as
    soon as a frame past ``indices[-1]`` is reached.

    Args:
        container: Opened PyAV container. Only ``seek(0)`` and
            ``decode(video=0)`` are called on it.
        indices: Non-empty sequence of frame positions. ``indices[0]`` and
            ``indices[-1]`` bound the scan, so the sequence is expected to be
            sorted in ascending order.

    Returns:
        np.ndarray: The selected frames, each converted with
        ``to_ndarray(format="rgb24")`` and stacked along a new leading axis in
        decoding order. A position listed more than once yields one frame.

    Raises:
        IndexError: If ``indices`` is empty.
        ValueError: If none of the requested positions could be decoded.
    """
    start_index = indices[0]
    end_index = indices[-1]
    # Hash lookups instead of rescanning the whole index array for every
    # decoded frame. tolist() yields plain Python numbers, so equality matches
    # the original element-wise comparison.
    wanted = set(np.asarray(indices).ravel().tolist())

    container.seek(0)
    frames = []
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in wanted:
            frames.append(frame)

    if not frames:
        # np.stack would fail here anyway ("need at least one array to stack");
        # raise the same exception type with a message that names the cause.
        raise ValueError(
            "No frames were decoded for the requested indices "
            f"(first={start_index}, last={end_index})."
        )
    return np.stack([frame.to_ndarray(format="rgb24") for frame in frames])


def sample_frame_indices(clip_len, frame_sample_rate, seg_len):
    """Pick ``clip_len`` frame positions from a randomly placed window.

    A window of ``int(clip_len * frame_sample_rate)`` frames is placed at a
    random offset inside a segment of ``seg_len`` frames, and ``clip_len``
    evenly spaced positions are taken from it.

    Args:
        clip_len: Number of positions to return.
        frame_sample_rate: Spacing factor; the window spans
            ``clip_len * frame_sample_rate`` frames.
        seg_len: Number of frames in the segment being sampled.

    Returns:
        np.ndarray: ``clip_len`` positions of dtype ``int64``, each within
        ``[start_idx, end_idx - 1]`` of the chosen window.

    Raises:
        ValueError: If the window is longer than the segment, or the segment
            is empty.
    """
    converted_len = int(clip_len * frame_sample_rate)
    if converted_len < seg_len:
        end_idx = np.random.randint(converted_len, seg_len)
    elif converted_len == seg_len and seg_len > 0:
        # np.random.randint rejects low == high. When the window is exactly as
        # long as the segment there is only one placement: the whole segment.
        end_idx = seg_len
    else:
        raise ValueError(
            f"Cannot sample a window of {converted_len} frames "
            f"(clip_len={clip_len}, frame_sample_rate={frame_sample_rate}) "
            f"from a segment of {seg_len} frames."
        )
    start_idx = end_idx - converted_len
    indices = np.linspace(start_idx, end_idx, num=clip_len)
    # Keep the last position strictly below end_idx before truncating to int.
    indices = np.clip(indices, start_idx, end_idx - 1).astype(np.int64)
    return indices

In [49]:
def write_video(name, video, frame_sample_rate, source_fps=25):
    """Write a stack of RGB frames to an ``mp4v``-encoded video file.

    The output frame rate is ``source_fps / frame_sample_rate``. It is passed
    as a float so that sample rates above ``source_fps`` give a fractional
    rate instead of truncating to 0 fps, which the writer cannot open.

    Args:
        name: Output file path (``str`` or path-like).
        video: Array indexed as ``(frame, height, width, channel)``. Frames are
            converted with ``cv2.COLOR_RGB2BGR``, so RGB channel order is
            expected.
        frame_sample_rate: Subsampling factor the frames were taken with.
        source_fps: Frame rate of the original footage. Defaults to 25, the
            value previously hard-coded here.

    Raises:
        IOError: If the video writer could not be opened for ``name``.
    """
    fps = source_fps / frame_sample_rate
    height, width = video.shape[1], video.shape[2]
    video_writer = cv2.VideoWriter(
        str(name),
        cv2.VideoWriter_fourcc(*'mp4v'),
        fps,
        (width, height),  # the writer takes the frame size as (width, height)
    )
    if not video_writer.isOpened():
        video_writer.release()
        raise IOError(f"Could not open video writer for {name!r} at {fps} fps")
    try:
        for frame in video:
            video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    finally:
        # Always finalize the file, even if a frame conversion fails midway.
        video_writer.release()


def animate_video(video: np.ndarray):
    """Build a matplotlib animation that steps through the frames of ``video``.

    Args:
        video: Array indexed with four axes; the first axis selects the frame
            and the remaining three are handed to ``imshow``.

    Returns:
        matplotlib.animation.FuncAnimation: One animation step per frame,
        created with ``interval=100``.
    """
    figure = plt.figure(figsize=(3, 3))
    image_artist = plt.imshow(video[0, :, :, :])

    # Close the figure right away so the notebook does not also render the
    # static first frame next to the animation.
    plt.close()

    def show_first_frame():
        image_artist.set_data(video[0, :, :, :])

    def show_frame(index):
        image_artist.set_data(video[index, :, :, :])
        return image_artist

    return animation.FuncAnimation(
        figure,
        show_frame,
        init_func=show_first_frame,
        frames=video.shape[0],
        interval=100,
    )

In [50]:
def apply_video_augmentations(video, transform):
    """Run ``transform`` once over all frames of ``video`` and restack the output.

    Frame 0 is passed under the keyword ``image`` and frame ``i`` (``i >= 1``)
    under ``image{i}``. The results are read back under the same keys, so
    ``transform`` must accept and return all of them.

    Args:
        video: Array whose first axis enumerates frames (at least one frame).
        transform: Callable taking the frames as keyword arguments and
            returning a mapping with the same keys.

    Returns:
        np.ndarray: The transformed frames stacked along a new leading axis,
        in the original frame order.
    """
    frame_keys = ['image'] + [f'image{i}' for i in range(1, video.shape[0])]
    results = transform(
        **{key: video[position] for position, key in enumerate(frame_keys)}
    )
    return np.stack([results[key] for key in frame_keys])

In [54]:
# Pick one random .mp4 from the dataset tree and open it for the cells below.
data_dir = Path("./data/sibur_data/train_in_out")
# Recursive search: every .mp4 anywhere under data_dir.
videos_paths = list(data_dir.rglob("*.mp4"))
# NOTE(review): random.choice raises IndexError when no .mp4 was found, and no
# seed is set in the visible cells, so a different file may be picked per run.
file_path = random.choice(videos_paths)
# The container is left open on purpose: the sampling cell reads frames from it.
# NOTE(review): nothing visible closes it afterwards.
container = av.open(file_path.as_posix())

# Show the name of the folder holding the chosen file, then embed the video.
print(file_path.parent.name)
Video(file_path)

train_in_out


In [55]:
# Sample roughly one frame out of every `frame_sample_rate` frames of the
# opened video, then preview the resulting clip.
frame_sample_rate = 50
# NOTE(review): some containers may not report a frame count here — confirm
# for this dataset.
stream_frame_count = container.streams.video[0].frames

# (count - 1) // rate equals int(count / rate) unless the count is an exact
# multiple of the rate. In that case the sampling window would be exactly as
# long as the video, which sample_frame_indices cannot place (randint needs
# low < high), so one frame fewer is requested.
clip_len = (stream_frame_count - 1) // frame_sample_rate
if clip_len < 1:
    raise ValueError(
        f"Video reports {stream_frame_count} frames; more than "
        f"{frame_sample_rate} are needed to sample a clip."
    )

indices = sample_frame_indices(
    clip_len=clip_len,
    frame_sample_rate=frame_sample_rate,
    seg_len=stream_frame_count,
)
video = read_video_pyav(container, indices)
a = animate_video(video)
display(HTML(a.to_html5_video()))

In [90]:
# Every frame after the first is registered under 'image1', 'image2', ... as an
# 'image'-type target. The count comes from the clip currently held in `video`,
# so both pipelines below only match clips with that many frames.
extra_frame_targets = {f'image{i}': 'image' for i in range(1, len(video))}

# Training pipeline: random crop to 224x224, then flip / affine / colour /
# blur / cutout augmentations, each with its own probability.
train_augmentations = [
    A.RandomResizedCrop(224, 224, scale=(0.3, 1.0), p=1),
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(
        shift_limit=0.0625, scale_limit=0.1, rotate_limit=30, p=0.5
    ),
    A.RGBShift(r_shift_limit=15, g_shift_limit=15, b_shift_limit=15, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.Blur(blur_limit=8, p=0.5),
    A.HueSaturationValue(p=0.5),
    A.Cutout(num_holes=12, p=0.15),
]
train_transform = A.Compose(
    train_augmentations,
    additional_targets=dict(extra_frame_targets),
)

# Evaluation pipeline: resize to 256x256, then centre-crop to 224x224.
eval_augmentations = [
    A.Resize(256, 256),
    A.CenterCrop(224, 224),
]
transform = A.Compose(
    eval_augmentations,
    additional_targets=dict(extra_frame_targets),
)

In [91]:
# Frame-level augmentation of the sampled clip: once with the random training
# pipeline, once with the resize + centre-crop pipeline.
train_video = apply_video_augmentations(video, train_transform)
test_video = apply_video_augmentations(video, transform)


def sometimes(aug):
    """Wrap ``aug`` in ``va.Sometimes`` with 0.5 as its first argument."""
    return va.Sometimes(0.5, aug)


# Clip-level pipeline from vidaug: one of Upsample / Downsample, followed by
# Pepper and RandomShear.
resampling_choice = va.OneOf([
    va.Upsample(1.25),
    va.Downsample(0.75),
])
video_aug = va.Sequential([
    resampling_choice,
    va.Pepper(50),
    va.RandomShear(0.1, 0.1),
])


def show_clip(clip):
    """Animate ``clip``, embed it in the notebook, and return the animation."""
    clip_animation = animate_video(np.array(clip))
    display(HTML(clip_animation.to_html5_video()))
    return clip_animation


# 1) resize/centre-crop clip as is, 2) training clip after video_aug,
# 3) resize/centre-crop clip after video_aug.
v = show_clip(test_video)
v = show_clip(video_aug(train_video))
v = show_clip(video_aug(test_video))