In [None]:
import os
import cv2
import glob
import numpy as np
import tensorflow as tf
import keras

In [None]:
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Flatten, Dense, Dropout
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, roc_auc_score, precision_score, recall_score, accuracy_score, confusion_matrix

In [None]:
# Hyperparameters and dataset configuration
classes = ['normal', 'accident']  # list position matches the integer label (0 = normal, 1 = accident)
num_classes = len(classes)

frame_size = (224, 224)  # target size handed to cv2.resize for every frame
num_frames = 16          # frames kept per video clip

batch_size = 16
epochs = 50

In [None]:
# Define input shape: (frames, height, width, channels).
# Derived from the parameters so it always agrees with the clips the
# generators produce and with the shape given to the first Conv3D layer.
input_shape = (num_frames, *frame_size, 3)

In [None]:
# pre-process the data
def preprocess_video(video_path, frame_size, num_frames):
    """Load the first ``num_frames`` frames of a video as a normalised clip.

    Each frame is resized with ``cv2.resize(frame, frame_size)`` and divided
    by 255 so pixel values fall in [0, 1].

    Args:
        video_path: Path of the video file to open.
        frame_size: Target size passed directly to ``cv2.resize``
            (OpenCV interprets it as ``(width, height)``).
        num_frames: Exact number of frames the clip must contain.

    Returns:
        The frames stacked along a new leading axis (length ``num_frames``),
        or ``None`` when fewer than ``num_frames`` frames could be read.
    """
    vidcap = cv2.VideoCapture(video_path)
    frames = []

    try:
        while len(frames) < num_frames:
            success, frame = vidcap.read()
            if not success:
                # End of stream or unreadable file: stop with what we have.
                break

            frame = cv2.resize(frame, frame_size)
            frames.append(frame / 255.)
    finally:
        # Always free the decoder handle; the generators call this function
        # for every video in every epoch, so a leak accumulates quickly.
        vidcap.release()

    if len(frames) == num_frames:
        return np.stack(frames)
    return None

In [None]:
# Mount Google Drive at /content/drive; the dataset globs below read from
# /content/drive/MyDrive. This import only exists inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Load video paths.
# glob.glob returns matches in arbitrary, filesystem-dependent order, so the
# results are sorted to keep the path/label lists identical between runs.
train_accident_videos = sorted(glob.glob('/content/drive/MyDrive/Dataset/training/accident/*.*'))
train_normal_videos = sorted(glob.glob('/content/drive/MyDrive/Dataset/training/normal/*.*'))
test_accident_videos = sorted(glob.glob('/content/drive/MyDrive/Dataset/testing/accident/*.*'))
test_normal_videos = sorted(glob.glob('/content/drive/MyDrive/Dataset/testing/normal/*.*'))

In [None]:
# Merge the per-class path lists (accident clips first, then normal clips)
# and build the parallel label lists: 1 = accident, 0 = normal.
train_videos = [*train_accident_videos, *train_normal_videos]
train_labels = [1 for _ in train_accident_videos] + [0 for _ in train_normal_videos]

test_videos = [*test_accident_videos, *test_normal_videos]
test_labels = [1 for _ in test_accident_videos] + [0 for _ in test_normal_videos]

In [None]:
# Custom generator
def video_generator(video_paths, labels, frame_size, num_frames, batch_size):
    """Yield shuffled ``(clips, one_hot_labels)`` batches forever.

    Every pass over the data uses a fresh random order. Videos for which
    ``preprocess_video`` returns ``None`` (fewer than ``num_frames`` readable
    frames) are skipped instead of crashing ``np.stack``, so a batch may hold
    fewer than ``batch_size`` clips.

    Args:
        video_paths: Sequence of video file paths.
        labels: Integer class labels, parallel to ``video_paths``.
        frame_size: Forwarded to ``preprocess_video``.
        num_frames: Forwarded to ``preprocess_video``.
        batch_size: Maximum number of clips per yielded batch.

    Yields:
        A tuple of the clips stacked along a new leading batch axis and the
        matching labels one-hot encoded with the module-level ``num_classes``.

    Raises:
        ValueError: If ``video_paths`` is empty, or if a full pass finds no
            usable video (either case would otherwise loop forever without
            yielding).
    """
    if len(video_paths) == 0:
        raise ValueError("video_generator received no video paths")

    while True:
        # New random order each pass; the caller's lists are left untouched.
        order = np.random.permutation(len(video_paths))
        yielded_any = False

        # Generate batches
        for start in range(0, len(order), batch_size):
            batch_videos = []
            batch_labels = []

            for idx in order[start:start + batch_size]:
                video = preprocess_video(video_paths[idx], frame_size, num_frames)
                if video is None:
                    # Too short or unreadable: leave it out of the batch.
                    continue

                batch_videos.append(video)
                batch_labels.append(labels[idx])

            if not batch_videos:
                # Every clip in this slice was unusable; nothing to stack.
                continue

            yielded_any = True
            yield np.stack(batch_videos, axis=0), to_categorical(batch_labels, num_classes)

        if not yielded_any:
            raise ValueError(
                "video_generator found no video with at least "
                f"{num_frames} readable frames"
            )


In [None]:
# Build the endless batch generators for the training and testing splits
train_gen = video_generator(
    video_paths=train_videos,
    labels=train_labels,
    frame_size=frame_size,
    num_frames=num_frames,
    batch_size=batch_size,
)
test_gen = video_generator(
    video_paths=test_videos,
    labels=test_labels,
    frame_size=frame_size,
    num_frames=num_frames,
    batch_size=batch_size,
)

In [None]:
# Defining the model: three Conv3D + MaxPooling3D stages followed by two
# fully connected layers. Dropout(0.5) after the second and third stages was
# tried earlier and is currently disabled.
model = Sequential([
    # Stage 1: input is one clip of shape (num_frames, *frame_size, 3)
    Conv3D(32, kernel_size=(3, 3, 3), activation='relu', input_shape=(num_frames, *frame_size, 3)),
    MaxPooling3D(pool_size=(2, 2, 2)),
    # Stage 2
    Conv3D(64, kernel_size=(3, 3, 3), activation='relu'),
    MaxPooling3D(pool_size=(2, 2, 2)),
    # Stage 3: 'same' padding on both the convolution and the pooling
    Conv3D(128, kernel_size=(3, 3, 3), activation='relu', padding='same'),
    MaxPooling3D(pool_size=(2, 2, 2), padding='same'),
    # Classifier head: one softmax unit per class
    Flatten(),
    Dense(128, activation='relu'),
    Dense(num_classes, activation='softmax'),
])


In [None]:
# Compile the model with Adam and categorical cross-entropy (labels are one-hot).
adam = tf.keras.optimizers.Adam(learning_rate=0.001)

# Metric order matters: the evaluation cell unpacks (loss, accuracy, auc).
model.compile(
    optimizer=adam,
    loss='categorical_crossentropy',
    metrics=['accuracy', tf.keras.metrics.AUC()],
)

In [None]:
# Print the layer-by-layer output shapes and parameter counts.
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d (Conv3D)             (None, 14, 222, 222, 32)  2624      
                                                                 
 max_pooling3d (MaxPooling3D  (None, 7, 111, 111, 32)  0         
 )                                                               
                                                                 
 conv3d_1 (Conv3D)           (None, 5, 109, 109, 64)   55360     
                                                                 
 max_pooling3d_1 (MaxPooling  (None, 2, 54, 54, 64)    0         
 3D)                                                             
                                                                 
 conv3d_2 (Conv3D)           (None, 2, 54, 54, 128)    221312    
                                                                 
 max_pooling3d_2 (MaxPooling  (None, 1, 27, 27, 128)   0

In [None]:
# Calculating steps per epoch and validation steps.
# video_generator also yields the final, smaller batch of each pass, so one
# pass is ceil(n / batch_size) batches. Ceiling division keeps each Keras
# epoch aligned with one generator pass and avoids 0 steps when a split holds
# fewer than batch_size videos.
steps_per_epoch = (len(train_videos) + batch_size - 1) // batch_size
validation_steps = (len(test_videos) + batch_size - 1) // batch_size

In [None]:
# Training the model.
# Both generators loop forever, so the step counts decide how many batches
# make up one training epoch and one validation run.
history = model.fit(
    x=train_gen,
    epochs=epochs,
    steps_per_epoch=steps_per_epoch,
    validation_data=test_gen,
    validation_steps=validation_steps,
)

In [None]:
# Collect predictions for every test video exactly once.
# test_gen is not reused here: it is an endless, reshuffling generator that
# validation has already advanced, so reading len(test_videos) samples from it
# can straddle a reshuffle and count some videos twice while missing others.
true_labels = []
predicted_labels = []

for start in range(0, len(test_videos), batch_size):
    batch_videos = []
    batch_true_labels = []

    for video_path, label in zip(test_videos[start:start + batch_size],
                                 test_labels[start:start + batch_size]):
        video = preprocess_video(video_path, frame_size, num_frames)
        if video is None:
            # Fewer than num_frames readable frames: cannot be scored.
            continue
        batch_videos.append(video)
        batch_true_labels.append(label)

    if not batch_videos:
        continue

    batch_predictions = model.predict(np.stack(batch_videos, axis=0))
    batch_predicted_labels = np.argmax(batch_predictions, axis=1)

    true_labels.extend(batch_true_labels)
    predicted_labels.extend(batch_predicted_labels)

true_labels = np.array(true_labels)
predicted_labels = np.array(predicted_labels)


In [None]:
# Binary classification metrics; label 1 (accident) is the positive class.
accuracy = accuracy_score(true_labels, predicted_labels)
f_measure = f1_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels)
recall = recall_score(true_labels, predicted_labels)

# roc_auc_score raises ValueError when y_true holds a single class.
# NOTE(review): this AUC is computed from hard 0/1 predictions, not from the
# softmax probabilities, so it reflects one operating point only.
if len(np.unique(true_labels)) > 1:
    auc = roc_auc_score(true_labels, predicted_labels)
else:
    auc = float('nan')

# labels=[0, 1] forces a 2x2 matrix so the 4-way unpack below cannot fail
# when only one class appears in the labels and predictions.
tn, fp, fn, tp = confusion_matrix(true_labels, predicted_labels, labels=[0, 1]).ravel()
specificity = tn / (tn + fp) if (tn + fp) else float('nan')
sensitivity = tp / (tp + fn) if (tp + fn) else float('nan')

print(f"Accuracy: {accuracy}")
print(f"F-measure: {f_measure}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"AUC: {auc}")
print(f"Specificity: {specificity}")
print(f"Sensitivity: {sensitivity}")

In [None]:
# Saving the model to 'crash_detector_experiment3.h5' in the notebook's
# current working directory (the .h5 suffix selects Keras' HDF5 format).
model.save('crash_detector_experiment3.h5')

In [None]:
# Loading the model back from the file written above; this rebinds `model`,
# so later cells use the reloaded copy rather than the in-memory one.
model = load_model('crash_detector_experiment3.h5')

In [None]:
# Evaluate on one complete pass over the test set.
# A fresh generator is used because test_gen is endless and has already been
# advanced by earlier cells; ceiling division includes the final partial batch
# so no test video is left out of the reported metrics.
eval_gen = video_generator(test_videos, test_labels, frame_size, num_frames, batch_size)
eval_steps = (len(test_videos) + batch_size - 1) // batch_size

# Order of the returned values follows model.compile: loss, accuracy, AUC.
loss, accuracy, auc = model.evaluate(eval_gen, steps=eval_steps)
print(f"Test loss: {loss}, Test accuracy: {accuracy} , Test auc: {auc}")