# Video Classification using rolling averages

In [1]:
import matplotlib
matplotlib.use("Agg")

from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras.layers import AveragePooling2D
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from collections import Counter
import cv2
import os
import random
import pickle

# Data preparation

Read in the CSV files and shuffle the dataframes. Each dataframe row consists of a video file name and its tag. The file names are used to locate the video files. Each video file is read with OpenCV, and a frame is extracted at every n-th position. Each extracted frame is centre-cropped and resized to (224, 224). The processed frames are then collected to form the training data. A copy of each frame is also saved as an image in the train_images folder.

In this version, only 5 classes from the UCF101 dataset are used.

In [2]:
train_df = pd.read_csv("/kaggle/input/ucf101-v2/train.csv")
test_df = pd.read_csv("/kaggle/input/ucf101-v2/test.csv")

# Shuffle the rows with a fixed seed: the order of the videos determines the
# order of the extracted frames, so an unseeded shuffle would give a different
# result on every Restart & Run All.
train_df = train_df.sample(frac = 1, random_state = 21)
test_df = test_df.sample(frac = 1, random_state = 21)

print("Total videos for training: {}".format(len(train_df)))
print("Total videos for testing: {}".format(len(test_df)))

# Show the first rows of the shuffled training table
train_df.head(10)

In [3]:
# Side length, in pixels, used as the default resize target in extract_frames
IMG_SIZE = 224

# Number of samples per batch for the training generator and step counts
BATCH_SIZE = 32

# Number of epochs the model is trained for
EPOCHS = 5

def crop_center_square(frame):
    """Return the largest centred square region of `frame`.

    The first two axes of `frame` are treated as (rows, columns); any
    further axes are kept untouched. The result is a slice of `frame`.
    """
    height, width = frame.shape[:2]
    side = height if height < width else width

    # Offsets that centre a side x side window along each axis
    top = height // 2 - side // 2
    left = width // 2 - side // 2

    return frame[top:top + side, left:left + side]


def extract_frames(path, label, skip = 60, resize=(IMG_SIZE, IMG_SIZE)):
    """Sample frames from one video, save them as JPEGs and return them.

    Starting at frame 0, every `skip`-th frame is read, centre-cropped to a
    square with crop_center_square, resized to `resize` and written to the
    ``train_images/`` folder as ``<video file stem>_<frame index>.jpg``.

    Args:
        path: Path of the video file to read.
        label: Label attached to every frame sampled from this video.
        skip: Step, in frames, between two sampled frames. Must be >= 1.
        resize: Target size passed to ``cv2.resize``.

    Returns:
        A tuple ``(image_frames, image_label)`` of two equally long lists:
        the processed frames and one copy of `label` per frame.

    Raises:
        ValueError: If `skip` is smaller than 1.
    """
    if skip < 1:
        # A non-positive step never moves the read position forward, so the
        # sampling loop below would not terminate.
        raise ValueError("skip must be >= 1, got {}".format(skip))

    output_dir = "train_images/"
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(output_dir, exist_ok=True)
    video_stem = os.path.splitext(os.path.basename(path))[0]

    cap = cv2.VideoCapture(path)
    image_frames = []
    image_label = []
    count = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # No frame could be read: end of video (or unreadable file)
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            # The channel order is left exactly as returned by cap.read();
            # no channel swap is applied before saving or returning the frame.
            filename = output_dir + video_stem + '_' + str(count) + '.jpg'
            cv2.imwrite(filename, frame)
            image_frames.append(frame)
            image_label.append(label)

            # Jump ahead so that only every `skip`-th frame is read
            count += skip
            cap.set(cv2.CAP_PROP_POS_FRAMES, count)
    finally:
        # Always release the capture handle, even if processing fails
        cap.release()

    return image_frames, image_label

In [4]:
def prepare_data(df, root_dir):
    """Extract frames and labels for every video listed in `df`.

    Args:
        df: DataFrame with a 'video_name' column (file name of the video)
            and a 'tag' column (its label).
        root_dir: Directory that the video file names are joined onto.

    Returns:
        A tuple ``(all_frames, all_labels)`` of two lists holding the frames
        and labels returned by extract_frames for all videos, in row order.
    """
    num_samples = len(df)
    all_frames = []
    all_labels = []

    for processed_count, (_, row) in enumerate(df.iterrows(), start=1):
        frames, labels = extract_frames(os.path.join(root_dir, row['video_name']), row['tag'])
        # extend() grows the lists in place; rebuilding them with `+` would
        # copy every previously collected frame once per video.
        all_frames.extend(frames)
        all_labels.extend(labels)
        print("{}/{} video extracted".format(processed_count, num_samples), end = "\r")

    return all_frames, all_labels

In [5]:
# Extract frames and labels from every training video listed in train_df
train_data, train_labels = prepare_data(df=train_df,
                                        root_dir="/kaggle/input/ucf101-v2/train/")

In [6]:
# Turn the frame and label collections into NumPy arrays and report their shapes
train_data, train_labels = np.array(train_data), np.array(train_labels)
for converted in (train_data, train_labels):
    print(converted.shape)

The extracted frames are then split into training and test sets with train_test_split, using 75% of the data for training and the remaining 25% for testing. The training data is then augmented using ImageDataGenerator.

In [7]:
# Encode the string tags as binary indicator vectors; `lb` keeps the class names
lb = LabelBinarizer()
train_labels = lb.fit_transform(train_labels)

# 75/25 stratified split with a fixed seed.
# NOTE(review): the split is done per frame, so frames taken from the same
# video can end up in both the training and the test portion.
split_options = dict(test_size=0.25, stratify=train_labels, random_state=21)
trainX, testX, trainY, testY = train_test_split(train_data, train_labels, **split_options)

In [8]:
# Random geometric augmentation, used for the training batches only
augmentation_options = dict(
    rotation_range=30,
    zoom_range=0.15,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.15,
    horizontal_flip=True,
    fill_mode="nearest",
)
trainAug = ImageDataGenerator(**augmentation_options)

# The validation generator is created with default settings (no augmentation options)
valAug = ImageDataGenerator()

# ImageNet per-channel mean (in RGB order), attached to both generators.
# NOTE(review): ImageDataGenerator appears to subtract `.mean` only when
# featurewise_center=True, which is not set here - confirm whether the mean
# subtraction is actually taking effect.
mean = np.array([123.68, 116.779, 103.939], dtype="float32")
for generator in (trainAug, valAug):
    generator.mean = mean

# Model definition

We use ResNet50 with ImageNet weights as the base model, without its top classification layers. These are replaced with a new head model. The base model's layers are frozen, so only the head's parameters are trained.

In [9]:
# ResNet50 with ImageNet weights and without its top layers
baseModel = ResNet50(weights="imagenet",
                     include_top=False,
                     input_tensor=Input(shape=(224, 224, 3)))

# New classification head, stacked on the base model's output in this order
head_layers = [
    AveragePooling2D(pool_size=(7, 7)),
    Flatten(name="flatten"),
    Dense(512, activation="relu"),
    Dropout(0.25),
    Dense(len(lb.classes_), activation="softmax"),  # one output per class
]
headModel = baseModel.output
for head_layer in head_layers:
    headModel = head_layer(headModel)

model = Model(inputs=baseModel.input, outputs=headModel)

# Freeze every layer of the base model so only the head is updated in training
for layer in baseModel.layers:
    layer.trainable = False

# Model Training

In [10]:
# Compile the model. This has to happen after the base layers were marked
# as non-trainable so that the setting is picked up.
print("[INFO] compiling model...")
# NOTE(review): `decay` is a legacy optimizer argument; newer Keras versions
# may reject it - confirm against the installed TensorFlow version.
opt = SGD(learning_rate=1e-4, momentum=0.9, decay=1e-4 / EPOCHS)

model.compile(loss="categorical_crossentropy", 
              optimizer=opt,
              metrics=["accuracy"])

# Keep a copy of the full model whenever the monitored metric improves
filepath = "video_classifier_best.h5"
best_checkpoint = keras.callbacks.ModelCheckpoint(filepath, 
                                                  save_weights_only=False, 
                                                  save_best_only=True, 
                                                  verbose=1)

print("[INFO] training head...")
# Both generators use BATCH_SIZE explicitly so that the batch size always
# matches the step counts computed from it below.
H = model.fit(x = trainAug.flow(trainX, trainY, batch_size = BATCH_SIZE),
              steps_per_epoch = len(trainX) // BATCH_SIZE,
              validation_data = valAug.flow(testX, testY, batch_size = BATCH_SIZE),
              validation_steps = len(testX) // BATCH_SIZE,
              callbacks=[best_checkpoint],
              epochs=EPOCHS)

# Save the model as it is after the last epoch
model.save('video_classifier.h5', save_format="h5")

# Persist the fitted label binarizer; the context manager closes the file
# even if pickling fails.
with open('labels.pickle', "wb") as f:
    pickle.dump(lb, f)

In [11]:
# Evaluate the trained network on the held-out frames
print("[INFO] evaluating network...")

predictions = model.predict(x = testX.astype("float32"), batch_size = 32)

# Compare the index of the true class with the index of the highest score
true_classes = testY.argmax(axis=1)
predicted_classes = predictions.argmax(axis=1)
print(classification_report(true_classes,
                            predicted_classes,
                            target_names=lb.classes_))

# Plot the training/validation loss and accuracy per epoch and save the figure
N = EPOCHS
epoch_axis = np.arange(0, N)
curves = (
    ("loss", "train_loss"),
    ("val_loss", "val_loss"),
    ("accuracy", "train_acc"),
    ("val_accuracy", "val_acc"),
)

plt.style.use("ggplot")
plt.figure()
for history_key, curve_label in curves:
    plt.plot(epoch_axis, H.history[history_key], label=curve_label)
plt.title("Training Loss and Accuracy on Dataset")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend(loc="lower left")
plt.savefig("plot.png")