In [0]:
from pyspark.sql.functions import current_timestamp, when, col
import requests
import time
from datetime import datetime, timedelta

# ---------------------------
# CONFIGURATION
# ---------------------------
# Target table location: the catalog name is the widget-supplied prefix
# followed by "ridb".
catalog_prefix = dbutils.widgets.get("catalog_prefix")
catalog_name = "{}ridb".format(catalog_prefix)
schema_name, table_name = "bronze", "reservations"

# API access: the key is read from the "ridb_secrets" secret scope
# (store your own key there under "apikey").
api_url = "https://ridb.recreation.gov/api/v1/reservations"
api_key = dbutils.secrets.get(scope="ridb_secrets", key="apikey")
page_size = 500      # records requested per page (max limit)
delay_seconds = 0.1  # pause between page requests; rate limit is 50/second

# Query window: yesterday through today, both formatted as YYYY-MM-DD.
today = datetime.now()
end_date = format(today, "%Y-%m-%d")
start_date = format(today - timedelta(days=1), "%Y-%m-%d")

# ---------------------------
# CREATE CATALOG & SCHEMA
# ---------------------------
# Make sure the catalog and schema exist and select them as the session
# defaults. The statements run in order: each USE depends on the CREATE
# immediately before it.
for _ddl in (
    f"CREATE CATALOG IF NOT EXISTS {catalog_name}",
    f"USE CATALOG {catalog_name}",
    f"CREATE SCHEMA IF NOT EXISTS {schema_name}",
    f"USE SCHEMA {schema_name}",
):
    spark.sql(_ddl)

# ---------------------------
# FUNCTION TO FETCH A PAGE
# ---------------------------
def get_page(page, timeout=30):
    """Fetch one page of reservation records from the API.

    The request uses the module-level ``api_url``, ``api_key``,
    ``page_size``, ``start_date`` and ``end_date`` settings.

    Args:
        page: Page index sent as the ``page`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely, so a stalled
            connection would hang the whole job.

    Returns:
        The list stored under the ``"data"`` key of the JSON response, or
        an empty list when that key is missing or null.

    Raises:
        Exception: If the response status code is not 200. The message
            carries the status code and the response body.
        requests.exceptions.RequestException: On connection failures or
            when the timeout expires.
    """
    params = {
        "limit": page_size,
        "page": page,
        "dateFrom": start_date,
        "dateTo": end_date,
    }
    headers = {
        "accept": "application/json",
        "apikey": api_key,
    }
    response = requests.get(
        api_url, headers=headers, params=params, timeout=timeout
    )
    if response.status_code != 200:
        raise Exception(f"API error: {response.status_code} - {response.text}")
    # `or []` also covers an explicit `"data": null`, so callers always
    # receive a list.
    return response.json().get("data") or []

# ---------------------------
# FETCH ALL RECORDS UNTIL NO MORE DATA
# ---------------------------
# Walk the pages in order, starting at page 0, and accumulate every record.
# The first empty page marks the end of the result set.
all_records = []
page = 0
data = get_page(page)
while data:
    all_records.extend(data)
    # Throttle between requests to stay under the API rate limit.
    time.sleep(delay_seconds)
    page += 1
    data = get_page(page)

# ---------------------------
# CONVERT TO DATAFRAME
# ---------------------------
# createDataFrame cannot infer a schema from an empty list, so when the API
# returned nothing for the window, stop the notebook cleanly instead of
# failing with a schema-inference error.
if not all_records:
    dbutils.notebook.exit(
        f"No reservations returned for {start_date} to {end_date}; nothing to load."
    )

df = spark.createDataFrame(all_records)

# Replace empty strings with nulls, for string columns only. Comparing a
# non-string column (struct, array, numeric, ...) to "" is either an
# analysis error or a pointless cast, so those columns pass through as-is.
# A single select keeps the column order and avoids stacking one projection
# per column.
string_columns = {name for name, dtype in df.dtypes if dtype == "string"}
df = df.select(
    *[
        when(col(c) == "", None).otherwise(col(c)).alias(c)
        if c in string_columns
        else col(c)
        for c in df.columns
    ]
)

# Stamp every row with the load time.
df = df.withColumn("ingested_at", current_timestamp())

# ---------------------------
# WRITE TO DELTA TABLE (MERGE)
# ---------------------------
table_path = f"{catalog_name}.{schema_name}.{table_name}"

if not spark.catalog.tableExists(table_path):
    # First load: create the table from the DataFrame, passing the
    # delta.enableChangeDataFeed option to the writer.
    df.write.format("delta") \
        .option("delta.enableChangeDataFeed", "true") \
        .saveAsTable(table_path)
else:
    # Later loads: insert only rows whose key is not already in the table.
    # Existing rows are never updated.
    df.createOrReplaceTempView("staging_reservations")
    # The keys are compared with the null-safe operator `<=>`. With plain
    # `=`, a row whose key column is NULL never matches (NULL = NULL is not
    # true) and would be inserted again on every run that sees it.
    spark.sql(f"""
        MERGE INTO {table_path} t
        USING staging_reservations s
        ON t.historical_reservation_id <=> s.historical_reservation_id
           AND t.order_date <=> s.order_date
        WHEN NOT MATCHED THEN
          INSERT *
    """)