In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=619ac023212a6eb7ba7b4edb3e590c5123164c725852baa9f54326de1b8d97b5
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [35]:
import os
import sys
import re
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [4]:
# Inicializa SparkSession
spark = SparkSession.builder.appName("create_temp_views").getOrCreate()

# Leitura dos parquets que sao resultados das tasks anteriores
df_vra = spark.read.parquet("/content/VRA")
df_air_cia = spark.read.parquet("/content/AIR_CIA")
df_api = spark.read.parquet("/content/API")

In [5]:
#Vizualizacao dos Schemas
#pode ajudar em momentos de "type mismatch"
print("Schema do dataframe VRA: ")
df_vra.printSchema()
print("\n Schema do dataframe AIR_CIA: ")
df_air_cia.printSchema()
print("\n Schema do dataframe que possui dados extraidos da API: ")
df_api.printSchema()

Schema do dataframe VRA: 
root
 |-- chegada_prevista: timestamp (nullable = true)
 |-- chegada_real: timestamp (nullable = true)
 |-- codigo_autorizacao: string (nullable = true)
 |-- codigo_justificativa: string (nullable = true)
 |-- codigo_tipo_linha: string (nullable = true)
 |-- icao_aerodromo_destino: string (nullable = true)
 |-- icao_aerodromo_origem: string (nullable = true)
 |-- icao_empresa_aerea: string (nullable = true)
 |-- numero_voo: integer (nullable = true)
 |-- partida_prevista: timestamp (nullable = true)
 |-- partida_real: timestamp (nullable = true)
 |-- situacao_voo: string (nullable = true)


 Schema do dataframe AIR_CIA: 
root
 |-- razao_social: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- cnpj: string (nullable = true)
 |-- atividades_aereas: string (nullable = true)
 |-- endereco_sede: string (nullable = true)
 |-- telefone: string (nullable = true)
 |-- e_mail: string (nullable = true)
 |-- decisao_ope

In [6]:
# vizualizacao dos preview dataframes para ver se precisamos de algum tratamento previo
print("Preview do dataframe VRA: ")
df_vra.show(5)
print("\n Preview do dataframe AIR_CIA: ")
df_air_cia.show(5)
print("\n Preview do dataframe que possui dados extraidos da API: ")
df_api.show(5)

Preview do dataframe VRA: 
+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+------------+
|   chegada_prevista|       chegada_real|codigo_autorizacao|codigo_justificativa|codigo_tipo_linha|icao_aerodromo_destino|icao_aerodromo_origem|icao_empresa_aerea|numero_voo|   partida_prevista|       partida_real|situacao_voo|
+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+------------+
|2021-11-12 08:30:00|2021-11-12 08:30:00|                 0|                 N/A|                X|                  KORD|                 SBGR|               UAL|       844|2021-11-11 22:00:00|2021-11-11 22:14:00|   REALIZADO|
|2021-11-15 08:30:00|2021-11-15 08:30:00|                 0| 

# Notar que as colunas icao e iata oriundas da df_air_cia podem conter erros,
# desde a origem antes do split, "icao" tem apenas 3 caracteres isso pode
# interferir na hora de montar as querys

In [7]:
#Criacao das TempViews:

df_vra.createOrReplaceTempView("vra")
df_air_cia.createOrReplaceTempView("air_cia")
df_api.createOrReplaceTempView("api")

In [60]:
# caso alguma linha ainda tenha subido com valor NULL:

df_vra = df_vra.fillna('')
df_air_cia = df_air_cia.fillna('')
df_api = df_api.fillna('')


#Perguntas área de negócios:

##1. Para cada companhia aérea trazer a rota mais utilizada com as seguintes informações:

-Razão social da companhia aérea;

-Nome Aeroporto de Origem;

-ICAO do aeroporto de origem;

-Estado/UF do aeroporto de origem;

-Nome do Aeroporto de Destino;

-ICAO do Aeroporto de destino;

-Estado/UF do aeroporto de destino.

In [61]:
#Esta query retorna a lista porem sem código 'icao_empresa' unico no dataframe
#ou seja nao trás o rankeamento que foi feito na query seguinte

df_rota_mais_utilizada = spark.sql("""

WITH RotasMaisUtilizadas AS (
        SELECT
            vo.icao_empresa_aerea AS icao_empresa_aerea,
            vo.icao_aerodromo_origem AS icao_origem,
            vo.icao_aerodromo_destino AS icao_destino,
            COUNT(*) AS total_rotas
        FROM vra vo
        GROUP BY vo.icao_empresa_aerea, vo.icao_aerodromo_origem, vo.icao_aerodromo_destino
    ),
    RotasMaisUtilizadasRank AS (
        SELECT
            icao_empresa_aerea,
            icao_origem,
            icao_destino,
            total_rotas,
            ROW_NUMBER() OVER (PARTITION BY icao_empresa_aerea ORDER BY total_rotas DESC) AS rank
        FROM RotasMaisUtilizadas
    )
    SELECT
        ac.razao_social AS razao_social_companhia,
        r.icao_empresa_aerea,
        vo.icao_aerodromo_origem AS icao_origem,
        ap_origem.state AS estado_origem,
        vo.icao_aerodromo_destino AS icao_destino,
        ap_destino.state AS estado_destino,
        ap_origem.name AS nome_aeroporto_origem,
        ap_destino.name AS nome_aeroporto_destino
    FROM RotasMaisUtilizadasRank r
    JOIN vra vo ON r.icao_empresa_aerea = vo.icao_empresa_aerea
    JOIN air_cia ac ON r.icao_empresa_aerea = ac.icao
    JOIN api ap_origem ON r.icao_origem = ap_origem.icao
    JOIN api ap_destino ON r.icao_destino = ap_destino.icao
    WHERE r.rank = 1

""")
df_rota_mais_utilizada.createOrReplaceTempView("rota_mais_utilizada")

In [62]:
df_rota_mais_utilizada.distinct().show(truncate=False)

+---------------------------------------------------+------------------+-----------+--------------+------------+----------------+-----------------------------------------+-----------------------------------------------------------------------+
|razao_social_companhia                             |icao_empresa_aerea|icao_origem|estado_origem |icao_destino|estado_destino  |nome_aeroporto_origem                    |nome_aeroporto_destino                                                 |
+---------------------------------------------------+------------------+-----------+--------------+------------+----------------+-----------------------------------------+-----------------------------------------------------------------------+
|TAM LINHAS AÉREAS S.A.                             |TAM               |SBGO       |São Paulo     |SBBR        |Rio de Janeiro  |São Paulo–Congonhas Airport              |Santos Dumont Airport                                                  |
|TAM LINHAS AÉREAS S.A. 

#Agrupando para ter a rota mais frequente por icao_empresa sem repetir a empresa:

In [63]:

# Consulta para trazer o registro mais frequente por "icao_empresa_aerea" com código distinto
df_rota_mais_utilizada_por_empresa = spark.sql( """
    WITH RotasMaisUtilizadas AS (
        SELECT
            vo.icao_empresa_aerea AS icao_empresa_aerea,
            vo.icao_aerodromo_origem AS icao_origem,
            vo.icao_aerodromo_destino AS icao_destino,
            COUNT(*) AS total_rotas
        FROM vra vo
        GROUP BY vo.icao_empresa_aerea, vo.icao_aerodromo_origem, vo.icao_aerodromo_destino
    ),
    RotasMaisUtilizadasRank AS (
        SELECT
            r.*,
            ROW_NUMBER() OVER (PARTITION BY r.icao_empresa_aerea ORDER BY r.total_rotas DESC) AS rank
        FROM RotasMaisUtilizadas r
    )
    SELECT DISTINCT
        ac.razao_social AS razao_social_companhia,
        r.icao_empresa_aerea,
        r.icao_origem,
        ap_origem.state AS estado_origem,
        r.icao_destino,
        ap_destino.state AS estado_destino,
        ap_origem.name AS nome_aeroporto_origem,
        ap_destino.name AS nome_aeroporto_destino
    FROM RotasMaisUtilizadasRank r
    JOIN rota_mais_utilizada rota ON r.icao_empresa_aerea = rota.icao_empresa_aerea
    JOIN air_cia ac ON r.icao_empresa_aerea = ac.icao
    JOIN api ap_origem ON r.icao_origem = ap_origem.icao
    JOIN api ap_destino ON r.icao_destino = ap_destino.icao
    WHERE r.rank = 1
""")

# Exibir o resultado
df_rota_mais_utilizada_por_empresa.show(truncate=False)

+------------------------------------------------------------------+------------------+-----------+-----------------+------------+--------------------+-----------------------------------------+-----------------------------------------------------------------------+
|razao_social_companhia                                            |icao_empresa_aerea|icao_origem|estado_origem    |icao_destino|estado_destino      |nome_aeroporto_origem                    |nome_aeroporto_destino                                                 |
+------------------------------------------------------------------+------------------+-----------+-----------------+------------+--------------------+-----------------------------------------+-----------------------------------------------------------------------+
|AZUL CONECTA LTDA. (EX TWO TAXI AEREO LTDA)                       |ACN               |SWKO       |State of Amazonas|SBEG        |Amazonas            |Coari Airport                            |Eduardo G

#2. Para cada aeroporto trazer a companhia aérea com maior atuação no ano com as seguintes informações:
-Nome do Aeroporto;

-ICAO do Aeroporto;

-Razão social da Companhia Aérea;

-Quantidade de Rotas à partir daquele aeroporto;

-Quantidade de Rotas com destino àquele aeroporto;

-Quantidade total de pousos e decolagens naquele aeroporto.

In [66]:
# CTE para calcular o numero de rotas por linha área de cada aeroporto
df_rotas_saindo = spark.sql("""
    WITH RotasSaindo AS (
        SELECT
            ap.icao AS icao_aeroporto,
            vo.icao_empresa_aerea AS icao_empresa_aerea,
            COUNT(*) AS qtd_rotas_saindo
        FROM vra vo
        INNER JOIN api ap ON vo.icao_aerodromo_origem = ap.icao
        GROUP BY ap.icao, vo.icao_empresa_aerea
    )
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY icao_aeroporto ORDER BY qtd_rotas_saindo DESC) AS rank_saindo
    FROM RotasSaindo
""")
df_rotas_saindo.createOrReplaceTempView("rotas_saindo")

# CTE para calcular o numero de rotas para cada companhia aera
# chegando em cada aeroporto

df_rotas_chegando = spark.sql("""
    WITH RotasChegando AS (
        SELECT
            ap.icao AS icao_aeroporto,
            vo.icao_empresa_aerea AS icao_empresa_aerea,
            COUNT(*) AS qtd_rotas_chegando
        FROM vra vo
        INNER JOIN api ap ON vo.icao_aerodromo_destino = ap.icao
        GROUP BY ap.icao, vo.icao_empresa_aerea
    )
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY icao_aeroporto ORDER BY qtd_rotas_chegando DESC) AS rank_chegando
    FROM RotasChegando
""")
df_rotas_chegando.createOrReplaceTempView("rotas_chegando")

# CTE para calcular o total de pousos e decolagens por aeroporto

df_pousos_decolagens = spark.sql("""
    WITH PousosDecolagens AS (
        SELECT
            ap.icao AS icao_aeroporto,
            COUNT(*) AS qtd_pousos_decolagens
        FROM vra vo
        INNER JOIN api ap ON vo.icao_aerodromo_destino = ap.icao OR vo.icao_aerodromo_origem = ap.icao
        GROUP BY ap.icao
    ),
    RotasSaindo AS (
        SELECT
            ap_saida.icao AS icao_aeroporto,
            COUNT(DISTINCT vo_saida.numero_voo) AS qtd_rotas_saindo
        FROM vra vo_saida
        INNER JOIN api ap_saida ON vo_saida.icao_aerodromo_origem = ap_saida.icao
        GROUP BY ap_saida.icao
    ),
    RotasEntrando AS (
        SELECT
            ap_entrada.icao AS icao_aeroporto,
            COUNT(DISTINCT vo_entrada.numero_voo) AS qtd_rotas_entrando
        FROM vra vo_entrada
        INNER JOIN api ap_entrada ON vo_entrada.icao_aerodromo_destino = ap_entrada.icao
        GROUP BY ap_entrada.icao
    )
    SELECT
        pd.icao_aeroporto,
        pd.qtd_pousos_decolagens,
        COALESCE(rs.qtd_rotas_saindo, 0) AS qtd_rotas_saindo,
        COALESCE(re.qtd_rotas_entrando, 0) AS qtd_rotas_entrando
    FROM PousosDecolagens pd
    LEFT JOIN RotasSaindo rs ON pd.icao_aeroporto = rs.icao_aeroporto
    LEFT JOIN RotasEntrando re ON pd.icao_aeroporto = re.icao_aeroporto
    ORDER BY pd.qtd_pousos_decolagens DESC
""")
df_pousos_decolagens.createOrReplaceTempView("pousos_decolagens")



# Combinando as CTE's para responder as perguntas de negócio da task 2:
df_resultado = spark.sql("""
    SELECT DISTINCT
        ap.name AS nome_aeroporto,
        ap.icao AS icao_aeroporto,
        rs.icao_empresa_aerea AS icao_empresa_aerea,
        rs.qtd_rotas_saindo AS qtd_rotas_saindo,
        rc.qtd_rotas_chegando AS qtd_rotas_chegando,
        pd.qtd_pousos_decolagens AS qtd_pousos_decolagens
    FROM api ap
    LEFT JOIN rotas_saindo rs ON ap.icao = rs.icao_aeroporto AND rs.rank_saindo = 1
    LEFT JOIN rotas_chegando rc ON ap.icao = rc.icao_aeroporto AND rc.rank_chegando = 1
    LEFT JOIN pousos_decolagens pd ON ap.icao = pd.icao_aeroporto
""")
df_resultado.createOrReplaceTempView("resultado")

# Display do preview do resultado
df_resultado.show(truncate=False)


+----------------------------------------------------------------------+--------------+------------------+----------------+------------------+---------------------+
|nome_aeroporto                                                        |icao_aeroporto|icao_empresa_aerea|qtd_rotas_saindo|qtd_rotas_chegando|qtd_pousos_decolagens|
+----------------------------------------------------------------------+--------------+------------------+----------------+------------------+---------------------+
|Charles de Gaulle Airport (Roissy Airport)                            |LFPG          |AFR               |551             |539               |1236                 |
|Porto Seguro Airport                                                  |SBPS          |AZU               |1943            |1940              |10653                |
|Hartsfield–Jackson Atlanta International Airport                      |KATL          |DAL               |333             |336               |671                  |
|Itaituba 

##Exportacao das TempView para formato .parquet de maneira que se a área de negóicos quiser ela pode consumir tanto em consultas AD-HOC em um GCP Big Query; Quanto carregar em uma ferramenta de BI (PowerBi, Looker, Tableau)

In [67]:
# task 1:
df_rota_mais_utilizada_por_empresa.coalesce(1).write.mode('overwrite').parquet('task1/parquet')

df_rota_mais_utilizada_por_empresa.coalesce(1).write.mode('overwrite').json('task1/json')


In [68]:
# task 2:

# exportacao formato '.parquet'
df_resultado.coalesce(1).write.mode('overwrite').parquet("task2/parquet")

# Exportacao formato 'json'
df_resultado.coalesce(1).write.mode('overwrite').json("task2/json")
