In [151]:
# Installation for kafka-python library (run once)
!pip install kafka-python




In [152]:
# Imports and basic configuration
import json
import uuid
import random
import time
from datetime import datetime, timedelta, timezone
from kafka import KafkaProducer
import numpy as np

# Configurable parameters
EVENTS_PER_SECOND = 10  # Rate of events per second
SIMULATION_HOURS = 8   # Duration to run the simulation in hours

In [153]:
# Kafka Endpoint configuration parameters

# KAFKA_BROKER maps to Bootstrap Server address in the Eventstream
KAFKA_BROKER = ''  # Bootstrap server address

# KAFKA_TOPIC maps to Topic name in the Eventstream
KAFKA_TOPIC = ''  # Kafka topic for event ingestion

# sas_username to be retained as is.
sas_username = '$ConnectionString'

# sas_password maps to Primary Key Connection String in the Eventstream
sas_password = ''

In [154]:
producer = None

def init_kafka_producer(
    broker, 
    sasl_username=None, 
    sasl_password=None, 
    sasl_mechanism='PLAIN', 
    security_protocol='SASL_SSL'
):
    global producer
    kafka_config = {
        'bootstrap_servers': [broker],
        'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
        'acks': 'all',
        'retries': 3,
    }
    if sasl_username and sasl_password:
        kafka_config.update({
            'security_protocol': security_protocol,
            'sasl_mechanism': sasl_mechanism,
            'sasl_plain_username': sasl_username,
            'sasl_plain_password': sasl_password
        })
    else:
        kafka_config['security_protocol'] = 'PLAINTEXT'
    
    producer = KafkaProducer(**kafka_config)
    print(f"Kafka producer initialized to broker {broker}")

def send_event_to_kafka(event):
    if producer is None:
        raise Exception("Kafka producer not initialized")
    try:
        future = producer.send(KAFKA_TOPIC, event)
        record_metadata = future.get(timeout=10)
        print(f"Event sent to topic {record_metadata.topic} "
              f"partition {record_metadata.partition} offset {record_metadata.offset}")
        producer.flush()
    except Exception as e:
        print(f"Error sending event to Kafka: {e}")


In [155]:
from datetime import datetime
import time
import random

def generateLoRaWanElsysErs1Event():
    event = {
        "end_device_ids": {
            "device_id": "eui-a81758fffe08a90f",
            "application_ids": {
                "application_id": "svelde-elsys-ers"
            }
        },
        "received_at" : datetime.now().isoformat() + "Z",
        "uplink_message" : {
            "decoded_payload" : {
                "humidity": str(random.randint(60, 70)), # 64,
                "light": str(random.randint(2200, 2500)), # 2417,
                "temperature": str(random.randint(170, 190) / 10), # 18.5,
                "vdd": str(random.randint(3400, 3450)) # 3415                
            }
        }
    }
    
    return event

def generateLoRaWanElsysErs2Event():
    event = {
        "end_device_ids": {
            "device_id": "eui-a81758fffe08a8e7",
            "application_ids": {
                "application_id": "svelde-elsys-ers"
            }
        },
        "received_at" : datetime.now().isoformat() + "Z",
        "uplink_message" : {
            "decoded_payload" : {
                "humidity": str(random.randint(60, 70)), # 64,
                "light": str(random.randint(2200, 2500)), # 2417,
                "temperature": str(random.randint(170, 190) / 10), # 18.5,
                "vdd": str(random.randint(3400, 3450)) # 3415                
            }
        }
    }
    
    return event

def generateLoRaWanGenericNode1Event():
    event = {
        "end_device_ids": {
            "device_id": "eui-0004a310001ab228",
            "application_ids": {
                "application_id": "svelde-ttn-generic-node"
            }
        },
        "received_at" : datetime.now().isoformat() + "Z",
        "uplink_message" : {
            "decoded_payload" : {
                "batt_volt": str(random.randint(32, 36) / 10), # 3.4,
                "button": str(random.randint(0, 1)), # 0 | 1,
                "temperature": str(random.randint(180, 200) / 10), # 19.5,
                "humidity": str(random.randint(500, 510) / 10) # 50.7              
            }
        }
    }
    
    return event    


def generateLoRaWanCardTrackerEvent():

    now = datetime.now()

    epoch = int(time.mktime(now.timetuple())) * 1000 + int(now.microsecond / 1000), # 1759579139000,

    event = {
        "end_device_ids": {
            "device_id": "eui-2cf7f1c053200054",
            "application_ids": {
                "application_id": "svelde-sensecap-t1000-card-tracker"
            }
        },
        "received_at" : now.isoformat() + "Z",
        "uplink_message" : {
            "decoded_payload" : {
                "err": 0,
                "messages": [
                    [
                    {
                        "measurementId": "4200",
                        "measurementValue": [],
                        "motionId": 0,
                        "timestamp": epoch,
                        "type": "Event Status"
                    },
                    {
                        "measurementId": "4197",
                        "measurementValue": 5.61234,
                        "motionId": 0,
                        "timestamp": epoch,
                        "type": "Longitude"
                    },
                    {
                        "measurementId": "4198",
                        "measurementValue": 51.461234,
                        "motionId": 0,
                        "timestamp": epoch,
                        "type": "Latitude"
                    },
                    {
                        "measurementId": "3000",
                        "measurementValue": random.randint(50, 51),
                        "motionId": 0,
                        "timestamp":  epoch,
                        "type": "Battery"
                    }
                    ]
                ],
                "payload": "090000000068e10c030055be96031138a033",
                "valid": "true"
            }
        }
    }
    
    return event    


In [156]:
def generateEvents(
    kafka_send_func=None, 
    print_events=True
):
    try:
        while True:
            
            cardTrackerEvent = generateLoRaWanCardTrackerEvent()    
            if print_events:
                print(json.dumps(cardTrackerEvent))
        
            if kafka_send_func:
                kafka_send_func(cardTrackerEvent)

            time.sleep(10)

            loRaWanElsysErs1Event = generateLoRaWanElsysErs1Event()    
            if print_events:
                print(json.dumps(loRaWanElsysErs1Event))
        
            if kafka_send_func:
                kafka_send_func(loRaWanElsysErs1Event)

            time.sleep(10)

            loRaWanElsysErs2Event = generateLoRaWanElsysErs2Event()    
            if print_events:
                print(json.dumps(loRaWanElsysErs2Event))
        
            if kafka_send_func:
                kafka_send_func(loRaWanElsysErs2Event)

            time.sleep(10)

            genericNode1Event = generateLoRaWanGenericNode1Event()    
            if print_events:
                print(json.dumps(genericNode1Event))
        
            if kafka_send_func:
                kafka_send_func(genericNode1Event)

            time.sleep(10)
    except KeyboardInterrupt:
        producer_events.close()

In [157]:
# Initialize Kafka Producer with SASL credentials
init_kafka_producer(KAFKA_BROKER, sas_username, sas_password)

generateEvents( 
    kafka_send_func=send_event_to_kafka,
    print_events=True
)

<BrokerConnection client_id=kafka-python-producer-19, node_id=bootstrap-0 host=esehamp8ig635ybazfybm5.servicebus.windows.net:9093 <checking_api_versions_recv> [IPv4 ('20.86.89.26', 9093)]>: socket disconnected
<BrokerConnection client_id=kafka-python-producer-19, node_id=bootstrap-0 host=esehamp8ig635ybazfybm5.servicebus.windows.net:9093 <checking_api_versions_recv> [IPv4 ('20.86.89.26', 9093)]>: Closing connection. KafkaConnectionError: socket disconnected


Kafka producer initialized to broker esehamp8ig635ybazfybm5.servicebus.windows.net:9093
{"end_device_ids": {"device_id": "eui-2cf7f1c053200054", "application_ids": {"application_id": "svelde-sensecap-t1000-card-tracker"}}, "received_at": "2025-10-04T13:18:17.515082Z", "uplink_message": {"decoded_payload": {"err": 0, "messages": [[{"measurementId": "4200", "measurementValue": [], "motionId": 0, "timestamp": [1759583897515], "type": "Event Status"}, {"measurementId": "4197", "measurementValue": 5.61234, "motionId": 0, "timestamp": [1759583897515], "type": "Longitude"}, {"measurementId": "4198", "measurementValue": 51.461234, "motionId": 0, "timestamp": [1759583897515], "type": "Latitude"}, {"measurementId": "3000", "measurementValue": 51, "motionId": 0, "timestamp": [1759583897515], "type": "Battery"}]], "payload": "090000000068e10c030055be96031138a033", "valid": "true"}}}
Event sent to topic es_db15897e-590b-49a6-8624-2d604c9c0dc3 partition 0 offset 1
{"end_device_ids": {"device_id": "e

NameError: name 'producer_events' is not defined