![logo](https://img-prod-cms-rt-microsoft-com.akamaized.net/cms/api/am/imageFileData/RE1Mu3b?ver=5c31)

# **Fabric**
### Simulating streaming data for Real-Time Analytics ⚡ using a Fabric Data Engineering notebook
### AKA: "The Wood Chipper" 
This notebook reads any CSV you point it to through the "SampleCsv" parameter and sends its rows to an EventStream custom app endpoint (an Event Hub). Each batch contains the number of rows set by the "MyBatchSize" parameter. The number of batches is computed automatically from the total row count and the batch size, rounded up so that a final partial batch is also sent, and the while loop stops once the whole file has been streamed.
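
To make the batch arithmetic concrete, the sketch below splits a hypothetical 30-row file into batches of 12. That gives ceil(30 / 12) = 3 batches, the last one holding only 6 rows. The helper `batch_ranges` is illustrative and not part of the notebook; it returns `(start, end)` pairs with `end` exclusive, which is how Python slicing treats them.

```python
import math


def batch_ranges(total_rows: int, batch_size: int) -> list[tuple[int, int]]:
    """Return (start, end) row-index pairs (end exclusive) covering every row exactly once."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    # Step through the rows in strides of batch_size; the last range is clipped to total_rows.
    return [(start, min(start + batch_size, total_rows)) for start in range(0, total_rows, batch_size)]


total_rows, batch_size = 30, 12  # hypothetical file size and MyBatchSize
ranges = batch_ranges(total_rows, batch_size)
# The number of ranges matches the rounded-up batch count.
assert len(ranges) == math.ceil(total_rows / batch_size)
print(ranges)  # [(0, 12), (12, 24), (24, 30)]
```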

### **0. Set the parameters**

In [None]:
# The connection string is what you get from the "custom app" endpoint in EventStream
MyConnectionString = ''

# Set the batch size (i.e., the number of CSV rows sent at once). Use a higher number for faster movement on the report.
MyBatchSize = 12
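
An empty or truncated `MyConnectionString` only fails later, when the producer is created. The sketch below is a hypothetical pre-flight check, assuming the usual Event Hubs connection string layout (`Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=...`). `EntityPath` matters here because the notebook calls `from_connection_string` without an `eventhub_name`, so the hub name has to come from the string itself. The check is structural only; it cannot tell whether the key is valid.

```python
def check_connection_string(conn_str: str) -> dict:
    """Split an Event Hubs-style connection string into key/value parts (hypothetical helper)."""
    if not conn_str.strip():
        raise ValueError("MyConnectionString is empty; paste the custom app connection string.")
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        # partition splits on the first '=' only, so base64 '=' padding in the key survives
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment (no '='): {segment!r}")
        parts[key] = value
    required = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath")
    missing = [k for k in required if k not in parts]
    if missing:
        raise ValueError(f"Connection string is missing: {', '.join(missing)}")
    return parts


# Fake values, for illustration only
example = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=key_1;SharedAccessKey=abc=;EntityPath=es_demo"
print(check_connection_string(example)["EntityPath"])  # es_demo
```

In the notebook this would be called as `check_connection_string(MyConnectionString)` right after the parameters are set.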


In [None]:
import os

# url to source csv file
FULL_URL = "https://raw.githubusercontent.com/microsoft/FabricCAT/main/QueenOfTheSkies/Data/QueenOfTheSky_ex.csv"
# lakehouse location -- assumes default lakehouse
LAKEHOUSE_FOLDER = "/lakehouse/default"

# filename and data folders
CSV_FILE_NAME = "QueenOfTheSky_ex.csv"
DATA_FOLDER = "Files/QoS"
CSV_FILE_PATH = f"{LAKEHOUSE_FOLDER}/{DATA_FOLDER}/"

if not os.path.exists(LAKEHOUSE_FOLDER):
    # add a lakehouse if the notebook has no default lakehouse
    # a new notebook will not link to any lakehouse by default
    raise FileNotFoundError(
        "Lakehouse not found, please add a lakehouse for the notebook."
    )
else:
    # Verify whether the required file is already in the lakehouse; if not, download it
    if not os.path.exists(f"{CSV_FILE_PATH}{CSV_FILE_NAME}"):
        print(f"Downloading sample file to {CSV_FILE_PATH}{CSV_FILE_NAME}")
        os.makedirs(CSV_FILE_PATH, exist_ok=True)
        os.system(f"wget '{FULL_URL}' -O {CSV_FILE_PATH}{CSV_FILE_NAME}")

SampleCsv = f"{DATA_FOLDER}/{CSV_FILE_NAME}"
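
`os.system("wget ...")` only returns an exit code, so a failed download goes unnoticed, and `wget -O` can leave an empty or partial file behind that the `os.path.exists` check will treat as complete on the next run. The sketch below uses the standard library's `urllib.request.urlretrieve` instead, which raises on network and HTTP errors. The helper name `download_if_missing` and the `.part` temporary-file convention are assumptions for illustration.

```python
import os
import urllib.request


def download_if_missing(url: str, dest_path: str) -> bool:
    """Download url to dest_path unless it already exists. Returns True if a download happened."""
    if os.path.exists(dest_path):
        return False
    os.makedirs(os.path.dirname(dest_path) or ".", exist_ok=True)
    tmp_path = dest_path + ".part"
    try:
        urllib.request.urlretrieve(url, tmp_path)  # raises on network/HTTP errors
    except Exception:
        # Do not leave a half-written file that a later run would mistake for the real one
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    os.replace(tmp_path, dest_path)  # expose the file under its final name only once complete
    return True
```

In the notebook it would be called as `download_if_missing(FULL_URL, f"{CSV_FILE_PATH}{CSV_FILE_NAME}")` in place of the `wget` branch.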


### **1. Install dependencies and Event Hub library**

In [None]:
# Quote the requirement so the shell does not treat ">" as output redirection
%pip install "azure-eventhub>=5.11.0"

In [None]:
import time
import math
from azure.eventhub import EventHubProducerClient, EventData

### **2. Create a Python script to send events to your event stream**

ref: https://learn.microsoft.com/azure/event-hubs/event-hubs-capture-python#create-a-python-script-to-send-events-to-your-event-hub
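
A batch from `create_batch()` has a maximum size, and `EventDataBatch.add` raises `ValueError` when the next event would not fit. With a large `MyBatchSize` or wide rows, a fixed row count per batch can hit that limit. The sketch below shows one way to handle it: send the full batch and continue in a fresh one. The function `send_in_size_limited_batches` is hypothetical; it takes the producer and an event factory as parameters (pass `EventData` in the notebook) so it can be exercised with stand-in objects.

```python
def send_in_size_limited_batches(producer, payloads, make_event):
    """Add payloads to batches, sending each batch when it reaches its size limit.

    Returns the number of batches sent. Assumes batch.add raises ValueError when full,
    as EventDataBatch.add does in azure-eventhub.
    """
    batch = producer.create_batch()
    pending = 0        # events in the current, unsent batch
    batches_sent = 0
    for payload in payloads:
        event = make_event(payload)
        try:
            batch.add(event)
        except ValueError:
            if pending == 0:
                raise  # a single event is larger than the maximum batch size
            # Batch is full: send it, then retry this event in a fresh batch
            producer.send_batch(batch)
            batches_sent += 1
            batch = producer.create_batch()
            pending = 0
            batch.add(event)
        pending += 1
    if pending:
        producer.send_batch(batch)  # flush the last, partially filled batch
        batches_sent += 1
    return batches_sent
```

In the notebook this could be called as `send_in_size_limited_batches(producer, df.toJSON().collect(), EventData)`. Note that batch boundaries are then driven by size rather than by `MyBatchSize`.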

In [None]:
# Read the CSV into a Spark DataFrame (SampleCsv is relative to the default lakehouse)
df = spark.read.csv(path=SampleCsv, header=True)

# Convert every row to a JSON string once, before the loop.
# This brings the whole file to the driver, which is fine for a small sample CSV.
rows = df.toJSON().collect()

# Instantiate an Event Hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=MyConnectionString)

# Determine the row count of the file
z = len(rows)

# Set some control variables
i                = MyBatchSize
x                = 0                 # The batch starts at this row index (arrays start at 0)
y                = min(x + i, z)     # The batch ends just before this row index (Start + Increment(i))
BatchCounter     = 0                 # Number of batches sent so far
LastStart        = LastEnd = 0       # Row range of the most recent batch, for the final summary
TargetBatchCount = math.ceil(z / i)  # Round up so a final, partial batch catches the remaining rows
print('====================================')
print('Target batch count should be: ' + str(TargetBatchCount))
print('====================================')
print('Beginning stream...')
print('====================================')

try:
    while BatchCounter < TargetBatchCount:
        BatchCounter = BatchCounter + 1  # == Move the batch counter one notch up
        b = producer.create_batch()      # == Instantiate the batch
        for row in rows[x:y]:            # == Add every row individually to the payload so Kusto can read it in
            b.add(EventData(row))        # == Add the JSON row to the payload
        producer.send_batch(b)           # == Send the batch to Event Hub!
        time.sleep(1)                    # == Intentional 1s pause between batches
        # Print some stats to track the stream
        print('This was batch #: ' + str(BatchCounter))
        print('We loaded rows from: ' + str(x) + ' to row: ' + str(y - 1))
        # Set the control variables for the next pass
        RowRemaining = z - y
        print('Rows remaining in the stream: ' + str(RowRemaining))
        print('====================================')
        LastStart, LastEnd = x, y
        x = y
        y = min(x + i, z)
finally:
    producer.close()                     # == Close the producer once, after all batches are sent

print('====================================')
print('End of stream reached')
print('====================================')
print('Number of batches was: ' + str(BatchCounter))
print('Last batch was from row: ' + str(LastStart) + ' to row: ' + str(LastEnd - 1))
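
Because every batch is followed by a fixed one-second pause, the batch size directly sets how fast rows reach the report. The sketch below gives rough bounds under the stated assumption that send latency is negligible, which is not measured here. Both helpers are hypothetical, and the 1,000-row file is an illustrative size, not the actual sample.

```python
import math


def estimate_stream_seconds(total_rows: int, batch_size: int, pause_seconds: float = 1.0) -> float:
    """Lower bound on run time: one pause per batch; send latency is ignored."""
    return math.ceil(total_rows / batch_size) * pause_seconds


def approx_rows_per_second(batch_size: int, pause_seconds: float = 1.0) -> float:
    """Upper bound on throughput, since each batch waits at least pause_seconds."""
    return batch_size / pause_seconds


print(estimate_stream_seconds(1000, 12))  # 84.0  (84 batches of at most 12 rows)
print(approx_rows_per_second(12))         # 12.0
```

Doubling `MyBatchSize` roughly halves the run time and doubles the rows per second the report sees.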
