# **Data Ingestion using Azure Databricks** #

## **1. Create Event Hub from Event Hub Namespace** ##

## **2. Create Cluster in Databricks workspace** ##

- Azure Resource Group --> Azure Databricks Service --> Launch Workspace --> Compute --> Create with personal compute.

## **3. Install Event Hub Libraries in Databricks Cluster** ##

- The Python client library for Event Hub (`azure-eventhub`) is not built into the Databricks cluster by default, so install it as an external library.
- Open the Databricks cluster --> Libraries --> Install new --> PyPI --> enter the package name along with its version from the external source ([source](https://pypi.org/project/azure-eventhub/)) --> Install
- **Best practice:** Restart the cluster after installing the external library so that notebooks can use it without issues.
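
After the restart, a quick way to confirm that the library is visible to the notebook is to query its installed version. The snippet below is a minimal sketch using only the Python standard library; the helper name `installed_version` is hypothetical and not part of any Databricks or Azure API.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version string of a package, or None if it is not installed."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


# "azure-eventhub" is the PyPI distribution name installed in this step
print("azure-eventhub:", installed_version("azure-eventhub"))
```

If this prints `None`, the library is probably not attached to the cluster that the notebook is connected to.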

## **4. Create a sample event in a Databricks notebook and send it to Event Hub** ##

### **4.1. Create notebook in Databricks workspace and attach target cluster to it** ###
- Workspace --> Create --> Notebook --> Connect --> Cluster name

### **4.2. Create and send sample Event to Event Hub using Databricks notebook** ###

In [None]:
# Import Event Hub Libraries
from azure.eventhub import EventHubProducerClient, EventData
import json

# Event Hub Configuration
EVENT_HUB_CONNECTION_STRING = "connection-string-key-from-keyVault"
EVENT_HUB_NAME = "eventhub-name"

# Initialize the Event Hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STRING, eventhub_name=EVENT_HUB_NAME)

# Function to send events to Event Hub
def send_event(event):
    # Serialize the event as JSON and send it as a single-event batch
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

# Sample JSON event
event = {
    "event_id": 111,
    "event_name": "Test Event",
}  

# Send the Event
send_event(event)

# Close the producer
producer.close()
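
The cell above sends one event per batch. When several events are available at once, a single batch can carry many of them. The following is a minimal sketch of that idea, assuming (as I understand the `azure-eventhub` v5 client) that `EventDataBatch.add` raises `ValueError` once the batch is full. The names `send_events` and `to_event_data` are hypothetical; the producer is passed in so the function does not depend on the notebook's global state.

```python
import json


def send_events(producer, events, to_event_data):
    """Send an iterable of JSON-serializable events, packing as many as fit into each batch.

    `producer` needs create_batch() and send_batch(); `to_event_data` wraps a JSON string
    (pass the EventData class when using azure-eventhub). Returns the number of events sent.
    """
    batch = producer.create_batch()
    pending = 0  # events added to the current batch but not yet sent
    total = 0
    for event in events:
        data = to_event_data(json.dumps(event))
        try:
            batch.add(data)
        except ValueError:
            if pending == 0:
                raise  # a single event that does not fit into an empty batch cannot be sent
            # Current batch is full: flush it and start a new one with this event
            producer.send_batch(batch)
            batch = producer.create_batch()
            batch.add(data)
            pending = 0
        pending += 1
        total += 1
    if pending:
        producer.send_batch(batch)
    return total
```

With the producer from the cell above (before it is closed), a call would look like `send_events(producer, [event], EventData)`.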

## **5. Configure Key Vault Secrets in Databricks** ##

### **5.1. Create Key Vault SECRET SCOPE** ###
- To establish a connection between **Databricks target workspace** and **Azure Key Vault**, we need to create **Secret scope** in the Databricks workspace based on the official Azure Databricks documentation ([Secret management](https://learn.microsoft.com/en-us/azure/databricks/security/secrets/))
- A **secret scope** is a collection of secrets identified by a name. Databricks recommends aligning secret scopes to roles or applications rather than individuals. There are two types of secret scope:
    - **Azure Key Vault-backed:** You can reference secrets stored in an Azure Key Vault using Azure Key Vault-backed secret scopes. Azure Key Vault-backed secret scope is a read-only interface to the Key Vault. You must manage secrets in Azure Key Vault-backed secret scopes in Azure.
    - **Databricks-backed:** A Databricks-backed secret scope is stored in an encrypted database owned and managed by Azure Databricks.
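
Once a scope exists, it can be useful to confirm from a notebook that the scope and the expected secret name are visible before reading the value. The sketch below relies on `dbutils.secrets.listScopes()` and `dbutils.secrets.list(scope)`; the helper names `scope_names` and `secret_exists` are hypothetical, and `dbutils` is passed in as a parameter (in a notebook it is available as a global).

```python
def scope_names(dbutils):
    """Return the names of all secret scopes visible to the current user."""
    return [scope.name for scope in dbutils.secrets.listScopes()]


def secret_exists(dbutils, scope, key):
    """Return True if a secret named `key` is listed in the secret scope `scope`."""
    return any(meta.key == key for meta in dbutils.secrets.list(scope))
```

In a notebook this might be called as `secret_exists(dbutils, "key-vault-scope-name", "key-vault-secret-name")`. Only secret names are listed here, never secret values.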

### **5.2. Update the code to read the connection string directly from Azure Key Vault** ###
- When you first run the notebook, it fails with an access-denied error. To resolve this, assign the **"Key Vault Secrets User"** role to the **AzureDatabricks** service principal.
- An **Azure Databricks application service principal** is a **Microsoft Entra ID (formerly Azure AD)** identity used by applications, pipelines, or automation to authenticate to Azure Databricks without a human user. It enables secure, non-interactive access for CI/CD, jobs, and integrations (e.g., ADF, Fabric, GitHub Actions). Permissions are granted through **Azure RBAC, Databricks workspace roles**, and **Unity Catalog access**, following least-privilege principles.

In [None]:
# Getting secret value from Key Vault
eventhub_connection_string = dbutils.secrets.get(scope = "key-vault-scope-name", key = "key-vault-secret-name")
EVENT_HUB_NAME = "eventhub-name"

# Initialize the Event Hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# Function to send events to Event Hub
def send_event(event):
    # Serialize the event as JSON and send it as a single-event batch
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

# Sample JSON event
event = {
    "event_id": 333,
    "event_name": "Key Vault Test",
}  

# Send the Event
send_event(event)

# Close the producer
producer.close()
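
If the producer fails to connect after the access issue is resolved, one thing worth checking is whether the secret actually holds a complete connection string. The sketch below assumes the usual Event Hubs format of `;`-separated `Name=value` pairs (`Endpoint`, `SharedAccessKeyName`, `SharedAccessKey`, optionally `EntityPath`). The helper names are hypothetical, and only field names are reported so the key itself is never printed.

```python
def connection_string_fields(conn_str):
    """Split a ';'-separated 'Name=value' connection string into a dict."""
    fields = {}
    for part in conn_str.split(";"):
        if not part.strip():
            continue
        # Split on the first '=' only: key values may themselves end in '=' padding
        name, _, value = part.partition("=")
        fields[name.strip()] = value
    return fields


def missing_fields(conn_str, required=("Endpoint", "SharedAccessKeyName", "SharedAccessKey")):
    """Return the required field names that are absent or empty in the connection string."""
    present = connection_string_fields(conn_str)
    return [name for name in required if not present.get(name)]
```

For example, `missing_fields(eventhub_connection_string)` returns an empty list when all three required fields are present.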

## **6. Weather API Testing** ##

Refer to the WeatherAPI.com documentation: [API Doc](https://www.weatherapi.com/docs/)
- **Request URL:** A request to the WeatherAPI.com API consists of a **base URL** and an **API method**. Requests can be made over either HTTP or HTTPS.
- **Request Parameters:** API key (`key`) and query parameter (`q`)
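
To make the URL structure concrete, the following sketch assembles a request URL from a base URL, an API method, and the request parameters using only the standard library. The helper name `build_request_url` and the placeholder key `YOUR_KEY` are illustrative assumptions; the `requests` library performs the same encoding when `params=` is passed.

```python
from urllib.parse import urlencode


def build_request_url(base_url, api_method, params):
    """Join base URL and API method with exactly one '/', then append the encoded query string."""
    return f"{base_url.rstrip('/')}/{api_method.lstrip('/')}?{urlencode(params)}"


url = build_request_url(
    "https://api.weatherapi.com/v1/",
    "current.json",
    {"key": "YOUR_KEY", "q": "London"},
)
print(url)  # https://api.weatherapi.com/v1/current.json?key=YOUR_KEY&q=London
```

Stripping the slashes on both sides keeps the path free of a double `/` regardless of how the base URL is written.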

In [None]:
# Import libraries
import requests
import json

# Getting secret value from key vault
weatherapikey = dbutils.secrets.get(scope = "key-vault-scope-name", key = "API_key-name_from_Key-Vault")
location = "location-name"

# Base URL from the official documentation (no trailing slash; the endpoint path adds it)
base_url = "http://api.weatherapi.com/v1"

# Current weather endpoint = f"{base_url}/<API-method>"
current_weather_url = f"{base_url}/current.json"

# Define the parameters for the API request
params = {
    'key': weatherapikey,
    'q': location,
}

# Make the API request
response = requests.get(current_weather_url, params=params)

# Check if the request was successful
if response.status_code == 200:
    current_weather = response.json()
    print("Current weather:")
    print(json.dumps(current_weather, indent=3))
else:
    print(f"Error: {response.status_code}, {response.text}")
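
When a request fails, the raw `response.text` can be hard to read. The sketch below turns a failed response into a one-line message. It assumes that error bodies are JSON of the form `{"error": {"code": ..., "message": ...}}`, which is my reading of the WeatherAPI.com documentation and should be verified there; the helper name `describe_api_error` is hypothetical. Anything that does not match that shape is passed through unchanged.

```python
import json


def describe_api_error(status_code, body_text):
    """Build a short error message from an HTTP status code and a response body."""
    try:
        body = json.loads(body_text)
    except ValueError:
        body = None  # body is not JSON (for example, an HTML error page)
    error = body.get("error") if isinstance(body, dict) else None
    if isinstance(error, dict) and "message" in error:
        return f"HTTP {status_code}: API error {error.get('code')}: {error['message']}"
    return f"HTTP {status_code}: {body_text}"
```

In the cell above, the `else` branch could then print `describe_api_error(response.status_code, response.text)`.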

## **7. Developing complete Code for getting Weather Data** ##

In [None]:
import requests
import json

# Function to handle the API response
def handle_response(response):
    if response.status_code == 200:
        return response.json()
    # Raise instead of returning an error string: flatten_data calls .get() on the result
    raise RuntimeError(f"Error: {response.status_code}, {response.text}")

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
    current_weather_url = f"{base_url}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    response = requests.get(current_weather_url, params=params)
    return handle_response(response)

# Function to get Forecast data
def get_forecast_weather(base_url, api_key, location, days):
    forecast_url = f"{base_url}/forecast.json"
    params = {
        'key': api_key,
        'q': location,
        "days": days,
    }
    response = requests.get(forecast_url, params=params)
    return handle_response(response)

# Function to get alerts
def get_alerts(base_url, api_key, location):
    alerts_url = f"{base_url}/alerts.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    location_data = current_weather.get("location",{})
    current = current_weather.get("current",{})
    condition = current.get("condition",{})
    air_quality = current.get("air_quality",{})
    forecast = forecast_weather.get("forecast",{}).get("forecastday",[])
    alert_list = alerts.get("alerts",{}).get("alert",[])

    flattened_data = {
        'name': location_data.get("name"),
        'region': location_data.get("region"),
        'country': location_data.get("country"),
        'lat': location_data.get("lat"),
        'lon': location_data.get("lon"),
        'localtime': location_data.get("localtime"),
        'temp_c': current.get("temp_c"),
        'is_day': current.get("is_day"),
        'condition_text': condition.get("text"),
        'condition_icon': condition.get("icon"),
        'wind_kph': current.get("wind_kph"),
        'wind_degree': current.get("wind_degree"),
        'wind_dir': current.get("wind_dir"),
        'pressure_in': current.get("pressure_in"),
        'precip_in': current.get("precip_in"),
        'humidity': current.get("humidity"),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                'description': alert.get('desc'),
                'instruction': alert.get('instruction'),
            }
            for alert in alert_list
        ],
        'forecast': [
            {
                'date': day.get('date'),
                'maxtemp_c': day.get('day', {}).get('maxtemp_c'),
                'mintemp_c': day.get('day', {}).get('mintemp_c'),
                'condition': day.get('day', {}).get('condition', {}).get('text'),
                'condition_icon': day.get('day', {}).get('condition', {}).get('icon')
            }
            for day in forecast
        ]
    }
    return flattened_data

# Main program
def fetch_weather_data():
    base_url = "http://api.weatherapi.com/v1"
    location = "Chennai"
    weatherapikey = dbutils.secrets.get(scope = "<secret-scope>", key = "<kv-api-key>")
    
    # Get data from API
    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
    alerts = get_alerts(base_url, weatherapikey, location)
    
    # Flatten and merge the data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)
    print("Weather Data:",json.dumps(merged_data, indent=3))

# Calling the main program
fetch_weather_data()
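
`flatten_data` reads nested fields through chains of `.get()` calls, which fail with an `AttributeError` as soon as one level is missing or is not a dictionary. A small nested-lookup helper is one way to make such reads tolerant. This is a sketch only: the name `dig` is hypothetical and the sample record below is made-up data, not real API output.

```python
def dig(data, *keys, default=None):
    """Follow `keys` through nested dicts; return `default` if any level is missing or not a dict."""
    current = data
    for key in keys:
        if not isinstance(current, dict):
            return default
        current = current.get(key)
    return default if current is None else current


# Made-up forecast entry for illustration
day = {"date": "2024-01-01", "day": {"maxtemp_c": 31.2, "condition": {"text": "Sunny"}}}
print(dig(day, "day", "condition", "text"))  # Sunny
print(dig(day, "day", "condition", "icon"))  # None
```

With such a helper, `day.get('day').get('condition').get('text')` could be written as `dig(day, 'day', 'condition', 'text')`.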

## **8. Final Data Ingestion from Databricks to Event Hub** ##

- Combine step 5.2 and step 7:
    - Configure the Event Hub connection string in the Databricks workspace.
    - Send the result to Event Hub as an event instead of printing it.
- Implementation:
    - Import the Event Hub library.
    - Add the Event Hub configuration (connection string, Event Hub name, producer initialization).
    - Add the function that sends an event to Event Hub.
    - Modify the code to send the weather data to Event Hub in streaming fashion (not only the latest event).

In [None]:
import requests
import json

# ----------
# Add the Event Hub library to connect to Event Hub
from azure.eventhub import EventHubProducerClient, EventData

# Getting secret value from Key Vault
eventhub_connection_string = dbutils.secrets.get(scope = "<key-vault-scope-name>", key = "<key-vault-secret-name>")
EVENT_HUB_NAME = "<event hub name>"

# Initialize the Event Hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# Function to send events to Event Hub
def send_event(event):
    # Serialize the event as JSON and send it as a single-event batch
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

# -----------

# Function to handle the API response
def handle_response(response):
    if response.status_code == 200:
        return response.json()
    # Raise instead of returning an error string: flatten_data calls .get() on the result
    raise RuntimeError(f"Error: {response.status_code}, {response.text}")

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
    current_weather_url = f"{base_url}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    response = requests.get(current_weather_url, params=params)
    return handle_response(response)

# Function to get Forecast data
def get_forecast_weather(base_url, api_key, location, days):
    forecast_url = f"{base_url}/forecast.json"
    params = {
        'key': api_key,
        'q': location,
        "days": days,
    }
    response = requests.get(forecast_url, params=params)
    return handle_response(response)

# Function to get alerts
def get_alerts(base_url, api_key, location):
    alerts_url = f"{base_url}/alerts.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    location_data = current_weather.get("location",{})
    current = current_weather.get("current",{})
    condition = current.get("condition",{})
    air_quality = current.get("air_quality",{})
    forecast = forecast_weather.get("forecast",{}).get("forecastday",[])
    alert_list = alerts.get("alerts",{}).get("alert",[])

    flattened_data = {
        'name': location_data.get("name"),
        'region': location_data.get("region"),
        'country': location_data.get("country"),
        'lat': location_data.get("lat"),
        'lon': location_data.get("lon"),
        'localtime': location_data.get("localtime"),
        'temp_c': current.get("temp_c"),
        'is_day': current.get("is_day"),
        'condition_text': condition.get("text"),
        'condition_icon': condition.get("icon"),
        'wind_kph': current.get("wind_kph"),
        'wind_degree': current.get("wind_degree"),
        'wind_dir': current.get("wind_dir"),
        'pressure_in': current.get("pressure_in"),
        'precip_in': current.get("precip_in"),
        'humidity': current.get("humidity"),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                'description': alert.get('desc'),
                'instruction': alert.get('instruction'),
            }
            for alert in alert_list
        ],
        'forecast': [
            {
                'date': day.get('date'),
                'maxtemp_c': day.get('day', {}).get('maxtemp_c'),
                'mintemp_c': day.get('day', {}).get('mintemp_c'),
                'condition': day.get('day', {}).get('condition', {}).get('text'),
                'condition_icon': day.get('day', {}).get('condition', {}).get('icon')
            }
            for day in forecast
        ]
    }
    return flattened_data
    
# Main program
def fetch_weather_data():
    base_url = "http://api.weatherapi.com/v1"
    location = "Chennai"
    weatherapikey = dbutils.secrets.get(scope = "key-vault-scope", key = "weatherapikey")
    
    # Get data from API
    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
    alerts = get_alerts(base_url, weatherapikey, location)
    
    # Flatten and merge the data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)
    #print("Weather Data:",json.dumps(merged_data, indent=3))

    # -------
    # Sending the weather data to Event Hub
    send_event(merged_data)
    # -------

# Calling the main program
fetch_weather_data()
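
Because the same location is polled repeatedly, consecutive events can look identical to a downstream consumer. One optional pattern is to wrap each payload in a small envelope carrying an ID and an ingestion timestamp. The sketch below is an assumption-laden illustration: the function name `wrap_event` and the envelope field names are hypothetical and not required by Event Hub.

```python
import uuid
from datetime import datetime, timezone


def wrap_event(payload, source="weatherapi"):
    """Wrap a payload in an envelope with a unique ID and a UTC ingestion timestamp."""
    return {
        "event_id": str(uuid.uuid4()),                          # unique per event
        "ingested_at": datetime.now(timezone.utc).isoformat(),  # ISO 8601, UTC
        "source": source,
        "payload": payload,
    }
```

The last step of `fetch_weather_data` would then become `send_event(wrap_event(merged_data))`. The envelope contains only strings and the original payload, so it remains JSON-serializable.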

## **9. Send the complete weather data to Event Hub in streaming fashion** ##

In [None]:
import requests
import json

# ----------
# Add the Event Hub library to connect to Event Hub
from azure.eventhub import EventHubProducerClient, EventData

# Getting secret value from Key Vault
eventhub_connection_string = dbutils.secrets.get(scope = "key-vault-scope-name", key = "<Key-Vault-EventHub-conn-str>")
EVENT_HUB_NAME = "<Event-Hub-Name>"

# Initialize the Event Hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# Function to send events to Event Hub
def send_event(event):
    # Serialize the event as JSON and send it as a single-event batch
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

# -----------

# Function to handle the API response
def handle_response(response):
    if response.status_code == 200:
        return response.json()
    # Raise instead of returning an error string: flatten_data calls .get() on the result
    raise RuntimeError(f"Error: {response.status_code}, {response.text}")

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
    current_weather_url = f"{base_url}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    response = requests.get(current_weather_url, params=params)
    return handle_response(response)

# Function to get Forecast data
def get_forecast_weather(base_url, api_key, location, days):
    forecast_url = f"{base_url}/forecast.json"
    params = {
        'key': api_key,
        'q': location,
        "days": days,
    }
    response = requests.get(forecast_url, params=params)
    return handle_response(response)

# Function to get alerts
def get_alerts(base_url, api_key, location):
    alerts_url = f"{base_url}/alerts.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    location_data = current_weather.get("location",{})
    current = current_weather.get("current",{})
    condition = current.get("condition",{})
    air_quality = current.get("air_quality",{})
    forecast = forecast_weather.get("forecast",{}).get("forecastday",[])
    alert_list = alerts.get("alerts",{}).get("alert",[])

    flattened_data = {
        'name': location_data.get("name"),
        'region': location_data.get("region"),
        'country': location_data.get("country"),
        'lat': location_data.get("lat"),
        'lon': location_data.get("lon"),
        'localtime': location_data.get("localtime"),
        'temp_c': current.get("temp_c"),
        'is_day': current.get("is_day"),
        'condition_text': condition.get("text"),
        'condition_icon': condition.get("icon"),
        'wind_kph': current.get("wind_kph"),
        'wind_degree': current.get("wind_degree"),
        'wind_dir': current.get("wind_dir"),
        'pressure_in': current.get("pressure_in"),
        'precip_in': current.get("precip_in"),
        'humidity': current.get("humidity"),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                'description': alert.get('desc'),
                'instruction': alert.get('instruction'),
            }
            for alert in alert_list
        ],
        'forecast': [
            {
                'date': day.get('date'),
                'maxtemp_c': day.get('day', {}).get('maxtemp_c'),
                'mintemp_c': day.get('day', {}).get('mintemp_c'),
                'condition': day.get('day', {}).get('condition', {}).get('text'),
                'condition_icon': day.get('day', {}).get('condition', {}).get('icon')
            }
            for day in forecast
        ]
    }
    return flattened_data


def fetch_weather_data():
    base_url = "http://api.weatherapi.com/v1"
    location = "<location>"
    weatherapikey = dbutils.secrets.get(scope = "<key-Vault-Secret-scope>", key = "<Key-Vault-API-Name>")
    
    # Get data from API
    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
    alerts = get_alerts(base_url, weatherapikey, location)
    
    # Flatten and merge the data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)
    return merged_data

# Micro-batch handler: Structured Streaming calls this once per micro-batch
def process_df(batch_df, batch_id):
    try:
        # Fetch weather data
        weather_data = fetch_weather_data()

        # Send the merged weather data (current weather, forecast, and alerts)
        send_event(weather_data)
    except Exception as e:
        print(f"Error sending events in batch {batch_id}: {str(e)}")
        raise

# Set up a streaming source (for example, rate source for testing purpose)
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Process the data
query = streaming_df.writeStream.foreachBatch(process_df).start()

query.awaitTermination()

# Close the producer after termination
producer.close()
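
In the cell above, every micro-batch triggers three API requests, which can use up an API quota quickly. One possible mitigation is to throttle inside the batch handler so that data is fetched at most once per interval. The sketch below shows such a throttle; the class name `Throttle` and the 30-second interval mentioned afterwards are assumptions for illustration, and the clock is injectable so the logic can be checked without waiting.

```python
import time


class Throttle:
    """Allow an action at most once every `min_interval_seconds`."""

    def __init__(self, min_interval_seconds, clock=time.monotonic):
        self.min_interval = min_interval_seconds
        self.clock = clock      # injectable for testing; defaults to a monotonic clock
        self.last_run = None    # time of the last allowed action

    def ready(self):
        """Return True (and record the time) if enough time has passed since the last allowed action."""
        now = self.clock()
        if self.last_run is None or now - self.last_run >= self.min_interval:
            self.last_run = now
            return True
        return False
```

With `throttle = Throttle(30)` defined before the stream starts, the body of `process_df` could be guarded by `if throttle.ready():`. An alternative is to slow the stream itself with `writeStream.trigger(processingTime="30 seconds")`, which leaves the handler unchanged.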