# 1. Инициализация PySpark фреймворка

In [1]:
import numpy as np
import pandas as pd
import os

Импорт библиотек Spark SQL и Spark ML

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

spark = SparkSession.builder.master("local[*]").getOrCreate()

Загрузка исходных данных

In [3]:
dataframe = spark.read.csv('filtered_data/itineraries_fixed.csv', inferSchema=True, header=True, sep=';')
dataframe.limit(5).toPandas()

Unnamed: 0,startingAirport,destinationAirport,isNonStop,isBasicEconomy,isRefundable,baseFare,totalFare,seatsRemaining,totalTravelDistance,travelDuration
0,ATL,BOS,True,False,False,217.67,248.6,9,947,149
1,ATL,BOS,True,False,False,217.67,248.6,4,947,150
2,ATL,BOS,True,False,False,217.67,248.6,9,947,150
3,ATL,BOS,True,False,False,217.67,248.6,8,947,152
4,ATL,BOS,True,False,False,217.67,248.6,9,947,154


# Задача регрессии

### Подготовка данных

Выберем подмножество столбцов для использования в качестве признаков и создадим логическое поле метки с именем label со значениями 1 или 0. В качестве метки выбран столбец load_95

In [4]:
data = dataframe.select(
    "startingAirport", 
    "destinationAirport", 
    "isNonStop", 
    "isBasicEconomy", 
    "isRefundable", 
    "baseFare",
    "totalFare", 
    "seatsRemaining", 
    "totalTravelDistance",
    ((col("travelDuration")).alias("label")))
data.show(10)

+---------------+------------------+---------+--------------+------------+--------+---------+--------------+-------------------+-----+
|startingAirport|destinationAirport|isNonStop|isBasicEconomy|isRefundable|baseFare|totalFare|seatsRemaining|totalTravelDistance|label|
+---------------+------------------+---------+--------------+------------+--------+---------+--------------+-------------------+-----+
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             9|                947|  149|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             4|                947|  150|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             9|                947|  150|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             8|                947|  152|
|            ATL|               BOS|     true|         

## Разделим данные

Используем 70% данных для обучения, а 30% оставим для тестирования. В данных тестирования столбе ц*binary_load_00l переименован  в trueLale*l, поэтому можно использовать его позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [5]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 652729  Testing Rows: 278816


Задача регрессии (Случайный лес)

In [6]:
# Создание столбца признаков для задачи регрессии
strIdx = StringIndexer(inputCols=["startingAirport", "destinationAirport"], outputCols=["startingAirport_idx", "destinationAirport_idx"])
oneHotEnc = OneHotEncoder(inputCols=["startingAirport_idx", "destinationAirport_idx"], outputCols=["startingAirport_enc", "destinationAirport_enc"])
catVect = VectorAssembler(inputCols=["startingAirport_enc", "destinationAirport_enc", "isNonStop", "isBasicEconomy", "isRefundable", "seatsRemaining"], outputCol="reg_catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="reg_idxCatFeatures")
numVect = VectorAssembler(inputCols=["baseFare", "totalFare", "totalTravelDistance"], outputCol="reg_numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='reg_normFeatures')
featVect = VectorAssembler(inputCols=["reg_idxCatFeatures", "reg_numFeatures", "reg_normFeatures"], outputCol="reg_features")

# Создание модели RandomForestRegressor
rf = RandomForestRegressor(labelCol="label", featuresCol="reg_features", numTrees=10)

# Создание и выполнение пайплайна для задачи регрессии
reg_pipeline = Pipeline(stages=[strIdx, oneHotEnc, catVect, catIdx, numVect, minMax, featVect, rf])
reg_model = reg_pipeline.fit(train)
reg_prediction = reg_model.transform(test)

# Вывод результатов
reg_prediction.select("reg_features", "prediction", "trueLabel").show(10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------+----------------+---------+
|reg_features                                                                                                                 |prediction      |trueLabel|
+-----------------------------------------------------------------------------------------------------------------------------+----------------+---------+
|(40,[8,18,34,35,36,37,38,39],[1.0,1.0,120.0,152.6,947.0,0.15140136054421768,0.1623849620643076,0.21527234818868907])         |387.318939844366|417      |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,2.0,161.86,196.1,956.0,0.20835374149659863,0.21730019062527614,0.2176179306750065]) |387.318939844366|252      |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,2.0,161.86,196.1,956.0,0.20835374149659863,0.21730019062527614,0.2176179306750065]) |387.318939844366|318      |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,2.0,161.86,196.1,956.0,0.208

In [7]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(reg_prediction)
print("RMSE = ", rmse)

RMSE =  109.10754310014411


In [8]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(reg_prediction)
print("R² = ", r2)

R² =  0.7153895255963252


In [9]:
data.select(stddev('label')).show()

+------------------+
|     stddev(label)|
+------------------+
|204.51993614793133|
+------------------+



CrossValidator разделяет обучающий набор данных на заданное количество складок (фолдов), обучает модель на части данных и оценивает ее на оставшихся данных. Этот процесс повторяется для каждой комбинации гиперпараметров, и выбирается модель с наилучшей средней производительностью на всех фолдах.

In [10]:
param_grid = (ParamGridBuilder()
              .addGrid(rf.maxDepth, [2, 5, 10])
              .addGrid(rf.numTrees, [5, 10, 20])
              .build())

cv = CrossValidator(estimator=reg_pipeline,
                            estimatorParamMaps=param_grid,
                            evaluator=RegressionEvaluator(),
                            numFolds=3) 

# Обучение и подбор гиперпараметров
cv_model = cv.fit(train)

best_cv_model = cv_model.bestModel

# Оценка производительности на тестовом наборе
cv_prediction = best_cv_model.transform(test)

In [11]:
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(cv_prediction)
print("RMSE = ", rmse)

RMSE =  100.15816580956987


In [12]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(cv_prediction)
print("R² = ", r2)

R² =  0.7601641769719019


# Задача бинарной классификации

## Подготовка данных

Выберем подмножество столбцов для использования в качестве признаков и создадим логическое поле метки с именем label со значениями 1 или 0. В частности, *1* для показателей нагрузок более 35 в *load_95* и *0* для нагрузок менее 35.

In [13]:
data = dataframe.select(
    "startingAirport", 
    "destinationAirport", 
    "isNonStop", 
    "isBasicEconomy", 
    "isRefundable", 
    "baseFare",
    "totalFare", 
    "seatsRemaining", 
    "totalTravelDistance",
    ((col("travelDuration") > 380).cast("Int").alias("label")))
data.show(10)

+---------------+------------------+---------+--------------+------------+--------+---------+--------------+-------------------+-----+
|startingAirport|destinationAirport|isNonStop|isBasicEconomy|isRefundable|baseFare|totalFare|seatsRemaining|totalTravelDistance|label|
+---------------+------------------+---------+--------------+------------+--------+---------+--------------+-------------------+-----+
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             9|                947|    0|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             4|                947|    0|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             9|                947|    0|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             8|                947|    0|
|            ATL|               BOS|     true|         

## Разделим данные

Используем 70% данных для обучения, а 30% оставим для тестирования. В данных тестирования столбе ц*binary_load_00l переименован в* trueLablel, поэтому можно использовать его позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [14]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 652305  Testing Rows: 279240


## Вычисление отношения между классами

In [15]:
positive_count = train.filter(col("label") == 1).count()
negative_count = train.filter(col("label") == 0).count()
balance_ratio = positive_count / negative_count
print("Positive to Negative Class Ratio:", balance_ratio)

Positive to Negative Class Ratio: 1.0451061108167508


Если balance_ratio близко к 1, это свидетельствует о балансе. Если значительно больше или меньше 1, это может указывать на дисбаланс.

In [19]:
# Создание столбца признаков для задачи регрессии
strIdx = StringIndexer(inputCols=["startingAirport", "destinationAirport"], outputCols=["startingAirport_idx", "destinationAirport_idx"])
oneHotEnc = OneHotEncoder(inputCols=["startingAirport_idx", "destinationAirport_idx"], outputCols=["startingAirport_enc", "destinationAirport_enc"])
catVect = VectorAssembler(inputCols=["startingAirport_enc", "destinationAirport_enc", "isNonStop", "isBasicEconomy", "isRefundable", "seatsRemaining"], outputCol="reg_catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="reg_idxCatFeatures")
numVect = VectorAssembler(inputCols=["baseFare", "totalFare", "totalTravelDistance"], outputCol="reg_numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='reg_normFeatures')
featVect = VectorAssembler(inputCols=["reg_idxCatFeatures", "reg_numFeatures", "reg_normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
pipeline = Pipeline(stages=[strIdx, oneHotEnc, catVect, catIdx, numVect, minMax, featVect, lr])

In [20]:
pipelineModel = pipeline.fit(train)

In [21]:
prediction = pipelineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------+----------+---------+
|features                                                                                                                            |prediction|trueLabel|
+------------------------------------------------------------------------------------------------------------------------------------+----------+---------+
|(40,[8,18,34,35,36,37,38,39],[1.0,1.0,120.0,152.6,947.0,0.15140136054421768,0.1623849620643076,0.21527234818868907])                |0.0       |1        |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,6.0,161.86,196.1,956.0,0.20835374149659863,0.21730019062527614,0.2176179306750065])        |0.0       |0        |
|(40,[8,18,34,35,36,37,38,39],[1.0,1.0,161.86,197.6,947.0,0.20835374149659863,0.21919381919634404,0.21527234818868907])              |0.0       |0        |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,1.0,161.86,197.6,947.0

In [22]:
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          118221.0|
|       FP|           28846.0|
|       TN|          107879.0|
|       FN|           24294.0|
|Precision|0.8038581054893348|
|   Recall|0.8295337332912325|
|       F1|0.8164941191096132|
+---------+------------------+



In [23]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.9143883250778739


## Подбор гиперпараметров

In [24]:
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.3, 0.1])
             .addGrid(lr.maxIter, [10, 5])
             .addGrid(lr.threshold,[0.4, 0.3])
             .build())

cv = CrossValidator(estimator=pipeline, 
                    evaluator=BinaryClassificationEvaluator(), 
                    estimatorParamMaps=paramGrid, 
                    numFolds=2)

model = cv.fit(train)

In [25]:
newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "trueLabel")
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|(40,[8,18,34,35,3...|       0.0|        1|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,34,35,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        1|
|(40,[8,18,33,34,3...|       1.0|        1|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       1.0|        1|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0

In [26]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          141674.0|
|       FP|           64399.0|
|       TN|           72326.0|
|       FN|             841.0|
|Precision|0.6874942374789517|
|   Recall|0.9940988667859524|
|       F1|0.8128449631083112|
+---------+------------------+



In [27]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator.evaluate(prediction)
print("AUR2 = ", aur2)

AUR2 =  0.914386392883987
