## Configuration Parameters

In [None]:
import os

# Number of transformers to simulate
NUM_TRANSFORMERS = 10

# Number of records to generate per batch
RECORDS_PER_BATCH = 50

# Stream interval in seconds (time between batches)
STREAM_INTERVAL = 5

# Anomaly settings
ANOMALY_PROBABILITY = 0.05  # 5% chance of anomaly per reading
ANOMALY_TEMP_MIN = 130      # Anomaly temperature range (inclusive bounds, passed to random.randint)
ANOMALY_TEMP_MAX = 150

# Azure Service Bus / Fabric EventStream settings.
# Read from environment variables so the connection string (a secret) is not
# saved inside the notebook. Leave them unset/empty to skip the EventStream cells.
SERVICEBUS_CONNECTION_STRING = os.environ.get("SERVICEBUS_CONNECTION_STRING", "")
EVENTSTREAM_NAME = os.environ.get("EVENTSTREAM_NAME", "")

# Fail fast on invalid settings instead of deep inside the generator loops.
assert NUM_TRANSFORMERS > 0, "NUM_TRANSFORMERS must be positive (random.choice needs at least one ID)"
assert RECORDS_PER_BATCH >= 0, "RECORDS_PER_BATCH must be non-negative"
assert STREAM_INTERVAL >= 0, "STREAM_INTERVAL must be non-negative (time.sleep rejects negative values)"
assert 0.0 <= ANOMALY_PROBABILITY <= 1.0, "ANOMALY_PROBABILITY must be between 0 and 1"
assert ANOMALY_TEMP_MIN <= ANOMALY_TEMP_MAX, "random.randint requires ANOMALY_TEMP_MIN <= ANOMALY_TEMP_MAX"

## Import Required Libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import random
import string
import json
from datetime import datetime, timezone
import time
from azure.servicebus import ServiceBusClient, ServiceBusMessage

## Initialize Spark Session

In [None]:
# Reuse the active Spark session if there is one, otherwise create it.
spark = (
    SparkSession.builder
    .appName("TransformerStreamGenerator")
    .getOrCreate()
)

## Helper Functions

In [None]:
def generate_transformer_id():
    """Return a random 8-character ID drawn from A-Z and 0-9 (characters may repeat)."""
    alphabet = string.ascii_uppercase + string.digits
    picked = random.choices(alphabet, k=8)
    return "".join(picked)

def generate_transformer_ids(num_transformers, id_factory=None):
    """Generate a list of unique transformer IDs.

    Candidates that were already produced are discarded and redrawn, so the
    result really contains distinct IDs. The previous comprehension could, in
    principle, return duplicates.

    Args:
        num_transformers: How many distinct IDs to return; values <= 0 yield [].
        id_factory: Zero-argument callable producing candidate IDs. Defaults
            to generate_transformer_id. It must be able to produce at least
            num_transformers distinct values, otherwise this loops forever.

    Returns:
        List of num_transformers distinct IDs, in generation order.
    """
    make_id = generate_transformer_id if id_factory is None else id_factory
    ids = []
    seen = set()
    while len(ids) < num_transformers:
        candidate = make_id()
        if candidate not in seen:
            seen.add(candidate)
            ids.append(candidate)
    return ids

class TemperatureSimulator:
    """Random-walk temperature model with occasional out-of-range spikes.

    Each transformer keeps its own last "normal" reading. On every call,
    with probability ``anomaly_probability`` a spike drawn from
    [anomaly_min, anomaly_max] is returned and the stored reading is left
    untouched. Otherwise the stored reading moves by a random step in [-5, 5]
    and is clamped to [min_temp, max_temp].
    """

    def __init__(self, min_temp=60, max_temp=120, anomaly_probability=0.05, anomaly_min=130, anomaly_max=150):
        # Bounds for normal (non-anomalous) readings.
        self.min_temp = min_temp
        self.max_temp = max_temp
        # Per-call chance of a spike, and the spike's inclusive range.
        self.anomaly_probability = anomaly_probability
        self.anomaly_min = anomaly_min
        self.anomaly_max = anomaly_max
        # transformer_id -> last normal reading; populated lazily on first request.
        self.current_temps = {}

    def _clamp(self, value):
        """Limit value to [min_temp, max_temp].

        Uses comparisons rather than min()/max() because, in this notebook,
        `from pyspark.sql.functions import *` can shadow those builtins with
        PySpark column functions.
        """
        if value < self.min_temp:
            return self.min_temp
        if value > self.max_temp:
            return self.max_temp
        return value

    def get_temperature(self, transformer_id):
        """Return the next reading for transformer_id (a normal step or an anomaly spike)."""
        readings = self.current_temps
        if transformer_id not in readings:
            # First request for this transformer: seed from 80-100 inclusive.
            readings[transformer_id] = random.randint(80, 100)

        if random.random() < self.anomaly_probability:
            # Spike is reported but not stored, so the walk resumes from the prior value.
            return random.randint(self.anomaly_min, self.anomaly_max)

        step = random.randint(-5, 5)
        updated = self._clamp(readings[transformer_id] + step)
        readings[transformer_id] = updated
        return updated

## Initialize Transformers and Temperature Simulator

In [None]:
# Build the simulated fleet of IDs and the temperature model that drives them.
transformer_ids = generate_transformer_ids(NUM_TRANSFORMERS)

temp_simulator = TemperatureSimulator(
    anomaly_probability=ANOMALY_PROBABILITY,
    anomaly_min=ANOMALY_TEMP_MIN,
    anomaly_max=ANOMALY_TEMP_MAX,
)

# Report what was configured.
print(f"Generated {len(transformer_ids)} transformer IDs:")
for tid in transformer_ids:
    print(f"  - {tid}")
print(f"\nAnomaly detection configured: {ANOMALY_PROBABILITY*100}% probability, temp range {ANOMALY_TEMP_MIN}-{ANOMALY_TEMP_MAX}°F")

## Generate Stream Data

In [None]:
def generate_stream_batch(transformer_ids, temp_simulator, num_records):
    """Build `num_records` synthetic readings for randomly chosen transformers.

    Args:
        transformer_ids: Non-empty sequence of IDs to sample from (with replacement).
        temp_simulator: Object exposing get_temperature(transformer_id).
        num_records: Number of readings to produce.

    Returns:
        List of dicts with keys DateTime (UTC ISO-8601 string), TransformerID,
        Temperature (from temp_simulator) and Voltage (random int, 220-280 inclusive).
    """
    def one_reading():
        picked_id = random.choice(transformer_ids)
        return {
            "DateTime": datetime.now(timezone.utc).isoformat(),
            "TransformerID": picked_id,
            "Temperature": temp_simulator.get_temperature(picked_id),
            "Voltage": random.randint(220, 280),
        }

    return [one_reading() for _ in range(num_records)]

def stream_to_json(records):
    """Serialize records to a pretty-printed JSON string (2-space indent)."""
    pretty = json.dumps(records, indent=2)
    return pretty

In [None]:
# Test EventStream connectivity with a small batch.
# `send_to_eventstream` is defined in a later cell (the EventStreamSender cell).
# On a fresh kernel that cell must run before this one, so check for the name
# instead of failing with a NameError.
if not (SERVICEBUS_CONNECTION_STRING and EVENTSTREAM_NAME):
    print("Please configure SERVICEBUS_CONNECTION_STRING and EVENTSTREAM_NAME in the configuration cell")
elif "send_to_eventstream" not in globals():
    print("send_to_eventstream is not defined yet: run the cell that defines EventStreamSender / send_to_eventstream, then re-run this cell")
else:
    # Generate a small test batch
    test_eventstream_batch = generate_stream_batch(transformer_ids, temp_simulator, 5)

    # Send to EventStream
    print("Testing EventStream connection...")
    sent_count = send_to_eventstream(test_eventstream_batch, SERVICEBUS_CONNECTION_STRING, EVENTSTREAM_NAME)

    # Only report success when every record actually went out; a failed
    # connection or send returns fewer (often 0).
    if sent_count == len(test_eventstream_batch):
        print(f"Successfully sent {sent_count} records to EventStream")
    else:
        print(f"EventStream test incomplete: sent {sent_count} of {len(test_eventstream_batch)} records (see errors above)")

## Test EventStream Connection

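Test sending a small batch (5 records) to Fabric EventStream. Before running the test, set `SERVICEBUS_CONNECTION_STRING` and `EVENTSTREAM_NAME` in the configuration cell, and run the cell that defines `EventStreamSender` and `send_to_eventstream`.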

In [None]:
class EventStreamSender:
    """Send JSON-encoded records to Fabric EventStream via an Azure Service Bus topic sender.

    Typical use: ``connect()`` once, call ``send_batch`` / ``send_single``,
    then ``close()``. After ``close()`` the instance is disconnected (its
    handles are cleared) and ``connect()`` may be called again.
    """

    def __init__(self, connection_string, eventstream_name):
        """
        Initialize the EventStream sender (no network activity happens here).

        Args:
            connection_string: Azure Service Bus connection string
            eventstream_name: Name of the EventStream/topic
        """
        self.connection_string = connection_string
        self.eventstream_name = eventstream_name
        self.client = None
        self.sender = None

    def connect(self):
        """
        Establish connection to Service Bus.

        Any previously opened client/sender is released first. If creating the
        client or the topic sender fails, whatever was partially opened is
        closed again so the client is not leaked.

        Returns:
            True on success, False otherwise (the error is printed).
        """
        self._release()
        try:
            self.client = ServiceBusClient.from_connection_string(
                conn_str=self.connection_string,
                logging_enable=True
            )
            self.sender = self.client.get_topic_sender(topic_name=self.eventstream_name)
            print(f"Connected to EventStream: {self.eventstream_name}")
            return True
        except Exception as e:
            print(f"Error connecting to EventStream: {e}")
            self._release()
            return False

    @staticmethod
    def _try_add(message_batch, message):
        """Add message to message_batch; return False if the batch rejects it with ValueError."""
        # The azure-servicebus docs say add_message raises MessageSizeExceededError
        # (a ValueError subclass) when the message does not fit.
        try:
            message_batch.add_message(message)
            return True
        except ValueError:
            return False

    def send_batch(self, records):
        """
        Send a batch of records to EventStream.

        Each record is JSON-encoded into one message. Messages are packed into
        as few Service Bus message batches as the size limit allows: when a
        batch is full it is sent and a fresh one is started. A record too large
        to fit even in an empty batch is skipped (and reported) instead of
        aborting the remaining records.

        Args:
            records: List of record dictionaries (must be JSON-serializable)

        Returns:
            Number of records sent successfully. If an error occurs part-way,
            records in batches that were already sent are still counted.
        """
        if not self.sender:
            print("Not connected. Call connect() first.")
            return 0

        sent_count = 0
        try:
            message_batch = self.sender.create_message_batch()
            pending = 0  # messages added to message_batch but not yet sent

            for record in records:
                message = ServiceBusMessage(json.dumps(record))
                if self._try_add(message_batch, message):
                    pending += 1
                    continue

                if pending:
                    # Batch is full: send it, then retry on a fresh, empty batch.
                    self.sender.send_messages(message_batch)
                    sent_count += pending
                    pending = 0
                    message_batch = self.sender.create_message_batch()
                    if self._try_add(message_batch, message):
                        pending += 1
                        continue

                # The message does not fit even in an empty batch.
                print("Skipping record: it exceeds the maximum message batch size.")

            # Send any remaining messages
            if pending:
                self.sender.send_messages(message_batch)
                sent_count += pending

            return sent_count

        except Exception as e:
            print(f"Error sending batch: {e}")
            return sent_count

    def send_single(self, record):
        """
        Send a single record to EventStream

        Args:
            record: Record dictionary (must be JSON-serializable)

        Returns:
            True if successful, False otherwise
        """
        if not self.sender:
            print("Not connected. Call connect() first.")
            return False

        try:
            message_body = json.dumps(record)
            message = ServiceBusMessage(message_body)
            self.sender.send_messages(message)
            return True
        except Exception as e:
            print(f"Error sending message: {e}")
            return False

    def _release(self):
        """Close the sender and then the client, each independently, and clear both references.

        Returns:
            List of exceptions raised while closing (empty if none).
        """
        errors = []
        for handle in (self.sender, self.client):
            if handle is not None:
                try:
                    handle.close()
                except Exception as e:
                    errors.append(e)
        # Drop references so a closed sender is never reused by send_batch/send_single.
        self.sender = None
        self.client = None
        return errors

    def close(self):
        """Close the connection to Service Bus. Safe to call more than once."""
        errors = self._release()
        for e in errors:
            print(f"Error closing connection: {e}")
        if not errors:
            print("Connection closed.")

def send_to_eventstream(records, connection_string, eventstream_name):
    """
    One-shot helper: connect, send ``records`` with a single ``send_batch`` call, then close.

    The sender is closed in a ``finally`` block. This releases the Service Bus
    client even if connecting fails part-way or sending is interrupted
    (e.g. by KeyboardInterrupt).

    Args:
        records: List of record dictionaries
        connection_string: Azure Service Bus connection string
        eventstream_name: Name of the EventStream/topic

    Returns:
        Number of records sent successfully (0 if the connection failed)
    """
    sender = EventStreamSender(connection_string, eventstream_name)
    try:
        if not sender.connect():
            return 0
        return sender.send_batch(records)
    finally:
        sender.close()

## Send Data to Fabric EventStream

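Helpers for sending generated records to Fabric EventStream through an Azure Service Bus topic: the `EventStreamSender` class (connect, send, close) and the one-shot `send_to_eventstream` function.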

## Generate Single Batch (Test)

Generate and display a single batch of records to verify the output format.

In [None]:
# Generate a small sample batch and pretty-print it to check the record format.
sample_size = 10
test_batch = generate_stream_batch(transformer_ids, temp_simulator, sample_size)
print(f"Sample batch of {sample_size} records:")
print(stream_to_json(test_batch))

## Create PySpark DataFrame from Batch

In [None]:
# Schema for one reading; every field is declared non-nullable.
schema = StructType(
    [
        StructField(field_name, field_type, False)
        for field_name, field_type in (
            ("DateTime", StringType()),
            ("TransformerID", StringType()),
            ("Temperature", IntegerType()),
            ("Voltage", IntegerType()),
        )
    ]
)

# Load the sample batch into a DataFrame and show it untruncated.
df = spark.createDataFrame(test_batch, schema=schema)
df.show(truncate=False)

## Continuous Stream Generation

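Generate continuous batches of transformer data and display per-batch summaries locally; this cell does not send anything to EventStream. It runs indefinitely until manually stopped (press Stop / interrupt the kernel).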

In [None]:
# Continuous stream generation (run until stopped).
# Batches are generated and summarized locally only; nothing is sent to EventStream here.
print("Starting continuous stream generation...")
print(f"Generating {RECORDS_PER_BATCH} records every {STREAM_INTERVAL} seconds")
print("Press 'Stop' to terminate\n")

batch_count = 0

try:
    while True:
        batch_count += 1

        # Generate batch
        batch = generate_stream_batch(transformer_ids, temp_simulator, RECORDS_PER_BATCH)

        # Create DataFrame (used for the per-transformer summary and the sample display)
        df = spark.createDataFrame(batch, schema=schema)

        # Display summary. The record count comes from the local list: df.count()
        # would launch an extra Spark job just to count rows we already hold.
        print(f"\n=== Batch {batch_count} - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')} ===")
        print(f"Records generated: {len(batch)}")
        df.groupBy("TransformerID").agg(
            count("*").alias("RecordCount"),
            avg("Temperature").alias("AvgTemperature"),
            avg("Voltage").alias("AvgVoltage")
        ).show()

        # Optional: Show sample records
        print("Sample records:")
        df.show(5, truncate=False)

        # Wait before next batch
        time.sleep(STREAM_INTERVAL)

except KeyboardInterrupt:
    print(f"\nStream generation stopped. Total batches generated: {batch_count}")

In [None]:
# Continuous stream generation with EventStream integration
if not SERVICEBUS_CONNECTION_STRING or not EVENTSTREAM_NAME:
    print("ERROR: Please configure SERVICEBUS_CONNECTION_STRING and EVENTSTREAM_NAME first!")
else:
    print("Starting continuous stream to EventStream...")
    print(f"Generating {RECORDS_PER_BATCH} records every {STREAM_INTERVAL} seconds")
    print("Press 'Stop' to terminate\n")

    # Connect to EventStream
    eventstream_sender = EventStreamSender(SERVICEBUS_CONNECTION_STRING, EVENTSTREAM_NAME)
    if not eventstream_sender.connect():
        print("Failed to connect to EventStream. Exiting.")
        # Release anything connect() may have opened before it failed.
        eventstream_sender.close()
    else:
        batch_count = 0
        total_sent = 0
        # Normal readings are clamped to temp_simulator.max_temp, so anything above it
        # must be an injected anomaly. This catches every anomaly only while
        # ANOMALY_TEMP_MIN > max_temp, which holds for the default settings.
        anomaly_threshold = temp_simulator.max_temp

        try:
            while True:
                batch_count += 1

                # Generate batch
                batch = generate_stream_batch(transformer_ids, temp_simulator, RECORDS_PER_BATCH)

                # Send to EventStream
                sent_count = eventstream_sender.send_batch(batch)
                total_sent += sent_count

                # Create DataFrame for display
                df = spark.createDataFrame(batch, schema=schema)

                # Display summary (record count from the local list; no extra Spark job)
                print(f"\n=== Batch {batch_count} - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')} ===")
                print(f"Records generated: {len(batch)}")
                print(f"Records sent to EventStream: {sent_count}")
                print(f"Total records sent: {total_sent}")

                df.groupBy("TransformerID").agg(
                    count("*").alias("RecordCount"),
                    avg("Temperature").alias("AvgTemperature"),
                    avg("Voltage").alias("AvgVoltage")
                ).show()

                # Check for anomalies in this batch (count once; each count() is a Spark job)
                anomalies = df.filter(col("Temperature") > anomaly_threshold)
                anomaly_count = anomalies.count()
                if anomaly_count > 0:
                    print(f"⚠️  ANOMALIES DETECTED: {anomaly_count} records")
                    anomalies.show(truncate=False)

                # Wait before next batch
                time.sleep(STREAM_INTERVAL)

        except KeyboardInterrupt:
            print("\nStream generation stopped.")
            print(f"Total batches generated: {batch_count}")
            print(f"Total records sent: {total_sent}")
        finally:
            eventstream_sender.close()

## Continuous Stream to EventStream

Generate continuous batches and send them to Fabric EventStream. This cell will run indefinitely until manually stopped.

## Export Single Batch to JSON File

Generate a single batch and save it to a JSON file for testing or integration purposes.

In [None]:
# Generate one fresh batch and write it to a timestamped JSON file
# (relative path, i.e. in the current working directory).
output_batch = generate_stream_batch(transformer_ids, temp_simulator, RECORDS_PER_BATCH)
export_stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
output_filename = f"transformer_stream_{export_stamp}.json"

with open(output_filename, 'w') as out_file:
    json.dump(output_batch, out_file, indent=2)

print(f"Batch exported to: {output_filename}")
print(f"Records: {len(output_batch)}")