# 1. Инициализация PySpark фреймворка

In [1]:
import numpy as np
import pandas as pd
import os

Импорт библиотек Spark SQL и Spark ML

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

# Create (or reuse) a Spark session running in local mode with the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

Загрузка исходных данных

In [3]:
# Read the semicolon-separated CSV (first row is the header) and let Spark infer column types.
dataframe = spark.read.csv(
    'filtered_data/filtered_data.csv',
    sep=';',
    header=True,
    inferSchema=True,
)
# Preview the first five rows as a pandas DataFrame.
dataframe.limit(5).toPandas()

Unnamed: 0,timestamp,site_id,period_id,actual_consumption,actual_pv,load_00,load_19,load_38,load_57,load_76,load_95,pv_00,pv_19,pv_38,pv_57,pv_76,pv_95
0,2014-07-26 00:00:00,1,1,52.791303,0.0,53.083976,54.697004,53.738351,53.721872,54.154559,53.833967,0.170069,1.033124,56.012696,87.486744,9.815387,1.246214
1,2014-07-26 00:45:00,1,1,53.279074,0.0,53.270727,54.142639,53.746133,53.670674,54.898471,54.09777,0.170069,1.033124,67.235425,43.068871,2.825965,1.246214
2,2014-07-27 00:00:00,1,1,53.851506,0.0,53.913683,53.495005,54.775921,54.127523,54.686806,54.167568,0.170069,1.033124,60.532578,65.293437,6.004561,1.246214
3,2014-07-27 23:00:00,1,1,53.677345,0.0,53.761261,54.739164,85.821453,89.562035,80.436553,54.546472,0.170069,1.033124,33.672274,77.220921,12.140466,1.246214
4,2014-07-27 23:30:00,1,1,54.530195,0.0,54.018036,54.862038,91.571323,89.54265,80.405592,54.550434,0.170069,1.033124,38.496298,76.540738,6.389259,1.246214


# 2. Подготовка данных

Выберем подмножество столбцов для использования в качестве признаков и создадим бинарное поле метки с именем *label* со значениями 1 или 0: *1*, если значение *load_95* больше 5, и *0* в противном случае.

In [4]:
# Binary target: 1 when load_95 is greater than 5, 0 when it is not.
label_col = (col("load_95") > 5).cast("Int").alias("label")

# Keep the columns used as features (plus load_00, the regression target) and
# replace timestamp with its Unix-epoch value cast to double so it is numeric.
data = (
    dataframe
    .select(
        "timestamp",
        "site_id",
        "period_id",
        "actual_consumption",
        "actual_pv",
        "load_00",
        label_col,
    )
    .withColumn("timestamp", unix_timestamp("timestamp").cast(DoubleType()))
)
data.show(10)

+-----------+-------+---------+------------------+---------+------------------+-----+
|  timestamp|site_id|period_id|actual_consumption|actual_pv|           load_00|label|
+-----------+-------+---------+------------------+---------+------------------+-----+
|1.4063328E9|      1|        1|52.791302965157286|      0.0| 53.08397592004902|    1|
|1.4063355E9|      1|        1|53.279074149918486|      0.0|53.270726785432736|    1|
|1.4064192E9|      1|        1| 53.85150644791569|      0.0| 53.91368322423372|    1|
| 1.406502E9|      1|        1|53.677345300942264|      0.0|53.761260685969894|    1|
|1.4065038E9|      1|        1| 54.53019504155204|      0.0| 54.01803649121244|    1|
|1.4065056E9|      1|        2|54.009711154557934|      0.0| 54.43423188305628|    1|
|1.4065074E9|      1|        2| 53.56197103939957|      0.0| 53.60636743071373|    1|
|1.4065092E9|      1|        2| 53.66302849483053|      0.0| 53.43418449452174|    1|
|1.4069376E9|      1|        2| 54.27777135390649|    

## Разделим данные

Используем 70% данных для обучения, а 30% оставим для тестирования. В тестовых данных столбец *label* переименован в *trueLabel*, поэтому его можно использовать позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [5]:
# Fixed seed so the 70/30 split -- and every metric computed from it below --
# does not change each time the notebook is re-run.
SPLIT_SEED = 42
splits = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train = splits[0]
# The test label is renamed so it cannot be confused with the model's output
# columns when predictions are compared against the ground truth.
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 178564  Testing Rows: 76255


## Вычисление отношения между классами

In [6]:
# Count the rows of each class in the full (pre-split) dataset.
positive_count = data.filter(col("label") == 1).count()
negative_count = data.filter(col("label") == 0).count()

# Avoid ZeroDivisionError when the negative class is empty: report infinity
# if only positives exist, NaN if there are no labelled rows at all.
if negative_count:
    balance_ratio = positive_count / negative_count
else:
    balance_ratio = float("inf") if positive_count else float("nan")
print("Positive to Negative Class Ratio:", balance_ratio)

Positive to Negative Class Ratio: 1.0297509996654506


Если balance_ratio близко к 1, это свидетельствует о балансе. Если значительно больше или меньше 1, это может указывать на дисбаланс.

Задача регрессии (Случайный лес)

In [7]:
# Build the feature column for the regression task.
# timestamp / site_id / period_id are assembled and passed through VectorIndexer.
catVect = VectorAssembler(inputCols=["timestamp", "site_id", "period_id"], outputCol="reg_catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="reg_idxCatFeatures")
# Numeric measurements are assembled and additionally min-max scaled.
numVect = VectorAssembler(inputCols=["actual_consumption", "actual_pv"], outputCol="reg_numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='reg_normFeatures')
# NOTE(review): reg_features contains the numeric columns twice (raw in
# reg_numFeatures and scaled in reg_normFeatures) -- confirm this is intended.
featVect = VectorAssembler(inputCols=["reg_idxCatFeatures", "reg_numFeatures", "reg_normFeatures"], outputCol="reg_features")

# Random-forest regressor predicting load_00. The explicit seed makes the
# fitted forest (and therefore RMSE / R2 below) reproducible between runs.
rf = RandomForestRegressor(labelCol="load_00", featuresCol="reg_features", numTrees=10, seed=42)

# Build and fit the regression pipeline, then score the test split.
reg_pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, rf])
reg_model = reg_pipeline.fit(train)
reg_prediction = reg_model.transform(test)

# Show the assembled features next to the predicted and actual load_00.
reg_prediction.select("reg_features", "prediction", "load_00").show(10, truncate=False)

+---------------------------------------------------------------------+-----------------+------------------+
|reg_features                                                         |prediction       |load_00           |
+---------------------------------------------------------------------+-----------------+------------------+
|[1.3579614E9,13.0,0.0,5.580898705869913,0.0,0.023718226046800996,0.0]|7.641006302523164|6.5605630823838155|
|[1.3579623E9,13.0,0.0,7.4411982744932175,0.0,0.0318380397777904,0.0] |8.06078795352457 |7.10327551657503  |
|[1.358019E9,13.0,0.0,7.4411982744932175,0.0,0.0318380397777904,0.0]  |8.06078795352457 |7.178341268091895 |
|[1.3580325E9,13.0,0.0,7.4411982744932175,0.0,0.0318380397777904,0.0] |8.06078795352457 |7.652302695032091 |
|[1.3580334E9,13.0,0.0,7.4411982744932175,0.0,0.0318380397777904,0.0] |8.06078795352457 |7.696045778793563 |
|[1.3580379E9,13.0,0.0,7.4411982744932175,0.0,0.0318380397777904,0.0] |8.06078795352457 |7.399507696792104 |
|[1.3580577E9,13.0,

In [8]:
# Regression metric: RMSE of the predictions against load_00 on the test split.
reg_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="load_00",
    predictionCol="prediction",
)
rmse = reg_evaluator.evaluate(reg_prediction)
print("RMSE = ", rmse)

RMSE =  2.54711469272741


In [9]:
# Regression metric: R2 of the predictions against load_00 on the test split.
reg_evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="load_00",
    predictionCol="prediction",
)
r2 = reg_evaluator.evaluate(reg_prediction)
print("R² = ", r2)

R² =  0.9720313765480223


CrossValidator разделяет обучающий набор данных на заданное количество складок (фолдов), обучает модель на части данных и оценивает ее на оставшихся данных. Этот процесс повторяется для каждой комбинации гиперпараметров, и выбирается модель с наилучшей средней производительностью на всех фолдах.

In [10]:
# Disabled hyper-parameter search for the random-forest regression pipeline
# (grid over maxDepth / numTrees / minInstancesPerNode, 2-fold cross-validation).
# NOTE(review): RegressionEvaluator() is created with default arguments here,
# while rf is trained with labelCol="load_00" -- the evaluator presumably needs
# labelCol="load_00" too; confirm before uncommenting.
# param_grid = ParamGridBuilder() \
#     .addGrid(rf.maxDepth, [5, 10, 15]) \
#     .addGrid(rf.numTrees, [10, 25, 50]) \
#     .addGrid(rf.minInstancesPerNode, [1, 2, 4]) \
#     .build()

# rf_cv = CrossValidator(
#     estimator=reg_pipeline, 
#     evaluator=RegressionEvaluator(), 
#     estimatorParamMaps=param_grid, 
#     numFolds=2
# )

# model = rf_cv.fit(train)

In [11]:
# RMSE of reg_prediction against load_00 (same DataFrame and metric as the
# earlier RMSE cell, so the value is unchanged).
reg_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="load_00",
    predictionCol="prediction",
)
rmse = reg_evaluator.evaluate(reg_prediction)
print("RMSE = ", rmse)

RMSE =  2.54711469272741


Задача бинарной классификации (логистическая регрессия)

In [12]:
# Feature preparation for the binary classifier.
# 1) Assemble timestamp / site_id / period_id and run them through VectorIndexer.
catVect = VectorAssembler(
    inputCols=["timestamp", "site_id", "period_id"],
    outputCol="catFeatures",
)
catIdx = VectorIndexer(
    inputCol=catVect.getOutputCol(),
    outputCol="idxCatFeatures",
)
# 2) Assemble the numeric measurements and min-max scale them.
numVect = VectorAssembler(
    inputCols=["actual_consumption", "actual_pv"],
    outputCol="numFeatures",
)
minMax = MinMaxScaler(
    inputCol=numVect.getOutputCol(),
    outputCol="normFeatures",
)
# 3) Concatenate the indexed and the scaled vectors into the final feature column.
featVect = VectorAssembler(
    inputCols=["idxCatFeatures", "normFeatures"],
    outputCol="features",
)

# Logistic regression on the binary label.
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
    regParam=0.3,
)

# Full pipeline: feature stages followed by the classifier.
pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, lr])

In [13]:
# Fit every pipeline stage (feature transformers + logistic regression) on the training split.
pipelineModel = pipeline.fit(train)

In [14]:
# Score the held-out test split with the fitted pipeline.
prediction = pipelineModel.transform(test)

# Keep only the columns needed to compare predictions with the ground truth.
predicted = prediction.select(
    "features",
    "prediction",
    "trueLabel",
)
predicted.show(100, truncate=False)

+-----------------------------------------------------------------+----------+---------+
|features                                                         |prediction|trueLabel|
+-----------------------------------------------------------------+----------+---------+
|[1.3579614E9,13.0,0.0,0.023718226046800996,0.0]                  |1.0       |1        |
|[1.3579623E9,13.0,0.0,0.0318380397777904,0.0]                    |1.0       |1        |
|[1.358019E9,13.0,0.0,0.0318380397777904,0.0]                     |1.0       |1        |
|[1.3580325E9,13.0,0.0,0.0318380397777904,0.0]                    |1.0       |1        |
|[1.3580334E9,13.0,0.0,0.0318380397777904,0.0]                    |1.0       |1        |
|[1.3580379E9,13.0,0.0,0.0318380397777904,0.0]                    |1.0       |1        |
|[1.3580577E9,13.0,0.0,0.0277781329122957,0.0]                    |1.0       |1        |
|[1.358109E9,13.0,0.0,0.0318380397777904,0.0]                     |1.0       |1        |
|[1.3581099E9,13.0,0.

In [15]:
# Confusion-matrix counts on the test split (model prediction vs. trueLabel).
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())

# Each ratio falls back to 0.0 when its denominator is zero (e.g. the model
# never predicts the positive class) instead of raising ZeroDivisionError.
pr = tp / (tp + fp) if (tp + fp) else 0.0
re = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * pr * re / (re + pr) if (re + pr) else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", f1)],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           31550.0|
|       FP|             266.0|
|       TN|           37439.0|
|       FN|            7000.0|
|Precision|0.9916394267035454|
|   Recall|0.8184176394293126|
|       F1| 0.896739902793963|
+---------+------------------+



In [16]:
# Area under the ROC curve on the test split, computed from the rawPrediction
# column against trueLabel.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    labelCol="trueLabel",
    rawPredictionCol="rawPrediction",
)
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.9634936333344856


In [None]:
# Hyper-parameter grid for the logistic-regression stage of the pipeline.
# NOTE(review): the evaluator below scores areaUnderROC from rawPrediction, so
# lr.threshold presumably cannot change the score and only multiplies the
# number of fits -- confirm whether it should stay in the grid.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.3, 0.1])
    .addGrid(lr.maxIter, [10, 5])
    .addGrid(lr.threshold, [0.4, 0.3])
    .build()
)

# 2-fold cross-validation over the whole pipeline. The label column is spelled
# out (train still carries "label"), and the seed fixes the fold assignment so
# the selected model is reproducible between runs.
cv = CrossValidator(
    estimator=pipeline,
    evaluator=BinaryClassificationEvaluator(labelCol="label"),
    estimatorParamMaps=paramGrid,
    numFolds=2,
    seed=42,
)

model = cv.fit(train)

In [None]:
# Score the test split with the cross-validated model.
newPrediction = model.transform(test)
# Select from newPrediction (the original selected from the earlier
# `prediction` DataFrame, so the tuned model's output was never shown).
newPredicted = newPrediction.select("features", "prediction", "trueLabel")
newPredicted.show()

In [None]:
# Recalculate the confusion matrix for the cross-validated model's predictions.
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())

# Each ratio falls back to 0.0 when its denominator is zero (e.g. the model
# never predicts the positive class) instead of raising ZeroDivisionError.
pr2 = tp2 / (tp2 + fp2) if (tp2 + fp2) else 0.0
re2 = tp2 / (tp2 + fn2) if (tp2 + fn2) else 0.0
f1_2 = 2 * pr2 * re2 / (re2 + pr2) if (re2 + pr2) else 0.0

metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", f1_2)],["metric", "value"])
metrics2.show()

In [None]:
# Recalculate the area under the ROC curve for the cross-validated model.
# Uses rawPrediction (as the first AUR cell does) so the two values are
# comparable, and evaluates newPrediction with evaluator2 -- the original
# called the old `evaluator` on the old `prediction`, repeating the first AUR.
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur2 = evaluator2.evaluate(newPrediction)
print("AUR2 = ", aur2)