This notebook is a proof of concept (PoC) for a data pipeline covering ingestion, storage, processing, exploration, analysis and visualization.

We use Databricks (with PySpark), an Azure Storage Account and Unity Catalog with external locations for this purpose.

Practices: DRY, KISS, Clean Code Principles and Error Handling.
Paradigm: Basic functional programming

Pipeline specifications
1. Process the last 7 days of data into the database table(s) when the pipeline runs for the first time. Subsequent runs must insert only new records. **Not feasible with this data: an initial exploration showed that the data is too old, so there are no records from the last 7 days to process. Given the low data volume, a full load is performed instead.**
2. Re-running the pipeline multiple times must not result in duplicate records in the table. **Met: every write overwrites its target (`mode("overwrite")` for the raw data, `CREATE OR REPLACE TABLE` for the cleaned and curated tables), so re-runs replace the data instead of appending to it.**
3. Feel free to assume other details as required. Document your assumptions. **All assumptions are documented as comments in the code and, where necessary, in a text cell of the notebook.**

For incremental loads, only minor changes would be needed: load each run into a temporary table and compare it with the final table so that only new records are inserted.
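The temporary-table approach can be sketched as a Delta Lake `MERGE INTO` that inserts only the staging rows whose key is not yet present in the final table. The helper below only builds the SQL string, so it can be reviewed and then passed to `spark.sql(...)`. The function name, the staging table name and the choice of `station_id` + `observation_timestamp` as the identifying key are assumptions for illustration, not part of the current pipeline.

```python
def build_incremental_merge_sql(target_table: str, staging_table: str, key_columns: list) -> str:
    """Build a MERGE statement that inserts only staging rows whose key is absent from the target.

    Hypothetical helper: the key columns are an assumption about what identifies an observation.
    """
    if not key_columns:
        raise ValueError("At least one key column is required to detect existing records")
    # A row counts as "already loaded" only if every key column matches
    join_condition = " AND ".join(f"target.{col} = source.{col}" for col in key_columns)
    return (
        f"MERGE INTO {target_table} AS target\n"
        f"USING {staging_table} AS source\n"
        f"ON {join_condition}\n"
        f"WHEN NOT MATCHED THEN INSERT *"
    )


if __name__ == "__main__":
    merge_sql = build_incremental_merge_sql(
        "unity_catalog.lab.weather_observations_cleaned",
        "unity_catalog.lab.weather_observations_staging",  # Hypothetical staging table
        ["station_id", "observation_timestamp"],
    )
    print(merge_sql)
    # In Databricks this would be executed with: spark.sql(merge_sql)
```

Because the statement has only a `WHEN NOT MATCHED` clause, existing rows are never touched and re-running it with the same staging data inserts nothing. The staging table itself should be deduplicated on the key first, since `MERGE` would insert every unmatched source row, including repeats.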

# Ingestion: Fetch Weather Data for a Single Station

## Definitions

In [0]:
import requests
import json
from pyspark.sql import SparkSession

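def fetch_raw_weather_data(station_id: str, observations_url_template: str, timeout_seconds: int = 30) -> dict:
    """Fetch raw weather observations for a specific station and return the parsed JSON payload."""
    observations_url = observations_url_template.format(station_id=station_id)
    # api.weather.gov asks clients to identify themselves via User-Agent; this value is a placeholder
    headers = {"User-Agent": "weather-pipeline-poc (contact: replace-me@example.com)", "Accept": "application/geo+json"}
    # A timeout prevents the notebook from hanging on an unresponsive API
    response = requests.get(observations_url, headers=headers, timeout=timeout_seconds)
    response.raise_for_status()  # Raise on HTTP 4xx/5xx instead of saving an error payload
    return response.json()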

def save_raw_data_to_storage(spark: SparkSession, raw_data: dict, raw_storage_path: str) -> None:
    """Ingest raw JSON data into the storage system in Delta format without any transformation."""
    # Serialize the whole payload into a single JSON string (stored as-is, no parsing yet)
    raw_json_str = json.dumps(raw_data)
    
    # Create a one-row DataFrame with a single "value" column holding the JSON string
    raw_df = spark.createDataFrame([(raw_json_str,)], ["value"])
    
    # Overwrite the previous snapshot so re-runs do not append duplicate raw payloads
    raw_df.write.format("delta").mode("overwrite").save(raw_storage_path)
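`raise_for_status()` turns HTTP errors into exceptions, but a single transient failure (a timeout or an HTTP 5xx) still aborts the run. A retry wrapper with exponential backoff is one way to harden the ingestion step; the sketch below is a minimal version, and `with_retries`, its parameters and the set of retried exceptions are assumptions, not part of the original notebook.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay_seconds: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func until it succeeds or max_attempts is reached, doubling the delay after each failure."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: re-raise the last error unchanged
            # Exponential backoff: base, 2 * base, 4 * base, ...
            sleep(base_delay_seconds * 2 ** (attempt - 1))
    raise AssertionError("unreachable: the loop always returns or raises")


# Hypothetical usage in the notebook (requests.exceptions.RequestException covers
# timeouts, connection errors and the HTTPError raised by raise_for_status):
# raw_weather_data = with_retries(
#     lambda: fetch_raw_weather_data(station_id, observations_url_template),
#     retry_on=(requests.exceptions.RequestException,),
# )
```

Retrying client errors such as 404 rarely helps, so in practice a narrower `retry_on` (or a check on the status code) may be preferable.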

## Usage

In [0]:
# Set station ID and API URL template
station_id = "0112W"
observations_url_template = "https://api.weather.gov/stations/{station_id}/observations"

# Fetch the raw weather data (without any extraction or transformation)
raw_weather_data = fetch_raw_weather_data(station_id, observations_url_template)

# Save the raw JSON data to storage in Delta format
raw_storage_path = "abfss://raw@cs2100320032141b0ad.dfs.core.windows.net/weather_observations_raw"
save_raw_data_to_storage(spark, raw_weather_data, raw_storage_path)
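The storage account name is repeated in every `abfss://` path in this notebook. To keep the paths DRY, they could be built by one helper from the ADLS Gen2 URI layout `abfss://<container>@<account>.dfs.core.windows.net/<path>` that the notebook already uses. The sketch below assumes that layout; the helper name and the `STORAGE_ACCOUNT` constant are hypothetical.

```python
STORAGE_ACCOUNT = "cs2100320032141b0ad"  # Account name taken from the paths used in this notebook


def abfss_path(container: str, relative_path: str, account: str = STORAGE_ACCOUNT) -> str:
    """Build an ADLS Gen2 URI of the form abfss://<container>@<account>.dfs.core.windows.net/<path>."""
    if not container or not account:
        raise ValueError("container and account must be non-empty")
    # Strip leading slashes so callers can pass "x" or "/x" interchangeably
    return f"abfss://{container}@{account}.dfs.core.windows.net/{relative_path.lstrip('/')}"


print(abfss_path("raw", "weather_observations_raw"))
# → abfss://raw@cs2100320032141b0ad.dfs.core.windows.net/weather_observations_raw
```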

# Storage: Load Data into Delta Table with External Location

## Definitions

In [0]:
from pyspark.sql import SparkSession

def create_external_table_if_not_exists(spark: SparkSession, table_name: str, delta_storage_path: str) -> None:
    """Create an external Delta table in Unity Catalog if it does not already exist."""
    spark.sql(f"""
        CREATE EXTERNAL TABLE IF NOT EXISTS {table_name}
        USING DELTA
        LOCATION '{delta_storage_path}'
    """)

def save_raw_data_to_unity_table(spark: SparkSession, table_name: str, delta_storage_path: str) -> None:
    """Register the raw Delta data written by the ingestion step as a table in Unity Catalog."""
    # The data already lives at delta_storage_path; this only creates the catalog entry pointing to it
    create_external_table_if_not_exists(spark, table_name, delta_storage_path)

## Usage

In [0]:
# Define the table name and Delta storage path
table_name = "unity_catalog.lab.weather_observations_raw"
# Same location written by the ingestion step, so the table exposes the data saved there
delta_storage_path = "abfss://raw@cs2100320032141b0ad.dfs.core.windows.net/weather_observations_raw"

# Register the raw data as an external Delta table in Unity Catalog
# (no second API call: the ingestion step already fetched and saved the data)
save_raw_data_to_unity_table(spark, table_name, delta_storage_path)

# Processing: Preparing, cleaning and structuring the data

In [0]:
%sql
DROP TABLE IF EXISTS unity_catalog.lab.weather_observations_cleaned;

In [0]:
%sql
CREATE OR REPLACE TABLE unity_catalog.lab.weather_observations_cleaned
LOCATION 'abfss://staging@cs2100320032141b0ad.dfs.core.windows.net/weather_observations_cleaned'
AS
WITH 
parsed AS (
  SELECT
    feature.properties.station AS station_id,  -- Station URL, e.g. https://api.weather.gov/stations/0112W
    feature.id AS observation_id,  -- feature.id is the observation's URL, not a station name
    feature.geometry.coordinates[0] AS longitude,  -- GeoJSON coordinates are [longitude, latitude]
    feature.geometry.coordinates[1] AS latitude,
    feature.properties.timestamp AS observation_timestamp,
    'UTC' AS station_timezone,  -- Assuming UTC
    round(cast(feature.properties.temperature.value AS double), 2) AS temperature,  -- degrees Celsius
    round(cast(feature.properties.windSpeed.value AS double), 2) AS wind_speed,  -- km/h
    round(cast(feature.properties.relativeHumidity.value AS double), 2) AS humidity  -- percent
  FROM unity_catalog.lab.weather_observations_raw
  LATERAL VIEW EXPLODE(from_json(get_json_object(value, '$.features'), 
    'ARRAY<STRUCT<
      id: STRING, 
      geometry: STRUCT<coordinates: ARRAY<DOUBLE>>, 
      properties: STRUCT<
        station: STRING, 
        timestamp: STRING, 
        temperature: STRUCT<value: DOUBLE>, 
        windSpeed: STRUCT<value: DOUBLE>, 
        relativeHumidity: STRUCT<value: DOUBLE>
      >
    >>')) AS feature
)
SELECT * FROM parsed
WHERE station_id IS NOT NULL;
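To make the `LATERAL VIEW EXPLODE(from_json(...))` step easier to follow, the same flattening can be written in plain Python over the raw payload: each element of `features` becomes one row, and the nested `properties.*.value` fields become flat columns, with each feature's `id` (the observation's URL) kept as `observation_id`. This is an illustrative sketch that roughly mirrors the SQL above, including dropping rows without a station; the function names are hypothetical and it is not meant to replace the Spark job.

```python
from typing import Any, Dict, List, Optional


def _rounded_value(measurement: Optional[Dict[str, Any]]) -> Optional[float]:
    """Return the measurement's 'value' rounded to 2 decimals, or None when it is missing."""
    if not measurement or measurement.get("value") is None:
        return None
    return round(float(measurement["value"]), 2)


def flatten_observations(raw: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten a GeoJSON FeatureCollection of observations into one dict per feature."""
    rows = []
    for feature in raw.get("features") or []:  # One row per feature, like EXPLODE
        props = feature.get("properties") or {}
        coords = (feature.get("geometry") or {}).get("coordinates") or []
        longitude, latitude = (list(coords) + [None, None])[:2]  # GeoJSON order: [lon, lat]
        row = {
            "station_id": props.get("station"),
            "observation_id": feature.get("id"),
            "longitude": longitude,
            "latitude": latitude,
            "observation_timestamp": props.get("timestamp"),
            "station_timezone": "UTC",  # Same assumption as the SQL
            "temperature": _rounded_value(props.get("temperature")),
            "wind_speed": _rounded_value(props.get("windSpeed")),
            "humidity": _rounded_value(props.get("relativeHumidity")),
        }
        if row["station_id"] is not None:  # Mirrors WHERE station_id IS NOT NULL
            rows.append(row)
    return rows
```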

# Exploration: Getting to know the data

In [0]:
%sql
SELECT * FROM unity_catalog.lab.weather_observations_cleaned;

In [0]:
%sql
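-- Check for duplicates: if total_rows > distinct_rows, the table contains duplicate rows
SELECT
  (SELECT COUNT(*) FROM unity_catalog.lab.weather_observations_cleaned) AS total_rows,
  (SELECT COUNT(*) FROM (SELECT DISTINCT * FROM unity_catalog.lab.weather_observations_cleaned) AS d) AS distinct_rows;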
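Exact-row duplicates are only part of the picture: two rows for the same station and timestamp with slightly different values would also count as duplicated observations. A key-based check groups rows by the columns that should identify an observation and reports any key seen more than once; in Spark SQL that is a `GROUP BY ... HAVING COUNT(*) > 1` over the cleaned table. The plain-Python sketch below shows the idea; treating `station_id` + `observation_timestamp` as the identifying key is an assumption, and the function name is hypothetical.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Sequence, Tuple


def find_duplicate_keys(
    rows: Iterable[Dict[str, Any]],
    key_columns: Sequence[str] = ("station_id", "observation_timestamp"),
) -> Dict[Tuple[Any, ...], int]:
    """Return {key: count} for every key that occurs more than once."""
    counts = Counter(tuple(row.get(col) for col in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}
```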

# Analysis: Required metrics in SQL

## Average observed temperature for the last week (Monday-Sunday)
Note: because the data is outdated, the date filter is commented out so that the query runs over all available data.

In [0]:
%sql
CREATE OR REPLACE TABLE unity_catalog.lab.weather_observations_avg
LOCATION 'abfss://curated@cs2100320032141b0ad.dfs.core.windows.net/weather_observations_avg'
AS
SELECT AVG(temperature) AS avg_temperature_last_week
FROM unity_catalog.lab.weather_observations_cleaned
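/* Previous Monday-Sunday week (trunc(..., 'WEEK') returns the Monday of the current week):
WHERE to_timestamp(observation_timestamp) >= date_sub(trunc(current_date(), 'WEEK'), 7)
  AND to_timestamp(observation_timestamp) < trunc(current_date(), 'WEEK')*/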
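"Last week (Monday-Sunday)" is a calendar week, which differs from a rolling 7-day window ending today. The sketch below computes that calendar-week window as a half-open `[start, end)` interval and averages only the non-null temperatures inside it, just as SQL `AVG` skips NULLs. The function names and the half-open convention are assumptions for illustration; it also assumes timestamps carry a UTC offset (such as `+00:00`), since comparing naive and timezone-aware datetimes raises an error.

```python
from datetime import date, datetime, timedelta, timezone
from typing import Iterable, Optional, Tuple


def last_calendar_week(today: date) -> Tuple[datetime, datetime]:
    """Return [start, end) in UTC for the Monday-Sunday week before the week containing `today`."""
    this_monday = today - timedelta(days=today.weekday())  # weekday(): Monday == 0
    last_monday = this_monday - timedelta(days=7)
    start = datetime(last_monday.year, last_monday.month, last_monday.day, tzinfo=timezone.utc)
    end = datetime(this_monday.year, this_monday.month, this_monday.day, tzinfo=timezone.utc)
    return start, end


def average_temperature(
    observations: Iterable[Tuple[str, Optional[float]]], start: datetime, end: datetime
) -> Optional[float]:
    """Average the non-null temperatures whose ISO-8601 timestamp falls in [start, end)."""
    values = [
        temp
        for ts, temp in observations
        if temp is not None and start <= datetime.fromisoformat(ts) < end
    ]
    return sum(values) / len(values) if values else None  # None mirrors AVG over zero rows
```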

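## Maximum wind speed change between two consecutive observations for the last week (Monday-Sunday)
Note: as above, the date filter is commented out because the data is outdated.

In [0]: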
%sql
CREATE OR REPLACE TABLE unity_catalog.lab.weather_observations_max_speed
LOCATION 'abfss://curated@cs2100320032141b0ad.dfs.core.windows.net/weather_observations_max_speed'
AS
WITH 
ranked_observations AS (
  SELECT 
    station_id, 
    observation_timestamp, 
    wind_speed, 
    ROW_NUMBER() OVER (PARTITION BY station_id ORDER BY observation_timestamp) AS row_num
  FROM unity_catalog.lab.weather_observations_cleaned
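  /* Previous Monday-Sunday week (trunc(..., 'WEEK') returns the Monday of the current week):
  WHERE to_timestamp(observation_timestamp) >= date_sub(trunc(current_date(), 'WEEK'), 7)
    AND to_timestamp(observation_timestamp) < trunc(current_date(), 'WEEK')*/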
),
wind_speed_changes AS (
  SELECT 
    a.station_id,
    a.observation_timestamp,
    ABS(a.wind_speed - b.wind_speed) AS wind_speed_change
  FROM ranked_observations a
  INNER JOIN ranked_observations b
    ON a.station_id = b.station_id
      AND a.row_num = b.row_num + 1 -- Consecutive observations
)

SELECT MAX(wind_speed_change) AS max_wind_speed_change
FROM wind_speed_changes;
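The `ROW_NUMBER()` self-join pairs each observation with the previous one from the same station. The same idea, written as a per-station sort followed by a pairwise walk, is sketched below in plain Python. It assumes all `observation_timestamp` strings share one ISO-8601 format and offset, so sorting them as strings is chronological (the SQL relies on the same property when it orders by the string column); the function name is hypothetical. In Spark SQL, `LAG(wind_speed) OVER (PARTITION BY station_id ORDER BY observation_timestamp)` would express the same pairing without a self-join.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple


def max_consecutive_wind_change(
    observations: Iterable[Tuple[str, str, Optional[float]]],
) -> Optional[float]:
    """Largest |wind_speed(t) - wind_speed(t-1)| over consecutive observations of each station.

    observations: (station_id, observation_timestamp, wind_speed) tuples.
    Pairs involving a missing wind speed are skipped, like NULL changes in the SQL MAX.
    """
    by_station: Dict[str, List[Tuple[str, Optional[float]]]] = defaultdict(list)
    for station_id, ts, speed in observations:
        by_station[station_id].append((ts, speed))

    changes = []
    for series in by_station.values():
        series.sort(key=lambda item: item[0])  # Chronological for uniformly formatted ISO strings
        for (_, previous), (_, current) in zip(series, series[1:]):
            if previous is not None and current is not None:
                changes.append(abs(current - previous))
    return max(changes) if changes else None
```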

# Visualization: Databricks visualizations to show Curated Data

## Note: select the visualization tab of each cell in this section to see the chart.

In [0]:
%sql
select * from unity_catalog.lab.weather_observations_avg;


In [0]:
%sql
select * from unity_catalog.lab.weather_observations_max_speed;
