In [1]:
import time
import random
import os
from dotenv import load_dotenv
from pathlib import Path
from faker import Faker
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from confluent_kafka.schema_registry import SchemaRegistryClient

In [2]:
# Load environment variables from .env file
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [3]:
# get environment variables
project_name = os.getenv('COMPOSE_PROJECT_NAME')
kafka_host = os.getenv('KAFKA_HOST')
topic = os.getenv('KAFKA_TOPIC_NAME')
num_replication = int(os.getenv('KAFKA_REPLICATION'))
num_partitions = int(os.getenv('KAFKA_PARTITION'))
schema_registry_host = os.getenv('SCHEMA_REG_HOST')

In [4]:
# print(project_name)
# print(kafka_host)
# print(topic)
# print(num_replication)
# print(num_partitions)
# print(schema_registry_host)

In [5]:
# Kafka Configuration
fake = Faker()
bootstrap_servers = f'{kafka_host}:9092'
schema_registry_url = f'http://{schema_registry_host}:8081' 
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url})

In [6]:
# Schema Registry configuration
schema_registry_conf = {'url': f'http://{schema_registry_host}:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Avro schema definition
value_schema_str = """
{
   "type": "record",
   "name": "data_employee",
   "fields" : [
     {"name": "employee_id", "type": "string"},
     {"name": "employee_name", "type": "string"},
     {"name": "employee_salary", "type": "float"},
     {"name": "employee_age", "type": "int"}
   ]
}
"""
value_schema = avro.loads(value_schema_str)

producer_conf = {
    'bootstrap.servers': bootstrap_servers,
    'schema.registry.url': schema_registry_url
}

producer = AvroProducer(producer_conf, default_value_schema=value_schema)

def generate_employee_data():
    return {
        "employee_id": fake.uuid4(),
        "employee_name": fake.name(),
        "employee_salary": round(random.uniform(1000000, 10000000),2),
        "employee_age": random.randint(17, 70)
    }

while True:
    event = generate_employee_data()
    print(f"Sending event: {event}")
    producer.produce(topic=topic, value=event)
    producer.flush()
    time.sleep(5)

AvroProducer has been deprecated. Use AvroSerializer instead.


Sending event: {'employee_id': '3827fa07-1c58-4495-9827-192e16905e52', 'employee_name': 'Bethany Brown', 'employee_salary': 4045313.08, 'employee_age': 45}
Sending event: {'employee_id': '0c83e81d-2479-485f-8b05-e294cfa1d349', 'employee_name': 'Jeremy Foster', 'employee_salary': 4996386.78, 'employee_age': 18}
Sending event: {'employee_id': '74783818-dc60-4c33-bd64-92d32c5070c1', 'employee_name': 'Brittney Petersen', 'employee_salary': 6339470.59, 'employee_age': 44}
Sending event: {'employee_id': '1fb72d94-a6f3-4200-a7ff-b5a07d8f3168', 'employee_name': 'Wendy Butler', 'employee_salary': 1016838.01, 'employee_age': 40}
Sending event: {'employee_id': 'a6f16eb6-f08b-4c81-b112-9c9e97220aa0', 'employee_name': 'Christopher Richardson', 'employee_salary': 2288411.14, 'employee_age': 28}
Sending event: {'employee_id': '91ac3f2b-a096-45c1-8e76-6c890b2f3698', 'employee_name': 'Gregory Crawford', 'employee_salary': 8288851.77, 'employee_age': 47}
Sending event: {'employee_id': 'feec49ff-1559-470

KeyboardInterrupt: 