# IoT Event Data Generation

Generate synthetic IoT-style events with fixed columns and a dynamic JSON column. Use for demos, testing pipelines, or streaming (e.g. Azure Event Hub → Delta/Iceberg).

**Quick start:** Set parameters in **section 1 (Config)** and run the cells in order. Batch output can optionally be saved as a JSON file; streaming requires Databricks and Azure Event Hub.

## 1. Config (edit here)

All tunable parameters in one place. Change these to match your environment and the amount of data you want to generate.

In [2]:
import os

# ----- Batch generation -----
NUM_EVENTS = 50  # Number of rows built by the batch cell
MIN_KEYS = 5  # Lower bound on the number of keys sampled into each row's json_column
MAX_KEYS = 25  # Upper bound; the generator additionally caps it at the key pool size
RANDOM_SEED = None  # Set to an int (e.g. 42) for reproducible data

# ----- Paths (local or Databricks Volume) -----
CATALOG_VOLUME_PATH = "/Volumes/kunal/default/iot_data"  # e.g. "/Volumes/<catalog>/<schema>/<volume>" or "./output"
OUTPUT_DIR = CATALOG_VOLUME_PATH  # Directory for the batch JSON export (created if missing)
OUTPUT_FILENAME_PREFIX = "iot_events"  # Export file is "<prefix>_<YYYYmmdd_HHMMSS>.json"
SAVE_JSON = True  # False skips the JSON export; the DataFrame is still built

# ----- Time window for generated events -----
BASE_TIME_DAYS_AGO = 1  # Batch events start this many days before "now"
TIME_SPREAD_HOURS = 24  # Each event gets a random offset of up to this many hours after the base time

# ----- Event Hub / streaming (secrets from config_local.py, not committed) -----
# Prefer the local, uncommitted module; fall back to the environment variable
# (empty string when neither is available, which makes the streaming cell skip).
try:
    from config_local import EVENTHUB_CONNECTION_STRING
except ImportError:
    EVENTHUB_CONNECTION_STRING = os.environ.get("EVENTHUB_CONNECTION_STRING", "")
EVENTHUB_NAMESPACE = "oneenv-eventhub"  # Host becomes "<namespace>.servicebus.windows.net"
EVENTHUB_NAME = "one-env-windturbine"  # Used as the Kafka topic for both writing and reading
# Only the ratio BATCH_SIZE / SLEEP_INTERVAL_SEC is used: it sets the target events per second.
BATCH_SIZE = 10
SLEEP_INTERVAL_SEC = 2

# ----- Delta / Iceberg (streaming sink) -----
DELTA_TABLE_NAME = "kunal.default.iot_events_stream"  # Target table of the Delta streaming write
# NOTE(review): DELTA_TABLE_PATH, ICEBERG_TABLE_NAME and CHECKPOINT_DIR_ICEBERG are not
# referenced by the cells shown in this notebook — confirm whether they are still needed.
DELTA_TABLE_PATH = "/mnt/delta/your_delta_table_path"
CHECKPOINT_DIR = f"{CATALOG_VOLUME_PATH}/checkpoints/delta_stream"  # Checkpoint location of the Delta streaming write
ICEBERG_TABLE_NAME = "kunal.default.iot_events_stream_iceberg"
CHECKPOINT_DIR_ICEBERG = f"{CATALOG_VOLUME_PATH}/checkpoints_iceberg/delta_stream"

## 2. Imports

In [3]:
import json
import random
import os
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd

## 3. Data catalogs (fixed columns, key pool, enums)

All schema and value lists in one cell for easy customisation.

In [None]:
# Flat (non-JSON) columns of every event, in the order they appear in the table.
FIXED_COLUMNS = [
    "event_id",
    "timestamp",
    "device_type",
    "firmware_version",
    "battery_level",
    "sensor_id",
    "temperature",
    "humidity",
    "pressure",
    "status",
    "region",
    "zone",
    "model",
    "version",
    "signal_strength",
    "uptime_hours",
    "fault_count",
    "last_calibration",
    "maintenance_due",
    "created_at",
    "updated_at",
    "source",
    "env",
]


def _random_location():
    """Return a random latitude/longitude/altitude object."""
    return {
        "latitude": round(random.uniform(-90, 90), 4),
        "longitude": round(random.uniform(-180, 180), 4),
        "altitude": round(random.uniform(0, 1000), 1),
    }


def _random_diagnostics():
    """Return random CPU, memory and disk figures."""
    return {
        "cpu_usage": round(random.uniform(0, 100), 1),
        "memory_usage": round(random.uniform(0, 100), 1),
        "disk_space_mb": random.randint(100, 1024),
    }


def _random_config():
    """Return random polling and retry settings."""
    return {
        "poll_interval_sec": random.randint(1, 60),
        "retry_count": random.randint(0, 5),
    }


# Dynamic keys whose value is a nested object; each entry is a zero-argument factory.
NESTED_KEYS = {
    "location": _random_location,
    "diagnostics": _random_diagnostics,
    "config": _random_config,
}

# Keys that may be sampled into the dynamic JSON column.
IOT_KEY_POOL = [
    "device_type",
    "firmware_version",
    "battery_level",
    "location",
    "diagnostics",
    "status",
    "error_codes",
    "last_service",
    "warranty_expiry",
    "config",
    "sensor_id",
    "model",
    "temperature",
    "humidity",
    "signal_strength",
    "uptime_hours",
    "last_calibration",
    "fault_count",
    "maintenance_due",
    "version",
    "region",
    "zone",
    "tags",
]

# Enumerations that categorical fields are drawn from.
STATUS_VALUES = [
    "idle",
    "active",
    "standby",
    "error",
    "maintenance",
    "offline",
]
DEVICE_TYPES = [
    "Type-A",
    "Type-B",
    "Type-C",
    "Type-D",
]
REGIONS = [
    "north",
    "south",
    "east",
    "west",
    "central",
]
SOURCES = [
    "gateway",
    "edge",
    "cloud",
    "ingest",
]
ENVS = [
    "prod",
    "staging",
    "test",
]

## 4. Generator functions

In [None]:
def make_dynamic_value(key, nested_keys=None, device_types=None, status_values=None, key_pool=None):
    """Return one random value for ``key`` of the dynamic JSON payload.

    Args:
        key: Name of the payload key to produce a value for.
        nested_keys: Mapping of key name to a zero-argument factory returning a
            nested object. Falls back to ``NESTED_KEYS`` when empty or None.
        device_types: Choices for ``device_type``. Falls back to ``DEVICE_TYPES``.
        status_values: Choices for ``status``. Falls back to ``STATUS_VALUES``.
        key_pool: Accepted for signature compatibility; not used here.

    Returns:
        A JSON-serialisable value (object, list, string, number or None)
        whose kind depends on ``key``.
    """
    if not nested_keys:
        nested_keys = NESTED_KEYS
    if not device_types:
        device_types = DEVICE_TYPES
    if not status_values:
        status_values = STATUS_VALUES

    mostly_null_keys = ("error_codes", "last_service", "warranty_expiry", "last_calibration", "maintenance_due")
    nullable_number_keys = ("temperature", "humidity", "uptime_hours", "signal_strength", "fault_count")
    label_keys = ("model", "version", "region", "zone")

    # Nested objects take precedence over every scalar rule below.
    if key in nested_keys:
        factory = nested_keys[key]
        return factory()

    if key == "device_type":
        return random.choice(device_types)

    if key == "firmware_version":
        major = random.randint(1, 3)
        minor = random.randint(0, 9)
        return f"v{major}.{minor}"

    if key == "battery_level":
        level = random.uniform(0, 100)
        return round(level, 1)

    if key == "status":
        return random.choice(status_values)

    if key in mostly_null_keys:
        # Three of the five candidates are None, so these keys are usually null.
        number = round(random.uniform(1, 100), 1)
        return random.choice([None, None, None, "2025-01-15", number])

    if key in nullable_number_keys:
        # The null/non-null draw happens before the value draw.
        if random.random() > 0.2:
            return round(random.uniform(0, 100), 1)
        return None

    if key == "sensor_id":
        sensor_number = random.randint(1, 999)
        return f"SENSOR-{sensor_number:03d}"

    if key in label_keys:
        model_label = f"M-{random.randint(1, 20)}"
        return random.choice([model_label, "v2.0", "north", "zone-1", None])

    if key == "tags":
        tag_count = random.randint(0, 3)
        return random.sample(["prod", "test", "edge", "critical"], k=tag_count)

    # Unknown keys get a value of mixed type.
    int_candidate = random.randint(0, 100)
    float_candidate = round(random.uniform(0, 50), 2)
    return random.choice([int_candidate, float_candidate, "unknown", None])


def generate_fixed_row(event_id, base_time, device_types=None, status_values=None, regions=None, sources=None, envs=None, time_spread_hours=None):
    """Build the fixed (non-JSON) columns of one synthetic IoT event.

    Args:
        event_id: Identifier stored in the ``event_id`` column.
        base_time: ``datetime`` the event time is offset from. A timezone-aware
            value is converted to UTC; a naive value is used as-is and
            labelled as UTC.
        device_types: Choices for ``device_type``. Falls back to ``DEVICE_TYPES``.
        status_values: Choices for ``status``. Falls back to ``STATUS_VALUES``.
        regions: Choices for ``region``. Falls back to ``REGIONS``.
        sources: Choices for ``source``. Falls back to ``SOURCES``.
        envs: Choices for ``env``. Falls back to ``ENVS``.
        time_spread_hours: Maximum random offset after ``base_time`` in hours
            (at least one hour is always used). Falls back to
            ``TIME_SPREAD_HOURS`` when None.

    Returns:
        dict mapping each fixed column name to its value. ``timestamp``,
        ``created_at`` and ``updated_at`` are ISO-8601 strings ending in "Z";
        ``last_calibration`` and ``maintenance_due`` are ``YYYY-MM-DD`` strings
        or None; several sensor readings may be None.
    """
    device_types = device_types or DEVICE_TYPES
    status_values = status_values or STATUS_VALUES
    regions = regions or REGIONS
    sources = sources or SOURCES
    envs = envs or ENVS
    spread_hours = TIME_SPREAD_HOURS if time_spread_hours is None else time_spread_hours

    t = base_time + timedelta(minutes=random.randint(0, 60 * max(1, spread_hours)))
    # An aware datetime renders its offset in isoformat() (e.g. "...+05:30"),
    # so appending "Z" would produce an invalid stamp such as "...+05:30Z".
    # Shift to UTC wall-clock time and drop the tzinfo before formatting.
    utc_offset = t.utcoffset()
    if utc_offset is not None:
        t = (t - utc_offset).replace(tzinfo=None)

    return {
        "event_id": event_id,
        "timestamp": t.isoformat() + "Z",
        "device_type": random.choice(device_types),
        "firmware_version": f"v{random.randint(1, 3)}.{random.randint(0, 9)}",
        "battery_level": round(random.uniform(0, 100), 1),
        "sensor_id": f"SENSOR-{random.randint(1, 999):03d}",
        # Readings are randomly nulled to imitate missing sensor data.
        "temperature": round(random.uniform(-10, 50), 1) if random.random() > 0.1 else None,
        "humidity": round(random.uniform(0, 100), 1) if random.random() > 0.1 else None,
        "pressure": round(random.uniform(1000, 1025), 1) if random.random() > 0.1 else None,
        "status": random.choice(status_values),
        "region": random.choice(regions),
        "zone": f"zone-{random.randint(1, 10)}",
        "model": f"M-{random.randint(1, 20)}",
        "version": f"v{random.randint(1, 3)}.{random.randint(0, 9)}",
        "signal_strength": random.randint(1, 5) if random.random() > 0.2 else None,
        "uptime_hours": round(random.uniform(0, 8760), 1) if random.random() > 0.2 else None,
        "fault_count": random.randint(0, 20) if random.random() > 0.3 else None,
        # [:10] keeps only the date part of the ISO string.
        "last_calibration": (t - timedelta(days=random.randint(1, 365))).isoformat()[:10] if random.random() > 0.5 else None,
        "maintenance_due": (t + timedelta(days=random.randint(1, 90))).isoformat()[:10] if random.random() > 0.5 else None,
        "created_at": (t - timedelta(minutes=random.randint(0, 60))).isoformat() + "Z",
        "updated_at": t.isoformat() + "Z",
        "source": random.choice(sources),
        "env": random.choice(envs),
    }


def generate_dynamic_json(min_keys=None, max_keys=None, key_pool=None):
    """Build the dynamic JSON payload: a random subset of keys with random values.

    Args:
        min_keys: Minimum number of keys. Falls back to ``MIN_KEYS`` when None.
        max_keys: Maximum number of keys. Falls back to ``MAX_KEYS`` when None.
        key_pool: Candidate key names. Falls back to ``IOT_KEY_POOL`` when
            empty or None.

    Returns:
        dict mapping each sampled key to the value produced by
        ``make_dynamic_value`` for it.
    """
    min_k = min_keys if min_keys is not None else MIN_KEYS
    max_k = max_keys if max_keys is not None else MAX_KEYS
    pool = key_pool or IOT_KEY_POOL

    # Clamp both bounds into [0, len(pool)] and keep lower <= upper, so that
    # inconsistent settings (e.g. min_keys larger than the pool) shrink the
    # range instead of making random.randint raise ValueError.
    upper = max(0, min(max_k, len(pool)))
    lower = min(max(0, min_k), upper)

    n = random.randint(lower, upper)
    keys = random.sample(pool, n)
    return {k: make_dynamic_value(k) for k in keys}


def generate_iot_row(event_id, base_time, **kwargs):
    """Return one complete event: the fixed columns plus a ``json_column`` payload.

    Args:
        event_id: Identifier passed through to ``generate_fixed_row``.
        base_time: Base datetime passed through to ``generate_fixed_row``.
        **kwargs: Extra keyword arguments forwarded to ``generate_fixed_row``.

    Returns:
        dict with every fixed column followed by ``json_column`` (a dict).
    """
    # Fixed part is generated first, then the dynamic payload.
    fixed_part = generate_fixed_row(event_id, base_time, **kwargs)
    dynamic_part = generate_dynamic_json()
    return {**fixed_part, "json_column": dynamic_part}

## 5. Generate batch and (optional) save

Single place: build table, DataFrame, column order, and optional JSON export.

In [None]:
# Seed once up front so the whole batch is reproducible when RANDOM_SEED is set.
if RANDOM_SEED is not None:
    random.seed(RANDOM_SEED)

# Events are spread forward from "now minus BASE_TIME_DAYS_AGO days".
base_time = datetime.now().astimezone().replace(microsecond=0) - timedelta(days=BASE_TIME_DAYS_AGO)
table_rows = [generate_iot_row(i + 1, base_time) for i in range(NUM_EVENTS)]

df = pd.DataFrame(table_rows)
# Fixed columns first (catalog order), JSON payload last. Filtering against
# df.columns keeps this valid when NUM_EVENTS is 0 and the frame has no columns.
cols = [c for c in (*FIXED_COLUMNS, "json_column") if c in df.columns]
df = df[cols]

if SAVE_JSON:
    out_path = Path(OUTPUT_DIR)
    out_path.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = out_path / f"{OUTPUT_FILENAME_PREFIX}_{timestamp}.json"
    # Explicit encoding: the platform default is not UTF-8 everywhere.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(table_rows, f, indent=2, default=str)
    print(f"Saved {len(table_rows)} events to {output_file}")

# Report the actual column split rather than a hard-coded count, so the
# summary stays correct when FIXED_COLUMNS changes.
fixed_count = sum(1 for c in cols if c != "json_column")
json_count = len(cols) - fixed_count
print(f"Table: {len(df)} rows × {len(df.columns)} columns ({fixed_count} fixed + {json_count} JSON column)")
print("Columns:", list(df.columns))

## 6. Preview

In [None]:
def _preview_json(value, limit=80):
    """Return ``value`` as text cut to ``limit`` characters.

    Dicts are rendered with ``json.dumps``; anything else with ``str``.
    "..." is appended only when text was actually cut off, so short values
    are not shown as if they had been truncated.
    """
    text = json.dumps(value) if isinstance(value, dict) else str(value)
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


# Work on a copy so the real DataFrame keeps the full payload dicts.
df_preview = df.copy()
df_preview["json_column"] = df_preview["json_column"].apply(_preview_json)
df_preview.head()

---
## Streaming (Databricks + Event Hub)

The cells below require:
- PySpark and a Spark session (`spark`)
- Azure Event Hub connection string (set in `config_local.py` or the `EVENTHUB_CONNECTION_STRING` environment variable)
- Databricks Volume for checkpoints (or equivalent paths)

### 7. Event Hub connection

In [None]:
# Resolve the connection string: the Config value wins, the environment is the fallback.
connection_string = EVENTHUB_CONNECTION_STRING or os.environ.get("EVENTHUB_CONNECTION_STRING", "")

if connection_string:
    # Summarise the target without echoing the secret itself.
    print(f"Event Hub: {EVENTHUB_NAMESPACE}.servicebus.windows.net")
    print(f"Topic: {EVENTHUB_NAME}")
    events_per_second = BATCH_SIZE / SLEEP_INTERVAL_SEC
    print(f"Rate: ~{events_per_second:.1f} events/second")
else:
    print("Warning: EVENTHUB_CONNECTION_STRING not set. Set it in Config or env before streaming.")

### 8. Stream IoT data to Event Hub

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# The "rate" source emits rows of (timestamp, value), where `value` is a
# running row counter. It drives both the throughput and the event IDs.
rows_per_second = max(1, int(BATCH_SIZE / SLEEP_INTERVAL_SEC))
rate_stream = spark.readStream.format("rate").option("rowsPerSecond", rows_per_second).load()
stream_base_time = datetime.now().astimezone().replace(microsecond=0)


def generate_event_udf(trigger_value):
    """Return one synthetic event as a JSON string.

    ``trigger_value`` is the rate-source counter for the row. It is used
    (plus one, so IDs start at 1 as in the batch cell) as the event ID.
    A driver-side Python counter is not usable here: it is copied into
    every Python worker, so each worker would count from zero on its own
    and IDs would repeat.
    """
    return json.dumps(generate_iot_row(int(trigger_value) + 1, stream_base_time))


# The generator is random, so tell Spark not to assume repeated calls agree.
# Otherwise the optimizer may evaluate it separately for "value" and for the
# key extraction, and the key would not match the sensor_id in the payload.
generate_event = udf(generate_event_udf, StringType()).asNondeterministic()


def extract_sensor_id_udf(json_str):
    """Return the ``sensor_id`` of a JSON event, or "unknown" if unavailable."""
    try:
        return json.loads(json_str).get("sensor_id", "unknown")
    except (TypeError, ValueError, AttributeError):
        # None input, malformed JSON, or a JSON value that is not an object.
        return "unknown"


extract_sensor_id = udf(extract_sensor_id_udf, StringType())

# Kafka sink expects "key" and "value" columns; the sensor ID is the message key.
iot_stream = (
    rate_stream
    .withColumn("value", generate_event(col("value")))
    .withColumn("key", extract_sensor_id(col("value")))
    .select("key", "value")
)

# Event Hubs' Kafka endpoint authenticates with the literal user
# "$ConnectionString" and the connection string as the password.
jaas_config = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connection_string}";'

if connection_string:
    query = (
        iot_stream.writeStream.format("kafka")
        .option("kafka.bootstrap.servers", f"{EVENTHUB_NAMESPACE}.servicebus.windows.net:9093")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", jaas_config)
        .option("topic", EVENTHUB_NAME)
        .option("checkpointLocation", f"{CATALOG_VOLUME_PATH}/checkpoints/eventhub_stream")
        .outputMode("append")
        .start()
    )
    print("Stream started. Press Stop to stop.")
    # Blocks this cell until the query is stopped or fails.
    query.awaitTermination()
else:
    print("Skipped: set EVENTHUB_CONNECTION_STRING to run streaming.")

### 9. Read stream from Event Hub

In [None]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# SASL/PLAIN login for the Event Hubs Kafka endpoint: the user name is the
# literal "$ConnectionString" and the password is the connection string.
jaas_config = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connection_string}";'

bootstrap_servers = f"{EVENTHUB_NAMESPACE}.servicebus.windows.net:9093"

# Subscribe to the Event Hub (as a Kafka topic) from the earliest offset.
kafka_stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", jaas_config)
    .option("subscribe", EVENTHUB_NAME)
    .option("startingOffsets", "earliest")
    .load()
)

kafka_stream.printSchema()

### 10. Parse JSON and convert json_column to VARIANT

In [None]:
from pyspark.sql.functions import parse_json

# (column name, Spark type) pairs in payload order; every field is nullable.
# json_column is read as a string so it can be handed to parse_json below.
_IOT_FIELD_TYPES = [
    ("event_id", IntegerType),
    ("timestamp", StringType),
    ("device_type", StringType),
    ("firmware_version", StringType),
    ("battery_level", DoubleType),
    ("sensor_id", StringType),
    ("temperature", DoubleType),
    ("humidity", DoubleType),
    ("pressure", DoubleType),
    ("status", StringType),
    ("region", StringType),
    ("zone", StringType),
    ("model", StringType),
    ("version", StringType),
    ("signal_strength", IntegerType),
    ("uptime_hours", DoubleType),
    ("fault_count", IntegerType),
    ("last_calibration", StringType),
    ("maintenance_due", StringType),
    ("created_at", StringType),
    ("updated_at", StringType),
    ("source", StringType),
    ("env", StringType),
    ("json_column", StringType),
]

iot_schema = StructType(
    [StructField(field_name, field_type(), True) for field_name, field_type in _IOT_FIELD_TYPES]
)

# Keep the Kafka key and timestamp, and expand the parsed payload into columns.
kafka_key = col("key").cast("string").alias("kafka_key")
kafka_timestamp = col("timestamp").alias("kafka_timestamp")
payload = from_json(col("value").cast("string"), iot_schema).alias("data")

parsed_stream = kafka_stream.select(kafka_key, kafka_timestamp, payload).select(
    "kafka_key", "kafka_timestamp", "data.*"
)

# Replace the JSON text column with its parsed VARIANT counterpart.
json_as_variant = parse_json(col("json_column"))
parsed_stream_with_variant = parsed_stream.withColumn("json_column_variant", json_as_variant).drop("json_column")

parsed_stream_with_variant.printSchema()

### 11. Write stream to Delta table

In [None]:
# Append the parsed stream to the Delta table, checkpointing under CHECKPOINT_DIR.
delta_writer = (
    parsed_stream_with_variant.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_DIR)
)
query_delta = delta_writer.toTable(DELTA_TABLE_NAME)

print(f"Writing to Delta: {DELTA_TABLE_NAME}")
# Blocks this cell until the query is stopped or fails.
query_delta.awaitTermination()

### 12. Stop streams

In [None]:
# Run this cell to stop active streaming queries (Event Hub and/or Delta)
def stop_active_streams(namespace, names=("query", "query_delta")):
    """Stop every active streaming query found under ``names`` in ``namespace``.

    Args:
        namespace: Mapping of variable name to object (e.g. ``globals()``).
        names: Variable names to look up; missing or None entries are skipped.

    Returns:
        list of the names whose query was stopped, in lookup order.
    """
    stopped_names = []
    for name in names:
        q = namespace.get(name)
        if q is None:
            continue
        try:
            # PySpark exposes isActive as a property (a bool), so calling it
            # raises TypeError; accept both a plain value and a method.
            active = getattr(q, "isActive", False)
            if callable(active):
                active = active()
            if active:
                q.stop()
                stopped_names.append(name)
                print(f"Stopped {name}")
        except Exception as e:
            # Best effort: report the failure and still try the remaining queries.
            print(f"{name}: {e}")
    return stopped_names


stopped = stop_active_streams(globals())
if not stopped:
    print("No active streams to stop (or run the stream cells first).")