# Лабораторная работа №2 Машинное обучение на больших данных
## Вариант: Задача регрессии - RandomForest, бинарной классификации - LogisticRegression


## 1. Инициализация PySpark фреймворка

In [2]:
import numpy as np
import pandas as pd
import os

Импорт библиотек Spark SQL и Spark ML

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

spark = SparkSession.builder.master("local[*]").getOrCreate()

Загрузка исходных данных

In [4]:
dataframe = spark.read.csv('new_csv/itineraries2.csv', inferSchema=True, header=True, sep=';')
dataframe.limit(5).toPandas()

Unnamed: 0,startingAirport,destinationAirport,isNonStop,isBasicEconomy,isRefundable,baseFare,totalFare,seatsRemaining,totalTravelDistance,travelDuration
0,ATL,BOS,True,False,False,217.67,248.6,9,947,149
1,ATL,BOS,True,False,False,217.67,248.6,4,947,150
2,ATL,BOS,True,False,False,217.67,248.6,9,947,150
3,ATL,BOS,True,False,False,217.67,248.6,8,947,152
4,ATL,BOS,True,False,False,217.67,248.6,9,947,154


# Задача регрессии

### Подготовка данных

In [5]:
data = dataframe.select(
    "startingAirport", 
    "destinationAirport", 
    "isNonStop", 
    "isBasicEconomy", 
    "isRefundable", 
    "baseFare",
    "totalFare", 
    "seatsRemaining", 
    "totalTravelDistance",
    ((col("travelDuration")).alias("label")))
data.show(10)

+---------------+------------------+---------+--------------+------------+--------+---------+--------------+-------------------+-----+
|startingAirport|destinationAirport|isNonStop|isBasicEconomy|isRefundable|baseFare|totalFare|seatsRemaining|totalTravelDistance|label|
+---------------+------------------+---------+--------------+------------+--------+---------+--------------+-------------------+-----+
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             9|                947|  149|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             4|                947|  150|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             9|                947|  150|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             8|                947|  152|
|            ATL|               BOS|     true|         

## Разделим данные

Используем 70% данных для обучения, а 30% оставим для тестирования.

In [6]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 651847  Testing Rows: 279698


Задача регрессии (Случайный лес)

In [10]:
## Создание столбца признаков для задачи регрессии
# StringIndexer - преобразование текстовых значений в категориальных столбцах в числовые индексы
# OneHotEncoder - OHE-кодирование числовых индексов от StringIndexer
# VectorAssembler - объединение всех категориальных признаков в один вектор
# VectorIndexer - автоматическое определение категориальных признаков вектора
# MinMaxScaler - масштабирование числовых признаков в диапазон от 0 до 1
strIdx = StringIndexer(inputCols=["startingAirport", "destinationAirport"], outputCols=["startingAirport_idx", "destinationAirport_idx"])
oneHotEnc = OneHotEncoder(inputCols=["startingAirport_idx", "destinationAirport_idx"], outputCols=["startingAirport_enc", "destinationAirport_enc"])
catVect = VectorAssembler(inputCols=["startingAirport_enc", "destinationAirport_enc", "isNonStop", "isBasicEconomy", "isRefundable", "seatsRemaining"], outputCol="reg_catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="reg_idxCatFeatures")
numVect = VectorAssembler(inputCols=["baseFare", "totalFare", "totalTravelDistance"], outputCol="reg_numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='reg_normFeatures')
featVect = VectorAssembler(inputCols=["reg_idxCatFeatures", "reg_numFeatures", "reg_normFeatures"], outputCol="reg_features")

# Создание модели RandomForestRegressor
rf = RandomForestRegressor(labelCol="label", featuresCol="reg_features", numTrees=10)

# Создание и выполнение пайплайна для задачи регрессии
reg_pipeline = Pipeline(stages=[strIdx, oneHotEnc, catVect, catIdx, numVect, minMax, featVect, rf])
reg_model = reg_pipeline.fit(train)
reg_prediction = reg_model.transform(test)

# Вывод результатов
reg_prediction.select("reg_features", "prediction", "trueLabel").show(10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------+-----------------+---------+
|reg_features                                                                                                                 |prediction       |trueLabel|
+-----------------------------------------------------------------------------------------------------------------------------+-----------------+---------+
|(40,[8,18,34,35,36,37,38,39],[1.0,1.0,161.86,196.1,956.0,0.20835374149659863,0.21730019062527614,0.2176179306750065])        |373.8983941909811|276      |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,1.0,161.86,196.1,956.0,0.20835374149659863,0.21730019062527614,0.2176179306750065]) |373.8983941909811|252      |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,2.0,161.86,196.1,956.0,0.20835374149659863,0.21730019062527614,0.2176179306750065]) |373.8983941909811|252      |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,2.0,161.86,196.1,956.0

## Оценка качества модели

In [14]:
# RMSE - среднеквадратичное отклонение между фактическими и предсказанными значениями
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(reg_prediction)
print("RMSE = ", rmse)

RMSE =  108.97929136928654


In [15]:
# R² - доля общей дисперсии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(reg_prediction)
print("R² = ", r2)

R² =  0.7169901653137458


In [16]:
# MAE - абсолютное отклонение между фактическими и предсказанными значениями
mae_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(reg_prediction)
print(f"Mean Absolute Error (MAE): {mae}")

Mean Absolute Error (MAE): 77.01699282168157


In [17]:
# Стандартное отклонение
data.select(stddev('label')).show()

+------------------+
|     stddev(label)|
+------------------+
|204.51993614793133|
+------------------+



CrossValidator разделяет обучающий набор данных на заданное количество складок (фолдов), обучает модель на части данных и оценивает ее на оставшихся данных. Этот процесс повторяется для каждой комбинации гиперпараметров, и выбирается модель с наилучшей средней производительностью на всех фолдах.

In [18]:
## Настройка гиперпараметров модели
# maxDepth - максимальная глубина дерева, выбор наилучшего 2,5,10
# numTrees - количество деревьев в случайном лесу, выбор наилучшего из 5,10,20
param_grid = (ParamGridBuilder()
              .addGrid(rf.maxDepth, [2, 5, 10])
              .addGrid(rf.numTrees, [5, 10, 20])
              .build())

## Создание объекта для кросс-валидации
# estimator - оцениваемая модель (наш пайплайн c моделью и преобразованием данных)
# estimatorParamMaps - сетка параметров для оцениваемой модели
# evaluator - оценка качества модели на каждом этапе кросс-валидации (по умолчанию RMSE) 
# numFolds - количество складываний для кросс-валидации, т.е. на сколько частей будет разделен набор данных (в нашем случае разделение на 3 части и обучение 3 раза)
cv = CrossValidator(estimator=reg_pipeline,
                            estimatorParamMaps=param_grid,
                            evaluator=RegressionEvaluator(),
                            numFolds=3) 

# Обучение модели с использованием кросс-валидации
cv_model = cv.fit(train)

# Получение наилучшей модели после обучения с кросс-валидацией
best_cv_model = cv_model.bestModel

# Оценка производительности на тестовом наборе
cv_prediction = best_cv_model.transform(test)

In [19]:
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(cv_prediction)
print("RMSE = ", rmse)

RMSE =  100.50878933425463


In [20]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(cv_prediction)
print("R² = ", r2)

R² =  0.7592747468904791


In [22]:
mae_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(cv_prediction)
print(f"Mean Absolute Error (MAE): {mae}")

Mean Absolute Error (MAE): 69.20841925904261


# Задача бинарной классификации

## Подготовка данных


In [33]:
# 0 - для рейса с длительностью полета менее 400 минут,
# 1 - для рейса с длительностью полета более 400 минут
data = dataframe.select(
    "startingAirport", 
    "destinationAirport", 
    "isNonStop", 
    "isBasicEconomy", 
    "isRefundable", 
    "baseFare",
    "totalFare", 
    "seatsRemaining", 
    "totalTravelDistance",
    ((col("travelDuration") > 375).cast("Int").alias("label")))
data.show(10)

+---------------+------------------+---------+--------------+------------+--------+---------+--------------+-------------------+-----+
|startingAirport|destinationAirport|isNonStop|isBasicEconomy|isRefundable|baseFare|totalFare|seatsRemaining|totalTravelDistance|label|
+---------------+------------------+---------+--------------+------------+--------+---------+--------------+-------------------+-----+
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             9|                947|    0|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             4|                947|    0|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             9|                947|    0|
|            ATL|               BOS|     true|         false|       false|  217.67|    248.6|             8|                947|    0|
|            ATL|               BOS|     true|         

## Разделим данные

Используем 70% данных для обучения, а 30% оставим для тестирования..

In [34]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 651822  Testing Rows: 279723


## Вычисление отношения между классами

In [35]:
positive_count = train.filter(col("label") == 1).count()
negative_count = train.filter(col("label") == 0).count()
balance_ratio = positive_count / negative_count
print("Positive to Negative Class Ratio:", balance_ratio)

Positive to Negative Class Ratio: 1.0914388022922268


Если balance_ratio близко к 1, это свидетельствует о балансе. Если значительно больше или меньше 1, это может указывать на дисбаланс.

In [36]:
# Создание столбца признаков для задачи регрессии
strIdx = StringIndexer(inputCols=["startingAirport", "destinationAirport"], outputCols=["startingAirport_idx", "destinationAirport_idx"])
oneHotEnc = OneHotEncoder(inputCols=["startingAirport_idx", "destinationAirport_idx"], outputCols=["startingAirport_enc", "destinationAirport_enc"])
catVect = VectorAssembler(inputCols=["startingAirport_enc", "destinationAirport_enc", "isNonStop", "isBasicEconomy", "isRefundable", "seatsRemaining"], outputCol="reg_catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="reg_idxCatFeatures")
numVect = VectorAssembler(inputCols=["baseFare", "totalFare", "totalTravelDistance"], outputCol="reg_numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='reg_normFeatures')
featVect = VectorAssembler(inputCols=["reg_idxCatFeatures", "reg_numFeatures", "reg_normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
pipeline = Pipeline(stages=[strIdx, oneHotEnc, catVect, catIdx, numVect, minMax, featVect, lr])

In [37]:
pipelineModel = pipeline.fit(train)

In [38]:
prediction = pipelineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------+----------+---------+
|features                                                                                                                      |prediction|trueLabel|
+------------------------------------------------------------------------------------------------------------------------------+----------+---------+
|(40,[8,18,34,35,36,37,38,39],[1.0,1.0,120.0,152.6,947.0,0.15140136054421768,0.1623849620643076,0.21527234818868907])          |0.0       |1        |
|(40,[8,18,34,35,36,37,38,39],[1.0,1.0,161.86,196.1,956.0,0.20835374149659863,0.21730019062527614,0.2176179306750065])         |0.0       |0        |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,1.0,161.86,196.1,956.0,0.20835374149659863,0.21730019062527614,0.2176179306750065])  |0.0       |0        |
|(40,[8,18,33,34,35,36,37,38,39],[1.0,1.0,1.0,161.86,196.1,956.0,0.20835374149659863,0.2173001906252

In [39]:
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          123930.0|
|       FP|           30253.0|
|       TN|          103284.0|
|       FN|           22256.0|
|Precision|0.8037851124961896|
|   Recall| 0.847755599031371|
|       F1|0.8251850224224203|
+---------+------------------+



In [40]:
# Метрика Area Under the ROC Curve
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.9170446668967376


## Подбор гиперпараметров

In [42]:
## Настройка гиперпараметров
# regParam - коэфф регуляции
# maxIter - максимальное кол-во итераций при обучении
# threshold порог классификации
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.3, 0.2])
             .addGrid(lr.maxIter, [15, 10])
             .addGrid(lr.threshold,[0.45, 0.35])
             .build())

cv = CrossValidator(estimator=pipeline, 
                    evaluator=BinaryClassificationEvaluator(), 
                    estimatorParamMaps=paramGrid, 
                    numFolds=2)

model = cv.fit(train)

In [43]:
newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "trueLabel")
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|(40,[8,18,34,35,3...|       0.0|        1|
|(40,[8,18,34,35,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        1|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        1|
|(40,[8,18,34,35,3...|       0.0|        0|
|(40,[8,18,34,35,3...|       0.0|        1|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        1|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        1|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0|        0|
|(40,[8,18,33,34,3...|       0.0

In [44]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          144263.0|
|       FP|           59924.0|
|       TN|           73613.0|
|       FN|            1923.0|
|Precision|0.7065239216992267|
|   Recall|0.9868455255633234|
|       F1|0.8234824030390469|
+---------+------------------+



In [45]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator.evaluate(prediction)
print("AUR2 = ", aur2)

AUR2 =  0.917046791300733
