In [1]:
# setup and imports
import os
import numpy as np
import pandas as pd
import cv2
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (Conv3D, MaxPooling3D, Flatten, Dense, 
                                    LSTM, TimeDistributed, ConvLSTM2D, Dropout,
                                    BatchNormalization, Input, Reshape)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt

In [2]:
import os
from pathlib import Path

class Config:
    # Get the current script directory
    BASE_DIR = Path(__file__).parent
    
    # Dataset parameters
    SEQUENCE_LENGTH = 20
    IMG_HEIGHT = 224
    IMG_WIDTH = 224
    CHANNELS = 3
    
    # Training parameters
    EPOCHS = 50
    BATCH_SIZE = 8
    LEARNING_RATE = 0.0001
    VALIDATION_SPLIT = 0.2
    TEST_SPLIT = 0.1
    
    # Paths - using absolute paths
    DATASET_DIR = BASE_DIR / "dataset"
    ACCIDENT_DIR = DATASET_DIR / "accident"
    NON_ACCIDENT_DIR = DATASET_DIR / "non_accident"
    MODEL_SAVE_PATH = BASE_DIR / "models" / "accident_detection_model.h5"
    
    # Create directories if they don't exist
    DATASET_DIR.mkdir(exist_ok=True)
    ACCIDENT_DIR.mkdir(exist_ok=True)
    NON_ACCIDENT_DIR.mkdir(exist_ok=True)
    (BASE_DIR / "models").mkdir(exist_ok=True)

config = Config()

NameError: name '__file__' is not defined

In [None]:
# Data preprocessing 
def extract_frames(video_path, max_frames=0):
    """
    Extract frames from video file
    """
    frames = []
    cap = cv2.VideoCapture(video_path)
    
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
                
            # Resize and normalize frame
            frame = cv2.resize(frame, (config.IMG_WIDTH, config.IMG_HEIGHT))
            frame = frame / 255.0
            frames.append(frame)
            
            if 0 < max_frames <= len(frames):
                break
    finally:
        cap.release()
    
    return np.array(frames)

def create_sequences(frames, sequence_length):
    """
    Create sequences of frames from video
    """
    sequences = []
    for i in range(len(frames) - sequence_length + 1):
        sequence = frames[i:i + sequence_length]
        sequences.append(sequence)
    return np.array(sequences)

def load_dataset():
    """
    Load and preprocess the entire dataset
    """
    X = []
    y = []
    
    # Load accident videos
    for video_file in os.listdir(config.ACCIDENT_DIR):
        if video_file.endswith(('.mp4', '.avi', '.mov')):
            video_path = os.path.join(config.ACCIDENT_DIR, video_file)
            frames = extract_frames(video_path)
            sequences = create_sequences(frames, config.SEQUENCE_LENGTH)
            X.extend(sequences)
            y.extend([1] * len(sequences))  # 1 for accident
    
    # Load non-accident videos
    for video_file in os.listdir(config.NON_ACCIDENT_DIR):
        if video_file.endswith(('.mp4', '.avi', '.mov')):
            video_path = os.path.join(config.NON_ACCIDENT_DIR, video_file)
            frames = extract_frames(video_path)
            sequences = create_sequences(frames, config.SEQUENCE_LENGTH)
            X.extend(sequences)
            y.extend([0] * len(sequences))  # 0 for non-accident
    
    X = np.array(X)
    y = np.array(y)
    
    return X, y

def prepare_datasets(X, y):
    """
    Split dataset into train, validation and test sets
    """
    # First split into train+val and test
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        X, y, test_size=config.TEST_SPLIT, random_state=42, stratify=y)
    
    # Then split train+val into train and val
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=config.VALIDATION_SPLIT, 
        random_state=42, stratify=y_train_val)
    
    return X_train, X_val, X_test, y_train, y_val, y_test

In [None]:
# Model architecture
# 3d cnn model 
def create_3d_cnn_model():
    """
    Create a 3D CNN model for spatiotemporal feature extraction
    """
    model = Sequential([
        # First 3D convolution layer
        Conv3D(32, (3, 3, 3), activation='relu', 
               input_shape=(config.SEQUENCE_LENGTH, config.IMG_HEIGHT, 
                           config.IMG_WIDTH, config.CHANNELS)),
        BatchNormalization(),
        MaxPooling3D((1, 2, 2)),
        Dropout(0.25),
        
        # Second 3D convolution layer
        Conv3D(64, (3, 3, 3), activation='relu'),
        BatchNormalization(),
        MaxPooling3D((1, 2, 2)),
        Dropout(0.25),
        
        # Third 3D convolution layer
        Conv3D(128, (3, 3, 3), activation='relu'),
        BatchNormalization(),
        MaxPooling3D((1, 2, 2)),
        Dropout(0.25),
        
        # Flatten and dense layers
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])
    
    optimizer = Adam(learning_rate=config.LEARNING_RATE)
    model.compile(optimizer=optimizer, 
                  loss='binary_crossentropy', 
                  metrics=['accuracy'])
    
    return model

In [None]:
# convlstm model 
def create_convlstm_model():
    """
    Create a ConvLSTM model for better temporal modeling
    """
    model = Sequential([
        # First ConvLSTM layer
        ConvLSTM2D(32, (3, 3), activation='tanh', recurrent_activation='hard_sigmoid',
                   return_sequences=True,
                   input_shape=(config.SEQUENCE_LENGTH, config.IMG_HEIGHT, 
                               config.IMG_WIDTH, config.CHANNELS)),
        BatchNormalization(),
        MaxPooling3D((1, 2, 2)),
        Dropout(0.25),
        
        # Second ConvLSTM layer
        ConvLSTM2D(64, (3, 3), activation='tanh', recurrent_activation='hard_sigmoid',
                   return_sequences=True),
        BatchNormalization(),
        MaxPooling3D((1, 2, 2)),
        Dropout(0.25),
        
        # Third ConvLSTM layer
        ConvLSTM2D(128, (3, 3), activation='tanh', recurrent_activation='hard_sigmoid',
                   return_sequences=False),
        BatchNormalization(),
        MaxPooling3D((1, 2, 2)),
        Dropout(0.25),
        
        # Flatten and dense layers
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])
    
    optimizer = Adam(learning_rate=config.LEARNING_RATE)
    model.compile(optimizer=optimizer, 
                  loss='binary_crossentropy', 
                  metrics=['accuracy'])
    
    return model

In [None]:
# training pipeline
def train_model():
    # Load and prepare dataset
    print("Loading dataset...")
    X, y = load_dataset()
    X_train, X_val, X_test, y_train, y_val, y_test = prepare_datasets(X, y)
    
    print(f"Training samples: {len(X_train)}")
    print(f"Validation samples: {len(X_val)}")
    print(f"Test samples: {len(X_test)}")
    
    # Create model (choose one)
    print("Creating model...")
    model = create_3d_cnn_model()  # or create_convlstm_model()
    model.summary()
    
    # Callbacks
    callbacks = [
        ModelCheckpoint(
            config.MODEL_SAVE_PATH,
            monitor='val_accuracy',
            save_best_only=True,
            mode='max',
            verbose=1
        ),
        EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-6
        )
    ]
    
    # Train model
    print("Training model...")
    history = model.fit(
        X_train, y_train,
        batch_size=config.BATCH_SIZE,
        epochs=config.EPOCHS,
        validation_data=(X_val, y_val),
        callbacks=callbacks,
        verbose=1
    )
    
    # Evaluate on test set
    print("Evaluating on test set...")
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test Accuracy: {test_acc*100:.2f}%")
    
    # Save training history
    pd.DataFrame(history.history).to_csv("training_history.csv", index=False)
    
    return model, history

def plot_training_history(history):
    """
    Plot training and validation accuracy and loss
    """
    plt.figure(figsize=(12, 4))
    
    # Plot accuracy
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    
    # Plot loss
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.show()

In [None]:
# main exection
if __name__ == "__main__":
    # Create necessary directories
    os.makedirs(os.path.dirname(config.MODEL_SAVE_PATH), exist_ok=True)
    
    # Train the model
    trained_model, training_history = train_model()
    
    # Plot training history
    plot_training_history(training_history)
    
    # Save the final model
    trained_model.save(config.MODEL_SAVE_PATH)
    print(f"Model saved to {config.MODEL_SAVE_PATH}")

Loading dataset...


FileNotFoundError: [WinError 3] The system cannot find the path specified: 'dataset\\accident'

In [None]:
# data augmentation 
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def create_augmentation_generator():
    """
    Create data generator with augmentation
    """
    datagen = ImageDataGenerator(
        rotation_range=10,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        fill_mode='nearest'
    )
    return datagen

def augmented_fit(model, X_train, y_train, X_val, y_val):
    """
    Train model with augmented data
    """
    datagen = create_augmentation_generator()
    
    # Create sequence generator
    def sequence_generator(X, y, batch_size):
        while True:
            for i in range(0, len(X), batch_size):
                batch_X = X[i:i + batch_size]
                batch_y = y[i:i + batch_size]
                
                # Apply augmentation to each frame in each sequence
                augmented_batch = []
                for sequence in batch_X:
                    augmented_sequence = []
                    for frame in sequence:
                        # Expand dims to (1, h, w, c) for augmentation
                        augmented_frame = datagen.random_transform(frame)
                        augmented_sequence.append(augmented_frame)
                    augmented_batch.append(augmented_sequence)
                
                yield np.array(augmented_batch), batch_y
    
    # Calculate steps per epoch
    steps_per_epoch = len(X_train) // config.BATCH_SIZE
    
    # Train with generator
    history = model.fit(
        sequence_generator(X_train, y_train, config.BATCH_SIZE),
        steps_per_epoch=steps_per_epoch,
        epochs=config.EPOCHS,
        validation_data=(X_val, y_val),
        callbacks=callbacks,
        verbose=1
    )
    
    return history

In [None]:
# optical flow features 
def compute_optical_flow(prev_frame, next_frame):
    """
    Compute dense optical flow between two frames
    """
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    next_gray = cv2.cvtColor(next_frame, cv2.COLOR_BGR2GRAY)
    
    flow = cv2.calcOpticalFlowFarneback(
        prev_gray, next_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    
    # Convert flow to RGB representation
    hsv = np.zeros((config.IMG_HEIGHT, config.IMG_WIDTH, 3), dtype=np.float32)
    hsv[..., 1] = 1.0
    
    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    hsv[..., 0] = ang * (180 / np.pi / 2)
    hsv[..., 2] = cv2.normalize(mag, None, 0, 1, cv2.NORM_MINMAX)
    
    flow_rgb = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)
    return flow_rgb

def extract_optical_flow_sequences(frames, sequence_length):
    """
    Create sequences with optical flow features
    """
    sequences = []
    for i in range(len(frames) - sequence_length):
        sequence = []
        for j in range(i, i + sequence_length - 1):
            flow = compute_optical_flow(frames[j], frames[j+1])
            sequence.append(flow)
        sequences.append(sequence)
    return np.array(sequences)