# Задание по теме PySpark

Для произвольно выбранного датасета провести обработку данных и построить предсказательную модель с использованием функционала PySpark. <br>

Мы выбрали датасет Walmart.csv и с помощью pyspark вместо pandas+sklearn решали задачу регрессии, то есть по имеющимся данным строили модель, описывающую пятничные продажи в зависимости от набора параметров.

- План работы: <br>
Следует загрузить файл csv датасета, используя pyspark.<br>
Произвести предобработку данных (выкинуть строки с пропусками). <br>
Выделить независимые переменные - характеристики (features) модели.<br>
Выделить колонку целевой (target) переменной - суммы пятничных продаж.<br>
Разделить данные на тренировочный и тестовый датасет.<br>
Создать модель (можно с использованием пайплайна).<br>
Подобрать гиперпараметры.<br>
Провести проверку модели на тестовой выборке. Рассчитать метрики на тестовой выборке. <br>

Загрузка необходимых библиотек

In [37]:
# Загрузка необходимых библиотек
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import joblib

Костыль для работы в VS Code (вроде и без него работало, но оставили на всякий случай):

In [38]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

Запуск сессии pyspark:

In [39]:
# Инициализация SparkSession
spark = SparkSession.builder \
    .appName("Walmart Sales Prediction") \
    .getOrCreate()

Загрузка данных

In [40]:
# Загрузка данных
data = spark.read.csv('Walmart.csv', header=True, inferSchema=True)

# Просмотр первых 15 строк
data.show(15)

# Проверка типов данных
print("Data types:")
data.printSchema()

+-----+----------+----------+------------+-----------+----------+-----------+------------+
|Store|      Date|     Sales|Holiday_Flag|Temperature|Fuel_Price|        CPI|Unemployment|
+-----+----------+----------+------------+-----------+----------+-----------+------------+
|    1|05-02-2010| 1643690.9|           0|      42.31|     2.572|211.0963582|       8.106|
|    1|12-02-2010|1641957.44|           1|      38.51|     2.548|211.2421698|       8.106|
|    1|19-02-2010|1611968.17|           0|      39.93|     2.514|211.2891429|       8.106|
|    1|26-02-2010|1409727.59|           0|      46.63|     2.561|211.3196429|       8.106|
|    1|05-03-2010|1554806.68|           0|       46.5|     2.625|211.3501429|       8.106|
|    1|12-03-2010|1439541.59|           0|      57.79|     2.667|211.3806429|       8.106|
|    1|19-03-2010|1472515.79|           0|      54.58|      2.72| 211.215635|       8.106|
|    1|26-03-2010|1404429.92|           0|      51.45|     2.732|211.0180424|       8.106|

In [41]:
# Выкидываем пропуски на тот случай, если они есть. Реально их нету :)
data.dropna()

DataFrame[Store: int, Date: string, Sales: double, Holiday_Flag: int, Temperature: double, Fuel_Price: double, CPI: double, Unemployment: double]

Среди данных есть колонка с датами. Для построения модели регрессии дата не нужна.
Остальные параметры - номер магазина, совпадение с праздником, температура, стоимость топлива, индекс потребительских цен, уровень безработицы, - являются потенциально важными для модели.

In [42]:
# Удаление колонки с датами
data = data.drop('Date')

In [43]:
# Предполагаем, что 'Sales' - это целевая переменная, а остальные - признаки
feature_columns = [column for column in data.columns if (column != 'Sales')]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
data = assembler.transform(data)

# Разделение данных на тренировочную и тестовую выборки
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42) # в качестве затравочного случайного значения, естественно, используем 42 :)

# Определение модели и пайплайна
rf = RandomForestRegressor(featuresCol='features', labelCol='Sales')
pipeline = Pipeline(stages=[rf])

Поиск оптимальных значений гиперпараметров 

In [None]:
# Настройка сетки гиперпараметров
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [100, 150]) \
    .addGrid(rf.maxDepth, [15, 20]) \
    .addGrid(rf.minInstancesPerNode, [2, 5]) \
    .build()

# Кросс-валидация
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='Sales', metricName='rmse'),
                          parallelism=6, # since we have 6 cores in CPU here
                          numFolds=5)

Обучение модели и выбор наилучшего набора параметров

In [45]:
# Обучение модели
cv_model = crossval.fit(train_data)

Можем посмотреть набор наилучших параметров

In [46]:
# Получение наилучшей модели из CrossValidator
best_model = cv_model.bestModel

# Получение параметров модели
rf_model = best_model.stages[0]  # RandomForestRegressor находится на первом месте в пайплайне

print("Best Parameters:")
print(f"Num Trees: {rf_model.getNumTrees}")
print(f"Max Depth: {rf_model.getMaxDepth()}")
print(f"Min Instances per Node: {rf_model.getMinInstancesPerNode()}")

Best Parameters:
Num Trees: 150
Max Depth: 20
Min Instances per Node: 2


Оценка модели на тестовой выборке

In [47]:
# Оценка модели на тестовой выборке
predictions = cv_model.transform(test_data)
evaluator = RegressionEvaluator(labelCol='Sales', predictionCol='prediction', metricName='r2')
r2 = evaluator.evaluate(predictions)
rmse_evaluator = RegressionEvaluator(labelCol='Sales', predictionCol='prediction', metricName='rmse')
rmse = rmse_evaluator.evaluate(predictions)

print(f"R^2 Score: {r2}")
print(f"RMSE: {rmse}")

# Завершение сессии Spark
spark.stop()

R^2 Score: 0.8311193160437121
RMSE: 234101.61370698604


Выводы: <br>
На основе датасета Walmart, описывающего пятничные продажи в 45 магазинах соответствующей сети за период в несколько лет, и с помощью библиотеки pyspark составлена модель регрессии (методом случайного леса), позволяющая прогнозировать продажи для каждого из этих магазинов в зависимости от следующих параметров: 
- совпадение даты с праздничными днями
- температура воздуха
- стоимость топлива
- текущий индекс потребительских цен
- текущий уровень безработицы <br>

Поиск наилучших гиперпараметров шел c помощью GridSearch на весьма ограниченном наборе гиперпараметров. Впрочем, с учетом того, что данная задача решалась ранее в pandas + sklearn и с применением поиска гиперпараметров с помощью optuna и hyperopt, мы имели представление о более-менее оптимальных значениях гиперпараметров и использовали близкие к ним значения в сетке, по которой pyspark должен был подобрать оптимальные гиперпараметры. <br>
 В результате наилучшие результаты дала модель с <br>
- количеством деревьев: 150, <br>
- глубиной леса: 20, <br>
- минимальным числом образцов в узле, необходимым для разветвления дерева: 2. <br>
При этом объясненная дисперсия составила 0.83 (скромненько), среднеквадратичная ошибка предсказания на тестовом наборе данных 234 тыс. (вообще ужас какой-то). <br>
Отметим, что ранее, в случае подбора параметров с помощью optuna и работы в библиотеках pandas + sklearn наилучшими параметрами были: <br> 
- 'n_estimators': 144, <br>
- 'max_depth': 20, <br>
- 'min_samples_split': 2, <br>
которые дали объясненную дисперсию на уровне 0.999 и среднеквадратичную ошибку предсказания на тестовой выборке порядка 8 тыс. денежных единиц. Это еще раз показывает, что на небольших датасетах лучше не применять pyspark, потому как там отлично работают pandas + sklearn.