In [1]:
pip install requests pytz azure-servicebus

Collecting azure-servicebus
  Downloading azure_servicebus-7.14.3-py3-none-any.whl.metadata (98 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.0/99.0 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
Collecting azure-core>=1.28.0 (from azure-servicebus)
  Downloading azure_core-1.36.0-py3-none-any.whl.metadata (47 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m47.1/47.1 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting isodate>=0.6.0 (from azure-servicebus)
  Downloading isodate-0.7.2-py3-none-any.whl.metadata (11 kB)
Downloading azure_servicebus-7.14.3-py3-none-any.whl (412 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m412.5/412.5 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading azure_core-1.36.0-py3-none-any.whl (213 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m213.3/213.3 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading isodate-0.7.2-py3-none-any.whl (22 kB)

In [None]:
import requests
import json
import time
from datetime import datetime
from azure.servicebus import ServiceBusClient, ServiceBusMessage
import pytz

# Replace with your Fabric EventStream connection string
myconnectionstring = "<insert your EventStream SAS key>"
# API URL - Change this to any API you want to use
#API_URL = "https://api.binance.us/api/v3/ticker/price"  # Example: Binance API
API_URL = "https://api.wheretheiss.at/v1/satellites/25544"  # Example: ISS API

# Function to fetch data from any API
def fetch_api_data():
    try:
        response = requests.get(API_URL)
        response.raise_for_status()  # Raises an error if the request fails
        data = response.json()

        # Convert single object response to a list for consistency
        if isinstance(data, dict):
            return [data]  # Wrap single dictionary in a list

        return data  # Return list as-is
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data: {e}")
        return None

# Function to add timestamps in Kampala, Uganda time (East Africa Time - EAT)
def add_timestamps(data):
    kampala_tz = pytz.timezone("Africa/Kampala")  # East Africa Time (UTC+3)
    now_kampala = datetime.now(kampala_tz)

    formatted_datetime = now_kampala.strftime("%m/%d/%Y %I:%M:%S %p")  # MM/DD/YYYY HH:MM:SS AM/PM
    date_column = now_kampala.strftime("%d-%m-%Y")  # DD-MM-YYYY
    time_column = now_kampala.strftime("%H:%M:%S")  # HH:MM:SS

    for record in data:
        record["datetime"] = formatted_datetime
        record["date"] = date_column
        record["time"] = time_column

    return data

# Function to send processed data to Microsoft Fabric EventStream
def send_to_eventstream(messages, connection_string):
    # Extract EntityPath from connection string
    entity_path = None
    for param in connection_string.split(';'):
        if param.startswith('EntityPath='):
            entity_path = param.split('=')[1]
            break

    if not entity_path:
        raise ValueError("EntityPath not found in connection string. Please check your connection details.")

    # Ensure data is always a list before sending
    if isinstance(messages, dict):
        messages = [messages]  # Convert single object to a list

    # Establish connection to Fabric EventStream
    servicebus_client = ServiceBusClient.from_connection_string(connection_string)
    try:
        with servicebus_client.get_queue_sender(entity_path) as sender:
            # Convert messages to JSON format
            batch_message = [ServiceBusMessage(json.dumps(msg)) for msg in messages]
            sender.send_messages(batch_message)
            print(f"Successfully sent {len(messages)} records to EventStream.")
    except Exception as e:
        print(f"Error sending messages: {e}")
    finally:
        servicebus_client.close()

# Infinite loop to fetch and send data every 2 seconds
print(f"Starting real-time data streaming from {API_URL} to Fabric (Kampala Time)...")
while True:
    data = fetch_api_data()  # Fetch data from API
    if data:
        processed_data = add_timestamps(data)  # Add date and time in Kampala timezone
        send_to_eventstream(processed_data, myconnectionstring)  # Send data to EventStream
        print(f"Sent {len(processed_data)} records at {processed_data[0]['datetime']} EAT")
    time.sleep(1)  # Wait for 2 seconds before fetching new data

Starting real-time data streaming from https://api.wheretheiss.at/v1/satellites/25544 to Fabric (Kampala Time)...
Error sending messages: Failed to initiate the connection due to exception: [Errno -2] Name or service not known Error condition: amqp:socket-error.
Sent 1 records at 11/02/2025 10:43:33 AM EAT
Error sending messages: Failed to initiate the connection due to exception: [Errno -2] Name or service not known Error condition: amqp:socket-error.
Sent 1 records at 11/02/2025 10:43:46 AM EAT
Error sending messages: Failed to initiate the connection due to exception: [Errno -2] Name or service not known Error condition: amqp:socket-error.
Sent 1 records at 11/02/2025 10:43:58 AM EAT
Error sending messages: Failed to initiate the connection due to exception: [Errno -2] Name or service not known Error condition: amqp:socket-error.
Sent 1 records at 11/02/2025 10:44:11 AM EAT
Error sending messages: Failed to initiate the connection due to exception: [Errno -2] Name or service not kno