# OpenMax Recognition for MoViNet (18 classes)

In [1]:
import time
import torchvision
import torch.nn.functional as F
import torchvision.transforms as transforms
import torch.optim as optim
from torch.utils.data import random_split, DataLoader
import torch
# import transforms as T
from movinets import MoViNet
from movinets.config import _C

from torchvision.datasets.video_utils import VideoClips
from torchvision.datasets.vision import VisionDataset
from torchvision.datasets.folder import find_classes, make_dataset

import os
import glob
import json

import numpy as np
import cv2
import mediapy as media
import pandas as pd
import matplotlib.pyplot as plt

from tqdm import tqdm

## Video Processing

In [2]:
def load_video(file_path, image_size=(224, 224), original_fps=30, new_fps=5, start_time=None, end_time=None, gray=False):
    """Decode a video file into a uint8 NumPy array of resized, subsampled frames.

    Args:
        file_path: Path handed to ``cv2.VideoCapture``.
        image_size: ``(width, height)`` passed to ``cv2.resize``.
        original_fps: Frame rate assumed for the source video. It is not read
            from the file; it converts times to frame numbers and sets the
            subsampling stride.
        new_fps: Target frame rate. One frame is kept per
            ``original_fps / new_fps`` source frames (the ratio may be
            fractional).
        start_time: Optional clip start in seconds. ``None`` means the start of
            the video; ``0`` is honoured as a real start time.
        end_time: Optional clip end in seconds. ``None`` means the frame count
            reported by the capture object.
        gray: If true, frames are converted to single-channel grayscale;
            otherwise BGR frames are converted to RGB.

    Returns:
        ``np.ndarray`` of dtype ``uint8`` shaped ``(T, height, width)`` when
        ``gray`` is set and ``(T, height, width, 3)`` otherwise. ``T`` is at
        most ``int((end_frame - start_frame) / (original_fps / new_fps))`` and
        is smaller when decoding stops early, so the result never contains
        all-zero frames that were allocated but never written.
    """
    cap = cv2.VideoCapture(file_path)
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Compare against None so that start_time=0 still selects a window.
        start_frame = 0 if start_time is None else original_fps * start_time
        end_frame = frame_count if end_time is None else original_fps * end_time

        fps_factor = original_fps / new_fps
        capacity = max(int((end_frame - start_frame) / fps_factor), 0)
        if gray:
            frame_shape = (image_size[1], image_size[0])
            color_code = cv2.COLOR_BGR2GRAY
        else:
            frame_shape = (image_size[1], image_size[0], 3)
            color_code = cv2.COLOR_BGR2RGB
        buf = np.zeros((capacity,) + frame_shape, np.dtype('uint8'))

        # A frame is kept whenever its 1-based index enters a new stride slot.
        # For an integer stride this selects exactly the multiples of the
        # stride; unlike `index % stride == 0` it also works for fractional
        # strides such as 30 fps -> 4 fps.
        last_slot = (max(start_frame, 1) - 1) // fps_factor
        now_frame = 0
        kept = 0
        # Stopping at `capacity` prevents writing past the end of `buf` when
        # the window boundaries both fall on sampled frames.
        while kept < capacity:
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is decodable; a
                # failed read returns no image, so it must not reach resize().
                break
            now_frame += 1
            if now_frame < start_frame:
                continue
            if now_frame > end_frame:
                break
            slot = now_frame // fps_factor
            if slot > last_slot:
                last_slot = slot
                frame = cv2.resize(frame, dsize=image_size)
                buf[kept] = cv2.cvtColor(frame, color_code)
                kept += 1
    finally:
        # Release the capture even if decoding or resizing raised.
        cap.release()

    return buf[:kept]

In [3]:
def preprocess_video(path, image_size=(224,224), frame_num=50, dtype=np.float32):
    """Load a video and shape it into a one-clip batch laid out as (N, C, T, H, W).

    Args:
        path: Video path forwarded to ``load_video``.
        image_size: ``(width, height)`` forwarded to ``load_video``.
        frame_num: Exact number of frames in the result. Shorter videos are
            padded at the end with all-zero frames; longer ones are truncated.
        dtype: Output dtype. Values are scaled to ``[0, 1]`` only when this is
            float32 (given as ``np.float32``, ``np.dtype('float32')`` or the
            string ``'float32'``); any other dtype keeps the raw 0-255 values.

    Returns:
        ``np.ndarray`` of shape ``(1, C, frame_num, H, W)``.
    """
    # load_video returns frames as (T, H, W, C).
    video = load_video(path, image_size=image_size)

    # Force the time axis to exactly frame_num entries.
    t = video.shape[0]
    if t < frame_num:
        # Pad with NumPy zeros of the same frame shape and dtype instead of
        # mixing a torch tensor into np.concatenate.
        padding = np.zeros((frame_num - t,) + video.shape[1:], dtype=video.dtype)
        video = np.concatenate([video, padding], axis=0)
    elif t > frame_num:
        video = video[:frame_num]

    # (T, H, W, C) -> (C, T, H, W)
    video = np.transpose(video, (3, 0, 1, 2))

    # (C, T, H, W) -> (1, C, T, H, W)
    video = np.expand_dims(video, axis=0)

    video = video.astype(dtype)
    # np.dtype() normalises every spelling of float32 before comparing.
    if np.dtype(dtype) == np.float32:
        video /= 255

    return video

In [4]:
## Load class names

# 18classes.json maps class name -> integer index. The labels are Korean, so the
# encoding is pinned instead of relying on the platform default.
with open("18classes.json", 'r', encoding="utf-8") as f:
    classes = json.load(f)

# Inverted lookup (index -> class name) used to decode model predictions.
classes_dict = {index: name for name, index in classes.items()}

print(classes_dict)

{0: '_기타정상데이터', 1: '계산기사용(실제계산기or휴대폰계산기)', 2: '그림그리기', 3: '글씨쓰기', 4: '눈썹들어올리기', 5: '눈알굴리기', 6: '마시기', 7: '머리마사지하기', 8: '머리빗기', 9: '먹기', 10: '목마사지하기', 11: '사진찍기', 12: '안경고쳐쓰기', 13: '책&보고서읽기', 14: '통화하기(이어폰or핸드폰을귀에대고)', 15: '팔스트레칭하기', 16: '하품하기', 17: '휴대폰문자하기'}


## Model Loading and Visualization

In [6]:
# Build MoViNet-A2 without pretrained weights and swap in an 18-way classifier
# head so the architecture matches the fine-tuned checkpoint.
model = MoViNet(_C.MODEL.MoViNetA2, causal=False, pretrained=False)
model.classifier[3] = torch.nn.Conv3d(2048, 18, kernel_size=(1, 1, 1))  # 18 classes

# The checkpoint is deserialised onto the CPU before the weights are copied in.
model.load_state_dict(
    torch.load('fine_iter2_a2.pth', map_location='cpu')
)

# Switch to inference mode and move to the GPU. Both calls return the module,
# so its repr is still the cell's displayed value.
model.eval().cuda()

MoViNet(
  (conv1): ConvBlock3D(
    (conv_1): Conv3DBNActivation(
      (conv3d): Conv3d(3, 16, kernel_size=(1, 3, 3), stride=(1, 2, 2), padding=(0, 1, 1), bias=False)
      (norm): BatchNorm3d(16, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (act): Swish()
    )
  )
  (blocks): Sequential(
    (b0_l0): BasicBneck(
      (expand): ConvBlock3D(
        (conv_1): Conv3DBNActivation(
          (conv3d): Conv3d(16, 40, kernel_size=(1, 1, 1), stride=(1, 1, 1), bias=False)
          (norm): BatchNorm3d(40, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
          (act): Swish()
        )
      )
      (deep): ConvBlock3D(
        (conv_1): Conv3DBNActivation(
          (conv3d): Conv3d(40, 40, kernel_size=(1, 5, 5), stride=(1, 2, 2), padding=(0, 2, 2), groups=40, bias=False)
          (norm): BatchNorm3d(40, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
          (act): Swish()
        )
      )
      (se): SqueezeExcitation(
      

In [7]:
# Sanity check: decode one validation clip and play it back inline.

# The pattern contains no wildcard, so glob returns either that one path or an empty list.
video_names = glob.glob("/home/workspaces/mark/Video_Data/kinetics-datasets-downloader/downloader/dataset/val/ados2/그림그리기/그림그리기_김민태.mp4")

# Decode the clip with load_video's default frame-rate arguments (30 fps -> 5 fps).
# NOTE: video_names[0] raises IndexError when the file is missing.
test_path = video_names[0]
video = load_video(test_path, image_size=(224,224))

# Play the decoded frames back at 5 fps.
media.show_video(video, fps=5)

0
This browser does not support the video tag.


## Inference

In [8]:
def computeLogit(input_data):
    """Run the global ``model`` on ``input_data`` without tracking gradients.

    Despite the name, the return value is not raw logits: the model output is
    passed through a softmax over dimension 1 before being returned.
    """
    with torch.no_grad():
        return F.softmax(model(input_data), dim=1)

In [None]:
# Inference

# Take up to 20 videos from each class directory.
# glob returns entries in arbitrary order, so both listings are sorted to make
# the selected sample reproducible across runs.
class_names = sorted(glob.glob("/home/workspaces/mark/Video_Data/kinetics-datasets-downloader/downloader/dataset/val/ados3/**"))

num_videos = 20
video_names = [sorted(glob.glob(class_path + '/**'))[:num_videos] for class_path in class_names]

# Map class directory name -> list of video paths. Entries without any match
# (empty directories or stray files) are skipped instead of raising IndexError.
video_dict = {
    os.path.basename(os.path.dirname(videos[0])): videos
    for videos in video_names
    if videos
}
path = video_dict['drawing'][0]

# Smoke test: run a single clip through the model and display the softmax output.
input_data = preprocess_video(path)
input_data = torch.from_numpy(input_data).cuda()
output = computeLogit(input_data)
output

## Meta Recognition

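- Meta Recognition (C = number of classes; 54 in the original notes)
    - mean: Per-class mean of the model outputs (softmax probabilities, as returned by `computeLogit`), shape (C, 1, C)
    - output_matrix: Model outputs for the 20 sampled videos of each class, shape (C, 20, C)
    - distance: Distance between each sampled output and its class mean, sorted in descending order per class, shape (C, 1, 20)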

In [None]:
# Sample up to `num_videos` videos from each class for open-set calibration.
# Builds:
#   output_matrix - softmax output of every sampled video, (classes, num_videos, outputs)
#   mean          - per-class mean of those outputs,        (classes, 1, outputs)
#   distance      - per-class distances to the mean, sorted descending, (classes, 1, num_videos)

# Sizes are derived from the data instead of a hard-coded 54 so the cell matches
# the 18-output model loaded above.
known_classes = sorted(video_dict.keys())
num_known = len(known_classes)
num_outputs = len(classes_dict)

mean = torch.zeros(num_known, 1, num_outputs)
output_matrix = torch.zeros(num_known, num_videos, num_outputs)
video_counts = [0] * num_known

for c, class_name in enumerate(known_classes):
    s = torch.zeros(1, num_outputs)
    for v, path in enumerate(tqdm(video_dict[class_name], desc=class_name)):
        input_data = preprocess_video(path)
        input_data = torch.from_numpy(input_data).cuda()
        output = computeLogit(input_data).cpu()
        output_matrix[c, v] = output[0]
        s += output
    # Average over the videos actually processed; a class may hold fewer than num_videos.
    video_counts[c] = len(video_dict[class_name])
    mean[c] = s / max(video_counts[c], 1)

print(torch.argmax(mean, dim=0)) # per output column: index of the class whose mean is largest

distance = torch.zeros(num_known, 1, num_videos)
for i in range(num_known):
    n_valid = video_counts[i]
    if n_valid == 0:
        continue
    # Only rows that hold a real output take part; unused zero rows would
    # otherwise show up as large bogus distances. Slots past n_valid stay 0.
    sorted_dist, indices = torch.sort(
        torch.norm(output_matrix[i, :n_valid] - mean[i], dim=1), descending=True
    )
    distance[i, 0, :n_valid] = sorted_dist

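- Weibull Distribution (C = number of classes; 54 in the original notes)
    - sampled_distance: The top-k entries of `distance` for each class, shape (C, 1, k)
    - Top-k distances = distances of the k samples that lie farthest from their class mean, i.e. the tail that the Weibull fit models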

In [None]:
# Keep the k largest distances per class. `distance` is sorted in descending
# order along its last axis, so the first k entries are the largest ones.
k = 10
sampled_distance = distance[:, :, 0:k]
# Display the retained distances for class index 0.
sampled_distance[0]

In [None]:
def fit_weibull(x, iters=100, eps=1e-6, use_cuda=True):
    """Fit a two-parameter Weibull distribution to ``x`` by maximum likelihood.

    The shape ``k`` is found with Newton-Raphson on the derivative of the
    log-likelihood, starting from ``k = 1``; the scale ``lam`` then follows in
    closed form as ``mean(x ** k) ** (1 / k)``.

    Args:
        x: Tensor of samples. A zero entry makes ``log(x)`` infinite and leads
            to the NaN return described below.
        iters: Maximum number of Newton iterations.
        eps: Iteration stops once ``k`` changes by less than this amount.
        use_cuda: Unused; kept so existing calls keep working.

    Returns:
        A float tensor of shape ``(1, 2)`` holding ``[[k, lam]]``. If the
        objective evaluates to NaN, the tuple ``(np.nan, np.nan)`` is returned
        instead.
    """
    k = 1.0
    ln_x = torch.log(x)
    mean_ln_x = torch.mean(ln_x)

    for _ in range(iters):
        # f(k) = sum(x^k ln x) / sum(x^k) - mean(ln x) - 1/k
        x_k = x ** k
        x_k_ln_x = x_k * ln_x
        ff = torch.sum(x_k_ln_x)
        fg = torch.sum(x_k)
        f = ff / fg - mean_ln_x - (1.0 / k)
        if torch.isnan(f):
            return np.nan, np.nan

        # f'(k)
        ff_prime = torch.sum(x_k_ln_x * ln_x)
        fg_prime = ff
        f_prime = (ff_prime / fg - (ff / fg * fg_prime / fg)) + (1. / (k * k))

        # Newton-Raphson step k <- k - f(k) / f'(k).
        # k is kept as a plain Python float: an in-place `k -= ...` on a tensor
        # would also change the saved previous value (same object), making the
        # convergence test below succeed after the second iteration.
        k_prev = k
        k = k - (f / f_prime).item()
        if abs(k - k_prev) < eps:
            break

    # Lambda (scale) can be calculated directly.
    lam = (torch.mean(x ** k) ** (1.0 / k)).item()

    return torch.Tensor([[k, lam]])  # Shape (SC), Scale (FE)

In [None]:
# Fit one Weibull (shape, scale) pair per class on its retained tail distances.
# The class count comes from `sampled_distance` rather than a hard-coded 54.
num_fit_classes = sampled_distance.shape[0]
weibull_param = torch.zeros(num_fit_classes, 2)

for i in range(num_fit_classes):
    sample = sampled_distance[i][0]
    # fit_weibull returns a (1, 2) tensor, or a (nan, nan) tuple on failure;
    # both are normalised to a flat pair before being stored.
    weibull_param[i] = torch.as_tensor(
        fit_weibull(sample, iters=100), dtype=torch.float32
    ).reshape(2)

In [None]:
def compute_weibull_likelihood(param, dist):
    """Evaluate the Weibull probability density at ``dist``.

    ``param`` unpacks into ``(shape, scale)``; the result is
    ``shape / scale * (dist / scale) ** (shape - 1) * exp(-(dist / scale) ** shape)``.
    """
    shape, scale = param
    density = shape / scale * (dist / scale) ** (shape - 1) * torch.exp(-(dist / scale) ** shape)
    return density

def compute_weibull_probability(param, dist):
    """Evaluate the Weibull cumulative distribution function at ``dist``.

    Args:
        param: Pair that unpacks into ``(k, lam)`` - shape and scale.
        dist: Tensor of distances (a 0-dim tensor gives a result of shape ``(1,)``).

    Returns:
        ``1 - exp(-(dist / lam) ** k)`` as a tensor. It is moved to the GPU when
        CUDA is available (the previous behaviour) and stays on its current
        device otherwise, so the function also runs on CPU-only hosts.
    """
    k, lam = param
    survival = torch.exp(-(dist/lam)**k)
    weibull_probability = torch.ones(1, device=survival.device) - survival
    if torch.cuda.is_available():
        weibull_probability = weibull_probability.cuda()
    return weibull_probability

In [None]:
def openmax(path, model):
    """Classify one video with an OpenMax-style "unknown" class and plot the result.

    The softmax output of ``model`` is compared with every class mean in the
    global ``mean``. Each distance is turned into a Weibull CDF value using the
    global ``weibull_param``; that fraction of each class probability is moved
    into an extra "unknown" entry placed at index 0.

    Args:
        path: Video path forwarded to ``preprocess_video``.
        model: Network applied to the preprocessed clip (on the GPU).

    Returns:
        None. The closed-set prediction, the top-5 open-set labels and a bar
        chart of the open-set probabilities are printed / shown.
    """
    input_data = preprocess_video(path)
    input_data = torch.from_numpy(input_data).cuda()

    with torch.no_grad():
        pred = model(input_data)
        output = F.softmax(pred, dim=1)
        print(torch.argmax(output), classes_dict[torch.argmax(output).cpu().item()])
        print(output[0])

    # Distance from this clip's output to every class mean. `mean` is
    # (classes, 1, outputs); the class count is read from it, not hard-coded.
    dist = torch.norm(output.cpu()[0] - mean[:, 0], dim=1)

    # Per-class Weibull CDF of the distance: the share of probability mass
    # that is reassigned to "unknown". Transposing gives (shape row, scale row).
    weibull_probability = compute_weibull_probability(weibull_param.T, dist).to(output.device)

    # Index 0 = unknown, indices 1.. = known classes with reduced probability.
    new_output = torch.cat([
        torch.sum(output * weibull_probability).view(1),
        output[0] * (1 - weibull_probability),
    ])

    # Previously referenced an undefined name (`new_test_output`).
    value, indices = torch.topk(new_output, k=min(5, new_output.numel()), dim=0)
    result = ['unknown' if index == 0 else classes_dict[int(index) - 1] for index in indices]
    print(result)

    # "Unknown" is pinned to the first bar so labels line up with `new_output`;
    # sorting it together with the class names only worked by accident.
    labels = ["Unknown"] + sorted(video_dict.keys())
    plt.figure(figsize=(15, 4))
    # new_output already sums to 1 (it redistributes softmax mass), so it is
    # plotted directly; a second softmax would flatten the differences.
    plt.bar(labels, new_output.cpu().numpy())
    plt.xticks(rotation=90)
    plt.show()

## Test - Ados Drawing

In [None]:
# Clip from the ados2 validation set, taken from the '그림그리기' class folder.
test_path = '/home/workspaces/mark/Video_Data/kinetics-datasets-downloader/downloader/dataset/val/ados2/그림그리기/그림그리기_문형민.mp4'
test_video = load_video(test_path, image_size=(224,224))

# Preview the decoded frames at 5 fps.
media.show_video(test_video, fps=5)

In [None]:
# Run the open-set classifier on the drawing clip.
openmax(test_path, model)

## Test - Never Seen

In [None]:
# Clip intended as an unseen ("never seen") input for the open-set test.

never_seen_path = "./Shooting_MJ.mp4"
never_seen_video = load_video(never_seen_path, image_size=(224,224))

# Preview the decoded frames at 5 fps.
media.show_video(never_seen_video, fps=5)

In [None]:
# Run the open-set classifier on the unseen clip.
openmax(never_seen_path, model)

## Test - Kinetics - Wrong Prediction

In [None]:
# Kinetics clips from the train/others/building_cabinet folder.
# NOTE(review): labelled "Data never seen" originally; confirm this folder was not used for training.

kinetics_paths = glob.glob("/home/workspaces/mark/Video_Data/kinetics-datasets-downloader/downloader/dataset/train/others/building_cabinet/*.mp4")

# First match in glob's (unsorted) order; raises IndexError if the folder has no .mp4 files.
kinetics_path = kinetics_paths[0]
kinetics_video = load_video(kinetics_path, image_size=(224,224))
# Preview the decoded frames at 5 fps.
media.show_video(kinetics_video, fps=5)

In [None]:
# Run the open-set classifier on the building_cabinet clip.
openmax(kinetics_path, model)

## Test - Kinetics - Correct Prediction

In [None]:
# Kinetics clips from the val/normal/taking_photo folder.
# NOTE(review): labelled "Data never seen" originally; confirm whether this class overlaps a trained class.

kinetics_paths = glob.glob("/home/workspaces/mark/Video_Data/kinetics-datasets-downloader/downloader/dataset/val/normal/taking_photo/*.mp4")
# Fifth match in glob's (unsorted) order; raises IndexError if fewer than five .mp4 files exist.
kinetics_path = kinetics_paths[4]
kinetics_video = load_video(kinetics_path, image_size=(224,224))

# Preview the decoded frames at 5 fps.
media.show_video(kinetics_video, fps=5)

In [None]:
# Run the open-set classifier on the taking_photo clip.
openmax(kinetics_path, model)