In [26]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from google.colab import drive
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, f1_score

drive.mount('/content/drive')
base_dir = '/content/drive/MyDrive/cricshot'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [27]:
def extract_frames(video_path, num_frames=16):
    cap = tf.io.read_file(video_path)
    cap = tf.io.decode_video(cap)
    total_frames = tf.shape(cap)[0]
    if total_frames < num_frames:
        indices = tf.range(total_frames)
        indices = tf.repeat(indices, tf.cast(tf.ceil(num_frames / total_frames), tf.int32))
        indices = indices[:num_frames]
    else:
        indices = tf.linspace(0, total_frames - 1, num_frames)
        indices = tf.cast(indices, tf.int32)

    frames = tf.gather(cap, indices)
    frames = tf.image.resize(frames, (224, 224))
    frames = frames / 255.0
    return frames

In [28]:
def make_dataframes(base_dir, num_frames=16):
    filepaths = []
    labels = []
    classlist = sorted(os.listdir(base_dir))
    for klass in classlist:
        classpath = os.path.join(base_dir, klass)
        if os.path.isdir(classpath):
            flist = sorted([f for f in os.listdir(classpath) if f.endswith('.mp4')])
            desc = f'{klass:25s}'
            for f in tqdm(flist, ncols=130, desc=desc, unit='files', colour='blue'):
                fpath = os.path.join(classpath, f)
                filepaths.append(fpath)
                labels.append(klass)

    df = pd.DataFrame({'filepaths': filepaths, 'labels': labels})

    train_df, temp_df = train_test_split(df, train_size=0.7, stratify=df['labels'], random_state=42)
    valid_df, test_df = train_test_split(temp_df, train_size=0.5, stratify=temp_df['labels'], random_state=42)

    print('train_df length:', len(train_df), '  valid_df length:', len(valid_df), '  test_df length:', len(test_df))
    return train_df, valid_df, test_df

train_df, valid_df, test_df = make_dataframes(base_dir)

cover                    : 0files [00:00, ?files/s]
defense                  : 0files [00:00, ?files/s]
flick                    : 0files [00:00, ?files/s]
hook                     : 0files [00:00, ?files/s]
late_cut                 : 0files [00:00, ?files/s]
lofted                   : 0files [00:00, ?files/s]
pull                     : 0files [00:00, ?files/s]
square_cut               : 0files [00:00, ?files/s]
straight                 : 0files [00:00, ?files/s]
sweep                    : 0files [00:00, ?files/s]


ValueError: With n_samples=0, test_size=None and train_size=0.7, the resulting train set will be empty. Adjust any of the aforementioned parameters.

In [8]:
class VideoDataGenerator(keras.utils.Sequence):
    def __init__(self, dataframe, batch_size, num_frames, num_classes, shuffle=True):
        self.df = dataframe
        self.batch_size = batch_size
        self.num_frames = num_frames
        self.num_classes = num_classes
        self.shuffle = shuffle
        self.on_epoch_end()
    def __len__(self):
        return int(np.ceil(len(self.df) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_df = self.df.iloc[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_videos = []
        batch_labels = []

        for _, row in batch_df.iterrows():
            video = extract_frames(row['filepaths'], self.num_frames)
            label = keras.utils.to_categorical(row['label_index'], num_classes=self.num_classes)

            batch_videos.append(video)
            batch_labels.append(label)

        return np.array(batch_videos), np.array(batch_labels)

    def on_epoch_end(self):
        if self.shuffle:
            self.df = self.df.sample(frac=1).reset_index(drop=True)

FileNotFoundError: [Errno 2] No such file or directory: '/drive/folders/1DPHURwQk5R8blgjM8VNz6Q68LqckxljX'

In [None]:
classes = sorted(train_df['labels'].unique())
class_to_index = {cls: idx for idx, cls in enumerate(classes)}

for df in [train_df, valid_df, test_df]:
    df['label_index'] = df['labels'].map(class_to_index)
num_classes = len(classes)
batch_size = 16
num_frames = 16

train_gen = VideoDataGenerator(train_df, batch_size, num_frames, num_classes)
valid_gen = VideoDataGenerator(valid_df, batch_size, num_frames, num_classes, shuffle=False)
test_gen = VideoDataGenerator(test_df, batch_size, num_frames, num_classes, shuffle=False)

In [None]:
def make_model(num_frames, img_size, num_classes, lr=0.001):
    input_shape = (num_frames, *img_size, 3)
    model = keras.Sequential([
        keras.layers.Conv3D(32, (3, 3, 3), activation='relu', input_shape=input_shape),
        keras.layers.MaxPooling3D((2, 2, 2)),
        keras.layers.Conv3D(64, (3, 3, 3), activation='relu'),
        keras.layers.MaxPooling3D((2, 2, 2)),
        keras.layers.Conv3D(128, (3, 3, 3), activation='relu'),
        keras.layers.MaxPooling3D((2, 2, 2)),
        keras.layers.Flatten(),
        keras.layers.Dense(256, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer=Adam(learning_rate=lr), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

model = make_model(num_frames, (224, 224), num_classes)

In [None]:
early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)
epochs = 20
callbacks = [early_stop, reduce_lr]

history = model.fit(
    train_gen,
    epochs=epochs,
    verbose=1,
    callbacks=callbacks,
    validation_data=valid_gen,
    validation_steps=None,
    shuffle=False,
    initial_epoch=0
)

In [None]:
def tr_plot(tr_data, start_epoch):
    tacc = tr_data.history['accuracy']
    tloss = tr_data.history['loss']
    vacc = tr_data.history['val_accuracy']
    vloss = tr_data.history['val_loss']
    Epoch_count = len(tacc) + start_epoch
    Epochs = [i for i in range(start_epoch, Epoch_count)]
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 8))
    ax1.plot(Epochs, tloss, 'r', label='Training loss')
    ax1.plot(Epochs, vloss, 'g', label='Validation loss')
    ax1.set_title('Training and Validation Loss')
    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Loss')
    ax1.legend()
    ax2.plot(Epochs, tacc, 'r', label='Training Accuracy')
    ax2.plot(Epochs, vacc, 'g', label='Validation Accuracy')
    ax2.set_title('Training and Validation Accuracy')
    ax2.set_xlabel('Epochs')
    ax2.set_ylabel('Accuracy')
    ax2.legend()
    plt.tight_layout()
    plt.show()

tr_plot(history, 0)

In [None]:
def predictor(test_gen):
    y_pred = []
    y_true = test_gen.df['label_index'].values
    classes = list(test_gen.df['labels'].unique())
    class_count = len(classes)
    errors = 0
    preds = model.predict(test_gen, verbose=1)
    tests = len(preds)
    for i, p in enumerate(preds):
        pred_index = np.argmax(p)
        true_index = test_gen.df['label_index'].iloc[i]
        if pred_index != true_index:
            errors += 1
        y_pred.append(pred_index)

    acc = (1 - errors / tests) * 100
    print(f'There were {errors} errors in {tests} tests for an accuracy of {acc:.2f}%')
    y_pred = np.array(y_pred)
    y_true = np.array(y_true)
    f1score = f1_score(y_true, y_pred, average='weighted') * 100
    if class_count <= 30:
        cm = confusion_matrix(y_true, y_pred)
        plt.figure(figsize=(12, 8))
        sns.heatmap(cm, annot=True, vmin=0, fmt='g', cmap='Blues', cbar=False)
        plt.xticks(np.arange(class_count) + .5, classes, rotation=90)
        plt.yticks(np.arange(class_count) + .5, classes, rotation=0)
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.title("Confusion Matrix")
        plt.show()

    clr = classification_report(y_true, y_pred, target_names=classes, digits=4)
    print("Classification Report:\n----------------------\n", clr)
    return errors, tests, f1score

errors, tests, f1score = predictor(test_gen)

In [None]:
name = f'CRICKET-{len(train_gen.df["labels"].unique())}-({224}x{224})'
save_id = f'{name}-{f1score:.2f}.h5'
model_save_loc = os.path.join('/content/drive/MyDrive', save_id)
model.save(model_save_loc)
print(f'Model was saved as {model_save_loc}')