Configuração Inicial

In [1]:
import findspark
findspark.init()
findspark.find()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Inicialização da sessão Spark
spark = SparkSession\
        .builder\
        .appName("Projeto_AJP")\
        .master("local[*]")\
        .config("spark.executor.memory", "8g")\
        .config("spark.driver.memory", "8g")\
        .config("spark.driver.maxResultSize", "1g")\
        .config("spark.memory.offHeap.enabled", False)\
        .enableHiveSupport()\
        .getOrCreate()

Carregar Dados

In [2]:
# Carregamento dos dados pré-processados
flights = spark.read.parquet("../data/processed/flights_cleaned.parquet")

Criar Variável Alvo

In [4]:
# Criar coluna binária para indicar atraso na partida (1 se atraso > 0, caso contrário 0)
flights = flights.withColumn('IsDelayed', when(col('DepDelayMinutes') > 0, 1).otherwise(0))

Retirar Registos de Voos Cancelados ou Desviados

In [6]:
# Filtrar voos cancelados ou desviados
flights = flights.filter((col('Cancelled') == 0) & (col('Diverted') == 0))

# Remover colunas que não serão mais necessárias
flights = flights.drop('Cancelled', 'Diverted', 'DepDelayMinutes')

Indexação e Codificação

In [8]:
# Indexar colunas categóricas
categorical_cols = ['Airline', 'Origin', 'Dest']
indexers = [StringIndexer(inputCol=col, outputCol=col+"_Index") for col in categorical_cols]

In [9]:
# OneHotEncoding para colunas categóricas indexadas
encoders = [OneHotEncoder(inputCol=col+"_Index", outputCol=col+"_OHE") for col in categorical_cols]

In [10]:
# Construir o vetor de características
assembler = VectorAssembler(inputCols=[
    'Year', 'Month', 'DayofMonth', 'DayOfWeek', 'Distance',
    'CRSDepTime', 'Airline_OHE', 'Origin_OHE', 'Dest_OHE'], outputCol='features')

In [11]:
# Normalização das características
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')

In [12]:
# Pipeline de transformação
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])

In [13]:
# Ajustar e transformar os dados
flights_transformed = pipeline.fit(flights).transform(flights)

Transformação do Dataframe final

In [14]:
# Selecionar apenas as colunas necessárias para o modelo
flights_transformed = flights_transformed.select('scaledFeatures', 'IsDelayed')

In [15]:
flights_transformed.count()

28346578

Salvar Dados

In [16]:
# Salvar os dados transformados em formato Parquet
flights_transformed.write.mode("overwrite").parquet("../data/processed/flights_features.parquet")

print("Feature Engineering completa e dados salvos.")

Feature Engineering completa e dados salvos.


Encerrar Sessão Spark

In [17]:
spark.stop()