In [0]:
""" Descrição do projeto Gold (Base final para BI/Analytics):
    1- Filtrar Colunas;
    2- Criar novas coluna com somatório de todas as batidas
    3- Renomear colunas para deixar mais próximas da linguagem de negócio
    4- Excluir dados com a classificação = ["Sem Registro","Não Informado"]
    5- Inserir Coluna com data de atualização
    6- Salvar na camada Gold particionada por UF > 'Estado' """

In [0]:
%fs ls /mnt/Anac

In [0]:
%fs ls dbfs:/mnt/ProjetoX/Silver

In [0]:
df = spark.read.parquet("dbfs:/mnt/ProjetoX/Silver/anac_silver.parquet")

In [0]:
# Selecionar apenas as colunas que o cliente pediu: 

#1°: Selecionar somentes colunas que o cliente/setor pediu
colunas = ['Aerodromo_de _Destino', 'Aerodromo_de_Origem', 'Classificacao_da_Ocorrência', 'Danos_a_Aeronave',
'Data_da_Ocorrencia', 'Municipio', 'UF', 'Regiao', 'Tipo_de _Aerodromo', 'Tipo_de_Ocorrencia',
' Lesoes- Desconhecidas _Passageiros', 'Lesoes _Desconhecidas_Terceiros', 'Lesoes_Desconhecidas_Tripulantes',
'Lesoes_Fatais_Passageiros', 'Lesoes_Fatais_Terceiros', 'Lesoes_Fatais_Tripulantes', 'Lesoes_Graves _Passageiros',
'Lesoes_Graves_Terceiros', 'Lesoes_Graves_Tripulantes', 'Lesoes_Leves_Passageiros', 'Lesoes_Leves_Terceiros',
'Lesoes_Leves_Tripulantes ']

df = df. select (colunas)
display (df)

In [0]:
#2°: Criar uma nova coluna que faz a soma de todas as lesoes

colunas_a_somar = [
"Lesoes_Desconhecidas_Passageiros",
"Lesoes_Desconhecidas_Terceiros",
"Lesoes_Desconhecidas_Tripulantes" ,
"Lesoes_Fatais_Passageiros",
"Lesoes_Fatais_Terceiros"
"Lesoes_Fatais_Tripulantes",
"Lesoes_Graves_Passageiros",
"Lesoes _Graves_Terceiros",
"Lesoes _Graves_Tripulantes"
"Lesoes_Leves_Passageiros",
"Lesoes_Leves_Terceiros",
"Lesoes_Leves_Tripulantes"
]

df = df.withColumn("Total_Lesoes", sum(df[somartudo] for somartudo in colunas_a_somar))
                   
display(df)


In [0]:
# 3ª:renomear colunas para ficar mais intuitivas para o usuario final

df = df\
    .withColumnRenamed ('Aerodromo_de_Destino', 'Destino') \
    .WithColumnRenamed ('Aerodromo_de_Origem', 'Origem')\
    .withColumnRenamed ('Classificacao_da_Ocorrência', 'Classificacao')\
    .withColumnRenamed ('Danos_a_Aeronave', 'Danos') \
    .withColumnRenamed ('Data_da_Ocorrencia', 'Data')\
    .withColumnRenamed ('UF', 'Estado')\
    .withColumnRenamed ('Aerodromo_de _Destino'. 'Destino') \
    .withColumnRenamed ('Aerodromo_de_Destino', 'Destino')
display(df)

In [0]:
# 4 Excluir dados de estados que tenham a classificação ['Indeterminado','Sem Registro']
# Usando o símbolo de negação (~)

classificacoes_a_excluir = ['Indeterminado','Sem Registro']
df = df.filter(~df['Estado'].isin(classificados_a_excluir))
display(df)

In [0]:
# 5 inserir coluna com nome de atualização para usuario ver quando os dados foram atualizados

from pyspark.sql. functions import current_timestamp, date_format, from_utc_timestamp
df = df.withColumn ("Atualizacao",\
     date_format( from_utc_timestamp(current_timestamp(), "America/Sao_Paulo"), "yyyy-MM-dd HH: mm: ss"))
display(df)

id,valor,Atualizacao
1,abc,2024-07-06 15: 14: 18
2,def,2024-07-06 15: 14: 18
3,ghi,2024-07-06 15: 14: 18


In [0]:
# 6 Salvar na camada Gold particionada por UF > 'Estado'

df.write\
    .mode("overwrite")\
    .format("parquet")\
    .partitionBy("Estado")\
    .save('dbfs:/mnt/ProjetoX/Gold/anac_gold_particionado')    