# Importando Bibliotecas

In [34]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# import plotly.express as px

import pyarrow as pa
import pyarrow.parquet as pq

from pyspark.sql import functions as F # Importando as funções sql 
from pyspark.sql.types import *

## Criando Sessão Spark

In [35]:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# PIB

## Importando os Dados

In [36]:
dir = '/workspaces/TCC_MLaaS_PIB_ANAC/base_dados/PIB/BaseDados_PIB_2009_2021.parquet.gzip'
pib = spark.read.parquet(dir)


## Explorando os Dados

In [37]:

pib.printSchema()

pib.show()

root
 |-- Ano: long (nullable = true)
 |-- Nome_UF: string (nullable = true)
 |-- Cod_Cidade: string (nullable = true)
 |-- Nome_Cidade: string (nullable = true)
 |-- Valor_Agropecuaria: double (nullable = true)
 |-- Valor_Industria: double (nullable = true)
 |-- Valor_Servicos: double (nullable = true)
 |-- Governo: double (nullable = true)
 |-- Total: double (nullable = true)
 |-- Impostos: double (nullable = true)
 |-- PIB: double (nullable = true)
 |-- PIB_per_capita: double (nullable = true)
 |-- __index_level_0__: long (nullable = true)

+----+-------+----------+--------------------+------------------+---------------+--------------+----------+----------+----------+-----------+--------------+-----------------+
| Ano|Nome_UF|Cod_Cidade|         Nome_Cidade|Valor_Agropecuaria|Valor_Industria|Valor_Servicos|   Governo|     Total|  Impostos|        PIB|PIB_per_capita|__index_level_0__|
+----+-------+----------+--------------------+------------------+---------------+--------------+----

## Tratamento dos Dados

In [38]:
# Deletando coluna desnecessária.
pib = pib.drop('__index_level_0__')

In [39]:
#Conferindo se foi excluída.
pib.columns

['Ano',
 'Nome_UF',
 'Cod_Cidade',
 'Nome_Cidade',
 'Valor_Agropecuaria',
 'Valor_Industria',
 'Valor_Servicos',
 'Governo',
 'Total',
 'Impostos',
 'PIB',
 'PIB_per_capita']

## Agrupando Valores

In [40]:

g_pib = pib.groupBy("Ano")\
        .agg((F.sum("Valor_Agropecuaria")/F.lit(1_000_000_000)).alias('Agropecuaria'),\
             (F.sum('Valor_Industria')/F.lit(1_000_000_000)).alias('Industria'),\
             (F.sum('Valor_Servicos')/F.lit(1_000_000_000)).alias('Servicos'),\
             (F.sum('Governo')/F.lit(1_000_000_000)).alias('Governo'))\
        .orderBy("Ano", ascending=True)\
        .toPandas()

## Plotando o Gráfico do PIB

In [41]:
columns = [ 'Agropecuaria', 'Industria', 'Servicos', 'Governo']

fig = px.line(g_pib, x="Ano", y=columns,title= 'Histórico das Composições do PIB 2002 - 2021')
fig.show()

# ANAC

## Importando os Dados


In [42]:

raw_Anac = spark.read.parquet('/workspaces/TCC_MLaaS_PIB_ANAC/base_dados/PassagensAereas/BaseDados_PassagensAereas.parquet.gzip')

raw_Anac

DataFrame[Ano: bigint, Mes: bigint, Empresa: string, Origem: string, Destino: string, Tarifa: string, Assentos: bigint, __index_level_0__: bigint]

## Explorando os Dados

In [43]:
raw_Anac.printSchema()
raw_Anac.show()


root
 |-- Ano: long (nullable = true)
 |-- Mes: long (nullable = true)
 |-- Empresa: string (nullable = true)
 |-- Origem: string (nullable = true)
 |-- Destino: string (nullable = true)
 |-- Tarifa: string (nullable = true)
 |-- Assentos: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)

+----+---+-------+------+-------+------+--------+-----------------+
| Ano|Mes|Empresa|Origem|Destino|Tarifa|Assentos|__index_level_0__|
+----+---+-------+------+-------+------+--------+-----------------+
|2002|  1|    GLO|  SBPA|   SBBR|397,00|      51|                0|
|2002|  1|    GLO|  SBSV|   SBRF|272,00|       5|                1|
|2002|  1|    GLO|  SBFL|   SBGL|223,00|     196|                2|
|2002|  1|    GLO|  SBGL|   SBSP| 96,00|     615|                3|
|2002|  1|    GLO|  SBGL|   SBRF|340,00|     297|                4|
|2002|  1|    GLO|  SBSP|   SBFL|145,00|     189|                5|
|2002|  1|    GLO|  SBBH|   SBBR|166,00|      37|                6|
|2002|  1

O campo **Tarifa** está com o formato string, para alterar, será necessário substitir a "*,*" pelo "*.*" para mudar o formato da coluna para o tipo numérico.

## Tratamento dos Dados


In [44]:
#Deletando a coluna desnecessária.
raw_Anac = raw_Anac.drop('__index_level_0__')


In [45]:
from pyspark.sql.functions import *

In [46]:
# Substituindo virgula por ponto.
raw_Anac = raw_Anac.withColumn('Tarifa',regexp_replace('Tarifa',',','.'))

# Alterando o formato string pra Double
anac = raw_Anac.withColumn('Tarifa',raw_Anac['Tarifa'].cast(DoubleType()).alias('Tarifa')) 

In [47]:
#Conferindo os formatos das colunas
anac.printSchema()


root
 |-- Ano: long (nullable = true)
 |-- Mes: long (nullable = true)
 |-- Empresa: string (nullable = true)
 |-- Origem: string (nullable = true)
 |-- Destino: string (nullable = true)
 |-- Tarifa: double (nullable = true)
 |-- Assentos: long (nullable = true)



In [48]:
#Adicionando uma nova coluna multiplicando os valores das passagens com os assentos vendidos
anac = anac.withColumn("Total",col("Tarifa")*col("Assentos"))

## Agrupando os Valores

In [49]:
g_assentos = anac.groupBy("Ano")\
               .agg(round((F.max("Assentos")/F.lit(1)),2).alias('max'),\
                    round((F.min('Assentos')/F.lit(1)),2).alias('min'),
                    round((F.avg('Assentos')/F.lit(1)),2).alias('avg'),
                    round((F.sum('Assentos')/F.lit(1)),2).alias('sum'))\
               .orderBy("Ano", ascending=True)\
               .toPandas()

                                                                                

In [50]:
g_tarifas = anac.groupBy("Ano")\
               .agg(round((F.max("Tarifa")/F.lit(1)),2).alias('max'),\
                    round((F.min('Tarifa')/F.lit(1)),2).alias('min'),
                    round((F.avg('Tarifa')/F.lit(1)),2).alias('avg'),
                    round((F.sum('Total')/F.lit(1)),2).alias('Total'),
                    round((F.count('Tarifa')/F.lit(1)),2).alias('count'))\
               .orderBy("Ano", ascending=True)\
               .toPandas()

                                                                                

## Plotando o Gráfico das Passagens Aéreas

In [51]:
# Plotando Gráfico histórico dos valores das passagens aereas Brasileiras

# fig = px.line(g_tarifas, x="Ano", y="avg", title="Histórico da Média dos Valores das Passagens Aéreas Brasileiras")
# fig.show()

In [52]:
# fig = px.line(g_tarifas, x="Ano", y='Total', title="Histórico Valor Total das Passagens Aéreas Brasileiras")
# fig.show()

In [53]:
# fig = px.line(g_tarifas, x="Ano", y='count', title="Histórico da Quantidade das Passagens Aéreas Brasileiras")
# fig.show()


In [54]:
# fig = px.line(g_assentos, x="Ano", y='sum', title="Histórico da Soma das Passagens Aéreas Brasileiras")
# fig.show()


# Aerodromos

## Lendo os Arquivos


In [55]:
pistas = spark.read.csv('/workspaces/TCC_MLaaS_PIB_ANAC/base_dados/Aerodromos/Base_Pistas.csv',header=True)

pistas

DataFrame[OACI: string, CIAD: string, Nome: string, Municipio: string, UF: string, Tipo: string]

## Explorando os Dados

In [56]:
pistas.show()

+----+------+--------------------+--------------------+---+-------+
|OACI|  CIAD|                Nome|           Municipio| UF|   Tipo|
+----+------+--------------------+--------------------+---+-------+
|SBAA|PA0008|CONCEIÇÃO DO ARAG...|CONCEIÇÃO DO ARAG...| PA|Publico|
|SBAE|SP0010|       BAURU/AREALVA|               BAURU| SP|Publico|
|SBAQ|SP0012|BARTOLOMEU DE GUSMÃO|          ARARAQUARA| SP|Publico|
|SBAR|SE0001|         SANTA MARIA|             ARACAJU| SE|Publico|
|SBAT|MT0003|PILOTO OSVALDO MA...|       ALTA FLORESTA| MT|Publico|
|SBAU|SP0009|           ARAÇATUBA|           ARAÇATUBA| SP|Publico|
|SBAX|MG0008|          ROMEU ZEMA|               ARAXÁ| MG|Publico|
|SBBE|PA0001|INTERNACIONAL DE ...|               BELÉM| PA|Publico|
|SBBG|RS0010|COMANDANTE GUSTAV...|                BAGÉ| RS|Publico|
|SBBH|MG0003|PAMPULHA - CARLOS...|      BELO HORIZONTE| MG|Publico|
|SBBI|PR0006|           BACACHERI|            CURITIBA| PR|Publico|
|SBBP|SP0036|ESTADUAL ARTHUR S...|   BRAGANÇA PA

## Realizando Join ANAC e Aerodromos

In [57]:
from pyspark.sql.functions import desc

In [58]:
anac_origem = anac
anac_origem.show()

+----+---+-------+------+-------+------+--------+--------+
| Ano|Mes|Empresa|Origem|Destino|Tarifa|Assentos|   Total|
+----+---+-------+------+-------+------+--------+--------+
|2002|  1|    GLO|  SBPA|   SBBR| 397.0|      51| 20247.0|
|2002|  1|    GLO|  SBSV|   SBRF| 272.0|       5|  1360.0|
|2002|  1|    GLO|  SBFL|   SBGL| 223.0|     196| 43708.0|
|2002|  1|    GLO|  SBGL|   SBSP|  96.0|     615| 59040.0|
|2002|  1|    GLO|  SBGL|   SBRF| 340.0|     297|100980.0|
|2002|  1|    GLO|  SBSP|   SBFL| 145.0|     189| 27405.0|
|2002|  1|    GLO|  SBBH|   SBBR| 166.0|      37|  6142.0|
|2002|  1|    GLO|  SBPA|   SBSP| 269.0|     184| 49496.0|
|2002|  1|    GLO|  SBSP|   SBBR| 166.0|    4621|767086.0|
|2002|  1|    GLO|  SBGL|   SBCT| 217.0|       6|  1302.0|
|2002|  1|    GLO|  SBGL|   SBPA| 228.0|     297| 67716.0|
|2002|  1|    GLO|  SBPA|   SBBR| 295.0|      69| 20355.0|
|2002|  1|    GLO|  SBBR|   SBCT| 299.0|      22|  6578.0|
|2002|  1|    GLO|  SBRF|   SBSV| 154.0|      15|  2310.

In [59]:
# Join das cidades de Origem com base nos aeroportos.

anac_origem = (anac\
    .join(pistas,anac.Origem == pistas.OACI,'inner')\
        .drop("OACI", "CIAD","Nome","Tipo")\
            .withColumnRenamed("Municipio","Municipio_Origem")
            .withColumnRenamed("UF","UF_Origem")
)
anac_origem.show()

+----+---+-------+------+-------+------+--------+--------+----------------+---------+
| Ano|Mes|Empresa|Origem|Destino|Tarifa|Assentos|   Total|Municipio_Origem|UF_Origem|
+----+---+-------+------+-------+------+--------+--------+----------------+---------+
|2002|  1|    GLO|  SBPA|   SBBR| 397.0|      51| 20247.0|    PORTO ALEGRE|       RS|
|2002|  1|    GLO|  SBSV|   SBRF| 272.0|       5|  1360.0|        SALVADOR|       BA|
|2002|  1|    GLO|  SBFL|   SBGL| 223.0|     196| 43708.0|   FLORIANÓPOLIS|       SC|
|2002|  1|    GLO|  SBGL|   SBSP|  96.0|     615| 59040.0|  RIO DE JANEIRO|       RJ|
|2002|  1|    GLO|  SBGL|   SBRF| 340.0|     297|100980.0|  RIO DE JANEIRO|       RJ|
|2002|  1|    GLO|  SBSP|   SBFL| 145.0|     189| 27405.0|       SÃO PAULO|       SP|
|2002|  1|    GLO|  SBBH|   SBBR| 166.0|      37|  6142.0|  BELO HORIZONTE|       MG|
|2002|  1|    GLO|  SBPA|   SBSP| 269.0|     184| 49496.0|    PORTO ALEGRE|       RS|
|2002|  1|    GLO|  SBSP|   SBBR| 166.0|    4621|76708

In [60]:
df_anac = (anac_origem\
    .join(pistas,anac_origem.Destino == pistas.OACI,'inner')\
        .drop("OACI", "CIAD","Nome","Tipo")\
            .withColumnRenamed("Municipio","Municipio_Destino")
            .withColumnRenamed("UF","UF_Destino")
)
df_anac.show()

+----+---+-------+------+-------+------+--------+--------+----------------+---------+-----------------+----------+
| Ano|Mes|Empresa|Origem|Destino|Tarifa|Assentos|   Total|Municipio_Origem|UF_Origem|Municipio_Destino|UF_Destino|
+----+---+-------+------+-------+------+--------+--------+----------------+---------+-----------------+----------+
|2002|  1|    GLO|  SBPA|   SBBR| 397.0|      51| 20247.0|    PORTO ALEGRE|       RS|         BRASÍLIA|        DF|
|2002|  1|    GLO|  SBSV|   SBRF| 272.0|       5|  1360.0|        SALVADOR|       BA|           RECIFE|        PE|
|2002|  1|    GLO|  SBFL|   SBGL| 223.0|     196| 43708.0|   FLORIANÓPOLIS|       SC|   RIO DE JANEIRO|        RJ|
|2002|  1|    GLO|  SBGL|   SBSP|  96.0|     615| 59040.0|  RIO DE JANEIRO|       RJ|        SÃO PAULO|        SP|
|2002|  1|    GLO|  SBGL|   SBRF| 340.0|     297|100980.0|  RIO DE JANEIRO|       RJ|           RECIFE|        PE|
|2002|  1|    GLO|  SBSP|   SBFL| 145.0|     189| 27405.0|       SÃO PAULO|     

In [61]:
df_anac.show()

+----+---+-------+------+-------+------+--------+--------+----------------+---------+-----------------+----------+
| Ano|Mes|Empresa|Origem|Destino|Tarifa|Assentos|   Total|Municipio_Origem|UF_Origem|Municipio_Destino|UF_Destino|
+----+---+-------+------+-------+------+--------+--------+----------------+---------+-----------------+----------+
|2002|  1|    GLO|  SBPA|   SBBR| 397.0|      51| 20247.0|    PORTO ALEGRE|       RS|         BRASÍLIA|        DF|
|2002|  1|    GLO|  SBSV|   SBRF| 272.0|       5|  1360.0|        SALVADOR|       BA|           RECIFE|        PE|
|2002|  1|    GLO|  SBFL|   SBGL| 223.0|     196| 43708.0|   FLORIANÓPOLIS|       SC|   RIO DE JANEIRO|        RJ|
|2002|  1|    GLO|  SBGL|   SBSP|  96.0|     615| 59040.0|  RIO DE JANEIRO|       RJ|        SÃO PAULO|        SP|
|2002|  1|    GLO|  SBGL|   SBRF| 340.0|     297|100980.0|  RIO DE JANEIRO|       RJ|           RECIFE|        PE|
|2002|  1|    GLO|  SBSP|   SBFL| 145.0|     189| 27405.0|       SÃO PAULO|     

In [62]:
df_anac.show(4)

+----+---+-------+------+-------+------+--------+-------+----------------+---------+-----------------+----------+
| Ano|Mes|Empresa|Origem|Destino|Tarifa|Assentos|  Total|Municipio_Origem|UF_Origem|Municipio_Destino|UF_Destino|
+----+---+-------+------+-------+------+--------+-------+----------------+---------+-----------------+----------+
|2002|  1|    GLO|  SBPA|   SBBR| 397.0|      51|20247.0|    PORTO ALEGRE|       RS|         BRASÍLIA|        DF|
|2002|  1|    GLO|  SBSV|   SBRF| 272.0|       5| 1360.0|        SALVADOR|       BA|           RECIFE|        PE|
|2002|  1|    GLO|  SBFL|   SBGL| 223.0|     196|43708.0|   FLORIANÓPOLIS|       SC|   RIO DE JANEIRO|        RJ|
|2002|  1|    GLO|  SBGL|   SBSP|  96.0|     615|59040.0|  RIO DE JANEIRO|       RJ|        SÃO PAULO|        SP|
+----+---+-------+------+-------+------+--------+-------+----------------+---------+-----------------+----------+
only showing top 4 rows



In [63]:
df_anac.filter(df_anac.Ano==2014)\
    .groupBy("Ano","Municipio_Origem","Municipio_Destino")\
                    .agg(round((F.sum('Total')/F.lit(1_000_000)),2).alias('Total'),\
                        # round((F.count('Municipio_Origem')/F.lit(1)),2).alias('Municipio_Origem'),\
                        round((F.count('Municipio_Origem')/F.lit(1)),2).alias('Municipio_Origem')\
                    )\
                .orderBy("Total", ascending=False).show(5)
                #    .toPandas()
 

[Stage 95:>                                                         (0 + 2) / 3]

+----+----------------+-----------------+------+----------------+
| Ano|Municipio_Origem|Municipio_Destino| Total|Municipio_Origem|
+----+----------------+-----------------+------+----------------+
|2014|       SÃO PAULO|   RIO DE JANEIRO|369.98|         39529.0|
|2014|  RIO DE JANEIRO|        SÃO PAULO|366.41|         38604.0|
|2014|          RECIFE|        GUARULHOS|209.83|         17288.0|
|2014|       GUARULHOS|           RECIFE|204.86|         16213.0|
|2014|        SALVADOR|        GUARULHOS|200.67|         21864.0|
+----+----------------+-----------------+------+----------------+
only showing top 5 rows



                                                                                

In [64]:
df_anac.filter(df_anac.Ano==2015)\
    .groupBy("Ano","Municipio_Origem","Municipio_Destino")\
                    .agg(round((F.sum('Total')/F.lit(1_000_000)),2).alias('Total'),\
                        # round((F.count('Municipio_Origem')/F.lit(1)),2).alias('Municipio_Origem'),\
                        round((F.count('Municipio_Origem')/F.lit(1)),2).alias('Municipio_Origem')\
                    )\
                .orderBy("Total", ascending=False).show(5)
                #    .toPandas()
 

[Stage 99:>                                                         (0 + 2) / 3]

+----+----------------+-----------------+------+----------------+
| Ano|Municipio_Origem|Municipio_Destino| Total|Municipio_Origem|
+----+----------------+-----------------+------+----------------+
|2015|       SÃO PAULO|   RIO DE JANEIRO|324.95|         60465.0|
|2015|  RIO DE JANEIRO|        SÃO PAULO|319.93|         57706.0|
|2015|          RECIFE|        GUARULHOS|195.04|         23111.0|
|2015|       GUARULHOS|           RECIFE|191.53|         22394.0|
|2015|        SALVADOR|        GUARULHOS|175.54|         25956.0|
+----+----------------+-----------------+------+----------------+
only showing top 5 rows



                                                                                

In [65]:
g_assentos = anac.groupBy("Ano")\
               .agg(round((F.max("Assentos")/F.lit(1)),2).alias('max'),\
                    round((F.min('Assentos')/F.lit(1)),2).alias('min'),
                    round((F.avg('Assentos')/F.lit(1)),2).alias('avg'),
                    round((F.sum('Assentos')/F.lit(1)),2).alias('sum'))\
               .orderBy("Ano", ascending=True)\
               .toPandas()

                                                                                