In [1]:
from kafka import KafkaProducer
import cv2
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

In [2]:
class VideoFileHandler(FileSystemEventHandler):
    def __init__(self, producer, topic):
        self.producer = producer
        self.topic = topic

    def on_created(self, event):
        if not event.is_directory and event.src_path.endswith('.mp4'):
            print(f"Processing new video: {event.src_path}")
            self.process_video(event.src_path)
            os.remove(event.src_path)  # Delete video after processing
            print(f"Deleted video: {event.src_path}")

    def process_video(self, video_path):
        cap = cv2.VideoCapture(video_path)
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # No more frames or error
            if frame_count % 10 == 0:
                self.send_frame_to_kafka(frame)
            frame_count += 1
        cap.release()

    def send_frame_to_kafka(self, frame):
        ret, buffer = cv2.imencode('.jpg', frame)
        self.producer.send(self.topic, buffer.tobytes())
        self.producer.flush()


In [3]:
def main():
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    topic = 'frame_topic'
    path_to_watch = '/home/jovyan/videos/'  # Directory to monitor

    event_handler = VideoFileHandler(producer, topic)
    observer = Observer()
    observer.schedule(event_handler, path=path_to_watch, recursive=False)
    observer.start()

    print("Producer running. Message being sent to Kafka Consumer. Press Ctrl+C to stop.")
    try:
        while True:
            pass
    except KeyboardInterrupt:
        observer.stop()
        observer.join()
        print("Producer stopped.")

if __name__ == "__main__":
    main()

Producer running. Message being sent to Kafka Consumer. Press Ctrl+C to stop.
Processing new video: /home/jovyan/videos/output181.mp4
Deleted video: /home/jovyan/videos/output181.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a7800a9c0] moov atom not found


Processing new video: /home/jovyan/videos/output182.mp4
Deleted video: /home/jovyan/videos/output182.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a7800ed40] moov atom not found


Processing new video: /home/jovyan/videos/output183.mp4
Deleted video: /home/jovyan/videos/output183.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a7800fe40] moov atom not found


Processing new video: /home/jovyan/videos/output184.mp4
Deleted video: /home/jovyan/videos/output184.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78010f00] moov atom not found


Processing new video: /home/jovyan/videos/output185.mp4
Deleted video: /home/jovyan/videos/output185.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a780118c0] moov atom not found


Processing new video: /home/jovyan/videos/output186.mp4
Deleted video: /home/jovyan/videos/output186.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78012240] moov atom not found


Processing new video: /home/jovyan/videos/output187.mp4
Deleted video: /home/jovyan/videos/output187.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78012d80] moov atom not found


Processing new video: /home/jovyan/videos/output188.mp4
Deleted video: /home/jovyan/videos/output188.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78013840] moov atom not found


Processing new video: /home/jovyan/videos/output189.mp4
Deleted video: /home/jovyan/videos/output189.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78013b00] moov atom not found


Processing new video: /home/jovyan/videos/output190.mp4
Deleted video: /home/jovyan/videos/output190.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78014180] moov atom not found


Processing new video: /home/jovyan/videos/output191.mp4
Deleted video: /home/jovyan/videos/output191.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78014040] moov atom not found


Processing new video: /home/jovyan/videos/output192.mp4
Deleted video: /home/jovyan/videos/output192.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78014a40] moov atom not found


Processing new video: /home/jovyan/videos/output193.mp4
Deleted video: /home/jovyan/videos/output193.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78014980] moov atom not found


Processing new video: /home/jovyan/videos/output194.mp4
Deleted video: /home/jovyan/videos/output194.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a780147c0] moov atom not found


Processing new video: /home/jovyan/videos/output195.mp4
Deleted video: /home/jovyan/videos/output195.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78014f40] moov atom not found


Processing new video: /home/jovyan/videos/output196.mp4
Deleted video: /home/jovyan/videos/output196.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78015480] moov atom not found


Processing new video: /home/jovyan/videos/output197.mp4
Deleted video: /home/jovyan/videos/output197.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78014ec0] moov atom not found


Processing new video: /home/jovyan/videos/output198.mp4
Deleted video: /home/jovyan/videos/output198.mp4


[mov,mp4,m4a,3gp,3g2,mj2 @ 0x7f5a78015740] moov atom not found


Producer stopped.
