In [None]:
# Mount Google Drive at /content/drive so files stored on Drive are reachable
# from this Colab runtime. force_remount=True is passed so that re-running the
# cell requests a fresh mount.
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [None]:
# Change the working directory to the project folder on Drive; relative paths
# used by later cells are resolved from here.
%cd /content/drive/MyDrive/Colab\ Notebooks/SC201VideoClassification

/content/drive/MyDrive/Colab Notebooks/SC201VideoClassification


In [None]:
import cv2
import torch
import numpy as np
import os
from concurrent.futures import ProcessPoolExecutor
from typing import *
import time
from torch import nn
from torch.utils.data import DataLoader
from torch.optim import Adam
import torchvision

In [None]:
# Configuration shared by the cells below.
IMG_SIZE: int = 112  # frames are resized to (IMG_SIZE, IMG_SIZE) by load_video
TRAIN_DIRECTORY: str = "train"  # folder name of the training split
VAL_DIRECTORY: str = "test"  # folder name of the validation split
MAX_FRAMES: int = 16  # leading frames kept per video; videos with fewer frames are discarded
BATCH_SIZE: int = 16  # batch size handed to the DataLoaders
PRINT_EVERY_EPOCH: int = 2  # train() divides the number of batches per epoch by this to get its validation interval
NUM_EPOCHS: int = 10  # number of passes over the training loader in train()

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU, and show
# which one was picked.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [None]:
def load_video_helper(cap, max_frames, resize):
    """Lazily yield up to ``max_frames`` resized frames read from ``cap``.

    Args:
        cap: Capture object exposing ``read()`` -> ``(success, frame)`` and
            ``release()`` (e.g. a ``cv2.VideoCapture``).
        max_frames: Stop after this many frames have been yielded.
        resize: Target size forwarded unchanged to ``cv2.resize``.

    Yields:
        Each successfully read frame after ``cv2.resize(frame, resize)``.

    The capture is always released when the generator finishes, is closed
    early, or an error is raised while reading/resizing.
    """
    emitted = 0
    try:
        ok, image = cap.read()
        while ok:
            emitted += 1
            yield cv2.resize(image, resize)
            if emitted == max_frames:
                return
            # Only fetch the next frame once we know it is still wanted.
            ok, image = cap.read()
    finally:
        cap.release()

In [None]:
def load_video(path, max_frames=MAX_FRAMES, resize=(IMG_SIZE, IMG_SIZE)):
    """Load the first ``max_frames`` frames of a video as a scaled array.

    Args:
        path: Path of the video file handed to ``cv2.VideoCapture``.
        max_frames: Exact number of frames a usable clip must provide.
        resize: ``(width, height)`` target size passed to ``cv2.resize``.

    Returns:
        A ``(can_use, frames)`` tuple. ``can_use`` is True only when exactly
        ``max_frames`` frames could be read. In that case ``frames`` holds the
        pixel values divided by 255. Otherwise ``frames`` is an all-zero
        placeholder with the same layout as a real clip, so callers can still
        stack the results of many videos before filtering on ``can_use``.
    """
    cap = cv2.VideoCapture(path)

    # The helper releases the capture once it is exhausted or fails.
    frames = list(load_video_helper(cap, max_frames=max_frames, resize=resize))
    can_use = (len(frames) == max_frames)

    if can_use:
        return can_use, np.array(frames) / 255

    # cv2.resize takes (width, height) but returns arrays laid out as
    # (height, width, channels); build the placeholder in that layout so it
    # matches real clips for non-square sizes too. Three colour channels are
    # assumed, as in the original placeholder.
    width, height = resize
    return can_use, np.zeros((max_frames, height, width, 3))

In [None]:
def prepare_data(directory, shuffle=True, seed=42):
    """Load every usable video below ``directory`` into labelled tensors.

    ``directory`` is expected to contain one sub-directory per class. Classes
    are numbered by the sorted order of their directory names, so the same
    class names always map to the same label index.

    Args:
        directory: Root folder of the split (one sub-folder per class).
        shuffle: When True, return the samples in a seeded random order.
        seed: Seed of the shuffle.

    Returns:
        A list of ``(video, label)`` pairs. ``video`` is a float64 tensor of
        shape ``(MAX_FRAMES, IMG_SIZE, IMG_SIZE, 3)`` and ``label`` an int64
        scalar tensor. Videos for which ``load_video`` reports ``can_use`` as
        False are left out. An empty list is returned when nothing is usable.

    Side effects:
        Prints the elapsed wall-clock time.
    """
    start = time.time()

    # Only real, non-hidden sub-directories are classes. Entries such as
    # ".ipynb_checkpoints" or stray files would otherwise claim a label index
    # and shift every following class.
    categories = sorted(
        name for name in os.listdir(directory)
        if not name.startswith(".") and os.path.isdir(f"{directory}/{name}")
    )

    # Sorted file order keeps the seeded shuffle reproducible, because
    # os.listdir gives no ordering guarantee.
    files = []
    labels = []
    for label, category in enumerate(categories):
        for file in sorted(os.listdir(f"{directory}/{category}")):
            files.append(f"{directory}/{category}/{file}")
            labels.append(label)

    # Decode the videos in parallel worker processes.
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(load_video,
                                    files,
                                    (MAX_FRAMES,) * len(files),
                                    ((IMG_SIZE, IMG_SIZE),) * len(files)))

    # Indices of the videos that delivered the full number of frames.
    keep = np.array(
        [idx for idx, (can_use, _) in enumerate(results) if can_use],
        dtype=np.intp,
    )

    if shuffle and len(keep):
        # A private RandomState yields the same permutation as seeding the
        # global generator would, without clobbering global RNG state.
        rng = np.random.RandomState(seed)
        keep = keep[np.argsort(rng.rand(len(keep)))]

    # Stack only the kept clips, already in their final order, so the large
    # array is materialised exactly once.
    if len(keep):
        data = np.stack([results[idx][1] for idx in keep])
    else:
        data = np.empty((0, MAX_FRAMES, IMG_SIZE, IMG_SIZE, 3))
    true_labels = np.array(labels, dtype=np.int64)[keep]

    # from_numpy shares memory with the freshly built arrays (no extra copy).
    data = torch.from_numpy(data)
    true_labels = torch.from_numpy(true_labels)

    result = list(zip(data, true_labels))

    end = time.time()

    print(f"This function took {end-start} seconds to complete.")

    return result

In [None]:
# Load both splits from the folders named in the configuration cell, so that
# changing TRAIN_DIRECTORY / VAL_DIRECTORY actually takes effect here.
train_data = prepare_data(TRAIN_DIRECTORY)
val_data = prepare_data(VAL_DIRECTORY)
# Number of usable samples in each split.
NUM_TRAIN: int = len(train_data)
NUM_VAL: int = len(val_data)

This function took 97.82791137695312 seconds to complete.
This function took 35.880200147628784 seconds to complete.


In [None]:
# Reshuffle the training samples at the start of every epoch so that the
# mini-batches differ between epochs; validation order does not matter.
mini_trains = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
mini_vals = DataLoader(val_data, batch_size=BATCH_SIZE)

In [None]:
# Sanity check: print the shape of the video tensor of the first training batch.
print('[D_train.shape]', next(iter(mini_trains))[0].shape)

[D_train.shape] torch.Size([16, 16, 112, 112, 3])


In [24]:
class VideoModel(nn.Module):
  """Per-frame CNN encoder followed by an LSTM over the frame sequence.

  Each frame is encoded independently by five Conv-BatchNorm-ReLU-MaxPool
  stages. The resulting feature vectors are fed, in frame order, to an LSTM
  whose last output is mapped to class scores by a linear layer.

  Args:
    num_classes: Number of output classes (defaults to 10).

  Input of ``forward``: float tensor of shape ``(N, frames, H, W, 3)`` with
  channels last. The LSTM input size (2304) matches 112 x 112 frames.
  """

  # Channel widths of the successive convolution stages.
  _CHANNELS = (3, 64, 128, 256, 256, 256)
  # 256 channels x 3 x 3 spatial positions remain after five 2x2 poolings
  # of a 112 x 112 frame.
  _FEATURE_SIZE = 256 * 3 * 3
  _HIDDEN_SIZE = 256

  def __init__(self, num_classes: int = 10):
    super().__init__()
    stages = []
    # Spatial size per stage for 112 x 112 input: 112 -> 56 -> 28 -> 14 -> 7 -> 3
    for in_channels, out_channels in zip(self._CHANNELS[:-1], self._CHANNELS[1:]):
      stages += [
        nn.Conv2d(in_channels, out_channels, 3, 1, 1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(),
        nn.MaxPool2d(2, 2),
      ]
    # N x 256 x 3 x 3 -> N x 2304
    stages.append(nn.Flatten())
    self.cnn_encoder = nn.Sequential(*stages)
    self.lstm = nn.LSTM(self._FEATURE_SIZE, self._HIDDEN_SIZE, batch_first=True)
    self.fc = nn.Linear(self._HIDDEN_SIZE, num_classes)

  def forward(self, x):
    """Return class scores of shape ``(N, num_classes)`` for a batch of clips."""
    num_frames = x.shape[1]
    # Encode every frame separately: (N, H, W, C) -> (N, C, H, W) -> (N, 2304).
    frame_features = [
      self.cnn_encoder(x[:, t].permute(0, 3, 1, 2))
      for t in range(num_frames)
    ]
    # Stacking keeps the features on the device and dtype of the encoder
    # output instead of relying on a module-level ``device`` variable.
    lstm_input = torch.stack(frame_features, dim=1)  # N x frames x 2304

    # Feed the frame features to the LSTM and classify from the last step.
    output, _ = self.lstm(lstm_input)
    return self.fc(output[:, -1, :])


In [25]:
# evaluation:
def val(model, mini_vals, device) -> float:
    """Compute the classification accuracy of ``model`` on ``mini_vals``.

    Args:
        model: Network returning per-class scores of shape ``(N, classes)``.
        mini_vals: Iterable of ``(x, y)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Fraction of correctly classified samples among the samples actually
        seen in ``mini_vals``; ``0.0`` when the iterable yields no samples.

    Note:
        The model is switched to evaluation mode and left in that mode.
    """
    model.eval()
    correct = 0
    seen = 0
    with torch.no_grad():
        for x, y in mini_vals:
            y = y.to(device)
            predictions = model(x.to(device).float()).argmax(dim=1)
            correct += (predictions == y).sum().item()
            # Count the samples here rather than trusting a global dataset
            # size, so any loader can be evaluated.
            seen += y.numel()
    return correct / seen if seen else 0.0

In [26]:
# training:
def train(model, optimizer, mini_trains, mini_vals, device) -> None:
    """Train ``model`` for ``NUM_EPOCHS`` epochs with cross-entropy loss.

    Args:
        model: Network to optimise; must return per-class scores.
        optimizer: Optimizer already bound to ``model``'s parameters.
        mini_trains: Sized iterable of ``(x, y)`` training batches.
        mini_vals: Iterable of validation batches passed to ``val``.
        device: Device the batches are moved to.

    Validation accuracy is printed about ``PRINT_EVERY_EPOCH`` times per
    epoch, starting with the first batch of each epoch.
    """
    loss_fn: Callable = nn.CrossEntropyLoss()
    # At least 1: with fewer batches than PRINT_EVERY_EPOCH the integer
    # division would give 0 and the modulo below would raise.
    print_every: int = max(1, len(mini_trains) // max(1, PRINT_EVERY_EPOCH))
    for epoch in range(1, NUM_EPOCHS+1):
        for i, (x, y) in enumerate(mini_trains):
            # Turn on training mode (val() leaves the model in eval mode):
            model.train()

            # Move data to corresponding device:
            x = x.to(device).float()
            y = y.to(device)

            # Forward propagation:
            scores = model(x)

            # Calculate loss:
            loss = loss_fn(scores, y)

            # Clear previous gradient:
            optimizer.zero_grad()

            # Backward propagation:
            loss.backward()

            # Update parameters:
            optimizer.step()

            if i % print_every == 0:
                acc = val(model, mini_vals, device)
                print(f"EPOCH {epoch}".center(40, "#"))
                print(f"Validation accuracy: {acc:.4f}")
                print(40*"#", end="\n\n")

In [27]:
# Instantiate the network on the selected device and create an Adam optimizer
# over all of its parameters (no optimizer arguments are overridden).
model = VideoModel().to(device)
optimizer = Adam(model.parameters())

In [28]:
# Run the training loop; validation accuracy is printed as training progresses.
train(model, optimizer, mini_trains, mini_vals, device)

################EPOCH 1#################
Validation accuracy: 0.1068
########################################

################EPOCH 1#################
Validation accuracy: 0.2113
########################################

################EPOCH 2#################
Validation accuracy: 0.3203
########################################

################EPOCH 2#################
Validation accuracy: 0.3725
########################################

################EPOCH 3#################
Validation accuracy: 0.4118
########################################

################EPOCH 3#################
Validation accuracy: 0.3791
########################################

################EPOCH 4#################
Validation accuracy: 0.4292
########################################

################EPOCH 4#################
Validation accuracy: 0.4423
########################################

################EPOCH 5#################
Validation accuracy: 0.4749
########################################

#