In [None]:
# Install the TensorFlow Model Garden package; the `official.projects.movinet`
# modules imported in the next cell come from it.
!pip install tf-models-official --quiet
# Install the helper packages used by the data-loading code
# (remotezip, tqdm, OpenCV) plus einops.
!pip install remotezip tqdm opencv-python einops --quiet
# !pip install tensorflow

In [None]:
import tqdm
import random
import pathlib
import itertools
import collections

import cv2
import numpy as np
import remotezip as rz
# import rarfile
import seaborn as sns
import matplotlib.pyplot as plt
import json
import keras
import tensorflow as tf
import tensorflow_hub as hub
from keras import layers
from keras.optimizers import Adam
from keras.losses import SparseCategoricalCrossentropy

# Import the MoViNet model from TensorFlow Models (tf-models-official) for the MoViNet model
from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_model

In [None]:
def list_files_per_class(zip_url):
  """List the names of all entries in a remote zip archive.

  Args:
    zip_url: URL of the zip archive to inspect.

  Returns:
    A list containing the filename of every entry in the archive.
  """
  # Open the archive named by the argument. The previous version read the
  # notebook-level `URL` global instead, which ignored the caller's URL and
  # raised NameError whenever `URL` had not been defined yet.
  with rz.RemoteZip(zip_url) as archive:
    return [zip_info.filename for zip_info in archive.infolist()]

def get_class(fname):
  """Return the third-from-last '_'-separated piece of `fname` (the class name).

  Raises:
    IndexError: if `fname` has fewer than three '_'-separated pieces.
  """
  # Only the last three separators matter, so split from the right.
  pieces = fname.rsplit('_', 3)
  return pieces[-3]

def get_files_per_class(files):
  """Group file names by the class returned by `get_class`.

  Args:
    files: iterable of file names.

  Returns:
    A `collections.defaultdict(list)` mapping each class name to the file
    names of that class, in the order they appear in `files`.
  """
  grouped = collections.defaultdict(list)
  for path in files:
    grouped[get_class(path)].append(path)
  return grouped

def download_from_zip(zip_url, to_dir, file_names):
  """Extract selected entries of a remote zip into per-class folders.

  Args:
    zip_url: URL of the zip archive.
    to_dir: `pathlib.Path` of the destination root directory.
    file_names: names of the archive entries to extract.
  """
  with rz.RemoteZip(zip_url) as archive:
    for member in tqdm.tqdm(file_names):
      class_dir = to_dir / get_class(member)
      archive.extract(member, str(class_dir))
      extracted_path = class_dir / member

      # Move the file so it sits directly inside the class folder, keeping
      # only the last component of the archive path as its name.
      flat_name = pathlib.Path(member).parts[-1]
      extracted_path.rename(class_dir / flat_name)

def split_class_lists(files_for_class, count):
  """Take the first `count` files of every class and return the rest separately.

  Args:
    files_for_class: mapping from class name to a list of file names.
    count: number of files to take from the front of each class list.

  Returns:
    A tuple `(split_files, remainder)`: a flat list of the taken files, and a
    dict mapping each class to the files that were not taken.
  """
  taken = []
  leftover = {}
  for cls, members in files_for_class.items():
    taken += members[:count]
    leftover[cls] = members[count:]
  return taken, leftover

def download_ufc_101_subset(zip_url, num_classes, splits, download_dir):
  """Download a subset of a remote video archive, split into named groups.

  Args:
    zip_url: URL of the zip archive.
    num_classes: number of classes to keep (the first ones encountered).
    splits: dict mapping split name to the number of files per class.
    download_dir: `pathlib.Path` under which one folder per split is created.

  Returns:
    A dict mapping each split name to the directory holding its files.
  """
  # Drop archive entries that have no filename (directory entries, whose names
  # end with '/'). The list is rebuilt instead of calling `files.remove()`
  # inside a loop over `files`: removing while iterating skips the entry that
  # follows each removal. The earlier `len(tokens) <= 2` depth test is not
  # reused because it also rejected real files such as 'dir/file.avi'.
  files = [f for f in list_files_per_class(zip_url) if not f.endswith('/')]

  files_for_class = get_files_per_class(files)

  # Only use the number of classes you want, shuffling each class's files so
  # the per-split selection below is random.
  classes = list(files_for_class.keys())[:num_classes]
  for cls in classes:
    random.shuffle(files_for_class[cls])
  files_for_class = {cls: files_for_class[cls] for cls in classes}

  dirs = {}
  for split_name, split_count in splits.items():
    print(split_name, ":")
    split_dir = download_dir / split_name
    # Each split consumes files from the front; later splits see the remainder.
    split_files, files_for_class = split_class_lists(files_for_class, split_count)
    download_from_zip(zip_url, split_dir, split_files)
    dirs[split_name] = split_dir

  return dirs

def format_frames(frame, output_size):
  """Convert a frame to float32 and resize it with padding.

  Args:
    frame: image to convert and resize.
    output_size: sequence unpacked as the positional target-size arguments of
      `tf.image.resize_with_pad`.

  Returns:
    The converted, resized-and-padded frame.
  """
  as_float = tf.image.convert_image_dtype(frame, tf.float32)
  return tf.image.resize_with_pad(as_float, *output_size)

def frames_from_video_file(video_path, n_frames, output_size = (224,224), frame_step = 15):
  """Sample `n_frames` frames, `frame_step` frames apart, from a video file.

  Args:
    video_path: path of the video file.
    n_frames: number of frames to return.
    output_size: target size forwarded to `format_frames`.
    frame_step: number of frames read between two sampled frames.

  Returns:
    A NumPy array of the sampled frames with the last axis reordered
    from index order (0, 1, 2) to (2, 1, 0). Frames that could not be read
    past the end of the video are filled with zeros.

  Raises:
    ValueError: if not even the first frame can be read.
  """
  src = cv2.VideoCapture(str(video_path))
  try:
    # The frame count comes back as a float; random.randint needs integers.
    video_length = int(src.get(cv2.CAP_PROP_FRAME_COUNT))

    # Number of consecutive frames spanned by the sampled clip.
    need_length = 1 + (n_frames - 1) * frame_step

    if need_length > video_length:
      start = 0
    else:
      max_start = video_length - need_length
      # randint includes both endpoints, so the upper bound is max_start
      # itself; max_start + 1 would push the final sample past the last frame.
      start = random.randint(0, max_start)
      src.set(cv2.CAP_PROP_POS_FRAMES, start)

    # ret is a boolean indicating whether read was successful, frame is the image itself
    ret, frame = src.read()
    if not ret:
      raise ValueError(f"Could not read any frame from video: {video_path}")
    result = [format_frames(frame, output_size)]

    for _ in range(n_frames - 1):
      # Skip ahead; only the last frame read in this stride is kept.
      for _ in range(frame_step):
        ret, frame = src.read()
      if ret:
        result.append(format_frames(frame, output_size))
      else:
        # Ran out of video: pad with a blank frame of the same shape.
        result.append(np.zeros_like(result[0]))
  finally:
    # Always free the capture handle, even when reading or formatting fails.
    src.release()

  # Reverse the channel order of every frame.
  result = np.array(result)[..., [2, 1, 0]]

  return result

In [None]:
import os

class FrameGenerator:
    """Callable yielding `(frames, label)` pairs for videos stored in per-class folders.

    The directory `path` is expected to contain one sub-folder per class, each
    holding `.mp4` files. Class names are sorted so that the integer label
    assigned to a class is reproducible across runs and across generator
    instances.
    """

    def __init__(self, path, n_frames, n_classes, training = False):
        """Record the settings and build the class-name -> label-id mapping.

        Args:
            path: root directory containing one sub-folder per class.
            n_frames: number of frames to sample from each video.
            n_classes: number of classes (taken from the front of the sorted
                class list) to draw videos from.
            training: when True, the videos are shuffled on every call.
        """
        self.path = path
        self.n_frames = n_frames
        self.n_classes = n_classes
        self.training = training
        # os.listdir returns entries in arbitrary order and may include plain
        # files; keep only directories and sort them so label ids are stable.
        self.class_names = sorted(
            name for name in os.listdir(path)
            if os.path.isdir(os.path.join(path, name)))
        self.class_ids_for_name = {name: idx for idx, name in enumerate(self.class_names)}

    def get_all_avi(self, root_dir, classes):
        """Collect the `.mp4` files found in the given class folders.

        Args:
            root_dir: directory containing the class folders.
            classes: names of the class folders to scan.

        Returns:
            A tuple `(paths, class_names)` of equal-length lists: the path of
            each video and the name of the folder it was found in.
        """
        video_paths = []
        video_classes = []
        for folder_name in classes:
            folder_path = os.path.join(root_dir, folder_name)

            # Ignore anything in the root that is not a directory.
            if not os.path.isdir(folder_path):
                continue

            # Sorted so the listing order does not depend on the filesystem.
            for file_name in sorted(os.listdir(folder_path)):
                if file_name.endswith('.mp4'):
                    video_paths.append(os.path.join(folder_path, file_name))
                    video_classes.append(folder_name)
        return video_paths, video_classes

    def get_files_and_class_names(self):
        """Return the video paths and class names for the first `n_classes` classes."""
        classes = self.class_names[:self.n_classes]
        return self.get_all_avi(self.path, classes)

    def __call__(self):
        """Yield `(video_frames, label)` for every selected video."""
        video_paths, classes = self.get_files_and_class_names()

        pairs = list(zip(video_paths, classes))

        if self.training:
            random.shuffle(pairs)

        for path, name in pairs:
            video_frames = frames_from_video_file(path, self.n_frames)
            label = self.class_ids_for_name[name] # Encode labels
            yield video_frames, label

In [None]:
# Remote zip archive of UCF101 videos.
URL = 'https://storage.googleapis.com/thumos14_files/UCF101_videos.zip'
# Local directory for a downloaded subset (presumably the `download_dir`
# argument of download_ufc_101_subset — verify against the caller).
download_dir = pathlib.Path('./UCF101_subset/')
# Number of examples per batch when the datasets are batched.
batch_size = 16
# Number of frames sampled from each video by the frame generator.
num_frames = 10
# Element spec for datasets built with tf.data.Dataset.from_generator:
# a float32 tensor with three unspecified leading dimensions and 3 trailing
# channels, paired with a scalar int16 label.
output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
                    tf.TensorSpec(shape = (), dtype = tf.int16))

In [None]:
# MoViNet variant identifier; also used to build the checkpoint directory name.
model_id = 'a0'
# Spatial size (height and width) passed to build_classifier.
resolution = 224

# Reset Keras global state before constructing the models.
tf.keras.backend.clear_session()

# Create the backbone and mark it as non-trainable.
backbone = movinet.Movinet(model_id=model_id)
backbone.trainable = False

In [None]:
# Set num_classes=600 to load the pre-trained weights from the original model
model = movinet_model.MovinetClassifier(backbone=backbone, num_classes=600)
# Build with every dimension unspecified except the 3 trailing channels.
model.build([None, None, None, None, 3])

In [None]:
# Download and unpack the pre-trained MoViNet-A0 base checkpoint archive.
!wget https://storage.googleapis.com/tf_model_garden/vision/movinet/movinet_a0_base.tar.gz -O movinet_a0_base.tar.gz -q
!tar -xvf movinet_a0_base.tar.gz

In [None]:
# Directory holding the checkpoint files (presumably created by the tar
# extraction in the previous cell — verify).
checkpoint_dir = f'movinet_{model_id}_base'
# Path of the most recent checkpoint found in that directory.
checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
# Restore the checkpoint into the 600-class `model` built earlier.
checkpoint = tf.train.Checkpoint(model=model)
status = checkpoint.restore(checkpoint_path)
# Fail loudly if any object already created in the model has no match in the checkpoint.
status.assert_existing_objects_matched()

In [None]:
def build_classifier(batch_size, num_frames, resolution, backbone, num_classes):
    """Create a MoViNet classifier over `backbone` and build it for a fixed input shape.

    Args:
        batch_size: batch dimension of the input shape.
        num_frames: number of frames per clip.
        resolution: height and width of each frame.
        backbone: backbone model passed to `MovinetClassifier`.
        num_classes: number of output classes.

    Returns:
        The built `MovinetClassifier`.
    """
    input_shape = [batch_size, num_frames, resolution, resolution, 3]
    classifier = movinet_model.MovinetClassifier(backbone=backbone, num_classes=num_classes)
    classifier.build(input_shape)
    return classifier
# Loss for integer class labels; model outputs are interpreted as logits.
loss_obj = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# NOTE(review): this optimizer is not referenced by the visible training cell,
# which compiles the model with the string "adam" — confirm which is intended.
optimizer = keras.optimizers.Adam(learning_rate = 0.0001)
numclasses_list = list(range(85, 101, 5))  # [85, 90, 95, 100]

In [None]:
import json
def plot_history(history):
    """Draw training and validation curves for loss and accuracy, one panel each.

    Args:
        history: object whose `.history` mapping holds the 'loss', 'val_loss',
            'accuracy' and 'val_accuracy' series.
    """
    curves = history.history
    fig, (loss_ax, acc_ax) = plt.subplots(2)
    fig.set_size_inches(18.5, 10.5)

    # Loss panel
    loss_ax.set_title('Loss')
    loss_ax.plot(curves['loss'], label = 'train')
    loss_ax.plot(curves['val_loss'], label = 'test')
    loss_ax.set_ylabel('Loss')
    # Upper y-limit: the largest loss seen in either series, rounded up.
    highest_loss = max(curves['loss'] + curves['val_loss'])
    loss_ax.set_ylim([0, np.ceil(highest_loss)])

    # Accuracy panel
    acc_ax.set_title('Accuracy')
    acc_ax.plot(curves['accuracy'],  label = 'train')
    acc_ax.plot(curves['val_accuracy'], label = 'test')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.set_ylim([0, 1])

    # Both panels share the same x-axis label and legend entries.
    for panel in (loss_ax, acc_ax):
        panel.set_xlabel('Epoch')
        panel.legend(['Train', 'Validation'])

    plt.show()

In [None]:
def get_actual_predicted_labels(dataset):
    """Return the true labels of `dataset` and the labels predicted for it.

    Uses the notebook-level `model` for prediction.

    Args:
        dataset: batched dataset of `(frames, label)` pairs.

    Returns:
        A tuple `(actual, predicted)` of label tensors.
    """
    true_labels = []
    for _, label in dataset.unbatch():
        true_labels.append(label)
    scores = model.predict(dataset)

    true_labels = tf.stack(true_labels, axis=0)
    # Pick the highest-scoring class for every example.
    class_ids = tf.argmax(tf.concat(scores, axis=0), axis=1)

    return true_labels, class_ids

def plot_confusion_matrix(actual, predicted, labels, ds_type):
    """Draw the confusion matrix of `actual` vs `predicted` as an annotated heatmap.

    Args:
        actual: true labels.
        predicted: predicted labels.
        labels: tick labels used on both axes.
        ds_type: dataset name appended to the plot title.
    """
    matrix = tf.math.confusion_matrix(actual, predicted)
    heatmap_ax = sns.heatmap(matrix, annot=True, fmt='g')
    sns.set(rc={'figure.figsize':(12, 12)})
    sns.set(font_scale=1.4)

    title = 'Confusion matrix of action recognition for ' + ds_type
    heatmap_ax.set_title(title)
    heatmap_ax.set_ylabel('Actual Action')
    heatmap_ax.set_xlabel('Predicted Action')

    plt.xticks(rotation=90)
    plt.yticks(rotation=0)
    # Same class labels on both axes.
    for axis in (heatmap_ax.xaxis, heatmap_ax.yaxis):
        axis.set_ticklabels(labels)

def calculate_classification_metrics(y_actual, y_pred, labels):
    """Compute per-class precision and recall from the confusion matrix.

    Args:
        y_actual: true labels.
        y_pred: predicted labels.
        labels: class labels; entry `i` names row/column `i` of the matrix.

    Returns:
        A tuple `(precision, recall)` of dicts keyed by label.
    """
    cm = tf.math.confusion_matrix(y_actual, y_pred)
    true_pos = np.diag(cm) # Diagonal holds the true positives
    precision, recall = {}, {}
    for idx, label in enumerate(labels):
        # Column sum minus the true positives gives the false positives.
        false_pos = np.sum(cm[:, idx]) - true_pos[idx]
        # Row sum minus the true positives gives the false negatives.
        false_neg = np.sum(cm[idx, :]) - true_pos[idx]

        precision[label] = true_pos[idx] / (true_pos[idx] + false_pos)
        recall[label] = true_pos[idx] / (true_pos[idx] + false_neg)

    return precision, recall

In [None]:
# Train one classifier per class count, saving its training history and weights.
numclasses_list = list(range(75, 81, 5))  # [75, 80]
subsetpaths_train = "/kaggle/input/kinetics-train-5per/kinetics600_5per/kinetics600_5per/train"
# NOTE(review): the validation path points at a kinetics400 folder while the
# training path points at kinetics600 — confirm both expose the same classes.
subsetpaths_val = "/kaggle/input/kinetics-train-5per/kinetics400_5per/kinetics400_5per/train"

for numclasses in numclasses_list:
    print(f"-------------------- {numclasses} classes --------------------")
    trainds = tf.data.Dataset.from_generator(FrameGenerator(subsetpaths_train, num_frames, numclasses, training=True), output_signature=output_signature)
    trainds = trainds.batch(batch_size)

    valds = tf.data.Dataset.from_generator(FrameGenerator(subsetpaths_val, num_frames, numclasses), output_signature=output_signature)
    valds = valds.batch(batch_size)

    # Build the model
    model = build_classifier(batch_size, num_frames, resolution, backbone, numclasses)

    # Compile and train the model
    model.compile(loss=loss_obj, optimizer="adam", metrics=['accuracy'])
    # Validate on the held-out dataset; passing the training dataset here
    # would make the val_* metrics a repeat of the training metrics.
    history = model.fit(trainds,
                        validation_data=valds,
                        epochs=10,
                        validation_freq=1,
                        verbose=1)

    # Save the training history to a file. The file is opened only after
    # training succeeds so a failed run does not leave an empty file behind.
    with open(f'evaluate_CNN_{numclasses}.txt', 'w') as f:
        f.write(json.dumps(history.history))
        f.write('\n')
    model.save(f"movinet_kinetic_10ep_{numclasses}.h5")