# 🚦 YOLO Detection Server V14 - PARALLEL PIPELINE
### Async Pipeline: Reader → Detector → Sender
### No blocking, maximum throughput

In [None]:
# ═══════════════════════════════════════════════════════════
# Cell 1: CONFIGURATION
# ═══════════════════════════════════════════════════════════

# Service endpoints (trycloudflare.com tunnel hostnames).
# BACKEND_HTTP_URL: REST backend; fetch_cameras() requests /api/camera/all from it.
BACKEND_HTTP_URL = 'https://teens-relief-performed-blank.trycloudflare.com'
# BACKEND_WS_URL: WebSocket backend that receives one JSON result per detected frame.
BACKEND_WS_URL = 'wss://kevin-revolution-fish-foster.trycloudflare.com'
# NMS_URL: media server; each camera stream is read from {NMS_URL}/live/{camera_id}.flv.
NMS_URL = 'https://designer-supply-later-landing.trycloudflare.com'

# Counting line position as a percentage of the frame height
# (detect_frame converts it to pixels with h * TRACK_LINE_Y / 100).
TRACK_LINE_Y = 50
# Passed to the model as `conf` on every track() call.
CONFIDENCE = 0.4
# Passed to the model as `imgsz` for warmup and tracking.
IMGSZ = 416
# Request FP16: the model is switched to half precision only when running on CUDA,
# and the flag is also passed as `half` on every track() call.
USE_HALF = True

# Queue sizes (smaller = lower latency, larger = smoother)
FRAME_QUEUE_SIZE = 2    # Frames waiting for detection
RESULT_QUEUE_SIZE = 5   # Results waiting to send

# Echo the configured endpoints so the cell output shows what this run targets.
print(f"üì° HTTP: {BACKEND_HTTP_URL}")
print(f"üì° WS: {BACKEND_WS_URL}")
print(f"üì∫ NMS: {NMS_URL}")

In [None]:
# Cell 2: Install Dependencies
# Remove any preinstalled numpy / pillow / ultralytics first (output discarded),
# so the pinned versions below are installed fresh.
!pip uninstall -y numpy pillow ultralytics > /dev/null 2>&1
# Reinstall with numpy held below 2.0 and pillow at 10.3.0 or newer, plus the
# runtime packages used by the later cells (OpenCV headless, requests, websocket-client).
!pip install "numpy<2.0.0" "pillow>=10.3.0" ultralytics opencv-python-headless requests websocket-client --upgrade --quiet
# Download the yolo11n weights into the working directory; -nc skips the
# download when the file is already present, -q silences wget.
!wget -nc -q https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo11n.pt

# Presumably a smoke test that both packages import after the reinstall.
import numpy, PIL
print(f'‚úÖ Deps ready')

In [None]:
# Cell 3: Load Model
# Loads the shared YOLO model onto the best available device and runs a few
# dummy inferences so the first real frame does not pay the start-up cost.
import warnings
warnings.filterwarnings('ignore')
import torch
from ultralytics import YOLO

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'üöÄ Device: {device}')

# Half precision is only applied on CUDA; on CPU the model stays in FP32
# regardless of USE_HALF.
use_fp16 = bool(USE_HALF) and device == 'cuda'

vehicle_model = YOLO('yolo11n.pt').to(device)
if use_fp16:
    vehicle_model.model.half()
# Report the precision actually in effect instead of always claiming FP16.
print(f"‚úÖ Model loaded (yolo11n, {'FP16' if use_fp16 else 'FP32'})")

# Warmup
import numpy as np
dummy = np.zeros((480, 640, 3), dtype=np.uint8)
for _ in range(5):
    # Use the same precision setting as detect_frame's track() calls.
    # NOTE(review): the predictor presumably fixes its precision on first
    # use, so omitting `half` here may leave it in FP32 - confirm against
    # the installed ultralytics version.
    vehicle_model(dummy, imgsz=IMGSZ, half=use_fp16, verbose=False)
print('‚úÖ Warmup done')

In [None]:
# Cell 4: Detection Function
import cv2, time, threading, queue, json
from datetime import datetime
from collections import deque

# Class names (looked up in vehicle_model.names) that detect_frame keeps;
# detections of any other class are skipped.
VEHICLE_CLASSES = ['car', 'truck', 'bus', 'motorcycle', 'bicycle']
# Per-camera counting state keyed by camera_id; entries are created lazily
# by detect_frame the first time a camera is seen.
camera_trackers = {}

def log(msg):
    """Print ``msg`` to stdout prefixed with the current local time as ``[HH:MM:SS]``."""
    stamp = datetime.now().strftime('%H:%M:%S')
    print(f"[{stamp}] {msg}")

# Track history and crossing markers for ids not seen for this long (ms) are
# discarded, so the per-camera state does not grow for the life of the server.
TRACK_STALE_MS = 30_000


def _new_tracker_state():
    """Return an empty per-camera counting state."""
    return {
        'tracks': {}, 'counted': {},
        'counts_up': {v: 0 for v in VEHICLE_CLASSES},
        'counts_down': {v: 0 for v in VEHICLE_CLASSES},
        'total_up': 0, 'total_down': 0,
    }


def _crossing_direction(prev_y, cur_y, line_y):
    """Return 1 if the centre moved from on/above the line to below it, -1 for the reverse, else 0."""
    if prev_y <= line_y < cur_y:
        return 1
    if prev_y >= line_y > cur_y:
        return -1
    return 0


def _prune_stale_tracks(tr, now_ms):
    """Drop history and crossing markers of track ids last seen more than TRACK_STALE_MS ago."""
    stale = [
        tid for tid, history in tr['tracks'].items()
        if not history or now_ms - history[-1]['time'] > TRACK_STALE_MS
    ]
    for tid in stale:
        del tr['tracks'][tid]
        # Crossing markers are keyed "<id>_<direction>" with direction 1 or -1.
        tr['counted'].pop(f"{tid}_1", None)
        tr['counted'].pop(f"{tid}_-1", None)


def detect_frame(frame, camera_id):
    """Run vehicle tracking on one frame and update the camera's crossing counts.

    Args:
        frame: image array whose first two ``shape`` entries are height and
            width (as produced by ``cv2.VideoCapture.read``).
        camera_id: key selecting the per-camera state in ``camera_trackers``.

    Returns:
        dict with ``camera_id``, ``created_at`` (epoch ms), ``image_dimensions``,
        ``track_line_y`` and a ``vehicle`` section holding ``detections``,
        ``inference_time`` (ms), ``vehicle_count`` (cumulative up/down totals,
        per-class totals and the per-class count in this frame) and
        ``new_crossings`` (ids that crossed the line in this frame, with
        direction 1 = downward in the image, -1 = upward).
    """
    h, w = frame.shape[:2]
    now_ms = int(time.time() * 1000)
    result = {
        'camera_id': camera_id,
        'created_at': now_ms,
        'image_dimensions': {'width': w, 'height': h},
        'track_line_y': TRACK_LINE_Y
    }

    tr = camera_trackers.get(camera_id)
    if tr is None:
        tr = camera_trackers[camera_id] = _new_tracker_state()

    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump if the wall clock is adjusted mid-inference.
    t0 = time.perf_counter()
    dets, tracks, crossings = [], {}, []
    vcounts = {v: 0 for v in VEHICLE_CLASSES}
    ly = int(h * TRACK_LINE_Y / 100)  # counting line in pixels

    # NOTE(review): vehicle_model is one module-level instance and
    # track(persist=True) is called for every camera_id, so tracker ids are
    # presumably shared across cameras - confirm whether each camera needs
    # its own model/tracker instance.
    for r in vehicle_model.track(frame, persist=True, verbose=False, imgsz=IMGSZ,
                                 conf=CONFIDENCE, half=USE_HALF):
        for b in r.boxes:
            cls = vehicle_model.names[int(b.cls[0])]
            if cls not in VEHICLE_CLASSES:
                continue
            x1, y1, x2, y2 = map(int, b.xyxy[0])
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
            det = {
                'class': cls, 'type': 'vehicle', 'confidence': float(b.conf[0]),
                'bbox': {'x1': x1 / w, 'y1': y1 / h, 'x2': x2 / w, 'y2': y2 / h},
                'bbox_pixels': [x1, y1, x2, y2]
            }
            # Boxes without a tracker id are reported but cannot be counted.
            if getattr(b, 'id', None) is not None:
                tid = int(b.id[0])
                det['id'] = tid
                tracks[tid] = {'pos': (cx, cy), 'time': now_ms, 'class': cls}
            dets.append(det)
            vcounts[cls] += 1

    for tid, info in tracks.items():
        history = tr['tracks'].setdefault(tid, [])
        if history:
            d = _crossing_direction(history[-1]['pos'][1], info['pos'][1], ly)
            key = f"{tid}_{d}"
            # Each id is counted at most once per direction.
            if d and key not in tr['counted']:
                tr['counted'][key] = True
                if d == 1:
                    tr['counts_down'][info['class']] += 1
                    tr['total_down'] += 1
                else:
                    tr['counts_up'][info['class']] += 1
                    tr['total_up'] += 1
                crossings.append({'id': tid, 'direction': d})
        history.append({'pos': info['pos'], 'time': info['time'], 'class': info['class']})
        tr['tracks'][tid] = history[-10:]  # keep only the 10 most recent samples

    _prune_stale_tracks(tr, now_ms)

    result['vehicle'] = {
        'detections': dets,
        'inference_time': (time.perf_counter() - t0) * 1000,
        'vehicle_count': {
            'total_up': tr['total_up'], 'total_down': tr['total_down'],
            'by_type_up': tr['counts_up'].copy(), 'by_type_down': tr['counts_down'].copy(),
            'current': vcounts
        },
        'new_crossings': crossings
    }
    return result

print('‚úÖ Detection ready')

print('‚úÖ Detection ready')

In [None]:
# Cell 5: WebSocket Client (Non-blocking)
import websocket

class AsyncWebSocket:
    """Queue-backed WebSocket sender for one camera.

    ``connect()`` starts two daemon threads: one keeps the socket running and
    reconnects after it closes, the other drains ``send_queue`` and writes
    each item to the socket as JSON. ``send_async()`` only enqueues, so the
    caller never waits on the network.
    """

    # Seconds to wait before dialling again after the socket closes.
    RECONNECT_DELAY = 3.0

    def __init__(self, url, camera_id, api_key):
        """Store connection parameters; no network activity happens here.

        Args:
            url: base WebSocket URL (without query string).
            camera_id: sent as the ``cameraId`` query parameter.
            api_key: sent as the ``apiKey`` query parameter.
        """
        from urllib.parse import urlencode

        # Encode the query so characters such as '+', '&' or '=' in the key
        # reach the server unchanged.
        query = urlencode({'cameraId': camera_id, 'apiKey': api_key})
        self.url = f"{url}?{query}"
        self.camera_id = camera_id
        self.ws = None
        self.connected = False
        self.send_queue = queue.Queue(maxsize=RESULT_QUEUE_SIZE)
        self._stop = False
        self._opened = threading.Event()  # set by _on_open, awaited by connect()

    def connect(self, timeout=5.0):
        """Start the background threads and wait for the socket to open.

        Args:
            timeout: maximum seconds to wait for the first successful open.

        Returns:
            True once the socket is open; False if setup failed or the socket
            did not open within ``timeout`` (background threads are stopped
            in that case).
        """
        try:
            self._stop = False
            self._opened.clear()
            self.ws = websocket.WebSocketApp(
                self.url,
                on_open=self._on_open,
                on_close=self._on_close,
                on_error=self._on_error,
            )
            threading.Thread(target=self._run_loop, daemon=True).start()
            threading.Thread(target=self._sender_loop, daemon=True).start()
        except Exception as e:
            log(f'[{self.camera_id[-4:]}] WS setup failed: {e}')
            return False
        # Return as soon as the socket opens instead of sleeping a fixed time.
        if not self._opened.wait(timeout):
            self.close()
            return False
        return self.connected

    def close(self):
        """Stop both background threads and close the socket, if any."""
        self._stop = True
        self.connected = False
        if self.ws is not None:
            try:
                self.ws.close()
            except Exception as e:
                log(f'[{self.camera_id[-4:]}] WS close failed: {e}')

    def _on_open(self, ws):
        self.connected = True
        self._opened.set()
        log(f'‚úÖ [{self.camera_id[-4:]}] WS Connected')

    def _on_close(self, ws, *_args):
        # Extra positional arguments (close code and message) are not used.
        self.connected = False

    def _on_error(self, ws, error):
        log(f'[{self.camera_id[-4:]}] WS error: {error}')

    def _run_loop(self):
        """Run the socket until stopped, reconnecting whenever it closes."""
        while not self._stop:
            try:
                self.ws.run_forever()
            except Exception as e:
                log(f'[{self.camera_id[-4:]}] WS run failed: {e}')
            self.connected = False
            if self._stop:
                break
            time.sleep(self.RECONNECT_DELAY)

    def _sender_loop(self):
        """Sender thread - sends results from queue without blocking the producer."""
        while not self._stop:
            try:
                result = self.send_queue.get(timeout=1)
            except queue.Empty:
                continue
            if not (self.connected and self.ws):
                # Offline: the item is dropped so stale results do not pile up.
                continue
            try:
                payload = json.dumps(result)
            except (TypeError, ValueError) as e:
                # A bad payload must not mark a healthy socket as disconnected.
                log(f'[{self.camera_id[-4:]}] WS payload not serialisable: {e}')
                continue
            try:
                self.ws.send(payload)
            except Exception as e:
                self.connected = False
                log(f'[{self.camera_id[-4:]}] WS send failed: {e}')
                # Close the socket so _run_loop dials again and _on_open can
                # restore the connected flag.
                try:
                    self.ws.close()
                except Exception:
                    pass

    def send_async(self, data):
        """Non-blocking send - puts data in queue.

        Returns:
            True if ``data`` was queued with room to spare; False if the
            queue was full, in which case the oldest queued item is evicted
            to make room for ``data``.
        """
        try:
            self.send_queue.put_nowait(data)
            return True
        except queue.Full:
            pass
        # Drop oldest if queue full (keep latest)
        try:
            self.send_queue.get_nowait()
        except queue.Empty:
            pass  # the sender thread emptied the queue in the meantime
        try:
            self.send_queue.put_nowait(data)
        except queue.Full:
            pass
        return False

print('‚úÖ Async WebSocket ready')

print('‚úÖ Async WebSocket ready')

In [None]:
# Cell 6: Parallel Pipeline
import cv2, time, requests, threading, queue
from collections import deque

# NOTE(review): camera_stats is not referenced anywhere else in the visible
# code - confirm it is unused before removing it.
camera_stats = {}
# Maps camera _id -> camera_api_key; filled by fetch_cameras() and read when
# the pipelines are created.
camera_keys = {}

def fetch_cameras():
    """Fetch the camera list from the backend and record each camera's API key.

    Requests ``{BACKEND_HTTP_URL}/api/camera/all``. The response body may be a
    list of camera dicts or a dict carrying that list under ``metadata``.
    Every camera dict with a truthy ``_id`` is stored in ``camera_keys``
    (``_id`` -> ``camera_api_key``, empty string when absent or null).

    Returns:
        list of all camera ids currently in ``camera_keys`` on success, or
        ``[]`` if the request fails, returns a non-200 status, or the body
        cannot be parsed (the reason is logged).
    """
    try:
        resp = requests.get(f"{BACKEND_HTTP_URL}/api/camera/all", timeout=10)
        if resp.status_code != 200:
            # Previously a non-200 status returned [] with no diagnostic.
            log(f'Camera list request failed: HTTP {resp.status_code}')
            return []
        data = resp.json()
        cams = data.get('metadata', data) if isinstance(data, dict) else data
        for cam in cams:
            if isinstance(cam, dict) and cam.get('_id'):
                # `or ''` also covers an explicit null key in the response.
                camera_keys[cam['_id']] = cam.get('camera_api_key') or ''
        return list(camera_keys)
    except Exception as e:
        log(f'‚ùå {e}')
    return []

class CameraPipeline:
    """Parallel pipeline for one camera: Reader -> Detector -> Sender.

    A reader thread pulls frames from the camera's FLV stream into a small
    bounded queue (oldest frame dropped when full); a detector thread takes
    frames from that queue, runs ``detect_frame`` and hands the result to the
    camera's ``AsyncWebSocket`` for sending.
    """

    def __init__(self, camera_id, api_key):
        """Prepare queues, stats and the WebSocket client; nothing is started yet."""
        self.camera_id = camera_id
        self.flv_url = f"{NMS_URL}/live/{camera_id}.flv"
        self.frame_queue = queue.Queue(maxsize=FRAME_QUEUE_SIZE)
        self.ws = AsyncWebSocket(BACKEND_WS_URL, camera_id, api_key)
        self._stop = False
        self.stats = {'frames_read': 0, 'frames_detected': 0, 'frames_sent': 0, 'fps': 0}
        self.fps_times = deque(maxlen=30)  # timestamps of the most recent detections

    def start(self):
        """Connect the WebSocket and start the reader and detector threads.

        Returns:
            True if the pipeline was started, False if the WebSocket could
            not be connected (no threads are started in that case).
        """
        # Connect WebSocket
        if not self.ws.connect():
            log(f'‚ùå [{self.camera_id[-4:]}] WS Failed')
            return False

        # Start threads
        threading.Thread(target=self._reader_loop, daemon=True).start()
        threading.Thread(target=self._detector_loop, daemon=True).start()
        log(f'üöÄ [{self.camera_id[-4:]}] Pipeline started')
        return True

    def stop(self):
        """Ask the reader, detector and WebSocket threads to exit."""
        self._stop = True
        self.ws._stop = True
        app = self.ws.ws
        if app is not None:
            try:
                app.close()
            except Exception as e:
                log(f'[{self.camera_id[-4:]}] WS close failed: {e}')

    def _enqueue_latest(self, frame):
        """Queue ``frame`` without blocking, evicting the oldest frame if full."""
        try:
            self.frame_queue.put_nowait(frame)
            return
        except queue.Full:
            pass
        # Drop oldest frame if queue full (keep latest)
        try:
            self.frame_queue.get_nowait()
        except queue.Empty:
            pass  # the detector took the frame in the meantime
        try:
            self.frame_queue.put_nowait(frame)
        except queue.Full:
            pass

    def _record_fps(self, now):
        """Add timestamp ``now`` and refresh ``stats['fps']`` from the window.

        N timestamps span N-1 intervals, so the rate is (N-1) / elapsed. The
        previous value is kept while fewer than two samples exist or no time
        has elapsed.
        """
        self.fps_times.append(now)
        if len(self.fps_times) > 1:
            elapsed = self.fps_times[-1] - self.fps_times[0]
            if elapsed > 0:
                self.stats['fps'] = round((len(self.fps_times) - 1) / elapsed, 1)

    def _reader_loop(self):
        """Thread 1: Read frames continuously, reopening the stream when it ends."""
        while not self._stop:
            cap = None
            try:
                cap = cv2.VideoCapture(self.flv_url)
                if not cap.isOpened():
                    time.sleep(2)
                    continue
                log(f'üé• [{self.camera_id[-4:]}] Stream connected')

                while cap.isOpened() and not self._stop:
                    ret, frame = cap.read()
                    if not ret:
                        break

                    self.stats['frames_read'] += 1
                    self._enqueue_latest(frame)
            except Exception as e:
                log(f'‚ùå [{self.camera_id[-4:]}] Reader: {e}')
                time.sleep(2)
            finally:
                # Release on every path, including a failed open or an exception.
                if cap is not None:
                    cap.release()

    def _detector_loop(self):
        """Thread 2: Detect and send results"""
        while not self._stop:
            # Get frame (blocking with timeout so the stop flag is re-checked)
            try:
                frame = self.frame_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                # Detect
                result = detect_frame(frame, self.camera_id)
                self.stats['frames_detected'] += 1

                # Send async (non-blocking)
                if self.ws.send_async(result):
                    self.stats['frames_sent'] += 1

                # Calculate FPS
                self._record_fps(time.time())

                # Log every 100 detections
                if self.stats['frames_detected'] % 100 == 0:
                    inf = result['vehicle'].get('inference_time', 0)
                    det = len(result['vehicle']['detections'])
                    log(f"[{self.camera_id[-4:]}] {self.stats['fps']} FPS | {inf:.0f}ms | {det} det | Q:{self.frame_queue.qsize()}")
            except Exception as e:
                log(f'‚ùå [{self.camera_id[-4:]}] Detector: {e}')

# Main
# Fetch the camera list, start one pipeline per camera, then report stats
# once a minute until interrupted.
log('üîç Fetching cameras...')
cameras = fetch_cameras()
if not cameras:
    log('‚ùå No cameras!')
else:
    log(f'‚úÖ {len(cameras)} cameras')
    pipelines = []
    for cid in cameras:
        p = CameraPipeline(cid, camera_keys.get(cid, ''))
        p.start()
        pipelines.append(p)

    log('üöÄ PARALLEL PIPELINE RUNNING!')

    # Keep main thread alive
    try:
        while True:
            time.sleep(60)
            for p in pipelines:
                log(f"üìä [{p.camera_id[-4:]}] Read:{p.stats['frames_read']} Det:{p.stats['frames_detected']} Sent:{p.stats['frames_sent']} FPS:{p.stats['fps']}")
    except KeyboardInterrupt:
        log('Stopping...')
    finally:
        # The worker threads are daemons, but in a notebook the kernel keeps
        # running after an interrupt, so they must be told to exit explicitly.
        for p in pipelines:
            p._stop = True
            p.ws._stop = True
            if p.ws.ws is not None:
                try:
                    p.ws.ws.close()
                except Exception as e:
                    log(f'[{p.camera_id[-4:]}] WS close failed: {e}')