In [1]:
import cv2
import os
import time
from confluent_kafka import Producer, Consumer
import socket
from ultralytics import YOLO
import cvzone

HOSTNAME = "localhost"

# Kafka producer: settings first, then the client built from them.
producer_conf = {
    'bootstrap.servers': f"{HOSTNAME}:9092",  # Replace with your Kafka hostname
    'client.id': socket.gethostname(),        # identify this client by machine name
    'compression.type': 'gzip',
    'message.max.bytes': 500000000,           # Adjust as needed
}
producer = Producer(producer_conf)

# Kafka consumer: same broker address, member of the 'file_group' group.
consumer_conf = {
    'bootstrap.servers': f"{HOSTNAME}:9092",  # Replace with your Kafka hostname
    'group.id': 'file_group',
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(consumer_conf)

# Working directories: frames written by the capture step and files
# written by the consumer step. Both are created up front if missing.
capture_dir = "captured_images"
received_dir = "received_images"
for _directory in (capture_dir, received_dir):
    os.makedirs(_directory, exist_ok=True)

# YOLO model shared by every detection call in this notebook.
model = YOLO('yolov8n.pt')

In [2]:
def process_with_yolo(file_path):
    """
    Processes the file with YOLO for object detection.

    Image files (.jpg/.jpeg/.png) are passed to the model with ``show=True``;
    the call then waits for a key press before closing the preview window.
    Video files (.mp4/.avi/.mkv) are read frame by frame, every detected box is
    drawn on the frame with its class name and confidence, and the annotated
    frame is displayed until the video ends or 'q' is pressed.
    Files with any other extension are ignored.

    Args:
        file_path: Path of the image or video file to analyse.

    Returns:
        None.
    """
    lowered_path = file_path.lower()

    if lowered_path.endswith(('.jpg', '.jpeg', '.png')):
        # Process single image
        try:
            model(file_path, show=True)
            cv2.waitKey(0)
        finally:
            # Close the preview so windows do not pile up when this function
            # is called once per consumed image.
            cv2.destroyAllWindows()
    elif lowered_path.endswith(('.mp4', '.avi', '.mkv')):
        # Process video
        cap = cv2.VideoCapture(file_path)
        if not cap.isOpened():
            print(f"Error: Unable to open video {file_path}.")
            cap.release()
            return

        # None until the first frame has been timed: there is no previous
        # frame to measure an interval against.
        prev_frame_time = None

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                new_frame_time = time.time()
                results = model(frame, stream=True)
                for r in results:
                    for box in r.boxes:
                        x1, y1, x2, y2 = map(int, box.xyxy[0])
                        # Convert to a plain float before rounding so the label
                        # shows e.g. "0.87" rather than a tensor repr.
                        conf = round(float(box.conf[0]), 2)
                        cls = int(box.cls[0])
                        class_name = model.names[cls]
                        cvzone.putTextRect(frame, f'{class_name} {conf}', (x1, y1))
                        cvzone.cornerRect(frame, (x1, y1, x2 - x1, y2 - y1))

                if prev_frame_time is not None:
                    elapsed = new_frame_time - prev_frame_time
                    # Coarse clocks can report a zero interval; skip the
                    # report instead of dividing by zero.
                    if elapsed > 0:
                        fps = 1 / elapsed
                        print(f"FPS: {fps}")
                prev_frame_time = new_frame_time

                cv2.imshow("YOLO Prediction", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release the capture and windows even if inference raises.
            cap.release()
            cv2.destroyAllWindows()

In [3]:
def capture_and_send_images(duration=120, topic='files_topic'):
    """
    Captures images from the webcam for a specified duration and sends them to Kafka.

    Each successfully read frame is written to ``capture_dir`` as
    ``image_<n>.jpg`` and the file's bytes are produced to ``topic`` with the
    file name as the message key. The loop pauses 0.5 s between capture attempts.

    Args:
        duration: How long to keep capturing, in seconds.
        topic: Kafka topic the image bytes are produced to.

    Returns:
        None. Prints an error and returns early if the webcam cannot be opened.
    """
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Unable to access webcam.")
        cap.release()
        return

    start_time = time.time()
    i = 0

    try:
        while time.time() - start_time < duration:
            ret, frame = cap.read()
            if ret:
                image_name = f"image_{i}.jpg"
                filename = os.path.join(capture_dir, image_name)

                # cv2.imwrite reports failure through its return value. If it
                # is not checked, a failed write would either crash on open()
                # or silently send a stale file left over from an earlier run.
                if cv2.imwrite(filename, frame):
                    print(f"Captured and saved: {filename}")

                    # Send image to Kafka
                    with open(filename, 'rb') as image_file:
                        file_bytes = image_file.read()
                    try:
                        producer.produce(topic, key=image_name, value=file_bytes)
                    except BufferError:
                        # Local producer queue is full: give it time to drain,
                        # then retry once.
                        producer.poll(1.0)
                        producer.produce(topic, key=image_name, value=file_bytes)
                    # Serve delivery events so the local queue keeps draining.
                    producer.poll(0)
                    print(f"File {filename} sent to topic {topic}.")
                    i += 1
                else:
                    print(f"Error: Unable to save {filename}.")

            time.sleep(0.5)  # Adjust capture frequency
    finally:
        cap.release()
        # Block until every queued message has been delivered or has failed.
        producer.flush()
        print("Finished capturing and sending images.")

In [4]:
def receive_files_from_kafka(topic='files_topic'):
    """
    Consumes files from Kafka and processes them with YOLO.

    Polls ``topic`` in an endless loop. For every message, the key is used as
    the file name, the value is written to ``received_dir`` under that name,
    and the saved file is handed to ``process_with_yolo``. The loop stops on
    ``KeyboardInterrupt``.

    Note: the module-level ``consumer`` is closed on exit, so it presumably has
    to be re-created before this function can be called again.

    Args:
        topic: Kafka topic to subscribe to.

    Returns:
        None.
    """
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue

            raw_key = msg.key()
            payload = msg.value()
            # Messages without a key or a value cannot be turned into a file.
            if raw_key is None or payload is None:
                print("Skipping message without a key or value.")
                continue

            # Keep only the final path component, whichever separator the
            # sender used, so a key cannot direct the write outside received_dir.
            key_text = raw_key.decode('utf-8', errors='replace')
            file_name = os.path.basename(key_text.replace("\\", "/"))
            if file_name in ("", ".", ".."):
                print(f"Skipping message with unusable key: {key_text!r}")
                continue

            file_path = os.path.join(received_dir, file_name)
            with open(file_path, 'wb') as out_file:
                out_file.write(payload)
            print(f"File saved: {file_path}")
            process_with_yolo(file_path)
    except KeyboardInterrupt:
        print("Consumer stopped.")
    finally:
        consumer.close()

In [5]:
# Capture and send images for 12 seconds (duration is given in seconds).
# NOTE(review): this comment previously said "2 minutes", which would be duration=120 — confirm which is intended.
capture_and_send_images(duration=12)

# Start consuming files from Kafka; this call loops until interrupted (KeyboardInterrupt).
receive_files_from_kafka()

Consumer stopped.
