![logo](https://img-prod-cms-rt-microsoft-com.akamaized.net/cms/api/am/imageFileData/RE1Mu3b?ver=5c31)

# **Fabric**
### Simulating streaming data for Real-Time Analytics ⚡ using a Fabric Data Engineering notebook
### AKA: "The Wood Chipper" 
This notebook reads any CSV file you point it to via the "SampleCsv" parameter and sends its rows to an EventStream custom app endpoint (an event hub). Each batch contains the number of rows set by the "MyBatchSize" parameter. The number of batches is computed automatically from the total number of rows and the batch size (rounded up, so a final partial batch carries any leftover rows), and the while loop stops once the whole file has been streamed.
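The batching arithmetic described above is ceiling division. The minimal sketch below uses two hypothetical helpers, `batch_count` and `batch_bounds`, that are not part of the notebook; they compute how many batches are needed and the `(start, end)` slice bounds of each one. With the 1,766 rows implied by the output further down (1,754 rows remain after the first batch of 12) and a batch size of 12, this gives 148 batches, the last holding only 2 rows.

```python
def batch_count(total_rows: int, batch_size: int) -> int:
    # Integer ceiling division, equivalent to math.ceil(total_rows / batch_size)
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return -(-total_rows // batch_size)


def batch_bounds(total_rows: int, batch_size: int) -> list[tuple[int, int]]:
    # Hypothetical helper: (start, end) slice bounds for each batch, end exclusive
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [(start, min(start + batch_size, total_rows))
            for start in range(0, total_rows, batch_size)]


bounds = batch_bounds(1766, 12)
print(batch_count(1766, 12))  # 148
print(len(bounds))            # 148
print(bounds[0])              # (0, 12)
print(bounds[-1])             # (1764, 1766)
```

Capping `end` at `total_rows` keeps the printed bounds of the last batch honest; Python slicing would tolerate an overshoot, but the log would then report rows that do not exist.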

### **0. Set the parameters**

In [1]:
# The connection string comes from the "custom app" endpoint in EventStream
MyConnectionString = ''

# OneLake (abfss://) path to the CSV file to stream
SampleCsv = 'abfss://3582b164-c42f-4707-98ac-a85e3bf6a734@msit-onelake.dfs.fabric.microsoft.com/31ba9dc7-0bc3-4f63-9b4b-ffdadd957944/Files/QoS_data/QueenOfTheSky_ex.csv'

# Set the batch size, i.e. the number of CSV rows sent at once. Use a higher number for faster movement on the report.
MyBatchSize = 12


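Because "MyConnectionString" ships empty, a quick pre-flight check can catch a missing or truncated paste before the producer is created. This is a minimal sketch that assumes the usual Event Hubs `Key=Value;Key=Value` layout with `Endpoint`, `SharedAccessKeyName`, `SharedAccessKey` and `EntityPath`; the helper names and the sample values are hypothetical placeholders, not real credentials.

```python
REQUIRED_KEYS = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath")


def parse_connection_string(conn_str: str) -> dict[str, str]:
    # Split "Key=Value" segments on ";"; split each segment on the first "=" only,
    # because base64 SharedAccessKey values may end in "=" padding
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ";"
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment: {segment!r}")
        parts[key.strip()] = value.strip()
    return parts


def check_connection_string(conn_str: str) -> list[str]:
    # Hypothetical pre-flight check: return the required keys that are missing or empty
    if not conn_str.strip():
        return list(REQUIRED_KEYS)
    parts = parse_connection_string(conn_str)
    return [key for key in REQUIRED_KEYS if not parts.get(key)]


# Placeholder values only
sample = ("Endpoint=sb://example.servicebus.windows.net/;"
          "SharedAccessKeyName=key_1;SharedAccessKey=abc123=;EntityPath=es_demo")
print(check_connection_string(sample))  # []
print(check_connection_string(""))      # ['Endpoint', 'SharedAccessKeyName', 'SharedAccessKey', 'EntityPath']
```

In the notebook, `check_connection_string(MyConnectionString)` returning a non-empty list would be a signal to re-copy the value from EventStream before running the streaming cell.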

### **1. Install dependencies and Event Hub library**

In [2]:
%pip install "azure-eventhub>=5.11.0"


Note: you may need to restart the kernel to use updated packages.
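To confirm that the session actually picked up a version that satisfies the `>=5.11.0` floor (after a restart, if the note above applies), the installed version can be read with the standard library's `importlib.metadata`. The comparison helpers below are a hypothetical, simplified sketch: they compare only the leading numeric components, so a pre-release such as `5.11.0b1` is treated like `5.11.0`.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def version_tuple(v: str) -> tuple[int, ...]:
    # Keep only the leading numeric release components, e.g. "5.11.0b1" -> (5, 11, 0)
    match = re.match(r"\d+(?:\.\d+)*", v)
    if not match:
        raise ValueError(f"Unrecognized version string: {v!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def meets_minimum(installed: str, minimum: str) -> bool:
    # Pad with zeros so that "5.11" compares equal to "5.11.0"
    a, b = version_tuple(installed), version_tuple(minimum)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


try:
    installed = version("azure-eventhub")
except PackageNotFoundError:
    installed = None

if installed is None:
    print("azure-eventhub is not installed in this session")
elif not meets_minimum(installed, "5.11.0"):
    print(f"azure-eventhub {installed} is older than 5.11.0; re-run the install cell")
else:
    print(f"azure-eventhub {installed} meets the 5.11.0 minimum")
```

Where the `packaging` library is available, `packaging.version.Version` handles pre-release and post-release ordering properly and would be the more robust choice.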


In [3]:
import time
import os
import datetime
import json
import math
from azure.eventhub import EventHubProducerClient, EventData


### **2. Create a Python script to send events to your event stream**

ref: https://learn.microsoft.com/azure/event-hubs/event-hubs-capture-python#create-a-python-script-to-send-events-to-your-event-hub
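The linked tutorial sends events by filling a batch and calling `send_batch`. The sketch below restates the notebook's loop as a function under a few stated assumptions: the hypothetical `stream_rows` helper receives an already-created producer (anything exposing `create_batch()` and `send_batch()`, as `EventHubProducerClient` does), a `make_event` callable (`EventData` in the notebook) and an injectable `sleep`. It relies on `EventDataBatch.add` raising `ValueError` when an event no longer fits, in which case it sends what it has and starts a fresh batch. The fake producer in the usage example is purely illustrative.

```python
import time
from typing import Any, Callable, Sequence


def stream_rows(producer: Any, rows: Sequence[str], batch_size: int,
                make_event: Callable[[str], Any],
                pause_seconds: float = 1.0,
                sleep: Callable[[float], None] = time.sleep) -> int:
    """Send rows in batches of up to batch_size; return the number of batches sent.

    Hypothetical helper: the caller owns the producer and closes it afterwards.
    """
    sent = 0
    for start in range(0, len(rows), batch_size):
        batch = producer.create_batch()
        for row in rows[start:start + batch_size]:
            event = make_event(row)
            try:
                batch.add(event)
            except ValueError:
                # Batch reached its size limit: send it and retry the event in a new batch
                producer.send_batch(batch)
                sent += 1
                batch = producer.create_batch()
                batch.add(event)  # an event too large for an empty batch still raises here
        producer.send_batch(batch)
        sent += 1
        sleep(pause_seconds)  # intentional pause between batches, as in the notebook
    return sent


class FakeBatch:
    # Illustrative stand-in that refuses events beyond a fixed count
    def __init__(self, limit: int):
        self.events, self.limit = [], limit

    def add(self, event: Any) -> None:
        if len(self.events) >= self.limit:
            raise ValueError("batch is full")
        self.events.append(event)


class FakeProducer:
    # Illustrative stand-in that records what would have been sent
    def __init__(self, limit: int = 100):
        self.limit, self.sent = limit, []

    def create_batch(self) -> FakeBatch:
        return FakeBatch(self.limit)

    def send_batch(self, batch: FakeBatch) -> None:
        self.sent.append(list(batch.events))


fake = FakeProducer()
rows = [f'{{"n": {n}}}' for n in range(30)]
print(stream_rows(fake, rows, 12, make_event=str, sleep=lambda s: None))  # 3
print([len(b) for b in fake.sent])                                         # [12, 12, 6]
```

Against a real event hub, this would be called as `stream_rows(producer, df.toJSON().collect(), MyBatchSize, make_event=EventData)` inside a `try`/`finally` that calls `producer.close()` once at the end, so one connection serves every batch.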

In [4]:
# Read in the CSV to a dataframe (the first row holds the column names)
df = spark.read.csv(path=SampleCsv, header=True)

# Convert every row to a JSON string once, before the loop, rather than
# collecting the whole dataframe to the driver again for every batch
rows = df.toJSON().collect()

# Instantiate an event hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=MyConnectionString)

# Determine the row count of the file
z = len(rows)

# Set some control variables
i                = MyBatchSize
x                = 0               # The batch opens at the first row; array indexes start at 0
y                = min(x+i, z)     # The batch is sealed at start + increment (i), capped at the row count
BatchCounter     = 0               # Initialize a batch counter
RowCounter       = 0               # Initialize a row counter
LastX, LastY     = 0, 0            # Bounds of the last batch sent, reported at the end
TargetBatchCount = math.ceil(z/i)  # Round up so a final partial batch catches any leftover rows
print ('====================================')
print ('Target batch count should be: '+ str(TargetBatchCount))
print ('====================================')
print ('Beginning stream...')
print ('====================================')

try:
    while BatchCounter < TargetBatchCount:

        BatchCounter = BatchCounter + 1 # == Move our batch counter one notch up
        b = producer.create_batch()     # == Instantiate the batch
        for row in rows[x:y]:           # == Add every row as its own event so Kusto can read each one in as a record
            b.add(EventData(row))       # == Add the JSON string to the batch (raises ValueError if the batch is full)
        producer.send_batch(b)          # == Send the batch to Event hub!
        time.sleep(1)                   # == We add an intentional 1s pause
        # Printing some stats to track the stream
        print ('This was batch #:' + str(BatchCounter))
        print ('We loaded rows from: ' + str(x) + ' to row: ' + str(y))
        # Setting the control variables for the next pass
        LastX, LastY = x, y
        RowCounter   = RowCounter + (y - x)
        RowRemaining = z - RowCounter
        x = y
        y = min(x+i, z)
        print ('Rows remaining in the stream: ' + str(RowRemaining))
        print ('====================================')
finally:
    producer.close()                    # == Close the producer (and its connection) once, after the last batch

print ('====================================')
print ('End of stream reached')
print ('====================================')
print ('Number of batches was: '  + str(BatchCounter))
print ('Last batch was from row: '+ str(LastX) + ' to row: '+ str(LastY))



Target batch count should be: 148
Beginning stream...
This was batch #:1
We loaded rows from: 0 to row: 12
Rows remaining in the stream: 1754
This was batch #:2
We loaded rows from: 12 to row: 24
Rows remaining in the stream: 1742
This was batch #:3
We loaded rows from: 24 to row: 36
Rows remaining in the stream: 1730
This was batch #:4
We loaded rows from: 36 to row: 48
Rows remaining in the stream: 1718
This was batch #:5
We loaded rows from: 48 to row: 60
Rows remaining in the stream: 1706
This was batch #:6
We loaded rows from: 60 to row: 72
Rows remaining in the stream: 1694
This was batch #:7
We loaded rows from: 72 to row: 84
Rows remaining in the stream: 1682
This was batch #:8
We loaded rows from: 84 to row: 96
Rows remaining in the stream: 1670
This was batch #:9
We loaded rows from: 96 to row: 108
Rows remaining in the stream: 1658
This was batch #:10
We loaded rows from: 108 to row: 120
Rows remaining in the stream: 1646
This was batch #:11
We loaded rows from: 120 to row: 