# Pacote

In [1]:
import pandas as pd
import chardet
import glob
from unidecode import unidecode
import os
import Levenshtein as lev
import csv 

pd.set_option('display.max_columns', None)

In [2]:
import sys

print(os.environ.get("SPARK_HOME"))
print(os.environ.get("HADOOP_HOME"))
print(os.environ.get("JAVA_HOME"))

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

C:\Users\pedro\spark-3.5.0-bin-hadoop3
C:\Users\pedro\hadoop3.0
C:\Program Files\Java\jdk1.8.0_202


In [3]:
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.types import *
from pyspark.sql.functions import lower, upper,row_number,isnan, when, count, col, coalesce, broadcast, regexp_replace, regexp_extract, lit, countDistinct
from pyspark.sql import functions as F, Window, Row
from pyspark.sql.functions import *
#from functools import reduce

#Pyspark
import py4j
from pyspark import SparkContext,SQLContext,SparkConf,StorageLevel

## Pacotes para configurar sessão no spark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
                            
## Pacote para localizar o path spark 
import findspark

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import unicodedata

## Spark session

In [4]:
# Usa todos os núcleos disponíveis na máquina local.
# Define o nome da aplicação.
# Número de núcleos alocados para o driver Spark.
# Quantidade de memória alocada para o driver Spark.
# Nível de paralelismo padrão para todas as transformações em RDDs.
# Número de partições para usar quando fazer operações de shuffle.
# Número de instâncias do executor para iniciar.
# Número de núcleos para usar por executor.
# Quantidade de memória alocada para cada executor.
# Fração da heap do executor para armazenamento e execução.
# Proporção da memória de execução acima da qual o armazenamento será despejado para o disco.
# Habilita o uso de memória fora do heap.
# Tamanho da memória fora do heap alocada para o Spark.
# Tamanho máximo dos resultados do driver.
# Memória adicional alocada por executor.
# Habilita a avaliação antecipada e a visualização dos DataFrames no Spark SQL REPL.
# Número máximo de linhas para mostrar quando a avaliação antecipada está habilitada.
# Tamanho máximo do buffer para serialização Kryo.
# Tamanho máximo das tabelas na realização do broadcast join 
# Usa KryoSerializer para serialização, oferecendo melhor desempenho.
# Classe de registrator Kryo para registrar classes personalizadas com Kryo.
# Comprime os dados shuffle para economizar espaço em disco.
# Define o nível de armazenamento para RDDs persistidos, usando tanto a memória quanto o disco.
# Comprime RDDs armazenados em memória.

spark = (SparkSession.builder 
    .master("local[*]") 
    .appName("Spark Optimization")   
    .config("spark.driver.cores", "2")   
    .config("spark.driver.memory", "8g")   
    .config("spark.default.parallelism", "24")   
    .config("spark.sql.shuffle.partitions", "24")   
    .config("spark.executor.instances", "3")   
    .config("spark.executor.cores", "2")   
    .config("spark.executor.memory", "10g")   
    .config("spark.memory.fraction", "0.6")  
    .config("spark.memory.storageFraction", "0.5")   
    .config("spark.memory.offHeap.enabled", "true")   
    .config("spark.memory.offHeap.size", "4g")   
    .config("spark.driver.maxResultSize", "4g")   
    .config("spark.executor.memoryOverhead", "2g")   
    .config("spark.sql.repl.eagerEval.enabled", True)   
    .config("spark.sql.repl.eagerEval.maxNumRows", 10)  
#    .config("spark.kryoserializer.buffer.max", "512m")  
    .config("spark.sql.autoBroadcastJoinThreshold", "400m")   
#    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")   
#    .config("spark.kryo.registrator", "MyKryoRegistrator")   
    .config("spark.shuffle.compress", "true")   
    .config("spark.storage.level", "MEMORY_AND_DISK")   
    .config("spark.rdd.compress", "true")   
    .getOrCreate())

In [5]:
spark

# Carregando dados

## Df meteorologia

In [6]:
df = (spark.read.parquet("DADOS_METEROLOGIA/DADOS_METEOROLOGICOS_TRATADOS/dados_meteorologicos_2019.parquet").repartition(12))
df = df.withColumn("hora_utc", col("hora_utc").cast("int"))
df = df.withColumnRenamed("estacao", "cidade")

In [7]:
# Renomeando as colunas para aeroportos de origem
df_origem = df.select(
    *[col(c).alias(c + '_origem') for c in df.columns])

# Renomeando as colunas para aeroportos de destino
df_destino = df.select(
    *[col(c).alias(c + '_destino') for c in df.columns])

df_origem = df_origem.withColumnRenamed("data_origem", "data_partida") \
                 .withColumnRenamed("hora_utc_origem", "hora_partida")

df_destino = df_destino.withColumnRenamed("data_destino", "data_chegada")\
                 .withColumnRenamed("hora_utc_destino", "hora_chegada")

## Df voos

In [8]:
df_voos=spark.read.option("header", "true").csv("dados_tratados/historico_voo_tratados_2019.csv").repartition(6)

df_voos = df_voos.withColumn('partida_prevista_data', to_date(col('partida_prevista')))
df_voos = df_voos.withColumn('chegada_prevista_data', to_date(col('chegada_prevista')))

In [9]:
df_voos = df_voos.withColumnRenamed("partida_prevista_data", "data_partida") \
                 .withColumnRenamed("hora_partida", "hora_partida") \
                 .withColumnRenamed("cidade_origem", "cidade_origem") \
                 .withColumnRenamed("uf_origem_x", "uf_origem")

df_voos = df_voos.withColumnRenamed("chegada_prevista_data", "data_chegada") \
                 .withColumnRenamed("uf_destino_x", "uf_destino")

# Convertendo as colunas data_partida e data_chegada para string
df_voos = df_voos.withColumn("data_partida", col("data_partida").cast("string")) \
       .withColumn("data_chegada", col("data_chegada").cast("string"))

## Join entre os datasets

In [10]:
# Colunas para realizar o join
join_cols = ['data_chegada', 'hora_chegada', 'cidade_destino', 'uf_destino']

# Realizando o join
df_joined = df_voos.join(df_destino, on=join_cols, how='inner').drop(df_destino.data_chegada, df_destino.hora_chegada, df_destino.cidade_destino, df_destino.uf_destino, df_destino.altitude_destino)

# Colunas para realizar o join
join_cols = ['data_partida','hora_partida','cidade_origem','uf_origem']

# Realizando o join
df_joined_final = df_joined.join(df_origem, on=join_cols, how='inner').drop(df_origem.data_partida, df_origem.hora_partida, df_origem.cidade_origem, df_origem.uf_origem, df_origem.altitude_origem)

df_joined_final = df_joined_final.dropDuplicates()

# Salvando o dataset tratado com dados de meterologia

In [11]:
# Reparticionando o DataFrame para uma única partição
df_joined_final = df_joined_final.repartition(1)

# Salvando o DataFrame em um único arquivo CSV
(df_joined_final.write
     .option("header", "true")
     .mode("overwrite")
     .csv("dados_tratados/historico_voo_meteorologia.csv"))