In [0]:
# Ler a tabela Delta diretamente pelo catalog.schema.tabela
df_anac = spark.table("cat_ANAC.dn_anac.tabela_anac")

# Mostrar os dados
display(df_anac)


In [0]:
df = df_anac.withColumn(
    "Status_Validade",
    F.when(F.col("DATA_VALIDADE") >= F.current_date(), "Ativo").otherwise("Vencido")
)


In [0]:
display(df)

# DESENVOLVENDO A TABELA ANALÍTICA

| Coluna                      | Tipo      | Descrição                                 |
| --------------------------- | --------- | ----------------------------------------- |
| CODIGO\_AERONAVE            | string    | Código único do drone                     |
| DATA\_VALIDADE              | timestamp | Data de validade do certificado           |
| STATUS\_VALIDADE            | string    | Ativo / Vencido                           |
| OPERADOR                    | string    | Nome do operador                          |
| CPF\_CNPJ                   | string    | Identificador do operador                 |
| TIPO\_USO                   | string    | Uso do drone (recreativo, comercial, etc) |
| FABRICANTE                  | string    | Fabricante do drone                       |
| MODELO                      | string    | Modelo do drone                           |
| NUMERO\_SERIE               | string    | Número de série do drone                  |
| PESO\_MAXIMO\_DECOLAGEM\_KG | double    | Peso máximo de decolagem                  |
| RAMO\_ATIVIDADE             | string    | Atividade principal do operador           |
| TOTAL\_DRONES\_OPERADOR     | long      | Total de drones por operador              |
| TOP\_FABRICANTE\_OPERADOR   | string    | Fabricante mais comum do operador         |
| Ano                         | integer   | Ano da validade                           |
| Mes                         | integer   | Mês da validade                           |
| Dia                         | integer   | Dia da validade                           |


In [0]:
display(df)

In [0]:
df.write.format("delta").mode("overwrite").saveAsTable("cat_anac.dn_anac.drones_analitica")


# DESENVOLVIMENTO DA TABELA CONSOLIDADA

| Coluna                  | Tipo    | Descrição                                         |
| ----------------------- | ------- | ------------------------------------------------- |
| OPERADOR                | string  | Nome do operador                                  |
| CPF\_CNPJ               | string  | Identificador do operador                         |
| TOTAL\_DRONES           | long    | Total de drones do operador                       |
| DRONES\_ATIVOS          | long    | Drones com certificado ativo                      |
| DRONES\_VENCIDOS        | long    | Drones com certificado vencido                    |
| PESO\_MAXIMO\_MEDIO\_KG | double  | Peso máximo médio de decolagem                    |
| TOP\_FABRICANTE         | string  | Fabricante mais comum do operador                 |
| TOP\_MODELO             | string  | Modelo mais comum do operador                     |
| TIPO\_USO\_PRINCIPAL    | string  | Tipo de uso mais comum do operador                |
| Ano                     | integer | Ano da validade (opcional, para análise temporal) |
| Mes                     | integer | Mês da validade (opcional, para análise temporal) |


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Agrupar e calcular métricas consolidadas
df_consolidada = df.groupBy("OPERADOR", "CPF_CNPJ").agg(
    F.count("*").alias("TOTAL_DRONES"),
    F.sum(F.when(F.col("STATUS_VALIDADE") == "Ativo", 1).otherwise(0)).alias("DRONES_ATIVOS"),
    F.sum(F.when(F.col("STATUS_VALIDADE") == "Vencido", 1).otherwise(0)).alias("DRONES_VENCIDOS"),
    F.avg("PESO_MAXIMO_DECOLAGEM_KG").alias("PESO_MAXIMO_MEDIO_KG")
)

# Top fabricante e modelo por operador
window_fab = Window.partitionBy("OPERADOR").orderBy(F.desc("count_fab"))
df_fab_count = df.groupBy("OPERADOR", "FABRICANTE").count().withColumnRenamed("count", "count_fab")
df_top_fab = df_fab_count.withColumn("rank_fab", F.row_number().over(window_fab)).filter(F.col("rank_fab") == 1).select("OPERADOR", F.col("FABRICANTE").alias("TOP_FABRICANTE"))

window_model = Window.partitionBy("OPERADOR").orderBy(F.desc("count_model"))
df_model_count = df.groupBy("OPERADOR", "MODELO").count().withColumnRenamed("count", "count_model")
df_top_model = df_model_count.withColumn("rank_model", F.row_number().over(window_model)).filter(F.col("rank_model") == 1).select("OPERADOR", F.col("MODELO").alias("TOP_MODELO"))

# Tipo de uso principal
df_tipouso_count = df.groupBy("OPERADOR", "TIPO_USO").count().withColumnRenamed("count", "count_tipouso")
window_tipo = Window.partitionBy("OPERADOR").orderBy(F.desc("count_tipouso"))
df_top_tipo = df_tipouso_count.withColumn("rank_tipo", F.row_number().over(window_tipo)).filter(F.col("rank_tipo") == 1).select("OPERADOR", F.col("TIPO_USO").alias("TIPO_USO_PRINCIPAL"))

# Juntar métricas agregadas
df_consolidada = df_consolidada.join(df_top_fab, "OPERADOR", "left")\
                               .join(df_top_model, "OPERADOR", "left")\
                               .join(df_top_tipo, "OPERADOR", "left")

# Salvar como Delta
df_consolidada.write.format("delta").mode("overwrite").saveAsTable("cat_anac.dn_anac.drones_consolidada")
