In [1]:
import os
import cv2
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow import keras
from sklearn.utils.class_weight import compute_class_weight




In [2]:
def setup_gpu():
    """Enable on-demand memory growth on every GPU TensorFlow can see.

    With memory growth enabled TensorFlow allocates GPU memory as it is
    needed instead of reserving it all up front.

    Memory growth can only be changed before the GPUs have been initialised.
    When this runs too late (for example the cell is re-executed in a live
    kernel) TensorFlow raises ``RuntimeError``; that error is reported and
    the current configuration is left untouched rather than aborting the
    notebook.

    Returns:
        None. Progress is reported on stdout.
    """
    gpus = tf.config.experimental.list_physical_devices('GPU')
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as err:
        # Raised when the devices were already initialised by earlier TF work.
        print(f"⚠️ GPU memory growth left unchanged: {err}")
        return
    print("✅ GPU setup complete.")
# Run once at startup so memory growth is switched on for every visible GPU.
setup_gpu()

✅ GPU setup complete.


In [None]:
# Load and preprocess .mp4 videos
def load_video(video_path, max_frames=30, target_size=(224, 224)):
    """Decode the first ``max_frames`` frames of a video into a float32 clip.

    Args:
        video_path: Location of the video. May be a ``str``, UTF-8 ``bytes``,
            or an eager string tensor (anything with a ``.numpy()`` method
            returning bytes), which is what ``tf.py_function`` passes in.
        max_frames: Number of frames in the returned clip. Shorter videos are
            zero-padded at the end; longer ones are truncated.
        target_size: ``(width, height)`` each frame is resized to, following
            the ``cv2.resize`` convention.

    Returns:
        ``np.ndarray`` of shape ``(max_frames, height, width, 3)`` and dtype
        ``float32``, RGB channel order, values scaled to ``[0, 1]``. A video
        that cannot be opened or read yields an all-zero clip.
    """
    # Unwrap the tensor handed over by tf.py_function, then decode the bytes.
    if hasattr(video_path, "numpy"):
        video_path = video_path.numpy()
    if isinstance(video_path, bytes):
        video_path = video_path.decode("utf-8")

    # cv2.resize wants (width, height) while array shapes are (height, width);
    # keeping the two apart makes non-square targets work.
    width, height = target_size
    # Pre-allocating zeros gives the trailing padding for free.
    video = np.zeros((max_frames, height, width, 3), dtype=np.float32)

    cap = cv2.VideoCapture(video_path)
    try:
        for index in range(max_frames):
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (width, height))
            # OpenCV decodes to BGR; the pretrained network expects RGB.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            video[index] = frame / 255.0  # Normalize pixels
    finally:
        # Always free the decoder, even when a frame fails to process.
        cap.release()

    return video

In [None]:
# Create TensorFlow dataset
def create_dataset(directory, batch_size=8, shuffle=False, max_frames=30, target_size=(224, 224)):
    """Build a batched ``tf.data`` pipeline from a ``<directory>/<class>/*.mp4`` tree.

    Every sub-directory of ``directory`` is one class. Class indices follow
    the alphabetical order of the sub-directory names, so two splits get the
    same indices as long as they contain the same class folders. Plain files
    sitting next to the class folders are ignored and do not consume an index.

    Args:
        directory: Root folder of one data split.
        batch_size: Number of clips per batch.
        shuffle: When true, the (path, label) pairs are reshuffled on every
            pass before decoding. The returned ``labels`` list keeps the
            unshuffled order.
        max_frames: Frames per clip, forwarded to ``load_video``.
        target_size: ``(width, height)`` per frame, forwarded to ``load_video``.

    Returns:
        ``(dataset, labels)``: ``dataset`` yields ``(video, label)`` batches
        with ``video`` of static shape ``(batch, max_frames, height, width, 3)``;
        ``labels`` is the list of integer labels, one per discovered video.
    """
    # Filter to directories BEFORE enumerating so stray files cannot shift labels.
    class_names = sorted(
        entry for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )

    video_paths = []
    labels = []
    for label, event in enumerate(class_names):
        event_path = os.path.join(directory, event)
        # Sorted so the sample order does not depend on the filesystem.
        for file in sorted(os.listdir(event_path)):
            if file.lower().endswith(".mp4"):
                video_paths.append(os.path.join(event_path, file))
                labels.append(label)

    width, height = target_size
    video_shape = (max_frames, height, width, 3)

    def _decode(path, label):
        """Decode one video; restore the static shape tf.py_function drops."""
        video = tf.py_function(
            lambda p: load_video(p, max_frames=max_frames, target_size=target_size),
            [path],
            tf.float32,
        )
        video.set_shape(video_shape)
        return video, label

    dataset = tf.data.Dataset.from_tensor_slices((video_paths, labels))
    if shuffle and video_paths:
        # Shuffling the cheap path strings avoids buffering decoded clips.
        dataset = dataset.shuffle(len(video_paths), reshuffle_each_iteration=True)
    dataset = dataset.map(_decode, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset, labels

# Build one pipeline per split; each call also returns that split's label list.
# NOTE(review): the training split is consumed in directory order (all clips of
# one class, then the next) — consider shuffling the paths before decoding.
train_data, train_labels = create_dataset(
    directory="split_data/train",
    batch_size=8,
)
val_data, val_labels = create_dataset(
    directory="split_data/eval",
    batch_size=8,
)
test_data, test_labels = create_dataset(
    directory="split_data/test",
    batch_size=8,
)

In [None]:
# Compute "balanced" class weights so rare events count more during training.
train_classes = np.unique(train_labels)
class_weights = compute_class_weight(class_weight="balanced", classes=train_classes, y=train_labels)
# compute_class_weight returns one weight per entry of `classes`, in that order,
# so key the dict by the actual label value rather than by array position.
# Plain int/float keep the mapping free of NumPy scalar types.
class_weights_dict = {
    int(label): float(weight)
    for label, weight in zip(train_classes, class_weights)
}
print(f"Class Weights: {class_weights_dict}")

In [None]:
# Load MoViNet-A3 from TensorFlow Hub.
# The "base" variant consumes a whole clip in one call. The "stream" variant
# additionally requires per-frame state tensors as inputs, so it cannot be
# dropped into a plain feed-forward Keras model.
model_id = 'a3'
mode = 'base'
version = '3'
model_url = f"https://tfhub.dev/tensorflow/movinet/{model_id}/{mode}/kinetics-600/classification/{version}"

# One output unit per class seen in the training labels.
num_classes = len(class_weights)

encoder = hub.KerasLayer(model_url, trainable=True)

# Clips are [batch, frames, height, width, 3]; the hub module takes them in a
# dict under the key "image".
video_input = keras.layers.Input(shape=[None, None, None, 3], dtype=tf.float32, name="image")
# The classification module already returns one vector per clip
# ([batch, 600] Kinetics-600 logits), so no spatial/temporal pooling is needed.
kinetics_logits = encoder(dict(image=video_input))

head = keras.layers.BatchNormalization()(kinetics_logits)
head = keras.layers.Dense(1024, activation="relu")(head)
head = keras.layers.Dropout(0.5)(head)
head = keras.layers.Dense(512, activation="relu")(head)
head = keras.layers.Dropout(0.5)(head)
class_probs = keras.layers.Dense(num_classes, activation="softmax")(head)

model = keras.Model(inputs=video_input, outputs=class_probs, name="movinet_classifier")

# Compile the model. The loss itself is unweighted; class weighting is applied
# through the `class_weight` argument of `model.fit`.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=["accuracy"]
)

In [None]:
# Fit on the training split, validating on the eval split after each epoch.
# The per-class weights scale each sample's contribution to the loss.
epochs = 10
history = model.fit(
    train_data,
    validation_data=val_data,
    epochs=epochs,
    class_weight=class_weights_dict,
)

# Persist the trained model twice: once to an .h5 file and once to an
# extension-less path.
model.save("movinet_model.h5")
model.save("movinet_saved_model")

In [None]:
# Score the trained model on the held-out test split.
# evaluate() returns the loss followed by the compiled metrics (accuracy here).
loss, accuracy = model.evaluate(x=test_data)
print(
    f"🎯 Test Accuracy: {accuracy * 100:.2f}%"
)
