# ðŸ§ª Notebook 03 â€” Kafka Producer
This notebook:

âœ” Loads schema from transaction-metadata wheel<br>
âœ” Encodes data to Avro<br>
âœ” Publishes to Kafka<br>

In [2]:
import sys

# Ensure transaction-metadata is in the path
transaction_metadata_path = "/Users/manojitroy/flink-practice/transaction-project/transaction-metadata"
if transaction_metadata_path not in sys.path:
    sys.path.insert(0, transaction_metadata_path)

from transaction_metadata.loader import get_schema


In [3]:
transaction_schema = get_schema("transaction", version="v1")
transaction_schema

{'type': 'record',
 'name': 'Transaction',
 'namespace': 'com.company.data',
 'fields': [{'name': 'transaction_id', 'type': 'string'},
  {'name': 'user_id', 'type': 'string'},
  {'name': 'amount', 'type': 'double'},
  {'name': 'currency', 'type': 'string'},
  {'name': 'merchant_name', 'type': 'string'},
  {'name': 'merchant_category', 'type': ['null', 'string'], 'default': None},
  {'name': 'transaction_type', 'type': 'string'},
  {'name': 'timestamp', 'type': 'long', 'logicalType': 'timestamp-millis'},
  {'name': 'device_id', 'type': ['null', 'string'], 'default': None},
  {'name': 'location', 'type': ['null', 'string'], 'default': None},
  {'name': 'status', 'type': 'string'}]}

In [4]:
%pip install avro-python3
from fastavro import schemaless_writer
from io import BytesIO
import io
import avro.schema
import avro.io

def encode_avro_fastavro(data, schema):
    """Encode data to Avro binary format using fastavro."""
    buf = BytesIO()
    schemaless_writer(buf, schema, data)
    return buf.getvalue()

def encode_avro(data, schema):
    """Encode data to Avro binary format using avro library."""
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    parsed_schema = avro.schema.parse(str(schema))
    writer = avro.io.DatumWriter(parsed_schema)
    writer.write(data, encoder)
    return bytes_writer.getvalue()


Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [5]:
from confluent_kafka import Producer, Consumer, KafkaException

def create_producer(bootstrap_servers="localhost:9092"):
    """Create a Kafka producer."""
    config = {
        "bootstrap.servers": bootstrap_servers,
        "acks": "all",
        "compression.type": "lz4",
    }
    return Producer(config)

def send_message(producer, topic, key, value):
    """Send a message to Kafka."""
    try:
        producer.produce(topic, key=key, value=value)
        producer.flush()
    except KafkaException as e:
        print(f"Kafka send error: {e}")

def create_consumer(topic, group_id="transaction-validator", bootstrap_servers="localhost:9092"):
    """Create a Kafka consumer."""
    config = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "auto.offset.reset": "earliest"
    }
    consumer = Consumer(config)
    consumer.subscribe([topic])
    return consumer

def consume_messages(consumer, max_messages=10):
    """Fetch up to N messages from Kafka."""
    messages = []
    while len(messages) < max_messages:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print("Kafka error:", msg.error())
            continue
        messages.append(msg)
    return messages


In [6]:
# Utility to build a transaction and send a single message to Kafka
import sys
import pathlib
import json

# Try to import transaction_utils. If it's a local module in the repo (likely in
# transaction-data-simulator), add that path to sys.path and retry.
try:
    from transaction_utils import build_transaction
except ModuleNotFoundError:
    # Determine a reasonable candidate path:
    # Use transaction_metadata_path (already present in the notebook) to find project root,
    # then look for 'transaction-data-simulator' sibling folder.
    project_root = pathlib.Path(transaction_metadata_path).parent if 'transaction_metadata_path' in globals() else pathlib.Path.cwd()
    candidate = str(project_root / 'transaction-data-simulator')
    if candidate not in sys.path:
        sys.path.insert(0, candidate)
    try:
        from transaction_utils import build_transaction
    except ModuleNotFoundError:
        # Helpful error if still not found
        raise ModuleNotFoundError(
            f"transaction_utils not found. Checked candidate path: {candidate}. "
            "If this is a pip package, install it with: %pip install transaction-utils"
        )

def send_single_message(topic, producer=None, bootstrap_servers="localhost:9092"):
    """Build a transaction and send it to `topic` using the available producer util."""
    # Create a producer if none provided
    if producer is None:
        producer = create_producer(bootstrap_servers)

    # Build the event
    event = build_transaction()

    # Encode to Avro binary if schema is available, otherwise JSON bytes
    try:
        if 'transaction_schema' in globals() and transaction_schema is not None:
            print('Encoding event to Avro using fastavro')
            value = encode_avro_fastavro(event, transaction_schema)
        else:
            print('No schema found, falling back to JSON bytes')
            value = json.dumps(event).encode('utf-8')
    except Exception as e:
        print('Encoding to Avro failed, falling back to JSON bytes:', e)
        value = json.dumps(event).encode('utf-8')

    key = event.get('transaction_id')
    if isinstance(key, str):
        key = key.encode('utf-8')

    # Use existing helper to send (handles flush & errors)
    send_message(producer, topic, key, value)

    return event

In [7]:
p = create_producer()

In [8]:
from time import sleep


while True:
    sent_event = send_single_message('transactions', producer=p)
    print('Sent:', sent_event)
    sleep(1)

Encoding event to Avro using fastavro
Sent: {'transaction_id': 'd3b940d9-e79b-4180-b808-a74db37d7bae', 'user_id': '5936a209-b469-475f-9c9e-6228c79261a3', 'amount': 3054.38, 'currency': 'USD', 'merchant_name': 'Myntra - Online Services - F1HV     ', 'merchant_category': ' Food', 'transaction_type': 'purchase', 'timestamp': 1763820006641, 'device_id': '5eab3a83-e2f9-4d81-a9e1-f95ad43fec61', 'location': 'Tokyo - JCE', 'status': 'pending'}
Encoding event to Avro using fastavro
Sent: {'transaction_id': 'df1b6db8-13cc-42f9-842d-ec03f230f010', 'user_id': '6e1cd218-c5f6-4244-a543-b2a6fe94b378', 'amount': 4523.77, 'currency': 'USD', 'merchant_name': 'Tata - Travel - K4I7                ', 'merchant_category': ' Grocery', 'transaction_type': 'purchase', 'timestamp': 1763820007654, 'device_id': 'e5273787-5d36-49d9-b1af-ba3f11ba958d', 'location': 'Lucknow - QYG', 'status': 'pending'}
Encoding event to Avro using fastavro
Sent: {'transaction_id': 'df1b6db8-13cc-42f9-842d-ec03f230f010', 'user_id': '

KeyboardInterrupt: 

In [9]:
p.close()

AttributeError: 'cimpl.Producer' object has no attribute 'close'