In [1]:
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

In [3]:
import os
import pandas as pd
from datetime import datetime
import numpy as np
import torch
from time import time
import cv2
from ultralytics.solutions.solutions import BaseSolution
from ultralytics.utils.plotting import Annotator, colors

class ObjectCounter(BaseSolution):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.in_count = 0
        self.out_count = 0
        self.counted_ids = []
        self.saved_ids = []
        self.classwise_counts = {}
        self.region_initialized = False
        self.spd = {}
        self.trkd_ids = []
        self.trk_pt = {}
        self.trk_pp = {}
        self.show_in = self.CFG.get("show_in", True)
        self.show_out = self.CFG.get("show_out", True)
        self.frame_skip = 2
        
        # New parameters for speed calculation
        self.real_world_distance = kwargs.get('real_world_distance', 20)  # Distance in meters between region points
        self.fps = 30 // self.frame_skip  # Adjusted FPS accounting for frame skip
        
        # Initialize CSV data storage
        self.csv_filename = self.get_daily_filename()
        self.create_csv()

    def get_daily_filename(self):
        """Generate a filename based on the current date."""
        current_date = datetime.now().strftime("%Y-%m-%d")
        filename = f"vehicle_count_data_{current_date}.csv"
        return filename

    def create_csv(self):
        """Create the CSV file with proper headers if it doesn't exist."""
        if not os.path.exists(self.csv_filename):
            header = ["Track ID", "Label", "Action", "Speed (km/h)", "Class", "Date", "Time"]
            df = pd.DataFrame(columns=header)
            df.to_csv(self.csv_filename, index=False)
            print(f"CSV file created: {self.csv_filename} with headers.")

    def save_label_to_file(self, track_id, label, action, speed, class_name):
        """Save the label, track_id, action, speed, date, time, and class name to a CSV file."""
        if isinstance(speed, torch.Tensor):
            speed = speed.item()
        elif isinstance(speed, np.ndarray):
            speed = speed.item()

        speed = int(round(speed))

        current_time = datetime.now()
        current_date = current_time.date()
        current_time_str = current_time.strftime("%H:%M:%S")

        data = {
            "Track ID": track_id,
            "Label": label,
            "Action": action,
            "Speed (km/h)": speed,
            "Class": class_name,
            "Date": current_date,
            "Time": current_time_str
        }

        df = pd.DataFrame([data])
        df.to_csv(self.csv_filename, mode='a', header=False, index=False)
        print(f"Data for track_id {track_id} saved to CSV file {self.csv_filename}.")

    def calculate_perspective_factor(self, y_position):
        """Calculate perspective correction factor based on y-position in frame."""
        # Get the y-coordinate of the detection line
        line_y = self.region[0][1]  # Assuming horizontal line
        
        # Calculate distance from the detection line
        distance_from_line = abs(y_position - line_y)
        
        # Define the range where we want to maintain consistent speed
        max_distance = self.im0.shape[0] * 0.4  # 40% of frame height
        
        if distance_from_line <= max_distance:
            # Within the stable range - maintain consistent factor
            return 1.0
        else:
            # Beyond stable range - gradual adjustment
            excess_distance = distance_from_line - max_distance
            max_adjustment = 0.3  # Maximum 30% adjustment
            adjustment = min(excess_distance / (self.im0.shape[0] * 0.2), 1.0) * max_adjustment
            
            # If above the line, increase factor; if below, decrease factor
            if y_position < line_y:
                return 1.0 + adjustment
            else:
                return 1.0 - adjustment

    def calculate_speed(self, track_id, current_position, previous_position, time_difference):
        """Calculate speed with perspective correction and real-world distance calibration."""
        if time_difference <= 0:
            return 0
            
        # Get pixel distance
        pixel_distance = np.sqrt(
            (current_position[0] - previous_position[0]) ** 2 +
            (current_position[1] - previous_position[1]) ** 2
        )
        
        # Apply perspective correction
        perspective_factor = self.calculate_perspective_factor(current_position[1])
        corrected_pixel_distance = pixel_distance * perspective_factor
        
        # Convert pixel distance to real-world distance
        region_pixel_distance = np.sqrt(
            (self.region[1][0] - self.region[0][0]) ** 2 +
            (self.region[1][1] - self.region[0][1]) ** 2
        )
        meters_per_pixel = self.real_world_distance / region_pixel_distance
        real_distance = corrected_pixel_distance * meters_per_pixel
        
        # Calculate speed in km/h
        speed = (real_distance / time_difference) * 3.6  # Convert m/s to km/h
        
        # Apply stronger smoothing using moving average
        if track_id in self.spd:
            # More weight to previous speed for stability
            speed = 0.85 * self.spd[track_id] + 0.15 * speed  # Weighted average
            
            # Additional stabilization: limit speed change
            max_change = 10  # Maximum speed change in km/h
            prev_speed = self.spd[track_id]
            if abs(speed - prev_speed) > max_change:
                if speed > prev_speed:
                    speed = prev_speed + max_change
                else:
                    speed = prev_speed - max_change
            
        return min(max(speed, 0), 150)  # Cap speed between 0 and 150 km/h

    def count_objects(self, current_centroid, track_id, prev_position, cls):
        """Count objects and update file based on centroid movements."""
        if prev_position is None or track_id in self.counted_ids:
            return

        action = None
        speed = None

        if len(self.region) == 2:  # Handle linear region counting
            line = self.LineString(self.region)
            if line.intersects(self.LineString([prev_position, current_centroid])):
                if abs(self.region[0][0] - self.region[1][0]) < abs(self.region[0][1] - self.region[1][1]):
                    if current_centroid[0] > prev_position[0]:
                        self.in_count += 1
                        self.classwise_counts[self.names[cls]]["IN"] += 1
                        action = "IN"
                    else:
                        self.out_count += 1
                        self.classwise_counts[self.names[cls]]["OUT"] += 1
                        action = "OUT"
                else:
                    if current_centroid[1] > prev_position[1]:
                        self.in_count += 1
                        self.classwise_counts[self.names[cls]]["IN"] += 1
                        action = "IN"
                    else:
                        self.out_count += 1
                        self.classwise_counts[self.names[cls]]["OUT"] += 1
                        action = "OUT"
                self.counted_ids.append(track_id)

        elif len(self.region) > 2:  # Handle polygonal region counting
            polygon = self.Polygon(self.region)
            if polygon.contains(self.Point(current_centroid)):
                if current_centroid[0] > prev_position[0]:
                    self.in_count += 1
                    self.classwise_counts[self.names[cls]]["IN"] += 1
                    action = "IN"
                else:
                    self.out_count += 1
                    self.classwise_counts[self.names[cls]]["OUT"] += 1
                    action = "OUT"
                self.counted_ids.append(track_id)

        if action:
            label = f"{self.names[cls]} ID: {track_id}"
            speed = self.spd.get(track_id, 0)
            self.save_label_to_file(track_id, label, action, speed, self.names[cls])

    def store_classwise_counts(self, cls):
        """Initialize count dictionary for a given class."""
        if self.names[cls] not in self.classwise_counts:
            self.classwise_counts[self.names[cls]] = {"IN": 0, "OUT": 0}

    def display_counts(self, im0):
        """Display the counts and actions on the image."""
        labels_dict = {
            str.capitalize(key): f"{'IN ' + str(value['IN']) if self.show_in else ''} "
                                f"{'OUT ' + str(value['OUT']) if self.show_out else ''}".strip()
            for key, value in self.classwise_counts.items()
            if value["IN"] != 0 or value["OUT"] != 0
        }

        if labels_dict:
            self.annotator.display_analytics(im0, labels_dict, (104, 31, 17), (255, 255, 255), 10)

    def count(self, im0):
        """Main counting function with improved speed calculation."""
        self.im0 = im0  # Store reference to current frame
        
        if not self.region_initialized:
            self.initialize_region()
            self.region_initialized = True

        self.annotator = Annotator(im0, line_width=self.line_width)
        self.extract_tracks(im0)
        self.annotator.draw_region(reg_pts=self.region, color=(104, 0, 123), thickness=self.line_width * 2)

        for box, track_id, cls in zip(self.boxes, self.track_ids, self.clss):
            self.store_tracking_history(track_id, box)
            self.store_classwise_counts(cls)
            
            current_position = ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)
            
            if track_id in self.trk_pt and track_id in self.trk_pp:
                time_difference = time() - self.trk_pt[track_id]
                if time_difference > 0:
                    speed = self.calculate_speed(
                        track_id,
                        current_position,
                        self.trk_pp[track_id],
                        time_difference
                    )
                    self.spd[track_id] = speed

            self.trk_pt[track_id] = time()
            self.trk_pp[track_id] = current_position

            # Display speed and draw annotations
            speed_label = f"{int(self.spd.get(track_id, 0))} km/h" if track_id in self.spd else self.names[int(cls)]
            combine_label = f"{speed_label}, ID: {track_id}"
            self.annotator.box_label(box, label=combine_label, color=(255, 0, 0))
            self.annotator.draw_centroid_and_tracks(
                self.track_line,
                color=colors(int(track_id), True),
                track_thickness=self.line_width
            )

            # Count objects
            prev_position = self.track_history[track_id][-2] if len(self.track_history[track_id]) > 1 else None
            self.count_objects(current_position, track_id, prev_position, cls)

        self.display_counts(im0)
        return im0

def main():
    # Initialize video capture
    cap = cv2.VideoCapture('video2.mp4')
    
    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Scale region points according to video dimensions
    scale_x = frame_width / 1020
    scale_y = frame_height / 500
    region_points = [
        (int(135 * scale_x), int(350 * scale_y)),
        (int(676 * scale_x), int(350 * scale_y))
    ]

    # Initialize object counter
    counter = ObjectCounter(
        region=region_points,
        model="C:/Users/DELL/Desktop/only traffic project/yolov8/yolov8x.pt",  # Update path to your model
        classes=[2, 5, 7],
        show_in=True,
        show_out=True,
        line_width=2,
        real_world_distance=20  # Set this to actual distance in meters between region points
    )

    # Set up video writer
    output_filename = "video2OUTPUT2.mp4"
    codecs = [
        ('avc1', '.mp4'),
        ('mp4v', '.mp4'),
        ('XVID', '.avi'),
    ]

    video_writer = None
    for codec, ext in codecs:
        try:
            fourcc = cv2.VideoWriter_fourcc(*codec)
            output_path = output_filename.rsplit('.', 1)[0] + ext
            video_writer = cv2.VideoWriter(output_path, fourcc, fps//2, (frame_width, frame_height))
            if video_writer.isOpened():
                print(f"Successfully initialized video writer with codec: {codec}")
                break
        except Exception as e:
            print(f"Failed to initialize video writer with codec {codec}: {str(e)}")
            if video_writer is not None:
                video_writer.release()

    if video_writer is None:
        raise Exception("Could not initialize video writer with any codec")

    # Process video
    count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        count += 1
        if count % 2 != 0:  # Skip odd frames
            continue
        
        # Process the frame
        frame = counter.count(frame)
        
        # Write the processed frame
        video_writer.write(frame)
        
        # Display the frame
        display_frame = cv2.resize(frame, (1020, 500))
        cv2.imshow("RGB", display_frame)
        
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    # Release resources
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

Ultralytics Solutions:  {'region': [(101, 302), (508, 302)], 'show_in': True, 'show_out': True, 'colormap': None, 'up_angle': 145.0, 'down_angle': 90, 'kpts': [6, 8, 10], 'analytics_type': 'line', 'json_file': None, 'records': 5, 'model': 'C:/Users/DELL/Desktop/only traffic project/yolov8/yolov8x.pt', 'classes': [2, 5, 7], 'line_width': 2, 'real_world_distance': 20}
Successfully initialized video writer with codec: avc1

0: 384x640 11 cars, 1 bus, 5 trucks, 34.8ms
Speed: 0.0ms preprocess, 34.8ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 5 trucks, 38.8ms
Speed: 0.0ms preprocess, 38.8ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 bus, 6 trucks, 18.0ms
Speed: 1.0ms preprocess, 18.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 5 trucks, 16.9ms
Speed: 1.8ms preprocess, 16.9ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x

In [7]:
import os
import pandas as pd
from datetime import datetime
import numpy as np
import torch
from time import time
import cv2
from ultralytics.solutions.solutions import BaseSolution
from ultralytics.utils.plotting import Annotator, colors

class ObjectCounter(BaseSolution):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.in_count = 0
        self.out_count = 0
        self.counted_ids = []
        self.saved_ids = []
        self.classwise_counts = {}
        self.region_initialized = False
        self.spd = {}
        self.trkd_ids = []
        self.trk_pt = {}
        self.trk_pp = {}
        self.show_in = self.CFG.get("show_in", True)
        self.show_out = self.CFG.get("show_out", True)
        self.frame_skip = 2
        
        # New parameters for extended tracking
        self.tracking_history_length = 60  # Frames to keep tracking after crossing line
        self.track_frames = {}  # Track frames since crossing line
        self.active_tracks = set()  # Currently active tracks
        
        # New parameters for speed calculation
        self.real_world_distance = kwargs.get('real_world_distance', 20)  # Distance in meters between region points
        self.fps = 30 // self.frame_skip  # Adjusted FPS accounting for frame skip
        
        # Initialize CSV data storage
        self.csv_filename = self.get_daily_filename()
        self.create_csv()

    def get_daily_filename(self):
        """Generate a filename based on the current date."""
        current_date = datetime.now().strftime("%Y-%m-%d")
        filename = f"vehicle_count_data_{current_date}.csv"
        return filename

    def create_csv(self):
        """Create the CSV file with proper headers if it doesn't exist."""
        if not os.path.exists(self.csv_filename):
            header = ["Track ID", "Label", "Action", "Speed (km/h)", "Class", "Date", "Time"]
            df = pd.DataFrame(columns=header)
            df.to_csv(self.csv_filename, index=False)
            print(f"CSV file created: {self.csv_filename} with headers.")

    def save_label_to_file(self, track_id, label, action, speed, class_name):
        """Save the label, track_id, action, speed, date, time, and class name to a CSV file."""
        if isinstance(speed, torch.Tensor):
            speed = speed.item()
        elif isinstance(speed, np.ndarray):
            speed = speed.item()

        speed = int(round(speed))

        current_time = datetime.now()
        current_date = current_time.date()
        current_time_str = current_time.strftime("%H:%M:%S")

        data = {
            "Track ID": track_id,
            "Label": label,
            "Action": action,
            "Speed (km/h)": speed,
            "Class": class_name,
            "Date": current_date,
            "Time": current_time_str
        }

        df = pd.DataFrame([data])
        df.to_csv(self.csv_filename, mode='a', header=False, index=False)
        print(f"Data for track_id {track_id} saved to CSV file {self.csv_filename}.")

    def calculate_perspective_factor(self, y_position):
        """Calculate perspective correction factor based on y-position in frame."""
        # Get the y-coordinate of the detection line
        line_y = self.region[0][1]  # Assuming horizontal line
        
        # Calculate distance from the detection line
        distance_from_line = abs(y_position - line_y)
        
        # Define the range where we want to maintain consistent speed
        max_distance = self.im0.shape[0] * 0.4  # 40% of frame height
        
        if distance_from_line <= max_distance:
            # Within the stable range - maintain consistent factor
            return 1.0
        else:
            # Beyond stable range - gradual adjustment
            excess_distance = distance_from_line - max_distance
            max_adjustment = 0.3  # Maximum 30% adjustment
            adjustment = min(excess_distance / (self.im0.shape[0] * 0.2), 1.0) * max_adjustment
            
            # If above the line, increase factor; if below, decrease factor
            if y_position < line_y:
                return 1.0 + adjustment
            else:
                return 1.0 - adjustment

    def calculate_speed(self, track_id, current_position, previous_position, time_difference):
        """Calculate speed with perspective correction and real-world distance calibration."""
        if time_difference <= 0:
            return 0
            
        # Get pixel distance
        pixel_distance = np.sqrt(
            (current_position[0] - previous_position[0]) ** 2 +
            (current_position[1] - previous_position[1]) ** 2
        )
        
        # Apply perspective correction
        perspective_factor = self.calculate_perspective_factor(current_position[1])
        corrected_pixel_distance = pixel_distance * perspective_factor
        
        # Convert pixel distance to real-world distance
        region_pixel_distance = np.sqrt(
            (self.region[1][0] - self.region[0][0]) ** 2 +
            (self.region[1][1] - self.region[0][1]) ** 2
        )
        meters_per_pixel = self.real_world_distance / region_pixel_distance
        real_distance = corrected_pixel_distance * meters_per_pixel
        
        # Calculate speed in km/h
        speed = (real_distance / time_difference) * 3.6  # Convert m/s to km/h
        
        # Apply adaptive smoothing using moving average
        if track_id in self.spd:
            # Increase speed factor by 20%
            speed = speed * 1.2
            
            # Adaptive smoothing based on speed difference
            diff = abs(speed - self.spd[track_id])
            if diff < 10:
                # Small changes - more smoothing
                speed = 0.8 * self.spd[track_id] + 0.2 * speed
            else:
                # Larger changes - less smoothing
                speed = 0.6 * self.spd[track_id] + 0.4 * speed
            
            # Additional stabilization: limit speed change
            max_change = 15  # Increased maximum speed change in km/h
            prev_speed = self.spd[track_id]
            if abs(speed - prev_speed) > max_change:
                if speed > prev_speed:
                    speed = prev_speed + max_change
                else:
                    speed = prev_speed - max_change
            
        else:
            # Initial speed boost for new tracks
            speed = speed * 1.2
            
        return min(max(speed, 0), 150)  # Cap speed between 0 and 150 km/h

    def count_objects(self, current_centroid, track_id, prev_position, cls):
        """Count objects and update file based on centroid movements."""
        if prev_position is None or track_id in self.counted_ids:
            return

        action = None
        speed = None

        if len(self.region) == 2:  # Handle linear region counting
            line = self.LineString(self.region)
            if line.intersects(self.LineString([prev_position, current_centroid])):
                if abs(self.region[0][0] - self.region[1][0]) < abs(self.region[0][1] - self.region[1][1]):
                    if current_centroid[0] > prev_position[0]:
                        self.in_count += 1
                        self.classwise_counts[self.names[cls]]["IN"] += 1
                        action = "IN"
                    else:
                        self.out_count += 1
                        self.classwise_counts[self.names[cls]]["OUT"] += 1
                        action = "OUT"
                else:
                    if current_centroid[1] > prev_position[1]:
                        self.in_count += 1
                        self.classwise_counts[self.names[cls]]["IN"] += 1
                        action = "IN"
                    else:
                        self.out_count += 1
                        self.classwise_counts[self.names[cls]]["OUT"] += 1
                        action = "OUT"
                self.counted_ids.append(track_id)

        elif len(self.region) > 2:  # Handle polygonal region counting
            polygon = self.Polygon(self.region)
            if polygon.contains(self.Point(current_centroid)):
                if current_centroid[0] > prev_position[0]:
                    self.in_count += 1
                    self.classwise_counts[self.names[cls]]["IN"] += 1
                    action = "IN"
                else:
                    self.out_count += 1
                    self.classwise_counts[self.names[cls]]["OUT"] += 1
                    action = "OUT"
                self.counted_ids.append(track_id)

        if action:
            label = f"{self.names[cls]} ID: {track_id}"
            speed = self.spd.get(track_id, 0)
            self.save_label_to_file(track_id, label, action, speed, self.names[cls])

    def store_classwise_counts(self, cls):
        """Initialize count dictionary for a given class."""
        if self.names[cls] not in self.classwise_counts:
            self.classwise_counts[self.names[cls]] = {"IN": 0, "OUT": 0}

    def display_counts(self, im0):
        """Display the counts and actions on the image."""
        labels_dict = {
            str.capitalize(key): f"{'IN ' + str(value['IN']) if self.show_in else ''} "
                                f"{'OUT ' + str(value['OUT']) if self.show_out else ''}".strip()
            for key, value in self.classwise_counts.items()
            if value["IN"] != 0 or value["OUT"] != 0
        }

        if labels_dict:
            self.annotator.display_analytics(im0, labels_dict, (104, 31, 17), (255, 255, 255), 10)

    def count(self, im0):
        """Main counting function with improved speed calculation."""
        self.im0 = im0  # Store reference to current frame
        
        if not self.region_initialized:
            self.initialize_region()
            self.region_initialized = True

        self.annotator = Annotator(im0, line_width=self.line_width)
        self.extract_tracks(im0)
        self.annotator.draw_region(reg_pts=self.region, color=(104, 0, 123), thickness=self.line_width * 2)

        for box, track_id, cls in zip(self.boxes, self.track_ids, self.clss):
            self.store_tracking_history(track_id, box)
            self.store_classwise_counts(cls)
            
            current_position = ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)
            
            if track_id in self.trk_pt and track_id in self.trk_pp:
                time_difference = time() - self.trk_pt[track_id]
                if time_difference > 0:
                    speed = self.calculate_speed(
                        track_id,
                        current_position,
                        self.trk_pp[track_id],
                        time_difference
                    )
                    self.spd[track_id] = speed

            self.trk_pt[track_id] = time()
            self.trk_pp[track_id] = current_position

            # Display speed and draw annotations
            speed_label = f"{int(self.spd.get(track_id, 0))} km/h" if track_id in self.spd else self.names[int(cls)]
            combine_label = f"{speed_label}, ID: {track_id}"
            self.annotator.box_label(box, label=combine_label, color=(255, 0, 0))
            self.annotator.draw_centroid_and_tracks(
                self.track_line,
                color=colors(int(track_id), True),
                track_thickness=self.line_width
            )

            # Handle tracking and counting
            prev_position = self.track_history[track_id][-2] if len(self.track_history[track_id]) > 1 else None
            
            # Check if vehicle crosses the line
            if prev_position is not None:
                if track_id not in self.counted_ids:
                    self.count_objects(current_position, track_id, prev_position, cls)
                    if track_id in self.counted_ids:
                        self.active_tracks.add(track_id)
                        self.track_frames[track_id] = 0
                elif track_id in self.active_tracks:
                    # Continue tracking after crossing line
                    self.track_frames[track_id] += 1
                    if self.track_frames[track_id] > self.tracking_history_length:
                        self.active_tracks.remove(track_id)
                        del self.track_frames[track_id]

        self.display_counts(im0)
        return im0

def main():
    # Initialize video capture
    cap = cv2.VideoCapture('video2.mp4')
    
    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Scale region points according to video dimensions
    scale_x = frame_width / 1020
    scale_y = frame_height / 500
    
    # Calculate line points leaving 10 pixels on each side
    left_margin = 10
    right_margin = 10
    line_y = int(350 * scale_y)
    
    region_points = [
        (int(left_margin * scale_x), line_y),
        (int((1020 - right_margin) * scale_x), line_y)
    ]

    # Initialize object counter
    counter = ObjectCounter(
        region=region_points,
        model="C:/Users/DELL/Desktop/only traffic project/yolov8/yolov8x.pt",  # Update path to your model
        classes=[2, 5, 7],
        show_in=True,
        show_out=True,
        line_width=2,
        real_world_distance=100  # Set this to actual distance in meters between region points
    )

    # Set up video writer
    output_filename = "video2OUTPUT3.mp4"
    codecs = [
        ('avc1', '.mp4'),
        ('mp4v', '.mp4'),
        ('XVID', '.avi'),
    ]

    video_writer = None
    for codec, ext in codecs:
        try:
            fourcc = cv2.VideoWriter_fourcc(*codec)
            output_path = output_filename.rsplit('.', 1)[0] + ext
            video_writer = cv2.VideoWriter(output_path, fourcc, fps//2, (frame_width, frame_height))
            if video_writer.isOpened():
                print(f"Successfully initialized video writer with codec: {codec}")
                break
        except Exception as e:
            print(f"Failed to initialize video writer with codec {codec}: {str(e)}")
            if video_writer is not None:
                video_writer.release()

    if video_writer is None:
        raise Exception("Could not initialize video writer with any codec")

    # Process video
    count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        count += 1
        if count % 2 != 0:  # Skip odd frames
            continue
        
        # Process the frame
        frame = counter.count(frame)
        
        # Write the processed frame
        video_writer.write(frame)
        
        # Display the frame
        display_frame = cv2.resize(frame, (1020, 500))
        cv2.imshow("RGB", display_frame)
        
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    # Release resources
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

Ultralytics Solutions:  {'region': [(7, 302), (760, 302)], 'show_in': True, 'show_out': True, 'colormap': None, 'up_angle': 145.0, 'down_angle': 90, 'kpts': [6, 8, 10], 'analytics_type': 'line', 'json_file': None, 'records': 5, 'model': 'C:/Users/DELL/Desktop/only traffic project/yolov8/yolov8x.pt', 'classes': [2, 5, 7], 'line_width': 2, 'real_world_distance': 100}


Successfully initialized video writer with codec: avc1

0: 384x640 11 cars, 1 bus, 5 trucks, 15.6ms
Speed: 0.0ms preprocess, 15.6ms inference, 15.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 5 trucks, 29.1ms
Speed: 0.0ms preprocess, 29.1ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 bus, 6 trucks, 31.1ms
Speed: 0.0ms preprocess, 31.1ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 5 trucks, 14.4ms
Speed: 2.0ms preprocess, 14.4ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 bus, 5 trucks, 30.0ms
Speed: 0.0ms preprocess, 30.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 bus, 5 trucks, 23.3ms
Speed: 0.0ms preprocess, 23.3ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 6 trucks, 20.0ms
Speed: 0.0ms preprocess, 20.0ms inference, 0.0ms postp

In [2]:
import os
import pandas as pd
from datetime import datetime
import numpy as np
import torch
from time import time
import cv2
from ultralytics.solutions.solutions import BaseSolution
from ultralytics.utils.plotting import Annotator, colors

class ObjectCounter(BaseSolution):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.in_count = 0
        self.out_count = 0
        self.counted_ids = []
        self.saved_ids = []
        self.classwise_counts = {}
        self.region_initialized = False
        self.spd = {}
        self.trkd_ids = []
        self.trk_pt = {}
        self.trk_pp = {}
        self.show_in = self.CFG.get("show_in", True)
        self.show_out = self.CFG.get("show_out", True)
        self.frame_skip = 2
        
        # Display settings
        self.display_duration = 90  # 3 seconds at 30fps
        self.display_tracks = {}  # Track which vehicles to display
        self.display_start_frames = {}  # When to start displaying each vehicle
        
        # Speed calculation parameters
        self.real_world_distance = kwargs.get('real_world_distance', 20)
        self.fps = 30 // self.frame_skip
        
        # Initialize CSV data storage
        self.csv_filename = self.get_daily_filename()
        self.create_csv()

    def get_daily_filename(self):
        """Generate a filename based on the current date."""
        current_date = datetime.now().strftime("%Y-%m-%d")
        filename = f"vehicle_count_data_{current_date}.csv"
        return filename

    def create_csv(self):
        """Create the CSV file with proper headers if it doesn't exist."""
        if not os.path.exists(self.csv_filename):
            header = ["Track ID", "Label", "Action", "Speed (km/h)", "Class", "Date", "Time"]
            df = pd.DataFrame(columns=header)
            df.to_csv(self.csv_filename, index=False)
            print(f"CSV file created: {self.csv_filename} with headers.")

    def save_label_to_file(self, track_id, label, action, speed, class_name):
        """Save the label, track_id, action, speed, date, time, and class name to a CSV file."""
        if isinstance(speed, torch.Tensor):
            speed = speed.item()
        elif isinstance(speed, np.ndarray):
            speed = speed.item()

        speed = int(round(speed))

        current_time = datetime.now()
        current_date = current_time.date()
        current_time_str = current_time.strftime("%H:%M:%S")

        data = {
            "Track ID": track_id,
            "Label": label,
            "Action": action,
            "Speed (km/h)": speed,
            "Class": class_name,
            "Date": current_date,
            "Time": current_time_str
        }

        df = pd.DataFrame([data])
        df.to_csv(self.csv_filename, mode='a', header=False, index=False)
        print(f"Data for track_id {track_id} saved to CSV file {self.csv_filename}.")

    def calculate_speed(self, track_id, current_position, previous_position, time_difference):
        """Calculate speed with perspective correction and real-world distance calibration."""
        if time_difference <= 0:
            return 0
            
        # Get pixel distance
        pixel_distance = np.sqrt(
            (current_position[0] - previous_position[0]) ** 2 +
            (current_position[1] - previous_position[1]) ** 2
        )
        
        # Convert pixel distance to real-world distance
        region_pixel_distance = np.sqrt(
            (self.region[1][0] - self.region[0][0]) ** 2 +
            (self.region[1][1] - self.region[0][1]) ** 2
        )
        meters_per_pixel = self.real_world_distance / region_pixel_distance
        real_distance = pixel_distance * meters_per_pixel
        
        # Calculate speed in km/h
        speed = (real_distance / time_difference) * 3.6 * 1.2  # Convert m/s to km/h with 20% adjustment
        
        # Apply adaptive smoothing
        if track_id in self.spd:
            diff = abs(speed - self.spd[track_id])
            if diff < 10:
                speed = 0.8 * self.spd[track_id] + 0.2 * speed
            else:
                speed = 0.6 * self.spd[track_id] + 0.4 * speed
            
            # Limit speed change
            max_change = 15
            prev_speed = self.spd[track_id]
            if abs(speed - prev_speed) > max_change:
                if speed > prev_speed:
                    speed = prev_speed + max_change
                else:
                    speed = prev_speed - max_change
        
        return min(max(speed, 0), 150)  # Cap speed between 0 and 150 km/h

    def count_objects(self, current_centroid, track_id, prev_position, cls):
        """Count objects and update file based on centroid movements."""
        if prev_position is None or track_id in self.counted_ids:
            return

        action = None
        speed = None

        if len(self.region) == 2:  # Handle linear region counting
            line = self.LineString(self.region)
            if line.intersects(self.LineString([prev_position, current_centroid])):
                if abs(self.region[0][0] - self.region[1][0]) < abs(self.region[0][1] - self.region[1][1]):
                    if current_centroid[0] > prev_position[0]:
                        self.in_count += 1
                        self.classwise_counts[self.names[cls]]["IN"] += 1
                        action = "IN"
                    else:
                        self.out_count += 1
                        self.classwise_counts[self.names[cls]]["OUT"] += 1
                        action = "OUT"
                else:
                    if current_centroid[1] > prev_position[1]:
                        self.in_count += 1
                        self.classwise_counts[self.names[cls]]["IN"] += 1
                        action = "IN"
                    else:
                        self.out_count += 1
                        self.classwise_counts[self.names[cls]]["OUT"] += 1
                        action = "OUT"
                self.counted_ids.append(track_id)

        if action:
            label = f"{self.names[cls]} ID: {track_id}"
            speed = self.spd.get(track_id, 0)
            self.save_label_to_file(track_id, label, action, speed, self.names[cls])

    def store_classwise_counts(self, cls):
        """Initialize count dictionary for a given class."""
        if self.names[cls] not in self.classwise_counts:
            self.classwise_counts[self.names[cls]] = {"IN": 0, "OUT": 0}

    def display_counts(self, im0):
        """Display the counts and actions on the image."""
        labels_dict = {
            str.capitalize(key): f"{'IN ' + str(value['IN']) if self.show_in else ''} "
                                f"{'OUT ' + str(value['OUT']) if self.show_out else ''}".strip()
            for key, value in self.classwise_counts.items()
            if value["IN"] != 0 or value["OUT"] != 0
        }

        if labels_dict:
            self.annotator.display_analytics(im0, labels_dict, (104, 31, 17), (255, 255, 255), 10)

    def count(self, im0):
        """Main counting function with speed display only at line crossing."""
        self.im0 = im0
        
        if not self.region_initialized:
            self.initialize_region()
            self.region_initialized = True

        self.annotator = Annotator(im0, line_width=3)  # Increased line width for better visibility
        self.extract_tracks(im0)
        self.annotator.draw_region(reg_pts=self.region, color=(104, 0, 123), thickness=self.line_width * 2)

        # Process each tracked object
        for box, track_id, cls in zip(self.boxes, self.track_ids, self.clss):
            self.store_tracking_history(track_id, box)
            self.store_classwise_counts(cls)
            
            current_position = ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)
            
            # Calculate speed
            if track_id in self.trk_pt and track_id in self.trk_pp:
                time_difference = time() - self.trk_pt[track_id]
                if time_difference > 0:
                    speed = self.calculate_speed(
                        track_id,
                        current_position,
                        self.trk_pp[track_id],
                        time_difference
                    )
                    self.spd[track_id] = speed

            self.trk_pt[track_id] = time()
            self.trk_pp[track_id] = current_position

            # Check for line crossing
            prev_position = self.track_history[track_id][-2] if len(self.track_history[track_id]) > 1 else None
            if prev_position is not None and track_id not in self.counted_ids:
                line = self.LineString(self.region)
                if line.intersects(self.LineString([prev_position, current_position])):
                    # Vehicle just crossed the line
                    self.display_tracks[track_id] = True
                    self.display_start_frames[track_id] = len(self.track_history[track_id])
                    self.count_objects(current_position, track_id, prev_position, cls)

            # Display speed only for vehicles that have crossed the line
            if track_id in self.display_tracks:
                frames_since_crossing = len(self.track_history[track_id]) - self.display_start_frames[track_id]
                if frames_since_crossing <= self.display_duration:
                    if track_id in self.spd:
                        speed_text = f"{int(self.spd[track_id])} km/h"
                        self.annotator.box_label(
                            box,
                            label=speed_text,
                            color=(0, 0, 255)  # Red color
                        )
                else:
                    # Remove from display after duration
                    self.display_tracks.pop(track_id, None)
                    self.display_start_frames.pop(track_id, None)

        self.display_counts(im0)
        return im0

def main():
    # Initialize video capture
    cap = cv2.VideoCapture('video2.mp4')
    
    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Calculate line points leaving 10 pixels on each side
    scale_x = frame_width / 1020
    scale_y = frame_height / 500
    left_margin = 10
    right_margin = 10
    line_y = int(350 * scale_y)
    
    region_points = [
        (int(left_margin * scale_x), line_y),
        (int((1020 - right_margin) * scale_x), line_y)
    ]

    # Initialize object counter
    counter = ObjectCounter(
        region=region_points,
        model="C:/Users/DELL/Desktop/only traffic project/yolov8/yolov8x.pt",  # Update path to your model
        classes=[2, 5, 7],
        show_in=True,
        show_out=True,
        line_width=2,
        real_world_distance=100  # Set this to actual distance in meters between region points
    )

    # Set up video writer
    output_filename = "video2OUTPUT4.mp4"
    codecs = [
        ('avc1', '.mp4'),
        ('mp4v', '.mp4'),
        ('XVID', '.avi'),
    ]

    video_writer = None
    for codec, ext in codecs:
        try:
            fourcc = cv2.VideoWriter_fourcc(*codec)
            output_path = output_filename.rsplit('.', 1)[0] + ext
            video_writer = cv2.VideoWriter(output_path, fourcc, fps//2, (frame_width, frame_height))
            if video_writer.isOpened():
                print(f"Successfully initialized video writer with codec: {codec}")
                break
        except Exception as e:
            print(f"Failed to initialize video writer with codec {codec}: {str(e)}")
            if video_writer is not None:
                video_writer.release()

    if video_writer is None:
        raise Exception("Could not initialize video writer with any codec")

    # Process video
    count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        count += 1
        if count % 2 != 0:  # Skip odd frames
            continue
        
        # Process the frame
        frame = counter.count(frame)
        
        # Write the processed frame
        video_writer.write(frame)
        
        # Display the frame
        display_frame = cv2.resize(frame, (1020, 500))
        cv2.imshow("RGB", display_frame)
        
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    # Release resources
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

Ultralytics Solutions:  {'region': [(7, 302), (760, 302)], 'show_in': True, 'show_out': True, 'colormap': None, 'up_angle': 145.0, 'down_angle': 90, 'kpts': [6, 8, 10], 'analytics_type': 'line', 'json_file': None, 'records': 5, 'model': 'C:/Users/DELL/Desktop/only traffic project/yolov8/yolov8x.pt', 'classes': [2, 5, 7], 'line_width': 2, 'real_world_distance': 100}
Successfully initialized video writer with codec: avc1

0: 384x640 11 cars, 1 bus, 5 trucks, 40.3ms
Speed: 0.0ms preprocess, 40.3ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 5 trucks, 23.7ms
Speed: 0.0ms preprocess, 23.7ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 bus, 6 trucks, 23.2ms
Speed: 0.0ms preprocess, 23.2ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 5 trucks, 24.8ms
Speed: 1.0ms preprocess, 24.8ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x6

In [3]:
#final 
import os
import pandas as pd
from datetime import datetime
import numpy as np
import torch
from time import time
import cv2
from ultralytics.solutions.solutions import BaseSolution
from ultralytics.utils.plotting import Annotator, colors

class ObjectCounter(BaseSolution):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.in_count = 0
        self.out_count = 0
        self.counted_ids = []
        self.saved_ids = []
        self.classwise_counts = {}
        self.region_initialized = False
        self.spd = {}
        self.trkd_ids = []
        self.trk_pt = {}
        self.trk_pp = {}
        self.show_in = self.CFG.get("show_in", True)
        self.show_out = self.CFG.get("show_out", True)
        self.frame_skip = 2
        
        # Display settings
        self.display_duration = 90  # 3 seconds at 30fps
        self.display_tracks = {}  # Track which vehicles to display
        self.display_start_frames = {}  # When to start displaying each vehicle
        
        # Speed calculation parameters
        self.real_world_distance = kwargs.get('real_world_distance', 20)
        self.fps = 30 // self.frame_skip
        
        # Initialize CSV data storage
        self.csv_filename = self.get_daily_filename()
        self.create_csv()

    def get_daily_filename(self):
        """Generate a filename based on the current date."""
        current_date = datetime.now().strftime("%Y-%m-%d")
        filename = f"vehicle_count_data_{current_date}.csv"
        return filename

    def create_csv(self):
        """Create the CSV file with proper headers if it doesn't exist."""
        if not os.path.exists(self.csv_filename):
            header = ["Track ID", "Label", "Action", "Speed (km/h)", "Class", "Date", "Time"]
            df = pd.DataFrame(columns=header)
            df.to_csv(self.csv_filename, index=False)
            print(f"CSV file created: {self.csv_filename} with headers.")

    def save_label_to_file(self, track_id, label, action, speed, class_name):
        """Save the label, track_id, action, speed, date, time, and class name to a CSV file."""
        if isinstance(speed, torch.Tensor):
            speed = speed.item()
        elif isinstance(speed, np.ndarray):
            speed = speed.item()

        speed = int(round(speed))
        
        current_time = datetime.now()
        current_date = current_time.date()
        current_time_str = current_time.strftime("%H:%M:%S")

        data = {
            "Track ID": track_id,
            "Label": label,
            "Action": action,
            "Speed (km/h)": speed,
            "Class": class_name,
            "Date": current_date,
            "Time": current_time_str
        }

        df = pd.DataFrame([data])
        df.to_csv(self.csv_filename, mode='a', header=False, index=False)

    def calculate_speed(self, track_id, current_position, previous_position, time_difference):
        """Calculate speed with perspective correction and real-world distance calibration."""
        if time_difference <= 0:
            return 0
            
        # Get pixel distance
        pixel_distance = np.sqrt(
            (current_position[0] - previous_position[0]) ** 2 +
            (current_position[1] - previous_position[1]) ** 2
        )
        
        # Convert pixel distance to real-world distance
        region_pixel_distance = np.sqrt(
            (self.region[1][0] - self.region[0][0]) ** 2 +
            (self.region[1][1] - self.region[0][1]) ** 2
        )
        meters_per_pixel = self.real_world_distance / region_pixel_distance
        real_distance = pixel_distance * meters_per_pixel
        
        # Calculate speed in km/h
        speed = (real_distance / time_difference) * 3.6 * 1.2  # Convert m/s to km/h with 20% adjustment
        
        # Apply adaptive smoothing
        if track_id in self.spd:
            diff = abs(speed - self.spd[track_id])
            if diff < 10:
                speed = 0.8 * self.spd[track_id] + 0.2 * speed
            else:
                speed = 0.6 * self.spd[track_id] + 0.4 * speed
            
            # Limit speed change
            max_change = 15
            prev_speed = self.spd[track_id]
            if abs(speed - prev_speed) > max_change:
                if speed > prev_speed:
                    speed = prev_speed + max_change
                else:
                    speed = prev_speed - max_change
        
        return min(max(speed, 0), 150)  # Cap speed between 0 and 150 km/h

    def count_objects(self, current_centroid, track_id, prev_position, cls):
        """Count objects and update file based on centroid movements."""
        if prev_position is None or track_id in self.counted_ids:
            return

        action = None
        speed = None

        if len(self.region) == 2:  # Handle linear region counting
            line = self.LineString(self.region)
            if line.intersects(self.LineString([prev_position, current_centroid])):
                if abs(self.region[0][0] - self.region[1][0]) < abs(self.region[0][1] - self.region[1][1]):
                    if current_centroid[0] > prev_position[0]:
                        self.in_count += 1
                        self.classwise_counts[self.names[cls]]["IN"] += 1
                        action = "IN"
                    else:
                        self.out_count += 1
                        self.classwise_counts[self.names[cls]]["OUT"] += 1
                        action = "OUT"
                else:
                    if current_centroid[1] > prev_position[1]:
                        self.in_count += 1
                        self.classwise_counts[self.names[cls]]["IN"] += 1
                        action = "IN"
                    else:
                        self.out_count += 1
                        self.classwise_counts[self.names[cls]]["OUT"] += 1
                        action = "OUT"
                self.counted_ids.append(track_id)

        if action:
            label = f"{self.names[cls]} ID: {track_id}"
            speed = self.spd.get(track_id, 0)
            self.save_label_to_file(track_id, label, action, speed, self.names[cls])

    def store_classwise_counts(self, cls):
        """Initialize count dictionary for a given class."""
        if self.names[cls] not in self.classwise_counts:
            self.classwise_counts[self.names[cls]] = {"IN": 0, "OUT": 0}

    def display_counts(self, im0):
        """Display the counts and actions on the image."""
        labels_dict = {
            str.capitalize(key): f"{'IN ' + str(value['IN']) if self.show_in else ''} "
                                f"{'OUT ' + str(value['OUT']) if self.show_out else ''}".strip()
            for key, value in self.classwise_counts.items()
            if value["IN"] != 0 or value["OUT"] != 0
        }

        if labels_dict:
            self.annotator.display_analytics(im0, labels_dict, (104, 31, 17), (255, 255, 255), 10)

    def count(self, im0):
        """Main counting function with speed display only at line crossing."""
        self.im0 = im0
        
        if not self.region_initialized:
            self.initialize_region()
            self.region_initialized = True

        self.annotator = Annotator(im0, line_width=2)
        self.extract_tracks(im0)
        self.annotator.draw_region(reg_pts=self.region, color=(104, 0, 123), thickness=self.line_width * 2)

        # Process each tracked object
        for box, track_id, cls in zip(self.boxes, self.track_ids, self.clss):
            self.store_tracking_history(track_id, box)
            self.store_classwise_counts(cls)
            
            current_position = ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)
            
            # Calculate speed
            if track_id in self.trk_pt and track_id in self.trk_pp:
                time_difference = time() - self.trk_pt[track_id]
                if time_difference > 0:
                    speed = self.calculate_speed(
                        track_id,
                        current_position,
                        self.trk_pp[track_id],
                        time_difference
                    )
                    self.spd[track_id] = speed

            self.trk_pt[track_id] = time()
            self.trk_pp[track_id] = current_position

            # Check for line crossing
            prev_position = self.track_history[track_id][-2] if len(self.track_history[track_id]) > 1 else None
            if prev_position is not None and track_id not in self.counted_ids:
                line = self.LineString(self.region)
                if line.intersects(self.LineString([prev_position, current_position])):
                    # Vehicle just crossed the line
                    self.display_tracks[track_id] = True
                    self.display_start_frames[track_id] = len(self.track_history[track_id])
                    self.count_objects(current_position, track_id, prev_position, cls)

            # Display speed only for vehicles that have crossed the line
            if track_id in self.display_tracks:
                frames_since_crossing = len(self.track_history[track_id]) - self.display_start_frames[track_id]
                if frames_since_crossing <= self.display_duration:
                    if track_id in self.spd:
                        speed_text = f"{int(self.spd[track_id])} km/h"
                        self.annotator.box_label(
                            box,
                            label=speed_text,
                            color=(255, 0, 0)  # Blue color in BGR format
                        )
                else:
                    # Remove from display after duration
                    self.display_tracks.pop(track_id, None)
                    self.display_start_frames.pop(track_id, None)

        self.display_counts(im0)
        return im0

def main():
    # Initialize video capture
    cap = cv2.VideoCapture('video2.mp4')
    
    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Calculate line points leaving 10 pixels on each side
    scale_x = frame_width / 1020
    scale_y = frame_height / 500
    left_margin = 10
    right_margin = 10
    line_y = int(350 * scale_y)
    
    region_points = [
        (int(left_margin * scale_x), line_y),
        (int((1020 - right_margin) * scale_x), line_y)
    ]

    # Initialize object counter
    counter = ObjectCounter(
        region=region_points,
        model="C:/Users/DELL/Desktop/only traffic project/yolov8/yolov8x.pt",  # Update path to your model
        classes=[2, 5, 7],
        show_in=True,
        show_out=True,
        line_width=2,
        real_world_distance=100  # Set this to actual distance in meters between region points
    )

    # Set up video writer
    output_filename = "video2OUTPUT5.mp4"
    codecs = [
        ('avc1', '.mp4'),
        ('mp4v', '.mp4'),
        ('XVID', '.avi'),
    ]

    video_writer = None
    for codec, ext in codecs:
        try:
            fourcc = cv2.VideoWriter_fourcc(*codec)
            output_path = output_filename.rsplit('.', 1)[0] + ext
            video_writer = cv2.VideoWriter(output_path, fourcc, fps//2, (frame_width, frame_height))
            if video_writer.isOpened():
                print(f"Successfully initialized video writer with codec: {codec}")
                break
        except Exception as e:
            print(f"Failed to initialize video writer with codec {codec}: {str(e)}")
            if video_writer is not None:
                video_writer.release()

    if video_writer is None:
        raise Exception("Could not initialize video writer with any codec")

    # Process video
    count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        count += 1
        if count % 2 != 0:  # Skip odd frames
            continue
        
        # Process the frame
        frame = counter.count(frame)
        
        # Write the processed frame
        video_writer.write(frame)
        
        # Display the frame
        display_frame = cv2.resize(frame, (1020, 500))
        cv2.imshow("RGB", display_frame)
        
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    # Release resources
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

Ultralytics Solutions:  {'region': [(7, 302), (760, 302)], 'show_in': True, 'show_out': True, 'colormap': None, 'up_angle': 145.0, 'down_angle': 90, 'kpts': [6, 8, 10], 'analytics_type': 'line', 'json_file': None, 'records': 5, 'model': 'C:/Users/DELL/Desktop/only traffic project/yolov8/yolov8x.pt', 'classes': [2, 5, 7], 'line_width': 2, 'real_world_distance': 100}
Successfully initialized video writer with codec: avc1

0: 384x640 11 cars, 1 bus, 5 trucks, 31.6ms
Speed: 0.0ms preprocess, 31.6ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 5 trucks, 28.8ms
Speed: 0.0ms preprocess, 28.8ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 cars, 1 bus, 6 trucks, 29.4ms
Speed: 0.0ms preprocess, 29.4ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 cars, 1 bus, 5 trucks, 23.8ms
Speed: 0.0ms preprocess, 23.8ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x6