In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
try:
    spark.sql("create catalog streaming;")
except:
    print('check if catalog already exists')

try:
    spark.sql("create schema streaming.bronze;")
except:
    print('check if bronze schema already exists')

try:
    spark.sql("create schema streaming.silver")
except:
    print('check if silver schema already exists')

try:
    spark.sql("create schema streaming.gold;")
except:
    print('check if gold schema already exists')
     

check if catalog already exists
check if bronze schema already exists
check if silver schema already exists
check if gold schema already exists


In [0]:
pip install azure-eventhub


Collecting azure-eventhub
  Downloading azure_eventhub-5.14.0-py3-none-any.whl.metadata (70 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/70.6 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [91m━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/70.6 kB[0m [31m1.0 MB/s[0m eta [36m0:00:01[0m
[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━[0m [32m61.4/70.6 kB[0m [31m788.6 kB/s[0m eta [36m0:00:01[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m70.6/70.6 kB[0m [31m758.8 kB/s[0m eta [36m0:00:00[0m
Downloading azure_eventhub-5.14.0-py3-none-any.whl (326 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/326.3 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m112.6/326.3 kB[0m [31m4.3 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━[0m [32m2

In [0]:

import os
import requests
import json
from datetime import datetime, timedelta

# Function to handle response from Api
def handle_response(response):
    if response.status_code == 200:
        current_weather = response.json()
        print(json.dumps(current_weather, indent=3))
        return current_weather
    else:
        print(f"Error: {response.status_code}, {response.text}")
        return None

def get_current_weather(location, Weather_apikey, base_url):
    current_url = f"{base_url}/current.json"
    params = {
        "key": Weather_apikey,
        "q": location,
        "aqi": "yes"
    }
    response = requests.get(current_url, params=params)
    return handle_response(response)

def get_forecast_weather(location, Weather_apikey, base_url, days):
    forecast_url = f"{base_url}/forecast.json"
    params = {
        "key": Weather_apikey,
        "q": location,
        "days": days
    }
    response = requests.get(forecast_url, params=params)
    return handle_response(response)

def get_alerts(location, Weather_apikey, base_url):
    alerts_url = f"{base_url}/alerts.json"
    params = {
        "key": Weather_apikey,
        "q": location,
        "alerts": "yes"
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

# Flatten and merge the data
def flatten_and_merge(current_weather, forecast_weather, alerts):
    location_data = current_weather.get("location", {})
    current = current_weather.get("current", {})
    condition = current.get("condition", {})
    air_quality = current.get("air_quality", {})
    forecast = forecast_weather.get("forecast", {}).get("forecastday", [])
    alerts = alerts.get("alerts", {}).get("alert", [])

    flatten_data = {
        'name': location_data.get("name"),
        'country': location_data.get("country"),
        'region': location_data.get("region"),
        'lat': location_data.get("lat"),
        'lon': location_data.get("lon"),
        'localtime': location_data.get("localtime"),
        'temp_c': current.get("temp_c"),
        'is_day': current.get("is_day"),
        'condition': condition.get("text"),
        'condition_icon': condition.get("icon"),
        'wind_kph': current.get("wind_kph"),
        'wind_dir': current.get("wind_dir"),
        'wind_degree': current.get("wind_degree"),
        'pressure_in': current.get("pressure_in"),
        'precip_in': current.get("precip_in"),
        'cloud': current.get("cloud"),
        'humidity': current.get("humidity"),
        'feelslike_c': current.get("feelslike_c"),
        'uv': current.get("uv"),
        'air_quality': {
            'co': air_quality.get("co"),
            'no2': air_quality.get("no2"),
            'o3': air_quality.get("o3"),
            'so2': air_quality.get("so2"),
            'pm2_5': air_quality.get("pm2_5"),
            'pm10': air_quality.get("pm10"),
            'us-epa-index': air_quality.get("us-epa-index"),
            'gb-defra-index': air_quality.get("gb-defra-index")
        },
        'alerts': [
            {
                'headline': alert.get("headline"),
                'description': alert.get("desc"),
                'severity': alert.get("severity"),
                'instruction': alert.get("instruction")
            }
            for alert in alerts
        ],
        'forecast': [
            {
                'date': day.get("date"),
                'maxtemp_c': day.get("day", {}).get("maxtemp_c"),
                'mintemp_c': day.get("day", {}).get("mintemp_c"),
                'condition': day.get("day", {}).get("condition", {}).get("text")
            }
            for day in forecast
        ]
    }
    return flatten_data





In [0]:
import json
import requests
from azure.eventhub import EventHubProducerClient, EventData

# Azure Event Hub Config
EVENT_HUB_CONNECTION_STR = "Endpoint=sb://nishant.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=7m6T7o5c7LmvTHJzVTx1+QCGWsDGjlhJK+AEhNUjsBo="
EVENT_HUB_NAME = "dataproduct "

# Function to send data to Event Hub
def send_to_event_hub(data):
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(data)))
    producer.send_batch(event_data_batch)
    print("✅ Data sent to Event Hub")
    producer.close()

# Fetch weather data (your existing code)
def fetch_weather():
    base_url = "http://api.weatherapi.com/v1"
    Weather_apikey = "a47546706a994fb68f4103222251502"
    location = "Chennai"

    # Get data from API
    current_weather = get_current_weather(location, Weather_apikey, base_url)
    forecast_weather = get_forecast_weather(location, Weather_apikey, base_url, 3)
    alerts = get_alerts(location, Weather_apikey, base_url)

    # Flatten and merge the data
    flatten_data = flatten_and_merge(current_weather, forecast_weather, alerts)
    return flatten_data
    # Send to Event Hub
    

# Call the function every 5 minutes to stream real-time data

last_sent=datetime.now()-timedelta(minutes=5)

# Main program
def process_batch(batch_df,batch_id):

    print(f"Processing batch {batch_id} with {batch_df.count()} records")
    
    # Show sample data
    batch_df.show(5)
    global last_sent
    try:
        current_time=datetime.now()
        if (current_time-last_sent).seconds>=300:
            weather_data=fetch_weather()
            send_to_event_hub(weather_data)
            last_sent = current_time
        weather_data = fetch_weather()
        enriched_df = batch_df.withColumn("weather_data", lit(json.dumps(weather_data)))
        # enriched_df.write \
        #     .format("delta") \
        #     .mode("append") \
        #     .option("checkpointLocation", "abfss://raw@nishantdev.dfs.core.windows.net/checkpoints/weather/") \
        #     .save("abfss://raw@nishantdev.dfs.core.windows.net/bronze/weather/")
        # Write to Delta Table in ADLS Gen2 with schema merging
        enriched_df.write \
        .format("delta") \
        .mode("append") \
        .option("checkpointLocation", "abfss://raw@nishantdev.dfs.core.windows.net/checkpoints/weather/") \
        .option("mergeSchema", "true") \
        .save("abfss://raw@nishantdev.dfs.core.windows.net/bronze/weather/")


    except Exception as e:
        print(f"Error: {e}")
        raise e
streaming_df=spark.readStream.format("rate").option("rowsPerSecond", 1).load()
streaming_df.writeStream.foreachBatch(process_batch).start()
query=streaming_df.writeStream.foreachBatch(process_batch).start()
query.awaitTermination()
producer.close()


Processing batch 0 with 0 records
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

Processing batch 0 with 0 records
{
   "location": {
      "name": "Chennai",
      "region": "Tamil Nadu",
      "country": "India",
      "lat": 13.0833,
      "lon": 80.2833,
      "tz_id": "Asia/Kolkata",
      "localtime_epoch": 1739814662,
      "localtime": "2025-02-17 23:21"
   },
   "current": {
      "last_updated_epoch": 1739814300,
      "last_updated": "2025-02-17 23:15",
      "temp_c": 27.0,
      "temp_f": 80.6,
      "is_day": 0,
      "condition": {
         "text": "Mist",
         "icon": "//cdn.weatherapi.com/weather/64x64/night/143.png",
         "code": 1030
      },
      "wind_mph": 6.9,
      "wind_kph": 11.2,
      "wind_degree": 106,
      "wind_dir": "ESE",
      "pressure_mb": 1014.0,
      "pressure_in": 29.94,
      "precip_mm": 0.0,
      "precip_in": 0.0,
      "humidity": 79,
      "cloud": 25,
      "feelslike_c": 29.5,
      "feelslike_f": 85.0

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can