In [0]:
%sql
USE CATALOG payroll;

CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;


In [0]:
import requests
import json
from pyspark.sql.functions import col, current_timestamp, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

BLS_SERIES_ID = 'CES0000000001' 
BLS_API_URL = "https://api.bls.gov/publicAPI/v2/timeseries/data/"

headers = {'Content-type': 'application/json'}
data = json.dumps({
    "seriesid": [BLS_SERIES_ID],
    "startyear": "2020",
    "endyear": "2025" 
})

registros = []
schema = StructType([
    StructField("data_referencia_str", StringType(), True),
    StructField("variacao_liquida_payroll_milhares", DoubleType(), True)
])

try:
    response = requests.post(BLS_API_URL, headers=headers, data=data)
    response.raise_for_status()
    api_data = response.json()
    
    if api_data and api_data.get('status') == 'REQUEST_SUCCEEDED':
        series_data = api_data['Results']['series'][0]['data']
        
        for item in series_data:
            year = item['year']
            period = item['period']
            value = item['value']
            
            month = period.replace('M', '')
            date_ref_str = f"{year}-{month}-01"
            
            if value is not None:
                registros.append((date_ref_str, float(value)))

    df_bronze = spark.createDataFrame(registros, schema=schema)
    
except Exception as e:
       df_bronze = spark.createDataFrame([], schema)

if df_bronze.count() > 0:
    df_bronze.withColumn("timestamp_ingestao", current_timestamp()) \
          .write.format("delta") \
          .mode("overwrite") \
          .option("mergeSchema", "true") \
          .saveAsTable("payroll.bronze.eua_payroll_raw")
    print("Sucesso! Tabela BRONZE populada com dados completos da API.")
else:
    print("ðŸš¨ ERRO DE INGESTÃƒO: A API nÃ£o retornou dados. Verifique a chave ou o URL.")
