In [0]:
import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as spark_max

# Spark session
# getOrCreate() returns the already-active session when there is one.
spark = SparkSession.builder.getOrCreate()

# JDBC Config
jdbc_url = "jdbc:sqlserver://dataillustrated.database.windows.net:1433;database=dataillustratedsqdatabase"
table_name = "dbo.NationalDebt"

# SECURITY: the SQL login used to be hard-coded here. The environment
# variables NATIONAL_DEBT_JDBC_USER / NATIONAL_DEBT_JDBC_PASSWORD now take
# precedence. The literal fallbacks are kept only so existing runs keep
# working unchanged.
# TODO: rotate this password and delete the fallbacks once the environment
# variables (or a secret store) are configured.
jdbc_properties = {
    "user": os.environ.get("NATIONAL_DEBT_JDBC_USER", "CloudSAdaad1f09"),
    "password": os.environ.get("NATIONAL_DEBT_JDBC_PASSWORD", "StFitw289@!&"),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

# 1. Get latest record_date from SQL using Spark JDBC
df_sql = spark.read.jdbc(url=jdbc_url, table=table_name, properties=jdbc_properties)
max_date = df_sql.select(spark_max(col("record_date")).alias("max_date")).collect()[0]["max_date"]

# The aggregate is NULL (None) when the table has no rows, so the empty-table
# check has to run BEFORE any parsing; strptime(None, ...) raises TypeError.
if max_date is None:
    # If table is empty, use a safe fallback
    latest_date = datetime.strptime('2000-01-01', '%Y-%m-%d')
elif isinstance(max_date, str):
    # record_date stored as text: parse the ISO date.
    latest_date = datetime.strptime(max_date, '%Y-%m-%d')
else:
    # A DATE column comes back as datetime.date and a TIMESTAMP column as
    # datetime.datetime. Both support "+ timedelta" and strftime, so no
    # conversion is needed (and strptime would reject them with TypeError).
    latest_date = max_date

# Resume from the day after the newest row already stored.
start_date = (latest_date + timedelta(days=1)).strftime('%Y-%m-%d')

# 2. Pull data from API after latest date
# Pages through the endpoint until an empty page (or an error) is returned,
# accumulating every record dated on or after start_date in all_data.
base_url = "https://api.fiscaldata.treasury.gov/services/api/fiscal_service/v2/accounting/od/debt_to_penny"
page_size = 100
page_number = 1
all_data = []
# requests waits indefinitely by default; bound each call so a stalled
# connection cannot hang the job.
request_timeout = 30  # seconds

while True:
    params = {
        "page[number]": page_number,
        "page[size]": page_size,
        "filter": f"record_date:gte:{start_date}",
        # Explicit ascending order keeps paging deterministic. If a later page
        # fails, the pages already collected are a contiguous run of the
        # oldest missing dates, so the next run can resume after them.
        "sort": "record_date",
    }
    try:
        response = requests.get(base_url, params=params, timeout=request_timeout)
    except requests.RequestException as exc:
        # Treat network failures like HTTP errors: report and stop paging.
        print(f"Error fetching data: {exc}")
        break

    if response.status_code != 200:
        print(f"Error fetching data: {response.status_code}")
        break

    records = response.json().get("data", [])
    if not records:
        # An empty page marks the end of the result set.
        print("No new records.")
        break

    all_data.extend(records)
    print(f"Fetched page {page_number}, records: {len(records)}")
    page_number += 1

if not all_data:
    # Stop here: there is nothing for the later steps to convert or write.
    print("No new data to append.")
    exit()

# 3. Convert and clean data
# The collected records are converted column by column to numeric types.
df_pd = pd.DataFrame(all_data)

decimal_cols = ["debt_held_public_amt", "intragov_hold_amt", "tot_pub_debt_out_amt"]
int_cols = ["src_line_nbr", "record_fiscal_year", "record_fiscal_quarter", "record_calendar_year", "record_calendar_quarter"]

# The loop variable is deliberately NOT named `col`: that name is the function
# imported from pyspark.sql.functions, and rebinding it to a string would
# break any later col(...) call in the same session.
for column_name in decimal_cols:
    # errors='coerce' turns unparseable values into NaN, which is then swapped
    # for None.
    df_pd[column_name] = pd.to_numeric(df_pd[column_name], errors='coerce').replace({np.nan: None})

for column_name in int_cols:
    # 'Int64' (capital I) is pandas' nullable integer dtype, so coerced
    # missing values do not force the column to float.
    df_pd[column_name] = pd.to_numeric(df_pd[column_name], errors='coerce').astype('Int64')

# 4. Create Spark DataFrame and append to SQL
# Hand the cleaned pandas frame to Spark, then write it to the target table
# in append mode over JDBC.
df_spark = spark.createDataFrame(df_pd)

append_writer = df_spark.write.mode("append")
append_writer.jdbc(url=jdbc_url, table=table_name, properties=jdbc_properties)

print("New records successfully appended.")
