## Sending a test event to the Event Hub

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
import json

# Configuration: read the Event Hub connection string from the Key Vault-backed secret scope
try:
    eventhub_connection_string = dbutils.secrets.get(
        scope="key-vault-scope",
        key="eventhub-connection-string",
    )
except Exception as secret_error:
    print(f"Error retrieving secret from Key Vault: {secret_error}")
    raise
else:
    print("Successfully retrieved Event Hub connection string from Key Vault.")

# Name of the Event Hub that receives the events
EVENT_HUB_NAME = "weatherstreamingeventhub"

# Build the producer client; any failure is logged and re-raised so the cell stops here
try:
    producer = EventHubProducerClient.from_connection_string(
        conn_str=eventhub_connection_string,
        eventhub_name=EVENT_HUB_NAME,
    )
except Exception as client_error:
    print(f"Error initializing Event Hub producer: {client_error}")
    raise
else:
    print("Successfully connected to Event Hub.")

# Function to send events to Azure Event Hub
def send_event(event):
    """Publish a single event to the Event Hub as a one-item batch.

    Args:
        event: JSON-serializable object; it is encoded with ``json.dumps``.

    Returns:
        None. Any exception is printed and swallowed rather than raised.
    """
    try:
        batch = producer.create_batch()
        # The event travels as a JSON string wrapped in EventData
        batch.add(EventData(json.dumps(event)))
        producer.send_batch(batch)
    except Exception as exc:
        print(f"Error sending event: {exc}")
    else:
        print("Event successfully sent to Event Hub!")

# Minimal payload used to verify the end-to-end path to the Event Hub
event = dict(event_id=2222, event_name="Key Vault Test")

# Publish it
send_event(event)

# Release the producer's connection once the test event has been handled
producer.close()
print("Event Hub producer closed.")


Successfully retrieved Event Hub connection string from Key Vault.
Successfully connected to Event Hub.
Event successfully sent to Event Hub!
Event Hub producer closed.


#### API Testing

In [0]:
import requests
import json

# Retrieve API key securely from Azure Key Vault in Databricks
try:
    weather_api_key = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")
    print("Successfully retrieved Weather API key from Key Vault.")
except Exception as e:
    print(f"Error retrieving API key from Key Vault: {e}")
    raise

# Define base URL and endpoint for the Weather API.
# HTTPS is used because the API key is sent as a query parameter and would
# otherwise cross the network in clear text.
base_url = "https://api.weatherapi.com/v1"
current_weather_url = f"{base_url}/current.json"

# Specify the location for which weather data is requested
location = "Auckland"  # Change to preferred city

# Define query parameters for the API request
params = {
    "key": weather_api_key,
    "q": location
}

# Make the GET request to the Weather API; the timeout keeps the cell from
# hanging indefinitely if the server never responds
response = requests.get(current_weather_url, params=params, timeout=10)

# Check if the request was successful
if response.status_code == 200:
    current_weather = response.json()
    print("Current Weather:")
    print(json.dumps(current_weather, indent=3))
else:
    print(f"Error: {response.status_code}, {response.text}")

Successfully retrieved Weather API key from Key Vault.
Current Weather:
{
   "location": {
      "name": "Auckland",
      "region": "",
      "country": "New Zealand",
      "lat": -36.8667,
      "lon": 174.7667,
      "tz_id": "Pacific/Auckland",
      "localtime_epoch": 1742557132,
      "localtime": "2025-03-22 00:38"
   },
   "current": {
      "last_updated_epoch": 1742556600,
      "last_updated": "2025-03-22 00:30",
      "temp_c": 16.2,
      "temp_f": 61.2,
      "is_day": 0,
      "condition": {
         "text": "Clear",
         "icon": "//cdn.weatherapi.com/weather/64x64/night/113.png",
         "code": 1000
      },
      "wind_mph": 2.5,
      "wind_kph": 4.0,
      "wind_degree": 251,
      "wind_dir": "WSW",
      "pressure_mb": 1019.0,
      "pressure_in": 30.09,
      "precip_mm": 0.0,
      "precip_in": 0.0,
      "humidity": 88,
      "cloud": 0,
      "feelslike_c": 16.2,
      "feelslike_f": 61.2,
      "windchill_c": 15.2,
      "windchill_f": 59.4,
      "heat

### Complete code for getting weather data

In [0]:
import requests
import json

# Function to handle API response
def handle_response(response):
    """Convert an HTTP response into a dict.

    Args:
        response: Object exposing ``status_code``, ``text`` and ``json()``.

    Returns:
        The parsed JSON body when the status code is 200; otherwise a dict
        with a single ``"error"`` key describing the status code and body text.
    """
    # Guard clause: anything other than 200 becomes an error record
    if response.status_code != 200:
        message = f"Error: {response.status_code}, {response.text}"
        return {"error": message}
    return response.json()

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location, timeout=10):
    """Request current conditions, including air-quality data, for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            Without a timeout a stalled connection would block indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes"
    }
    response = requests.get(current_weather_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get forecast weather data
def get_forecast_weather(base_url, api_key, location, days, timeout=10):
    """Request a multi-day forecast for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        days: Value of the ``days`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            Without a timeout a stalled connection would block indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    forecast_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days
    }
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get weather alerts
def get_alerts(base_url, api_key, location, timeout=10):
    """Request weather alerts for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            Without a timeout a stalled connection would block indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    alerts_url = f"{base_url.rstrip('/')}/alerts.json"
    params = {
        "key": api_key,
        "q": location,
        "alerts": "yes"
    }
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to flatten and merge weather data
def flatten_data(current_weather, forecast_weather, alerts):
    """Merge the three API payloads into a single flat record.

    Args:
        current_weather: Dict read for its ``location`` and ``current`` sections.
        forecast_weather: Dict read for ``forecast -> forecastday``.
        alerts: Dict read for ``alerts -> alert``.

    Returns:
        A dict of selected location/current fields plus nested ``air_quality``,
        ``alerts`` and ``forecast`` entries. Missing sections or fields yield
        ``None`` values (or empty lists).
    """
    place = current_weather.get("location", {})
    now = current_weather.get("current", {})
    sky = now.get("condition", {})
    aqi = now.get("air_quality", {})
    days = forecast_weather.get("forecast", {}).get("forecastday", [])
    warnings = alerts.get("alerts", {}).get("alert", [])

    # Keys are inserted in the order they must appear in the output record
    record = {}
    for field in ("name", "region", "country", "lat", "lon", "localtime"):
        record[field] = place.get(field)
    for field in ("temp_c", "is_day"):
        record[field] = now.get(field)
    record["condition_text"] = sky.get("text")
    record["condition_icon"] = sky.get("icon")
    for field in ("wind_kph", "wind_degree", "wind_dir", "pressure_in",
                  "precip_in", "humidity", "cloud", "feelslike_c", "uv"):
        record[field] = now.get(field)

    record["air_quality"] = {
        pollutant: aqi.get(pollutant)
        for pollutant in ("co", "no2", "o3", "so2", "pm2_5", "pm10",
                          "us-epa-index", "gb-defra-index")
    }

    alert_summaries = []
    for item in warnings:
        alert_summaries.append({
            "headline": item.get("headline"),
            "severity": item.get("severity"),
            # The API field is "desc"; it is published as "description"
            "description": item.get("desc"),
            "instruction": item.get("instruction"),
        })
    record["alerts"] = alert_summaries

    day_summaries = []
    for entry in days:
        daily = entry.get("day", {})
        day_summaries.append({
            "date": entry.get("date"),
            "maxtemp_c": daily.get("maxtemp_c"),
            "mintemp_c": daily.get("mintemp_c"),
            "condition": daily.get("condition", {}).get("text"),
        })
    record["forecast"] = day_summaries

    return record

# Main function to fetch and display weather data
def fetch_weather_data(location="Auckland", forecast_days=3):
    """Fetch current, forecast and alert data for a location and print the merged record.

    Args:
        location: City passed to each API call; defaults to the previously
            hard-coded "Auckland".
        forecast_days: Number of forecast days requested; defaults to 3.

    Returns:
        None. If the API key cannot be read from the secret scope, the error is
        printed and the function returns without calling the API.
    """
    # HTTPS keeps the API key (sent as a query parameter) encrypted in transit
    base_url = "https://api.weatherapi.com/v1"

    # Retrieve API key securely from Azure Key Vault
    try:
        weather_api_key = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")
    except Exception as e:
        print(f"Error retrieving API key: {e}")
        return

    # Fetch weather data from APIs
    current_weather = get_current_weather(base_url, weather_api_key, location)
    forecast_weather = get_forecast_weather(base_url, weather_api_key, location, forecast_days)
    alerts = get_alerts(base_url, weather_api_key, location)

    # Merge and flatten data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)
    print("Weather Data:", json.dumps(merged_data, indent=3))

# Execute the main function
fetch_weather_data()


Weather Data: {
   "name": "Auckland",
   "region": "",
   "country": "New Zealand",
   "lat": -36.8667,
   "lon": 174.7667,
   "localtime": "2025-03-22 00:41",
   "temp_c": 16.2,
   "is_day": 0,
   "condition_text": "Clear",
   "condition_icon": "//cdn.weatherapi.com/weather/64x64/night/113.png",
   "wind_kph": 4.0,
   "wind_degree": 251,
   "wind_dir": "WSW",
   "pressure_in": 30.09,
   "precip_in": 0.0,
   "humidity": 88,
   "cloud": 0,
   "feelslike_c": 16.2,
   "uv": 0.0,
   "air_quality": {
      "co": 159.1,
      "no2": 2.59,
      "o3": 60.0,
      "so2": 2.59,
      "pm2_5": 2.59,
      "pm10": 2.775,
      "us-epa-index": 1,
      "gb-defra-index": 1
   },
   "alerts": [],
   "forecast": [
      {
         "date": "2025-03-22",
         "maxtemp_c": 22.4,
         "mintemp_c": 14.8,
         "condition": "Sunny"
      },
      {
         "date": "2025-03-23",
         "maxtemp_c": 22.4,
         "mintemp_c": 14.2,
         "condition": "Sunny"
      },
      {
         "date

### Sending the complete weather data to the Event Hub


In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData

# Target Event Hub and its connection string (read from the Key Vault-backed secret scope)
EVENT_HUB_NAME = "weatherstreamingeventhub"
eventhub_connection_string = dbutils.secrets.get(
    scope="key-vault-scope",
    key="eventhub-connection-string",
)

# Producer client shared by send_event below
producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_string,
    eventhub_name=EVENT_HUB_NAME,
)

# Function to send events to Event Hub
def send_event(event):
    """Send one event to the Event Hub inside its own batch.

    Args:
        event: JSON-serializable object; it is encoded with ``json.dumps``.

    Returns:
        None. Exceptions raised along the way are printed, not propagated.
    """
    try:
        outgoing = producer.create_batch()
        body = json.dumps(event)
        outgoing.add(EventData(body))
        producer.send_batch(outgoing)
    except Exception as failure:
        print(f"Error sending event: {failure}")
    else:
        print("Event successfully sent to Event Hub!")

# Function to handle API response
def handle_response(response):
    """Return the JSON body of a 200 response, or an error dict for any other status.

    Args:
        response: Object exposing ``status_code``, ``text`` and ``json()``.

    Returns:
        Parsed JSON on status 200; otherwise ``{"error": "Error: <status>, <body>"}``.
    """
    succeeded = response.status_code == 200
    return response.json() if succeeded else {
        "error": f"Error: {response.status_code}, {response.text}"
    }

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location, timeout=10):
    """Request current conditions, including air-quality data, for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            Without a timeout a stalled connection would block indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # The caller in this cell passes a base URL ending in "/"; stripping it
    # avoids a "//" in the request path
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    response = requests.get(current_weather_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get Forecast Data
def get_forecast_weather(base_url, api_key, location, days, timeout=10):
    """Request a multi-day forecast for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        days: Value of the ``days`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            Without a timeout a stalled connection would block indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    forecast_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days
    }
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get Alerts
def get_alerts(base_url, api_key, location, timeout=10):
    """Request weather alerts for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            Without a timeout a stalled connection would block indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    alerts_url = f"{base_url.rstrip('/')}/alerts.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Combine current, forecast and alert payloads into one event record.

    Args:
        current_weather: Dict read for its ``location`` and ``current`` sections.
        forecast_weather: Dict read for ``forecast -> forecastday``.
        alerts: Dict read for ``alerts -> alert``.

    Returns:
        A dict of selected location/current fields plus nested ``air_quality``,
        ``alerts`` and ``forecast`` entries. Absent fields come back as ``None``.
    """
    def pick(source, *names):
        # Copy the named fields from `source`, using None for missing ones
        return {name: source.get(name) for name in names}

    def summarize_alert(alert):
        # "desc" from the API is exposed under the key "description"
        return {
            'headline': alert.get('headline'),
            'severity': alert.get('severity'),
            'description': alert.get('desc'),
            'instruction': alert.get('instruction'),
        }

    def summarize_day(day):
        daily = day.get('day', {})
        return {
            'date': day.get('date'),
            'maxtemp_c': daily.get('maxtemp_c'),
            'mintemp_c': daily.get('mintemp_c'),
            'condition': daily.get('condition', {}).get('text'),
        }

    place = current_weather.get("location", {})
    now = current_weather.get("current", {})
    sky = now.get("condition", {})
    aqi = now.get("air_quality", {})
    days = forecast_weather.get("forecast", {}).get("forecastday", [])
    warnings = alerts.get("alerts", {}).get("alert", [])

    # Unpacking order determines the key order of the resulting record
    return {
        **pick(place, 'name', 'region', 'country', 'lat', 'lon', 'localtime'),
        **pick(now, 'temp_c', 'is_day'),
        'condition_text': sky.get('text'),
        'condition_icon': sky.get('icon'),
        **pick(now, 'wind_kph', 'wind_degree', 'wind_dir', 'pressure_in',
               'precip_in', 'humidity', 'cloud', 'feelslike_c', 'uv'),
        'air_quality': pick(aqi, 'co', 'no2', 'o3', 'so2', 'pm2_5', 'pm10',
                            'us-epa-index', 'gb-defra-index'),
        'alerts': [summarize_alert(item) for item in warnings],
        'forecast': [summarize_day(entry) for entry in days],
    }

# Main function to fetch and send weather data
def fetch_weather_data(location="Auckland", forecast_days=3):
    """Fetch weather data for a location, merge it and send it to the Event Hub.

    Args:
        location: City passed to each API call; defaults to the previously
            hard-coded "Auckland".
        forecast_days: Number of forecast days requested; defaults to 3.

    Returns:
        None. If the API key cannot be read from the secret scope, the error is
        printed and nothing is fetched or sent.
    """
    # No trailing slash: the get_* helpers append "/<endpoint>.json" themselves.
    # HTTPS keeps the API key (sent as a query parameter) encrypted in transit.
    base_url = "https://api.weatherapi.com/v1"

    # Retrieve API key securely from Azure Key Vault
    try:
        weather_api_key = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")
    except Exception as e:
        print(f"Error retrieving API key: {e}")
        return

    # Get data from API
    current_weather = get_current_weather(base_url, weather_api_key, location)
    forecast_weather = get_forecast_weather(base_url, weather_api_key, location, forecast_days)
    alerts = get_alerts(base_url, weather_api_key, location)

    # Flatten and merge data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)

    # Sending the weather data to Event Hub
    send_event(merged_data)

# Execute the main function, then release the producer's connection even if
# the fetch/send raises (this cell creates the producer, so it also closes it)
try:
    fetch_weather_data()
finally:
    producer.close()


Event successfully sent to Event Hub!


### Sending the weather data in streaming fashion

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
import json
import requests

# Event Hub target and the two secrets this cell needs
EVENT_HUB_NAME = "weatherstreamingeventhub"
eventhub_connection_string = dbutils.secrets.get(
    scope="key-vault-scope",
    key="eventhub-connection-string",
)
weatherapikey = dbutils.secrets.get(
    scope="key-vault-scope",
    key="weatherapikey",
)

# Producer client shared by every micro-batch
producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_string,
    eventhub_name=EVENT_HUB_NAME,
)

# Function to send events to Event Hub
def send_event(event):
    """Publish one event to the Event Hub and report whether it was sent.

    Args:
        event: JSON-serializable object; it is encoded with ``json.dumps``.

    Returns:
        True when the batch was sent, False when any step raised. Errors are
        still printed rather than raised, so callers that ignore the return
        value behave exactly as before.
    """
    try:
        event_data_batch = producer.create_batch()
        event_data_batch.add(EventData(json.dumps(event)))
        producer.send_batch(event_data_batch)
    except Exception as e:
        print(f"Error sending event: {e}")
        # Give the streaming caller a way to notice the failure
        return False
    print("Event successfully sent to Event Hub!")
    return True

# Function to handle the API response
def handle_response(response):
    """Map an HTTP response to a dict result.

    Args:
        response: Object exposing ``status_code``, ``text`` and ``json()``.

    Returns:
        The decoded JSON body for status 200; for every other status a dict
        whose ``"error"`` value holds the status code and response text.
    """
    status = response.status_code
    if status == 200:
        return response.json()
    return {"error": f"Error: {status}, {response.text}"}

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location, timeout=10):
    """Request current conditions, including air-quality data, for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            This runs inside a streaming batch, where a request without a
            timeout could stall the query indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    response = requests.get(current_weather_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get Forecast Data
def get_forecast_weather(base_url, api_key, location, days, timeout=10):
    """Request a multi-day forecast for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        days: Value of the ``days`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            This runs inside a streaming batch, where a request without a
            timeout could stall the query indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    forecast_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days
    }
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get Alerts
def get_alerts(base_url, api_key, location, timeout=10):
    """Request weather alerts for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            This runs inside a streaming batch, where a request without a
            timeout could stall the query indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # NOTE(review): alerts are requested from forecast.json with alerts=yes
    # rather than from a dedicated alerts endpoint — confirm this is intended.
    # Strip a trailing slash so the joined URL never contains "//".
    alerts_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Merge the three API payloads into a single flat record.

    Args:
        current_weather: Dict read for its ``location`` and ``current`` sections.
        forecast_weather: Dict read for ``forecast -> forecastday``.
        alerts: Dict read for ``alerts -> alert``.

    Returns:
        A dict of selected location/current fields plus nested ``air_quality``,
        ``alerts`` and ``forecast`` entries. Missing sections or fields yield
        ``None`` values (or empty lists).
    """
    place = current_weather.get("location", {})
    now = current_weather.get("current", {})
    sky = now.get("condition", {})
    aqi = now.get("air_quality", {})
    days = forecast_weather.get("forecast", {}).get("forecastday", [])
    warnings = alerts.get("alerts", {}).get("alert", [])

    # Keys are inserted in the order they must appear in the output record
    record = {}
    for field in ('name', 'region', 'country', 'lat', 'lon', 'localtime'):
        record[field] = place.get(field)
    for field in ('temp_c', 'is_day'):
        record[field] = now.get(field)
    record['condition_text'] = sky.get('text')
    record['condition_icon'] = sky.get('icon')
    for field in ('wind_kph', 'wind_degree', 'wind_dir', 'pressure_in',
                  'precip_in', 'humidity', 'cloud', 'feelslike_c', 'uv'):
        record[field] = now.get(field)

    record['air_quality'] = {
        pollutant: aqi.get(pollutant)
        for pollutant in ('co', 'no2', 'o3', 'so2', 'pm2_5', 'pm10',
                          'us-epa-index', 'gb-defra-index')
    }

    alert_summaries = []
    for item in warnings:
        alert_summaries.append({
            'headline': item.get('headline'),
            'severity': item.get('severity'),
            # The API field is "desc"; it is published as "description"
            'description': item.get('desc'),
            'instruction': item.get('instruction'),
        })
    record['alerts'] = alert_summaries

    day_summaries = []
    for entry in days:
        daily = entry.get('day', {})
        day_summaries.append({
            'date': entry.get('date'),
            'maxtemp_c': daily.get('maxtemp_c'),
            'mintemp_c': daily.get('mintemp_c'),
            'condition': daily.get('condition', {}).get('text'),
        })
    record['forecast'] = day_summaries

    return record

# Function to fetch weather data
def fetch_weather_data(location="Auckland", forecast_days=3):
    """Fetch current, forecast and alert data for a location and merge them.

    Uses the module-level ``weatherapikey`` for authentication.

    Args:
        location: City passed to each API call; defaults to the previously
            hard-coded "Auckland".
        forecast_days: Number of forecast days requested; defaults to 3.

    Returns:
        The merged record produced by ``flatten_data``.
    """
    # No trailing slash: the get_* helpers append "/<endpoint>.json" themselves.
    # HTTPS keeps the API key (sent as a query parameter) encrypted in transit.
    base_url = "https://api.weatherapi.com/v1"

    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, forecast_days)
    alerts = get_alerts(base_url, weatherapikey, location)

    return flatten_data(current_weather, forecast_weather, alerts)

# Function to process streaming data batch
def process_batch(batch_df, batch_id):
    """foreachBatch callback: fetch fresh weather data and publish it.

    Args:
        batch_df: Micro-batch DataFrame; not read here, each batch only acts
            as a trigger.
        batch_id: Batch identifier, used in the error message.

    Raises:
        Exception: Anything raised while fetching or sending is printed with
            the batch id and then re-raised.
    """
    try:
        latest = fetch_weather_data()
        send_event(latest)
    except Exception as err:
        print(f"Error sending events in batch {batch_id}: {str(err)}")
        raise err

# Set up a streaming source (rate source for testing purposes)
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Write the streaming data using foreachBatch to send weather data to Event Hub
query = streaming_df.writeStream.foreachBatch(process_batch).start()

try:
    query.awaitTermination()
finally:
    # awaitTermination raises when the query fails or the cell is cancelled;
    # the finally block guarantees cleanup in those cases too. The query is
    # stopped first so no later batch tries to use a closed producer.
    query.stop()
    producer.close()


Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event successfully sent to Event Hub!
Event succes

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

### Sending the weather data to the Event Hub every 30 seconds


In [0]:
from azure.eventhub import EventHubProducerClient, EventData
import json
import requests
from datetime import datetime, timedelta

# Secrets are read from the Key Vault-backed scope; the hub name is fixed
_SECRET_SCOPE = "key-vault-scope"
EVENT_HUB_NAME = "weatherstreamingeventhub"
eventhub_connection_string = dbutils.secrets.get(scope=_SECRET_SCOPE, key="eventhub-connection-string")
weatherapikey = dbutils.secrets.get(scope=_SECRET_SCOPE, key="weatherapikey")

# Producer client reused for every send in this cell
producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_string,
    eventhub_name=EVENT_HUB_NAME,
)

# Function to send events to Event Hub
def send_event(event):
    """Publish one event to the Event Hub and report whether it was sent.

    Args:
        event: JSON-serializable object; it is encoded with ``json.dumps``.

    Returns:
        True when the batch was sent, False when any step raised. Errors are
        still printed rather than raised, so callers that ignore the return
        value behave exactly as before.
    """
    try:
        event_data_batch = producer.create_batch()
        event_data_batch.add(EventData(json.dumps(event)))
        producer.send_batch(event_data_batch)
    except Exception as e:
        print(f"Error sending event: {e}")
        # Let the caller tell a failed send apart from a successful one
        return False
    print("Event successfully sent to Event Hub!")
    return True

# Function to handle the API response
def handle_response(response):
    """Unwrap an API response.

    Args:
        response: Object exposing ``status_code``, ``text`` and ``json()``.

    Returns:
        Parsed JSON when the status code is 200; otherwise a one-key dict,
        ``{"error": ...}``, containing the status code and the raw body text.
    """
    if response.status_code != 200:
        return {"error": "Error: {}, {}".format(response.status_code, response.text)}
    return response.json()

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location, timeout=10):
    """Request current conditions, including air-quality data, for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            This runs inside a streaming batch, where a request without a
            timeout could stall the query indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    response = requests.get(current_weather_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get Forecast Data
def get_forecast_weather(base_url, api_key, location, days, timeout=10):
    """Request a multi-day forecast for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        days: Value of the ``days`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            This runs inside a streaming batch, where a request without a
            timeout could stall the query indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    forecast_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days
    }
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get Alerts
def get_alerts(base_url, api_key, location, timeout=10):
    """Request weather alerts for a location.

    Args:
        base_url: Root URL of the Weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Value of the ``q`` query parameter.
        timeout: Seconds ``requests`` waits for the server before raising.
            This runs inside a streaming batch, where a request without a
            timeout could stall the query indefinitely.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    # NOTE(review): alerts are requested from forecast.json with alerts=yes
    # rather than from a dedicated alerts endpoint — confirm this is intended.
    # Strip a trailing slash so the joined URL never contains "//".
    alerts_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Combine current, forecast and alert payloads into one event record.

    Args:
        current_weather: Dict read for its ``location`` and ``current`` sections.
        forecast_weather: Dict read for ``forecast -> forecastday``.
        alerts: Dict read for ``alerts -> alert``.

    Returns:
        A dict of selected location/current fields plus nested ``air_quality``,
        ``alerts`` and ``forecast`` entries. Absent fields come back as ``None``.
    """
    def pick(source, *names):
        # Copy the named fields from `source`, using None for missing ones
        return {name: source.get(name) for name in names}

    def summarize_alert(alert):
        # "desc" from the API is exposed under the key "description"
        return {
            'headline': alert.get('headline'),
            'severity': alert.get('severity'),
            'description': alert.get('desc'),
            'instruction': alert.get('instruction'),
        }

    def summarize_day(day):
        daily = day.get('day', {})
        return {
            'date': day.get('date'),
            'maxtemp_c': daily.get('maxtemp_c'),
            'mintemp_c': daily.get('mintemp_c'),
            'condition': daily.get('condition', {}).get('text'),
        }

    place = current_weather.get("location", {})
    now = current_weather.get("current", {})
    sky = now.get("condition", {})
    aqi = now.get("air_quality", {})
    days = forecast_weather.get("forecast", {}).get("forecastday", [])
    warnings = alerts.get("alerts", {}).get("alert", [])

    # Unpacking order determines the key order of the resulting record
    return {
        **pick(place, 'name', 'region', 'country', 'lat', 'lon', 'localtime'),
        **pick(now, 'temp_c', 'is_day'),
        'condition_text': sky.get('text'),
        'condition_icon': sky.get('icon'),
        **pick(now, 'wind_kph', 'wind_degree', 'wind_dir', 'pressure_in',
               'precip_in', 'humidity', 'cloud', 'feelslike_c', 'uv'),
        'air_quality': pick(aqi, 'co', 'no2', 'o3', 'so2', 'pm2_5', 'pm10',
                            'us-epa-index', 'gb-defra-index'),
        'alerts': [summarize_alert(item) for item in warnings],
        'forecast': [summarize_day(entry) for entry in days],
    }

# Function to fetch weather data
def fetch_weather_data(location="Auckland", forecast_days=3):
    """Fetch current, forecast and alert data for a location and merge them.

    Uses the module-level ``weatherapikey`` for authentication.

    Args:
        location: City passed to each API call; defaults to the previously
            hard-coded "Auckland".
        forecast_days: Number of forecast days requested; defaults to 3.

    Returns:
        The merged record produced by ``flatten_data``.
    """
    # No trailing slash: the get_* helpers append "/<endpoint>.json" themselves.
    # HTTPS keeps the API key (sent as a query parameter) encrypted in transit.
    base_url = "https://api.weatherapi.com/v1"

    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, forecast_days)
    alerts = get_alerts(base_url, weatherapikey, location)

    return flatten_data(current_weather, forecast_weather, alerts)

# Minimum number of seconds between two events sent to the Event Hub
SEND_INTERVAL_SECONDS = 30

# Time of the last successful send, backdated by one interval so the very
# first micro-batch sends immediately
last_sent_time = datetime.now() - timedelta(seconds=SEND_INTERVAL_SECONDS)

# Main program: Process batch data from streaming source
def process_batch(batch_df, batch_id):
    """foreachBatch callback that publishes weather data at most once per interval.

    Args:
        batch_df: Micro-batch DataFrame; not read here, each batch only acts
            as a trigger.
        batch_id: Batch identifier, used in the error message.

    Raises:
        Exception: Anything raised while fetching or sending is printed with
            the batch id and then re-raised.
    """
    global last_sent_time
    try:
        # Get the current time
        current_time = datetime.now()

        # Skip this batch until the interval since the last send has elapsed
        if (current_time - last_sent_time).total_seconds() < SEND_INTERVAL_SECONDS:
            return

        # Fetch the latest weather data
        weather_data = fetch_weather_data()

        # send_event prints and swallows its own errors. An explicit False
        # return is treated as "not sent"; any other value (including None)
        # counts as sent. On failure the timestamp stays put so the next
        # batch retries instead of waiting a full interval.
        if send_event(weather_data) is False:
            return

        # Update the last sent time to the current time
        last_sent_time = current_time
        print(f'Event Sent at {last_sent_time}')
    except Exception as e:
        print(f"Error sending events in batch {batch_id}: {str(e)}")
        # Bare raise keeps the original traceback intact
        raise

# Set up a streaming source (rate source for testing purposes)
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Write the streaming data using foreachBatch to send weather data to Event Hub
query = streaming_df.writeStream.foreachBatch(process_batch).start()

try:
    query.awaitTermination()
finally:
    # awaitTermination raises when the query fails or the cell is cancelled;
    # the finally block guarantees cleanup in those cases too. The query is
    # stopped first so no later batch tries to use a closed producer.
    query.stop()
    producer.close()


Event successfully sent to Event Hub!
Event Sent at 2025-03-21 12:03:31.803130
Event successfully sent to Event Hub!
Event Sent at 2025-03-21 12:04:01.860791
Event successfully sent to Event Hub!
Event Sent at 2025-03-21 12:04:33.009374


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can