In [2]:
!pip install tensorflow==2.18.0 opencv-python mediapipe scikit-learn matplotlib



In [4]:
!pip install seaborn

Collecting seaborn
  Using cached seaborn-0.13.2-py3-none-any.whl.metadata (5.4 kB)
Collecting pandas>=1.2 (from seaborn)
  Using cached pandas-2.2.3-cp310-cp310-win_amd64.whl.metadata (19 kB)
Collecting pytz>=2020.1 (from pandas>=1.2->seaborn)
  Downloading pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas>=1.2->seaborn)
  Downloading tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Using cached seaborn-0.13.2-py3-none-any.whl (294 kB)
Using cached pandas-2.2.3-cp310-cp310-win_amd64.whl (11.6 MB)
Downloading pytz-2025.2-py2.py3-none-any.whl (509 kB)
Downloading tzdata-2025.2-py2.py3-none-any.whl (347 kB)
Installing collected packages: pytz, tzdata, pandas, seaborn
Successfully installed pandas-2.2.3 pytz-2025.2 seaborn-0.13.2 tzdata-2025.2


In [37]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import seaborn 
import numpy as np
import os
import random
import pathlib
import cv2
import pandas as pd

In [38]:
# Set the parameters
NUM_CLASSES = 5  # Adjust as per your subset
BATCH_SIZE = 8
EPOCHS = 100
IMAGE_SIZE = (160, 160)
N_FRAMES = 20  # Number of frames per video
FRAME_STEP = 5

In [39]:
def format_frames(frame, output_size):
    """
    Pad and resize an image from a video, and ensure it's in the correct format for grayscale.
    
    Args:
      frame: Grayscale image that needs to be resized and padded. 
      output_size: Pixel size of the output frame image.

    Return:
      Formatted grayscale frame with padding of specified output size.
    """
    # Ensure frame is in float32 format
    frame = tf.cast(frame, tf.float32)
    
    # Normalize pixel values
    frame = frame / 255.0
    
    # Add channel dimension if it's missing
    if len(frame.shape) == 2:
        frame = tf.expand_dims(frame, axis=-1)
    
    # Resize and pad
    frame = tf.image.resize_with_pad(frame, *output_size)
    
    return frame

def frames_from_video_file(video_path, n_frames, output_size = (160, 160), frame_step = 15):
    """
    Creates grayscale frames from each video file present for each category.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image.

    Return:
      A NumPy array of grayscale frames in the shape of (n_frames, height, width, 1).
    """
    # Read each video frame by frame
    result = []
    src = cv2.VideoCapture(str(video_path))  

    video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

    need_length = 1 + (n_frames - 1) * frame_step

    if need_length > video_length:
        start = 0
    else:
        max_start = int(video_length - need_length)
        start = random.randint(0, max_start + 1)

    src.set(cv2.CAP_PROP_POS_FRAMES, start)
    # ret is a boolean indicating whether read was successful, frame is the image itself
    ret, frame = src.read()
    if ret:
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        result.append(format_frames(gray_frame, output_size))

    for _ in range(n_frames - 1):
        for _ in range(frame_step):
            ret, frame = src.read()
        if ret:
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            frame = format_frames(gray_frame, output_size)
            result.append(frame)
        else:
            result.append(np.zeros_like(result[0]))
    src.release()
    result = np.array(result)

    return result
    
class FrameGenerator:
    def __init__(self, path, n_frames, training=False):
        """ Returns a set of frames with their associated label. 

        Args:
          path: Video file paths.
          n_frames: Number of frames. 
          training: Boolean to determine if training dataset is being created.
        """
        self.path = path
        self.n_frames = n_frames
        self.training = training
        self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
        self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

    def get_files_and_class_names(self):
        video_paths = list(self.path.glob('*/*.mov')) + list(self.path.glob('*/*.mp4'))
        classes = [p.parent.name for p in video_paths] 
        return video_paths, classes

    def __call__(self):
        video_paths, classes = self.get_files_and_class_names()

        pairs = list(zip(video_paths, classes))

        if self.training:
            random.shuffle(pairs)

        for path, name in pairs:
            video_frames = frames_from_video_file(path, self.n_frames) 
            label = self.class_ids_for_name[name] # Encode labels
            yield video_frames, label

In [41]:
import pathlib
import random
from sklearn.model_selection import train_test_split
import shutil  # To move or copy files

# Path to your datasets (videos and CSV files)
vid_dir = pathlib.Path(r'C:\campus\3rd\2 sem\caspone\sign bridge\Dataset\Dataset - MP - VID')
original_dir = pathlib.Path(r'C:\campus\3rd\2 sem\caspone\sign bridge\Dataset\Dataset - Original')
csv_dir = pathlib.Path(r'C:\campus\3rd\2 sem\caspone\sign bridge\Dataset\Dataset - MP - CSV')

# Find all video files (MOV and MP4) and CSV files
mov_files = list(vid_dir.rglob("*.mov"))
mp4_files = list(original_dir.rglob("*.mp4"))
csv_files = list(csv_dir.rglob("*.csv"))

# Print the found video and CSV files for debugging
print(f"Found MOV files: {len(mov_files)}")
print(f"Found MP4 files: {len(mp4_files)}")
print(f"Found CSV files: {len(csv_files)}")

# Combine all video files
all_video_files = mov_files + mp4_files

# Ensure that there is a one-to-one correspondence between video files and CSV files
video_to_csv = {}
for video_file in all_video_files:
    # Check if a CSV file with the same name exists
    csv_file = csv_dir / (video_file.stem + '.csv')
    if csv_file.exists():
        video_to_csv[video_file] = csv_file

# Check how many video-to-CSV matches we have
print(f"Found {len(video_to_csv)} video-CSV pairs")

# Now, split data into training, validation, and testing sets if we have valid pairs
if len(video_to_csv) > 0:
    train_video_files, temp_video_files = train_test_split(list(video_to_csv.keys()), test_size=0.4, random_state=42)
    val_video_files, test_video_files = train_test_split(temp_video_files, test_size=0.5, random_state=42)

    # Get the corresponding CSV files for the splits
    train_csv_files = [video_to_csv[video_file] for video_file in train_video_files]
    val_csv_files = [video_to_csv[video_file] for video_file in val_video_files]
    test_csv_files = [video_to_csv[video_file] for video_file in test_video_files]

    # Print out the number of files in each split to verify
    print("Training video samples:", len(train_video_files))
    print("Validation video samples:", len(val_video_files))
    print("Test video samples:", len(test_video_files))

    print("Training CSV samples:", len(train_csv_files))
    print("Validation CSV samples:", len(val_csv_files))
    print("Test CSV samples:", len(test_csv_files))

    # Define directories for the splits
    train_dir = pathlib.Path(r'C:\campus\3rd\2 sem\caspone\sign bridge\Dataset\train')
    val_dir = pathlib.Path(r'C:\campus\3rd\2 sem\caspone\sign bridge\Dataset\val')
    test_dir = pathlib.Path(r'C:\campus\3rd\2 sem\caspone\sign bridge\Dataset\test')

    # Ensure that the directories exist, or create them
    train_dir.mkdir(parents=True, exist_ok=True)
    val_dir.mkdir(parents=True, exist_ok=True)
    test_dir.mkdir(parents=True, exist_ok=True)

    # Function to move or copy files to the target directory
    def move_files(file_list, target_dir):
        for file in file_list:
            shutil.copy(file, target_dir)  # Or use shutil.move to move the files instead of copying

    # Move video files and CSV files to corresponding directories
    move_files(train_video_files, train_dir)
    move_files(val_video_files, val_dir)
    move_files(test_video_files, test_dir)

    move_files(train_csv_files, train_dir)
    move_files(val_csv_files, val_dir)
    move_files(test_csv_files, test_dir)

    print("Files have been successfully moved to their respective directories.")
else:
    print("No matching video-CSV pairs found. Please check the file names and paths.")



Found MOV files: 16
Found MP4 files: 23
Found CSV files: 40
Found 0 video-CSV pairs
No matching video-CSV pairs found. Please check the file names and paths.


In [23]:
# Load the pre-trained I3D model (or download one)
def build_i3d_model(num_classes):
    base_model = tf.keras.applications.InceptionV3(
        include_top=False,
        weights="imagenet",
        input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3),
        pooling='avg'  # Global Average Pooling for reducing the dimensions
    )
    
    # Adding temporal dimension using TimeDistributed layers
    inputs = tf.keras.layers.Input(shape=(N_FRAMES, IMAGE_SIZE[0], IMAGE_SIZE[1], 1))

    # Convert grayscale frames to 3 channels
    x = tf.keras.layers.TimeDistributed(tf.keras.layers.Conv2D(3, (3, 3), padding="same"))(inputs)

    # Feed into I3D model (Inflating ConvNet layers)
    x = tf.keras.layers.TimeDistributed(base_model)(x)

    # Temporal pooling to aggregate frame-level features
    x = tf.keras.layers.GlobalAveragePooling1D()(x)

    # Classification layer
    x = tf.keras.layers.Dense(1024, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.85)(x)
    outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(x)

    # Create the final model
    model = tf.keras.models.Model(inputs, outputs)

    # Compile the model with optimizer, loss, and metrics
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    return model

In [28]:
def prepare_datasets(train_path, val_path, batch_size, n_frames):
    # Initialize frame generators for training and validation datasets
    train_gen = FrameGenerator(train_path, n_frames, training=True)
    val_gen = FrameGenerator(val_path, n_frames, training=False)

    # Create TensorFlow Dataset from generators
    train_dataset = tf.data.Dataset.from_generator(train_gen,
                                                   output_signature=(
                                                       tf.TensorSpec(shape=(n_frames, 160, 160, 1), dtype=tf.float32),
                                                       tf.TensorSpec(shape=(), dtype=tf.int64)))
    val_dataset = tf.data.Dataset.from_generator(val_gen,
                                                 output_signature=(
                                                     tf.TensorSpec(shape=(n_frames, 160, 160, 1), dtype=tf.float32),
                                                     tf.TensorSpec(shape=(), dtype=tf.int64)))
    
    # Apply batching and prefetching to optimize performance
    train_dataset = train_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    val_dataset = val_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return train_dataset, val_dataset

# Define the function to compile the I3D model
def compile_i3d_model(model):
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])


In [25]:
# Define EarlyStopping and ModelCheckpoint callbacks
early_stopping = EarlyStopping(
    monitor='val_loss',  # Monitors validation loss
    patience=7,          # Number of epochs with no improvement after which training will be stopped
    restore_best_weights=True  # Restores model weights from the epoch with the best value of the monitored quantity
)

checkpoint = ModelCheckpoint(
    'fine_tuned_i3d_ucf101_ssl400.weights.h5',  # Change filename extension
    monitor='val_accuracy',
    save_best_only=True,
    save_weights_only=True  # Ensures only weights are saved
)


In [26]:
# Fine-tune the model
train_dataset, val_dataset = prepare_datasets(subset_paths['train'], subset_paths['val'], BATCH_SIZE, N_FRAMES)

AttributeError: 'list' object has no attribute 'iterdir'

In [None]:
model = build_i3d_model(NUM_CLASSES)

In [None]:
model.summary()

In [None]:
# Train the model with callbacks
history = model.fit(train_dataset,
                    validation_data=val_dataset,
                    epochs=EPOCHS,
                    callbacks=[early_stopping, checkpoint])

In [None]:
# Print the best training and validation accuracies
best_train_acc = max(history.history['accuracy'])
best_val_acc = max(history.history['val_accuracy'])
best_train_loss = min(history.history['loss'])
best_val_loss = min(history.history['val_loss'])

print(f'Best Training Accuracy: {best_train_acc:.4f}')
print(f'Best Validation Accuracy: {best_val_acc:.4f}')
print(f'Best Training Loss: {best_train_loss:.4f}')
print(f'Best Validation Loss: {best_val_loss:.4f}')

In [None]:
import matplotlib.pyplot as plt
def plot_history(history):
  """
    Plotting training and validation learning curves.

    Args:
      history: model history with all the metric measures
  """
  fig, (ax1, ax2) = plt.subplots(2)

  fig.set_size_inches(18.5, 10.5)

  # Plot loss
  ax1.set_title('Loss')
  ax1.plot(history.history['loss'], label = 'train')
  ax1.plot(history.history['val_loss'], label = 'test')
  ax1.set_ylabel('Loss')
  
  # Determine upper bound of y-axis
  max_loss = max(history.history['loss'] + history.history['val_loss'])

  ax1.set_ylim([0, np.ceil(max_loss)])
  ax1.set_xlabel('Epoch')
  ax1.legend(['Train', 'Validation']) 

  # Plot accuracy
  ax2.set_title('Accuracy')
  ax2.plot(history.history['accuracy'],  label = 'train')
  ax2.plot(history.history['val_accuracy'], label = 'test')
  ax2.set_ylabel('Accuracy')
  ax2.set_ylim([0, 1])
  ax2.set_xlabel('Epoch')
  ax2.legend(['Train', 'Validation'])

  plt.show()

plot_history(history)

In [None]:
def get_actual_predicted_labels(dataset): 
  """
    Create a list of actual ground truth values and the predictions from the model.

    Args:
      dataset: An iterable data structure, such as a TensorFlow Dataset, with features and labels.

    Return:
      Ground truth and predicted values for a particular dataset.
  """
  actual = [labels for _, labels in dataset.unbatch()]
  predicted = model.predict(dataset)

  actual = tf.stack(actual, axis=0)
  predicted = tf.concat(predicted, axis=0)
  predicted = tf.argmax(predicted, axis=1)

  return actual, predicted

In [None]:
import seaborn as sns
def plot_confusion_matrix(actual, predicted, labels, ds_type):
  cm = tf.math.confusion_matrix(actual, predicted)
  ax = sns.heatmap(cm, annot=True, fmt='g')
  sns.set(rc={'figure.figsize':(12, 12)})
  sns.set(font_scale=1.4)
  ax.set_title('Confusion matrix of action recognition for ' + ds_type)
  ax.set_xlabel('Predicted Action')
  ax.set_ylabel('Actual Action')
  plt.xticks(rotation=90)
  plt.yticks(rotation=0)
  ax.xaxis.set_ticklabels(labels)
  ax.yaxis.set_ticklabels(labels)

In [None]:
fg = FrameGenerator(subset_paths['train'], N_FRAMES, training=True)
labels = list(fg.class_ids_for_name.keys())

In [None]:
actual, predicted = get_actual_predicted_labels(train_dataset)
plot_confusion_matrix(actual, predicted, labels, 'training')

In [None]:
# Evaluate the model
test_gen = FrameGenerator(subset_paths['test'], N_FRAMES, training=False)
test_dataset = tf.data.Dataset.from_generator(test_gen,
                                              output_signature=(
                                                  tf.TensorSpec(shape=(N_FRAMES, 160, 160, 1), dtype=tf.float32),
                                                  tf.TensorSpec(shape=(), dtype=tf.int64)))
test_dataset = test_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

test_loss, test_accuracy = model.evaluate(test_dataset)
print(f"Test Accuracy: {test_accuracy:.2f}")

In [None]:
# Save the fine-tuned model
model.save('fine_tuned_i3d_ucf101_ssl400.h5')

In [None]:
import tensorflow as tf
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
# Function to calculate and print precision, recall, F1 score, and accuracy
def print_metrics(actual, predicted):
    actual_np = actual.numpy()
    predicted_np = predicted.numpy()
    
    precision = precision_score(actual_np, predicted_np, average='weighted')
    recall = recall_score(actual_np, predicted_np, average='weighted')
    f1 = f1_score(actual_np, predicted_np, average='weighted')
    accuracy = accuracy_score(actual_np, predicted_np)

    # Print the metrics
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')
    print(f'Accuracy: {accuracy:.4f}')

# Example usage
fg = FrameGenerator(subset_paths['train'], N_FRAMES, training=True)
labels = list(fg.class_ids_for_name.keys())

# Assuming `train_dataset` is already prepared
actual, predicted = get_actual_predicted_labels(train_dataset)
print_metrics(actual, predicted)