# 1. Инициализация PySpark фреймворка

In [1]:
import numpy as np
import pandas as pd
import os

Импорт библиотек Spark SQL и Spark ML

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

spark = SparkSession.builder.master("local[*]").getOrCreate()

Загрузка исходных данных

In [3]:
dataframe = spark.read.csv('filtered_data/filtered_data.csv', inferSchema=True, header=True, sep=';')
dataframe.limit(5).toPandas()

Unnamed: 0,timestamp,site_id,period_id,actual_consumption,actual_pv,load_00,load_19,load_38,load_57,load_76,load_95,pv_00,pv_19,pv_38,pv_57,pv_76,pv_95
0,2014-07-19 18:45:00,1,0,51.625703,22.712489,52.816828,53.4722,53.393106,53.391791,53.389783,53.398708,18.321836,2.266848,1.448687,55.566863,96.485692,24.636241
1,2014-07-19 19:30:00,1,0,52.281257,6.618605,51.452796,54.065122,53.393111,53.391791,53.389783,53.400716,6.339899,1.460266,1.288886,76.16395,96.338334,7.928666
2,2014-07-19 20:00:00,1,0,50.719565,1.452209,51.313898,53.389783,53.389783,53.389783,53.389783,53.398708,0.936193,1.01362,1.200396,78.826993,96.411283,3.299011
3,2014-07-19 20:15:00,1,0,51.901162,0.580877,51.950475,53.389783,53.927008,53.407345,53.395327,53.398708,0.219761,0.996226,1.19695,77.633953,90.381203,2.130662
4,2014-07-19 21:00:00,1,0,51.250007,0.0,52.21882,53.389783,53.389783,53.826278,53.398708,53.398708,0.143507,1.02761,1.840154,98.784155,86.598387,1.246205


# Задача регрессии

### Подготовка данных

Выберем подмножество столбцов для использования в качестве признаков. В качестве метки выбран столбец load_95

In [24]:
data = dataframe.select(
    "timestamp", 
    "site_id", 
    "period_id", 
    "actual_consumption", 
    "actual_pv", 
    "load_00", 
    ((col("load_95")).alias("label"))).withColumn("timestamp", unix_timestamp("timestamp").cast(DoubleType()))
data.show(10)

+-----------+-------+---------+------------------+------------------+------------------+------------------+
|  timestamp|site_id|period_id|actual_consumption|         actual_pv|           load_00|             label|
+-----------+-------+---------+------------------+------------------+------------------+------------------+
|1.4057955E9|      1|        0| 51.62570299494799| 22.71248932566911| 52.81682785868848| 53.39870789573359|
|1.4057982E9|      1|        0| 52.28125674965801| 6.618605013254157|51.452796259410526|53.400716200165164|
|   1.4058E9|      1|        0| 50.71956514220455|1.4522094578011435| 51.31389786752856| 53.39870789573359|
|1.4058009E9|      1|        0| 51.90116154382357|0.5808771932574699| 51.95047496345374| 53.39870789573359|
|1.4058036E9|      1|        0| 51.25000680775122|               0.0|  52.2188201830341| 53.39870789573359|
|1.4058099E9|      1|        0| 51.79032626969339|               0.0| 51.85754836350091| 53.39870789573359|
|1.4058108E9|      1|       

## Разделим данные

Используем 70% данных для обучения, а 30% оставим для тестирования. В данных тестирования столбе ц*binary_load_00l переименован  в trueLale*l, поэтому можно использовать его позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [5]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 536294  Testing Rows: 228916


Задача регрессии (Случайный лес)

In [6]:
# Создание столбца признаков для задачи регрессии
catVect = VectorAssembler(inputCols=["timestamp", "site_id", "period_id"], outputCol="reg_catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="reg_idxCatFeatures")
numVect = VectorAssembler(inputCols=["actual_consumption", "actual_pv", "load_00"], outputCol="reg_numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol='reg_normFeatures')
featVect = VectorAssembler(inputCols=["reg_idxCatFeatures", "reg_numFeatures", "reg_normFeatures"], outputCol="reg_features")

# Создание модели RandomForestRegressor
rf = RandomForestRegressor(labelCol="label", featuresCol="reg_features", numTrees=10)

# Создание и выполнение пайплайна для задачи регрессии
reg_pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, rf])
reg_model = reg_pipeline.fit(train)
reg_prediction = reg_model.transform(test)

# Вывод результатов
reg_prediction.select("reg_features", "prediction", "trueLabel").show(10, truncate=False)

+----------------------------------------------------------------------------------------------------------+-----------------+------------------+
|reg_features                                                                                              |prediction       |trueLabel         |
+----------------------------------------------------------------------------------------------------------+-----------------+------------------+
|[1.4058081E9,0.0,0.0,52.34192321577942,0.0,52.20040455667014,0.17756123215952097,0.0,0.49814911666225853] |68.48646233200081|53.41126047214196 |
|[1.4058171E9,0.0,0.0,52.13998826477083,0.0,51.93873162109794,0.17687427350186596,0.0,0.4974220089981448]  |64.16993687783966|53.39870789573359 |
|[1.4058198E9,0.0,0.0,52.094058511085656,0.0,52.30283050132776,0.1767180259489763,0.0,0.4984337264909433]  |68.48646233200081|53.40601078066252 |
|[1.4058216E9,0.0,0.0,52.509805763424545,0.0,52.50681618232782,0.17813234859576924,0.0,0.4990005392249399] |68.4864623320008

In [7]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(reg_prediction)
print("RMSE = ", rmse)

RMSE =  23.95271205534175


In [8]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(reg_prediction)
print("R² = ", r2)

R² =  0.8472003245635126


In [25]:
data.select(stddev('label')).show()

+-----------------+
|    stddev(label)|
+-----------------+
|61.16348168455061|
+-----------------+



CrossValidator разделяет обучающий набор данных на заданное количество складок (фолдов), обучает модель на части данных и оценивает ее на оставшихся данных. Этот процесс повторяется для каждой комбинации гиперпараметров, и выбирается модель с наилучшей средней производительностью на всех фолдах.

In [9]:
param_grid = (ParamGridBuilder()
              .addGrid(rf.maxDepth, [2, 5, 10])
              .addGrid(rf.numTrees, [5, 10, 20])
              .build())

cv = CrossValidator(estimator=reg_pipeline,
                            estimatorParamMaps=param_grid,
                            evaluator=RegressionEvaluator(),
                            numFolds=3) 

# Обучение и подбор гиперпараметров
cv_model = cv.fit(train)

best_cv_model = cv_model.bestModel

# Оценка производительности на тестовом наборе
cv_prediction = best_cv_model.transform(test)

In [10]:
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = reg_evaluator.evaluate(cv_prediction)
print("RMSE = ", rmse)

RMSE =  22.17533114797848


In [11]:
# Расчет метрик для задачи регрессии
reg_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="r2")
r2 = reg_evaluator.evaluate(cv_prediction)
print("R² = ", r2)

R² =  0.869035595586062


# Задача бинарной классификации

## Подготовка данных

Выберем подмножество столбцов для использования в качестве признаков и создадим логическое поле метки с именем label со значениями 1 или 0. В частности, *1* для показателей нагрузок более 35 в *load_95* и *0* для нагрузок менее 35.

In [12]:
data = dataframe.select(
    "timestamp", 
    "site_id", 
    "period_id", 
    "actual_consumption", 
    "actual_pv", 
    "load_00", 
    ((col("load_95") > 35).cast("Int").alias("label"))).withColumn("timestamp", unix_timestamp("timestamp").cast(DoubleType()))
data.show(10)

+-----------+-------+---------+------------------+------------------+------------------+-----+
|  timestamp|site_id|period_id|actual_consumption|         actual_pv|           load_00|label|
+-----------+-------+---------+------------------+------------------+------------------+-----+
|1.4057955E9|      1|        0| 51.62570299494799| 22.71248932566911| 52.81682785868848|    1|
|1.4057982E9|      1|        0| 52.28125674965801| 6.618605013254157|51.452796259410526|    1|
|   1.4058E9|      1|        0| 50.71956514220455|1.4522094578011435| 51.31389786752856|    1|
|1.4058009E9|      1|        0| 51.90116154382357|0.5808771932574699| 51.95047496345374|    1|
|1.4058036E9|      1|        0| 51.25000680775122|               0.0|  52.2188201830341|    1|
|1.4058099E9|      1|        0| 51.79032626969339|               0.0| 51.85754836350091|    1|
|1.4058108E9|      1|        0|52.460696718996665|               0.0|52.347502377808425|    1|
|1.4058117E9|      1|        0|  51.8311171697882|

## Разделим данные

Используем 70% данных для обучения, а 30% оставим для тестирования. В данных тестирования столбе ц*binary_load_00l переименован в* trueLablel, поэтому можно использовать его позже для сравнения прогнозируемых меток с известными фактическими значениями.

In [13]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 535153  Testing Rows: 230057


## Вычисление отношения между классами

In [14]:
positive_count = train.filter(col("label") == 1).count()
negative_count = train.filter(col("label") == 0).count()
balance_ratio = positive_count / negative_count
print("Positive to Negative Class Ratio:", balance_ratio)

Positive to Negative Class Ratio: 0.9988757204166931


Если balance_ratio близко к 1, это свидетельствует о балансе. Если значительно больше или меньше 1, это может указывать на дисбаланс.

In [15]:
catVect = VectorAssembler(inputCols = ["timestamp", "site_id", "period_id"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["actual_consumption", "actual_pv", "load_00"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, lr])

In [16]:
pipelineModel = pipeline.fit(train)

In [17]:
prediction = pipelineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

+----------------------------------------------------------------------------------+----------+---------+
|features                                                                          |prediction|trueLabel|
+----------------------------------------------------------------------------------+----------+---------+
|[1.4057964E9,0.0,0.0,0.18124589583040562,0.21960544110674235,0.5016218086850048]  |0.0       |1        |
|[1.4057982E9,0.0,0.0,0.1775816510563425,0.08714959621020989,0.4960717458348025]   |0.0       |1        |
|[1.4058E9,0.0,0.0,0.17225985610424724,0.019121773788672157,0.49568579042428357]   |0.0       |1        |
|[1.4058018E9,0.0,0.0,0.17575130028423444,9.495333290799344E-4,0.4973996261598298] |0.0       |1        |
|[1.4058027E9,0.0,0.0,0.17675226271600672,0.0,0.49580713444411095]                 |0.0       |1        |
|[1.4058045E9,0.0,0.0,0.1771920320512456,0.0,0.4976705981716822]                   |0.0       |1        |
|[1.4058063E9,0.0,0.0,0.17699095375742943,0.0,

In [18]:
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          102826.0|
|       FP|            1326.0|
|       TN|          113727.0|
|       FN|           12178.0|
|Precision|0.9872686074199247|
|   Recall|0.8941080310250078|
|       F1|0.9383817919655406|
+---------+------------------+



In [19]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.9925859525257447


## Подбор гиперпараметров

In [20]:
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.3, 0.1])
             .addGrid(lr.maxIter, [10, 5])
             .addGrid(lr.threshold,[0.4, 0.3])
             .build())

cv = CrossValidator(estimator=pipeline, 
                    evaluator=BinaryClassificationEvaluator(), 
                    estimatorParamMaps=paramGrid, 
                    numFolds=2)

model = cv.fit(train)

In [21]:
newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "trueLabel")
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[1.4057964E9,0.0,...|       0.0|        1|
|[1.4057982E9,0.0,...|       0.0|        1|
|[1.4058E9,0.0,0.0...|       0.0|        1|
|[1.4058018E9,0.0,...|       0.0|        1|
|[1.4058027E9,0.0,...|       0.0|        1|
|[1.4058045E9,0.0,...|       0.0|        1|
|[1.4058063E9,0.0,...|       0.0|        1|
|[1.4058072E9,0.0,...|       0.0|        1|
|[1.4058081E9,0.0,...|       0.0|        1|
|[1.4058099E9,0.0,...|       0.0|        1|
|[1.4058117E9,0.0,...|       0.0|        1|
|[1.4058126E9,0.0,...|       0.0|        1|
|[1.4058153E9,0.0,...|       0.0|        1|
|[1.4058198E9,0.0,...|       0.0|        1|
|[1.4058216E9,0.0,...|       0.0|        1|
|[1.4058261E9,0.0,...|       0.0|        1|
|[1.405827E9,0.0,0...|       0.0|        1|
|[1.4058279E9,0.0,...|       0.0|        1|
|[1.4058306E9,0.0,...|       0.0|        1|
|[1.4058315E9,0.0,...|       0.0

In [22]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          114038.0|
|       FP|           13586.0|
|       TN|          101467.0|
|       FN|             966.0|
|Precision|0.8935466683382436|
|   Recall|0.9916002921637508|
|       F1|0.9400234103236229|
+---------+------------------+



In [23]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator.evaluate(prediction)
print("AUR2 = ", aur2)

AUR2 =  0.9925864078388127
