In [None]:
from ultralytics import YOLO
import os
import cv2
import numpy as np
import pandas as pd
from collections import defaultdict
from PIL import Image, ImageDraw
from tqdm import tqdm
import csv

In [None]:
# Locate the project root. Set the BMC_PROJECT_DIR environment variable to run
# somewhere else (e.g. '/content/drive/MyDrive/Bengaluru Mobility Challenge Solution'
# on Colab); without it the original local Windows checkout is used.
project_dir = os.environ.get(
    "BMC_PROJECT_DIR",
    r"C:\Users\sharm\Python Projects\ML Projects\Bengaluru Mobility Challenge\Bengaluru Mobility Challenge Solution",
)

# Trained detector weights live under runs/detect/train/weights inside the project.
model_path = os.path.join(project_dir, "runs", "detect", "train", "weights", "best.pt")
yolo_model = YOLO(model_path)
print("Model loaded successfully!")

In [None]:
# Axis-aligned rectangular zones, keyed by zone name.
# Each value is [(x_min, y_min), (x_max, y_max)], which is how point_in_zone reads it.
# NOTE(review): the coordinate space these numbers were measured in is not stated
# anywhere in the notebook - confirm it matches the output of transform_point.
zones = {
    name: [(x_min, y_min), (x_max, y_max)]
    for name, x_min, y_min, x_max, y_max in (
        ('A', 0, 0, 150, 200),
        ('B', 0, 200, 150, 450),
        ('C', 150, 0, 400, 130),
        ('D', 400, 0, 630, 130),
        ('E', 630, 0, 750, 200),
        ('F', 630, 200, 750, 430),
    )
}

In [None]:
# Draw every zone rectangle on top of the reference image as a visual sanity check.
# The project root can be overridden with the BMC_PROJECT_DIR environment variable
# (e.g. the Google Drive folder on Colab); the default is the original local checkout.
project_dir = os.environ.get(
    "BMC_PROJECT_DIR",
    r"C:\Users\sharm\Python Projects\ML Projects\Bengaluru Mobility Challenge\Bengaluru Mobility Challenge Solution",
)
zones_img = Image.open(os.path.join(project_dir, "Video File", "zones_image.webp"))
# "RGBA" draw mode lets the semi-transparent outline colours blend into the image.
draw = ImageDraw.Draw(zones_img, "RGBA")


# One RGBA outline colour per zone (alpha 128).
colors = {
    'A': (255, 0, 0, 128),
    'B': (0, 255, 0, 128),
    'C': (0, 0, 255, 128),
    'D': (255, 255, 0, 128),
    'E': (255, 0, 255, 128),
    'F': (0, 255, 255, 128)
}

for zone, coords in zones.items():
    draw.rectangle(coords, outline=colors[zone], fill=None, width = 4)

# Last expression of the cell: renders the annotated image inline.
zones_img

#zones_img.save("zones_visualized.png")

In [None]:
#track_history = defaultdict(lambda: [])

In [None]:
"""  
vehicle_counts = {
    'BC': defaultdict(int), 'BE': defaultdict(int), 'DE': defaultdict(int),
    'DF': defaultdict(int), 'FA': defaultdict(int), 'FC': defaultdict(int)
} """

In [None]:
# Per-track state filled in by the tracking loop:
#   track_history   - track id -> list of (x, y) points appended once per frame
#   vehicle_classes - track id -> most recently reported class id
track_history, vehicle_classes = defaultdict(list), defaultdict(int)

In [None]:

def transform_point(point, frame_width, frame_height):
    """Rescale a point from a frame of the given size to a 1920x1080 reference.

    Args:
        point: Indexable whose first two items are the x and y coordinates.
        frame_width: Width of the frame the point was measured in.
        frame_height: Height of the frame the point was measured in.

    Returns:
        Tuple ``(x, y)`` with each coordinate divided by the frame-to-reference
        scale factor of its axis.
    """
    reference_size = (1920, 1080)
    scale = (frame_width / reference_size[0], frame_height / reference_size[1])
    return (point[0] / scale[0], point[1] / scale[1])

def point_in_zone(point, zone, frame_width, frame_height):
    """Tell whether ``point`` falls inside the rectangle ``zone`` (edges inclusive).

    The point is first passed through ``transform_point`` with the given frame
    size; ``zone`` is read as ``[(x_min, y_min), (x_max, y_max)]``.
    """
    px, py = transform_point(point, frame_width, frame_height)
    lower, upper = zone[0], zone[1]
    within_x = lower[0] <= px <= upper[0]
    if not within_x:
        # Same short-circuit as `a and b`: hand back the falsy x-test result.
        return within_x
    return lower[1] <= py <= upper[1]

def get_zone(point, frame_width, frame_height):
    """Return the name of the first zone in ``zones`` containing ``point``.

    Zones are checked in the dictionary's iteration order; ``None`` is returned
    when the point lies in none of them.
    """
    containing = (
        name
        for name, bounds in zones.items()
        if point_in_zone(point, bounds, frame_width, frame_height)
    )
    return next(containing, None)


def get_turning_pattern(start_zone, end_zone):
    """Map a (start zone, end zone) pair to its turning-pattern code.

    Returns the two-letter code (e.g. ``'BC'``) for the six recognised
    movements, or ``None`` for any other pair.
    """
    recognised = ('BC', 'BE', 'DE', 'DF', 'FA', 'FC')
    # Key each code by its (origin, destination) letters.
    by_zone_pair = {(code[0], code[1]): code for code in recognised}
    return by_zone_pair.get((start_zone, end_zone))


In [None]:
# Input recording. The project root can be overridden with the BMC_PROJECT_DIR
# environment variable (e.g. the Google Drive folder on Colab); the default is
# the original local checkout.
project_dir = os.environ.get(
    "BMC_PROJECT_DIR",
    r"C:\Users\sharm\Python Projects\ML Projects\Bengaluru Mobility Challenge\Bengaluru Mobility Challenge Solution",
)
video_file = os.path.join(project_dir, "Video File", "Stn_HD_1_time_2024-05-29T07_30_02_002.mp4")
# Alternative recording in the same folder: "Stn_HD_1_time_2024-05-25T07_30_01_008.mp4"
video_file

In [None]:
# Open the recording and read its basic properties.
cap = cv2.VideoCapture(video_file)
if not cap.isOpened():
    # Fail here with a readable message instead of a ZeroDivisionError a few
    # lines below (an unopened capture reports 0 for every property).
    raise OSError(f"Could not open video file: {video_file}")

total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
# cap.get returns 0 for properties the backend cannot provide; guard the division.
duration = total_frames / fps if fps > 0 else 0.0
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

print(f"Total frames: {total_frames}")
print(f"Frame width : {frame_width}")
print(f"Frames height : {frame_height}")
print(f"Frames per second (FPS): {fps}")
print(f"Duration of video (in seconds): {duration:.2f}")

In [None]:
""" vehicle_tracks = defaultdict(lambda: {'start': None, 'end': None, 'class': None})
output_dir = r"C:\Users\sharm\Python Projects\ML Projects\Bengaluru Mobility Challenge\Bengaluru Mobility Challenge Solution\frames"
os.makedirs(output_dir, exist_ok=True)

log_file_path = os.path.join(output_dir, 'processing_log.txt')
with open(log_file_path, 'w') as log_file:
    frame_count = 0
    progress_bar = tqdm(total=total_frames, desc='Processing Video', unit='frame', 
                        file=log_file, dynamic_ncols=True)

    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break
        
        results = yolo_model.track(frame, persist=True)
        try:
            boxes = results[0].boxes.xywh.cpu()
            track_ids = results[0].boxes.id.int().cpu().tolist()
            classes = results[0].boxes.cls.int().cpu().tolist()
            annotated_frame = results[0].plot()
            
            for box, track_id, cls in zip(boxes, track_ids, classes):
                x, y, w, h = box
                track = track_history[track_id]
                track.append((float(x), float(y)))
                current_zone = next((zone for zone, coords in zones.items() if point_in_zone((x, y), coords)), None)
                
                if len(track) == 1:
                    track[0] = (current_zone, cls)
                elif len(track) > 30:
                    track.pop(1)
                
                if current_zone and current_zone != track[0][0]:
                    start_zone, vehicle_class = track[0]
                    pattern = get_turning_pattern(start_zone, current_zone)
                    if pattern:
                        vehicle_counts[pattern][vehicle_class] += 1
                    track[0] = (current_zone, cls)
                
                points = np.array(track[1:]).astype(np.int32).reshape((-1, 1, 2))
                cv2.polylines(annotated_frame, [points], isClosed=False, color=(230, 230, 230), thickness=2)
            
            frame_filename = f"frame_{frame_count:06d}.jpg"
            frame_path = os.path.join(output_dir, frame_filename)
            cv2.imwrite(frame_path, annotated_frame)
            
            log_file.write(f"Frame {frame_count}: {len(boxes)} detections\n")
            log_file.flush()  
            
            frame_count += 1
            progress_bar.update(1)
            
            cv2.imshow("Vehicle Tracking", annotated_frame)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        
        except:
            pass

    progress_bar.close()

cap.release()
cv2.destroyAllWindows()

with open(log_file_path, 'a') as log_file:
    log_file.write("\nFinal Vehicle Counts:\n")
    for pattern, counts in vehicle_counts.items():
        log_file.write(f"{pattern}: {counts}\n") """

In [None]:
# Run the tracker over the whole video: record every track's per-frame points in
# track_history, remember each track's class in vehicle_classes, and save one
# annotated JPEG per processed frame next to a processing log.
#
# The project root can be overridden with the BMC_PROJECT_DIR environment
# variable; the default is the original local checkout.
project_dir = os.environ.get(
    "BMC_PROJECT_DIR",
    r"C:\Users\sharm\Python Projects\ML Projects\Bengaluru Mobility Challenge\Bengaluru Mobility Challenge Solution",
)
output_dir = os.path.join(project_dir, "frames")
os.makedirs(output_dir, exist_ok=True)

log_file_path = os.path.join(output_dir, 'processing_log.txt')
try:
    with open(log_file_path, 'w') as log_file:
        frame_count = 0
        stop_requested = False
        progress_bar = tqdm(total=total_frames, desc='Processing Video', unit='frame',
                            file=log_file, dynamic_ncols=True)
        try:
            while cap.isOpened():
                success, frame = cap.read()
                # Check the read result BEFORE touching the frame: at end of
                # video `frame` is None and cv2.resize would raise.
                if not success:
                    break
                frame = cv2.resize(frame, (640, 384))

                results = yolo_model.track(frame, tracker='bytetrack.yaml', persist=True)
                try:
                    detections = results[0].boxes
                    boxes = detections.xywh.cpu()
                    # `id` is None on frames where the tracker has no confirmed
                    # tracks; treat that as "nothing to record", not an error.
                    if detections.id is None:
                        track_ids = []
                    else:
                        track_ids = detections.id.int().cpu().tolist()
                    classes = detections.cls.int().cpu().tolist()
                    annotated_frame = results[0].plot()

                    for box, track_id, cls in zip(boxes, track_ids, classes):
                        x, y, w, h = box
                        track_history[track_id].append((float(x), float(y)))
                        vehicle_classes[track_id] = cls

                        # Visualization (optional): trail of the track so far.
                        points = np.array(track_history[track_id]).astype(np.int32).reshape((-1, 1, 2))
                        cv2.polylines(annotated_frame, [points], isClosed=False, color=(230, 230, 230), thickness=2)

                    frame_filename = f"frame_{frame_count:06d}.jpg"
                    frame_path = os.path.join(output_dir, frame_filename)
                    cv2.imwrite(frame_path, annotated_frame)

                    log_file.write(f"Frame {frame_count}: {len(boxes)} detections\n")
                    log_file.flush()

                    # Display stays inside the try so a GUI failure is logged
                    # per frame instead of aborting the run.
                    cv2.namedWindow("Vehicle Tracking", cv2.WINDOW_NORMAL)
                    cv2.imshow("Vehicle Tracking", annotated_frame)
                    stop_requested = cv2.waitKey(1) & 0xFF == ord('q')

                except Exception as e:
                    log_file.write(f"Error in frame {frame_count}: {str(e)}\n")
                finally:
                    # Advance even when a frame failed, so file names and log
                    # entries keep matching the real frame index.
                    frame_count += 1
                    progress_bar.update(1)

                if stop_requested:
                    break
        finally:
            progress_bar.close()
finally:
    # Always release the capture and close windows, including on errors.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# turning pattern -> class id -> number of tracks; reset on every run of this cell.
vehicle_counts = defaultdict(lambda: defaultdict(int))

def analyze_track(history, width=None, height=None):
    """Estimate the zones in which a track starts and ends.

    The first five and last five recorded points are each mapped to a zone with
    ``get_zone``; the most frequent zone in each group wins. Points outside
    every zone are ignored, and ties go to the zone seen first.

    Args:
        history: Sequence of ``(x, y)`` points recorded for one track.
        width: Frame width passed to ``get_zone``; defaults to the notebook's
            global ``frame_width``.
        height: Frame height passed to ``get_zone``; defaults to the notebook's
            global ``frame_height``.

    Returns:
        ``(start_zone, end_zone)``. Either element is ``None`` when none of the
        sampled points fall in a zone; both are ``None`` for tracks with fewer
        than two points.

    NOTE(review): the tracking cell resizes frames to 640x384 before tracking,
    while the global frame_width/frame_height are the capture's native size -
    confirm which coordinate space get_zone should be given.
    """
    if len(history) < 2:
        return None, None

    if width is None:
        width = frame_width
    if height is None:
        height = frame_height

    def dominant_zone(points):
        # Count zone labels (not raw points) and never let None win the vote.
        labels = [get_zone(p, width, height) for p in points]
        candidates = [label for label in dict.fromkeys(labels) if label is not None]
        return max(candidates, key=labels.count, default=None)

    return dominant_zone(history[:5]), dominant_zone(history[-5:])

# Classify every recorded track and tally the recognised turning movements.
for track_id, history in track_history.items():
    print("printing the track id and it's history")
    print(track_id, history)
    start_zone, end_zone = analyze_track(history)
    print("Printing the start zone and the end zone")
    print(start_zone, end_zone)

    # Skip tracks without two distinct, known zones.
    if not (start_zone and end_zone) or start_zone == end_zone:
        continue
    pattern = get_turning_pattern(start_zone, end_zone)
    if not pattern:
        continue
    vehicle_class = vehicle_classes[track_id]
    vehicle_counts[pattern][vehicle_class] += 1


# Append the totals to the processing log written by the tracking cell.
with open(log_file_path, 'a') as log_file:
    log_file.write("\nFinal Vehicle Counts:\n")
    for pattern, counts in vehicle_counts.items():
        summary_line = f"{pattern}: {counts}\n"
        log_file.write(summary_line)
        

In [None]:
# Number of distinct track ids that the tracking loop recorded.
print(len(track_history))

In [None]:
""" vehicle_counts = defaultdict(lambda: defaultdict(int))

def transform_point(point, frame_width, frame_height):
    orig_width, orig_height = 1920, 1080
    x_scale = frame_width / orig_width
    y_scale = frame_height / orig_height
    return (point[0] / x_scale, point[1] / y_scale)

def get_zone(point, frame_width, frame_height):
    transformed_point = transform_point(point, frame_width, frame_height)
    for zone_name, zone_coords in zones.items():
        if (zone_coords[0][0] <= transformed_point[0] <= zone_coords[1][0] and 
            zone_coords[0][1] <= transformed_point[1] <= zone_coords[1][1]):
            return zone_name
    return None

def analyze_track(history, frame_width, frame_height):
    if len(history) < 2:  
        return None, None
    
    start_points = history[:5] if len(history) >= 5 else history
    end_points = history[-5:] if len(history) >= 5 else history
    
    start_zones = [get_zone(p, frame_width, frame_height) for p in start_points]
    end_zones = [get_zone(p, frame_width, frame_height) for p in end_points]
    
    start_zone = max(set(start_zones) - {None}, key=start_zones.count, default=None)
    end_zone = max(set(end_zones) - {None}, key=end_zones.count, default=None)
    
    return start_zone, end_zone

print("Analyzing tracks...")
for track_id, history in track_history.items():
    print(f"Track ID: {track_id}")
    print(f"Track History: {history}")
    
    start_zone, end_zone = analyze_track(history, frame_width, frame_height)
    
    print(f"Start Zone: {start_zone}, End Zone: {end_zone}")
    
    if start_zone and end_zone and start_zone != end_zone:
        pattern = get_turning_pattern(start_zone, end_zone)
        if pattern:
            vehicle_class = vehicle_classes[track_id]
            vehicle_counts[pattern][vehicle_class] += 1
            print(f"Turning Pattern: {pattern}, Vehicle Class: {vehicle_class}")
        else:
            print(f"No valid turning pattern for {start_zone} to {end_zone}")
    else:
        print("No valid turn detected")
    
    print("---")

print("\nFinal Vehicle Counts:")
for pattern, counts in vehicle_counts.items():
    print(f"{pattern}: {counts}")

with open(log_file_path, 'a') as log_file:
    log_file.write("\nFinal Vehicle Counts:\n")
    for pattern, counts in vehicle_counts.items():
        log_file.write(f"{pattern}: {counts}\n") """

In [None]:
# Inspect the tallies (turning pattern -> class id -> count) before exporting.
vehicle_counts

In [None]:
# Export one CSV row per turning pattern, with one column per class id 0-6.
# NOTE(review): assumes the model's class ids 0-6 follow the header order below -
# confirm against the trained model's class names.
# Indexing the defaultdicts also inserts zero entries for missing patterns/classes.
csv_file_path = os.path.join(output_dir, 'vehicle_counts.csv')
with open(csv_file_path, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['Turning Patterns', 'bicycle', 'car', 'truck', 'lcv', 'two-wheeler', 'three-wheeler', 'bus'])
    for pattern in ('BC', 'BE', 'DE', 'DF', 'FA', 'FC'):
        counts = vehicle_counts[pattern]
        row = [pattern]
        row.extend(counts[class_id] for class_id in range(7))
        writer.writerow(row)

In [None]:
# Show the tallies again after the export; the CSV cell's lookups on the
# defaultdicts insert zero entries, so every pattern/class id now appears.
vehicle_counts

In [None]:
""""
class_names = ['bicycle', 'car', 'truck', 'lcv', 'two-wheeler', 'three-wheeler', 'bus']
with open('vehicle_counts.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['Turning Patterns'] + class_names)
    for pattern in ['BC', 'BE', 'DE', 'DF', 'FA', 'FC']:
        row = [pattern] + [vehicle_counts[pattern][i] for i in range(len(class_names))]
        writer.writerow(row)
"""