##Projeto Final: Processamento de Dados em Larga Escala



* **Aluno: Thiago Brito de Andrade Tenório**
* **Turma: Dados-2023.1**
* **Tecnologia Aplicada: PySpark SQL**
* **Base de Dados: Tarifas Anac (2013-2023)**
* **Link: https://sas.anac.gov.br/sas/downloads/view/frmDownload.aspx?tema=14**

A base se trata de dados referentes a Tarifas Aéreas registrados pela ANAC (Agência Nacional de Aviação Civil) no período entre Janeiro de 2013 e Novembro de 2023.

**Dicionário de Dados:**

1. **ANO:** Ano de Registro da Tarifa
2. **MES:** Mês de Registro da Tarifa
3. **EMPRESA:** Ticker da Empresa que executou o trecho.
4. **ORIGEM:** Ticker do Local de Origem do Trecho.
5. **DESTINO:** Ticker do Local de Destino do Trecho.
6. **TARIFA:** Valor cobrado no Trecho.
7. **ASSENTOS:** Número de assentos vendidos no Trecho por determinada empresa

*Obs:* Fiz o Upload de todos os dados organizados por ano e mês para meu Github no link a seguir: https://github.com/tbatenorio/BigData/tree/main/AnacDB








In [1]:
!pip install --upgrade pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=854ed73dc1ff9563bb056a1366993f24f3ff81b5caf6e66684252b2c1ae840e7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Library Imports & Configs


In [2]:
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
import urllib.request

os.environ['PYSPARK_SUBMIT_ARGS'] = '\
    --driver-memory 2G \
    --executor-memory 2G \
    pyspark-shell'

spark = SparkSession.builder\
    .master("local[*]")\
    .getOrCreate()

##Importação das Bases de Dados

In [3]:
%%time
# Defina o esquema
schema = StructType([
    StructField("ANO", StringType(), True),
    StructField("MES", StringType(), True),
    StructField("EMPRESA", StringType(), True),
    StructField("ORIGEM", StringType(), True),
    StructField("DESTINO", StringType(), True),
    StructField("TARIFA", FloatType(), True),
    StructField("ASSENTOS", IntegerType(), True),
])

# Crie um DataFrame vazio com o esquema
data_spark = spark.createDataFrame([], schema=schema)

meses = ['01','02','03','04','05','06','07','08','09','10','11','12']

for ano in range (2013, 2024):
  for mes in meses:
      try:
          # Nome do arquivo local
          local_file = local_file = ('File' + str(ano) + str(mes) + '.CSV')
          #Monta a url
          url = (f'https://raw.githubusercontent.com/tbatenorio/BigData/main/AnacDB/{ano}/{ano}{mes}.CSV')
          # Baixe o arquivo CSV localmente
          urllib.request.urlretrieve(url, local_file)
          #Ler o arquivo CSV no Spark DataFrame
          data = spark.read.csv(local_file, header=True, sep=';')
          #Realizar Tratamento na coluna 'TARIFA' para ser tipo Float
          data = data.withColumn('TARIFA', F.regexp_replace('TARIFA', ',', '.').cast(FloatType()))
          #Realizar Tratamento na coluna 'ASSENTOS' para ser tipo Integer
          data = data.withColumn('ASSENTOS', F.col('ASSENTOS').cast(IntegerType()))
          #Adiciona à Base de Dados Geral
          data_spark = data_spark.union(data)
      except Exception as e:
          print(f"Erro ao ler o arquivo para o ano {ano} e mês {mes}: {e}")

data_spark.show()

Erro ao ler o arquivo para o ano 2023 e mês 12: HTTP Error 404: Not Found
+----+---+-------+------+-------+------+--------+
| ANO|MES|EMPRESA|ORIGEM|DESTINO|TARIFA|ASSENTOS|
+----+---+-------+------+-------+------+--------+
|2013|  1|    AZU|  SBRJ|   SBFN| 849.9|       2|
|2013|  1|    AZU|  SBSR|   SBPA|241.31|       4|
|2013|  1|    AZU|  SBKP|   SBJF|620.91|       1|
|2013|  1|    AZU|  SBBE|   SBKP| 769.9|       1|
|2013|  1|    AZU|  SBKP|   SBJI|432.31|       2|
|2013|  1|    AZU|  SBRP|   SBRJ| 188.0|       3|
|2013|  1|    AZU|  SBVT|   SBRJ| 90.02|       1|
|2013|  1|    AZU|  SBAR|   SBSV| 179.9|      21|
|2013|  1|    AZU|  SBFL|   SBKP|129.09|       9|
|2013|  1|    AZU|  SBSR|   SBMA| 899.9|       1|
|2013|  1|    AZU|  SBSV|   SBCF| 385.9|       3|
|2013|  1|    AZU|  SBMO|   SBSV|721.51|       1|
|2013|  1|    AZU|  SBNF|   SBUL|488.71|       2|
|2013|  1|    AZU|  SBGO|   SWRD|399.81|       1|
|2013|  1|    AZU|  SBFZ|   SBBR|1149.9|       7|
|2013|  1|    AZU|  SBAR| 

In [None]:
print("O número total de Registros das bases entre 2013 e 2023 é de: " + str(data_spark.count()))

O número total de Registros das bases entre 2013 e 2023 é de: 51580063


In [None]:
data_spark.printSchema()

root
 |-- ANO: string (nullable = true)
 |-- MES: string (nullable = true)
 |-- EMPRESA: string (nullable = true)
 |-- ORIGEM: string (nullable = true)
 |-- DESTINO: string (nullable = true)
 |-- TARIFA: float (nullable = true)
 |-- ASSENTOS: integer (nullable = true)



#Problema Número 1:

##Qual Trecho teve maior número de assentos vendidos (Ano-a-Ano)?


In [None]:
%%time

#Crio a coluna referente à Rota
best_root_by_year = data_spark.withColumn('ROUTE',F.concat( F.col('ORIGEM'), F.lit(" - "), F.col('DESTINO'))).drop('ORIGEM').drop('DESTINO')

# Crio uma coluna 'TOTAL_SEATS' com o somatório de 'ASSENTOS' por ANO e ROUTE
best_root_by_year = best_root_by_year.withColumn('TOTAL_SEATS', F.sum('ASSENTOS').over(Window.partitionBy('ANO', 'ROUTE'))).drop('ASSENTOS')

# Crio uma janela por ANO e ordene pela quantidade total de assentos em ordem decrescente
window_spec = Window.partitionBy('ANO').orderBy(F.desc('TOTAL_SEATS'))

# Adicione uma coluna 'rank' com a classificação da quantidade de assentos
best_root_by_year = best_root_by_year.withColumn('rank', F.row_number().over(window_spec))

# Filtrar as linhas onde 'rank' é igual a 1, ou seja, o trecho com o maior número de assentos
best_root_by_year = best_root_by_year.filter(F.col('rank') == 1).orderBy('ANO').drop('rank')

best_root_by_year.show(11)

+----+---+-------+-------+-----------+-----------+
| ANO|MES|EMPRESA| TARIFA|      ROUTE|TOTAL_SEATS|
+----+---+-------+-------+-----------+-----------+
|2013|  1|    AZU|   67.0|SBRJ - SBSP|    1012536|
|2014|  1|    AZU|  360.9|SBRJ - SBSP|    1091060|
|2015|  1|    AZU|  849.9|SBSP - SBRJ|     925654|
|2016|  1|    AZU|  477.9|SBSP - SBRJ|     827281|
|2017|  1|    AZU| 267.21|SBSP - SBRJ|     787912|
|2018|  1|    AZU|1570.22|SBSP - SBRJ|     738536|
|2019|  1|    AZU|  179.9|SBSP - SBRJ|     702842|
|2020|  1|    AZU| 1019.9|SBRJ - SBSP|     350451|
|2021|  1|    AZU|  11.39|SBRJ - SBSP|     439748|
|2022|  1|    AZU| 1002.0|SBSP - SBRJ|     436302|
|2023|  1|    AZU| 112.14|SBSP - SBRJ|     503368|
+----+---+-------+-------+-----------+-----------+

CPU times: user 2.74 s, sys: 388 ms, total: 3.12 s
Wall time: 8min 54s


**Insight: Fica observado que o Trecho Rio-São Paulo ou vice-versa foi o que teve maior número de assentos vendidos para toda a série histórica.**

#Problema Número 2:

##Quais foram a 5 empresas com maior faturamento no ano de 2022, ano de normalização do setor aéreo pós Pandemia do COVID?

In [4]:
%%time
# Seleciona os dados e calcula o coluna 'VALOR_FATURADO'
top_5_comp_revenue = data_spark.withColumn('VALOR_FATURADO', F.col('TARIFA') * F.col('ASSENTOS'))

# Filtrar por ano e agrupar por empresa e obter os 5 maiores valores
top_5_comp_revenue = top_5_comp_revenue.where(F.col('ANO') == '2022')\
                             .groupBy('EMPRESA')\
                             .sum('VALOR_FATURADO')\
                             .withColumnRenamed('sum(VALOR_FATURADO)', 'TOTAL_REVENUE')\
                             .orderBy(F.desc('TOTAL_REVENUE'))\
                             .limit(5)

 # Formatar a coluna 'total_revenue'
top_5_comp_revenue = top_5_comp_revenue.withColumn(
    'TOTAL_REVENUE',
    F.format_number('TOTAL_REVENUE', 0)
)

# Mostrar os resultados
top_5_comp_revenue.show(truncate=False)

+-------+-------------+
|EMPRESA|TOTAL_REVENUE|
+-------+-------------+
|AZU    |6,410,829,530|
|TAM    |5,497,794,038|
|GLO    |3,996,342,260|
|PTB    |74,609,663   |
|ABJ    |2,208,270    |
+-------+-------------+

CPU times: user 831 ms, sys: 124 ms, total: 955 ms
Wall time: 2min 30s


#Problema Número 3:

##Qual o Crescimento Percental de Faturamento (Ano-a-Ano) dentro da Série Histórica das 3 empresas que mais faturaram em 2022

In [5]:
%%time

#Lista com Nomes das Top 3 Empresas de Maior Faturamento em 2022
name_comp = top_5_comp_revenue.select('EMPRESA').limit(3).rdd.flatMap(lambda x: x).collect()

window_spec = Window.partitionBy('EMPRESA').orderBy('ANO')

#Calculo Valor Faturado por Trecho
top_3_revenue_hist = data_spark.withColumn('VALOR_FATURADO', F.col('TARIFA') * F.col('ASSENTOS'))

#Filtro
top_3_revenue_hist = top_3_revenue_hist.where(F.col('EMPRESA').isin(name_comp))\
                                       .groupBy('ANO', 'EMPRESA')\
                                       .agg(F.sum('VALOR_FATURADO').alias('FAT_BY_YEAR'))\
                                       .orderBy('EMPRESA', 'ANO')
#Criação do Valor de "Shiff" do Faturamento
top_3_revenue_hist = top_3_revenue_hist.withColumn('LAST_YEAR_FAT', F.lag('FAT_BY_YEAR').over(window_spec))

#Casting
top_3_revenue_hist = top_3_revenue_hist.withColumn('LAST_YEAR_FAT', F.col('LAST_YEAR_FAT').cast(FloatType()))
top_3_revenue_hist = top_3_revenue_hist.withColumn('FAT_BY_YEAR', F.col('FAT_BY_YEAR').cast(FloatType()))

#Calculo dos Percentuais
growth_top_3_revenue_hist = top_3_revenue_hist.withColumn('GROWTH_PERCENTAGE', ((F.col('FAT_BY_YEAR') / F.col('LAST_YEAR_FAT')) - 1) * 100)

#Formatação
growth_top_3_revenue_hist = growth_top_3_revenue_hist.withColumn(
    'FAT_BY_YEAR',
    F.format_number('FAT_BY_YEAR', 0)
)

growth_top_3_revenue_hist = growth_top_3_revenue_hist.drop('LAST_YEAR_FAT')

#Formatação
growth_top_3_revenue_hist = growth_top_3_revenue_hist.withColumn(
    'GROWTH_PERCENTAGE',
    F.format_number('GROWTH_PERCENTAGE', 2)
).withColumn(
    'GROWTH_PERCENTAGE',
    F.expr("CONCAT(GROWTH_PERCENTAGE, ' %')" )
)

growth_top_3_revenue_hist.show(33)

+----+-------+-------------+-----------------+
| ANO|EMPRESA|  FAT_BY_YEAR|GROWTH_PERCENTAGE|
+----+-------+-------------+-----------------+
|2013|    AZU|4,978,275,328|             NULL|
|2014|    AZU|4,932,191,744|          -0.93 %|
|2015|    AZU|4,238,011,904|         -14.07 %|
|2016|    AZU|3,783,654,144|         -10.72 %|
|2017|    AZU|4,050,829,824|           7.06 %|
|2018|    AZU|4,086,864,896|           0.89 %|
|2019|    AZU|4,631,773,696|          13.33 %|
|2020|    AZU|2,468,897,536|         -46.70 %|
|2021|    AZU|4,558,939,136|          84.65 %|
|2022|    AZU|6,410,829,312|          40.62 %|
|2023|    AZU|6,152,730,112|          -4.03 %|
|2013|    GLO|5,078,872,576|             NULL|
|2014|    GLO|5,226,098,176|           2.90 %|
|2015|    GLO|4,239,774,976|         -18.87 %|
|2016|    GLO|3,707,688,448|         -12.55 %|
|2017|    GLO|4,114,338,816|          10.97 %|
|2018|    GLO|4,575,376,384|          11.21 %|
|2019|    GLO|5,424,360,448|          18.56 %|
|2020|    GLO

# Problema Número 4:

##Dado que há meses que possuem historicamente maior e menor número de vendas, para qual mês do ano você indicaria a criação de oferta promocional para alavancar o número de vendas?

In [6]:
%%time
#Crio um coluna com o somatório de assentos vendidos por Ano-Mês
worst_months_by_year = data_spark.withColumn('SEATS_SOLD', F.sum('ASSENTOS').over(Window.partitionBy('ANO', 'MES'))).drop('EMPRESA','ORIGEM','DESTINO','TARIFA','ASSENTOS')

worst_months_by_year = worst_months_by_year.distinct()

#Crio uma Janela baseada no Ano, Ordenando Crescente pelo Total de assentos Ano-Mes
window_spec = Window.partitionBy('ANO').orderBy(F.asc('SEATS_SOLD'))
#Obtem os 3 piores meses em Assentos vendidos por Ano
worst_months_by_year = worst_months_by_year.withColumn('rank', F.row_number().over(window_spec))\
                                           .where(F.col('rank') <= 3)\
                                           .drop('rank')

worst_months_by_year.show(33)

+----+---+----------+
| ANO|MES|SEATS_SOLD|
+----+---+----------+
|2013|  2|   3341095|
|2013| 12|   3790080|
|2013|  1|   3832488|
|2014|  6|   3651053|
|2014|  2|   3772074|
|2014| 12|   3915556|
|2015| 12|   3286404|
|2015|  9|   3674112|
|2015| 10|   3737626|
|2016| 12|   2839457|
|2016|  9|   2916480|
|2016| 10|   3143825|
|2017| 12|   2745175|
|2017|  2|   2957363|
|2017|  4|   2974308|
|2018| 12|   2498232|
|2018|  2|   2709722|
|2018|  8|   3066826|
|2019| 12|   2585896|
|2019|  4|   2639828|
|2019|  2|   2701915|
|2020|  5|    461570|
|2020|  4|    509942|
|2020|  6|    787037|
|2021|  3|   1222017|
|2021|  4|   1360092|
|2021|  2|   1443330|
|2022| 12|   1826543|
|2022|  2|   1837818|
|2022|  7|   1932906|
|2023|  2|   2006356|
|2023|  7|   2121394|
|2023|  6|   2173649|
+----+---+----------+

CPU times: user 1.31 s, sys: 211 ms, total: 1.52 s
Wall time: 4min 14s


**Diante dos dados obtidos, os meses que apresentaram pior resultado durante a série histórica são os meses de FEVEREIRO E DEZEMBRO, sendo assim indicaria tais meses para a criação de ofertas promocionais.**