In [1]:
import json
import requests
from kafka import KafkaProducer

In [2]:
# --- Kafka settings -------------------------------------------------------
# Broker address as host:port; change it when the broker is not local.
KAFKA_BROKER = 'localhost:9092'
# Topic that receives the ingested records; substitute your own topic name.
TOPIC_NAME = 'random_number_topic'


def _to_json_bytes(value):
    """Encode ``value`` as UTF-8 JSON bytes for use as a Kafka message value."""
    return json.dumps(value).encode('utf-8')


# Shared producer, configured to serialize message values with _to_json_bytes.
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_BROKER],
    value_serializer=_to_json_bytes,
)

def ingest_stream(url='http://localhost:3692/stream'):
    """Read a newline-delimited JSON stream over HTTP and forward it to Kafka.

    Each non-empty line of the response body is parsed as JSON and sent to
    ``TOPIC_NAME`` through the module-level ``producer``. Lines that cannot be
    decoded or parsed are reported and skipped so that a single bad record
    does not stop the ingestion.

    Args:
        url: Address of the JSON stream. Defaults to the local Flask endpoint.

    Returns:
        None. Errors are reported with ``print`` instead of being raised.

    Note:
        The module-level ``producer`` is closed when this function exits, so
        it must be created again before the function can be called a second
        time.
    """
    try:
        # Connect timeout only: the read side stays unbounded (None) because a
        # live stream may legitimately stay quiet between records.
        # The ``with`` block releases the HTTP connection on every exit path.
        with requests.get(url, stream=True, timeout=(10, None)) as response:
            # Fail fast on 4xx/5xx instead of parsing an error page as data.
            response.raise_for_status()

            for line in response.iter_lines():
                if not line:  # Skip empty keep-alive lines
                    continue
                try:
                    json_data = json.loads(line)  # Deserialize JSON
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    # UnicodeDecodeError covers byte lines that are not valid
                    # text; it is not a JSONDecodeError subclass.
                    print(f"JSON deserialization error: {e}")
                    continue
                producer.send(TOPIC_NAME, json_data)  # Send data to Kafka
    except requests.RequestException as e:
        # Connection, HTTP status, and mid-stream transport failures.
        print(f"Error while reading stream: {e}")
    except Exception as e:
        print(f"Error while sending to Kafka: {e}")
    finally:
        producer.close()  # Close the producer when done

In [None]:
# Entry point: run the ingestion loop when this code executes as the main
# module. The call is synchronous and returns only once ingest_stream() ends;
# ingest_stream() closes the module-level producer before returning.
if __name__ == '__main__':
    ingest_stream()