In [0]:
from pyspark.sql.functions import *

In [0]:
# Caminho para leitura dos 12 arquivos CSV que guardam os dados de histórico de vôos que foram importados dentro da pasta "voos" em um volume no Catalog do Databricks
path_voos = "dbfs:/Volumes/mvp/staging/voos/*.csv"

# Leitura dos arquivos CSV em spark definindo que a segunda linha é o cabeçalho e que o delimitador de origem é o ";"
df_voos = (
    spark.read
        .option("header", True)
        .option("delimiter", ";")
        .option("skipRows", 1)  # pula a primeira linha do arquivo CSV que é mostra a data de exportação dos dados, o cabeçalho real está na linha 2 de cada tabela
        .csv(path_voos)
)

# Mostra os dados presentes no dataframe criado para verificação
display(df_voos)

ICAO Empresa Aérea,Número Voo,Código Autorização (DI),Código Tipo Linha,ICAO Aeródromo Origem,ICAO Aeródromo Destino,Partida Prevista,Partida Real,Chegada Prevista,Chegada Real,Situação Voo,Código Justificativa
AZU,6039,0,N,SBCT,SBSP,2024-07-31 20:45:00,2024-07-31 20:32:00,2024-07-31 21:50:00,2024-07-31 21:47:00,REALIZADO,
AZU,6040,0,N,SBSP,SBCF,2024-07-01 09:20:00,2024-07-01 09:26:00,2024-07-01 10:35:00,2024-07-01 10:40:00,REALIZADO,
AZU,6040,0,N,SBSP,SBCF,2024-07-02 09:20:00,2024-07-02 09:13:00,2024-07-02 10:35:00,2024-07-02 10:20:00,REALIZADO,
AZU,6040,0,N,SBSP,SBCF,2024-07-04 09:20:00,2024-07-04 09:28:00,2024-07-04 10:35:00,2024-07-04 10:53:00,REALIZADO,
AZU,6040,0,N,SBSP,SBCF,2024-07-05 09:20:00,2024-07-05 09:18:00,2024-07-05 10:35:00,2024-07-05 10:36:00,REALIZADO,
AZU,6040,0,N,SBSP,SBCF,2024-07-08 09:20:00,2024-07-08 09:15:00,2024-07-08 10:35:00,2024-07-08 10:24:00,REALIZADO,
AZU,6040,0,N,SBSP,SBCF,2024-07-09 09:20:00,2024-07-09 09:34:00,2024-07-09 10:35:00,2024-07-09 11:04:00,REALIZADO,
AZU,6040,0,N,SBSP,SBCF,2024-07-11 09:20:00,2024-07-11 09:46:00,2024-07-11 10:35:00,2024-07-11 10:59:00,REALIZADO,
AZU,6040,0,N,SBSP,SBCF,2024-07-12 09:20:00,2024-07-12 09:17:00,2024-07-12 10:35:00,2024-07-12 10:30:00,REALIZADO,
AZU,6040,0,N,SBSP,SBCF,2024-07-15 09:20:00,2024-07-15 09:26:00,2024-07-15 10:35:00,2024-07-15 10:34:00,REALIZADO,


In [0]:
# Contagem do número de linhas para checar se os 12 CSV's foram lidos corretamente
df_voos.count()

987936

In [0]:
# Checagem de nome das colunas para verificar integridade da leitura
print(df_voos.columns)

['ICAO Empresa Aérea', 'Número Voo', 'Código Autorização (DI)', 'Código Tipo Linha', 'ICAO Aeródromo Origem', 'ICAO Aeródromo Destino', 'Partida Prevista', 'Partida Real', 'Chegada Prevista', 'Chegada Real', 'Situação Voo', 'Código Justificativa']


In [0]:
# Padronização do nome das colunas sem espaços e acentos para facilitar a manipulação e permitir a criação da tabela em formato Delta
df_voos = (
    df_voos
    .withColumnRenamed("ICAO Empresa Aérea", "icao_empresa_aerea")
    .withColumnRenamed("Número Voo", "numero_voo")
    .withColumnRenamed("Código Autorização (DI)", "codigo_autorizacao_di")
    .withColumnRenamed("Código Tipo Linha", "codigo_tipo_linha")
    .withColumnRenamed("ICAO Aeródromo Origem", "icao_aerodromo_origem")
    .withColumnRenamed("ICAO Aeródromo Destino", "icao_aerodromo_destino")
    .withColumnRenamed("Partida Prevista", "partida_prevista")
    .withColumnRenamed("Partida Real", "partida_real")
    .withColumnRenamed("Chegada Prevista", "chegada_prevista")
    .withColumnRenamed("Chegada Real", "chegada_real")
    .withColumnRenamed("Situação Voo", "situacao_voo")
    .withColumnRenamed("Código Justificativa", "codigo_justificativa")
)

In [0]:
# Checagem de nome das colunas para verificar integridade da leitura após as alterações
print(df_voos.columns)

['icao_empresa_aerea', 'numero_voo', 'codigo_autorizacao_di', 'codigo_tipo_linha', 'icao_aerodromo_origem', 'icao_aerodromo_destino', 'partida_prevista', 'partida_real', 'chegada_prevista', 'chegada_real', 'situacao_voo', 'codigo_justificativa']


In [0]:
# Criação da tabela Bronze em formato delta para armazenar os dados
(
    df_voos
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("mvp.bronze.voos")
)

In [0]:
# Caminho para leitura do arquivo CSV que guarda os dados de catalogação dos aeroportos públicos brasileiros
# que foram importados dentro da pasta "aeroportos" em um volume no Catalog do Databricks
path_aeroportos = "dbfs:/Volumes/mvp/staging/aeroportos/*.csv"

# Leitura dos arquivos CSV em spark definindo que a segunda linha é o cabeçalho e que o delimitador de origem é o ";"
df_aeroportos = (
    spark.read
        .option("header", True)
        .option("delimiter", ";")
        .option("skipRows", 1)  # pula a primeira linha do arquivo CSV que mostra a data de exportação dos dados, o cabeçalho real está na linha 2 de cada tabela
        .option("encoding", "ISO-8859-1") # Atualização do encoding para que os caracteres especiais sejam interpretados corretamente
        .csv(path_aeroportos)
)

# Mostra os dados presentes no dataframe criado para verificação
display(df_aeroportos)

Código OACI,CIAD,Nome,Município,UF,Município Servido,UF Servido,LATGEOPOINT,LONGEOPOINT,Latitude,Longitude,Altitude,Operação Diurna,Operação Noturna,Situação,Validade do Registro,Portaria de Registro,Link Portaria
SDZG,CE0008,Pedro Teixeira Castelo Regional Tauá,TAUÁ,Ceará,Tauá,Ceará,-5.9333333,-40.2975,"05°56'00""S","040°17'51""W",4440,VFR,VFR,Cadastrado,07/05/2030,PA2020-1189,https://pergamum.anac.gov.br/arquivos/PA2020-1189.pdf
SDIG,SP0038,Ibitinga,IBITINGA,São Paulo,Ibitinga,São Paulo,-21.747222,-48.855833,"21°44'50""S","048°51'21""W",5420,VFR,Sem Operação,Cadastrado,23/04/2030,PA2020-1074,https://pergamum.anac.gov.br/arquivos/PA2020-1074.pdf
SBVT,ES0001,Eurico de Aguiar Salles,VITÓRIA,Espírito Santo,Vitória,Espírito Santo,-20.258056,-40.286389,"20°15'29""S","040°17'11""W",30,VFR / IFR - CAT I,VFR / IFR - CAT I,Cadastrado,,,
SNJK,BA0047,Jequié,JEQUIÉ,Bahia,Jequié,Bahia,-13.877778,-40.071389,"13°52'40""S","040°04'17""W",1970,,,Cadastrado,18/11/2024,PA2014-2695,https://pergamum.anac.gov.br/arquivos/PA2014-2695.pdf
SSKU,SC0016,Lauro Antônio da Costa,CURITIBANOS,Santa Catarina,Curitibanos,Santa Catarina,-27.287222,-50.603889,"27°17'14""S","050°36'14""W",9780,VFR,VFR,Cadastrado,20/05/2030,PA2020-1296,https://pergamum.anac.gov.br/arquivos/PA2020-1296.pdf
SSEP,RS0031,São Sepé,SÃO SEPÉ,Rio Grande do Sul,São Sepé,Rio Grande do Sul,-30.182222,-53.579444,"30°10'56""S","053°34'46""W",1530,,,Cadastrado,13/01/2031,PA2021-3955,https://pergamum.anac.gov.br/arquivos/PA2021-3955.pdf
SJRG,RS0013,Regional de Rio Grande,RIO GRANDE,Rio Grande do Sul,Rio Grande,Rio Grande do Sul,-32.083611,-52.167778,"32°05'01""S","052°10'04""W",60,VFR,VFR,Cadastrado,21/07/2030,PA2022-10.073,https://pergamum.anac.gov.br/arquivos/PA2022-10.073.pdf
SSXX,SC0017,Municipal João Winckler,XANXERÊ,Santa Catarina,Xanxerê,Santa Catarina,-26.875,-52.372778,"26°52'30""S","052°22'22""W",9100,VFR,VFR,Cadastrado,05/04/2033,PA2023-10896,https://pergamum.anac.gov.br/arquivos/PA2023-10896.pdf
SBDO,MS0008,Dourados,DOURADOS,Mato Grosso do Sul,Dourados,Mato Grosso do Sul,-22.200556,-54.925556,"22°12'02""S","054°55'32""W",4580,VFR / IFR Não Precisão,VFR / IFR Não Precisão,Cadastrado,,,
SNGD,PI0005,Guadalupe,GUADALUPE,Piauí,Guadalupe,Piauí,-6.7830556,-43.580833,"06°46'59""S","043°34'51""W",1800,VFR,Sem Operação,Cadastrado,,,


In [0]:
# Padronização do nome das colunas sem espaços e acentos para facilitar a manipulação e permitir a criação da tabela em formato Delta
df_aeroportos = (
    df_aeroportos
    .withColumnRenamed("Código OACI", "icao_aerodromo")
    .withColumnRenamed("CIAD", "codigo_ciad")
    .withColumnRenamed("Nome", "nome_aeroporto")
    .withColumnRenamed("Município", "municipio")
    .withColumnRenamed("UF", "uf")
    .withColumnRenamed("Município Servido", "municipio_servido")
    .withColumnRenamed("UF Servido", "uf_servido")
    .withColumnRenamed("LATGEOPOINT", "lat_geopoint")
    .withColumnRenamed("LONGEOPOINT", "long_geopoint")
    .withColumnRenamed("Latitude", "latitude")
    .withColumnRenamed("Longitude", "longitude")
    .withColumnRenamed("Altitude", "altitude")
    .withColumnRenamed("Operação Diurna", "operacao_diurna")
    .withColumnRenamed("Operação Noturna", "operacao_noturna")
    .withColumnRenamed("Situação", "situacao")
    .withColumnRenamed("Validade do Registro", "validade_registro")
    .withColumnRenamed("Portaria de Registro", "portaria_registro")
    .withColumnRenamed("Link Portaria", "link_portaria")
)

In [0]:
# Checagem de nome das colunas para verificar integridade da leitura após as alterações
print(df_aeroportos.columns)

['icao_aerodromo', 'codigo_ciad', 'nome_aeroporto', 'municipio', 'uf', 'municipio_servido', 'uf_servido', 'lat_geopoint', 'long_geopoint', 'latitude', 'longitude', 'altitude', 'operacao_diurna', 'operacao_noturna', 'situacao', 'validade_registro', 'portaria_registro', 'link_portaria']


In [0]:
# Criação da tabela Bronze em formato delta para armazenar os dados
(
    df_aeroportos
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("mvp.bronze.aeroportos")
)

In [0]:
%sql
-- Consulta para verificação de integridade da tabela bronze criada

SELECT *
FROM mvp.bronze.voos
LIMIT 50;

icao_empresa_aerea,numero_voo,codigo_autorizacao_di,codigo_tipo_linha,icao_aerodromo_origem,icao_aerodromo_destino,partida_prevista,partida_real,chegada_prevista,chegada_real,situacao_voo,codigo_justificativa
AZU,2523,0,N,SBAR,SBCF,2024-03-16 18:10:00,2024-03-16 18:03:00,2024-03-16 20:10:00,2024-03-16 20:01:00,REALIZADO,
AZU,2523,0,N,SBAR,SBCF,2024-03-23 18:10:00,2024-03-23 18:42:00,2024-03-23 20:10:00,2024-03-23 20:44:00,REALIZADO,
AZU,2523,0,N,SBAR,SBCF,2024-03-30 18:10:00,2024-03-30 17:52:00,2024-03-30 20:10:00,2024-03-30 19:51:00,REALIZADO,
AZU,2524,0,N,SBGO,SBMO,2024-03-07 05:50:00,2024-03-07 05:42:00,2024-03-07 08:15:00,2024-03-07 08:08:00,REALIZADO,
AZU,2524,0,N,SBGO,SBMO,2024-03-14 05:40:00,2024-03-14 05:36:00,2024-03-14 08:05:00,2024-03-14 08:03:00,REALIZADO,
AZU,2524,0,N,SBGO,SBMO,2024-03-21 05:40:00,2024-03-21 05:47:00,2024-03-21 08:05:00,2024-03-21 08:14:00,REALIZADO,
AZU,2524,0,N,SBGO,SBMO,2024-03-28 05:40:00,2024-03-28 05:32:00,2024-03-28 08:05:00,2024-03-28 07:56:00,REALIZADO,
AZU,2525,0,N,SBMO,SBGO,2024-03-07 09:00:00,2024-03-07 09:00:00,2024-03-07 11:35:00,2024-03-07 11:25:00,REALIZADO,
AZU,2525,0,N,SBMO,SBGO,2024-03-14 08:50:00,2024-03-14 08:46:00,2024-03-14 11:25:00,2024-03-14 11:10:00,REALIZADO,
AZU,2525,0,N,SBMO,SBGO,2024-03-21 08:50:00,2024-03-21 08:50:00,2024-03-21 11:25:00,2024-03-21 11:13:00,REALIZADO,


In [0]:
%sql
-- Consulta para verificação de integridade da tabela bronze criada

SELECT * 
FROM mvp.bronze.aeroportos 
LIMIT 50;

icao_aerodromo,codigo_ciad,nome_aeroporto,municipio,uf,municipio_servido,uf_servido,lat_geopoint,long_geopoint,latitude,longitude,altitude,operacao_diurna,operacao_noturna,situacao,validade_registro,portaria_registro,link_portaria
SDZG,CE0008,Pedro Teixeira Castelo Regional Tauá,TAUÁ,Ceará,Tauá,Ceará,-5.9333333,-40.2975,"05°56'00""S","040°17'51""W",4440,VFR,VFR,Cadastrado,07/05/2030,PA2020-1189,https://pergamum.anac.gov.br/arquivos/PA2020-1189.pdf
SDIG,SP0038,Ibitinga,IBITINGA,São Paulo,Ibitinga,São Paulo,-21.747222,-48.855833,"21°44'50""S","048°51'21""W",5420,VFR,Sem Operação,Cadastrado,23/04/2030,PA2020-1074,https://pergamum.anac.gov.br/arquivos/PA2020-1074.pdf
SBVT,ES0001,Eurico de Aguiar Salles,VITÓRIA,Espírito Santo,Vitória,Espírito Santo,-20.258056,-40.286389,"20°15'29""S","040°17'11""W",30,VFR / IFR - CAT I,VFR / IFR - CAT I,Cadastrado,,,
SNJK,BA0047,Jequié,JEQUIÉ,Bahia,Jequié,Bahia,-13.877778,-40.071389,"13°52'40""S","040°04'17""W",1970,,,Cadastrado,18/11/2024,PA2014-2695,https://pergamum.anac.gov.br/arquivos/PA2014-2695.pdf
SSKU,SC0016,Lauro Antônio da Costa,CURITIBANOS,Santa Catarina,Curitibanos,Santa Catarina,-27.287222,-50.603889,"27°17'14""S","050°36'14""W",9780,VFR,VFR,Cadastrado,20/05/2030,PA2020-1296,https://pergamum.anac.gov.br/arquivos/PA2020-1296.pdf
SSEP,RS0031,São Sepé,SÃO SEPÉ,Rio Grande do Sul,São Sepé,Rio Grande do Sul,-30.182222,-53.579444,"30°10'56""S","053°34'46""W",1530,,,Cadastrado,13/01/2031,PA2021-3955,https://pergamum.anac.gov.br/arquivos/PA2021-3955.pdf
SJRG,RS0013,Regional de Rio Grande,RIO GRANDE,Rio Grande do Sul,Rio Grande,Rio Grande do Sul,-32.083611,-52.167778,"32°05'01""S","052°10'04""W",60,VFR,VFR,Cadastrado,21/07/2030,PA2022-10.073,https://pergamum.anac.gov.br/arquivos/PA2022-10.073.pdf
SSXX,SC0017,Municipal João Winckler,XANXERÊ,Santa Catarina,Xanxerê,Santa Catarina,-26.875,-52.372778,"26°52'30""S","052°22'22""W",9100,VFR,VFR,Cadastrado,05/04/2033,PA2023-10896,https://pergamum.anac.gov.br/arquivos/PA2023-10896.pdf
SBDO,MS0008,Dourados,DOURADOS,Mato Grosso do Sul,Dourados,Mato Grosso do Sul,-22.200556,-54.925556,"22°12'02""S","054°55'32""W",4580,VFR / IFR Não Precisão,VFR / IFR Não Precisão,Cadastrado,,,
SNGD,PI0005,Guadalupe,GUADALUPE,Piauí,Guadalupe,Piauí,-6.7830556,-43.580833,"06°46'59""S","043°34'51""W",1800,VFR,Sem Operação,Cadastrado,,,


In [0]:
%sql
-- Consulta para verificação de volume por status operacional de voo

SELECT situacao_voo,
COUNT(*) FROM mvp.bronze.voos
GROUP BY situacao_voo;

situacao_voo,COUNT(*)
NÃO INFORMADO,1546
CANCELADO,41329
REALIZADO,945061
