In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import confusion_matrix, accuracy_score
import matplotlib.pyplot as plt



In [None]:
import os
import cv2
import numpy as np

# Folders holding the training and testing videos (adjust if your layout differs)
train_dir = '/content/drive/MyDrive/Avenue Dataset/training_videos'
test_dir = '/content/drive/MyDrive/Avenue Dataset/testing_videos'

# Report whether each folder is reachable before any video is read
if os.path.exists(train_dir):
    print("Training directory found.")
else:
    print(f"Training directory {train_dir} does not exist.")

if os.path.exists(test_dir):
    print("Testing directory found.")
else:
    print(f"Testing directory {test_dir} does not exist.")

# Function to load frames from videos
def load_video_frames(video_path, target_size=(126, 126), frame_stride=1):
    """Read a video file and return its resized frames as one NumPy array.

    Args:
        video_path: Path of the video file handed to ``cv2.VideoCapture``.
        target_size: Output size passed to ``cv2.resize`` for every kept frame.
        frame_stride: Keep every ``frame_stride``-th frame (1 keeps all frames).

    Returns:
        ``np.ndarray`` stacking the kept frames in reading order. An empty
        array is returned when the video cannot be opened or yields no frames,
        so callers can rely on ``len(result) == 0`` in both cases.

    Raises:
        ValueError: If ``frame_stride`` is smaller than 1.
    """
    if frame_stride < 1:
        raise ValueError(f"frame_stride must be >= 1, got {frame_stride}")

    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return np.array(frames)

        frame_index = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Every frame is decoded, but only each frame_stride-th one is kept.
            if frame_index % frame_stride == 0:
                frames.append(cv2.resize(frame, target_size))
            frame_index += 1
    finally:
        # Release the capture handle even if decoding or resizing raises.
        cap.release()
    return np.array(frames)

# Load all videos from a directory
def load_all_videos(video_dir):
    """Load every ``.avi`` video found directly inside ``video_dir``.

    Files are visited in sorted filename order. ``os.listdir`` returns names in
    arbitrary order, so sorting is what makes the position of each video in the
    returned list reproducible (labels are attached to videos by position).

    Args:
        video_dir: Directory containing the video files.

    Returns:
        List with one frames array per video, as returned by
        ``load_video_frames``. Videos that yield no frames are skipped.
    """
    videos = []
    for video_file in sorted(os.listdir(video_dir)):
        # Accept the extension in any letter case (.avi, .AVI, ...).
        if not video_file.lower().endswith('.avi'):
            continue
        frames = load_video_frames(os.path.join(video_dir, video_file))
        if len(frames) > 0:
            videos.append(frames)
    return videos

# Read both splits into memory: each entry is the frames array of one video
train_videos, test_videos = map(load_all_videos, (train_dir, test_dir))

# Report how many videos each split produced
print(f"Loaded {len(train_videos)} training videos.")
print(f"Loaded {len(test_videos)} testing videos.")


Training directory found.
Testing directory found.
Loaded 16 training videos.
Loaded 21 testing videos.


In [None]:
# Function to load resized frames from a video (frames stay in OpenCV's channel order)
def load_video_frames(video_path, target_size=(126, 126), frame_stride=1):
    """Read a video file and return its resized frames as one NumPy array.

    No colour conversion is performed here; frames are stored exactly as
    ``cv2.VideoCapture.read`` delivers them, only resized.

    Args:
        video_path: Path of the video file handed to ``cv2.VideoCapture``.
        target_size: Output size passed to ``cv2.resize`` for every kept frame.
        frame_stride: Keep every ``frame_stride``-th frame (1 keeps all frames).

    Returns:
        ``np.ndarray`` stacking the kept frames in reading order; an empty
        array when the video cannot be opened or yields no frames.

    Raises:
        ValueError: If ``frame_stride`` is smaller than 1.
    """
    if frame_stride < 1:
        raise ValueError(f"frame_stride must be >= 1, got {frame_stride}")

    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            # Report the failure instead of silently returning nothing.
            print(f"Error: Could not open video {video_path}")
            return np.array(frames)

        frame_index = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_index % frame_stride == 0:
                frames.append(cv2.resize(frame, target_size))  # Resize frame to target size
            frame_index += 1
    finally:
        # Release the capture handle even if decoding or resizing raises.
        cap.release()
    return np.array(frames)

# Function to preprocess frames (convert to grayscale, normalize, and convert to 3 channels)
def preprocess_frames(frames):
    """Turn BGR frames into normalized 3-channel grayscale frames.

    Each frame is converted with ``cv2.COLOR_BGR2GRAY``, divided by 255.0, and
    the single gray channel is then repeated three times along a new last axis.

    Args:
        frames: Iterable of BGR frames.

    Returns:
        Float array whose last axis has length 3, all three channels identical.
    """
    gray_frames = []
    for bgr_frame in frames:
        gray_frames.append(cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2GRAY))

    scaled = np.array(gray_frames) / 255.0  # pixel values now lie in [0, 1]
    with_channel_axis = scaled[..., np.newaxis]
    return np.repeat(with_channel_axis, 3, axis=-1)

In [None]:
# Build the ResNet50 model
def build_model(input_shape=(126, 126, 3)):
    """Build and compile a binary classifier on top of a frozen ResNet50.

    The ImageNet-initialised ResNet50 (without its top) is frozen and followed
    by Flatten -> Dropout(0.5) -> Dense(256, relu) -> Dropout(0.5) ->
    Dense(1, sigmoid).

    Args:
        input_shape: Shape of one input image; the default has 3 channels.

    Returns:
        The model, compiled with the 'adam' optimizer, binary cross-entropy
        loss and an accuracy metric.
    """
    backbone = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
    backbone.trainable = False  # only the new head is trained

    # Classification head, listed in the order it is applied
    head_layers = [
        Flatten(),
        Dropout(0.5),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid'),  # single probability: normal vs abnormal
    ]
    features = backbone.output
    for head_layer in head_layers:
        features = head_layer(features)

    classifier = Model(inputs=backbone.input, outputs=features)
    classifier.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return classifier

In [None]:
# Stack the frames of all videos into one array and give every frame its video's label
def prepare_data(videos, labels):
    """Flatten per-video frame arrays into frame-level training data.

    Args:
        videos: Sequence of per-video frame arrays (first axis indexes frames).
        labels: One label per video, in the same order as ``videos``.

    Returns:
        Tuple ``(X_data, y_data)``: all frames concatenated along axis 0, and
        an array holding one label per frame, so both have the same length.

    Raises:
        ValueError: If ``videos`` is empty or the number of labels differs
            from the number of videos.
    """
    videos = list(videos)
    labels = list(labels)
    # zip() would silently drop the surplus entries, so check the counts first.
    if len(videos) != len(labels):
        raise ValueError(
            f"Got {len(videos)} videos but {len(labels)} labels; they must match."
        )
    if not videos:
        raise ValueError("prepare_data needs at least one video.")

    X_data = []
    y_data = []
    for video, label in zip(videos, labels):
        X_data.append(np.asarray(video))
        y_data.extend([label] * len(video))  # one label per frame, not per video

    # Convert list of arrays into a single numpy array
    X_data = np.concatenate(X_data, axis=0)
    y_data = np.array(y_data)

    return X_data, y_data

# Placeholder labels: every video is marked 0 until the real labels are filled in
train_labels = [0 for _ in train_videos]  # Replace with actual labels
test_labels = [0 for _ in test_videos]    # Replace with actual labels

# Build the frame and label arrays for both splits
(X_train, y_train), (X_test, y_test) = (
    prepare_data(train_videos, train_labels),
    prepare_data(test_videos, test_labels),
)


In [None]:
# Instantiate the classifier for 126x126 three-channel frames and print its layer table
model = build_model((126, 126, 3))
model.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94765736/94765736[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 0us/step


In [None]:
# Stack the frames of every video into one array per split
X_train, X_test = (
    np.concatenate(split_videos, axis=0) for split_videos in (train_videos, test_videos)
)

# Placeholder per-video labels for binary classification:
# 0 stands for normal, 1 for abnormal. Modify with your actual labels.
train_labels = np.array([0 for _ in train_videos])
test_labels = np.array([0 for _ in test_videos])


In [None]:
# Prepare data by assigning the label to each frame of the video
def prepare_data(videos, labels):
    """Flatten per-video frame arrays into frame-level training data.

    Args:
        videos: Sequence of per-video frame arrays (first axis indexes frames).
        labels: One label per video, in the same order as ``videos``.

    Returns:
        Tuple ``(X_data, y_data)``: all frames concatenated along axis 0, and
        an array in which each video's label is repeated once per frame.

    Raises:
        ValueError: If ``videos`` is empty or the number of labels differs
            from the number of videos.
    """
    videos = list(videos)
    labels = list(labels)
    # zip() would silently drop the surplus entries, so check the counts first.
    if len(videos) != len(labels):
        raise ValueError(
            f"Got {len(videos)} videos but {len(labels)} labels; they must match."
        )
    if not videos:
        raise ValueError("prepare_data needs at least one video.")

    X_data = []
    y_data = []
    for video, label in zip(videos, labels):
        X_data.append(np.asarray(video))  # asarray avoids copying existing arrays
        y_data.extend([label] * len(video))  # Repeat the label for each frame in the video

    # Convert list of arrays into a single numpy array
    X_data = np.concatenate(X_data, axis=0)
    y_data = np.array(y_data)

    return X_data, y_data

# Example per-video labels, one entry per loaded video in list order.
# Replace both lists with your actual labels.
train_labels = [
    0, 0, 1, 0, 0, 1, 0, 1,
    0, 0, 1, 1, 0, 1, 0, 1,
]
test_labels = [
    0, 1, 0, 0, 1, 0, 1,
    0, 1, 1, 0, 1, 0, 1,
    1, 0, 1, 0, 1, 0, 1,
]

# Expand the per-video labels to per-frame arrays for both splits
(X_train, y_train), (X_test, y_test) = (
    prepare_data(train_videos, train_labels),
    prepare_data(test_videos, test_labels),
)


In [None]:
import tensorflow as tf
from tensorflow.keras.applications import MobileNet  # Faster alternative to ResNet50
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping

# Enable mixed precision for faster processing (if using a GPU)
tf.keras.mixed_precision.set_global_policy('mixed_float16')

# Adjust batch size
batch_size = 128

# MobileNet backbone (faster than ResNet50), frozen so only the new head is trained.
# NOTE(review): 126x126 is not one of MobileNet's standard input sizes, so Keras
# is expected to warn and fall back to the weights trained at 224x224 - confirm
# this is acceptable or resize the frames to 128x128.
base_model = MobileNet(weights='imagenet', include_top=False, input_shape=(126, 126, 3))
base_model.trainable = False

# The frames in X_train / X_test are raw pixel values in [0, 255], while the
# ImageNet MobileNet weights expect inputs scaled to [-1, 1]. Doing the scaling
# inside the model keeps training and prediction consistent.
inputs = tf.keras.Input(shape=(126, 126, 3))
x = tf.keras.layers.Rescaling(1.0 / 127.5, offset=-1.0)(inputs)
x = base_model(x, training=False)  # keep the frozen backbone in inference mode
x = Flatten()(x)
x = Dense(128, activation='relu')(x)  # Use a smaller dense layer
x = Dropout(0.5)(x)
# Under the mixed_float16 policy the output layer must be float32 so the
# sigmoid probabilities and the loss stay numerically stable.
output = Dense(1, activation='sigmoid', dtype='float32')(x)
model = Model(inputs=inputs, outputs=output)

# Compile the model (Keras applies loss scaling for the mixed precision policy)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
              loss='binary_crossentropy', metrics=['accuracy'])

# Early stopping callback to prevent over-training.
# NOTE: with epochs=2 and patience=2 it cannot actually stop the run early;
# raise `epochs` for the callback to have an effect.
early_stopping = EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True)

# Train the model on a subset of the data.
# NOTE(review): this takes the leading 30% of frames, which all come from the
# first videos in the list - consider a shuffled subset instead.
subset_size = int(0.3 * len(X_train))
X_train_subset = X_train[:subset_size]  # 30% of training data
y_train_subset = y_train[:subset_size]

# Fit the model for a small number of epochs
model.fit(X_train_subset, y_train_subset, epochs=2, batch_size=batch_size,
          validation_data=(X_test, y_test), callbacks=[early_stopping])

  base_model = MobileNet(weights='imagenet', include_top=False, input_shape=(126,126, 3))


Epoch 1/2
[1m36/36[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1037s[0m 29s/step - accuracy: 0.6369 - loss: 3.2521 - val_accuracy: 0.4264 - val_loss: 7.8162
Epoch 2/2
[1m36/36[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1109s[0m 31s/step - accuracy: 0.9849 - loss: 0.0829 - val_accuracy: 0.4582 - val_loss: 5.8259


<keras.src.callbacks.history.History at 0x79117e5794b0>

In [None]:
from sklearn.metrics import confusion_matrix

# Predict on the test set
probabilities = model.predict(X_test)

# The model has a single output unit, so predict() returns shape (N, 1).
# Flatten to (N,) before comparing: (N, 1) == (N,) would broadcast to an
# (N, N) matrix and the "accuracy" would be the mean of that matrix.
predictions = (probabilities > 0.5).astype(int).ravel()  # Convert predictions to binary labels
true_labels = np.asarray(y_test).ravel()

# Evaluate accuracy
accuracy = np.mean(predictions == true_labels)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

# Confusion matrix (labels fixed so it is always 2x2, even if a class is absent)
cm = confusion_matrix(true_labels, predictions, labels=[0, 1])
print("Confusion Matrix:\n", cm)

# Plot confusion matrix
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
plt.xlabel('Predicted')
plt.ylabel('True')
plt.xticks([0, 1])
plt.yticks([0, 1])
plt.show()
