In [None]:
!pip install transformers
!pip install accelerate

In [None]:
!pip install --upgrade keras
!pip install albumentations

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive so the dataset archive stored under
# MyDrive can be read by the cells below.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!unzip "/content/drive/MyDrive/Bakis/cnnlstm/split.zip" -d "/content/dataset"

In [None]:
import os
import io
import imageio
import pandas as pd
import ipywidgets
import numpy as np
import tensorflow as tf  # for data preprocessing only
import keras
from keras import layers, ops
import cv2
from sklearn.preprocessing import LabelEncoder
import albumentations as A
import random

In [None]:
# Number of frames kept per video; process_dataset stops reading a video once it
# has collected this many frames, and it is the second axis of the data arrays.
grab_frames = 32
# Side length in pixels: every frame is resized to target_shape x target_shape.
target_shape = 224

In [None]:
# Root directories of the three dataset splits. process_dataset lists the
# sub-folders of a split and uses each sub-folder's name as the label of the
# videos it contains.
train_dir = '/content/dataset/training'
test_dir = '/content/dataset/test'
val_dir = '/content/dataset/validation'

In [None]:
# Class names. The LabelEncoder further down is fitted on this array and then
# applied to the folder-name labels returned by process_dataset, so the dataset's
# sub-folder names are expected to match these strings.
labels = np.array(['aciu', 'berniukas', 'kamuolys', 'koks', 'labas', 'mama', 'namas', 'tevas', 'valgyti', 'vardas'])

In [None]:
def _read_video_frames(video_path, num_frames, size):
    """Decode up to `num_frames` frames from a video, keeping every second frame.

    Frames are converted from OpenCV's BGR order to RGB and resized to
    `size` x `size`. Fewer than `num_frames` frames are returned when the video
    ends early; an empty list is returned when nothing could be decoded.
    """
    capture = cv2.VideoCapture(video_path)
    frames = []
    keep = True  # alternates so that decoded frames 0, 2, 4, ... are kept
    try:
        while len(frames) < num_frames:
            ok, frame = capture.read()
            if not ok:
                break
            if keep:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(cv2.resize(frame, (size, size)))
            keep = not keep
    finally:
        # Release the decoder even if conversion/resizing raises.
        capture.release()
    return frames


def process_dataset(train_dir, set_size, augment=False):
    """Load every video of a dataset split into a uint8 array plus folder-name labels.

    The split directory is expected to contain one sub-folder per class; the
    sub-folder name is used as the label of each video inside it. From each video
    every second frame is taken until `grab_frames` frames are collected; videos
    that end early are padded by repeating their last frame so all clips have the
    same length.

    Args:
        train_dir: path of the split directory (training, validation or test).
        set_size: maximum number of videos; sizes the preallocated buffers.
        augment: when True, one random augmentation is sampled per video and the
            same parameters are replayed on all of its frames, so the clip stays
            temporally consistent.

    Returns:
        (X, y) where X has shape (n, grab_frames, target_shape, target_shape, 3),
        dtype uint8, and y is an object array of n folder names. n is the number
        of videos actually loaded (n <= set_size).

    Raises:
        IndexError: if the split contains more readable videos than `set_size`.
    """
    X_data = np.zeros((set_size, grab_frames, target_shape, target_shape, 3), dtype=np.uint8)
    y_temp = np.zeros((set_size,), dtype=object)

    # Seeds Python's `random` module. NOTE(review): whether albumentations draws
    # its randomness from this module depends on the installed version - confirm.
    random.seed(7)

    transform = A.ReplayCompose([
      A.HorizontalFlip(p=0.5),
      A.MotionBlur(blur_limit=5, p=0.3),
      A.ShiftScaleRotate(shift_limit=0.02, scale_limit=0.02, rotate_limit=10, p=0.5),
      A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.4),
      A.CLAHE(clip_limit=2, p=0.3),
      A.HueSaturationValue(hue_shift_limit=10, sat_shift_limit=20, val_shift_limit=10, p=0.3),
      A.ISONoise(color_shift=(0.01, 0.03), intensity=(0.1, 0.3), p=0.3),
    ])

    source_directory = os.path.abspath(train_dir)
    index = 0

    # Sorted traversal: os.listdir order is arbitrary, and a stable order is needed
    # for the seeded augmentations to land on the same videos on every run.
    for folder in sorted(os.listdir(source_directory)):
        folder_path = os.path.join(source_directory, folder)
        if not os.path.isdir(folder_path):
            # Stray files next to the class folders are not classes.
            continue

        for file in sorted(os.listdir(folder_path)):
            video_path = os.path.join(folder_path, file)
            frames = _read_video_frames(video_path, grab_frames, target_shape)

            if not frames:
                print(f"Skipping unreadable or empty video: {video_path}")
                continue

            if index >= set_size:
                raise IndexError(
                    f"{source_directory} contains more than set_size={set_size} videos"
                )

            if augment:
                # Sample augmentation parameters on the first frame, then replay
                # exactly the same parameters on the remaining frames.
                first = transform(image=frames[0])
                replay = first['replay']
                frames = [first['image']] + [
                    A.ReplayCompose.replay(replay, image=frame)['image']
                    for frame in frames[1:]
                ]

            # Pad short clips by repeating the last frame so the clip fits its slot.
            frames.extend([frames[-1]] * (grab_frames - len(frames)))

            X_data[index] = np.stack(frames)
            y_temp[index] = folder
            index += 1
            print(index)

    # Drop unused slots so no all-zero clips / placeholder labels are returned.
    return X_data[:index], y_temp[:index]

In [None]:
# Load the training split with augmentation enabled (third argument).
# set_size=2100 makes process_dataset preallocate a (2100, 32, 224, 224, 3) uint8
# array, i.e. roughly 10 GB of RAM with the frame count and size configured above.
X_train, y_train = process_dataset(train_dir, 2100, True)

In [None]:
# Load the validation split without augmentation (augment defaults to False).
# set_size=200 preallocates room for 200 clips (about 1 GB as uint8).
X_valid, y_valid = process_dataset(val_dir, 200)

In [None]:
# Load the test split without augmentation (augment defaults to False).
# set_size=200 preallocates room for 200 clips (about 1 GB as uint8).
X_test, y_test = process_dataset(test_dir, 200)

In [None]:
# Build the class-name -> integer mapping once, from the known label array.
le = LabelEncoder().fit(labels)

# Encode the folder-name labels of every split with that single fitted mapping.
y_train, y_valid, y_test = (
    le.transform(split_labels) for split_labels in (y_train, y_valid, y_test)
)

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score

def compute_metrics(eval_pred):
    """Compute classification accuracy for one evaluation pass.

    Args:
        eval_pred: a pair that unpacks into (logits, labels); the predicted class
            of each row is the index of its largest logit along axis 1.

    Returns:
        A dict with the single key "accuracy".
    """
    scores, references = eval_pred
    predicted_ids = np.argmax(scores, axis=1)
    return {"accuracy": accuracy_score(references, predicted_ids)}

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import VivitForVideoClassification, VivitImageProcessor, TrainingArguments, Trainer
from PIL import Image
import numpy as np

class CustomVideoDataset(Dataset):
    """Dataset that pairs each video clip with its label.

    `videos` and `labels` are indexed in parallel; item `i` is returned as
    (list of PIL images for clip i, labels[i]).
    """

    def __init__(self, videos, labels):
        self.videos, self.labels = videos, labels

    def __len__(self):
        # One sample per clip.
        return len(self.videos)

    def __getitem__(self, idx):
        clip, target = self.videos[idx], self.labels[idx]
        # Turn every frame array of the clip into a PIL image.
        pil_frames = []
        for frame in clip:
            pil_frames.append(Image.fromarray(frame.astype('uint8')))
        return pil_frames, target

train_dataset = CustomVideoDataset(X_train, y_train)
val_dataset = CustomVideoDataset(X_valid, y_valid)

# NOTE: these loaders are not passed to the Trainer below, which receives the
# datasets themselves together with its own data collator.
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8)


# Size the classification head for this dataset's classes instead of the label
# set stored in the checkpoint; ignore_mismatched_sizes lets the head be
# re-initialised when its shape differs from the checkpoint's.
model = VivitForVideoClassification.from_pretrained(
    'google/vivit-b-16x2',
    num_labels=len(labels),
    ignore_mismatched_sizes=True,
)
processor = VivitImageProcessor.from_pretrained('google/vivit-b-16x2')

def preprocess_function(videos):
    """Run the module-level `processor` on `videos` and return its PyTorch-tensor output."""
    return processor(videos, return_tensors="pt")

def collate_fn(batch):
    """Collate (video_frames, label) samples into one set of model inputs.

    The clips of the batch are passed through `preprocess_function` together and
    the labels are attached to the result as a tensor under the 'labels' key.
    """
    clips, targets = zip(*batch)
    encoded = preprocess_function(clips)
    encoded['labels'] = torch.tensor(targets)
    return encoded

# `evaluation_strategy` was renamed to `eval_strategy` in newer transformers
# releases. Pick whichever keyword the installed version declares so this cell
# works with both, since the install cell does not pin a version.
_eval_strategy_key = (
    "eval_strategy"
    if "eval_strategy" in TrainingArguments.__dataclass_fields__
    else "evaluation_strategy"
)

training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=2,
    # One clip per step on each device.
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    # Evaluate and checkpoint once per epoch; the two strategies are kept equal
    # because load_best_model_at_end is enabled.
    **{_eval_strategy_key: "epoch"},
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=True,
    fp16=True,
)

# Wire the model, training arguments and datasets together. Batches are built by
# collate_fn (which runs the image processor), and compute_metrics is used to
# report accuracy during evaluation.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=collate_fn,
    compute_metrics=compute_metrics
)

# Release cached, currently unused GPU memory held by PyTorch before training starts.
torch.cuda.empty_cache()
trainer.train()

In [None]:
# Wrap the held-out test split in the same dataset class used for training.
test_dataset = CustomVideoDataset(X_test, y_test)
# NOTE: the evaluation cell passes test_dataset to trainer.evaluate directly;
# this loader is not referenced there.
test_dataloader = DataLoader(test_dataset, batch_size=8)

In [None]:
# Evaluate the trained model on the test split and show the resulting metrics dict.
metrics = trainer.evaluate(eval_dataset=test_dataset)
print(metrics)

In [None]:
# Release the Colab runtime once all cells above have finished.
# NOTE(review): presumably this discards the VM's local disk, including ./results
# and ./logs - confirm that anything worth keeping has been copied to Drive first.
from google.colab import runtime
runtime.unassign()