In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import os
import csv
import cv2
from collections import deque
import sys
import gc
import pickle, gzip

from sklearn.metrics import confusion_matrix, classification_report, roc_curve, \
auc, accuracy_score, precision_score, recall_score, f1_score

from packaging import version
from matplotlib import pyplot as plt

print("TensorFlow version: ", tf.__version__)
assert version.parse(tf.__version__).release[0] >= 2, \
    "This notebook requires TensorFlow 2.0 or above."

""" Set Hyper parameters """
MAX_SEQ_LENGTH = 100
NUM_FEATURES = 1024
IMG_H = 128
IMG_W = 192
NUM_EPOCHS = 20
IMG_CHANNELS = 3  ## Change this to 1 for grayscale.
BATCH_SIZE = 32

# set dir of files
TRAIN_DATASET_VERSION = "test_train03"
TEST_DATASET_VERSION = "test_test03"
TRAIN_DATASET_PATH = TRAIN_DATASET_VERSION+".csv"
TEST_DATASET_PATH = TEST_DATASET_VERSION+".csv"
ROOT_DATASET_PATH = "dataset/UCF-101/"
SAVED_MODEL_PATH = "saved_model/"

AUTOTUNE = tf.data.AUTOTUNE
AUGMENTATION = False
TRAIN_MODE = True
GENERATE_DATASET = True
RETRAIN_MODEL = False

In [None]:
train_df = pd.read_csv(TRAIN_DATASET_PATH)
test_df = pd.read_csv(TEST_DATASET_PATH)

print(f"Total videos for training: {len(train_df)}")
print(f"Total videos for testing: {len(test_df)}")

In [None]:
def read_frames(root_folder,arr,each_nth=10):
    videos=[]
    for j  in range(len(arr)):
        print(np.round(100*j/len(arr),3))
            
        vcap=cv2.VideoCapture(root_folder+arr[j])
        success=True
  
        frames=[]
        cnt=0
        while success:
            try:
                success,image=vcap.read()
                cnt+=1
                if cnt%each_nth==0:
                    image=resize(image,(IMG_H,IMG_W))
                    frames.append(image)
            except Exception as e:
                print(e)
        videos.append(frames)
    
    return videos

In [None]:
def select_frames(frames_arr , n=10):
    videos=[]
    for i in range(len(frames_arr)):
        frames=[]
        for t in np.linspace(0, len(frames_arr[i])-1, num=n):
            frames.append(frames_arr[i][int(t)])
        videos.append(frames)
        
    videos = np.array(videos)
    print(videos.shape)
    return videos

In [None]:
def confusion_matrix_report(labels, predicts, target_names):
    confusion = confusion_matrix(labels, predicts)
    print('Confusion Matrix\n')
    print(confusion)
    
    print('\nAccuracy: {:.2f}\n'.format(accuracy_score(labels, predicts)))

    print('Micro Precision: {:.2f}'.format(precision_score(labels, predicts, average='micro')))
    print('Micro Recall: {:.2f}'.format(recall_score(labels, predicts, average='micro')))
    print('Micro F1-score: {:.2f}\n'.format(f1_score(labels, predicts, average='micro')))

    print('Macro Precision: {:.2f}'.format(precision_score(labels, predicts, average='macro')))
    print('Macro Recall: {:.2f}'.format(recall_score(labels, predicts, average='macro')))
    print('Macro F1-score: {:.2f}\n'.format(f1_score(labels, predicts, average='macro')))

    print('Weighted Precision: {:.2f}'.format(precision_score(labels, predicts, average='weighted')))
    print('Weighted Recall: {:.2f}'.format(recall_score(labels, predicts, average='weighted')))
    print('Weighted F1-score: {:.2f}'.format(f1_score(labels, predicts, average='weighted')))

    print('\nClassification Report\n')
    print(classification_report(labels, predicts, target_names=target_names))

In [None]:
def plot_epoch_result(epochs, loss, name, model_name, colour):
    plt.plot(epochs, loss, colour, label=name)
#     plt.plot(epochs, disc_loss, 'b', label='Discriminator loss')
    plt.title(name)
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig(model_name+ '_'+name+'_epoch_result.png')
    plt.show()
    
class CustomSaver(tf.keras.callbacks.Callback):
    def __init__(self,
                 model_path,
                 n_model
                ):
        super(CustomSaver, self).__init__()
        self.history = {}
        self.epoch = []
        self.model_path = model_path
    
        self.name_model = n_model
        self.custom_loss = []
        self.epochs_list = []
            
    def on_train_end(self, logs=None):
        print(self.model_path)
        self.model.save(self.model_path)
        
        plot_epoch_result(self.epochs_list, self.custom_loss, "Loss", self.name_model, "g")

    def on_epoch_end(self, epoch, logs=None):
        if logs is None:
            logs = {}
        self.epoch.append(epoch)
        for k, v in logs.items():
#             print(k, v)
            self.history.setdefault(k, []).append(v)
        
        self.epochs_list.append(epoch)
        self.custom_loss.append(logs["loss"])

        if (epoch + 1) % 15 == 0:
            self.model.save_weights(self.model_path)
            print('saved for epoch',epoch + 1)

In [None]:
def testing_stage(model, df, root_dir):
    print("testing start")
    num_samples = len(df)
    df['video_paths'] = root_dir + df['tag'] + "/" + df['video_name']
    video_paths = df["video_paths"].values.tolist()
    labels = df["tag"].values
    # labels = label_processor(labels[..., None]).numpy().flatten()
    class_vocab = label_processor.get_vocabulary()
    predictions = []
    name_list = []
    # print(labels)
    for idx, path in enumerate(video_paths):
        print(path)
        frames = load_video(path)
        frame_features, frame_mask = prepare_single_video(frames)
        probabilities = sequence_model.predict([frame_features, frame_mask])[0]
        probs = np.argsort(probabilities)[::-1]
        name_image = os.path.basename(path)
        print(name_image)
        predictions.append(class_vocab[probs[0]])
        name_list.append(name_image)
        # print(class_vocab[probs[0]], probs[0])
        for i in probs:
            print(f"{class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
    
    
    confusion_matrix_report(labels, predictions, class_vocab)
    
    
    print("created csv for the result.")
    with open('predictions_result.csv', 'w') as f:
        writer = csv.writer(f)
        writer.writerow(['ImageName', 'Label'])
        writer.writerows(zip(name_list, predictions))

In [None]:
# Utility for running experiments.
def run_experiment():
    name_model = str(IMG_SIZE)+"_UCF101_"+str(NUM_EPOCHS)
    class_vocab = label_processor.get_vocabulary()
    
    seq_model = build_our_model(len(class_vocab))
    
    # checkpoint = tf.keras.callbacks.ModelCheckpoint(
    #     SAVED_MODEL_PATH, save_weights_only=True, save_best_only=True, verbose=1
    # )
    
    path_model = SAVED_MODEL_PATH + name_model + "_model" + ".h5"
    print(path_model)
    saver_callback = CustomSaver(
            path_model,
            name_model
        )
    
    if RETRAIN_MODEL:
        print("Model Load Weights.")
        # seq_model.load_weights(path_model)
        seq_model = tf.keras.models.load_model(path_model)
    
    if TRAIN_MODE: 
        history = seq_model.fit(
            [train_data[0], train_data[1]],
            train_labels,
            # validation_split=0.2,
            epochs=NUM_EPOCHS,
            callbacks=[
                # checkpoint, 
                saver_callback],
        )
    # seq_model.load_weights(SAVED_MODEL_PATH)
    
    seq_model = tf.keras.models.load_model(path_model)
    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return seq_model

In [None]:
if __name__ == "__main__":
    print("run experiments")
    sequence_model = run_experiment()
    testing_stage(sequence_model, test_df, ROOT_DATASET_PATH)