# `Промышленное машинное обучение на Spark`
## `Занятие 03: Основы Spark`

О чём можно узнать из этого ноутбука:

* DataFrame и SQL API
* Базовые операции в Spark

### `Устанавливаем необходимые зависимости`

In [1]:
! pip3 install -q pyspark pyarrow parquet-tools

  Preparing metadata (setup.py) ... [?25l[?25hdone
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/59.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.6/59.6 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.5/139.5 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.4/13.4 MB[0m [31m74.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.4/84.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for halo (setup.py) ... [?25l[?25hdone
  Building wheel for thrift (setup.py) ... [?25l[?25hdone


### `Готовим SparkContext`

Как следует из лекции объект SparkContext является точкой входа для работы со Spark кластером.

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

# Создаём конфигурационный класс с параметрами подключения
conf = (
    SparkConf()
        # Указываем URL master ноды Spark кластера
        # Можно использовать local mode, указав `local[<number_cores>]`
        # В таком случае вся обработка будет происходить на текущем компьютере
        # При этом, это может давать преимущество ввиду наличия параллелизма по ядрам компьютера
        .setMaster('local[*]')
)
# Создаём точку доступа на кластер. Позволят использовать RDD API
sc = SparkContext(conf=conf)
# Точка доступа для использования DataFrame API
spark = SparkSession(sc)

# По завершении программы нужно обязательно выполнить остановку подключения для освобождения занятых ресурсов
# sc.stop()

### `Загрузка данных`

Скачаем данные, с которыми будем в дальнейшем работать

In [3]:
import json
import requests
import urllib

folder_url = 'https://disk.yandex.lt/d/JnDy1h48pJI7IA'
file_url = '/m5-forecating-accuracy.zip'
# запрос ссылки на скачивание
response = requests.get('https://cloud-api.yandex.net/v1/disk/public/resources/download',
                 params={'public_key': folder_url, 'path': file_url})
# 'парсинг' ссылки на скачивание
data_link = response.json()['href']
data_link

'https://downloader.disk.yandex.ru/disk/ffb942464634e02584278b4582f9145d10aee2bf645c8d071229f0c695847458/67cf39e1/qo_oUU0UNiEpD-z6-zCIgdRB33v8cVyWATClBo2FTm36Wiufn6dFhyr1IoTkBcuIDOqKvX5GKvyJZKr2SKucCw%3D%3D?uid=0&filename=m5-forecating-accuracy.zip&disposition=attachment&hash=J5ovyMT7FWTmxkOjFLIMj3wuXKK82PqrkEXM2lsA3isVM3La/mWxkoo43Q/uGJZ5q/J6bpmRyOJonT3VoXnDag%3D%3D%3A/m5-forecating-accuracy.zip&limit=0&content_type=application%2Fzip&owner_uid=1199758960&fsize=48326531&hid=d3329dd040d2a0b0116f34e36f7142dc&media_type=compressed&tknv=v2'

Скачайте, данные по ссылке, которую вы получите после запуска предыдущей ячейки.

Для скачивания необходимо ввести команду: wget -O m5-forecating-accuracy.zip \<YOUR_DATA_LINK\>

P.S.: Ссылка с github может быть уже недействительной, поэтому нужно обязательно запустить у себя ячейку выше

In [4]:
# Вставьте здесь свой data_link
! wget -O m5-forecasting-accuracy.zip 'https://downloader.disk.yandex.ru/disk/ffb942464634e02584278b4582f9145d10aee2bf645c8d071229f0c695847458/67cf39e1/qo_oUU0UNiEpD-z6-zCIgdRB33v8cVyWATClBo2FTm36Wiufn6dFhyr1IoTkBcuIDOqKvX5GKvyJZKr2SKucCw%3D%3D?uid=0&filename=m5-forecating-accuracy.zip&disposition=attachment&hash=J5ovyMT7FWTmxkOjFLIMj3wuXKK82PqrkEXM2lsA3isVM3La/mWxkoo43Q/uGJZ5q/J6bpmRyOJonT3VoXnDag%3D%3D%3A/m5-forecating-accuracy.zip&limit=0&content_type=application%2Fzip&owner_uid=1199758960&fsize=48326531&hid=d3329dd040d2a0b0116f34e36f7142dc&media_type=compressed&tknv=v2'

--2025-03-10 15:14:15--  https://downloader.disk.yandex.ru/disk/ffb942464634e02584278b4582f9145d10aee2bf645c8d071229f0c695847458/67cf39e1/qo_oUU0UNiEpD-z6-zCIgdRB33v8cVyWATClBo2FTm36Wiufn6dFhyr1IoTkBcuIDOqKvX5GKvyJZKr2SKucCw%3D%3D?uid=0&filename=m5-forecating-accuracy.zip&disposition=attachment&hash=J5ovyMT7FWTmxkOjFLIMj3wuXKK82PqrkEXM2lsA3isVM3La/mWxkoo43Q/uGJZ5q/J6bpmRyOJonT3VoXnDag%3D%3D%3A/m5-forecating-accuracy.zip&limit=0&content_type=application%2Fzip&owner_uid=1199758960&fsize=48326531&hid=d3329dd040d2a0b0116f34e36f7142dc&media_type=compressed&tknv=v2
Resolving downloader.disk.yandex.ru (downloader.disk.yandex.ru)... 77.88.21.127, 2a02:6b8::2:127
Connecting to downloader.disk.yandex.ru (downloader.disk.yandex.ru)|77.88.21.127|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://s1064sas.storage.yandex.net/rdisk/ffb942464634e02584278b4582f9145d10aee2bf645c8d071229f0c695847458/67cf39e1/qo_oUU0UNiEpD-z6-zCIgdRB33v8cVyWATClBo2FTm36Wiufn6dFhyr1IoTkB

В этом проекте нужно работать с данными для предсказания спроса: [M5 Forecasting](https://www.kaggle.com/competitions/m5-forecasting-accuracy/data).

In [5]:
path = './m5-forecasting-accuracy'

In [6]:
import zipfile
with zipfile.ZipFile('./m5-forecasting-accuracy.zip', 'r') as zip_ref:
    zip_ref.extractall('./m5-forecasting-accuracy')

In [7]:
%ls $path

calendar.csv  sales_train_evaluation.csv  sample_submission.csv
[0m[01;34m__MACOSX[0m/     sales_train_validation.csv  sell_prices.csv


In [10]:
# Посмотрим несколько строк датасета calendar.csv
!head $path/calendar.csv

date,wm_yr_wk,weekday,wday,month,year,d,event_name_1,event_type_1,event_name_2,event_type_2,snap_CA,snap_TX,snap_WI
2011-01-29,11101,Saturday,1,1,2011,d_1,,,,,0,0,0
2011-01-30,11101,Sunday,2,1,2011,d_2,,,,,0,0,0
2011-01-31,11101,Monday,3,1,2011,d_3,,,,,0,0,0
2011-02-01,11101,Tuesday,4,2,2011,d_4,,,,,1,1,0
2011-02-02,11101,Wednesday,5,2,2011,d_5,,,,,1,0,1
2011-02-03,11101,Thursday,6,2,2011,d_6,,,,,1,1,1
2011-02-04,11101,Friday,7,2,2011,d_7,,,,,1,0,0
2011-02-05,11102,Saturday,1,2,2011,d_8,,,,,1,1,1
2011-02-06,11102,Sunday,2,2,2011,d_9,SuperBowl,Sporting,,,1,1,1


Атрибуты датасета **calendar.csv**.

Датасет содержит календарные данные, которые помогают связать продажи товаров с конкретными днями, неделями и событиями.

**date**: Дата записи данных.

**wm_yr_wk**: Календарная неделя в формате года и номера недели (год-неделя).

**weekday**: Название дня недели.

**wday**: Порядковый номер дня недели.

**month**: Порядковый номер месяца.

**year**: Год наблюдения.

**d**: Идентификатор дня в формате последовательности (например, d_1, d_2 и т.д.).

**event_name_1**: Название первичного события (праздники, акции, особые события).

**event_type_1**: Тип первичного события (например, праздник, спортивное событие).

**event_name_2**: Название вторичного события (если в этот день несколько значимых событий).

**event_type_2**: Тип вторичного события.

**snap_CA, snap_TX, snap_WI**: Индикаторы (0 или 1) программы SNAP (льготная покупка продуктов) в соответствующих штатах (Калифорния, Техас, Висконсин). Показатель 1 означает, что в указанный день были доступны льготы SNAP.

In [11]:
# Посмотрим несколько строк датасета sales_train_validation.csv
!head $path/sales_train_validation.csv

id,item_id,dept_id,cat_id,store_id,state_id,d_1,d_2,d_3,d_4,d_5,d_6,d_7,d_8,d_9,d_10,d_11,d_12,d_13,d_14,d_15,d_16,d_17,d_18,d_19,d_20,d_21,d_22,d_23,d_24,d_25,d_26,d_27,d_28,d_29,d_30,d_31,d_32,d_33,d_34,d_35,d_36,d_37,d_38,d_39,d_40,d_41,d_42,d_43,d_44,d_45,d_46,d_47,d_48,d_49,d_50,d_51,d_52,d_53,d_54,d_55,d_56,d_57,d_58,d_59,d_60,d_61,d_62,d_63,d_64,d_65,d_66,d_67,d_68,d_69,d_70,d_71,d_72,d_73,d_74,d_75,d_76,d_77,d_78,d_79,d_80,d_81,d_82,d_83,d_84,d_85,d_86,d_87,d_88,d_89,d_90,d_91,d_92,d_93,d_94,d_95,d_96,d_97,d_98,d_99,d_100,d_101,d_102,d_103,d_104,d_105,d_106,d_107,d_108,d_109,d_110,d_111,d_112,d_113,d_114,d_115,d_116,d_117,d_118,d_119,d_120,d_121,d_122,d_123,d_124,d_125,d_126,d_127,d_128,d_129,d_130,d_131,d_132,d_133,d_134,d_135,d_136,d_137,d_138,d_139,d_140,d_141,d_142,d_143,d_144,d_145,d_146,d_147,d_148,d_149,d_150,d_151,d_152,d_153,d_154,d_155,d_156,d_157,d_158,d_159,d_160,d_161,d_162,d_163,d_164,d_165,d_166,d_167,d_168,d_169,d_170,d_171,d_172,d_173,d_174,d_175,d_176,d_177,d_

Атрибуты датасета **sales_train_validation.csv**.

Данный датасет содержит историю продаж товаров в розничных магазинах Walmart. Используется для анализа спроса, прогнозирования продаж и оптимизации запасов. Датасет для валидации.

**id**: Уникальный идентификатор товара в конкретном магазине.

**item_id**: Идентификатор товара.

**dept_id**: Идентификатор отдела, к которому относится товар.

**cat_id**: Категория товара.

**store_id**: Идентификатор магазина, в котором был продан товар.

**state_id**: Штат, в котором расположен магазин.

**d_1, d_2, ..., d_n**: Ежедневные данные о продажах данного товара в указанном магазине (количество проданных единиц за каждый день). Каждый атрибут соответствует одному дню, начиная с первого дня периода наблюдения.

In [12]:
# Посмотрим несколько строк датасета sales_train_evaluation.csv
!head $path/sales_train_evaluation.csv

id,item_id,dept_id,cat_id,store_id,state_id,d_1,d_2,d_3,d_4,d_5,d_6,d_7,d_8,d_9,d_10,d_11,d_12,d_13,d_14,d_15,d_16,d_17,d_18,d_19,d_20,d_21,d_22,d_23,d_24,d_25,d_26,d_27,d_28,d_29,d_30,d_31,d_32,d_33,d_34,d_35,d_36,d_37,d_38,d_39,d_40,d_41,d_42,d_43,d_44,d_45,d_46,d_47,d_48,d_49,d_50,d_51,d_52,d_53,d_54,d_55,d_56,d_57,d_58,d_59,d_60,d_61,d_62,d_63,d_64,d_65,d_66,d_67,d_68,d_69,d_70,d_71,d_72,d_73,d_74,d_75,d_76,d_77,d_78,d_79,d_80,d_81,d_82,d_83,d_84,d_85,d_86,d_87,d_88,d_89,d_90,d_91,d_92,d_93,d_94,d_95,d_96,d_97,d_98,d_99,d_100,d_101,d_102,d_103,d_104,d_105,d_106,d_107,d_108,d_109,d_110,d_111,d_112,d_113,d_114,d_115,d_116,d_117,d_118,d_119,d_120,d_121,d_122,d_123,d_124,d_125,d_126,d_127,d_128,d_129,d_130,d_131,d_132,d_133,d_134,d_135,d_136,d_137,d_138,d_139,d_140,d_141,d_142,d_143,d_144,d_145,d_146,d_147,d_148,d_149,d_150,d_151,d_152,d_153,d_154,d_155,d_156,d_157,d_158,d_159,d_160,d_161,d_162,d_163,d_164,d_165,d_166,d_167,d_168,d_169,d_170,d_171,d_172,d_173,d_174,d_175,d_176,d_177,d_

Атрибуты датасета **sales_train_evaluation.csv**.

Данный датасет содержит историю продаж товаров в розничных магазинах Walmart. Используется для анализа спроса, прогнозирования продаж и оптимизации запасов. Датасет для тренировки.

**id**: Уникальный идентификатор товара в конкретном магазине.

**item_id**: Идентификатор товара.

**dept_id**: Идентификатор отдела, к которому относится товар.

**cat_id**: Категория товара.

**store_id**: Идентификатор магазина, в котором был продан товар.

**state_id**: Штат, в котором расположен магазин.

**d_1, d_2, ..., d_n**: Ежедневные данные о продажах данного товара в указанном магазине (количество проданных единиц за каждый день). Каждый атрибут соответствует одному дню, начиная с первого дня периода наблюдения.

In [13]:
# Посмотрим несколько строк датасета sell_prices.csv
!head $path/sell_prices.csv

store_id,item_id,wm_yr_wk,sell_price
CA_1,HOBBIES_1_001,11325,9.58
CA_1,HOBBIES_1_001,11326,9.58
CA_1,HOBBIES_1_001,11327,8.26
CA_1,HOBBIES_1_001,11328,8.26
CA_1,HOBBIES_1_001,11329,8.26
CA_1,HOBBIES_1_001,11330,8.26
CA_1,HOBBIES_1_001,11331,8.26
CA_1,HOBBIES_1_001,11332,8.26
CA_1,HOBBIES_1_001,11333,8.26


Атрибуты датасета **sell_prices.csv**.

Датасет содержит информацию о динамике цен на товары в различных магазинах за определенные недели.

**store_id** – идентификатор магазина, в котором продаётся товар.

**item_id** – уникальный идентификатор конкретного товара.

**wm_yr_wk** – календарная неделя года, к которой относится указанная цена (в формате Walmart-календаря).

**sell_price** – розничная цена продажи товара в указанном магазине в течение соответствующей недели.

In [14]:
# Зададим пути к файлам из датасета
file_calendar = f"{path}/calendar.csv"
file_validation = f"{path}/sales_train_validation.csv"
file_evaluation = f"{path}/sales_train_evaluation.csv"
file_prices = f"{path}/sell_prices.csv"

# Формат данных — CSV
file_type = "csv"
# Зададим параметры, как интерпретировать загруженные данные
# Определять типы колонок автоматически
infer_schema = "true"
# Интерпретируем первую строку в файле, как названия колонок
first_row_is_header = "true"
# Задаём разделитель между значениями колонок
delimiter = ","

# загружаем в Spark данные валидации
df_validation = (
    spark.read.format(file_type)
      .option("inferSchema", infer_schema)
      .option("header", first_row_is_header)
      .option("sep", delimiter)
      .load(file_validation)
    # Также, можно указывать пути в hdfs или базы данных, например, Hive
#       .load('hdfs:///path_to_data/...')
)

# загружаем данные для теста
df_evaluation = (
    spark.read.format(file_type)
      .option("inferSchema", infer_schema)
      .option("header", first_row_is_header)
      .option("sep", delimiter)
      .load(file_evaluation)
)
# загружаем данные с ценами
df_prices = (
    spark.read.format(file_type)
      .option("inferSchema", infer_schema)
      .option("header", first_row_is_header)
      .option("sep", delimiter)
      .load(file_prices)
)

# Возьмём первые 10 строк pyspark.sql.dataframe.DataFrame
# И выполним action-преобразование для преобразования в pandas.DataFrame
df_validation.limit(10).toPandas()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,d_1,d_2,d_3,d_4,...,d_1904,d_1905,d_1906,d_1907,d_1908,d_1909,d_1910,d_1911,d_1912,d_1913
0,HOBBIES_1_001_CA_1_validation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,1,3,0,1,1,1,3,0,1,1
1,HOBBIES_1_002_CA_1_validation,HOBBIES_1_002,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
2,HOBBIES_1_003_CA_1_validation,HOBBIES_1_003,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,2,1,2,1,1,1,0,1,1,1
3,HOBBIES_1_004_CA_1_validation,HOBBIES_1_004,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,1,0,5,4,1,0,1,3,7,2
4,HOBBIES_1_005_CA_1_validation,HOBBIES_1_005,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,2,1,1,0,1,1,2,2,2,4
5,HOBBIES_1_006_CA_1_validation,HOBBIES_1_006,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,1,0,1,0,0,0,2,0,0
6,HOBBIES_1_007_CA_1_validation,HOBBIES_1_007,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,0,0,1,0,1,0,0,1,1
7,HOBBIES_1_008_CA_1_validation,HOBBIES_1_008,HOBBIES_1,HOBBIES,CA_1,CA,12,15,0,0,...,0,0,1,37,3,4,6,3,2,1
8,HOBBIES_1_009_CA_1_validation,HOBBIES_1_009,HOBBIES_1,HOBBIES,CA_1,CA,2,0,7,3,...,0,0,1,1,6,0,0,0,0,0
9,HOBBIES_1_010_CA_1_validation,HOBBIES_1_010,HOBBIES_1,HOBBIES,CA_1,CA,0,0,1,0,...,1,0,0,0,0,0,0,2,0,2


### `Spark DataFrame API`

* [Quickstart](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html)
* [Документация](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html)

In [15]:
emp_data = [
    (1, 'Smith', 10),
    (2, 'Rose', 20),
    (3, 'Williams', 10),
    (4, 'Jones', 30),
    (5, 'Jones', None),
]
emp_columns = ['emp_id', 'name', 'dept_id']

emp_df = spark.createDataFrame(emp_data, emp_columns)
emp_df

DataFrame[emp_id: bigint, name: string, dept_id: bigint]

In [16]:
# выведем тип датафрейма
type(emp_df)

Попробуем посмотреть, что находится в emp_df

In [17]:
emp_df

DataFrame[emp_id: bigint, name: string, dept_id: bigint]

Вывод DataFrame не показывает его содержимое, так как оно ещё не было вычислено, вычисления в Spark происходят только в момент вызова функций типа action.

Примеры action:
* `count()` — подсчитывает число строк в DataFrame
* `toPandas()` — преобразует Spark DataFrame в pandas DataFrame
* `collect()` — выполняет вычисление текущего Spark DataFrame и возвращает результат
* `show()` — `collect()` + pretty print результата

In [18]:
emp_df.show()

+------+--------+-------+
|emp_id|    name|dept_id|
+------+--------+-------+
|     1|   Smith|     10|
|     2|    Rose|     20|
|     3|Williams|     10|
|     4|   Jones|     30|
|     5|   Jones|   NULL|
+------+--------+-------+



Получим базовую информация о данных — названия колонок и тип данных, хранящийся в них

In [19]:
# DataFrame.columns - показывает названия колонок внутри spark-dataframe
print(emp_df.columns)
# DataFrame.schema - показывает их типы
emp_df.schema

['emp_id', 'name', 'dept_id']


StructType([StructField('emp_id', LongType(), True), StructField('name', StringType(), True), StructField('dept_id', LongType(), True)])

Многие методы дублируются по аналогии с `pandas.DataFrame`

In [20]:
# Уберём строки, содержащие хотя бы одно NULL значение
emp_df.dropna().show()

+------+--------+-------+
|emp_id|    name|dept_id|
+------+--------+-------+
|     1|   Smith|     10|
|     2|    Rose|     20|
|     3|Williams|     10|
|     4|   Jones|     30|
+------+--------+-------+



DataFrame состоит из колонок. Получение колонки возможно через атрибуты (точечная нотация) или через индексацию:

In [21]:
emp_df.name, emp_df['name']

(Column<'name'>, Column<'name'>)

Вместо самих значений колонок, в соответствии с принципом "ленивых" вычислений, возвращаются ссылки на них. Такие колонки могут участвовать в символьных вычислениях. Например, к ним можно применять арифметические, булевы операции.

In [22]:
# cозданим новую колонку с данными, на основе данных из колонок dep_id и emp_id,
# в соответствии с операцийми ниже
column_expr = (emp_df.dept_id - 20) / 10 > emp_df.emp_id
column_expr

Column<'(((dept_id - 20) / 10) > emp_id)'>

Так как не было выполненно ни одной операции тип action c новой колонкой, при выводе выдаётся только её символическая запись. Чтобы исправить это, запустим операцию типа-action show(), которая делает вывод посчитанного результата.

Полученные **колоночные выражения** (**column expression**) можно вычислять:

In [23]:
emp_df.select(column_expr).show()

+--------------------------------+
|(((dept_id - 20) / 10) > emp_id)|
+--------------------------------+
|                           false|
|                           false|
|                           false|
|                           false|
|                            NULL|
+--------------------------------+



Колонку можно переименовать, используя метод .alias(\<new_name\>) для данной колонки

In [24]:
emp_df.select((emp_df.dept_id ** 2).alias('dept_id squared')).show()

+---------------+
|dept_id squared|
+---------------+
|          100.0|
|          400.0|
|          100.0|
|          900.0|
|           NULL|
+---------------+



Для DataFrame доступны SQL подобные операции, например, `join`:

In [25]:
dept_data = [
    ('Finance', 10),
    ('Marketing', 20),
    ('Sales', 30),
    ('IT', 40),
]
dept_columns = ['dept_name', 'dept_id']

# создаём spark-датафрейм
dept_df = spark.createDataFrame(dept_data, dept_columns)
dept_df.show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



In [26]:
# через метод .join() объеденим emp_df с dept_df по ключю dept_id
emp_df.join(dept_df, how='inner', on=['dept_id']).show()

+-------+------+--------+---------+
|dept_id|emp_id|    name|dept_name|
+-------+------+--------+---------+
|     10|     1|   Smith|  Finance|
|     10|     3|Williams|  Finance|
|     20|     2|    Rose|Marketing|
|     30|     4|   Jones|    Sales|
+-------+------+--------+---------+



В предыдущем примере объединение таблиц происходило только по пересекающимся (параметр how='inner') значениям dept_id, теперь объденим таблицы по всевозможным ключам (параметр how='outer').

In [27]:
emp_df.join(dept_df, how='outer', on=['dept_id']).show()

+-------+------+--------+---------+
|dept_id|emp_id|    name|dept_name|
+-------+------+--------+---------+
|   NULL|     5|   Jones|     NULL|
|     10|     1|   Smith|  Finance|
|     10|     3|Williams|  Finance|
|     20|     2|    Rose|Marketing|
|     30|     4|   Jones|    Sales|
|     40|  NULL|    NULL|       IT|
+-------+------+--------+---------+



Также, доступна фильтрация и сортировка:

In [29]:
# метод where отвечает за фильрации, внутри которого указываются условия фильтрации
# метод sort производит сортировку по полю/полям, которые указаны в его аргументах
(
    emp_df
      .join(dept_df, how='outer', on=['dept_id'])
      # Обратите внимание на колоночное выражение в фильтре
      .where((emp_df['name'] == 'Smith') | (emp_df['name'] == 'Rose'))
      .sort('dept_id')
      .show()
)

+-------+------+-----+---------+
|dept_id|emp_id| name|dept_name|
+-------+------+-----+---------+
|     10|     1|Smith|  Finance|
|     20|     2| Rose|Marketing|
+-------+------+-----+---------+



Работа с колонками обычно выполняется через колоночные выражения. Их можно использовать, например, для выполнения join:

In [31]:
emp_columns_renamed = ['emp_id', 'name', 'emp_dept_id']

emp_renamed_df = spark.createDataFrame(emp_data, emp_columns_renamed)
emp_renamed_df.show()

+------+--------+-----------+
|emp_id|    name|emp_dept_id|
+------+--------+-----------+
|     1|   Smith|         10|
|     2|    Rose|         20|
|     3|Williams|         10|
|     4|   Jones|         30|
|     5|   Jones|       NULL|
+------+--------+-----------+



In [32]:
# теперь вместо параметра  on=['dept_id'] в методе join используем колоночное выражение
emp_renamed_df.join(
    dept_df, emp_renamed_df.emp_dept_id == dept_df.dept_id,  how='inner'
).show()

+------+--------+-----------+---------+-------+
|emp_id|    name|emp_dept_id|dept_name|dept_id|
+------+--------+-----------+---------+-------+
|     1|   Smith|         10|  Finance|     10|
|     3|Williams|         10|  Finance|     10|
|     2|    Rose|         20|Marketing|     20|
|     4|   Jones|         30|    Sales|     30|
+------+--------+-----------+---------+-------+



Переименование колонок также возможно:

In [33]:
# .withColumnRenamed(<old_name>, <new_name>)
(
    emp_renamed_df
      .withColumnRenamed('emp_dept_id', 'dept_id')
      .join(
          dept_df, 'dept_id',  how='inner'
      )
      .show()
)

+-------+------+--------+---------+
|dept_id|emp_id|    name|dept_name|
+-------+------+--------+---------+
|     10|     1|   Smith|  Finance|
|     10|     3|Williams|  Finance|
|     20|     2|    Rose|Marketing|
|     30|     4|   Jones|    Sales|
+-------+------+--------+---------+



Для преобразования колонок в модуле `pyspark.sql.functions` содержится большой набор вспомогательных функций. Например:
* Вспомогательные: `lit`, `col`, ...
* Поэлементные математические функции: `cos`, `sin`, `round`, ...
* Поэлементные функции для работы датами и временем: `dayofmonth`, ...
* Агрегаторы: `sum`, `mean`, ...
* Функции для работы с коллекциями (сложными данными, хранящимся в колонке): `array_sort`, `concat`, ...
* Сортировки: `asc`, ...
* Строковые функции: `concat_ws`, `lower`, `split`, ...
* Оконные функции: `lag`, ...
* Преобразования с пользовательскими функциями: `udf_pandas`, ...

**Всегда перед написанием кода нужно подумать, нет ли уже готовой функции в данном модуле. Использование готовых функции существенно влияет на скорость вычислений.**

In [34]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType

Часто для применения функций нужно поменять тип колонки, за что отвечает метод .cast(\<NEW_COLUMN_TYPE\>)

In [35]:
emp_with_date = (
    emp_df
        .dropna()
        .withColumn(
            'hire_date',
            # Конструируем дату в формате yyyy-mm-dd
            F.concat_ws(
                '-',
                # Придумываем год
                (1990 + emp_df.dept_id).cast(StringType()),
                # Придумываем месяц
                F.concat(F.lit('0'), emp_df.emp_id.cast(StringType())),
                # Придумываем день
                emp_df.dept_id.cast(StringType())
            ).cast(DateType())
        )
)
emp_with_date.show()

+------+--------+-------+----------+
|emp_id|    name|dept_id| hire_date|
+------+--------+-------+----------+
|     1|   Smith|     10|2000-01-10|
|     2|    Rose|     20|2010-02-20|
|     3|Williams|     10|2000-03-10|
|     4|   Jones|     30|2020-04-30|
+------+--------+-------+----------+



In [38]:
emp_with_date.select(
    emp_with_date.hire_date,
    # берём от колонки с датой только значение года
    F.year(emp_with_date.hire_date),
    # заменяем все вхождения smith на смит и переименовываем её в processed_name
    F.regexp_replace(F.lower(emp_with_date.name), 'smith', 'cмит').alias('processed_name')
).show()

+----------+---------------+--------------+
| hire_date|year(hire_date)|processed_name|
+----------+---------------+--------------+
|2000-01-10|           2000|          cмит|
|2010-02-20|           2010|          rose|
|2000-03-10|           2000|      williams|
|2020-04-30|           2020|         jones|
+----------+---------------+--------------+



### `Spark SQL API`

Работа с таблицами в Spark возможна не только в Pandas-подобном интерфейсе, также поддерживается SQL выражения для обработки.

In [None]:
add_data = [
    (1, '1523 Main St', 'SFO', 'CA'),
    (2, '3453 Orange St', 'SFO', 'NY'),
    (3, '34 Warner St', 'Jersey', 'NJ'),
    (4, '221 Cavalier St', 'Newark', 'DE'),
    (5, '789 Walnut St', 'Sandiago', 'CA')
]
add_columns = ['emp_id', 'address', 'city', 'state']
# создадим тестовую таблицу
add_df = spark.createDataFrame(add_data, add_columns)
add_df.show()

+------+---------------+--------+-----+
|emp_id|        address|    city|state|
+------+---------------+--------+-----+
|     1|   1523 Main St|     SFO|   CA|
|     2| 3453 Orange St|     SFO|   NY|
|     3|   34 Warner St|  Jersey|   NJ|
|     4|221 Cavalier St|  Newark|   DE|
|     5|  789 Walnut St|Sandiago|   CA|
+------+---------------+--------+-----+



Spark позволяет использовать DataFrame в качестве таблиц в регулярных SQL запросах:

In [None]:
# создаём таблицу с именем EMP на основе датафрейма emp_df
emp_df.createOrReplaceTempView('EMP')
# создаём таблицу с именем DEPT на основе датафрейма dept_df
dept_df.createOrReplaceTempView('DEPT')
# создаём таблицу с именем DEPT на основе датафрейма add_df
add_df.createOrReplaceTempView('ADD')

In [None]:
# выполняем сам SQL запрос над данными таблицами
spark.sql('''
    select * from EMP e, DEPT d, ADD a
    where e.dept_id == d.dept_id and e.emp_id == a.emp_id
''').show()

                                                                                                                                                                                     

+------+--------+-------+---------+-------+------+---------------+------+-----+
|emp_id|    name|dept_id|dept_name|dept_id|emp_id|        address|  city|state|
+------+--------+-------+---------+-------+------+---------------+------+-----+
|     1|   Smith|     10|  Finance|     10|     1|   1523 Main St|   SFO|   CA|
|     2|    Rose|     20|Marketing|     20|     2| 3453 Orange St|   SFO|   NY|
|     3|Williams|     10|  Finance|     10|     3|   34 Warner St|Jersey|   NJ|
|     4|   Jones|     30|    Sales|     30|     4|221 Cavalier St|Newark|   DE|
+------+--------+-------+---------+-------+------+---------------+------+-----+



### `Ещё базовые операции над Spark DataFrame`

In [None]:
data = [
    ('James', 'Sales', 3000),
    ('Michael', 'Sales', 4600),
    ('Robert', 'Sales', 4100),
    ('Maria', 'Finance', 3000),
    ('James', 'Sales', 3000),
    ('Scott', 'Finance', 3300),
    ('Jen', 'Finance', 3900),
    ('Jeff', ' Marketing', 3000),
    ('Kumar', 'Marketing', 2000),
    ('Saif', 'Sales', 4100),
]
columns = ['Name', 'Dept', 'Salary']

df = spark.createDataFrame(data, columns)
df.show()

+-------+----------+------+
|   Name|      Dept|Salary|
+-------+----------+------+
|  James|     Sales|  3000|
|Michael|     Sales|  4600|
| Robert|     Sales|  4100|
|  Maria|   Finance|  3000|
|  James|     Sales|  3000|
|  Scott|   Finance|  3300|
|    Jen|   Finance|  3900|
|   Jeff| Marketing|  3000|
|  Kumar| Marketing|  2000|
|   Saif|     Sales|  4100|
+-------+----------+------+



In [None]:
# метод .distinct отбирает только уникальные строки в датафрейме
df.distinct().show()

+-------+----------+------+
|   Name|      Dept|Salary|
+-------+----------+------+
|Michael|     Sales|  4600|
|  James|     Sales|  3000|
| Robert|     Sales|  4100|
|  Maria|   Finance|  3000|
|    Jen|   Finance|  3900|
|  Scott|   Finance|  3300|
|  Kumar| Marketing|  2000|
|   Saif|     Sales|  4100|
|   Jeff| Marketing|  3000|
+-------+----------+------+



In [None]:
# метод count() считаем число строк в датафрейме
# так как count относится к action операциям, то метод show можно не вызывать
df.distinct().count()

9

Также, возможно использовать группировку и агрегаты:

In [None]:
# сгрупировать данные по департаменту и посчитать суммарную зарплату всех сотрудникаов в нём
df.groupBy('Dept').sum().collect()

[Row(Dept='Sales', sum(Salary)=18800),
 Row(Dept='Finance', sum(Salary)=10200),
 Row(Dept=' Marketing', sum(Salary)=3000),
 Row(Dept='Marketing', sum(Salary)=2000)]

### `IO операции`

После того как было произведены все операции с данными нужно сохранить результаты их работы на диск

In [None]:
base_statistics = df.select(
    F.min('Salary').alias('min_salary'),
    F.mean('Salary').alias('mean_salary'),
    F.max('Salary').alias('max_salary')
)
# Пока никаких вычислений не произошло
base_statistics

DataFrame[min_salary: bigint, mean_salary: double, max_salary: bigint]

In [None]:
# записываем данные на диск в CSV формате формате
base_statistics.write.csv('./base_statistics.csv', header=True)
# записываем данные на диск в специальном spark-формате parquet
base_statistics.write.parquet('./base_statistics.parquet')

                                                                                                                                                                                     

In [None]:
%ls ./base_statistics.csv/
%pycat ./base_statistics.csv/part-00000-5f1b82c2-0fe0-4305-8c71-067297fd29b2-c000.csv

part-00000-cb924d66-c4d7-4a7d-9c4a-9991b308fa9a-c000.csv  _SUCCESS
Error: no such file, variable, URL, history range or macro


Для просмотра parquet-файлов нужны специльные инструменты

In [None]:
%ls ./base_statistics.parquet/
! parquet-tools inspect ./base_statistics.parquet/part-00000-17fd217a-ca74-4556-bb0b-991cee8924dc-c000.snappy.parquet

part-00000-17fd217a-ca74-4556-bb0b-991cee8924dc-c000.snappy.parquet  _SUCCESS

############ file meta data ############
created_by: parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)
num_columns: 3
num_rows: 1
num_row_groups: 1
format_version: 1.0
serialized_size: 789
[0m
[0m
############ Columns ############
min_salary
mean_salary
max_salary

############ Column(min_salary) ############
name: min_salary
path: min_salary
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: -5%)

############ Column(mean_salary) ############
name: mean_salary
path: mean_salary
max_definition_level: 1
max_repetition_level: 0
physical_type: DOUBLE
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: -5%)

############ Column(max_salary) ############
name: max_salary
path: max_salary
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
lo

Теперь произвём операцию чтения сохранённых данных

In [None]:
loaded_df = (
    spark.read
        .format('csv')
        .option("inferSchema", True)
        .option("header", True)
        .option("sep", ',')
        .load('./base_statistics.csv')
)
loaded_df, loaded_df.show()

+----------+-----------+----------+
|min_salary|mean_salary|max_salary|
+----------+-----------+----------+
|      2000|     3400.0|      4600|
+----------+-----------+----------+



(DataFrame[min_salary: int, mean_salary: double, max_salary: int], None)

Как видно из вывода все данные были сохранены корректно

##### `Завершение работы`

После завершения работы не забываем осовободить ресурсы и остановить текущую сессию spark

In [None]:
sc.stop()

### `Практика`

In [40]:
!head hr_data.csv

Age,MonthlyIncome,Department,Gender,Education,Attrition
41,5993,Sales,Female,2,Yes
49,5130,Research & Development,Male,1,No
37,2090,Research & Development,Male,2,Yes
33,2909,Research & Development,Female,4,No
27,3468,Research & Development,Male,1,No
32,3068,Research & Development,Male,2,No
59,2670,Research & Development,Female,3,No
30,2693,Research & Development,Male,1,No
38,9526,Research & Development,Male,3,No


In [46]:
from pyspark.sql import SparkSession

# Создание SparkSession
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()

# Чтение CSV с заголовками
df = spark.read.csv("hr_data.csv", header=True, inferSchema=True)

# Вывести 5 строк
df.show(5)

+---+-------------+--------------------+------+---------+---------+
|Age|MonthlyIncome|          Department|Gender|Education|Attrition|
+---+-------------+--------------------+------+---------+---------+
| 41|         5993|               Sales|Female|        2|      Yes|
| 49|         5130|Research & Develo...|  Male|        1|       No|
| 37|         2090|Research & Develo...|  Male|        2|      Yes|
| 33|         2909|Research & Develo...|Female|        4|       No|
| 27|         3468|Research & Develo...|  Male|        1|       No|
+---+-------------+--------------------+------+---------+---------+
only showing top 5 rows



In [None]:
# 1) Найти минимальный, максимальный и средний возраст сотрудников.

# 2) Подсчитать количество сотрудников в каждом отделе.

# 3) Рассчитать средний доход сотрудников в зависимости от уровня образования.

# 4) Рассчитать средний возраст сотрудников, которые уволились (Attrition = Yes) и которые остались (Attrition = No).

# 5) Вычислить средний доход по полу и сравнить разницу.

# 6) Найти отдел с наибольшим процентом уволенных сотрудников.