In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType, DateType, LongType
from pyspark.sql.functions import percentile_approx,length, regexp_replace,datediff, avg, split, expr, col, month, size, when, desc, sum as spark_sum, abs, row_number, udf
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression,DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.window import Window
import re


In [2]:
spark = SparkSession.builder \
    .appName("Ac2 BigData") \
    .getOrCreate()


In [3]:

schema = StructType([
    StructField("legId", StringType(), True),
    StructField("searchDate", DateType(), True),
    StructField("flightDate", DateType(), True),
    StructField("startingAirport", StringType(), True),
    StructField("destinationAirport", StringType(), True),
    StructField("fareBasisCode", StringType(), True),
    StructField("travelDuration", StringType(), True),
    StructField("elapsedDays", IntegerType(), True),
    StructField("isBasicEconomy", BooleanType(), True),
    StructField("isRefundable", BooleanType(), True),
    StructField("isNonStop", BooleanType(), True),
    StructField("baseFare", DoubleType(), True),
    StructField("totalFare", DoubleType(), True),
    StructField("seatsRemaining", IntegerType(), True),
    StructField("totalTravelDistance", DoubleType(), True),
    StructField("segmentsDepartureTimeEpochSeconds", StringType(), True),
    StructField("segmentsDepartureTimeRaw", StringType(), True),
    StructField("segmentsArrivalTimeEpochSeconds", StringType(), True),
    StructField("segmentsArrivalTimeRaw", StringType(), True),
    StructField("segmentsArrivalAirportCode", StringType(), True),
    StructField("segmentsDepartureAirportCode", StringType(), True),
    StructField("segmentsAirlineName", StringType(), True),
    StructField("segmentsAirlineCode", StringType(), True),
    StructField("segmentsDistance", StringType(), True),
    StructField("segmentsEquipmentDescription", StringType(), True),
    StructField("segmentsDurationInSeconds", StringType(), True)
])


In [4]:
df = spark.read.csv('./dataset/itinerariesReduce.csv', header=True, schema=schema)

print('dataset loaded')
df.printSchema()



dataset loaded
root
 |-- legId: string (nullable = true)
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- fareBasisCode: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: double (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsDepartureTimeRaw: string (nullable = true)
 |-- segmentsArrivalTimeEpochSeconds: string (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDeparture

In [5]:
df.createTempView('DATASET')

In [6]:
# NA PRÓXIMA CELULA IREMOS TIRAR OS CAMPOS QUE SÃO IDs, O CAMPO 'isBasicEconomy', QUE FOI A FEATURE QUE FILTRAMOS PARA DIMINUIR O DATASET, E O BASEFARE QUE É A FEATURE QUE MOSTRA O PREÇO DA PASSAGEM SEM AS TAXAS ALEM DA DE COLUNAS QUE O VALORES SAO TODOS IGUAIS

In [7]:
dfRemoveIdsColumns = spark.sql('''
SELECT 
--legId,
searchDate,
flightDate,
startingAirport,
destinationAirport,
fareBasisCode,
travelDuration,
elapsedDays,
isNonStop,
totalFare,
seatsRemaining,
totalTravelDistance,
segmentsDepartureTimeEpochSeconds,
segmentsDepartureTimeRaw,
segmentsArrivalTimeEpochSeconds,
segmentsArrivalTimeRaw,
segmentsArrivalAirportCode,
segmentsDepartureAirportCode,
segmentsAirlineName,
segmentsAirlineCode,
segmentsEquipmentDescription,
segmentsDurationInSeconds
FROM DATASET
''')

In [8]:
dfRemoveIdsColumns.createOrReplaceTempView("DATASET")
# Criando a coluna splitSegments
dfShowIntermediate = dfRemoveIdsColumns.withColumn(
    "splitSegments",
    split(col("segmentsDurationInSeconds"), '\\|\\|')  
)


In [9]:
#analise de total fare

analise = spark.sql('''
    select avg(totalFare) as media, min(totalFare) as _min, max(totalFare) as _max from DATASET
''').show()


+------------------+-----+-----+
|             media| _min| _max|
+------------------+-----+-----+
|178.86721614121868|38.61|928.6|
+------------------+-----+-----+



In [10]:
def duration_to_seconds(duration_str):
    if not duration_str:
        return None
    pattern = re.compile(r'PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?')
    match = pattern.match(duration_str)
    if not match:
        return None
    hours = int(match.group(1) or 0)
    minutes = int(match.group(2) or 0)
    seconds = int(match.group(3) or 0)
    return hours * 3600 + minutes * 60 + seconds

duration_to_seconds_udf = udf(duration_to_seconds, IntegerType())


In [None]:
#coluna contando o tempo total de viagem
dfSegmentsDurationApply = dfShowIntermediate.withColumn(
    "travelDurationInSeconds",
    duration_to_seconds_udf(col("travelDuration"))
)

#coluna contendo o numero de conexoes que o passageiro vai fazer
dfNumConexaoApply = dfSegmentsDurationApply.withColumn(
    "numConexao",
    (size("splitSegments") - 1)
)

#o mes que vai ser feito a viagem
dfMonthTravelDurationApply = dfNumConexaoApply.withColumn(
    "mesVoo",
    month("flightDate")
)

#o mes que foi comprado a passagem
dfSearchTravelDurationApply = dfMonthTravelDurationApply.withColumn(
    "mesCompra",
    month("searchDate")
)

#quantos dias antecipados ele comprou a viagem
dfWithDaysDifference = dfSearchTravelDurationApply.withColumn(
    "diasAteViagem",
    datediff("flightDate","searchDate")
)


dfWithDaysDifference.createOrReplaceTempView('DATASET_FEATURE')
dfWithDaysDifference.printSchema()

root
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- fareBasisCode: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: double (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsDepartureTimeRaw: string (nullable = true)
 |-- segmentsArrivalTimeEpochSeconds: string (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: string (nullable = true)
 |-- segmentsAirlineName: string (nullable = true)
 |-- segmentsAirlineCode: string (nullable = true)
 |-- segmentsEquipmentDescription: string (n

In [None]:
#one hot enconding de colunas categoricas e label enconding da coluna isNonStop

starting_airport_counts = dfWithDaysDifference.groupBy("startingAirport")\
    .count().withColumn("rank", row_number().over(Window.orderBy(desc("count"))))
destination_airport_counts = dfWithDaysDifference.groupBy("destinationAirport")\
    .count().withColumn("rank", row_number().over(Window.orderBy(desc("count"))))
fare_basis_code_counts = dfWithDaysDifference.groupBy("fareBasisCode")\
    .count().withColumn("rank", row_number().over(Window.orderBy(desc("count"))))

top_starting_airports = starting_airport_counts.filter(col("rank") <= 3).select("startingAirport")
top_destination_airports = destination_airport_counts.filter(col("rank") <= 3).select("destinationAirport")
top_fare_basis_codes = fare_basis_code_counts.filter(col("rank") <= 3).select("fareBasisCode")

dfOneHotEncoding = dfWithDaysDifference \
    .join(top_starting_airports, on="startingAirport", how="left") \
    .join(top_destination_airports, on="destinationAirport", how="left") \
    .join(top_fare_basis_codes, on="fareBasisCode", how="left")

top_starting_list    = [row[0] for row in top_starting_airports.collect()]
top_destination_list = [row[0] for row in top_destination_airports.collect()]
top_fare_basis_list  = [row[0] for row in top_fare_basis_codes.collect()]

for code in top_starting_list:
    dfOneHotEncoding = dfOneHotEncoding.withColumn(
        f"startingAirport_{code}",
        when(col("startingAirport") == code, 1).otherwise(0)
    )
dfOneHotEncoding = dfOneHotEncoding.withColumn(
    "startingAirport_outros",
    when(~col("startingAirport").isin(top_starting_list), 1).otherwise(0)
)

for code in top_destination_list:
    dfOneHotEncoding = dfOneHotEncoding.withColumn(
        f"destinationAirport_{code}",
        when(col("destinationAirport") == code, 1).otherwise(0)
    )
dfOneHotEncoding = dfOneHotEncoding.withColumn(
    "destinationAirport_outros",
    when(~col("destinationAirport").isin(top_destination_list), 1).otherwise(0)
)

for code in top_fare_basis_list:
    dfOneHotEncoding = dfOneHotEncoding.withColumn(
        f"fareBasisCode_{code}",
        when(col("fareBasisCode") == code, 1).otherwise(0)
    )
dfOneHotEncoding = dfOneHotEncoding.withColumn(
    "fareBasisCode_outros",
    when(~col("fareBasisCode").isin(top_fare_basis_list), 1).otherwise(0)
)


dfOneHotEncoding = dfOneHotEncoding.withColumn(
    "segmentsAirlineCode_clean", 
    split(col("segmentsAirlineCode"), "\|\|").getItem(0)
)

segments_counts = dfOneHotEncoding.groupBy("segmentsAirlineCode_clean")\
    .count().withColumn("rank", row_number().over(Window.orderBy(desc("count"))))
top_segments = segments_counts.filter(col("rank") <= 3).select("segmentsAirlineCode_clean")
top_segments_list = [row[0] for row in top_segments.collect()]

for code in top_segments_list:
    dfOneHotEncoding = dfOneHotEncoding.withColumn(
        f"segmentsAirlineCode_{code}", 
        when(col("segmentsAirlineCode_clean") == code, 1).otherwise(0)
    )
dfOneHotEncoding = dfOneHotEncoding.withColumn(
    "segmentsAirlineCode_outros",
    when(~col("segmentsAirlineCode_clean").isin(top_segments_list), 1).otherwise(0)
)

dfOneHotEncoding = dfOneHotEncoding.withColumn("isNonStop", col("isNonStop").cast("int"))



In [13]:

dfOneHotEncoding.createOrReplaceTempView('DATASET_NEW_FEATURE')

In [None]:
dfPosFeaturesApply = spark.sql('''
SELECT 
--searchDate,
--flightDate,
--startingAirport,
--destinationAirport,
--fareBasisCode,
travelDurationInSeconds,
--travelDuration,
--elapsedDays, --mto desbalanceada
isNonStop,
totalFare,
seatsRemaining,
totalTravelDistance,
--segmentsDepartureTimeEpochSeconds,
--segmentsDepartureTimeRaw,
--segmentsArrivalTimeEpochSeconds,
--segmentsArrivalTimeRaw,
--segmentsArrivalAirportCode,
--segmentsDepartureAirportCode,
--segmentsAirlineName,
--segmentsAirlineCode,
--segmentsEquipmentDescription,-
--segmentsDurationInSeconds,
numConexao,
mesVoo,
mesCompra,
diasAteViagem,
startingAirport_BOS,
startingAirport_LGA,
startingAirport_ORD,
startingAirport_outros,
destinationAirport_LGA,
destinationAirport_LAX,
destinationAirport_BOS,
destinationAirport_outros,
fareBasisCode_XAVQA0BQ,
fareBasisCode_VAVNA0BC,
fareBasisCode_XAVNA0BC,
fareBasisCode_outros,
segmentsAirlineCode_DL,
segmentsAirlineCode_AA,
segmentsAirlineCode_UA,
segmentsAirlineCode_outros
FROM DATASET_NEW_FEATURE
''')


dfPosFeaturesApply.createOrReplaceTempView('DATASET_NEW_FEATURES_APPLY')

In [None]:
#como nao tratei ainad os outliers irei substituir os valores nulos pela mediana da coluna

numeric_columns = [c for c, dtype in dfPosFeaturesApply.dtypes if dtype in ('int', 'double', 'float', 'long')]

medians = {}

for col_name in numeric_columns:
    median = dfPosFeaturesApply.approxQuantile(col_name, [0.5], 0.01)[0]
    medians[col_name] = median

df_cleaned = dfPosFeaturesApply.fillna(medians)

df_cleaned.show()

+-----------------------+---------+---------+--------------+-------------------+----------+------+---------+-------------+-------------------+-------------------+-------------------+----------------------+----------------------+----------------------+----------------------+-------------------------+----------------------+----------------------+----------------------+--------------------+----------------------+----------------------+----------------------+--------------------------+
|travelDurationInSeconds|isNonStop|totalFare|seatsRemaining|totalTravelDistance|numConexao|mesVoo|mesCompra|diasAteViagem|startingAirport_BOS|startingAirport_LGA|startingAirport_ORD|startingAirport_outros|destinationAirport_LGA|destinationAirport_LAX|destinationAirport_BOS|destinationAirport_outros|fareBasisCode_XAVQA0BQ|fareBasisCode_VAVNA0BC|fareBasisCode_XAVNA0BC|fareBasisCode_outros|segmentsAirlineCode_DL|segmentsAirlineCode_AA|segmentsAirlineCode_UA|segmentsAirlineCode_outros|
+-----------------------+-

In [None]:
#trato outliers usando a tecnica de capping

target_col = 'totalFare'
feature_columns = df_cleaned.columns
toRemove = [
    target_col,
    'startingAirport_BOS',
    'startingAirport_LGA',
    'startingAirport_ORD',
    'startingAirport_outros',
    'destinationAirport_LGA',
    'destinationAirport_LAX',
    'destinationAirport_BOS',
    'destinationAirport_outros',
    'fareBasisCode_XAVQA0BQ',
    'fareBasisCode_VAVNA0BC',
    'fareBasisCode_XAVNA0BC',
    'fareBasisCode_outros',
    'segmentsAirlineCode_DL',
    'segmentsAirlineCode_AA',
    'segmentsAirlineCode_UA',
    'segmentsAirlineCode_outros',
]#nao irei usar as colunas que ja fiz hotencoding ou labelencoding
for col_name_to_remove  in toRemove:
    if col_name_to_remove in feature_columns:
        feature_columns.remove(col_name_to_remove )
        
transformed_columns = []

for col_name in feature_columns:
    print('iqr aplicado na coluna',col_name)
    q1, q3 = df_cleaned.approxQuantile(col_name, [0.25, 0.75], 0.001)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    capped_col = when(col(col_name) < lower_bound, lower_bound) \
        .otherwise(when(col(col_name) > upper_bound, upper_bound)
        .otherwise(col(col_name))) \
        .alias(col_name)

    transformed_columns.append(capped_col)


for col_name_to_remove  in toRemove:
    transformed_columns.append(col(col_name_to_remove))

df_cleaned_no_outliers = df_cleaned.select(*transformed_columns)

df_cleaned_no_outliers.createOrReplaceTempView('DATASET_NO_OUTLIERS')

iqr aplicado na coluna travelDurationInSeconds
iqr aplicado na coluna isNonStop
iqr aplicado na coluna seatsRemaining
iqr aplicado na coluna totalTravelDistance
iqr aplicado na coluna numConexao
iqr aplicado na coluna mesVoo
iqr aplicado na coluna mesCompra
iqr aplicado na coluna diasAteViagem


In [None]:
#vejo as correlações com minha coluna alvo 

correlacoes = []
for col in df_cleaned_no_outliers.columns:
    corr = df_cleaned_no_outliers.stat.corr(col, target_col)
    correlacoes.append((col, corr))
    print(col, corr)

schema = StructType([
    StructField("feature", StringType(), True),
    StructField("correlation_with_totalFare", DoubleType(), True)
])

correlation_df = spark.createDataFrame(correlacoes, schema)
correlation_df.orderBy("correlation_with_totalFare", ascending=False).show(truncate=False)

travelDurationInSeconds 0.5705771871427221
isNonStop -0.34015940747271106
seatsRemaining 0.002507499209874932
totalTravelDistance 0.5473811435429348
numConexao 0.3761745260013003
mesVoo -0.02051716346918684
mesCompra 0.02302979982845782
diasAteViagem -0.11273417522564888
totalFare 1.0
startingAirport_BOS -0.0975652905679468
startingAirport_LGA -0.06213087300304373
startingAirport_ORD -0.07915367583869147
startingAirport_outros 0.155027887352178
destinationAirport_LGA -0.07515735271203196
destinationAirport_LAX 0.11388781932073541
destinationAirport_BOS -0.07985117272499727
destinationAirport_outros 0.027487818809271116
fareBasisCode_XAVQA0BQ 0.03413214480455624
fareBasisCode_VAVNA0BC -0.06718081088507068
fareBasisCode_XAVNA0BC 0.0006388616784803636
fareBasisCode_outros 0.017846107756821612
segmentsAirlineCode_DL 0.09123183925911629
segmentsAirlineCode_AA -0.08509317429761012
segmentsAirlineCode_UA -0.08194380490376088
segmentsAirlineCode_outros 0.15078238628285745
+--------------------

In [None]:
#por tentativa e erro cheguei que a melhor quantidade de features para a melhor acuracia do modelo é 8, logo seleciono as top 8 colunas com mais correlação
top_n = 8

melhores_features_df = correlation_df.filter(correlation_df.feature != "totalFare") \
    .withColumn("abs_corr", abs(correlation_df["correlation_with_totalFare"])) \
    .orderBy("abs_corr", ascending=False) \
    .limit(top_n)

melhores_features = [row["feature"] for row in melhores_features_df.collect()]
print(melhores_features)

['travelDurationInSeconds', 'totalTravelDistance', 'numConexao', 'isNonStop', 'startingAirport_outros', 'segmentsAirlineCode_outros', 'destinationAirport_LAX', 'diasAteViagem']


In [20]:
dfFinal = spark.sql(f'''
    select {", ".join(melhores_features)}, totalFare from DATASET_NO_OUTLIERS
''')
dfFinal.count()

11825266

In [None]:
#removo linhas duplicadas no meu dataset 

dfFinalNoDuplicados = dfFinal.dropDuplicates()
print('df antes da remocao',dfFinal.count())
print('df pos remocao',dfFinalNoDuplicados.count())
print('dados duplicados removidos',dfFinal.count() -  dfFinalNoDuplicados.count() )

df antes da remocao 11825266
df pos remocao 5817853
dados duplicados removidos 6007413


In [None]:
#faço a normalizacao dos meus dados

feature_columns = dfFinalNoDuplicados.columns
feature_columns.remove(target_col)

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_vector = assembler.transform(dfFinalNoDuplicados)

scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

In [23]:
df_final = df_scaled.selectExpr("totalFare as label", "scaled_features as features")



In [None]:
#split do dataset entre treino e teste

train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)
print('quantidade para treino',train_data.count(),)
print('quantidade para teste', test_data.count())

quantidade para treino 4655714
quantidade para teste 1162139


In [None]:
#treinamento do modelo 1
lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)

In [None]:
#treinamento do modelo 2

dt = DecisionTreeRegressor(featuresCol="features", labelCol="label")
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)

In [None]:
#resultado do RMSE 

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

print("Linear Regression RMSE:", evaluator.evaluate(lr_predictions))
print("Decision Tree RMSE:", evaluator.evaluate(dt_predictions))

Linear Regression RMSE: 59.58187450227922
Decision Tree RMSE: 57.52245515543787


In [None]:
#resultado do R^2 

evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
print("Linear Regression R²:", evaluator_r2.evaluate(lr_predictions))
print("Decision Tree R²:", evaluator_r2.evaluate(dt_predictions))

Linear Regression R²: 0.33269191775381246
Decision Tree R²: 0.3780250576827525


In [None]:
#resultado do MAE

evaluator_mae = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="mae"
)
print("Linear Regression MAE:", evaluator_mae.evaluate(lr_predictions))
print("Decision Tree MAE:", evaluator_mae.evaluate(dt_predictions))

Linear Regression MAE: 46.24391752583568
Decision Tree MAE: 44.469013028981145
