In [None]:
import torch
from matplotlib import pyplot as plt
import numpy as np
import cv2
import uuid   # Unique identifier
import os
import time
import serial


: 

Loading the ambulance model

In [None]:
# Load the custom ambulance detector from a local YOLOv5 checkout
# (source='local' makes torch.hub read the repo directory instead of GitHub).
model_ambulance = torch.hub.load(
    repo_or_dir='/Users/mohamedabdelgawad/model/yolov5',  # path to YOLOv5 repo
    model='custom',
    source='local',
    force_reload=True,
    path='/Users/mohamedabdelgawad/Desktop/project/models/real.pt',  # trained weights
)

: 

Running the ambulance model on video input (here a recorded clip rather than a live camera)

In [None]:
# Sample clips that can be passed to cv2.VideoCapture below:
#   /Users/mohamedabdelgawad/Downloads/1.mp4
#   /Users/mohamedabdelgawad/Downloads/2.mp4
#   /Users/mohamedabdelgawad/Downloads/3.mp4

# Play the clip through the ambulance detector and show the annotated frames.
# Press 'q' in the preview window to stop early.
cap = cv2.VideoCapture("/Users/mohamedabdelgawad/Downloads/1.mp4")
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was grabbed (end of the clip or a decode failure).
            # Stop here instead of handing frame=None to the model.
            break

        # Make detections
        results = model_ambulance(frame)
        print(results)

        # render() returns a list of annotated images; squeeze drops the
        # leading length-1 axis so imshow receives a single image.
        cv2.imshow('YOLO', np.squeeze(results.render()))

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    # Always free the capture handle and close the preview window, even when
    # inference or display raises.
    cap.release()
    cv2.destroyAllWindows()

Merging the two models and connecting to the Arduino

In [None]:
import cv2
import serial
import time
import torch
import numpy as np
import requests
import json

# =======================
# CONFIG
# =======================
# Serial link to the Arduino
ARDUINO_PORT = '/dev/cu.usbmodem11201'
BAUD = 9600

# Number of roads captured per cycle
NUM_ROADS = 4

# Website endpoint from Lovable
WEBSITE_URL = "https://ioxckbbugansummpyrvm.supabase.co/functions/v1/traffic"

# Priority weights used when ranking roads
CAR_WEIGHT = 1           # Base traffic weight
AMBULANCE_WEIGHT = 1000  # Highest priority

# Green-light timing, in seconds
TIME_PER_VEHICLE = 5     # Seconds per vehicle
MIN_GREEN_TIME = 5       # Minimum green time in seconds

# =======================
# LOAD YOLOv5 MODEL
# =======================
# Custom weights are loaded through a local YOLOv5 checkout (source='local'),
# so torch.hub reads the repo directory rather than fetching it from GitHub.
print("Loading YOLOv5 model...")
model = torch.hub.load(
    repo_or_dir='/Users/mohamedabdelgawad/model/yolov5',
    model='custom',
    source='local',
    force_reload=True,
    path='/Users/mohamedabdelgawad/Desktop/project/models/toy.pt',
)
print("‚úì Model loaded successfully")

# =======================
# CONNECT ARDUINO
# =======================
# Best-effort connection: on any failure `arduino` is set to None and the
# rest of the cell keeps running without the serial link.
print("Connecting to Arduino...")
try:
    arduino = serial.Serial(port=ARDUINO_PORT, baudrate=BAUD)
    # Pause before first use -- presumably to let the board finish resetting
    # after the port is opened (TODO confirm for the board in use).
    time.sleep(2)
    print("‚úì Arduino connected successfully")
except Exception as exc:
    arduino = None
    print(f"‚ùå Error connecting to Arduino: {exc}")
    print("‚ö† Continuing without Arduino (website-only mode)")

# =======================
# CAMERA
# =======================
print("Connecting to camera...")
cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print("‚ùå Error: Could not connect to camera")
    # Free whatever was acquired before aborting.
    cap.release()
    if arduino:
        arduino.close()
    # Raise instead of calling exit(1): inside IPython/Jupyter `exit` is an
    # autocall helper that does not raise, so the cell would carry on into the
    # main loop with an unusable camera. SystemExit stops execution both in a
    # notebook cell and in a plain script.
    raise SystemExit(1)

print("‚úì Camera connected successfully")

# Startup banner with the keyboard controls handled by the main loop.
print("\n" + "=" * 50)
print("SMART TRAFFIC CONTROL SYSTEM")
print("=" * 50)
print("Controls:")
print("  'c' - Capture current road")
print("  'q' - Quit system")
print("=" * 50)
print()

# Per-cycle state: index of the road to capture next, and the counts
# recorded so far for each road (one slot per road).
road_index = 0
ambulance_counts = [0] * NUM_ROADS
traffic_counts = [0] * NUM_ROADS

def calculate_priority_score(ambulances, cars, ambulance_weight=None, car_weight=None):
    """
    Calculate the priority score for a road.

    The score is a weighted *linear* sum:
    ``ambulances * ambulance_weight + cars * car_weight``.
    Ambulances dominate only because their weight is much larger than the
    car weight, not because of any exponential term.

    Args:
        ambulances: Number of ambulances detected on the road.
        cars: Number of cars detected on the road.
        ambulance_weight: Weight per ambulance. Defaults to the module-level
            ``AMBULANCE_WEIGHT`` when omitted.
        car_weight: Weight per car. Defaults to the module-level
            ``CAR_WEIGHT`` when omitted.

    Returns:
        The numeric priority score (higher means served earlier).
    """
    # Resolve defaults at call time so later edits to the config constants
    # are picked up without redefining this function.
    if ambulance_weight is None:
        ambulance_weight = AMBULANCE_WEIGHT
    if car_weight is None:
        car_weight = CAR_WEIGHT

    return (ambulances * ambulance_weight) + (cars * car_weight)

def calculate_green_time(ambulances, cars, time_per_vehicle=None,
                         min_green_time=None, ambulance_bonus=5):
    """
    Calculate the green light duration for a road.

    Every vehicle (ambulance or car) contributes ``time_per_vehicle`` seconds,
    each ambulance adds a further ``ambulance_bonus`` seconds, and the result
    is never shorter than ``min_green_time``.

    Args:
        ambulances: Number of ambulances detected on the road.
        cars: Number of cars detected on the road.
        time_per_vehicle: Seconds allotted per vehicle. Defaults to the
            module-level ``TIME_PER_VEHICLE`` when omitted.
        min_green_time: Lower bound for the result, in seconds. Defaults to
            the module-level ``MIN_GREEN_TIME`` when omitted.
        ambulance_bonus: Extra seconds granted per ambulance (default 5, the
            value that was previously hard-coded).

    Returns:
        The green duration in seconds.
    """
    # Resolve defaults at call time so later edits to the config constants
    # are picked up without redefining this function.
    if time_per_vehicle is None:
        time_per_vehicle = TIME_PER_VEHICLE
    if min_green_time is None:
        min_green_time = MIN_GREEN_TIME

    total_vehicles = ambulances + cars

    # Ambulances get extra time to clear the intersection safely
    time_needed = (total_vehicles * time_per_vehicle) + (ambulances * ambulance_bonus)

    # Ensure minimum green time (also covers an empty road)
    return max(time_needed, min_green_time)

def send_to_website(priority_order, green_times):
    """
    Send traffic data to the website via HTTP POST (best effort).

    The payload is JSON with two comma-separated string fields:
    ``priorityOrder`` (road indices) and ``greenTimes`` (seconds per road).
    Failures are reported on stdout and never raised, so an unreachable
    website does not stop the caller.

    Args:
        priority_order: Iterable of road indices, highest priority first.
        green_times: Iterable of green durations matching ``priority_order``.

    Returns:
        True when the server answered with a 2xx status, False otherwise.
    """
    try:
        data = {
            "priorityOrder": ",".join(map(str, priority_order)),
            "greenTimes": ",".join(map(str, green_times))
        }

        # Short timeout so a slow endpoint cannot stall the caller for long
        response = requests.post(
            WEBSITE_URL,
            json=data,
            timeout=2
        )

        # Any 2xx reply means the request was accepted; checking only for an
        # exact 200 would misreport replies such as 201 or 204 as failures.
        if 200 <= response.status_code < 300:
            print(f"üì§ Sent to website: {data}")
            return True

        print(f"‚ö† Website responded with status {response.status_code}")

    except requests.exceptions.ConnectionError:
        print("‚ö† Website not reachable (is it running?)")
    except Exception as e:
        # Deliberate catch-all: reporting to the website is optional
        print(f"‚ö† Could not send to website: {e}")

    return False

def process_traffic_priority():
    """
    Rank every road by priority and build the green-light schedule.

    Reads the module-level ``ambulance_counts`` and ``traffic_counts`` lists
    (``NUM_ROADS`` entries each), prints a report to stdout, and returns
    ``(road_order, green_times)``: the road indices ordered from highest to
    lowest priority score, and the green duration for each road in that
    same order.
    """
    banner = "=" * 50
    print("\n" + banner)
    print("PROCESSING TRAFFIC PRIORITY")
    print(banner)

    # One record per road, ordered by score with the highest first.
    # The sort is stable, so equally scored roads keep their index order.
    ranked = sorted(
        (
            {
                'road': road,
                'score': calculate_priority_score(ambulance_counts[road], traffic_counts[road]),
                'ambulances': ambulance_counts[road],
                'cars': traffic_counts[road],
            }
            for road in range(NUM_ROADS)
        ),
        key=lambda entry: entry['score'],
        reverse=True,
    )

    print("\nPriority Order:")
    for rank, entry in enumerate(ranked, start=1):
        if entry['ambulances'] > 0:
            status = "üöë EMERGENCY"
        else:
            status = "üöó Normal"
        print(f"  {rank}. Road {entry['road']}: {status}")
        print(f"     - Ambulances: {entry['ambulances']}")
        print(f"     - Cars: {entry['cars']}")
        print(f"     - Priority Score: {entry['score']}")

    # Road order and green durations, both in ranked order
    road_order = []
    green_times = []
    for entry in ranked:
        road_order.append(entry['road'])
        green_times.append(calculate_green_time(entry['ambulances'], entry['cars']))

    print("\nGreen Light Schedule:")
    for rank, (entry, seconds) in enumerate(zip(ranked, green_times), start=1):
        print(f"  {rank}. Road {entry['road']}: {seconds}s (A:{entry['ambulances']}, C:{entry['cars']})")

    total_cycle_time = sum(green_times)
    print(f"\nTotal Cycle Time: {total_cycle_time}s")
    print(banner)

    return road_order, green_times

# =======================
# MAIN LOOP
# =======================
# Each iteration grabs one camera frame, runs the detector on it, draws a
# status overlay and polls the keyboard:
#   'c' stores the current counts for road `road_index` and advances to the
#       next road; after NUM_ROADS captures the schedule is computed and sent
#       to the Arduino (if connected) and to the website, then state resets.
#   'q' leaves the loop.
# The `finally` clause releases the camera, the windows and the serial port
# however the loop ends.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # NOTE(review): this path skips the waitKey() poll below, so while
            # reads keep failing 'q' is not seen; only Ctrl-C/kernel interrupt
            # ends the loop.
            print("‚ö† Warning: Failed to read frame")
            time.sleep(0.1)
            continue

        # Run detection
        results = model(frame)
        # Detections for the first (and only) image passed in. Column 5 is
        # read as the class id below -- presumably rows follow the YOLOv5
        # [x1, y1, x2, y2, conf, cls] layout; confirm against the model.
        detections = results.xyxy[0]

        ambulance_count = 0
        traffic_count = 0

        if len(detections) > 0:
            cls = detections[:, 5].cpu().numpy().astype(int)
            # NOTE(review): the ids 15 and 0 depend on the label map of the
            # loaded weights -- TODO confirm against toy.pt.
            ambulance_count = int((cls == 15).sum())  # Class 15 = ambulance
            traffic_count = int((cls == 0).sum())     # Class 0 = car

        # Render annotated frame (copied before the overlay text is drawn on it)
        annotated = results.render()[0].copy()

        # Add UI elements
        cv2.putText(annotated, f"Road {road_index}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 0), 2)
        cv2.putText(annotated, f"Ambulances: {ambulance_count}", (10, 65),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
        cv2.putText(annotated, f"Cars: {traffic_count}", (10, 100),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

        # Priority indicator (score of the live frame, not of a captured road)
        priority_score = calculate_priority_score(ambulance_count, traffic_count)
        cv2.putText(annotated, f"Priority: {priority_score}", (10, 135),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 165, 0), 2)

        cv2.imshow("Live Detection", annotated)

        # Keep only the low byte of the key code before comparing with ord()
        key = cv2.waitKey(1) & 0xFF

        # -----------------------
        # CAPTURE ROAD
        # -----------------------
        if key == ord('c'):
            # Store the counts of the frame currently shown for this road
            ambulance_counts[road_index] = ambulance_count
            traffic_counts[road_index] = traffic_count

            print(f"\n‚úì Captured Road {road_index}:")
            print(f"  Ambulances: {ambulance_count}")
            print(f"  Cars: {traffic_count}")
            print(f"  Priority Score: {priority_score}")

            road_index += 1
            # Short blocking pause after a capture -- presumably a crude
            # debounce so a held key does not capture twice (TODO confirm).
            time.sleep(0.4)

            # -----------------------
            # PROCESS ALL 4 ROADS
            # -----------------------
            if road_index == NUM_ROADS:
                # Calculate priority-based sequence
                road_order, green_times = process_traffic_priority()

                # Format message for Arduino:
                # "<road order, comma separated>&<green times, comma separated>\n"
                msg = (
                    ",".join(map(str, road_order)) +
                    "&" +
                    ",".join(map(str, green_times)) +
                    "\n"
                )

                # Send to Arduino (skipped when the connection attempt failed)
                if arduino:
                    try:
                        arduino.write(msg.encode())
                        arduino.flush()
                        print(f"\nüì° Sent to Arduino: {msg.strip()}")
                    except Exception as e:
                        # A failed write is reported but does not end the loop
                        print(f"‚ùå Error sending to Arduino: {e}")
                else:
                    print(f"\n‚ö† Arduino not connected. Data: {msg.strip()}")

                # Send to Website
                send_to_website(road_order, green_times)

                # Reset for next cycle
                road_index = 0
                ambulance_counts = [0] * NUM_ROADS
                traffic_counts = [0] * NUM_ROADS

                print("\n‚úì Cycle complete. Ready for next detection cycle.\n")

        elif key == ord('q'):
            print("\nüëã Shutting down system...")
            break

except KeyboardInterrupt:
    print("\n\n‚ö† Interrupted by user")
except Exception as e:
    # Any other failure is printed with its traceback before cleanup runs
    print(f"\n‚ùå Error: {e}")
    import traceback
    traceback.print_exc()
finally:
    # =======================
    # CLEANUP
    # =======================
    print("\nCleaning up resources...")
    cap.release()
    cv2.destroyAllWindows()
    if arduino:
        arduino.close()
    print("‚úì System shutdown complete.")

Loading YOLOv5 model...


YOLOv5 üöÄ v7.0-450-g781b9d57 Python-3.13.7 torch-2.9.1 CPU

Fusing layers... 
Model summary: 157 layers, 7012822 parameters, 0 gradients, 15.8 GFLOPs
Adding AutoShape... 


‚úì Model loaded successfully
Connecting to Arduino...
‚ùå Error connecting to Arduino: [Errno 2] could not open port /dev/cu.usbmodem11201: [Errno 2] No such file or directory: '/dev/cu.usbmodem11201'
‚ö† Continuing without Arduino (website-only mode)
Connecting to camera...
‚úì Camera connected successfully

SMART TRAFFIC CONTROL SYSTEM
Controls:
  'c' - Capture current road
  'q' - Quit system



  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with a



  with amp.autocast(autocast):




: 