# Libraries and Variables definition
## Conda preparation
```
conda create -n event_hubs python=3.12
conda activate event_hubs
pip install azure-eventhub azure-eventhub-checkpointstoreblob python-dotenv
pip install jupyter
jupyter kernelspec uninstall event_hubs
python -m ipykernel install --name event_hubs --user
jupyter kernelspec list
```

In [1]:
import os, urllib.parse, hmac, hashlib, base64, time
from dotenv import load_dotenv # requires python-dotenv

load_dotenv("./../config/credentials_my.env")

True

# Create the Authorization Token

In [2]:
def generate_sas_token(uri, key_name, key, expiry=int(time.time() + 3600)):  
    string_to_sign = urllib.parse.quote_plus(uri) + '\n' + str(expiry)  
    signature = base64.b64encode(hmac.new(key.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8')  
    token = f'SharedAccessSignature sr={urllib.parse.quote_plus(uri)}&sig={urllib.parse.quote_plus(signature)}&se={expiry}&skn={key_name}'  
    return token  

uri = os.environ["ehn_uri"]
key_name = os.environ["ehn_key_name"]
key = os.environ["ehn_key"]
expiry = int(time.time() + 365 * 24 * 60 * 60) # Token valid for 1 year

  
sas_token = generate_sas_token(uri, key_name, key, expiry=expiry)  
print(sas_token[:20]) 

SharedAccessSignatur


# Send the HTTP POST Request

In [3]:
import requests, random
  
url = f"{uri}/messages"
  
headers = {  
    "Authorization": sas_token, 
    "Content-Type": "application/json"
}  
  
data = {  
    "value": "data"
}

for i in range(10):
    data["value"] = random.random()
    response = requests.post(url, headers=headers, json=data)  
  
print(response.status_code)  
print(response.text)  

201



# Consumer client (with @latest Checkpoint)

In [None]:
import os  
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
  
# Fetch connection details from environment variables  
connection_str = os.environ["ehn_connection_string"]  
consumer_group = os.environ["ehn_consumer_group"]  # Default consumer group  
eventhub_name = os.environ["ehn_eventhub_name"]
  
events_count = 0  
  
def on_event(partition_context, event):  
    global events_count  
    events_count += 1  
    print(f"Received event {events_count} from partition: {partition_context.partition_id}. Event data: {event.body_as_str()}")  
      
    # Update checkpoint to mark this event as processed  
    partition_context.update_checkpoint(event)  
  
# Initialize the EventHubConsumerClient  
client = EventHubConsumerClient.from_connection_string(  
    connection_str,  
    consumer_group,  
    eventhub_name=eventhub_name
)  
  
# Use the client within a context manager to ensure proper resource cleanup  
with client:  
    client.receive(  
        on_event=on_event,  
        starting_position="@latest",  # "-1" is from the beginning of the partition, "@latest" means after the consumer starts
    )  

Received event 1 from partition: 0. Event data: {
  "your": "data"
} 
Received event 2 from partition: 0. Event data: {
  "your": "data2"
} 
Received event 3 from partition: 0. Event data: {
  "your": "data3"
} 


# Consumer client (with Blob Checkpoint)

In [None]:
import os  
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
  
# Fetch connection details from environment variables  
connection_str = os.environ["ehn_connection_string"]  
consumer_group = os.environ["ehn_consumer_group"]  # Default consumer group  
eventhub_name = os.environ["ehn_eventhub_name"]
storage_connection_str = os.environ["ehn_storage_connection_string"]
container_name = "eventhub-checkpoints"

# Setup the checkpoint store  
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
  
events_count = 0  
  
def on_event(partition_context, event):  
    global events_count  
    events_count += 1  
    print(f"Received event {events_count} from partition: {partition_context.partition_id}. Event data: {event.body_as_str()}")  
      
    # Update checkpoint to mark this event as processed  
    partition_context.update_checkpoint(event)  
  
# Initialize the EventHubConsumerClient  
client = EventHubConsumerClient.from_connection_string(  
    connection_str,  
    consumer_group,  
    eventhub_name=eventhub_name,  # Pass eventhub_name as a keyword argument  
    checkpoint_store=checkpoint_store
)  
  
# Use the client within a context manager to ensure proper resource cleanup  
with client:  
    client.receive(  
        on_event=on_event,  
        starting_position="@latest",  # "-1" is from the beginning of the partition, "@latest" means after the consumer starts
    )  