<a href="https://colab.research.google.com/github/sara23523/crash-Analyzer-/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install required packages
!pip install ultralytics opencv-python scikit-learn numpy matplotlib
!pip install scipy filterpy

# Import necessary libraries
import cv2
import numpy as np
from ultralytics import YOLO
from scipy.spatial.distance import cdist
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from filterpy.kalman import KalmanFilter
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import os
from google.colab import files
import time

Collecting ultralytics
  Downloading ultralytics-8.3.200-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.17-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.200-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m28.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.17-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.200 ultralytics-thop-2.0.17
Collecting filterpy
  Downloading filterpy-1.4.5.zip (177 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.0/178.0 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: filterpy
  Building wheel for filterpy (setup.py) ... [?25l[?25hdone
  Created wheel for filterpy: filename=filterpy-1.4.5-py3-no

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
class KalmanTracker:
    """Kalman Filter for object tracking with position and velocity prediction"""

    def __init__(self, initial_bbox):
        self.kalman = KalmanFilter(dim_x=8, dim_z=4)

        # State vector: [x, y, w, h, vx, vy, vw, vh]
        self.kalman.x = np.array([initial_bbox[0], initial_bbox[1],
                                 initial_bbox[2], initial_bbox[3],
                                 0, 0, 0, 0], dtype=np.float32)

        # State transition matrix (constant velocity model)
        dt = 1.0
        self.kalman.F = np.array([
            [1, 0, 0, 0, dt, 0, 0, 0],
            [0, 1, 0, 0, 0, dt, 0, 0],
            [0, 0, 1, 0, 0, 0, dt, 0],
            [0, 0, 0, 1, 0, 0, 0, dt],
            [0, 0, 0, 0, 1, 0, 0, 0],
            [0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 0, 1, 0],
            [0, 0, 0, 0, 0, 0, 0, 1]
        ], dtype=np.float32)

        # Measurement matrix
        self.kalman.H = np.array([
            [1, 0, 0, 0, 0, 0, 0, 0],
            [0, 1, 0, 0, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 1, 0, 0, 0, 0]
        ], dtype=np.float32)

        # Process noise covariance
        self.kalman.Q *= 0.1

        # Measurement noise covariance
        self.kalman.R *= 10

        # Initial covariance
        self.kalman.P *= 100

        self.age = 0
        self.hits = 0
        self.hit_streak = 0
        self.time_since_update = 0

    def update(self, bbox):
        """Update the tracker with a new detection"""
        self.time_since_update = 0
        self.hits += 1
        self.hit_streak += 1
        self.kalman.update(np.array(bbox, dtype=np.float32))

    def predict(self):
        """Predict the next state"""
        if (self.kalman.x[6] + self.kalman.x[2]) <= 0:
            self.kalman.x[6] *= 0.0
        if (self.kalman.x[7] + self.kalman.x[3]) <= 0:
            self.kalman.x[7] *= 0.0

        self.kalman.predict()
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1

        return self.get_state()

    def get_state(self):
        """Get current bounding box estimate"""
        return self.kalman.x[:4].copy()

In [None]:
class CentroidTracker:
    """Centroid-based object tracking"""

    def __init__(self, max_disappeared=50, max_distance=50):
        self.next_object_id = 0
        self.objects = {}
        self.disappeared = {}
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance

    def register(self, centroid, bbox, class_name):
        """Register a new object"""
        self.objects[self.next_object_id] = {
            'centroid': centroid,
            'bbox': bbox,
            'class': class_name,
            'trajectory': deque(maxlen=30),
            'kalman': KalmanTracker(bbox)
        }
        self.objects[self.next_object_id]['trajectory'].append(centroid)
        self.disappeared[self.next_object_id] = 0
        self.next_object_id += 1

    def deregister(self, object_id):
        """Remove an object from tracking"""
        del self.objects[object_id]
        del self.disappeared[object_id]

    def update(self, detections):
        """Update tracked objects with new detections"""
        if len(detections) == 0:
            # Iterate over a copy of keys to allow modification during iteration
            for object_id in list(self.disappeared.keys()):
                self.disappeared[object_id] += 1
                if self.disappeared[object_id] > self.max_disappeared:
                    self.deregister(object_id)
            return self.objects

        input_centroids = []
        input_bboxes = []
        input_classes = []

        for detection in detections:
            bbox, class_name, confidence = detection
            cx = int((bbox[0] + bbox[2]) / 2.0)
            cy = int((bbox[1] + bbox[3]) / 2.0)
            input_centroids.append((cx, cy))
            input_bboxes.append(bbox)
            input_classes.append(class_name)

        if len(self.objects) == 0:
            for i in range(len(input_centroids)):
                self.register(input_centroids[i], input_bboxes[i], input_classes[i])
        else:
            object_centroids = [obj['centroid'] for obj in self.objects.values()]
            D = cdist(np.array(object_centroids), input_centroids)

            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            used_row_idxs = set()
            used_col_idxs = set()

            for (row, col) in zip(rows, cols):
                if row in used_row_idxs or col in used_col_idxs:
                    continue

                if D[row, col] > self.max_distance:
                    continue

                object_id = list(self.objects.keys())[row]
                self.objects[object_id]['centroid'] = input_centroids[col]
                self.objects[object_id]['bbox'] = input_bboxes[col]
                self.objects[object_id]['class'] = input_classes[col]
                self.objects[object_id]['trajectory'].append(input_centroids[col])
                self.objects[object_id]['kalman'].update(input_bboxes[col])
                self.disappeared[object_id] = 0

                used_row_idxs.add(row)
                used_col_idxs.add(col)

            unused_row_idxs = set(range(0, D.shape[0])).difference(used_row_idxs)
            unused_col_idxs = set(range(0, D.shape[1])).difference(used_col_idxs)

            if D.shape[0] >= D.shape[1]:
                # Iterate over a copy of keys to allow modification during iteration
                object_ids_to_process = list(self.objects.keys())
                for row in unused_row_idxs:
                    # Ensure the index 'row' is still valid for the current list of keys
                    if row < len(object_ids_to_process):
                        object_id = object_ids_to_process[row]
                        self.disappeared[object_id] += 1

                        if self.disappeared[object_id] > self.max_disappeared:
                            self.deregister(object_id)
            else:
                for col in unused_col_idxs:
                    self.register(input_centroids[col], input_bboxes[col], input_classes[col])

        # Update Kalman predictions for all objects
        # Iterate over a copy of keys for safety
        for obj_id in list(self.objects.keys()):
            # Check if object still exists before predicting (in case it was deregistered in the same update loop)
            if obj_id in self.objects:
                predicted_bbox = self.objects[obj_id]['kalman'].predict()


        return self.objects

In [None]:
class OpticalFlowAnalyzer:
    """Optical Flow computation and analysis"""

    def __init__(self):
        self.prev_gray = None
        self.flow_history = deque(maxlen=10)

    def compute_optical_flow(self, frame):
        """Compute dense optical flow using Farneback method"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        if self.prev_gray is not None:
            flow = cv2.calcOpticalFlowPyrLK(
                self.prev_gray, gray, None, None,
                winSize=(15, 15),
                maxLevel=2,
                criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
            )

            # Dense optical flow
            dense_flow = cv2.calcOpticalFlowFarneback(
                self.prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
            )

            # Compute flow magnitude and angle
            magnitude, angle = cv2.cartToPolar(dense_flow[..., 0], dense_flow[..., 1])

            # Store flow statistics
            flow_stats = {
                'mean_magnitude': np.mean(magnitude),
                'max_magnitude': np.max(magnitude),
                'flow_variance': np.var(magnitude),
                'dense_flow': dense_flow
            }

            self.flow_history.append(flow_stats)
            self.prev_gray = gray

            return dense_flow, magnitude, angle, flow_stats
        else:
            self.prev_gray = gray
            return None, None, None, None

    def get_flow_features(self):
        """Extract features for collision detection"""
        if len(self.flow_history) < 2:
            return None

        recent_flows = list(self.flow_history)[-5:]  # Last 5 frames

        features = []
        for flow_data in recent_flows:
            features.extend([
                flow_data['mean_magnitude'],
                flow_data['max_magnitude'],
                flow_data['flow_variance']
            ])

        # Pad if necessary
        while len(features) < 15:  # 5 frames * 3 features
            features.append(0.0)

        return np.array(features)

In [None]:
class CollisionDetector:
    """SVM-based collision detection system"""

    def __init__(self):
        self.svm_model = None
        self.scaler = StandardScaler()
        self.is_trained = False

    def generate_synthetic_collision_data_real(self, n_samples):
        normal_features = []
        collision_features = []
        fps = 30

        for _ in range(n_samples // 2):
            # Normal traffic
            base_flow = np.random.normal(loc=1.5, scale=0.5)         # ~1-2
            speed_var = np.random.normal(loc=1.2, scale=0.4)
            dir_change = np.random.normal(loc=0.1, scale=0.05) # This feature is not used in OpticalFlowAnalyzer

            sample = []
            for t in range(5):
                growth = 1.0 + (t / 10) * np.random.normal(loc=0.1, scale=0.05)  # slow growth
                sample.extend([
                    (base_flow * growth) + np.random.normal(0, 0.3), # Mean magnitude
                    (base_flow * growth * 1.1) + np.random.normal(0, 0.3), # Max magnitude
                    speed_var + np.random.normal(0, 0.3), # Flow variance
                    # dir_change + np.random.normal(0, 0.05) # This feature is not used in OpticalFlowAnalyzer
                ])
            normal_features.append(sample)

        for _ in range(n_samples // 2):
            # Collision / Looming
            base_flow = np.random.normal(loc=6.0, scale=1.5)         # ~4-8
            speed_var = np.random.normal(loc=3.0, scale=1.0)
            dir_change = np.random.normal(loc=0.5, scale=0.2) # This feature is not used in OpticalFlowAnalyzer

            sample = []
            for t in range(5):
                growth = 1.0 + (t / 5) * np.random.normal(loc=0.5, scale=0.2)   # rapid growth
                sample.extend([
                    (base_flow * growth) + np.random.normal(0, 1.0), # Mean magnitude
                    (base_flow * growth * 1.5) + np.random.normal(0, 1.0), # Max magnitude
                    speed_var + np.random.normal(0, 0.7), # Flow variance
                    # dir_change + np.random.normal(0, 0.2) # This feature is not used in OpticalFlowAnalyzer
                ])
            collision_features.append(sample)

        X = np.array(normal_features + collision_features)
        y = np.array([0]*len(normal_features) + [1]*len(collision_features))
        return X, y


    def train_model(self):
        """Train the SVM collision detection model"""
        X, y = self.generate_synthetic_collision_data_real(n_samples=1000)

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Train SVM
        self.svm_model = SVC(kernel='rbf', probability=True, random_state=42)
        self.svm_model.fit(X_train_scaled, y_train)

        # Evaluate
        train_score = self.svm_model.score(X_train_scaled, y_train)
        test_score = self.svm_model.score(X_test_scaled, y_test)

        print(f"Training Accuracy: {train_score:.3f}")
        print(f"Test Accuracy: {test_score:.3f}")

        self.is_trained = True

    def predict_collision(self, features):
        """Predict collision probability"""
        if not self.is_trained or features is None:
            return 0.0, 0.0

        features_scaled = self.scaler.transform([features])
        prediction = self.svm_model.predict(features_scaled)[0]
        probability = self.svm_model.predict_proba(features_scaled)[0]

        return prediction, probability[1]  # Return probability of collision

In [None]:
class OpticalFlowAnalyzer:
    """Optical Flow computation and analysis"""

    def __init__(self):
        self.prev_gray = None
        self.flow_history = deque(maxlen=10)

    def compute_optical_flow(self, frame):
        """Compute dense optical flow using Farneback method"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        if self.prev_gray is not None:
            # Dense optical flow
            dense_flow = cv2.calcOpticalFlowFarneback(
                self.prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
            )

            # Compute flow magnitude and angle
            magnitude, angle = cv2.cartToPolar(dense_flow[..., 0], dense_flow[..., 1])

            # Store flow statistics
            flow_stats = {
                'mean_magnitude': np.mean(magnitude),
                'max_magnitude': np.max(magnitude),
                'flow_variance': np.var(magnitude),
                'dense_flow': dense_flow
            }

            self.flow_history.append(flow_stats)
            self.prev_gray = gray

            return dense_flow, magnitude, angle, flow_stats
        else:
            self.prev_gray = gray
            return None, None, None, None

    def get_flow_features(self):
        """Extract features for collision detection"""
        if len(self.flow_history) < 2:
            return None

        recent_flows = list(self.flow_history)[-5:]  # Last 5 frames

        features = []
        for flow_data in recent_flows:
            features.extend([
                flow_data['mean_magnitude'],
                flow_data['max_magnitude'],
                flow_data['flow_variance']
            ])

        # Pad if necessary
        while len(features) < 15:  # 5 frames * 3 features
            features.append(0.0)

        return np.array(features)

In [None]:
class TrafficAnalysisSystem:
    """Complete traffic analysis and collision detection system"""

    def __init__(self):
        self.yolo_model = YOLO('yolov8n.pt')  # Download model automatically
        self.centroid_tracker = CentroidTracker()
        self.optical_flow = OpticalFlowAnalyzer()
        self.collision_detector = CollisionDetector()

        # Target classes for detection
        self.target_classes = ['person', 'car', 'bus', 'truck', 'motorcycle', 'bicycle']

        # Train collision detection model
        print("Training collision detection model...")
        self.collision_detector.train_model()
        print("Model training completed!")

    def process_detections(self, results, frame_shape):
        """Process YOLO detections"""
        detections = []

        for result in results:
            boxes = result.boxes
            if boxes is not None:
                for box in boxes:
                    cls_id = int(box.cls[0])
                    class_name = self.yolo_model.names[cls_id]

                    if class_name in self.target_classes:
                        confidence = float(box.conf[0])
                        if confidence > 0.5:  # Confidence threshold
                            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                            bbox = [int(x1), int(y1), int(x2), int(y2)]
                            detections.append((bbox, class_name, confidence))

        return detections

    def draw_tracking_info(self, frame, tracked_objects, flow_stats, collision_prob):
        """Draw tracking information on frame, highlighting high-risk objects"""
        COLLISION_THRESHOLD = 0.7  # threshold for high collision risk

        for obj_id, obj_info in tracked_objects.items():
            bbox = obj_info['bbox']
            centroid = obj_info['centroid']
            class_name = obj_info['class']
            trajectory = obj_info['trajectory']

            # Highlight in red if high collision probability
            if collision_prob >= COLLISION_THRESHOLD:
                color = (0, 0, 255)  # Red
                label = f"{class_name} ID:{obj_id} RISK!"
            else:
                color = (0, 255, 0)  # Green
                label = f"{class_name} ID:{obj_id}"

            # Draw bounding box
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)

            # Draw centroid
            cv2.circle(frame, centroid, 5, (255, 0, 0), -1)

            # Draw trajectory
            if len(trajectory) > 1:
                trajectory_points = np.array(trajectory, dtype=np.int32)
                cv2.polylines(frame, [trajectory_points], False, color, 2)

            # Calculate speed
            if len(trajectory) >= 2:
                dx = trajectory[-1][0] - trajectory[-2][0]
                dy = trajectory[-1][1] - trajectory[-2][1]
                speed = np.sqrt(dx**2 + dy**2)
                cv2.putText(frame, f"{label} Speed:{speed:.1f}",
                            (bbox[0], bbox[1]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Flow info
        if flow_stats:
            flow_text = f"Flow Mag: {flow_stats['mean_magnitude']:.2f}"
            cv2.putText(frame, flow_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)

        # Global collision warning
        if collision_prob >= COLLISION_THRESHOLD:
            cv2.putText(frame, "COLLISION WARNING!", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
            cv2.putText(frame, f"Probability: {collision_prob:.2f}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        return frame


    def process_video(self, input_path, output_path):
        """Process entire video"""
        cap = cv2.VideoCapture(input_path)

        # Get video properties
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        frame_count = 0
        collision_detections = []

        print(f"Processing {total_frames} frames...")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            # YOLO detection
            results = self.yolo_model(frame, verbose=False)
            detections = self.process_detections(results, frame.shape)

            # Update tracking
            tracked_objects = self.centroid_tracker.update(detections)

            # Compute optical flow
            dense_flow, magnitude, angle, flow_stats = self.optical_flow.compute_optical_flow(frame)

            # Get flow features for collision detection
            flow_features = self.optical_flow.get_flow_features()
            collision_pred, collision_prob = self.collision_detector.predict_collision(flow_features)

            if collision_prob > 0.7:
                collision_detections.append((frame_count, collision_prob))

            # Draw tracking information
            processed_frame = self.draw_tracking_info(
                frame, tracked_objects, flow_stats, collision_prob
            )

            # Write frame
            out.write(processed_frame)

            if frame_count % 30 == 0:  # Progress update every 30 frames
                progress = (frame_count / total_frames) * 100
                print(f"Progress: {progress:.1f}% ({frame_count}/{total_frames})")

        cap.release()
        out.release()

        print(f"Video processing completed!")
        print(f"Collision detections: {len(collision_detections)}")

        return collision_detections

In [None]:
# Initialize the system
print("Initializing Traffic Analysis System...")
traffic_system = TrafficAnalysisSystem()

# Upload video file
print("Please upload your video file:")
uploaded = files.upload()

# Get the uploaded filename
input_video_path = list(uploaded.keys())[0]
output_video_path = "processed_" + input_video_path

print(f"Processing video: {input_video_path}")
print("This may take several minutes depending on video length...")

# Process the video
collision_detections = traffic_system.process_video(input_video_path, output_video_path)

print(f"Processing completed!")
print(f"Output video saved as: {output_video_path}")

# Download the processed video
print("Downloading processed video...")
files.download(output_video_path)

# Generate and download collision report
if collision_detections:
    report_path = "collision_report.txt"
    with open(report_path, 'w') as f:
        f.write("Collision Detection Report\n")
        f.write("=" * 30 + "\n\n")

        for frame_num, probability in collision_detections:
            timestamp = frame_num / 30  # Assuming 30 FPS
            f.write(f"Frame {frame_num} (Time: {timestamp:.1f}s) - ")
            f.write(f"Collision Probability: {probability:.3f}\n")

    files.download(report_path)
    print("Collision report downloaded!")
else:
    print("No high-probability collision events detected.")

Initializing Traffic Analysis System...
Training collision detection model...
Training Accuracy: 0.999
Test Accuracy: 1.000
Model training completed!
Please upload your video file:


Saving VID-20250828-WA0003.mp4 to VID-20250828-WA0003 (1).mp4
Processing video: VID-20250828-WA0003 (1).mp4
This may take several minutes depending on video length...
Processing 97 frames...
Progress: 30.9% (30/97)
Progress: 61.9% (60/97)
Progress: 92.8% (90/97)
Video processing completed!
Collision detections: 92
Processing completed!
Output video saved as: processed_VID-20250828-WA0003 (1).mp4
Downloading processed video...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Collision report downloaded!
