In [None]:
import cv2
import time
from kafka import KafkaProducer
import frame_message_pb2


KAFKA_BROKER = "localhost:9092"
TOPIC_NAME = "count-vehicle"
VIDEO_SOURCE = "camera01.mp4"
CAMERA_ID = "camera_freeway_02"

# --- Khởi tạo Kafka Producer ---
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER
)

# --- Mở video và gửi từng frame ---
cap = cv2.VideoCapture(VIDEO_SOURCE)
if not cap.isOpened():
    print(f"Lỗi: Không thể mở file video {VIDEO_SOURCE}")
    exit()

fps = cap.get(cv2.CAP_PROP_FPS)
frame_interval = 1 / fps if fps > 0 else 0
print(f"Đang gửi video từ camera: {CAMERA_ID} với FPS: {fps:.2f}")

cnt = 0
try:
    while True:
        start_time = time.time()

        ret, frame = cap.read()
        if not ret:
            print("Đã đọc hết video.")
            break

        # Nén ảnh thành định dạng JPEG để giảm kích thước
        ret, buffer = cv2.imencode(".jpeg", frame)
        if not ret:
            print("Lỗi: Không thể nén frame.")
            continue

        # Tạo payload bằng đối tượng Protobuf
        payload_proto = frame_message_pb2.VideoPayload()
        payload_proto.camera_id = CAMERA_ID
        payload_proto.frame = buffer.tobytes()

        # Serialize đối tượng Protobuf thành bytes
        serialized_payload = payload_proto.SerializeToString()

        # Gửi payload đã được serialize qua Kafka
        producer.send(TOPIC_NAME, serialized_payload)
        cnt += 1

        # Điều chỉnh tốc độ gửi để khớp với FPS của video gốc
        elapsed_time = time.time() - start_time
        sleep_time = max(0, frame_interval - elapsed_time)
        time.sleep(sleep_time)

finally:
    # Đảm bảo tất cả message đã được gửi trước khi đóng
    producer.flush()
    producer.close()
    cap.release()
    print(f"Gửi video thành công. Tổng số frame đã gửi: {cnt}")