In [1]:
import os
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)
else:
    print ("MPS device not found.")

tensor([1.], device='mps:0')


In [3]:
NUMCLASSES = 5
TRAINPATH = 'UCF101Dataset/train'
TESTPATH = 'UCF101Dataset/test'
TRAINLABELSPATH = 'UCF101Dataset/train.csv'
TESTLABELSPATH = 'UCF101Dataset/test.csv'
train_df = pd.read_csv(TRAINLABELSPATH)
test_df = pd.read_csv(TESTLABELSPATH)

"""
    Loads and frames from the provided file path
    
    returns: numpy array of generated frames
"""

def load_frames(path, numFrames=16): 

    cap = cv2.VideoCapture(path) # opening video
    frames = []

    totalFrames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frameInterval = max(totalFrames // numFrames, 1)
    for i in range(numFrames):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i*frameInterval) # set frame position
        ret, frame = cap.read() # read frame at position

        if not ret: # exit loop if at end of video
            break

        frame = cv2.resize(frame, (112,112))
        frames.append(frame)

    while len(frames) < numFrames:
        frames.append(np.zeros((112,112,3), np.uint8)) # fill in blank frames with zeroes 

    return np.array(frames)

In [4]:
class VideoDataset(Dataset):
    def __init__(self, paths,labels):
        self.paths = paths
        self.labels = labels
    
    def __len__(self):
        return len(self.paths)

    def __getitem__(self,x):
        path = self.paths[x]
        label = self.labels[x]
        
        frames = torch.tensor(load_frames(path))
        frames = frames.float()

        return frames, label
    
class VideoClassifier(nn.Module):
    def __init__(self,numClasses):
        super(VideoClassifier, self).__init__()
        self.conv3D1 = nn.Conv3d(3,64, kernel_size=(3,3,3), padding=(1,1,1))
        self.conv3D2 = nn.Conv3d(64,128, kernel_size=(3,3,3), padding=(1,1,1))
        self.conv3D3 = nn.Conv3d(128,256, kernel_size=(3,3,3), padding=(1,1,1))

        self.FC1 = nn.Linear(256*4*4*4, 128)
        self.FC2 = nn.Linear(128, numClasses)
    
    def forward(self, x):
        x = F.relu(F.max_pool3d(self.conv3D1(x), kernel_size=(2,2,2)))
        x = F.relu(F.max_pool3d(self.conv3D2(x), kernel_size=(2,2,2)))
        x = F.relu(F.max_pool3d(self.conv3D3(x), kernel_size=(2,2,2)))
        
        x = x.reshape(-1, 256*4*4*4)

        x = F.relu(self.FC1(x))
        x = self.FC2(x)
        return x

In [5]:
train_video_paths = 'week7/UCF101Dataset/train'
train_labels = train_df[['tag', 'video_name']]

test_video_paths = 'week7/UCF101Dataset/test'
test_labels = test_df[['tag', 'video_name']]

traindataset = VideoDataset(train_video_paths, train_labels)
traindataloader = DataLoader(traindataset, batch_size=32, shuffle=True)

testdataset = VideoDataset(test_video_paths, test_labels)
testdataloader = DataLoader(testdataset, batch_size=32, shuffle=False)

model = VideoClassifier(NUMCLASSES)
optimizer = optim.Adam(model.parameters(), lr=0.001)
lossFunction = nn.CrossEntropyLoss()


def load_data(labels, video_dir, num_classes, label_mapping, num_frames=16):
    X = []
    y = []
    
    for idx, label in labels.iterrows():
        video_path = os.path.join(video_dir, label['video_name'])
        
        frames = load_frames(video_path, num_frames)
        
        if len(frames) == num_frames:
            X.append(frames)
            y.append(label_mapping[label['tag']])
    
    X = np.array(X)
    y = np.array(y)
    
    # Convert the labels to one-hot encoded format
    y = np.eye(num_classes, dtype='uint8')[y]
    
    return X, y

unique_labels = train_labels['tag'].unique()
label_mapping = {label: idx for idx, label in enumerate(unique_labels)}

X_train, y_train = load_data(train_labels, TRAINPATH, NUMCLASSES, label_mapping)
X_test, y_test = load_data(test_labels, TESTPATH, NUMCLASSES, label_mapping)


In [11]:
# MODEL TRAINING
for epoch in range(10):
    for batch in traindataloader:
        frames, labels = batch
        inputs = frames.permute(0,4,1,2,3) # convert dimensions
        outputs = model(inputs)
        
        loss = lossFunction(outputs, torch.tensor(labels))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Epoch {epoch+1}, Loss: {loss.item()}')

KeyError: 15

In [None]:
# MODEL TESTING
model.eval()
testLoss,correct = 0
with torch.no_grad():
    for batch in testdataloader:
        frames, labels = batch
        inputs = frames.permute(0,4,1,2,3)
        outputs = model(inputs)
        loss = lossFunction(outputs, labels)
        testLoss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()


In [None]:
accuracy = correct / len(testdataset)
print(f'Test Loss: {testLoss/len(testdataloader)}')
print(f'Accuracy: {accuracy:.2f}')