# Lab 4 - Programação Spark Dataframes

**Discentes**

- Emanoel Batista
- Raphael Ramos

## Preparação do Dataframe

In [1]:
# Importação das bibliotecas necessárias
from pyspark.sql import SparkSession  # Importa a classe SparkSession para criar uma sessão Spark
import os  # Importa a biblioteca os para interagir com o sistema operacional (ex. verificar a existência de arquivos)

# Criação da sessão Spark com o nome de aplicativo especificado
spark = SparkSession.builder.appName("Lab 4 - Programação Spark Dataframes").getOrCreate()

# Definição do caminho para o arquivo CSV
path = 'voosBrasil.csv'

# Inicializa a variável 'dfa' com valor None. Ela será usada para armazenar o DataFrame
dfa = None

# Bloco try-except para tratar possíveis erros na leitura do arquivo
try:
  # Verifica se o arquivo especificado existe no caminho fornecido
  if not os.path.exists(path):
    # Se o arquivo não existir, levanta uma exceção personalizada
    raise Exception(f'Arquivo {path} não existe')

  # Lê o arquivo CSV utilizando o Spark, com a opção de usar o cabeçalho e inferir o tipo de dados das colunas
  dfa = spark.read.csv(path, header=True, inferSchema=True)

  # Verifica se a leitura do arquivo foi bem-sucedida (se o DataFrame está vazio)
  if not dfa:
    # Se o DataFrame estiver vazio, levanta uma exceção indicando erro na leitura
    raise Exception('Erro na leitura do dataset')

# Bloco para capturar e exibir qualquer erro que ocorra no bloco try
except Exception as error:
    # Exibe a mensagem do erro caso ocorra
    print(error)

# Exibe o schema (estrutura) do DataFrame 'dfa', que mostra os nomes das colunas e seus tipos de dados
dfa.schema

your 131072x1 screen size is bogus. expect trouble
24/11/13 09:00:11 WARN Utils: Your hostname, DESKTOP-JALCD72 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/13 09:00:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/13 09:00:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

StructType([StructField('Voos', StringType(), True), StructField('Companhia.Aerea', StringType(), True), StructField('Codigo.Tipo.Linha', StringType(), True), StructField('Partida.Prevista', TimestampType(), True), StructField('Partida.Real', StringType(), True), StructField('Chegada.Prevista', TimestampType(), True), StructField('Chegada.Real', StringType(), True), StructField('Situacao.Voo', StringType(), True), StructField('Codigo.Justificativa', StringType(), True), StructField('Aeroporto.Origem', StringType(), True), StructField('Cidade.Origem', StringType(), True), StructField('Estado.Origem', StringType(), True), StructField('Pais.Origem', StringType(), True), StructField('Aeroporto.Destino', StringType(), True), StructField('Cidade.Destino', StringType(), True), StructField('Estado.Destino', StringType(), True), StructField('Pais.Destino', StringType(), True), StructField('LongDest', DoubleType(), True), StructField('LatDest', DoubleType(), True), StructField('LongOrig', Double

### Pré-processamento do Dataframe

In [2]:
# Importação da função 'to_timestamp' do módulo pyspark.sql.functions
from pyspark.sql.functions import to_timestamp

# Identificação das colunas cujos nomes contêm um ponto '.'
# A list comprehension percorre todas as colunas de 'dfa' e verifica se o nome da coluna contém um ponto
cols_with_dot = [col for col in dfa.columns if '.' in col]

# Criação de um dicionário que mapeia o nome das colunas com ponto para o mesmo nome, mas com o ponto substituído por '_'
# O método replace substitui todos os pontos por underscores
cols_dict = {col: col.replace('.', '_') for col in cols_with_dot}

# Renomeação das colunas no DataFrame 'dfa' de acordo com o dicionário 'cols_dict'
# 'withColumnsRenamed' recebe o dicionário de renomeações e aplica as mudanças no DataFrame
dfa = dfa.withColumnsRenamed(cols_dict)

# Identificação das colunas que contêm informações de data/hora, com base no nome da coluna
# Aqui, as colunas que começam com 'Chegada' ou 'Partida' são identificadas como colunas de timestamp
timestamp_cols = [col for col in cols_dict.values() if col.startswith('Chegada') or col.startswith('Partida')]

# Conversão das colunas de data/hora para o tipo Timestamp
# 'to_timestamp' converte as colunas selecionadas para o tipo Timestamp, que é mais adequado para manipulações de data e hora
for col in timestamp_cols:
    dfa = dfa.withColumn(col, to_timestamp(col))

# Exibição do DataFrame 'dfa' resultante, para verificar as mudanças feitas
# 'show' exibe as primeiras 20 linhas do DataFrame para visualização
dfa.show()

+---------+--------------------+-----------------+-------------------+-------------------+-------------------+-------------------+------------+--------------------+----------------+--------------------+-------------+--------------+-----------------+--------------------+--------------+--------------+-----------+-----------+-----------+-----------+
|     Voos|     Companhia_Aerea|Codigo_Tipo_Linha|   Partida_Prevista|       Partida_Real|   Chegada_Prevista|       Chegada_Real|Situacao_Voo|Codigo_Justificativa|Aeroporto_Origem|       Cidade_Origem|Estado_Origem|   Pais_Origem|Aeroporto_Destino|      Cidade_Destino|Estado_Destino|  Pais_Destino|   LongDest|    LatDest|   LongOrig|    LatOrig|
+---------+--------------------+-----------------+-------------------+-------------------+-------------------+-------------------+------------+--------------------+----------------+--------------------+-------------+--------------+-----------------+--------------------+--------------+--------------+--

## Análise de Pontualidade dos Voos

### Objetivo
  - Calcular o atraso médio de cada companhia aérea e identificar as mais e menos pontuais.

### Processo
  - Calcule a diferença entre Partida.Prevista e Partida.Real e entre Chegada.Prevista e Chegada.Real.
  - Agrupe por Companhia.Aerea e calcule o atraso médio para partidas e chegadas.

### Resultado esperado/saída
  - Uma lista das companhias aéreas com o melhor e pior desempenho em relação à pontualidade.

In [7]:
from pyspark.sql.types import LongType
from pyspark.sql import functions as Func
from pyspark.sql.functions import col, asc, desc

"""
Diferença entre dois timestamps

Parâmetros:
  - start_ts (string) : timestamp inicial
  - end_ts (string) : timestamp final
  - unit (char) : retorno desejado. Valores possíveis: seconds (segundos), minutes (minutos) ou hours (horas)

Retorna:
  - long : Diferença
"""
def diff_ts(start_ts , end_ts, unit):
  seconds = col(start_ts).cast('long') - col(end_ts).cast('long')
  minutes = seconds/60.0
  hours = minutes/60.0
  return locals()[unit]

"""
Rank por média de atraso de partida e chegada das companhias aéreas

Parâmetros:
  - df (DataFrame) : tabela com dados de voos do Brasil
  - order (string) : ordem desejada. Valores: asc (ascendente), desc (descendente)
  - n (int) : tamanho do rank

Retorna:
  - DataFrame : rank das n companhias ordenadas
"""
def rank_avg_delays(df, order, n):
  return df.sort(
      globals()[order]('Media_Atraso_Partida'),
      globals()[order]('Media_Atraso_Chegada')).show(n)

# Unidade de tempo
units = {'seconds' : 'segundos', 'minutes' : 'minutos', 'hours' : 'horas'}
unit = 'hours'

# diffs_ts_udf = Func.udf(diff_ts, LongType())

# Cópia do dataframe rejeitando os timestamps NULL
dfa_filtered = dfa.filter(
    dfa.Chegada_Prevista.isNotNull() & dfa.Chegada_Real.isNotNull()\
    & dfa.Partida_Real.isNotNull() & dfa.Partida_Prevista.isNotNull()
)

# Diferenças entre prevista e real
dfa_filtered = dfa_filtered.withColumn('Chegada_Diferenca', diff_ts('Chegada_Prevista', 'Chegada_Real', 'hours'))\
  .withColumn('Partida_Diferenca', diff_ts('Partida_Prevista', 'Partida_Real', 'hours'))

# Calcular médias dos atrasos
df_companies_avg_diffs = dfa_filtered.groupBy('Companhia_Aerea').agg(
    Func.avg('Partida_Diferenca').alias('Media_Atraso_Partida'),
    Func.avg('Chegada_Diferenca').alias('Media_Atraso_Chegada'))

# Tamanho do rank
n = 5

print('Companhias com pior pontualidade:')
rank_avg_delays(df_companies_avg_diffs, 'asc', n)

print('Companhias com melhor pontualidade:')
rank_avg_delays(df_companies_avg_diffs, 'desc', n)

print(f'Observação: médias de tempo em {units[unit]}. Atrasos negativos indicam que o vôo começou antes do tempo.')

Companhias com pior pontualidade:


                                                                                

+--------------------+--------------------+--------------------+
|     Companhia_Aerea|Media_Atraso_Partida|Media_Atraso_Chegada|
+--------------------+--------------------+--------------------+
|     US AIRWAYS INC.| -1.3563636363636362| -1.3257575757575757|
|               TOTAL|  -1.001130229419703| -0.6381916329284748|
|     SURINAM AIRWAYS| -0.6516161616161616| -0.6026010101010102|
|     UNITED AIRLINES|-0.48233420214306394| -0.4482575538179361|
|TAAG LINHAS AEREA...| -0.3668213783403656| -0.5459774964838254|
+--------------------+--------------------+--------------------+
only showing top 5 rows

Companhias com melhor pontualidade:




+--------------------+--------------------+--------------------+
|     Companhia_Aerea|Media_Atraso_Partida|Media_Atraso_Chegada|
+--------------------+--------------------+--------------------+
|     KOREAN AIRLINES|  15.212195121951215|  15.325842044134722|
|LINEAS AE.COSTARR...|    14.6006329113924|  14.788818565400838|
|    CONDOR FLUGDINST|   7.163303878252331|   5.032891507118312|
|TRASAMERICA  AIRL...|   7.156831992850756|    7.33631367292225|
|           ETHIOPIAN|   5.810776439089691|  5.8095716198125835|
+--------------------+--------------------+--------------------+
only showing top 5 rows

Observação: médias de tempo em horas. Atrasos negativos indicam que o vôo começou antes do tempo.


                                                                                

## Justificativas de Atrasos

### Objetivo
  - Analisar os motivos mais comuns de atrasos de voos.

### Processo
  - Agrupe os dados por Codigo.Justificativa e conte a frequência de cada justificativa.

### Resultado esperado/saída
  - Uma tabela que exibe as principais justificativas de atrasos, o que pode ajudar a entender os motivos mais frequentes.

In [4]:
# Cópia do dataframe rejeitando as justificativas NULL
dfa_filtered = dfa.filter(dfa.Codigo_Justificativa.isNotNull() & ~(dfa.Codigo_Justificativa == "NA"))

# Exibir frequencia de cada justificativa
dfa_filtered.groupBy('Codigo_Justificativa').agg(Func.count('Codigo_Justificativa').alias('Frequencia')).sort(desc('Frequencia')).show()

[Stage 9:>                                                          (0 + 8) / 8]

+--------------------+----------+
|Codigo_Justificativa|Frequencia|
+--------------------+----------+
|ANTECIPACAO DE HO...|    238359|
|          AUTORIZADO|     77323|
|CANCELAMENTO POR ...|     68328|
|AEROPORTO COM RES...|     39232|
|LIBERACAO SERV. T...|     35658|
|ATRASOS NAO ESPEC...|     26549|
| CONEXAO DE AERONAVE|     25062|
|CONEXAO AERONAVE/...|     23034|
|DEFEITOS DA AERONAVE|     20720|
|CONEXAO AERONAVE/...|     18372|
|SEGURANCA/PAX/CAR...|      8832|
|FALHA EQUIPO AUTO...|      8631|
|   TROCA DE AERONAVE|      7118|
|AEROPORTO ORIGEM ...|      5778|
|AEROPORTO DESTINO...|      5166|
|AEROPORTO DE ORIG...|      4350|
|AEROPORTO DE DEST...|      4093|
|FALTA PAX COM PAS...|      3666|
|CANCELAMENTO - AE...|      3629|
|FACILIDADES DO AE...|      3314|
+--------------------+----------+
only showing top 20 rows



                                                                                

## Fluxo de Voos entre Regiões

### Objetivo
- Observar padrões de movimentação entre cidades, estados ou países.

### Processo
  - Use Cidade.Origem, Estado.Origem, Pais.Origem e Cidade.Destino, Estado.Destino, Pais.Destino para identificar os pares mais frequentes de origem e destino.
  - Conte os voos entre esses pares e calcule a distância média usando as coordenadas (LongOrig, LatOrig, LongDest, LatDest).
  - Pesquise sobre a Fórmula de Haversine para este cálculo.

### Resultado esperado/saída
  - Identificação das rotas mais movimentadas e a média de distância percorrida entre elas.

In [5]:
from pyspark.sql.types import DoubleType
from math import sin, cos, sqrt, atan2, radians

"""
Fórmula de Haversine

Parâmetros:
  - lat_dest (double) : latitude destino (rad)
  - lat_orig (double) : latitude origem (rad)
  - lon_dest (double) : longitude destino (rad)
  - lon_orig (double) : longitude origem (rad)

Retorna:
  - double : distância entre os dois pontos (km)

Observações:
  - A Fórmula de Haversine é uma equação utilizada em navegação
  - Fornece a distância entre 2 pontos de uma esfera, a partir de suas latitudes e longitudes.
"""
def haversine(lon_dest, lat_dest, lon_orig, lat_orig):
  # Conversão para radianos
  lat_orig, lon_orig, lat_dest, lon_dest = map(radians, [lat_orig, lon_orig, lat_dest, lon_dest])

  # Raio da terra (km)
  r = 6371.0

  # Diferenças das latitudes e logintudes dos pontos em radianos
  d_lat = lat_dest - lat_orig
  d_lon = lon_dest - lon_orig

  # Calculo das constantes
  a = sin(d_lat/2) * sin(d_lat/2) + cos(lat_orig) * cos(lat_dest) * sin(d_lon/2) * sin(d_lon/2)
  c = 2 * atan2(sqrt(a), sqrt(1-a));

  # Retorna distancia
  return r * c

# Cópia do dataframe rejeitando linhas com pelo menos uma coordenada NULL
dfa_filtered = dfa.filter(
    dfa.LongOrig.isNotNull() & dfa.LongDest.isNotNull()\
    & dfa.LatOrig.isNotNull() & dfa.LatDest.isNotNull()
)

# Transformar função haversine em User Defined Function que deve aceitar colunas do tipo double
haversine_udf = Func.udf(haversine, DoubleType())

# Colunas com as coordenadas de latitude e longitude
coordinate_cols = [col for col in dfa_filtered.columns if col.endswith('Orig') or col.endswith('Dest')]

# Colunas de origem e destino dos pares
pairs_cols = [col for col in dfa_filtered.columns if not col.startswith('Aeroporto') and ( col.endswith('Origem') or col.endswith('Destino') )]

# Calcular distâncias de cada voo
dfa_filtered = dfa_filtered.withColumn('Distancia', haversine_udf(*[col(c).cast('double') for c in coordinate_cols]))

# Agrupar pares origem e destino pela média de distância
dfa_filtered.groupBy(*pairs_cols).agg(Func.avg('Distancia').alias('Media_Distancia')).sort(desc('Media_Distancia')).show()



+--------------------+-------------+--------------------+--------------------+--------------+--------------------+------------------+
|       Cidade_Origem|Estado_Origem|         Pais_Origem|      Cidade_Destino|Estado_Destino|        Pais_Destino|   Media_Distancia|
+--------------------+-------------+--------------------+--------------------+--------------+--------------------+------------------+
| Dubai International|          N/I|Emirados Arabes U...|           Guarulhos|            SP|              Brasil|12217.646630244011|
|           Guarulhos|           SP|              Brasil| Dubai International|           N/I|Emirados Arabes U...| 12217.64663024401|
|Ab Dhabi Internat...|          N/I|Emirados Arabes U...|           Guarulhos|            SP|              Brasil|12121.620224548631|
|           Guarulhos|           SP|              Brasil|Ab Dhabi Internat...|           N/I|Emirados Arabes U...|12121.620224548631|
|      Rio De Janeiro|           RJ|              Brasil| Duba

                                                                                