In [1]:
import numpy as np
import os
import cv2
import keras
from keras.layers import Dense, Flatten, Dropout
from keras.applications import ResNet50
from keras.models import Model
from keras.preprocessing.image import ImageDataGenerator
from keras.optimizers import Adam
from sklearn.model_selection import train_test_split


In [2]:
def load_video_frames_and_labels(videos_path, activities, frame_size=(224, 224), n_frames=16):
    frames = []
    labels = []
    
    for i, activity in enumerate(activities):
        activity_folder = os.path.join(videos_path, activity)
        video_files = os.listdir(activity_folder)
        
        for video_file in video_files:
            video_path = os.path.join(activity_folder, video_file)
            cap = cv2.VideoCapture(video_path)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            step = frame_count // n_frames

            for j in range(n_frames):
                ret, frame = cap.read()
                if ret:
                    frame = cv2.resize(frame, frame_size)
                    frames.append(frame)
                    labels.append(i)
                cap.set(cv2.CAP_PROP_POS_FRAMES, (j + 1) * step)
            cap.release()
            
    return np.array(frames), np.array(labels)


In [3]:
# load data
import os

# project_dir = os.path.abspath("./")
# os.chdir(project_dir)

videos_path = "../data/Sport Videos"
activities = ["BaseballPitch", "Basketball", "Fencing", "TennisSwing", "VolleyballSpiking"]
frame_size = (224, 224)
n_frames = 16

frames, labels = load_video_frames_and_labels(videos_path, activities, frame_size, n_frames)


In [4]:
# split data
X_train, X_val, y_train, y_val = train_test_split(frames, labels, test_size=0.2, random_state=42, stratify=labels)

# One-hot encode the labels
y_train = keras.utils.to_categorical(y_train, num_classes=len(activities))
y_val = keras.utils.to_categorical(y_val, num_classes=len(activities))


In [5]:
# Set up data augmentation

train_datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True)

train_datagen.fit(X_train)



In [6]:
# Create the model using transfer learning

base_model = ResNet50(include_top=False, weights='imagenet', input_shape=(224, 224, 3), pooling='avg')

# Freeze the base model
for layer in base_model.layers:
    layer.trainable = False

x = base_model.output
x = Dense(512, activation='relu')(x)
x = Dropout(0.5)(x)
predictions = Dense(len(activities), activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=predictions)
model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])



In [7]:
# Train the model

batch_size = 32
epochs = 10

history = model.fit(train_datagen.flow(X_train, y_train, batch_size=batch_size),
                    steps_per_epoch=len(X_train) // batch_size,
                    epochs=epochs,
                    validation_data=(X_val, y_val),
                    verbose=1)


Epoch 1/10


2023-05-06 13:21:54.511452: W tensorflow/tsl/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [8]:
# Evaluate the model on the validation set

scores = model.evaluate(X_val, y_val, verbose=1)
print("Validation loss:", scores[0])
print("Validation accuracy:", scores[1])


Validation loss: 0.0052587841637432575
Validation accuracy: 0.9986238479614258


In [10]:
model.save("video_classification_model.h5")


In [11]:
def extract_features(model, frames, batch_size=32):
    features = model.predict(frames, batch_size=batch_size, verbose=1)
    return features

# Load the base model
base_model = ResNet50(include_top=False, weights='imagenet', input_shape=(224, 224, 3), pooling='avg')

# Extract features for the training and validation sets
X_train_features = extract_features(base_model, X_train)
X_val_features = extract_features(base_model, X_val)




In [12]:
def aggregate_features(features, n_frames=16):
    aggregated_features = []
    for i in range(0, len(features), n_frames):
        avg_features = np.mean(features[i:i + n_frames], axis=0)
        aggregated_features.append(avg_features)
    return np.array(aggregated_features)

X_train_aggregated = aggregate_features(X_train_features)
X_val_aggregated = aggregate_features(X_val_features)

# Adjust the labels to match the aggregated features
y_train_aggregated = y_train[::16]
y_val_aggregated = y_val[::16]


In [13]:
input_shape = X_train_aggregated.shape[1:]

classifier = keras.Sequential([
    Dense(512, activation='relu', input_shape=input_shape),
    Dropout(0.5),
    Dense(len(activities), activation='softmax')
])

classifier.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

batch_size = 32
epochs = 10

history = classifier.fit(X_train_aggregated, y_train_aggregated,
                          batch_size=batch_size,
                          epochs=epochs,
                          validation_data=(X_val_aggregated, y_val_aggregated),
                          verbose=1)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [15]:
scores = classifier.evaluate(X_val_aggregated, y_val_aggregated, verbose=1)
print("Validation loss:", scores[0])
print("Validation accuracy:", scores[1])


Validation loss: 1.5853623151779175
Validation accuracy: 0.277372270822525


In [16]:
classifier.save("video_classification_model_simplified.h5")


In [17]:
# Train a CNN for image classification on the frames

input_shape = (224, 224, 3)

base_model = ResNet50(include_top=False, weights='imagenet', input_shape=input_shape, pooling='avg')

image_classifier = keras.Sequential([
    base_model,
    Dense(len(activities), activation='softmax')
])

image_classifier.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

batch_size = 32
epochs = 10

history = image_classifier.fit(train_datagen.flow(X_train, y_train, batch_size=batch_size),
                                steps_per_epoch=len(X_train) // batch_size,
                                epochs=epochs,
                                validation_data=(X_val, y_val),
                                verbose=1)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [18]:
# Define a function to apply moving average on predictions:

def moving_average(predictions, window_size=5):
    cumsum = np.cumsum(predictions, axis=0)
    cumsum[window_size:] = cumsum[window_size:] - cumsum[:-window_size]
    return cumsum[window_size - 1:] / window_size


In [19]:
# Predict frame-level probabilities and apply the moving average:

frame_probabilities = image_classifier.predict(X_val)
averaged_probabilities = moving_average(frame_probabilities)




In [20]:
# Aggregate the predictions to video-level by averaging frame predictions for each video:

def aggregate_probabilities(probabilities, n_frames=16):
    aggregated_probabilities = []
    for i in range(0, len(probabilities), n_frames):
        avg_probabilities = np.mean(probabilities[i:i + n_frames], axis=0)
        aggregated_probabilities.append(avg_probabilities)
    return np.array(aggregated_probabilities)

y_val_aggregated_probabilities = aggregate_probabilities(averaged_probabilities)


In [21]:
# Convert the aggregated probabilities to class labels:

y_val_predicted = np.argmax(y_val_aggregated_probabilities, axis=1)


In [22]:
# Calculate the accuracy:

from sklearn.metrics import accuracy_score

y_val_true = y_val[::16]  # Use the original labels of the validation set, taking one label per video
accuracy = accuracy_score(y_val_true, y_val_predicted)
print("Validation accuracy:", accuracy)


ValueError: Found input variables with inconsistent numbers of samples: [137, 136]

# Images

In [None]:
import os
import numpy as np
import cv2
import random
from math import ceil
from sklearn.model_selection import train_test_split


In [None]:
def load_images_and_labels(image_path, activities, image_size):
    images = []
    labels = []

    for activity_id, activity in enumerate(activities):
        activity_folder = os.path.join(image_path, activity)
        for image_name in os.listdir(activity_folder):
            image = cv2.imread(os.path.join(activity_folder, image_name))
            image = cv2.resize(image, image_size)
            images.append(image)
            labels.append(activity_id)

    return np.array(images), np.array(labels)


In [None]:
image_path = "./data/Sport Images"
activities = ["baseball", "basketball", "fencing", "tennis", "volleyball"]
image_size = (224, 224)

images, labels = load_images_and_labels(image_path, activities, image_size)

X_train_val, X_test, y_train_val, y_test = train_test_split(images, labels, test_size=0.15, stratify=labels, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.176, stratify=y_train_val, random_state=42)


In [None]:
# Convert labels to one-hot encoding:

from keras.utils import to_categorical

y_train = to_categorical(y_train, num_classes=len(activities))
y_val = to_categorical(y_val, num_classes=len(activities))
y_test = to_categorical(y_test, num_classes=len(activities))


In [None]:
# Define a function to create the transfer learning model:

def create_transfer_learning_model(base_model, num_classes):
    x = base_model.output
    x = Dense(512, activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.2)(x)
    predictions = Dense(num_classes, activation='softmax')(x)
    return Model(inputs=base_model.input, outputs=predictions)


In [None]:
# Prepare data augmentation:

train_datagen = ImageDataGenerator(
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest')


In [None]:
# Define a function to plot the training and validation errors:

def plot_errors(history, model_name):
    plt.figure()
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title(f'{model_name} - Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()


In [None]:
# Train and evaluate the model for each pre-trained model:

pretrained_models = [ResNet50, EfficientNetB0, VGG16]
pretrained_model_names = ['ResNet50', 'EfficientNetB0', 'VGG16']

for model_id, pretrained_model in enumerate(pretrained_models):
    # Load pre-trained model
    base_model = pretrained_model(include_top=False, weights='imagenet', input_shape=(224, 224, 3), pooling='avg')

    # Create transfer learning model
    model = create_transfer_learning_model(base_model, len(activities))

    # Freeze all layers except the last Dense layers
    for layer in base_model.layers:
        layer.trainable = False

    # Compile the model
    model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

    # Train the model with data augmentation
    batch_size = 5
    epochs = 100
    early_stopping = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)

    history = model.fit(train_datagen.flow(X_train, y_train, batch_size=batch_size),
                        steps_per_epoch=len(X_train) // batch_size,
                        epochs=epochs,
                        validation_data=(X_val, y_val),
                        callbacks=[early_stopping],
                        verbose=1)

    # Plot the training and validation errors vs. epochs
    plot_errors(history, pretrained_model_names[model_id])


    # Evaluate the model
    y_train_pred = np.argmax(model.predict(X_train), axis=1)
    y_val_pred = np.argmax(model.predict(X_val), axis=1)
    y_test_pred = np.argmax(model.predict(X_test), axis=1)

    y_train_true = np.argmax(y_train, axis=1)
    y_val_true = np.argmax(y_val, axis=1)
    y_test_true = np.argmax(y_test, axis=1)

    print(f"Results for {pretrained_model_names[model_id]}:")
    print("Confusion Matrix (Train):")
    print(confusion_matrix(y_train_true, y_train_pred))
    print("Confusion Matrix (Validation):")
    print(conf
