**sending test event to event hub**

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
import json 

#Event hub configuration
event_hub_connection_string = dbutils.secrets.get(scope="key-valut-scope",key="myeventhubconnectionstring")
EVENT_HUB_NAME = "myweatherstreamingeventhub"

#Intialize the event hub producer

producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_string, eventhub_name=EVENT_HUB_NAME
)

#Function to send events to event hub

def send_event(event):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

#sample json event 
event = {
    "event_id" : 2222,
    "event_name": "key vault test"
}

#send the event 
send_event(event)
#close the producer 
producer.close()


**API** **TESTING**

In [0]:
import requests
import json

#getting a secrect from key vault
weatherapikey = dbutils.secrets.get(scope="key-valut-scope",key="myweatherapikey")
location = "Tirupati"

base_url = "http://api.weatherapi.com/v1"

current_weather_url = f"{base_url}/current.json"

params = {
    "key": weatherapikey,
    "q": location,
}

response = requests.get(current_weather_url, params=params)

if response.status_code == 200:
    current_weather = response.json()
    print("Current Weather:")
    print(json.dumps(current_weather, indent=3))
else:
    print(f"Error: {response.status_code} - {response.text}")


Current Weather:
{
   "location": {
      "name": "Tirupati",
      "region": "Andhra Pradesh",
      "country": "India",
      "lat": 13.65,
      "lon": 79.4167,
      "tz_id": "Asia/Kolkata",
      "localtime_epoch": 1738686639,
      "localtime": "2025-02-04 22:00"
   },
   "current": {
      "last_updated_epoch": 1738686600,
      "last_updated": "2025-02-04 22:00",
      "temp_c": 19.9,
      "temp_f": 67.9,
      "is_day": 0,
      "condition": {
         "text": "Clear",
         "icon": "//cdn.weatherapi.com/weather/64x64/night/113.png",
         "code": 1000
      },
      "wind_mph": 2.2,
      "wind_kph": 3.6,
      "wind_degree": 180,
      "wind_dir": "S",
      "pressure_mb": 1014.0,
      "pressure_in": 29.94,
      "precip_mm": 0.0,
      "precip_in": 0.0,
      "humidity": 67,
      "cloud": 3,
      "feelslike_c": 19.9,
      "feelslike_f": 67.9,
      "windchill_c": 19.9,
      "windchill_f": 67.9,
      "heatindex_c": 19.9,
      "heatindex_f": 67.9,
      "dewpoin

# **Complete code for getting weather data**

In [0]:
import requests
import json

#function to handle API response
def handle_response(response):
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")

#function to get current weather and aqi
def get_current_weather(base_url,api_key,location):
    current_weather_url = f"{base_url}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes",
    }
    response = requests.get(current_weather_url, params=params)
    return handle_response(response)

#function to get forecast data 
def get_forecast_weather(base_url,api_key,location,days):
    forecast_weather_url = f"{base_url}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    response = requests.get(forecast_weather_url, params=params)
    return handle_response(response)

#fuction to get alerts 
def get_alerts(base_url,api_key,location):
    alerts_url = f"{base_url}/alerts.json"
    params = { 
        "key": api_key,
        "q": location,
        "alerts": "yes",
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

#flatten and merge the data 
def flatten_and_merge(current_weather,forecast_weather,alerts):
    location_data = current_weather.get("location",{})
    current = current_weather.get("current",{})
    condition = current.get("condition",{})
    air_quality = current.get("air_quality",{})
    forecast = forecast_weather.get("forecast",{}).get("forecastday",[])
    alert_list = alerts.get("alerts", {}).get("alert", [])


    flattend_data = {
        'name':location_data.get("name"),
        "region": location_data.get("region"),
        "country": location_data.get("country"),
        "lat":location_data.get("lat"),
        "lon":location_data.get("lon"),
        "localtime": location_data.get("localtime"),
        "temp_c": current.get("temp_c"),
        "is_day":current.get("is_day"),
        "condition_text": condition.get("text"),
        "condition_icon":condition.get("icon"),
        "wind_kph":current.get("wind_kph"),
        "wind_degree":current.get("wind_degree"),
        "wind_dir":current.get("wind_dir"),
        "pressure_in":current.get("pressure_in"),
        "precip_in":current.get("precip_in"),
        "humidity":current.get("humidity"),
        "cloud":current.get("cloud"),
        "feelslike":current.get("feelslike"),
        "uv":current.get("uv"),
        "air_quality":{
            "co":air_quality.get("co"),
            "no2":air_quality.get("no2"),
            "so2":air_quality.get("so2"),
            "pm2_5":air_quality.get("pm2_5"),
            "pm10":air_quality.get("pm10"),
            "us-epa-index":air_quality.get("us-epa-index"),
            "gp-defra-index":air_quality.get("gp-defra-index")

        },
        "alerts":[
            {
                "headline":alert.get("headline"),
                "severity":alert.get("severity"),
                "description":alert.get("description"),
                "instruction":alert.get("instruction")
            } 
            for alert in alert_list 

        ],
        "forecast":[
            {
                "date":day.get("date"),
                "maxtemp_c":day.get("day",{}).get("maxtemp_c"),
                "mintemp_c":day.get("day",{}).get("mintemp_c"),
                "condition":day.get("day",{}).get("condition",{}).get("text")

            }
            for day in forecast
        ]
    }
    return flattend_data
    

#main function
def fetch_weather_data():
    base_url = "http://api.weatherapi.com/v1"
    location = "Tirupati"
    weatherapikey = dbutils.secrets.get(scope="key-valut-scope",key="myweatherapikey")

    #get data from api 
    current_weather = get_current_weather(base_url,weatherapikey,location)
    forecast_weather = get_forecast_weather(base_url,weatherapikey,location,3)
    alerts = get_alerts(base_url,weatherapikey,location)

    #flatten and erge data 
    merged_data = flatten_and_merge(current_weather,forecast_weather,alerts)
    print("weather data:",json.dumps(merged_data,indent=3))

#calling main fuction
fetch_weather_data()




weather data: {
   "name": "Tirupati",
   "region": "Andhra Pradesh",
   "country": "India",
   "lat": 13.65,
   "lon": 79.4167,
   "localtime": "2025-02-05 01:50",
   "temp_c": 19.3,
   "is_day": 0,
   "condition_text": "Clear",
   "condition_icon": "//cdn.weatherapi.com/weather/64x64/night/113.png",
   "wind_kph": 5.0,
   "wind_degree": 211,
   "wind_dir": "SSW",
   "pressure_in": 29.89,
   "precip_in": 0.0,
   "humidity": 66,
   "cloud": 2,
   "feelslike": null,
   "uv": 0.0,
   "air_quality": {
      "co": 814.0,
      "no2": 54.39,
      "so2": 15.91,
      "pm2_5": 82.51,
      "pm10": 83.435,
      "us-epa-index": 4,
      "gp-defra-index": null
   },
   "alerts": [],
   "forecast": [
      {
         "date": "2025-02-05",
         "maxtemp_c": 31.1,
         "mintemp_c": 18.1,
         "condition": "Sunny"
      },
      {
         "date": "2025-02-06",
         "maxtemp_c": 31.3,
         "mintemp_c": 17.8,
         "condition": "Sunny"
      },
      {
         "date": "2025-

# Sending complete weather data to event hub 

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData

#Event hub configuration
event_hub_connection_string = dbutils.secrets.get(scope="key-valut-scope",key="myeventhubconnectionstring")
EVENT_HUB_NAME = "myweatherstreamingeventhub"

#Intialize the event hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_string, eventhub_name=EVENT_HUB_NAME
)

#Fuction to send events to event hub
def send_event(event):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)


#function to handle API response
def handle_response(response):
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")

#function to get current weather and aqi
def get_current_weather(base_url,api_key,location):
    current_weather_url = f"{base_url}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes",
    }
    response = requests.get(current_weather_url, params=params)
    return handle_response(response)

#function to get forecast data 
def get_forecast_weather(base_url,api_key,location,days):
    forecast_weather_url = f"{base_url}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    response = requests.get(forecast_weather_url, params=params)
    return handle_response(response)

#fuction to get alerts 
def get_alerts(base_url,api_key,location):
    alerts_url = f"{base_url}/alerts.json"
    params = { 
        "key": api_key,
        "q": location,
        "alerts": "yes",
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

#flatten and merge the data 
def flatten_and_merge(current_weather,forecast_weather,alerts):
    location_data = current_weather.get("location",{})
    current = current_weather.get("current",{})
    condition = current.get("condition",{})
    air_quality = current.get("air_quality",{})
    forecast = forecast_weather.get("forecast",{}).get("forecastday",[])
    alert_list = alerts.get("alerts",[])

    flattend_data = {
        'name':location_data.get("name"),
        "region": location_data.get("region"),
        "country": location_data.get("country"),
        "lat":location_data.get("lat"),
        "lon":location_data.get("lon"),
        "localtime": location_data.get("localtime"),
        "temp_c": current.get("temp_c"),
        "is_day":current.get("is_day"),
        "condition_text": condition.get("text"),
        "condition_icon":condition.get("icon"),
        "wind_kph":current.get("wind_kph"),
        "wind_degree":current.get("wind_degree"),
        "wind_dir":current.get("wind_dir"),
        "pressure_in":current.get("pressure_in"),
        "precip_in":current.get("precip_in"),
        "humidity":current.get("humidity"),
        "cloud":current.get("cloud"),
        "feelslike":current.get("feelslike"),
        "uv":current.get("uv"),
        "air_quality":{
            "co":air_quality.get("co"),
            "no2":air_quality.get("no2"),
            "so2":air_quality.get("so2"),
            "pm2_5":air_quality.get("pm2_5"),
            "pm10":air_quality.get("pm10"),
            "us-epa-index":air_quality.get("us-epa-index"),
            "gp-defra-index":air_quality.get("gp-defra-index")

        },
        "alerts":[
            {
                "headline":alert.get("headline"),
                "severity":alert.get("severity"),
                "description":alert.get("description"),
                "instruction":alert.get("instruction")
            } 
            for alert in (alert_list if isinstance(alert_list, list) else [])

        ],
        "forecast":[
            {
                "date":day.get("date"),
                "maxtemp_c":day.get("day",{}).get("maxtemp_c"),
                "mintemp_c":day.get("day",{}).get("mintemp_c"),
                "condition":day.get("day",{}).get("condition",{}).get("text")

            }
            for day in forecast
        ]
    }
    return flattend_data
    

#main function
def fetch_weather_data():
    base_url = "http://api.weatherapi.com/v1"
    location = "Tirupati"
    weatherapikey = dbutils.secrets.get(scope="key-valut-scope",key="myweatherapikey")

    #get data from api 
    current_weather = get_current_weather(base_url,weatherapikey,location)
    forecast_weather = get_forecast_weather(base_url,weatherapikey,location,3)
    alerts = get_alerts(base_url,weatherapikey,location)

    #flatten and erge data 
    merged_data = flatten_and_merge(current_weather,forecast_weather,alerts)

    #sending weather data to event hub
    send_event(merged_data)

#calling main fuction
fetch_weather_data()


sending weather data in streaming way

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData
from datetime import datetime,timedelta

#Event hub configuration
event_hub_connection_string = dbutils.secrets.get(scope="key-valut-scope",key="myeventhubconnectionstring")
EVENT_HUB_NAME = "myweatherstreamingeventhub"

#Intialize the event hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_string, eventhub_name=EVENT_HUB_NAME
)

#Fuction to send events to event hub
def send_event(event):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)


#function to handle API response
def handle_response(response):
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code} - {response.text}")

#function to get current weather and aqi
def get_current_weather(base_url,api_key,location):
    current_weather_url = f"{base_url}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes",
    }
    response = requests.get(current_weather_url, params=params)
    return handle_response(response)

#function to get forecast data 
def get_forecast_weather(base_url,api_key,location,days):
    forecast_weather_url = f"{base_url}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    response = requests.get(forecast_weather_url, params=params)
    return handle_response(response)

#fuction to get alerts 
def get_alerts(base_url,api_key,location):
    alerts_url = f"{base_url}/alerts.json"
    params = { 
        "key": api_key,
        "q": location,
        "alerts": "yes",
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

#flatten and merge the data 
def flatten_and_merge(current_weather,forecast_weather,alerts):
    location_data = current_weather.get("location",{})
    current = current_weather.get("current",{})
    condition = current.get("condition",{})
    air_quality = current.get("air_quality",{})
    forecast = forecast_weather.get("forecast",{}).get("forecastday",[])
    alert_list = alerts.get("alerts",[])

    flattend_data = {
        'name':location_data.get("name"),
        "region": location_data.get("region"),
        "country": location_data.get("country"),
        "lat":location_data.get("lat"),
        "lon":location_data.get("lon"),
        "localtime": location_data.get("localtime"),
        "temp_c": current.get("temp_c"),
        "is_day":current.get("is_day"),
        "condition_text": condition.get("text"),
        "condition_icon":condition.get("icon"),
        "wind_kph":current.get("wind_kph"),
        "wind_degree":current.get("wind_degree"),
        "wind_dir":current.get("wind_dir"),
        "pressure_in":current.get("pressure_in"),
        "precip_in":current.get("precip_in"),
        "humidity":current.get("humidity"),
        "cloud":current.get("cloud"),
        "feelslike":current.get("feelslike"),
        "uv":current.get("uv"),
        "air_quality":{
            "co":air_quality.get("co"),
            "no2":air_quality.get("no2"),
            "so2":air_quality.get("so2"),
            "pm2_5":air_quality.get("pm2_5"),
            "pm10":air_quality.get("pm10"),
            "us-epa-index":air_quality.get("us-epa-index"),
            "gp-defra-index":air_quality.get("gp-defra-index")

        },
        "alerts":[
            {
                "headline":alert.get("headline"),
                "severity":alert.get("severity"),
                "description":alert.get("description"),
                "instruction":alert.get("instruction")
            } 
            for alert in (alert_list if isinstance(alert_list, list) else [])

        ],
        "forecast":[
            {
                "date":day.get("date"),
                "maxtemp_c":day.get("day",{}).get("maxtemp_c"),
                "mintemp_c":day.get("day",{}).get("mintemp_c"),
                "condition":day.get("day",{}).get("condition",{}).get("text")

            }
            for day in forecast
        ]
    }
    return flattend_data
    
def fetch_weather_data():
    base_url = "http://api.weatherapi.com/v1"
    location = "Tirupati"
    weatherapikey = dbutils.secrets.get(scope="key-valut-scope",key="myweatherapikey")

    #get data from api 
    current_weather = get_current_weather(base_url,weatherapikey,location)
    forecast_weather = get_forecast_weather(base_url,weatherapikey,location,3)
    alerts = get_alerts(base_url,weatherapikey,location)

    #flatten and erge data 
    merged_data = flatten_and_merge(current_weather,forecast_weather,alerts)

    return merged_data



last_sent_time = datetime.now()-timedelta(seconds=30)

#main function
def process_batch(batch_df,batch_id):
    global last_sent_time
    try:
        #get current time
        current_time = datetime.now()

        #check if 30 seconds have passed since last event was sent
        if(current_time-last_sent_time).total_seconds()>=30:
            #fetch weather data
            weather_data = fetch_weather_data()

            #sending weather data to event hub
            send_event(weather_data)
            last_sent_time = current_time
            print(f"Event sent at {last_sent_time}")
    except Exception as e:
        print(f"Error sending events in batch {batch_id}: {str(e)}")
        raise e 

#set up a streaming source 
streaming_df = spark.readStream.format("rate").option("rowsPerSecond",1).load()

#write the streaming data using forabatch to send weather data to event hub
query = streaming_df.writeStream.foreachBatch(process_batch).start()

query.awaitTermination()
#close producer after termination
producer.close()

Event sent at 2025-02-04 17:42:57.100885
Event sent at 2025-02-04 17:43:27.115154


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:717)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can