In [None]:
import cv2
import numpy as np
import math
import time
import matplotlib
matplotlib.use('Agg')

In [None]:
cv2.setNumThreads(4)

cap = cv2.VideoCapture("video.mkv")
input_size = 320

confThreshold = 0.2
nmsThreshold = 0.2

font_color = (0, 0, 255)
font_size = 0.5
font_thickness = 2


In [None]:
#  need to be adjusted based on video
PIXELS_PER_METER = 4  
FPS = 30 
SPEED_MULTIPLIER = 6.0 

In [None]:
classesFile = "coco.names"
classNames = open(classesFile).read().strip().split('\n')


In [None]:
required_class_index = [2, 3, 5, 7]

detected_classNames = []

modelConfiguration = 'yolov3-320.cfg'
modelWeigheights = 'yolov3-320.weights'




In [None]:
net = cv2.dnn.readNetFromDarknet(modelConfiguration, modelWeigheights)

In [None]:
np.random.seed(42)
colors = np.random.randint(0, 255, size=(len(classNames), 3), dtype='uint8')

In [None]:
def find_center(x, y, w, h):
    x1 = int(w / 2)
    y1 = int(h / 2)
    cx = x + x1
    cy = y + y1
    return cx, cy


In [None]:
class tracker:
    def __init__(self):
        self.id_count = 0
        self.center_points = {}
        self.disappeared = {}
        self.max_disappeared = 10
    
    def update(self, objects_rect):
        if len(objects_rect) == 0:
            for object_id in list(self.disappeared.keys()):
                self.disappeared[object_id] += 1
                if self.disappeared[object_id] > self.max_disappeared:
                    self.deregister(object_id)
            return []
        
        objects_bbs_ids = []
        
        if len(self.center_points) == 0:
            for rect in objects_rect:
                x, y, w, h, index = rect
                cx = (x + x + w) // 2
                cy = (y + y + h) // 2
                self.center_points[self.id_count] = (cx, cy)
                objects_bbs_ids.append([x, y, w, h, self.id_count, index])
                self.id_count += 1
        else:
            input_centroids = []
            for rect in objects_rect:
                x, y, w, h, index = rect
                cx = (x + x + w) // 2
                cy = (y + y + h) // 2
                input_centroids.append((cx, cy, x, y, w, h, index))
            
            object_ids = list(self.center_points.keys())
            object_centroids = list(self.center_points.values())
            
            D = np.linalg.norm(np.array(object_centroids)[:, np.newaxis] - np.array([(c[0], c[1]) for c in input_centroids]), axis=2)
            
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]
            
            used_row_indices = set()
            used_col_indices = set()
            
            for (row, col) in zip(rows, cols):
                if row in used_row_indices or col in used_col_indices:
                    continue
                
                if D[row, col] <= 50:  # max distance threshold
                    object_id = object_ids[row]
                    self.center_points[object_id] = (input_centroids[col][0], input_centroids[col][1])
                    objects_bbs_ids.append([input_centroids[col][2], input_centroids[col][3], 
                                          input_centroids[col][4], input_centroids[col][5], 
                                          object_id, input_centroids[col][6]])
                    
                    used_row_indices.add(row)
                    used_col_indices.add(col)
                    
                    if object_id in self.disappeared:
                        del self.disappeared[object_id]
            
            unused_row_indices = set(range(0, D.shape[0])) - used_row_indices
            unused_col_indices = set(range(0, D.shape[1])) - used_col_indices
            
            if D.shape[0] >= D.shape[1]:
                for row in unused_row_indices:
                    object_id = object_ids[row]
                    self.disappeared[object_id] = self.disappeared.get(object_id, 0) + 1
                    if self.disappeared[object_id] > self.max_disappeared:
                        self.deregister(object_id)
            else:
                for col in unused_col_indices:
                    cx, cy, x, y, w, h, index = input_centroids[col]
                    self.center_points[self.id_count] = (cx, cy)
                    objects_bbs_ids.append([x, y, w, h, self.id_count, index])
                    self.id_count += 1
        
        return objects_bbs_ids
    
    def deregister(self, object_id):
        del self.center_points[object_id]
        if object_id in self.disappeared:
            del self.disappeared[object_id]

In [None]:
class detect:
    def __init__(self):
        self.boxes = []
        self.classIds = []
        self.confidence_scores = []
        self.detection = []
        self.track = tracker()
        self.prev_positions = {}
        self.prev_time = {}
        self.speed_history = {}  
        self.position_history = {}  
        self.frame_count = 0
        
    def calculate_speed_kmh(self, object_id, current_pos, current_time, frame_height):
        if object_id not in self.prev_positions or object_id not in self.prev_time:
            self.prev_positions[object_id] = current_pos
            self.prev_time[object_id] = current_time
            if object_id not in self.position_history:
                self.position_history[object_id] = []
            return 0
        
        prev_pos = self.prev_positions[object_id]
        prev_time = self.prev_time[object_id]
        
        if object_id not in self.position_history:
            self.position_history[object_id] = []
        self.position_history[object_id].append(current_pos)
        
        if len(self.position_history[object_id]) > 10:
            self.position_history[object_id] = self.position_history[object_id][-10:]
        
        distance_pixels = math.sqrt((current_pos[0] - prev_pos[0])**2 + (current_pos[1] - prev_pos[1])**2)
        
        y_position = current_pos[1]
        perspective_factor = 1.0 + (y_position / frame_height) * 0.5 
        distance_pixels *= perspective_factor
        
        time_diff = current_time - prev_time
        
        if time_diff <= 0 or distance_pixels < 2:  
            return self.get_smoothed_speed(object_id, 0)
        
        distance_meters = distance_pixels / PIXELS_PER_METER
        speed_ms = distance_meters / time_diff
        speed_kmh = speed_ms * 3.6 * SPEED_MULTIPLIER
        
        speed_kmh = max(0, min(speed_kmh, 120))
        
        self.prev_positions[object_id] = current_pos
        self.prev_time[object_id] = current_time
        
        return self.get_smoothed_speed(object_id, speed_kmh)
    
    def get_smoothed_speed(self, object_id, new_speed):
        """Apply heavy smoothing to reduce speed fluctuations"""
        if object_id not in self.speed_history:
            self.speed_history[object_id] = []
        
        if new_speed > 0:
            self.speed_history[object_id].append(new_speed)
        
        if len(self.speed_history[object_id]) > 15:
            self.speed_history[object_id] = self.speed_history[object_id][-15:]
        
        if len(self.speed_history[object_id]) == 0:
            return 0
        
        speeds = self.speed_history[object_id]
        if len(speeds) >= 3:
            median_speed = sorted(speeds)[len(speeds)//2]
            filtered_speeds = [s for s in speeds if abs(s - median_speed) < median_speed * 0.4]
            if filtered_speeds:
                speeds = filtered_speeds
        
        weights = [i + 1 for i in range(len(speeds))]
        weighted_avg = sum(s * w for s, w in zip(speeds, weights)) / sum(weights)
        
        return weighted_avg
        
    def postProcess(self, outputs, img):
        global detected_classNames
        
        self.boxes = []
        self.classIds = []
        self.confidence_scores = []
        self.detection = []
        
        height, width = img.shape[:2]
        current_time = time.time()
        self.frame_count += 1

        for output in outputs:
            for det in output:
                scores = det[5:]
                self.classId = np.argmax(scores)
                confidence = scores[self.classId]
                if self.classId in required_class_index:
                    if confidence > confThreshold:
                        w, h = int(det[2] * width), int(det[3] * height)
                        x, y = int((det[0] * width) - w / 2), int((det[1] * height) - h / 2)
                        self.boxes.append([x, y, w, h])
                        self.classIds.append(self.classId)
                        self.confidence_scores.append(float(confidence))

        # non-max suppression
        if len(self.boxes) > 0:
            indices = cv2.dnn.NMSBoxes(self.boxes, self.confidence_scores, confThreshold, nmsThreshold)
            
            if len(indices) > 0:
                for i in indices.flatten():
                    x, y, w, h = self.boxes[i][0], self.boxes[i][1], self.boxes[i][2], self.boxes[i][3]
                    self.detection.append([x, y, w, h, required_class_index.index(self.classIds[i])])

        if len(self.detection) > 0:
            boxes_ids = self.track.update(self.detection)
            
            for box_id in boxes_ids:
                x, y, w, h, object_id, index = box_id
                
                cx, cy = find_center(x, y, w, h)
                
                speed_kmh = self.calculate_speed_kmh(object_id, (cx, cy), current_time, height)
                
                color = [int(c) for c in colors[self.classIds[index] if index < len(self.classIds) else 0]]
                name = classNames[self.classIds[index] if index < len(self.classIds) else 0]
                
                # red if > 80 km/h
                speed_color = (0, 0, 255) if speed_kmh > 80 else (0, 255, 0)
                
                cv2.rectangle(img, (x, y), (x + w, y + h), color, 2)
                
                cv2.putText(img, f'{name.upper()} {int(self.confidence_scores[index] * 100)}%' if index < len(self.confidence_scores) else f'{name.upper()}',
                           (x, y - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
                
                if speed_kmh > 10 and len(self.speed_history.get(object_id, [])) >= 3:
                    cv2.putText(img, f"Speed: {speed_kmh:.0f} km/h", (x, y - 10), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, speed_color, 1)
                
                cv2.putText(img, f"ID: {object_id}", (x, y + h + 20), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)

    def processVideo(self, output_path="video.mp4"):
        """Process video and save output without displaying"""
        global FPS, PIXELS_PER_METER
        
        FPS = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        PIXELS_PER_METER = max(2, width // 400) 
        
        new_width = int(width * 0.5)
        new_height = int(height * 0.5)
        
        print(f"Processing video: {width}x{height} -> {new_width}x{new_height}")
        print(f"FPS: {FPS}, Total frames: {total_frames}")
        print(f"Estimated pixels per meter: {PIXELS_PER_METER}")
        print(f"Speed multiplier: {SPEED_MULTIPLIER}")
        print("Note: Adjust PIXELS_PER_METER and SPEED_MULTIPLIER if speeds seem unrealistic")
        
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, FPS, (new_width, new_height))
        
        frame_count = 0
        
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                
                frame = cv2.resize(frame, (new_width, new_height))
                
                blob = cv2.dnn.blobFromImage(frame, 1 / 255, (input_size, input_size), [0, 0, 0], 1, crop=False)
                
                net.setInput(blob)
                layersNames = net.getLayerNames()
                outputNames = [layersNames[i - 1] for i in net.getUnconnectedOutLayers()]
                outputs = net.forward(outputNames)
                
                self.postProcess(outputs, frame)
                
                out.write(frame)
                
                frame_count += 1
                if frame_count % 30 == 0: 
                    print(f"Processed {frame_count}/{total_frames} frames ({frame_count/total_frames*100:.1f}%)")
                
        except Exception as e:
            print(f"Error during video processing: {e}")
        
        finally:
            cap.release()
            out.release()
            print(f"Video processing complete! Output saved to: {output_path}")
            print(f"Total frames processed: {frame_count}")


In [None]:
detector = detect()
detector.processVideo("output.mp4")