In [0]:
# ====================================================
# 1. Importando bibliotecas necessárias
# ====================================================
import requests
import datetime
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, when, lit
from pyspark.sql.window import Window

# Create (or reuse, via getOrCreate) the Spark session used by every step below.
spark = (
    SparkSession.builder
    .appName("SpaceXCollector")
    .getOrCreate()
)


# ====================================================
# 2. Função genérica para coletar dados da SpaceX
# ====================================================
def get_spacex_data(endpoint, timeout=30, base_url="https://api.spacexdata.com/v4"):
    """
    Fetch data from a SpaceX API v4 endpoint and stamp it with an ingestion date.

    Example endpoints: 'launches', 'rockets', 'ships'.

    Args:
        endpoint: Path segment appended to ``base_url`` (e.g. ``"launches"``).
        timeout: Seconds ``requests`` may wait for the server. Without one,
            ``requests`` can block forever on a stalled connection.
        base_url: API root; defaults to the public v4 API.

    Returns:
        The decoded JSON payload: a list of dicts for collection endpoints, or
        a single dict for single-object endpoints. Each record gets an
        ``ingestion_date`` key formatted "YYYY-MM-DD HH:MM:SS" in UTC.
        Returns ``[]`` (after printing an error) on a network failure, a
        non-200 status, or a body that is not valid JSON.
    """
    url = f"{base_url}/{endpoint}"

    print(f"📡 Coletando {endpoint} de {url}")
    try:
        resp = requests.get(url, timeout=timeout)
    except requests.RequestException as err:
        # Timeouts, DNS/connection failures, etc.: same "log and return []"
        # path as an HTTP error, instead of aborting the whole pipeline.
        print(f"❌ Erro de rede ao coletar {endpoint}: {err}")
        return []

    if resp.status_code != 200:
        print(f"❌ Erro {resp.status_code} ao coletar {endpoint}")
        return []

    try:
        data = resp.json()
    except ValueError as err:  # requests' JSONDecodeError subclasses ValueError
        print(f"❌ Resposta inválida (JSON) para {endpoint}: {err}")
        return []

    # Ingestion metadata. Use explicit UTC so the value does not depend on the
    # cluster's local timezone; the string format is unchanged.
    ingestion_date = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    if isinstance(data, list):
        # Collection endpoint: stamp every record.
        for item in data:
            if isinstance(item, dict):
                item["ingestion_date"] = ingestion_date
    elif isinstance(data, dict):
        # Single-object endpoint.
        data["ingestion_date"] = ingestion_date

    return data


# ====================================================
# 3. Função para normalizar os dados
# ====================================================
def _normalize_value(value):
    """Convert one record value into a string for an all-string bronze row.

    ``None`` becomes ``""``, strings pass through unchanged, and everything
    else (dict, list, bool, int, float) is JSON-encoded, e.g. ``True`` ->
    ``"true"``, ``3`` -> ``"3"``, ``{"a": 1}`` -> ``'{"a": 1}'``.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    return json.dumps(value)


def normalize_data(data):
    """
    Flatten API records into string-only dicts that Spark can build a schema from.

    - dict/list values are serialized as JSON strings;
    - None is replaced with an empty string;
    - other scalars (bool, int, float) are JSON-encoded as well. Otherwise a
      field that is ``True`` in one record and ``None`` -> ``""`` in another
      (e.g. ``success`` on upcoming launches) mixes BooleanType and StringType,
      and ``spark.createDataFrame`` raises a type-merge error while inferring
      the schema from the list of dicts.

    Args:
        data: A list of record dicts, a single record dict, or None.

    Returns:
        A new list of new dicts; the input is not modified. A single dict is
        wrapped in a one-element list, and None yields an empty list.
    """
    if data is None:
        return []
    # Single-object endpoints return one dict rather than a list of records.
    records = [data] if isinstance(data, dict) else data
    return [
        {key: _normalize_value(value) for key, value in item.items()}
        for item in records
    ]


# ====================================================
# 4. Bronze – coleta dados crus e armazena
# ====================================================
endpoints = ["launches", "rockets", "ships"]

for ep in endpoints:
    # Fetch from the API, then apply the basic normalization.
    data = normalize_data(get_spacex_data(ep))

    if not data:
        print(f"⚠️ Nenhum dado retornado para {ep}")
        continue

    # Build the Spark DataFrame directly from the list of dicts.
    df_bronze = spark.createDataFrame(data)

    # Defensive null handling at the DataFrame level: fill remaining nulls
    # (e.g. keys absent from some records) and blank out literal "null" text.
    df_bronze = df_bronze.fillna("")
    df_bronze = df_bronze.replace("null", "")

    # The schema is inferred from the external API payload and may change
    # between runs. A Delta overwrite with a different schema fails unless
    # overwriteSchema is set, so allow the table schema to be replaced too.
    (
        df_bronze.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(f"workspace.bronze.spacex_{ep}")
    )
    print(f"✅ Bronze salvo para {ep}")


# ====================================================
# 5. Silver – limpeza básica (exemplo para lançamentos)
# ====================================================
# Load the raw launches written by the bronze layer.
df_launches = spark.table("workspace.bronze.spacex_launches")

# One row per launch_id: the row with the newest ingestion_date wins.
windowSpec = Window.partitionBy("launch_id").orderBy(col("ingestion_date").desc())

df_silver_launches = (
    df_launches.select(
        col("id").alias("launch_id"),
        col("name").alias("mission_name"),
        col("date_utc"),
        col("success"),
        col("rocket"),
        col("ingestion_date"),
    )
    # Deduplicate: number rows inside each launch_id and keep only the first.
    .withColumn("rn", row_number().over(windowSpec))
    .where(col("rn") == 1)
    .drop("rn")
    # A null or empty mission name is replaced with "Desconhecida".
    .withColumn(
        "mission_name",
        when(
            col("mission_name").isNull() | (col("mission_name") == ""),
            lit("Desconhecida"),
        ).otherwise(col("mission_name")),
    )
)

(
    df_silver_launches.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.spacex_launches")
)
print("✅ Silver salvo para launches")


# ====================================================
# 6. Gold – enriquecimento (join com rockets)
# ====================================================
# Rocket attributes used for enrichment, renamed to avoid clashing with launch columns.
df_rockets = (
    spark.table("workspace.bronze.spacex_rockets")
    .select(
        col("id").alias("rocket_id"),
        col("name").alias("rocket_name"),
        col("type").alias("rocket_type"),
        col("active"),
        col("stages"),
    )
)

# Left-join each silver launch to its rocket, keep the gold columns, and
# label a null or empty rocket name as "Não informado".
df_gold = (
    df_silver_launches
    .join(
        df_rockets,
        on=df_silver_launches["rocket"] == df_rockets["rocket_id"],
        how="left",
    )
    .select(
        "launch_id",
        "mission_name",
        "date_utc",
        "success",
        "rocket_name",
        "rocket_type",
        "active",
        "stages",
        "ingestion_date",
    )
    .withColumn(
        "rocket_name",
        when(
            col("rocket_name").isNull() | (col("rocket_name") == ""),
            lit("Não informado"),
        ).otherwise(col("rocket_name")),
    )
)

(
    df_gold.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.gold.spacex_launches")
)
print("✅ Gold salvo para launches")


# ====================================================
# 7. Exemplo de consulta
# ====================================================
# Example query: the ten most recent launches in the gold table.
display(spark.sql("""
    SELECT mission_name, date_utc, rocket_name, success
    FROM workspace.gold.spacex_launches
    ORDER BY date_utc DESC
    LIMIT 10
    """))
