# `Промышленное машинное обучение на Spark`
## `Занятие 05: Feature Engineering`

### `Находнов Максим (nakhodnov17@gmail.com)`
#### `Москва, 2023`

О чём можно узнать из этого ноутбука:

* Accumulator/Broadcast
* Градиентный спуск
* Винзоризация

In [1]:
! pip3 install pyspark pyarrow


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m23.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.10 -m pip install --upgrade pip[0m


In [2]:
import numpy as np

import pyspark
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

from pyspark import SparkConf, SparkContext

conf = (
    SparkConf()
        .set('spark.ui.port', '4050')
        .setMaster('local[*]')
)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/21 11:32:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### `Общие данные в Spark`

При работе в распределённых окружениях нужно учитывать, что разные воркеры имеют прямой доступ только к своим локальным данным. Как следствие, алгоритмы в таких системах должны учитывать отсутствие общей памяти в целом и проблемы синхронизации между отдельными процессами в частности.

Для решения этой проблемы в Spark предложены два средства: Accumulator и Broadcast.

#### `Accumulator`

Аккумуляторы в Spark представляют из себя общую переменную, которые отдельные воркеры могут обновлять, но не могут считывать (так как значение этой переменной однозначно не определено по причине необходимости дорогостоящей синхронизации между отдельными процессами). 

Такие переменные поддерживают единственную операцию: `+=` (inplace add, `.__iadd__`) — коммутативную, ассоциативную операцию сложения. После того, как воркеры перестанут изменять переменную её значение можно получить на драйвере через атрибут `.value`.

Пример использования аккумуляторов — подсчёт общих статистик в процессе вычислений, например, суммарное значение функции потерь (см. пример с GD ниже), общее число слов в датасете и так далее.

In [3]:
acc = sc.accumulator(value=0)
acc

Accumulator<id=0, value=0>

In [4]:
acc.value

0

In [5]:
rdd = sc.parallelize([1, 2, 3, -4, 5])
rdd.foreach(lambda x: acc.add(x))
acc.value

                                                                                

7

In [6]:
acc_sum = sc.accumulator(0)

def count(x):
    global acc_sum
    acc_sum += x
    
rdd.foreach(count)
acc_sum.value

7

In [7]:
acc_cnt = sc.accumulator(0)
rdd_02 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])

rdd.foreach(lambda x: acc_cnt.add(1))
rdd_02.foreach(lambda x: acc_cnt.add(1))
acc_cnt.value

13

Можно определять аккумуляторы собственного типа:

In [8]:
from pyspark.accumulators import AccumulatorParam

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return [0.0] * len(value)
    
    def addInPlace(self, value_left, value_right):
        for idx in range(len(value_left)):
             value_left[idx] += value_right[idx]
        return value_left
    
vector_acc = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
vector_acc.value

[1.0, 2.0, 3.0]

In [9]:
def vector_add(x):
    global vector_acc
    vector_acc += [x] * 3
    
rdd = sc.parallelize([1, 2, 3])
rdd.foreach(vector_add)
vector_acc.value

[7.0, 8.0, 9.0]

#### `Broadcast`

Дополнением к WO общей памяти (аккумуляторам) являются RO переменные. Broadcast позволяет отправить на каждый воркер копию данных, которые затем можно использовать локально.

In [10]:
states = {"NY": "New York", "CA": "California", "FL": "Florida"}
broadcast_states = sc.broadcast(states)
broadcast_states.value

{'NY': 'New York', 'CA': 'California', 'FL': 'Florida'}

In [11]:
data = [
    ("James", "Smith", "USA", "CA"),
    ("Michael", "Rose", "USA", "NY"),
    ("Robert", "Williams", "USA", "CA"),
    ("Maria", "Jones", "USA", "FL")
]

rdd_03 = sc.parallelize(data)

def state_convert(code):
    return broadcast_states.value[code]

result = rdd_03.map(
    lambda x: (x[0], x[1], x[2], state_convert(x[3]))
).collect()
result

[('James', 'Smith', 'USA', 'California'),
 ('Michael', 'Rose', 'USA', 'New York'),
 ('Robert', 'Williams', 'USA', 'California'),
 ('Maria', 'Jones', 'USA', 'Florida')]

In [12]:
data = [
    ("James", "Smith", "USA", "CA"),
    ("Michael", "Rose", "USA", "NY"),
    ("Robert", "Williams", "USA", "CA"),
    ("Maria", "Jones", "USA", "FL")
]

rdd_03 = sc.parallelize(data)

def state_convert(code):
    return states[code]

result = rdd_03.map(
    lambda x: (x[0], x[1], x[2], state_convert(x[3]))
).collect()
result

[('James', 'Smith', 'USA', 'California'),
 ('Michael', 'Rose', 'USA', 'New York'),
 ('Robert', 'Williams', 'USA', 'California'),
 ('Maria', 'Jones', 'USA', 'Florida')]

#### `Broadcast JOIN`

Популярной вариантом использования Broadcast является объединение таблиц, одна из которых "маленького" размера. В таком случае может оказаться выгоднее отправить копию меньшей таблицы на каждый воркер и выполнить Join локально, нежели чем выполнять распределённое объединение таблиц. 

Нужно учитывать, что пересылка больших таблиц по сети может оказаться дорогостоящей, поэтому выбор между Broadcast Join и "обычным" Join зависит от конкретной конфигурации кластера.

In [13]:
# Можно задать размер DataFrame, при котором join будет автоматически происходить через broadcast этой таблицы
# Размер задаётся в байтах. В данном случае — 100Мб.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)

# Значение -1 отключает Broadcast Join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [14]:
path = "./m5-forecasting-accuracy"
! kaggle competitions download -c m5-forecasting-accuracy
! unzip m5-forecasting-accuracy.zip -d $path

In [15]:
# Зададим пути к файлам из датасета
file_calendar = f"{path}/calendar.csv"
file_validation = f"{path}/sales_train_validation.csv"
file_evaluation = f"{path}/sales_train_evaluation.csv"
file_prices = f"{path}/sell_prices.csv"
file_calendar = f"{path}/calendar.csv"

file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df_validation = (
    spark.read.format(file_type)
      .option("inferSchema", infer_schema)
      .option("header", first_row_is_header)
      .option("sep", delimiter)
      .load(file_validation)
)
df_evaluation = (
    spark.read.format(file_type)
      .option("inferSchema", infer_schema)
      .option("header", first_row_is_header)
      .option("sep", delimiter)
      .load(file_evaluation)
)
df_prices = (
    spark.read.format(file_type)
      .option("inferSchema", infer_schema)
      .option("header", first_row_is_header)
      .option("sep", delimiter)
      .load(file_prices)
)
df_calendar = (
    spark.read.format(file_type)
      .option("inferSchema", infer_schema)
      .option("header", first_row_is_header)
      .option("sep", delimiter)
      .load(file_calendar)
)

                                                                                

In [16]:
df_evaluation.limit(5).toPandas()

23/02/21 11:33:17 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,d_1,d_2,d_3,d_4,...,d_1932,d_1933,d_1934,d_1935,d_1936,d_1937,d_1938,d_1939,d_1940,d_1941
0,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,2,4,0,0,0,0,3,3,0,1
1,HOBBIES_1_002_CA_1_evaluation,HOBBIES_1_002,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,1,2,1,1,0,0,0,0,0
2,HOBBIES_1_003_CA_1_evaluation,HOBBIES_1_003,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,1,0,2,0,0,0,2,3,0,1
3,HOBBIES_1_004_CA_1_evaluation,HOBBIES_1_004,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,1,1,0,4,0,1,3,0,2,6
4,HOBBIES_1_005_CA_1_evaluation,HOBBIES_1_005,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,0,0,2,1,0,0,2,1,0


In [17]:
df_evaluation.select(
    df_evaluation.cat_id
).distinct().show()

+---------+
|   cat_id|
+---------+
|    FOODS|
|HOUSEHOLD|
|  HOBBIES|
+---------+



In [18]:
cat_id_hex =[
    ('FOODS', '0x001'),
    ('HOUSEHOLD', '0x002'),
    ('HOBBIES', '0x003')
]
small_df = spark.createDataFrame(data=cat_id_hex, schema=['cat_id', 'hex_code'])
small_df.show()

+---------+--------+
|   cat_id|hex_code|
+---------+--------+
|    FOODS|   0x001|
|HOUSEHOLD|   0x002|
|  HOBBIES|   0x003|
+---------+--------+



In [19]:
broadcast_join_df = df_evaluation.join(
  F.broadcast(small_df), small_df.cat_id == df_evaluation.cat_id
)
broadcast_join_df.limit(1).toPandas()

                                                                                

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,d_1,d_2,d_3,d_4,...,d_1934,d_1935,d_1936,d_1937,d_1938,d_1939,d_1940,d_1941,cat_id.1,hex_code
0,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,0,0,0,3,3,0,1,HOBBIES,0x003


In [20]:
broadcast_join_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [cat_id#3875], [cat_id#9791], Inner, BuildRight, false
   :- Filter isnotnull(cat_id#3875)
   :  +- FileScan csv [id#3872,item_id#3873,dept_id#3874,cat_id#3875,store_id#3876,state_id#3877,d_1#3878,d_2#3879,d_3#3880,d_4#3881,d_5#3882,d_6#3883,d_7#3884,d_8#3885,d_9#3886,d_10#3887,d_11#3888,d_12#3889,d_13#3890,d_14#3891,d_15#3892,d_16#3893,d_17#3894,d_18#3895,... 1923 more fields] Batched: false, DataFilters: [isnotnull(cat_id#3875)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/nakhodnov/HSE_DPO_Spark_2022/Seminars/Seminar 05/m5-foreca..., PartitionFilters: [], PushedFilters: [IsNotNull(cat_id)], ReadSchema: struct<id:string,item_id:string,dept_id:string,cat_id:string,store_id:string,state_id:string,d_1:...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=208]
      +- Filter isnotnull(cat_id#9791)
         +- Scan ExistingRDD[cat_id#9791,hex_cod

In [21]:
join_df = df_evaluation.join(
  small_df, small_df.cat_id == df_evaluation.cat_id
)
join_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [cat_id#3875], [cat_id#9791], Inner
   :- Sort [cat_id#3875 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cat_id#3875, 200), ENSURE_REQUIREMENTS, [plan_id=232]
   :     +- Filter isnotnull(cat_id#3875)
   :        +- FileScan csv [id#3872,item_id#3873,dept_id#3874,cat_id#3875,store_id#3876,state_id#3877,d_1#3878,d_2#3879,d_3#3880,d_4#3881,d_5#3882,d_6#3883,d_7#3884,d_8#3885,d_9#3886,d_10#3887,d_11#3888,d_12#3889,d_13#3890,d_14#3891,d_15#3892,d_16#3893,d_17#3894,d_18#3895,... 1923 more fields] Batched: false, DataFilters: [isnotnull(cat_id#3875)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/nakhodnov/HSE_DPO_Spark_2022/Seminars/Seminar 05/m5-foreca..., PartitionFilters: [], PushedFilters: [IsNotNull(cat_id)], ReadSchema: struct<id:string,item_id:string,dept_id:string,cat_id:string,store_id:string,state_id:string,d_1:...
   +- Sort [cat_id#9791 ASC NULLS FIRST], false, 0
      +- Ex

### `Spark RDD Gradient Descent`

$$
D = \{(x_{i}, y_{i}) | x_{i} \in \mathbb{R}^{d}, y \in \mathbb{R}\}_{1}^{n}
$$
$$
\hat{y}_{i} = \langle x, w \rangle + b
$$
$$
L_{i} = \frac{1}{2} (\hat{y}_{i} - y_{i})^{2}
$$
$$
\mathfrak{L}(w, b) = \frac{1}{n}\sum\limits_{i=1}^{n} L_{i} \longrightarrow \min_{w, b}
$$

Необходимо найти оптимальные $w \in \mathbb{R}^{d}, b \in \mathbb{R}$

Один из вариантов решения задачи: Градиентный Спуск (GD):

$$
w^{i+1} = w^{i} - \alpha \nabla_{w}\mathfrak{L}
$$
$$
b^{i+1} = b^{i} - \alpha \nabla_{b}\mathfrak{L}
$$

Модель данных:
$$
y_{i} \sim \mathcal{N}(\langle x_{i}, w^{*} \rangle, \sigma^{2})
$$

$$\nabla_{w} L = (\frac{\partial L}{\partial w_{1}}, ..., \frac{\partial L}{\partial w_{d}})$$


$$
\nabla_{w} \mathfrak{L} = \frac{1}{n} \sum\limits_{i=1}^{n} \nabla_{w}(\hat{y}_{i} - y_{i})(\hat{y}_{i} - y_{i}) = \frac{1}{n} \sum\limits_{i=1}^{n} (\hat{y}_{i} - y_{i}) \nabla_{w}\hat{y}_{i} = \frac{1}{n} \sum\limits_{i=1}^{n} (\hat{y}_{i} - y_{i}) x_{i} = \frac{1}{n} \sum\limits_{i=1}^{n} (\langle x_{i}, w \rangle - y_{i}) x_{i}
$$

In [22]:
X = np.random.randn(1000, 10)
w_star = np.random.randn(X.shape[1])
y = X.dot(w_star) + 0.001 * np.random.randn(X.shape[0])

In [23]:
from functools import partial

def gradient_descent(X, y, alpha=0.1, epochs=1):
    # YOUR CODE HERE:
    # 1. Init w
    # 2. In loop repeat:
    #     a. Calculate error, gradient
    #     b. Update w
    # 3. Return w
    
    X_rdd = sc.parallelize(X).cache()
    y_rdd = sc.parallelize(y).cache()
    n = X_rdd.count()
    d = X_rdd.take(1)[0].shape[0]
    
    # Кэшируем результат вычислений, чтобы не перевычислять его на каждой итерации
    X_y_rdd = X_rdd.zip(y_rdd).cache()
    w = np.zeros(d)
    for epoch in range(epochs):
        w_br = sc.broadcast(w)
        n_br = sc.broadcast(n)
        
        total_error = sc.accumulator(0.0)
        def grad_mapper(x_y, total_error):
            delta = (np.sum(x_y[0] * w_br.value) - x_y[1])
            error = (delta ** 2.0) / 2.0
            total_error.add(error / n_br.value)
            return x_y[0] * delta
        
        grad = X_y_rdd.map(partial(grad_mapper, total_error=total_error)).sum() / n
        w = w - alpha * grad
        
        if epoch % 3 == 0:
            print('Epoch: {0:d}, Loss {1:.3f}'.format(epoch, total_error.value / n))
    return w

In [24]:
gradient_descent(X, y, alpha=0.1, epochs=10)

Epoch: 0, Loss 0.006
Epoch: 3, Loss 0.003
Epoch: 6, Loss 0.002
Epoch: 9, Loss 0.001


array([ 0.42157966, -0.11659553, -0.68411146, -1.03660564, -0.80783741,
       -0.68547898,  1.01404414,  0.19389612, -0.18162573,  1.01462433])

In [25]:
w_star

array([ 0.56935807, -0.12347339, -0.99334668, -1.63083152, -1.24828304,
       -1.01660745,  1.57347421,  0.19636134, -0.25449749,  1.57294227])

### `Spark Winsorizing`

Винзоризация — это метод предобработки численных данных, при котором значения за пределами заданных квантилей заменяются на значения этих квантилей. 

Например, $X = \{100, 6, 52, 26, 8, 81, 52, 15, 2, 74, 93, 82, 36, 22, 74, 90, 97, 50, 4, 40, 1\}$.

Винзоризация для $0.1$ и $0.9$ квантилей выполняется следующим образом:
1. Определяем квантили: $q_{0.1} = 3, q_{0.9} = 93$
2. Заменяем все значения меньше $3$ на $3$ и больше $93$ на $93$: 

$$\hat{X} = \{93, 6, 52, 26, 8, 81, 52, 15, 3, 74, 93, 82, 36, 22, 74, 90, 97, 50, 4, 40, 3\}$$

Такая предобработка убирает экстремальные значения и выбросы, что приводит к более надёжному посчёту статистик по выборке (матожидание, дисперисия и так далее).

In [26]:
def winsorizing(
    df: pyspark.sql.dataframe.DataFrame, 
    column: str = 'sales',
    lower_percentile: float = 0.1,
    higher_percentile: float = 0.9
) -> pyspark.sql.dataframe.DataFrame:
    # YOUR CODE HERE:
    
    wspec = Window().partitionBy()
    
    lp_column = '_'.join([column, 'lower_percentile'])
    hp_column = '_'.join([column, 'higher_percentile'])
    df = df.withColumns({
        lp_column: F.percentile_approx(F.col(column), lower_percentile).over(wspec),
        hp_column: F.percentile_approx(F.col(column), higher_percentile).over(wspec)
    })
    
    df = (
        df
            .withColumn(
                '_'.join([column, 'winsorized']),
                F.when(
                    F.col(column) < F.col(lp_column),
                    F.col(lp_column)
                ).otherwise(
                    F.when(
                        F.col(column) > F.col(hp_column),
                        F.col(hp_column)
                    ).otherwise(
                        F.col(column)
                    )
                )
            )
    )
    
    return df

In [27]:
data = [
    (0.77347701,  ),
    (0.77617723, ),
    (-0.26191574,  ),
    (0.06015559, ),
    (-0.18058041,),
    (1.15605904, ),
    (-0.54163328,  ),
    (0.83280377,),
    (-0.69920523, ),
    (-0.33986035,),
    (-0.94114708, ),
    (-0.88438698,  ),
    (1.18682329,  ),
    (1.21287342, ),
    (-0.82575258,),
    (0.5895868, ),
    (-1.646899, ),
    (-1.5341987, ),
    (-0.94135006,  ),
    (0.5699716,)
]
df = spark.createDataFrame(data, ['sales'])
df.show()

+-----------+
|      sales|
+-----------+
| 0.77347701|
| 0.77617723|
|-0.26191574|
| 0.06015559|
|-0.18058041|
| 1.15605904|
|-0.54163328|
| 0.83280377|
|-0.69920523|
|-0.33986035|
|-0.94114708|
|-0.88438698|
| 1.18682329|
| 1.21287342|
|-0.82575258|
|  0.5895868|
|  -1.646899|
| -1.5341987|
|-0.94135006|
|  0.5699716|
+-----------+



In [28]:
winsorizing(df).show()

23/02/21 11:34:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/21 11:34:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/21 11:34:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/21 11:34:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/21 11:34:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-----------+----------------------+-----------------------+----------------+
|      sales|sales_lower_percentile|sales_higher_percentile|sales_winsorized|
+-----------+------