In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Multi-Camera Server (ImageZMQ)
- Thread A: Nhận ảnh từ camera, lưu frame mới nhất
- Thread B: Nhận diện sản phẩm từ hàng đợi
- Thread C: Hiển thị frame mới nhất theo camera

Yêu cầu:
  pip install imagezmq opencv-python numpy
"""

import os
import cv2
import time
import queue
import threading
import numpy as np
import imagezmq
from datetime import datetime
from collections import defaultdict

# ========== Configuration ==========
# Every setting below can be overridden with an environment variable of the same name.
PORT = int(os.getenv("PORT", 5555))  # TCP port the ImageHub binds to
SAVE_DIR = os.getenv("SAVE_DIR", "detections")  # folder where annotated frames are written
INFER_INTERVAL = float(os.getenv("INFER_INTERVAL", 0.15))  # minimum gap (seconds) between inference requests for one camera
NUM_WORKERS = int(os.getenv("NUM_WORKERS", 1))  # number of inference threads started by main()
os.makedirs(SAVE_DIR, exist_ok=True)  # create the output folder up front; no error if it already exists

# ========== Shared state ==========
latest_frames = {}  # {cam_id: frame}
latest_lock = threading.Lock()  # guards latest_frames
last_infer_ts = defaultdict(lambda: 0.0)  # {cam_id: time.time() of the last frame queued for inference}
infer_queue = queue.Queue(maxsize=32)  # (cam_id, frame) items waiting for inference

# ========== Hàm nhận diện (thay bằng model thật) ==========
def detect_products(frame_bgr):
    """Placeholder detector: ignores the frame and reports no detections.

    Replace the body with a real model. Callers in this file unpack each
    result as (x1, y1, x2, y2, label, score).

    Args:
        frame_bgr: the frame to analyse (unused by this stub).

    Returns:
        An empty list.
    """
    detections = []
    return detections

def draw_boxes(frame, boxes):
    """Draw every detection onto `frame` in place and return it.

    Each box gets a green rectangle plus a "label:score" caption placed
    10 pixels above its top-left corner.

    Args:
        frame: image handed to cv2.rectangle / cv2.putText; it is modified directly.
        boxes: iterable of (x1, y1, x2, y2, label, score) tuples.

    Returns:
        The same `frame` object that was passed in.
    """
    green = (0, 255, 0)
    for box in boxes:
        x1, y1, x2, y2, label, score = box
        top_left = (x1, y1)
        bottom_right = (x2, y2)
        cv2.rectangle(frame, top_left, bottom_right, green, 2)
        text = f"{label}:{score:.2f}"
        cv2.putText(frame, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, green, 1)
    return frame

def save_frame(cam_id, frame, boxes):
    """Write `frame` into SAVE_DIR as a timestamped JPEG when there are detections.

    Nothing is written when `boxes` is empty. The camera id is supplied by the
    remote sender, so it is reduced to a filesystem-safe token before it becomes
    part of the file name; otherwise an id containing path separators
    (for example "../x") would place the file outside SAVE_DIR.

    Args:
        cam_id: camera identifier used as the file-name prefix.
        frame: image handed to cv2.imwrite.
        boxes: detections for this frame; only its truthiness is used here.

    Returns:
        None.
    """
    if not boxes:
        return

    # Keep letters, digits, '-', '_' and '.'; everything else (including path
    # separators and ':') becomes '_'. Fall back to "cam" for an empty id.
    safe_cam = "".join(
        ch if (ch.isalnum() or ch in "-_.") else "_" for ch in str(cam_id)
    ) or "cam"

    ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    path = os.path.join(SAVE_DIR, f"{safe_cam}_{ts}.jpg")

    # cv2.imwrite signals failure through its return value rather than raising.
    if not cv2.imwrite(path, frame):
        print(f"[Save] Failed to write {path}")

# ========== Thread A: Nhận ảnh ==========
def receiver():
    """Thread A: receive JPEG frames over ImageZMQ and publish them.

    For every message the sender is acknowledged with b'OK', the JPEG is
    decoded, and the decoded frame becomes the camera's entry in
    `latest_frames`. At most once per INFER_INTERVAL seconds per camera a copy
    of the frame is also queued on `infer_queue`; when that queue is full the
    oldest queued item is discarded first. Runs until the process exits.
    """
    hub = imagezmq.ImageHub(open_port=f"tcp://*:{PORT}")
    print(f"[Receiver] Listening on tcp://*:{PORT}")

    while True:
        try:
            cam_name, jpg_buffer = hub.recv_jpg()
            hub.send_reply(b'OK')
        except Exception as e:
            print(f"[Receiver] Error: {e}")
            time.sleep(0.2)
            continue

        # The sender's name is used directly as the camera id.
        cam_id = cam_name
        frame = cv2.imdecode(np.frombuffer(jpg_buffer, dtype=np.uint8), cv2.IMREAD_COLOR)
        if frame is None:
            # Undecodable payload: ignore this message.
            continue

        with latest_lock:
            latest_frames[cam_id] = frame

        # Throttle: skip queuing when this camera was queued too recently.
        now = time.time()
        if now - last_infer_ts[cam_id] < INFER_INTERVAL:
            continue
        last_infer_ts[cam_id] = now

        if infer_queue.full():
            # Make room by dropping the oldest pending item and marking it done.
            try:
                infer_queue.get_nowait()
            except queue.Empty:
                pass
            else:
                infer_queue.task_done()
        infer_queue.put((cam_id, frame.copy()))

# ========== Thread B: Nhận diện ==========
def inference(worker_id=0):
    """Thread B: take queued frames, run detection and publish annotated results.

    Frames with detections are annotated on a copy, saved through save_frame,
    and the annotated copy replaces that camera's entry in `latest_frames`.
    Frames without detections are dropped: the queued frame is older than the
    one the receiver has already published, so writing it back would only
    replace the live image with a stale one.

    Args:
        worker_id: number used only to tag this worker's log messages.
    """
    while True:
        cam_id, frame = infer_queue.get()
        try:
            boxes = detect_products(frame)
            if not boxes:
                # Nothing to draw or save; the finally clause still marks the item done.
                continue

            annotated = draw_boxes(frame.copy(), boxes)
            save_frame(cam_id, annotated, boxes)

            with latest_lock:
                # Only update cameras the receiver has already registered.
                if cam_id in latest_frames:
                    latest_frames[cam_id] = annotated
        except Exception as e:
            # Keep the worker alive: one bad frame must not stop inference.
            print(f"[Inference-{worker_id}] Error: {e}")
        finally:
            infer_queue.task_done()

# ========== Thread C: Hiển thị ==========
def display():
    """Thread C: show the latest frame of every camera until 'q' is pressed.

    The frame table is snapshotted while holding `latest_lock`, and the windows
    are drawn after the lock is released, so the receiver and inference threads
    are never blocked behind GUI calls. All windows are closed on exit.
    """
    while True:
        with latest_lock:
            # Copy the (cam_id, frame) pairs only; the frames themselves are not duplicated.
            snapshot = list(latest_frames.items())

        for cam_id, frame in snapshot:
            cv2.imshow(f"Live - {cam_id}", frame)

        # waitKey also gives the GUI time to process events; 'q' ends the loop.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cv2.destroyAllWindows()

# ========== Main ==========
def main():
    """Start the receiver and inference threads, then run the display loop.

    The receiver and the NUM_WORKERS inference workers run as daemon threads.
    The display loop runs in the calling thread and returns when 'q' is
    pressed, after which the daemon threads end with the process.
    """
    # A: receiver
    threading.Thread(target=receiver, daemon=True).start()

    # B: inference workers
    for i in range(NUM_WORKERS):
        threading.Thread(target=inference, args=(i,), daemon=True).start()

    # C: display from the main thread, reusing display() instead of a second
    # copy of the same loop.
    print("[Server] Running. Press 'q' to quit.")
    display()

if __name__ == "__main__":
    main()


In [None]:
"""
Multi-Camera Server (ImageZMQ) - Multiprocessing Inference
"""

import os
import cv2
import time
import queue
import threading
import numpy as np
import imagezmq
from datetime import datetime
from collections import defaultdict
from multiprocessing import Process, Queue, Manager

# ========== Configuration ==========
# Every setting below can be overridden with an environment variable of the same name.
PORT = int(os.getenv("PORT", 5555))  # TCP port the ImageHub binds to
SAVE_DIR = os.getenv("SAVE_DIR", "detections")  # folder where annotated frames are written
INFER_INTERVAL = float(os.getenv("INFER_INTERVAL", 0.15))  # minimum gap (seconds) between inference requests for one camera
NUM_WORKERS = int(os.getenv("NUM_WORKERS", 2))  # number of inference worker processes started by main()
os.makedirs(SAVE_DIR, exist_ok=True)  # create the output folder up front; no error if it already exists

# ========== Hàm nhận diện (thay bằng model thật) ==========
def detect_products(frame_bgr):
    """Dummy detector: always reports one fixed box derived from the frame size.

    Args:
        frame_bgr: array-like whose `shape` starts with (height, width).

    Returns:
        A one-element list holding (x1, y1, x2, y2, label, score), where the
        box runs from a quarter to a half of the width and height.
    """
    height, width = frame_bgr.shape[:2]
    fake_box = (width // 4, height // 4, width // 2, height // 2, "product", 0.9)
    return [fake_box]


def draw_boxes(frame, boxes):
    """Draw every detection onto `frame` in place and return it.

    Each box gets a green rectangle plus a "label:score" caption placed
    10 pixels above its top-left corner.

    Args:
        frame: image handed to cv2.rectangle / cv2.putText; it is modified directly.
        boxes: iterable of (x1, y1, x2, y2, label, score) tuples.

    Returns:
        The same `frame` object that was passed in.
    """
    green = (0, 255, 0)
    for box in boxes:
        x1, y1, x2, y2, label, score = box
        top_left = (x1, y1)
        bottom_right = (x2, y2)
        cv2.rectangle(frame, top_left, bottom_right, green, 2)
        text = f"{label}:{score:.2f}"
        cv2.putText(frame, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, green, 1)
    return frame


def save_frame(cam_id, frame, boxes):
    """Write `frame` into SAVE_DIR as a timestamped JPEG when there are detections.

    Nothing is written when `boxes` is empty. The camera id is supplied by the
    remote sender, so it is reduced to a filesystem-safe token before it becomes
    part of the file name; otherwise an id containing path separators
    (for example "../x") would place the file outside SAVE_DIR.

    Args:
        cam_id: camera identifier used as the file-name prefix.
        frame: image handed to cv2.imwrite.
        boxes: detections for this frame; only its truthiness is used here.

    Returns:
        None.
    """
    if not boxes:
        return

    # Keep letters, digits, '-', '_' and '.'; everything else (including path
    # separators and ':') becomes '_'. Fall back to "cam" for an empty id.
    safe_cam = "".join(
        ch if (ch.isalnum() or ch in "-_.") else "_" for ch in str(cam_id)
    ) or "cam"

    ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    path = os.path.join(SAVE_DIR, f"{safe_cam}_{ts}.jpg")

    # cv2.imwrite signals failure through its return value rather than raising.
    if not cv2.imwrite(path, frame):
        print(f"[Save] Failed to write {path}")


# ========== Thread A: Nhận ảnh ==========
def _enqueue_dropping_oldest(q, item, max_attempts=3):
    """Put `item` on `q` without ever blocking, evicting the oldest entry when full.

    Returns True when the item was queued and False when it was dropped after
    `max_attempts` tries.
    """
    for _ in range(max_attempts):
        try:
            q.put_nowait(item)
            return True
        except queue.Full:
            # Make room by discarding the oldest pending item, then retry.
            try:
                q.get_nowait()
            except queue.Empty:
                pass
    return False


def receiver(latest_frames, latest_lock, last_infer_ts, in_queue):
    """Thread A: receive JPEG frames over ImageZMQ and publish them.

    For every message the sender is acknowledged with b'OK', the JPEG is
    decoded, and the decoded frame becomes the camera's entry in
    `latest_frames`. At most once per INFER_INTERVAL seconds per camera a copy
    of the frame is handed to `in_queue` for the workers.

    The hand-off never blocks. `full()` on a multiprocessing queue is only
    approximate, so instead of checking it and then calling a blocking `put`,
    the frame is offered with `put_nowait`; when the queue is full the oldest
    pending item is dropped. This keeps the receive/reply loop responsive.

    Args:
        latest_frames: mapping cam_id -> newest frame, shared with the display loop.
        latest_lock: lock guarding `latest_frames`.
        last_infer_ts: mapping cam_id -> time of the last queued frame; missing
            keys must yield 0.0 (a defaultdict in main()).
        in_queue: queue of (cam_id, frame) items consumed by the workers.
    """
    hub = imagezmq.ImageHub(open_port=f"tcp://*:{PORT}")
    print(f"[Receiver] Listening on tcp://*:{PORT}")

    while True:
        try:
            cam_name, jpg_buffer = hub.recv_jpg()
            hub.send_reply(b'OK')
        except Exception as e:
            print(f"[Receiver] Error: {e}")
            time.sleep(0.2)
            continue

        # The sender's name is used directly as the camera id.
        cam_id = cam_name
        np_arr = np.frombuffer(jpg_buffer, dtype=np.uint8)
        frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
        if frame is None:
            # Undecodable payload: ignore this message.
            continue

        with latest_lock:
            latest_frames[cam_id] = frame

        # Queue the frame for inference only if enough time has passed.
        now = time.time()
        if now - last_infer_ts[cam_id] >= INFER_INTERVAL:
            last_infer_ts[cam_id] = now
            _enqueue_dropping_oldest(in_queue, (cam_id, frame.copy()))


# ========== Worker Process: Nhận diện ==========
def worker_process(worker_id, in_q, out_q):
    """Worker loop: detect, annotate, save and return each queued frame.

    Every (cam_id, frame) item taken from `in_q` is run through
    detect_products, drawn on a copy with draw_boxes, passed to save_frame and
    sent back on `out_q` as (cam_id, annotated_frame). An exception while
    handling one item is logged and the loop carries on with the next item.

    Args:
        worker_id: number used only to tag this worker's log messages.
        in_q: queue yielding (cam_id, frame) items.
        out_q: queue receiving (cam_id, annotated_frame) results.
    """
    print(f"[Worker-{worker_id}] Started")
    while True:
        job = in_q.get()
        cam_id, frame = job
        try:
            detections = detect_products(frame)
            rendered = draw_boxes(frame.copy(), detections)
            save_frame(cam_id, rendered, detections)
            result = (cam_id, rendered)
            out_q.put(result)
        except Exception as e:
            print(f"[Worker-{worker_id}] Error: {e}")


# ========== Thread B: Nhận kết quả từ worker ==========
def result_collector(latest_frames, latest_lock, out_queue):
    """Thread B: copy worker results into the table used by the display loop.

    Blocks on `out_queue` forever; each (cam_id, annotated_frame) result
    replaces that camera's entry in `latest_frames` while `latest_lock` is held.

    Args:
        latest_frames: mapping cam_id -> frame to show.
        latest_lock: lock guarding `latest_frames`.
        out_queue: queue yielding (cam_id, annotated_frame) tuples.
    """
    while True:
        result = out_queue.get()
        cam_id, annotated = result
        with latest_lock:
            latest_frames[cam_id] = annotated


# ========== Main ==========
def main():
    """Start receiver, collector and worker processes, then run the display loop.

    The receiver and collector are daemon threads of this process; inference
    runs in NUM_WORKERS daemon processes fed through `in_queue` and answering
    on `out_queue`. The display loop runs here until 'q' is pressed; windows
    are then closed and the workers are terminated and joined, also when the
    loop exits through an exception.
    """
    # ========== Shared state and queues ==========
    # The frame table is only touched by threads of this process (receiver,
    # collector, display loop); workers exchange data through the queues. A
    # plain dict guarded by latest_lock is therefore sufficient and avoids
    # pickling every frame through a Manager server process on each access.
    latest_frames = {}  # {cam_id: frame}
    latest_lock = threading.Lock()
    last_infer_ts = defaultdict(lambda: 0.0)
    in_queue = Queue(maxsize=32)
    out_queue = Queue(maxsize=32)

    # A: receiver thread
    receiver_args = (latest_frames, latest_lock, last_infer_ts, in_queue)
    threading.Thread(target=receiver, args=receiver_args, daemon=True).start()

    # B: collector thread
    collector_args = (latest_frames, latest_lock, out_queue)
    threading.Thread(target=result_collector, args=collector_args, daemon=True).start()

    # C: inference workers (processes)
    workers = []
    for i in range(NUM_WORKERS):
        p = Process(target=worker_process, args=(i, in_queue, out_queue), daemon=True)
        p.start()
        workers.append(p)

    # D: display from the main process
    print("[Server] Running. Press 'q' to quit.")
    try:
        while True:
            with latest_lock:
                # Snapshot so the table can change while the windows are drawn.
                items = list(latest_frames.items())

            for cam_id, frame in items:
                win = f"Live - {cam_id}"
                cv2.imshow(win, frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cv2.destroyAllWindows()
        for p in workers:
            p.terminate()
        # Reap the terminated workers so they do not linger as zombies.
        for p in workers:
            p.join(timeout=1.0)


if __name__ == "__main__":
    main()

In [None]:
"""
Multi-Camera Server (ImageZMQ) - Multiprocessing Inference
"""

import os
import cv2
import time
import queue
import random
import threading
import numpy as np
import imagezmq
from datetime import datetime
from collections import defaultdict
from multiprocessing import Process, Queue, Manager

# ========== Configuration ==========
# Every setting below can be overridden with an environment variable of the same name.
PORT = int(os.getenv("PORT", 5555))  # TCP port the ImageHub binds to
SAVE_DIR = os.getenv("SAVE_DIR", "detections")  # folder where annotated frames are written
INFER_INTERVAL = float(os.getenv("INFER_INTERVAL", 0.15))  # minimum gap (seconds) between inference requests for one camera
NUM_WORKERS = int(os.getenv("NUM_WORKERS", 2))  # number of inference worker processes started by main()
os.makedirs(SAVE_DIR, exist_ok=True)  # create the output folder up front; no error if it already exists

# ========== Hàm nhận diện (thay bằng model thật) ==========
def detect_products(frame_rgb):
    """Simulated detector: reports one fixed box about half of the time.

    A single random draw decides the outcome: above 0.5 yields one box derived
    from the frame size, otherwise no detections.

    Args:
        frame_rgb: array-like whose `shape` starts with (height, width).

    Returns:
        Either a one-element list holding (x1, y1, x2, y2, label, score) or an
        empty list.
    """
    if not (random.random() > 0.5):
        # No detection this time.
        return []
    height, width = frame_rgb.shape[:2]
    fake_box = (width // 4, height // 4, width // 2, height // 2, "product", 0.9)
    return [fake_box]


def draw_boxes(frame, boxes):
    """Draw every detection onto `frame` in place and return it.

    Each box gets a green rectangle plus a "label:score" caption placed
    10 pixels above its top-left corner.

    Args:
        frame: image handed to cv2.rectangle / cv2.putText; it is modified directly.
        boxes: iterable of (x1, y1, x2, y2, label, score) tuples.

    Returns:
        The same `frame` object that was passed in.
    """
    green = (0, 255, 0)
    for box in boxes:
        x1, y1, x2, y2, label, score = box
        top_left = (x1, y1)
        bottom_right = (x2, y2)
        cv2.rectangle(frame, top_left, bottom_right, green, 2)
        text = f"{label}:{score:.2f}"
        cv2.putText(frame, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, green, 1)
    return frame


def save_frame(cam_id, frame, boxes):
    """Write `frame` into SAVE_DIR as a timestamped JPEG when there are detections.

    Nothing is written when `boxes` is empty. The camera id is supplied by the
    remote sender, so it is reduced to a filesystem-safe token before it becomes
    part of the file name; otherwise an id containing path separators
    (for example "../x") would place the file outside SAVE_DIR.

    Args:
        cam_id: camera identifier used as the file-name prefix.
        frame: image handed to cv2.imwrite.
        boxes: detections for this frame; only its truthiness is used here.

    Returns:
        None.
    """
    if not boxes:
        return

    # Keep letters, digits, '-', '_' and '.'; everything else (including path
    # separators and ':') becomes '_'. Fall back to "cam" for an empty id.
    safe_cam = "".join(
        ch if (ch.isalnum() or ch in "-_.") else "_" for ch in str(cam_id)
    ) or "cam"

    ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    path = os.path.join(SAVE_DIR, f"{safe_cam}_{ts}.jpg")

    # cv2.imwrite signals failure through its return value rather than raising.
    if not cv2.imwrite(path, frame):
        print(f"[Save] Failed to write {path}")


# ========== Thread A: Nhận ảnh ==========
def _enqueue_dropping_oldest(q, item, max_attempts=3):
    """Put `item` on `q` without ever blocking, evicting the oldest entry when full.

    Returns True when the item was queued and False when it was dropped after
    `max_attempts` tries.
    """
    for _ in range(max_attempts):
        try:
            q.put_nowait(item)
            return True
        except queue.Full:
            # Make room by discarding the oldest pending item, then retry.
            try:
                q.get_nowait()
            except queue.Empty:
                pass
    return False


def receiver(latest_frames, latest_lock, last_infer_ts, in_queue):
    """Thread A: receive JPEG frames over ImageZMQ and publish them.

    For every message the sender is acknowledged with b'OK', the JPEG is
    decoded, and the decoded frame becomes the camera's entry in
    `latest_frames`. At most once per INFER_INTERVAL seconds per camera a copy
    of the frame is handed to `in_queue` for the workers.

    The hand-off never blocks. `full()` on a multiprocessing queue is only
    approximate, so instead of checking it and then calling a blocking `put`,
    the frame is offered with `put_nowait`; when the queue is full the oldest
    pending item is dropped. This keeps the receive/reply loop responsive.

    Args:
        latest_frames: mapping cam_id -> newest frame, shared with the display loop.
        latest_lock: lock guarding `latest_frames`.
        last_infer_ts: mapping cam_id -> time of the last queued frame; missing
            keys must yield 0.0 (a defaultdict in main()).
        in_queue: queue of (cam_id, frame) items consumed by the workers.
    """
    hub = imagezmq.ImageHub(open_port=f"tcp://*:{PORT}")
    print(f"[Receiver] Listening on tcp://*:{PORT}")

    while True:
        try:
            cam_name, jpg_buffer = hub.recv_jpg()
            hub.send_reply(b'OK')
        except Exception as e:
            print(f"[Receiver] Error: {e}")
            time.sleep(0.2)
            continue

        # The sender's name is used directly as the camera id.
        cam_id = cam_name
        np_arr = np.frombuffer(jpg_buffer, dtype=np.uint8)
        frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
        if frame is None:
            # Undecodable payload: ignore this message.
            continue

        with latest_lock:
            latest_frames[cam_id] = frame

        # Queue the frame for inference only if enough time has passed.
        now = time.time()
        if now - last_infer_ts[cam_id] >= INFER_INTERVAL:
            last_infer_ts[cam_id] = now
            _enqueue_dropping_oldest(in_queue, (cam_id, frame.copy()))


# ========== Worker Process: Nhận diện ==========
def worker_process(worker_id, in_q, out_q):
    """Worker loop: detect on each queued frame and send back only the boxes.

    For every (cam_id, frame) item taken from `in_q`, detect_products is run.
    When there are detections the worker draws them on a copy of the frame and
    passes that copy to save_frame. The (cam_id, boxes) pair is always sent on
    `out_q`, including an empty box list. An exception while handling one item
    is logged and the loop carries on with the next item.

    Args:
        worker_id: number used only to tag this worker's log messages.
        in_q: queue yielding (cam_id, frame) items.
        out_q: queue receiving (cam_id, boxes) results.
    """
    print(f"[Worker-{worker_id}] Started")
    while True:
        job = in_q.get()
        cam_id, frame = job
        try:
            detections = detect_products(frame)

            # Drawing and saving happen here so the main process stays light.
            if detections:
                rendered = draw_boxes(frame.copy(), detections)
                save_frame(cam_id, rendered, detections)

            # Only the lightweight result goes back to the main process.
            result = (cam_id, detections)
            out_q.put(result)
        except Exception as e:
            print(f"[Worker-{worker_id}] Error: {e}")


# ========== Thread B: Nhận kết quả từ worker ==========
def result_collector(latest_boxes, latest_lock, out_queue):
    """Thread B: record the newest boxes of each camera for the display loop.

    Blocks on `out_queue` forever; each (cam_id, boxes) result replaces that
    camera's entry in `latest_boxes` while `latest_lock` is held. Any exception
    raised while fetching or storing an item is logged and the loop continues.

    Args:
        latest_boxes: mapping cam_id -> most recent list of boxes.
        latest_lock: lock guarding `latest_boxes`.
        out_queue: queue yielding (cam_id, boxes) tuples.
    """
    while True:
        try:
            result = out_queue.get()
            cam_id, boxes = result
            # Publish the newest boxes for the display loop.
            with latest_lock:
                latest_boxes[cam_id] = boxes
        except Exception as e:
            print(f"[Collector] Error processing queue item: {e}")


# ========== Main ==========
def main():
    """Start receiver, collector and worker processes, then run the display loop.

    The receiver and collector are daemon threads of this process; inference
    runs in NUM_WORKERS daemon processes fed through `in_queue` and answering
    with boxes on `out_queue`. The display loop overlays each camera's latest
    boxes on its latest frame until 'q' is pressed; windows are then closed and
    the workers are terminated and joined, also when the loop exits through an
    exception.
    """
    # ========== Shared state and queues ==========
    # Both tables are only touched by threads of this process (receiver,
    # collector, display loop); workers exchange data through the queues.
    # Plain dicts guarded by latest_lock are therefore sufficient and avoid
    # pickling every frame through a Manager server process on each access.
    latest_frames = {}
    latest_boxes = {}  # newest bounding boxes per camera
    latest_lock = threading.Lock()
    last_infer_ts = defaultdict(lambda: 0.0)
    in_queue = Queue(maxsize=32)
    out_queue = Queue(maxsize=32)

    # A: receiver thread
    receiver_args = (latest_frames, latest_lock, last_infer_ts, in_queue)
    threading.Thread(target=receiver, args=receiver_args, daemon=True).start()

    # B: collector thread
    collector_args = (latest_boxes, latest_lock, out_queue)
    threading.Thread(target=result_collector, args=collector_args, daemon=True).start()

    # C: inference workers (processes)
    workers = []
    for i in range(NUM_WORKERS):
        p = Process(target=worker_process, args=(i, in_queue, out_queue), daemon=True)
        p.start()
        workers.append(p)

    # D: display from the main process
    print("[Server] Running. Press 'q' to quit.")
    try:
        while True:
            with latest_lock:
                # One lock acquisition: pair every frame with its camera's boxes.
                items = [
                    (cam_id, frame, latest_boxes.get(cam_id))
                    for cam_id, frame in latest_frames.items()
                ]

            for cam_id, frame, boxes in items:
                # draw_boxes draws in place, so work on a copy and leave the
                # shared frame untouched.
                display_frame = frame.copy()
                if boxes:
                    display_frame = draw_boxes(display_frame, boxes)
                win = f"Live - {cam_id}"
                cv2.imshow(win, display_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cv2.destroyAllWindows()
        for p in workers:
            p.terminate()
        # Reap the terminated workers so they do not linger as zombies.
        for p in workers:
            p.join(timeout=1.0)


if __name__ == "__main__":
    main()


In [None]:
# Nếu chưa có annotated frame thì hiển thị raw frame
# """
# Multi-Camera Server (ImageZMQ) - Multiprocessing Inference (Cách 2)
# """

# import os
# import cv2
# import time
# import queue
# import random
# import threading
# import numpy as np
# import imagezmq
# from datetime import datetime
# from collections import defaultdict
# from multiprocessing import Process, Queue, Manager

# # ========== Cấu hình ==========
# PORT = int(os.getenv("PORT", 5555))
# SAVE_DIR = os.getenv("SAVE_DIR", "detections")
# INFER_INTERVAL = float(os.getenv("INFER_INTERVAL", 0.15))  # giãn cách giữa các lần inference
# NUM_WORKERS = int(os.getenv("NUM_WORKERS", 2))
# os.makedirs(SAVE_DIR, exist_ok=True)


# # ========== Hàm nhận diện (thay bằng model thật) ==========
# def detect_products(frame_rgb):
#     # Giả lập model: 50% có kết quả, 50% không có
#     if random.random() > 0.5:
#         h, w = frame_rgb.shape[:2]
#         return [(w//4, h//4, w//2, h//2, "product", 0.9)]
#     else:
#         return []  # Không phát hiện gì


# def draw_boxes(frame, boxes):
#     for x1, y1, x2, y2, label, score in boxes:
#         cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
#         text = f"{label}:{score:.2f}"
#         cv2.putText(frame, text, (x1, y1 - 10),
#                     cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
#     return frame


# def save_frame(cam_id, frame, boxes):
#     if not boxes:
#         return
#     ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
#     path = os.path.join(SAVE_DIR, f"{cam_id}_{ts}.jpg")
#     cv2.imwrite(path, frame)


# # ========== Thread A: Nhận ảnh ==========
# def receiver(latest_raw, latest_lock, last_infer_ts, in_queue):
#     hub = imagezmq.ImageHub(open_port=f"tcp://*:{PORT}")
#     print(f"[Receiver] Listening on tcp://*:{PORT}")

#     while True:
#         try:
#             cam_name, jpg_buffer = hub.recv_jpg()
#             hub.send_reply(b'OK')
#         except Exception as e:
#             print(f"[Receiver] Error: {e}")
#             time.sleep(0.2)
#             continue

#         cam_id = cam_name
#         np_arr = np.frombuffer(jpg_buffer, dtype=np.uint8)
#         frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
#         if frame is None:
#             continue

#         # Luôn lưu frame raw mới nhất
#         with latest_lock:
#             latest_raw[cam_id] = frame

#         # Đưa frame vào hàng đợi nếu đủ thời gian
#         now = time.time()
#         if now - last_infer_ts[cam_id] >= INFER_INTERVAL:
#             last_infer_ts[cam_id] = now
#             if in_queue.full():
#                 try:
#                     in_queue.get_nowait()
#                 except queue.Empty:
#                     pass
#             in_queue.put((cam_id, frame.copy()))


# # ========== Worker Process: Nhận diện ==========
# def worker_process(worker_id, in_q, out_q):
#     print(f"[Worker-{worker_id}] Started")
#     while True:
#         cam_id, frame = in_q.get()
#         try:
#             boxes = detect_products(frame)
#             annotated = frame.copy()
#             if boxes:
#                 annotated = draw_boxes(annotated, boxes)
#                 save_frame(cam_id, annotated, boxes)

#             # Gửi lại annotated frame cho collector
#             out_q.put((cam_id, annotated))
#         except Exception as e:
#             print(f"[Worker-{worker_id}] Error: {e}")


# # ========== Thread B: Nhận annotated frame từ worker ==========
# def result_collector(latest_annotated, latest_lock, out_queue):
#     while True:
#         try:
#             cam_id, annotated = out_queue.get()
#             with latest_lock:
#                 latest_annotated[cam_id] = annotated
#         except Exception as e:
#             print(f"[Collector] Error processing queue item: {e}")


# # ========== Main ==========
# def main():
#     manager = Manager()
#     latest_raw = manager.dict()        # Frame gốc mới nhất
#     latest_annotated = manager.dict()  # Frame annotated mới nhất
#     latest_lock = threading.Lock()
#     last_infer_ts = defaultdict(lambda: 0.0)
#     in_queue = Queue(maxsize=32)
#     out_queue = Queue(maxsize=32)

#     # A: receiver thread
#     threading.Thread(target=receiver,
#                      args=(latest_raw, latest_lock, last_infer_ts, in_queue),
#                      daemon=True).start()

#     # B: collector thread
#     threading.Thread(target=result_collector,
#                      args=(latest_annotated, latest_lock, out_queue),
#                      daemon=True).start()

#     # C: inference workers
#     workers = []
#     for i in range(NUM_WORKERS):
#         p = Process(target=worker_process, args=(i, in_queue, out_queue), daemon=True)
#         p.start()
#         workers.append(p)

#     # D: hiển thị
#     print("[Server] Running. Press 'q' to quit.")
#     while True:
#         items = []
#         with latest_lock:
#             items = list(latest_raw.items())

#         for cam_id, raw_frame in items:
#             with latest_lock:
#                 annotated = latest_annotated.get(cam_id)

#             # Ưu tiên hiển thị annotated nếu có, fallback về raw
#             display_frame = annotated if annotated is not None else raw_frame

#             win = f"Live - {cam_id}"
#             cv2.imshow(win, display_frame)

#         if cv2.waitKey(1) & 0xFF == ord('q'):
#             break

#     cv2.destroyAllWindows()
#     for p in workers:
#         p.terminate()


# if __name__ == "__main__":
#     main()


In [None]:
#Xử lý đè box cũ lên frame mới (Cách 3)
# #!/usr/bin/env python3
# # -*- coding: utf-8 -*-
# """
# Multi-Camera Server (ImageZMQ)
# - Thread A: Nhận ảnh từ camera, lưu frame mới nhất
# - Thread B: Nhận diện sản phẩm từ hàng đợi
# - Thread C: Hiển thị frame mới nhất theo camera

# Yêu cầu:
#   pip install imagezmq opencv-python numpy
# """

# import os
# import cv2
# import time
# import queue
# import threading
# import numpy as np
# import imagezmq
# from datetime import datetime
# from collections import defaultdict

# # ========== Cấu hình ==========
# PORT = int(os.getenv("PORT", 5555))
# SAVE_DIR = os.getenv("SAVE_DIR", "detections")
# INFER_INTERVAL = float(os.getenv("INFER_INTERVAL", 0.15))  # giãn cách giữa các lần inference
# NUM_WORKERS = int(os.getenv("NUM_WORKERS", 1))
# os.makedirs(SAVE_DIR, exist_ok=True)

# # ========== Bộ nhớ chia sẻ ==========
# latest_frames = {}  # {cam_id: (frame, tag, timestamp)}
# latest_lock = threading.Lock()
# last_infer_ts = defaultdict(lambda: 0.0)
# infer_queue = queue.Queue(maxsize=32)

# # ========== Hàm nhận diện (thay bằng model thật) ==========
# def detect_products(frame_bgr):
#     # TODO: Thay bằng mô hình thật như YOLO, GroundingDINO, OWLv2
#     return []  # Trả về danh sách (x1, y1, x2, y2, label, score)

# def draw_boxes(frame, boxes):
#     for x1, y1, x2, y2, label, score in boxes:
#         cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
#         text = f"{label}:{score:.2f}"
#         cv2.putText(frame, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)
#     return frame

# def save_frame(cam_id, frame, boxes):
#     if not boxes:
#         return
#     ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
#     path = os.path.join(SAVE_DIR, f"{cam_id}_{ts}.jpg")
#     cv2.imwrite(path, frame)

# # ========== Thread A: Nhận ảnh ==========
# def receiver():
#     hub = imagezmq.ImageHub(open_port=f"tcp://*:{PORT}")
#     print(f"[Receiver] Listening on tcp://*:{PORT}")

#     while True:
#         try:
#             cam_name, jpg_buffer = hub.recv_jpg()
#             hub.send_reply(b'OK')
#         except Exception as e:
#             print(f"[Receiver] Error: {e}")
#             time.sleep(0.2)
#             continue

#         cam_id, tag = cam_name.split(":", 1) if ":" in cam_name else (cam_name, "unknown")
#         np_arr = np.frombuffer(jpg_buffer, dtype=np.uint8)
#         frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
#         if frame is None:
#             continue

#         with latest_lock:
#             latest_frames[cam_id] = (frame, tag, time.time())

#         # Đưa vào hàng đợi nếu đủ thời gian
#         now = time.time()
#         if now - last_infer_ts[cam_id] >= INFER_INTERVAL:
#             last_infer_ts[cam_id] = now
#             if infer_queue.full():
#                 try:
#                     infer_queue.get_nowait()
#                     infer_queue.task_done()
#                 except queue.Empty:
#                     pass
#             infer_queue.put((cam_id, frame.copy()))

# # ========== Thread B: Nhận diện ==========
# def inference(worker_id=0):
#     while True:
#         cam_id, frame = infer_queue.get()
#         try:
#             boxes = detect_products(frame)
#             annotated = draw_boxes(frame.copy(), boxes)
#             save_frame(cam_id, annotated, boxes)

#             with latest_lock:
#                 if cam_id in latest_frames:
#                     _, tag, ts = latest_frames[cam_id]
#                     latest_frames[cam_id] = (annotated, tag, ts)
#         except Exception as e:
#             print(f"[Inference-{worker_id}] Error: {e}")
#         finally:
#             infer_queue.task_done()

# # ========== Thread C: Hiển thị ==========
# def display():
#     cv2.namedWindow("Server Info", cv2.WINDOW_NORMAL)
#     while True:
#         info = []
#         with latest_lock:
#             for cam_id, (frame, tag, ts) in latest_frames.items():
#                 cv2.imshow(f"Live - {cam_id}", frame)
#                 age_ms = int((time.time() - ts) * 1000)
#                 info.append(f"{cam_id}: {tag} ({age_ms} ms ago)")

#         # Hiển thị thông tin hệ thống
#         info_img = np.zeros((120, 640, 3), dtype=np.uint8)
#         for i, line in enumerate(info[:5]):
#             cv2.putText(info_img, line, (10, 25 + i * 22), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 1)
#         cv2.imshow("Server Info", info_img)

#         if cv2.waitKey(1) & 0xFF == ord('q'):
#             break
#     cv2.destroyAllWindows()

# # ========== Main ==========
# def main():
#     threading.Thread(target=receiver, daemon=True).start()
#     for i in range(NUM_WORKERS):
#         threading.Thread(target=inference, args=(i,), daemon=True).start()
#     threading.Thread(target=display, daemon=True).start()

#     print("[Server] Running. Press 'q' to quit.")
#     try:
#         while True:
#             time.sleep(0.5)
#     except KeyboardInterrupt:
#         print("[Server] Shutting down.")

# if __name__ == "__main__":
#     main()


In [None]:
# Flickering boxes
# #!/usr/bin/env python3
# # -*- coding: utf-8 -*-
# """
# Multi-Camera Server (ImageZMQ) - Overlay old boxes onto new frames (Multiprocessing inference)
# """
# import os
# import cv2
# import time
# import queue
# import threading
# import numpy as np
# import imagezmq
# from datetime import datetime
# from collections import defaultdict
# from multiprocessing import Process, Queue
# import multiprocessing

# # ========== Cấu hình ==========
# PORT = int(os.getenv("PORT", 5555))
# SAVE_DIR = os.getenv("SAVE_DIR", "detections")
# INFER_INTERVAL = float(os.getenv("INFER_INTERVAL", 0.15))  # giãn cách giữa các lần inference (s)
# NUM_WORKERS = int(os.getenv("NUM_WORKERS", 2))
# os.makedirs(SAVE_DIR, exist_ok=True)

# # ========== Hàm nhận diện (thay bằng model thật) ==========
# def detect_products(frame_bgr):
#     """Dummy model — trả về danh sách boxes: (x1,y1,x2,y2,label,score)"""
#     h, w = frame_bgr.shape[:2]
#     return [(w//4, h//4, w//2, h//2, "product", 0.9)]

# def draw_boxes(frame, boxes):
#     for x1, y1, x2, y2, label, score in boxes:
#         cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
#         text = f"{label}:{score:.2f}"
#         cv2.putText(frame, text, (x1, y1 - 10),
#                     cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
#     return frame

# def save_frame(cam_id, frame, boxes):
#     if not boxes:
#         return
#     ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
#     path = os.path.join(SAVE_DIR, f"{cam_id}_{ts}.jpg")
#     cv2.imwrite(path, frame)

# # ========== Thread A: Nhận ảnh ==========
# def receiver(latest_raw_frames, latest_lock, last_infer_ts, in_queue):
#     hub = imagezmq.ImageHub(open_port=f"tcp://*:{PORT}")
#     print(f"[Receiver] Listening on tcp://*:{PORT}")

#     while True:
#         try:
#             cam_name, jpg_buffer = hub.recv_jpg()
#             hub.send_reply(b'OK')
#         except Exception as e:
#             print(f"[Receiver] Error: {e}")
#             time.sleep(0.2)
#             continue

#         cam_id = cam_name
#         np_arr = np.frombuffer(jpg_buffer, dtype=np.uint8)
#         frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
#         if frame is None:
#             continue

#         # Store the original (raw) frame - store only, do NOT draw on it
#         with latest_lock:
#             latest_raw_frames[cam_id] = frame

#         # Enqueue the frame for inference once enough time has elapsed (throttle)
#         now = time.time()
#         if now - last_infer_ts[cam_id] >= INFER_INTERVAL:
#             last_infer_ts[cam_id] = now
#             # if the queue is full, pop one old item to make room
#             if in_queue.full():
#                 try:
#                     in_queue.get_nowait()
#                 except queue.Empty:
#                     pass
#             # send a copy to avoid a race on the frame's memory
#             in_queue.put((cam_id, frame.copy()))

# # ========== Worker Process: Nhận diện ==========
# def worker_process(worker_id, in_q, out_q):
#     print(f"[Worker-{worker_id}] Started (pid={multiprocessing.current_process().pid})")
#     while True:
#         item = in_q.get()
#         if item is None:
#             # sentinel that stops the process (optional, but useful for a graceful stop)
#             break
#         cam_id, frame = item
#         try:
#             boxes = detect_products(frame)  # có thể là []
#             # optional: save the annotated image for inspection (the frame is not returned to main)
#             if boxes:
#                 annotated = draw_boxes(frame.copy(), boxes)
#                 save_frame(cam_id, annotated, boxes)
#             # return only the boxes (not the frame) -> main overlays them on the latest raw frame
#             out_q.put((cam_id, boxes))
#         except Exception as e:
#             print(f"[Worker-{worker_id}] Error: {e}")

# # ========== Thread B: Nhận kết quả từ worker ==========
# def result_collector(latest_boxes, latest_lock, out_queue):
#     while True:
#         cam_id, boxes = out_queue.get()
#         # a sentinel could be sent to stop the collector cleanly, but here the loop is kept infinite
#         with latest_lock:
#             latest_boxes[cam_id] = {"boxes": boxes, "ts": time.time()}

# # ========== Main ==========
# def main():
#     # Important: create the multiprocessing.Queue and Process objects inside main (avoids a spawn import loop)
#     latest_raw_frames = {}   # {cam_id: frame}  -- main process only
#     latest_boxes = {}        # {cam_id: {"boxes": ..., "ts": ...}}
#     latest_lock = threading.Lock()
#     last_infer_ts = defaultdict(lambda: 0.0)

#     in_queue = Queue(maxsize=32)   # multiprocessing.Queue
#     out_queue = Queue(maxsize=32)  # multiprocessing.Queue

#     # A: receiver thread (nhận raw frames)
#     t_recv = threading.Thread(
#         target=receiver,
#         args=(latest_raw_frames, latest_lock, last_infer_ts, in_queue),
#         daemon=True
#     )
#     t_recv.start()

#     # B: collector thread (nhận boxes từ worker)
#     t_col = threading.Thread(
#         target=result_collector,
#         args=(latest_boxes, latest_lock, out_queue),
#         daemon=True
#     )
#     t_col.start()

#     # C: inference workers (processes)
#     workers = []
#     for i in range(NUM_WORKERS):
#         p = Process(target=worker_process, args=(i, in_queue, out_queue))
#         p.start()
#         workers.append(p)

#     # D: display from the main process (overlay boxes onto the latest raw frame)
#     print("[Server] Running. Press 'q' to quit.")
#     try:
#         while True:
#             items = []
#             with latest_lock:
#                 items = list(latest_raw_frames.items())

#             for cam_id, frame in items:
#                 # skip the frame if it is None
#                 if frame is None:
#                     continue
#                 display_frame = frame.copy()

#                 # Get the latest boxes (if any)
#                 with latest_lock:
#                     info = latest_boxes.get(cam_id)
#                 if info:
#                     boxes = info.get("boxes", [])
#                     # vẽ nếu có boxes
#                     if boxes:
#                         display_frame = draw_boxes(display_frame, boxes)

#                 cv2.imshow(f"Live - {cam_id}", display_frame)

#             if cv2.waitKey(1) & 0xFF == ord('q'):
#                 break

#     except KeyboardInterrupt:
#         print("Interrupted by user")

#     # Cleanup
#     print("Shutting down workers...")
#     # send a None sentinel so each worker exits its loop (if the worker checks for None)
#     for _ in workers:
#         try:
#             in_queue.put_nowait(None)
#         except Exception:
#             pass


#     for p in workers:
#         if p.is_alive():
#             p.terminate()
#         p.join(timeout=1.0)

#     cv2.destroyAllWindows()
#     print("Exited.")

# if __name__ == "__main__":
#     # on Windows/macOS freeze_support() may be needed when freezing into an executable; calling it here is safe
#     multiprocessing.freeze_support()
#     main()


In [None]:
# """
# Multi-Camera Server (ImageZMQ) - Multiprocessing Inference
# """

# import os
# import cv2
# import time
# import queue
# import threading
# import numpy as np
# import imagezmq
# from datetime import datetime
# from collections import defaultdict
# from multiprocessing import Process, Queue, Manager

# # ========== Cấu hình ==========
# PORT = int(os.getenv("PORT", 5555))
# SAVE_DIR = os.getenv("SAVE_DIR", "detections")
# INFER_INTERVAL = float(os.getenv("INFER_INTERVAL", 0.15))  # giãn cách giữa các lần inference
# NUM_WORKERS = int(os.getenv("NUM_WORKERS", 2))
# os.makedirs(SAVE_DIR, exist_ok=True)

# # ========== Hàm nhận diện (thay bằng model thật) ==========
# def detect_products(frame_bgr):
#     # Dummy model: trả về 1 box giả lập
#     h, w = frame_bgr.shape[:2]
#     return [(w//4, h//4, w//2, h//2, "product", 0.9)]


# def draw_boxes(frame, boxes):
#     for x1, y1, x2, y2, label, score in boxes:
#         cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
#         text = f"{label}:{score:.2f}"
#         cv2.putText(frame, text, (x1, y1 - 10),
#                     cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
#     return frame


# def save_frame(cam_id, frame, boxes):
#     if not boxes:
#         return
#     ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
#     path = os.path.join(SAVE_DIR, f"{cam_id}_{ts}.jpg")
#     cv2.imwrite(path, frame)


# # ========== Thread A: Nhận ảnh ==========
# def receiver(latest_frames, latest_lock, last_infer_ts, in_queue):
#     hub = imagezmq.ImageHub(open_port=f"tcp://*:{PORT}")
#     print(f"[Receiver] Listening on tcp://*:{PORT}")

#     while True:
#         try:
#             cam_name, jpg_buffer = hub.recv_jpg()
#             hub.send_reply(b'OK')
#         except Exception as e:
#             print(f"[Receiver] Error: {e}")
#             time.sleep(0.2)
#             continue

#         cam_id = cam_name
#         np_arr = np.frombuffer(jpg_buffer, dtype=np.uint8)
#         frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
#         if frame is None:
#             continue

#         with latest_lock:
#             latest_frames[cam_id] = frame

#         # Enqueue the frame for inference once enough time has elapsed
#         now = time.time()
#         if now - last_infer_ts[cam_id] >= INFER_INTERVAL:
#             last_infer_ts[cam_id] = now
#             if in_queue.full():
#                 try:
#                     in_queue.get_nowait()
#                 except queue.Empty:
#                     pass
#             in_queue.put((cam_id, frame.copy()))


# # ========== Worker Process: Nhận diện ==========
# def worker_process(worker_id, in_q, out_q):
#     print(f"[Worker-{worker_id}] Started")
#     while True:
#         cam_id, frame = in_q.get()
#         try:
#             # The worker only performs detection
#             boxes = detect_products(frame)
#             # Pass the original frame and the boxes on to the collector
#             out_q.put((cam_id, frame, boxes))
#         except Exception as e:
#             print(f"[Worker-{worker_id}] Error: {e}")


# # ========== Thread B: Nhận kết quả từ worker ==========
# def result_collector(latest_boxes, latest_lock, out_queue):
#     while True:
#         cam_id, frame, boxes = out_queue.get()
#         try:
#             # Update the latest boxes for the display loop
#             with latest_lock:
#                 latest_boxes[cam_id] = boxes

#             # Draw and save the image here
#             if boxes:
#                 annotated = draw_boxes(frame.copy(), boxes)
#                 save_frame(cam_id, annotated, boxes)
#         except Exception as e:
#             print(f"[Collector] Error: {e}")


# # ========== Main ==========
# def main():
#     # ========== Bộ nhớ chia sẻ và Queues ==========
#     manager = Manager()
#     latest_frames = manager.dict()
#     latest_boxes = manager.dict()  # Stores the latest bounding boxes
#     latest_lock = threading.Lock()
#     last_infer_ts = defaultdict(lambda: 0.0)
#     in_queue = Queue(maxsize=32)
#     out_queue = Queue(maxsize=32)

#     # A: receiver thread
#     receiver_args = (latest_frames, latest_lock, last_infer_ts, in_queue)
#     threading.Thread(target=receiver, args=receiver_args, daemon=True).start()

#     # B: collector thread
#     collector_args = (latest_boxes, latest_lock, out_queue)
#     threading.Thread(target=result_collector, args=collector_args, daemon=True).start()

#     # C: inference workers (processes)
#     workers = []
#     for i in range(NUM_WORKERS):
#         p = Process(target=worker_process, args=(i, in_queue, out_queue), daemon=True)
#         p.start()
#         workers.append(p)

#     # D: display from the main process
#     print("[Server] Running. Press 'q' to quit.")
#     while True:
#         items = []
#         with latest_lock:
#             # Take a copy of the latest frames
#             items = list(latest_frames.items())

#         for cam_id, frame in items:
#             display_frame = frame.copy()
            
#             # Get the latest boxes for this camera
#             with latest_lock:
#                 boxes = latest_boxes.get(cam_id)

#             # Vẽ các box nếu có
#             if boxes:
#                 display_frame = draw_boxes(display_frame, boxes)

#             win = f"Live - {cam_id}"
#             cv2.imshow(win, display_frame)

#         if cv2.waitKey(1) & 0xFF == ord('q'):
#             break

#     cv2.destroyAllWindows()
#     for p in workers:
#         p.terminate()


# if __name__ == "__main__":
#     main()


Exception in thread Thread-6 (display):
Traceback (most recent call last):
  File "/Users/quynhnhaa/Documents/Ahn/Study/VenvPython/venv/lib/python3.12/threading.py", line 1052, in _bootstrap_inner
    self.run()
  File "/Users/quynhnhaa/Documents/Ahn/Study/VenvPython/venv/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/Users/quynhnhaa/Documents/Ahn/Study/VenvPython/venv/lib/python3.12/threading.py", line 989, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/1j/xw58cr7n2192dtqz7jx53lsm0000gn/T/ipykernel_67930/3281793291.py", line 110, in display
cv2.error: Unknown C++ exception from OpenCV code


[Receiver] Listening on tcp://*:5555
[Server] Running. Press 'q' to quit.
[Server] Shutting down.


: 

In [1]:
import cv2

# Print the build configuration of the installed OpenCV (version, platform,
# CPU features, compiler flags, ...) for environment diagnostics.
_opencv_build_report = cv2.getBuildInformation()
print(_opencv_build_report)



  Version control:               4.11.0

  Platform:
    Timestamp:                   2025-01-16T09:52:59Z
    Host:                        Darwin 22.6.0 arm64
    CMake:                       3.31.4
    CMake generator:             Unix Makefiles
    CMake build tool:            /usr/bin/make
    Configuration:               Release
    Algorithm Hint:              ALGO_HINT_ACCURATE

  CPU/HW features:
    Baseline:                    NEON FP16 NEON_DOTPROD NEON_FP16
      requested:                 DETECT
    Dispatched code generation:  NEON_BF16
      requested:                 NEON_FP16 NEON_BF16 NEON_DOTPROD
      NEON_BF16 (0 files):       + NEON_BF16

  C/C++:
    Built as dynamic libs?:      NO
    C++ standard:                11
    C++ Compiler:                /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/c++  (ver 15.0.0.15000100)
    C++ flags (Release):         -fsigned-char -W -Wall -Wreturn-type -Wnon-virtual-dtor -Waddress -Ws