## 1.0 Bibliotecas

In [0]:
%pip install -q python-dotenv

import os, json, time, datetime as dt
from zoneinfo import ZoneInfo

import requests
from dotenv import dotenv_values
from delta.tables import DeltaTable

from pyspark.sql import Row, functions as F

from datetime import timezone

import types as T

from delta.tables import DeltaTable
from pyspark.sql import functions as F, types as T
import uuid, datetime as dt



## 2.0 Conexão com API

In [0]:
# Locate the current user's config file inside the workspace.
user = spark.sql("SELECT current_user()").first()[0]
env_path = f"/Workspace/Users/{user}/config/config.env"

# Prefer the key from the .env file; fall back to the process environment
# when the file does not provide a non-empty value.
cfg = dotenv_values(env_path) or {}
API_KEY = cfg.get("OWM_API_KEY")
if not API_KEY:
    API_KEY = os.getenv("OWM_API_KEY")

if API_KEY:
    # Only the key lookup has succeeded here; no request has been sent yet.
    print("Conexão com API OpenWeather ESTABELECIDA")
else:
    raise ValueError(f"OWM_API_KEY não encontrada em {env_path}. Verifique o arquivo .env.")


### 3.1 Cidades-alvo


In [0]:
# Endpoint queried once per city by the ingestion step.
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

# One "<city>,BR" query string per Brazilian capital (26 states + the Federal
# District), written without accents. Order is preserved because it is the
# order in which the cities are requested.
CITIES = [
    f"{capital},BR"
    for capital in (
        "Rio Branco", "Maceio", "Macapa", "Manaus", "Salvador",
        "Fortaleza", "Brasilia", "Vitoria", "Goiania", "Sao Luis",
        "Cuiaba", "Campo Grande", "Belo Horizonte", "Belem", "Joao Pessoa",
        "Curitiba", "Recife", "Teresina", "Rio de Janeiro", "Natal",
        "Porto Alegre", "Porto Velho", "Boa Vista", "Florianopolis", "Sao Paulo",
        "Aracaju", "Palmas",
    )
]


### 3.2 Ingestão de dados

In [0]:
# Timestamps shared by every record of this run: the current UTC instant as a
# naive datetime, and the São Paulo wall-clock time (only its date is stored).
agora_utc   = dt.datetime.now(timezone.utc).replace(tzinfo=None, microsecond=0)
agora_local = dt.datetime.now(ZoneInfo("America/Sao_Paulo")).replace(microsecond=0)

ESPERA_BASE_S = 1.0             # pause between calls, in seconds (anti rate-limit)
TIMEOUT       = (3, 10)         # (connect, read) timeouts, in seconds

# Explicit schema for the collected records. Inferring it from the dicts fails
# when every obs_ts_utc is None, because the column type cannot be determined.
schema_bronze = T.StructType([
    T.StructField("city_id",          T.StringType()),
    T.StructField("json_line",        T.StringType()),
    T.StructField("obs_ts_utc",       T.TimestampType()),
    T.StructField("ingestion_ts_utc", T.TimestampType()),
    T.StructField("ingestion_date",   T.DateType()),
])


def _mascarar_chave(erro):
    """Return ``str(erro)`` with the API key replaced by ``***``.

    requests embeds the request URL -- including the ``appid`` query
    parameter -- in its exception messages, so printing them verbatim would
    leak the secret into the notebook output.
    """
    texto = str(erro)
    return texto.replace(API_KEY, "***") if API_KEY else texto


registros = []

# One GET per city over a shared session. A failure for one city is reported
# and skipped so the remaining cities are still collected.
with requests.Session() as sessao:
    sessao.headers.update({"User-Agent": "openweather-ingest/1.0"})
    for cidade in CITIES:
        try:
            resp = sessao.get(
                BASE_URL,
                params={"q": cidade, "appid": API_KEY, "units": "metric", "lang": "pt_br"},
                timeout=TIMEOUT,
            )
            resp.raise_for_status()

            data = resp.json()
            # "dt" is treated as a Unix epoch and kept as a naive UTC datetime.
            # fromtimestamp(..., tz=utc) replaces utcfromtimestamp, which is
            # deprecated since Python 3.12; the resulting value is the same.
            obs_ts_utc = (
                dt.datetime.fromtimestamp(int(data["dt"]), tz=timezone.utc).replace(tzinfo=None)
                if isinstance(data.get("dt"), (int, float)) else None
            )

            registros.append({
                "city_id": str(data.get("id") or ""),
                "json_line": json.dumps(data, ensure_ascii=False),
                "obs_ts_utc": obs_ts_utc,
                "ingestion_ts_utc": agora_utc,
                "ingestion_date": agora_local.date(),
            })
            print(f"OK: {cidade}")

        except requests.HTTPError as e:
            codigo = getattr(e.response, "status_code", None)
            print(f"[HTTP {codigo}] {cidade} -> {_mascarar_chave(e)}")

        except requests.RequestException as e:
            print(f"[REQ ERROR] {cidade} -> {_mascarar_chave(e)}")

        except ValueError as e:
            # Body was not valid JSON (older requests versions raise a plain
            # ValueError here); skip this city instead of aborting the run.
            print(f"[JSON ERROR] {cidade} -> {_mascarar_chave(e)}")

        # Pause after every attempt, successful or not.
        time.sleep(ESPERA_BASE_S)

if not registros:
    raise RuntimeError("Nenhum registro coletado.")

df_bronze = spark.createDataFrame(registros, schema=schema_bronze)

### 3.3 Auditoria de dados


In [0]:
tabela_bronze = "workspace.weather.bronze_openweather_raw"
tabela_logs   = "workspace.weather.ingestion_logs"

# 1) Batch destined for the bronze table: rows that have an observation
#    timestamp, de-duplicated on (city_id, obs_ts_utc).

colunas_bronze = ["city_id","json_line","obs_ts_utc","ingestion_ts_utc","ingestion_date"]
df_lote = (df_bronze
           .select(*colunas_bronze)
           .filter(F.col("obs_ts_utc").isNotNull())
           .dropDuplicates(["city_id", "obs_ts_utc"])
)

# 2) Values recorded in the log table

id_execucao      = str(uuid.uuid4())
inicio_utc       = dt.datetime.now(timezone.utc).replace(microsecond=0)

registros_tentados  = df_bronze.count()
registros_no_lote   = df_lote.count()
cidades_alvo        = len(CITIES)

# Keys already present in the target; batch rows matching them are skipped by
# the MERGE below (it only inserts when not matched).
chaves_destino            = spark.table(tabela_bronze).select("city_id","obs_ts_utc").dropDuplicates()
ja_existentes             = (df_lote.select("city_id","obs_ts_utc")
                                .join(chaves_destino, ["city_id","obs_ts_utc"], "inner")
                                .count())
registros_ja_existentes   = ja_existentes
# Expected number of inserts, derived from the pre-MERGE key comparison.
registros_inseridos       = registros_no_lote - registros_ja_existentes

# 3) Write to the bronze table (insert-only MERGE on the natural key)
status_execucao, mensagem_erro = "SUCCESS", None
try:
    (DeltaTable.forName(spark, tabela_bronze)
       .alias("t")
       .merge(df_lote.alias("s"), "t.city_id = s.city_id AND t.obs_ts_utc = s.obs_ts_utc")
       .whenNotMatchedInsertAll()
       .execute())
except Exception as e:
    status_execucao = "FAIL"
    mensagem_erro   = str(e)
    # A failed MERGE commits nothing, so the log must not report the
    # pre-computed expected insert count as inserted rows.
    registros_inseridos = 0
    # NOTE(review): the failure is only recorded in the log table and the
    # notebook keeps running; consider re-raising after the log write so a
    # scheduled job is marked as failed.

fim_utc          = dt.datetime.now(timezone.utc).replace(microsecond=0)
duracao_segundos = (fim_utc - inicio_utc).total_seconds()
# None when the batch is empty; the log column accepts nulls.
data_ingestao    = df_lote.agg(F.max("ingestion_date")).first()[0]

# 4) Write to the log table
schema_log = T.StructType([
    T.StructField("id_execucao",             T.StringType()),
    T.StructField("data_inicio",             T.TimestampType()),
    T.StructField("data_fim",                T.TimestampType()),
    T.StructField("duracao_segundos",        T.DoubleType()),
    T.StructField("data_ingestao",           T.DateType()),
    T.StructField("cidades_alvo",            T.IntegerType()),
    T.StructField("registros_tentados",      T.IntegerType()),
    T.StructField("registros_no_lote",       T.IntegerType()),
    T.StructField("registros_ja_existentes", T.IntegerType()),
    T.StructField("registros_inseridos",     T.IntegerType()),
    T.StructField("status",                  T.StringType()),
    T.StructField("mensagem_erro",           T.StringType()),
])

linha_log = [(
    id_execucao, inicio_utc, fim_utc, float(duracao_segundos),
    data_ingestao, int(cidades_alvo),
    int(registros_tentados), int(registros_no_lote),
    int(registros_ja_existentes), int(registros_inseridos),
    status_execucao, mensagem_erro
)]

# Use the tabela_logs variable so the write target and the read in the
# display cell cannot drift apart.
spark.createDataFrame(linha_log, schema=schema_log) \
     .write.format("delta").mode("append").saveAsTable(tabela_logs)

In [0]:
# Five bronze rows with the most recent ingestion timestamp.
ultimas_bronze = (
    spark.table(tabela_bronze)
         .orderBy(F.desc("ingestion_ts_utc"))
         .limit(5)
)
display(ultimas_bronze)

# Log entry with the most recent start time.
ultimo_log = (
    spark.table(tabela_logs)
         .orderBy(F.desc("data_inicio"))
         .limit(1)
)
display(ultimo_log)
