
# 01_bronze_api_to_delta_stream

## Purpose
Ingest data from a public HTTP API into a Bronze Delta table
using Spark Structured Streaming on Databricks Free Edition.

## Input
- Source: Public REST API (polled)
- Landing zone: raw layer (file-based ingestion)

## Output
- Bronze Delta table (append-only)

## Guarantees
- At-least-once processing
- Schema enforced at ingestion
- Checkpointed streaming state

## Dependencies
- 00_env_validation_and_config


In [0]:
# -------------------
# Code Configuration
# -------------------
# Storage layout for this project: every layer lives under DATA_ROOT.

PROJECT_NAME = "streaming_free_dbx"
USER_NAMESPACE = "mallickshashank017@gmail.com"

DATA_ROOT = f"dbfs:/Workspace/Users/{USER_NAMESPACE}/data/{PROJECT_NAME}"

RAW_ROOT = f"{DATA_ROOT}/raw"
# The f-prefix is required: without it the literal text "{DATA_ROOT}" would
# become part of the checkpoint path instead of the resolved data root.
CHECKPOINT_ROOT = f"{DATA_ROOT}/checkpoint"
BRONZE_ROOT = f"{DATA_ROOT}/bronze"

# Last expression of the cell: shown as the cell output for a quick visual check.
RAW_ROOT, CHECKPOINT_ROOT, BRONZE_ROOT

In [0]:
# Monitored location and the Open-Meteo request derived from it.

Latitude = 22.5745
Longitude = 88.4713
Location_Name = "Shapoorji New Town, Kolkata"

# Build the URL from the coordinates above so that changing the location in
# one place cannot leave the request pointing at the old coordinates.
Weather_API = (
    "https://api.open-meteo.com/v1/forecast"
    f"?latitude={Latitude}&longitude={Longitude}&current_weather=true"
)
# Last expression of the cell: shown as the cell output.
Weather_API

### Schema Definition

In [0]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, TimestampType

# Schema applied to every raw weather record before it is written.
# Each .add(name, type, nullable) call appends one column; nullable=False
# marks the field as required (NOT nullable).
raw_schema = (
    StructType()
    # Required fields: when and where the measurement applies.
    .add("event_time", TimestampType(), False)
    .add("latitude", DoubleType(), False)
    .add("longitude", DoubleType(), False)
    # Optional measurement fields.
    .add("Temperature", DoubleType(), True)
    .add("Windspeed", DoubleType(), True)
    .add("Winddirection", DoubleType(), True)
    .add("Weathercode", IntegerType(), True)
    # Optional descriptive fields attached by this notebook.
    .add("location_name", StringType(), True)
    .add("location_time", TimestampType(), True)
)
# Last expression of the cell: shown as the cell output.
raw_schema

### API Fetching

In [0]:
# Scratch cell for manually testing a single API call.
# Everything below is wrapped in a triple-quoted string, so none of it runs;
# the cell only evaluates the string itself.
"""
import requests
from datetime import datetime, timezone
import time

response = requests.get(Weather_API, timeout=10)
response.json()
payload = response.json()
weather = response.json()["current_weather"]
{
    "event_time": weather["time"],
    "latitude": payload["latitude"],
    "longtitude": payload["longitude"],
    "temperature": weather["temperature"],
    "windspeed": weather["windspeed"],
    "winddirection": weather["winddirection"],
    "weathercode": weather["weathercode"],
    "location_name": Location_Name,
    "location_time": datetime.now(timezone.utc)

  }"""

In [0]:
import requests
from datetime import datetime, timezone
import time

def _optional(cast, value):
    """Apply *cast* to *value*, passing ``None`` through for nullable fields."""
    return None if value is None else cast(value)


def fetch_weather_event() -> dict:
    """Fetch the current weather from the API as one record for ``raw_schema``.

    The returned keys are spelled exactly like the field names of
    ``raw_schema`` (including the capitalised measurement fields), because a
    record built with ``Row(**event)`` is matched to the schema by field name.
    Values are converted to the Python types the schema's Spark types expect
    (``datetime`` for timestamps, ``float`` for doubles, ``int`` for integers).

    Returns:
        dict: one weather record keyed by ``raw_schema`` field name.

    Raises:
        requests.HTTPError: the API answered with an error status.
        KeyError: the response lacks an expected key.
        ValueError: the reported time is not a valid ISO-8601 string.
    """
    response = requests.get(Weather_API, timeout=10)
    response.raise_for_status()

    payload = response.json()
    weather = payload["current_weather"]

    # The API reports the time as an ISO-8601 string; the schema needs a
    # datetime. The request sets no `timezone` parameter, for which Open-Meteo
    # documents GMT, so a naive value is tagged as UTC.
    event_time = datetime.fromisoformat(weather["time"])
    if event_time.tzinfo is None:
        event_time = event_time.replace(tzinfo=timezone.utc)

    return {
        "event_time": event_time,
        # JSON numbers may arrive as int; DoubleType fields need float.
        "latitude": float(payload["latitude"]),
        "longitude": float(payload["longitude"]),
        "Temperature": _optional(float, weather["temperature"]),
        "Windspeed": _optional(float, weather["windspeed"]),
        "Winddirection": _optional(float, weather["winddirection"]),
        "Weathercode": _optional(int, weather["weathercode"]),
        "location_name": Location_Name,
        # Wall-clock time (UTC) at which this notebook fetched the record.
        "location_time": datetime.now(timezone.utc),
    }

In [0]:
from pyspark.sql import Row

def write_raw_weather_event():
    """Fetch one weather event and append it to the Delta data at RAW_ROOT."""
    # A one-row DataFrame, validated against raw_schema on creation.
    rows = [Row(**fetch_weather_event())]
    frame = spark.createDataFrame(rows, schema=raw_schema)

    # Append-only write: existing data at RAW_ROOT is kept.
    writer = frame.write.format("delta").mode("append")
    writer.save(RAW_ROOT)



In [0]:
import time

# Seconds to sleep between two polls of the API (an interval, despite the name).
frequency = 30

# Report the configured value rather than a hard-coded number, so the message
# stays correct when `frequency` is changed.
print(f"Pulling data at {frequency}-second intervals...")

# Poll until the cell is interrupted; there is no other exit condition.
while True:
    try:
        write_raw_weather_event()
        print("Weather event written to RAW (Delta format)")
    except Exception as e:
        # Best-effort ingestion: one failed poll must not stop the loop.
        # The exception type is included because some messages (e.g. a
        # KeyError's) are not understandable on their own.
        print(f"Error is {type(e).__name__}: {e}")

    # The sleep starts after the work finishes, so the real period is
    # `frequency` seconds plus the time taken by the fetch and write.
    time.sleep(frequency)