In [0]:
# pull the complete response from the api
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData
from datetime import datetime, timedelta

# event hub connection settings; the connection string is read from the databricks secret scope
connection_string = dbutils.secrets.get(scope='dbx-secret-scope', key='connection-string')
# NOTE(review): 'llive' looks like a typo, but this must match the real Event Hub name — confirm before changing.
event_hub_name = 'weather-llive-stream'

# Shared producer client. send_event() and the cleanup after the streaming query
# both refer to it as `producer`; it was previously bound to the misspelled name
# `produver`, which made those references fail with NameError in a fresh session.
producer = EventHubProducerClient.from_connection_string(conn_str=connection_string, eventhub_name=event_hub_name)

# publish one weather record to event hub
def send_event(event):
    """Serialize ``event`` to JSON and publish it through the module-level ``producer``.

    The single event is wrapped in a freshly created one-element batch, which is
    then sent in one ``send_batch`` call.
    """
    batch = producer.create_batch()
    body = json.dumps(event)
    batch.add(EventData(body))
    producer.send_batch(batch)

# turn an http response into parsed json, or an error string on failure
def handle_resposne(response):
    """Return the decoded JSON body of an HTTP 200 response.

    Any other status code does not raise; instead a string of the form
    ``"Error: <status code> - <response body>"`` is returned.
    """
    if response.status_code != 200:
        return f"Error: {response.status_code} - {response.text}"
    return response.json()

# get the current weather (with air quality) from the api
def get_current_weather(base_url, api_key, location, timeout=10):
    """Fetch current conditions, including air-quality data, for ``location``.

    Args:
        base_url: API base URL; ``/current.json`` is appended to it.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as ``q``.
        timeout: Seconds ``requests`` waits for the server before raising
            ``requests.Timeout``. Without a timeout a stalled connection blocks
            the caller indefinitely; pass ``None`` to restore that behaviour.

    Returns:
        The result of ``handle_resposne``: the parsed JSON on HTTP 200, otherwise
        an ``"Error: ..."`` string.
    """
    current_weather_url = f"{base_url}/current.json"
    params = {"key": api_key,
              "q": location,
              "aqi": "yes"}
    response = requests.get(current_weather_url, params=params, timeout=timeout)
    return handle_resposne(response)

# get the multi-day forecast from the api
def get_forecast(base_url, api_key, location, days, timeout=10):
    """Fetch a ``days``-day forecast for ``location``.

    Args:
        base_url: API base URL; ``/forecast.json`` is appended to it.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as ``q``.
        days: Number of forecast days requested, sent as ``days``.
        timeout: Seconds ``requests`` waits for the server before raising
            ``requests.Timeout``; pass ``None`` to wait indefinitely as before.

    Returns:
        The result of ``handle_resposne``: the parsed JSON on HTTP 200, otherwise
        an ``"Error: ..."`` string.
    """
    forecast_url = f"{base_url}/forecast.json"
    params = {"key": api_key,
              "q": location,
              "days": days}
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_resposne(response)

# get weather alerts from the api
def get_alerts(base_url, api_key, location, timeout=10):
    """Fetch active weather alerts for ``location``.

    Args:
        base_url: API base URL; ``/alerts.json`` is appended to it.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as ``q``.
        timeout: Seconds ``requests`` waits for the server before raising
            ``requests.Timeout``; pass ``None`` to wait indefinitely as before.

    Returns:
        The result of ``handle_resposne``: the parsed JSON on HTTP 200, otherwise
        an ``"Error: ..."`` string.
    """
    alerts_url = f"{base_url}/alerts.json"
    params = {"key": api_key,
              "q": location,
              "alerts": "yes"}
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_resposne(response)

def _flatten_forecast_day(day):
    """Reduce one ``forecastday`` entry to its date, max/min temperature and condition text.

    Missing or null sub-objects yield ``None`` values instead of raising.
    """
    summary = day.get('day') or {}
    return {
        'date': day.get('date'),
        'maxtemp_c': summary.get('maxtemp_c'),
        'mintemp_c': summary.get('mintemp_c'),
        'condition': (summary.get('condition') or {}).get('text'),
    }


# combine the three pulled responses together and flatten them into one record
def flatten_data(current_weather, forecast_weather, alerts):
    """Merge the current-weather, forecast and alerts payloads into one flat dict.

    Args:
        current_weather: Parsed current-weather response; its ``location`` and
            ``current`` sections (including ``current.condition`` and
            ``current.air_quality``) are read.
        forecast_weather: Parsed forecast response; ``forecast.forecastday`` is read.
        alerts: Parsed alerts response; ``alerts.alert`` is read.

    Returns:
        A dict with location and current-condition fields at the top level, a
        nested ``air_quality`` dict, and ``alerts`` / ``forecast`` lists. Missing
        fields come back as ``None``; missing lists come back empty.

    Raises:
        ValueError: if any argument is not a dict. This is the case when
            ``handle_resposne`` returned its ``"Error: ..."`` string for a non-200
            response. Failing here with the API's message is far clearer than
            the ``AttributeError`` that calling ``.get`` on a string would raise.
    """
    for label, payload in (('current weather', current_weather),
                           ('forecast', forecast_weather),
                           ('alerts', alerts)):
        if not isinstance(payload, dict):
            raise ValueError(f"{label} request did not return data: {payload}")

    # `or {}` / `or []` also covers sections that are present but null.
    location_data = current_weather.get('location') or {}
    current = current_weather.get('current') or {}
    condition = current.get('condition') or {}
    air_quality = current.get('air_quality') or {}
    forecast = (forecast_weather.get('forecast') or {}).get('forecastday') or []
    alert_list = (alerts.get('alerts') or {}).get('alert') or []

    return {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index'),
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                # the API field is `desc`; it is exposed downstream as `description`
                'description': alert.get('desc'),
                'instruction': alert.get('instruction'),
            }
            for alert in alert_list
        ],
        'forecast': [_flatten_forecast_day(day) for day in forecast],
    }

# fetch and combine current weather, forecast and alerts for one location
def fetch_weather(location='Seattle', forecast_days=5):
    """Pull current weather, forecast and alerts for ``location`` and flatten them.

    Args:
        location: Location query passed to every API call (defaults to the
            previously hard-coded ``'Seattle'``).
        forecast_days: Number of forecast days to request (defaults to the
            previously hard-coded ``5``).

    Returns:
        The single flat dict built by ``flatten_data``.
    """
    # HTTPS, not HTTP: the API key travels in the query string, so a plain-HTTP
    # request would expose it to anyone on the network path.
    base_url = 'https://api.weatherapi.com/v1'
    api_key = dbutils.secrets.get(scope='dbx-secret-scope', key='weather-api-key')
    current_weather = get_current_weather(base_url, api_key, location)
    forecast = get_forecast(base_url, api_key, location, forecast_days)
    alerts = get_alerts(base_url, api_key, location)
    return flatten_data(current_weather, forecast, alerts)
    
# minimum wall-clock gap between two published weather events
SEND_INTERVAL = timedelta(seconds=30)

# start one interval in the past so the very first micro-batch sends immediately
last_sent_time = datetime.now() - SEND_INTERVAL


def process_batch(batch_df, batch_id):
    """``foreachBatch`` handler that publishes at most one weather event per interval.

    The micro-batch contents (``batch_df``) are not used; the stream only serves
    to invoke this function repeatedly. When at least ``SEND_INTERVAL`` has
    elapsed since the last successful send, fresh weather data is fetched and
    sent to Event Hub, and ``last_sent_time`` is advanced.

    Raises:
        Exception: any error from fetching or sending is printed with the batch
            id and then re-raised, which fails the streaming query.
    """
    global last_sent_time
    current_time = datetime.now()
    if current_time - last_sent_time < SEND_INTERVAL:
        return  # throttled: too soon since the last send
    try:
        event_data = fetch_weather()
        send_event(event_data)
        # only advanced on success, so a failure does not delay the next attempt
        last_sent_time = current_time
        print(f"Event sent at {current_time}")
    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise  # bare raise re-raises with the original traceback untouched

# Spark's "rate" source emits one row per second. Here it only acts as a clock
# that keeps triggering process_batch, which throttles the actual sends.
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)
query = streaming_df.writeStream.foreachBatch(process_batch).start()
try:
    # blocks until the query stops; raises if the query fails or the command is interrupted
    query.awaitTermination()
finally:
    # Previously this close was skipped whenever awaitTermination() raised
    # (e.g. on cell cancellation), leaking the Event Hub connection.
    producer.close()


Event sent at 2025-06-03 03:01:06.370946
Event sent at 2025-06-03 03:01:36.455832
Event sent at 2025-06-03 03:02:07.124899
Event sent at 2025-06-03 03:02:37.447692
Event sent at 2025-06-03 03:03:07.535841
Event sent at 2025-06-03 03:03:38.128646
Event sent at 2025-06-03 03:04:08.273663
Event sent at 2025-06-03 03:04:38.513116
Event sent at 2025-06-03 03:05:09.087690
Event sent at 2025-06-03 03:05:39.494908
Event sent at 2025-06-03 03:06:10.545002
Event sent at 2025-06-03 03:06:41.193312
Event sent at 2025-06-03 03:07:11.363839
Event sent at 2025-06-03 03:07:42.459795
Event sent at 2025-06-03 03:08:13.116504
Event sent at 2025-06-03 03:08:43.492083
Event sent at 2025-06-03 03:09:14.156966
Event sent at 2025-06-03 03:09:44.277035
Event sent at 2025-06-03 03:10:14.510735
Event sent at 2025-06-03 03:10:45.449289
Event sent at 2025-06-03 03:11:15.470311
Event sent at 2025-06-03 03:11:46.504802
Event sent at 2025-06-03 03:12:17.413182
Event sent at 2025-06-03 03:12:47.492848
Event sent at 20

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can