# Real-time ambulance and car detection, reported to the ESP32

In [1]:
from inference_sdk import InferenceHTTPClient
import cv2
import requests

import os

# Roboflow client.
# The API key is read from the ROBOFLOW_API_KEY environment variable instead of
# being hardcoded, so the secret is not stored in (or shared with) the notebook.
# A missing variable raises KeyError naming the variable.
client = InferenceHTTPClient(
    api_url="https://serverless.roboflow.com",
    api_key=os.environ["ROBOFLOW_API_KEY"],
)

# Model ids passed to client.infer(): MODEL1 for ambulances, MODEL2 for cars.
MODEL1 = "ambulance-detection-ih8jn/2"
MODEL2 = "toy-car-4j56o/1"

# Address of the ESP32 that receives the counts; override with the ESP32_IP
# environment variable when the board is on a different address.
ESP32_IP = os.environ.get("ESP32_IP", "192.168.100.110")

# Video source with index 1.
cap = cv2.VideoCapture(1)

def send_to_esp32(cars, ambulance):
    """Send the current detection summary to the ESP32 (best effort).

    Issues ``GET http://<ESP32_IP>/update?cars=<cars>&amb=<0|1>``.

    Args:
        cars: Value placed in the ``cars`` query parameter.
        ambulance: Truthy when an ambulance is present; sent as ``int(ambulance)``.

    Network failures (timeouts, connection errors, ...) are deliberately
    ignored so a missing or slow ESP32 never stalls the detection loop.
    Only ``requests`` errors are suppressed: KeyboardInterrupt and programming
    errors still propagate.
    """
    url = f"http://{ESP32_IP}/update?cars={cars}&amb={int(ambulance)}"
    try:
        # Short timeout keeps the per-frame loop responsive.
        requests.get(url, timeout=0.25)
    except requests.RequestException:
        pass

def inside(box, x, y):
    """Return whether point (x, y) lies strictly inside ``box``.

    ``box`` is indexed as (x1, y1, x2, y2); points on the border do not count.
    """
    within_x = box[0] < x < box[2]
    return within_x and box[1] < y < box[3]

# Main loop: grab a frame, run both models on it, annotate it, report the
# counts to the ESP32 and show the result. Press 'q' in the window to stop.
#
# The try/finally guarantees that the camera and the OpenCV window are released
# even when the cell is interrupted (KeyboardInterrupt) or an inference call
# raises; without it the capture device stays open.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Both models are run on the same JPEG written to disk.
        cv2.imwrite("frame.jpg", frame)
        result_amb = client.infer("frame.jpg", model_id=MODEL1)
        result_car = client.infer("frame.jpg", model_id=MODEL2)

        ambulance_boxes = []
        car_count = 0
        ambulance_count = 0

        # Ambulances: x/y are used as the box centre, width/height as its size.
        for p in result_amb.get("predictions", []):
            ambulance_count += 1
            x, y, w, h = p['x'], p['y'], p['width'], p['height']
            x1, y1 = int(x - w / 2), int(y - h / 2)
            x2, y2 = int(x + w / 2), int(y + h / 2)
            ambulance_boxes.append((x1, y1, x2, y2))
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, "AMBULANCE", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        # Cars: a detection whose centre falls inside an ambulance box is
        # skipped so the ambulance is not also counted as a car.
        for p in result_car.get("predictions", []):
            cx, cy = int(p['x']), int(p['y'])
            if any(inside(amb, cx, cy) for amb in ambulance_boxes):
                continue
            car_count += 1
            cx1, cy1 = int(p['x'] - p['width'] / 2), int(p['y'] - p['height'] / 2)
            cx2, cy2 = int(p['x'] + p['width'] / 2), int(p['y'] + p['height'] / 2)
            cv2.rectangle(frame, (cx1, cy1), (cx2, cy2), (255, 0, 0), 2)
            cv2.putText(frame, "CAR", (cx1, cy1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

        # On-screen summary.
        cv2.putText(frame, f"Cars: {car_count}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.putText(frame, f"Ambulance: {ambulance_count}", (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

        send_to_esp32(car_count, ambulance_count > 0)

        cv2.imshow("Realtime Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()




KeyboardInterrupt: 

# Faster version: capture, inference, ESP32 updates and display in separate threads

In [None]:
import cv2
import requests
import time
import threading
import queue
from inference_sdk import InferenceHTTPClient

import os

# Roboflow client.
# The API key is read from the ROBOFLOW_API_KEY environment variable instead of
# being hardcoded, so the secret is not stored in (or shared with) the notebook.
# A missing variable raises KeyError naming the variable.
client = InferenceHTTPClient(
    api_url="https://serverless.roboflow.com",
    api_key=os.environ["ROBOFLOW_API_KEY"],
)

# Model ids passed to client.infer(): MODEL1 for ambulances, MODEL2 for cars.
MODEL1 = "ambulance-detection-ih8jn/2"
MODEL2 = "toy-car-4j56o/1"
# Address of the ESP32; override with the ESP32_IP environment variable.
ESP32_IP = os.environ.get("ESP32_IP", "192.168.100.110")

FRAME_WIDTH = 640
FRAME_HEIGHT = 480
DETECT_EVERY = 3        # run inference only on every Nth captured frame
SEND_INTERVAL = 0.25    # minimum number of seconds between ESP32 updates

# Queues
# frame_queue:  (frame_id, resized frame) from the camera thread.
# result_queue: (frame, car_count, ambulance_count, ambulance_boxes) from the
#               inference thread.
frame_queue = queue.Queue(maxsize=5)
result_queue = queue.Queue(maxsize=5)

# Shared variables
last_send_time = 0   # time.time() of the last ESP32 update (0 = never sent)
frame_id = 0         # running counter of captured frames

def inside(box, x, y):
    """Tell whether the point (x, y) is strictly within ``box``.

    ``box`` is read as (x1, y1, x2, y2); a point on an edge is outside.
    """
    horizontal_hit = box[0] < x < box[2]
    return horizontal_hit and box[1] < y < box[3]

# ---------------- Camera Thread ----------------
def camera_thread():
    """Capture frames forever and feed them to ``frame_queue``.

    Each frame is resized to (FRAME_WIDTH, FRAME_HEIGHT), tagged with the
    incremented global ``frame_id`` and queued as ``(frame_id, frame)``.
    When the queue is full the oldest entry is dropped first, so the consumer
    sees recent frames rather than a growing backlog.
    """
    global frame_id
    cap = cv2.VideoCapture(1)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # Back off instead of spinning at full speed while the camera
                # is unavailable.
                time.sleep(0.01)
                continue
            small = cv2.resize(frame, (FRAME_WIDTH, FRAME_HEIGHT))
            frame_id += 1
            if frame_queue.full():
                try:
                    frame_queue.get_nowait()
                except queue.Empty:
                    # The consumer emptied the queue in the meantime.
                    pass
            frame_queue.put((frame_id, small))
            time.sleep(0.01)  # reduce CPU load
    finally:
        # Reached only if the loop is left through an exception; the device
        # handle is then released rather than leaked.
        cap.release()

# ---------------- Inference Thread ----------------
def inference_thread():
    """Run both detection models on queued frames and publish the results.

    Takes ``(frame_id, frame)`` items from ``frame_queue`` and processes only
    those whose id is a multiple of DETECT_EVERY. For each processed frame it
    puts ``(frame, car_count, ambulance_count, ambulance_boxes)`` on
    ``result_queue``, dropping the oldest result first when that queue is full.
    Any error in one iteration is printed and the loop carries on.
    """
    while True:
        try:
            frame_id_local, frame = frame_queue.get()
            if frame_id_local % DETECT_EVERY != 0:
                continue

            # Ambulance: x/y are used as the box centre, width/height as size.
            result_amb = client.infer(frame, model_id=MODEL1)
            ambulance_boxes = [
                (
                    int(p['x'] - p['width'] / 2),
                    int(p['y'] - p['height'] / 2),
                    int(p['x'] + p['width'] / 2),
                    int(p['y'] + p['height'] / 2),
                )
                for p in result_amb.get("predictions", [])
            ]
            ambulance_count = len(ambulance_boxes)

            # Cars: a detection whose centre lies inside an ambulance box is
            # not counted, so an ambulance is not also reported as a car.
            result_car = client.infer(frame, model_id=MODEL2)
            car_count = sum(
                1
                for p in result_car.get("predictions", [])
                if not any(
                    inside(box, int(p['x']), int(p['y']))
                    for box in ambulance_boxes
                )
            )

            # Put the results in the queue, evicting the oldest if needed.
            if result_queue.full():
                try:
                    result_queue.get_nowait()
                except queue.Empty:
                    pass
            result_queue.put((frame, car_count, ambulance_count, ambulance_boxes))
        except Exception as e:
            print("Inference Error:", e)
        time.sleep(0.01)

# ---------------- ESP32 Thread ----------------
def esp32_thread():
    """Periodically push the most recent detection result to the ESP32.

    Peeks at the newest entry of ``result_queue`` without removing it and, at
    most once per SEND_INTERVAL seconds, issues
    ``GET http://<ESP32_IP>/update?cars=<n>&amb=<0|1>``.
    Network failures are ignored (best effort); any other error is printed.
    """
    global last_send_time
    while True:
        if not result_queue.empty():
            try:
                # Latest result only; the frame and the boxes are not needed.
                _, car_count, ambulance_count, _ = result_queue.queue[-1]
                now = time.time()
                if now - last_send_time > SEND_INTERVAL:
                    last_send_time = now
                    url = f"http://{ESP32_IP}/update?cars={car_count}&amb={int(ambulance_count>0)}"
                    try:
                        requests.get(url, timeout=0.15)
                    except requests.RequestException:
                        # An unreachable or slow ESP32 must not stop the loop.
                        pass
            except Exception as e:
                print("ESP32 Error:", e)
        time.sleep(0.01)

# ---------------- Display Thread ----------------
def display_thread():
    """Show the newest annotated result until 'q' is pressed.

    Peeks at the latest ``(frame, car_count, ambulance_count, ambulance_boxes)``
    entry of ``result_queue``, draws the ambulance boxes and the counters on a
    copy of the frame and displays it. Errors are printed and the loop goes on.

    NOTE(review): OpenCV window functions are called from this worker thread;
    some platforms may require them on the main thread - confirm on the target.
    """
    while True:
        if result_queue.empty():
            # Nothing to show yet: sleep instead of spinning at full CPU.
            time.sleep(0.01)
            continue
        try:
            frame, car_count, ambulance_count, ambulance_boxes = result_queue.queue[-1]
            # Draw on a copy: the same queue entry is peeked repeatedly, so the
            # shared frame is left unmodified. Assumes frame is a NumPy array
            # (it is the output of cv2.resize in the camera thread).
            canvas = frame.copy()

            # Draw the boxes
            for x1, y1, x2, y2 in ambulance_boxes:
                cv2.rectangle(canvas, (x1, y1), (x2, y2), (0,255,0), 2)
                cv2.putText(canvas, "AMBULANCE", (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2)

            cv2.putText(canvas, f"Cars: {car_count}", (20,40), cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2)
            cv2.putText(canvas, f"Ambulance: {ambulance_count}", (20,80), cv2.FONT_HERSHEY_SIMPLEX,0.8,(0,255,0),2)
            cv2.imshow("Realtime Detection (Stable)", canvas)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        except Exception as e:
            print("Display Error:", e)
            # Avoid flooding the output if the same error repeats.
            time.sleep(0.01)
    cv2.destroyAllWindows()

# ---------------- Main ----------------
# Worker threads. They are daemons so they never keep the interpreter alive.
threads = [
    threading.Thread(target=camera_thread, daemon=True),
    threading.Thread(target=inference_thread, daemon=True),
    threading.Thread(target=esp32_thread, daemon=True),
    threading.Thread(target=display_thread, daemon=True)
]

for t in threads:
    t.start()

# Keep main alive only while the display thread is running. That thread ends
# when 'q' is pressed in the window, so quitting there now ends this cell too
# instead of leaving it sleeping forever. join() with a timeout keeps the wait
# interruptible by KeyboardInterrupt.
display_worker = threads[-1]
try:
    while display_worker.is_alive():
        display_worker.join(timeout=1)
except KeyboardInterrupt:
    print("Exiting...")
# NOTE(review): the remaining daemon threads are not signalled to stop here; in
# a long-lived notebook kernel they presumably keep running - confirm and add a
# stop flag if that matters.




Exiting...
