# **Оконные функции в PySpark**
Полезные ссылки:

*   https://sparkbyexamples.com/pyspark/pyspark-window-functions/
*   https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Window.html





In [None]:
# Импорт библиотек
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

In [None]:
# Создаем SparkSession
spark = SparkSession.builder.appName("WindowFunctionsExample").getOrCreate()

In [None]:
# Пример данных
data = [("A", "2023-01-01", 100),
        ("A", "2023-01-02", 150),
        ("A", "2023-01-03", 150),
        ("A", "2023-01-04", 350),
        ("B", "2023-01-01", 200),
        ("B", "2023-01-02", 300),
        ("B", "2023-01-03", 250),
        ("C", "2023-01-01", 450),
        ("C", "2023-01-02", 600),
        ("C", "2023-01-03", 500),
        ("C", "2023-01-04", 850),]
columns = ["category", "date", "sales"]

In [None]:
# Создаем DataFrame
sales_df = spark.createDataFrame(data, columns)

In [None]:
# Смотрим наш DataFrame
sales_df.show()

In [None]:
sales_df.limit(10).toPandas()

## **row_number()**
В PySpark функция row_number() нужна для создания уникального номера строки (индекса) для каждой записи в пределах заданного окна (набора данных). В отличие от rank(), функция row_number() не пропускает номера при одинаковых значениях: она присваивает уникальный номер каждой строке, даже если значения в выбранной колонке совпадают.

In [None]:
# Определяем оконную спецификацию
window_spec = Window.partitionBy("category").orderBy("sales")

In [None]:
# Добавляем row number колонку
df_with_row_num = sales_df.withColumn("row_number", F.row_number().over(window_spec))

In [None]:
# Пример результата работы
df_with_row_num.show()

## **rank()**
Функция rank() в PySpark используется для присвоения рангов (позиций) каждой строке в пределах определённого окна (набора данных). Ранги присваиваются в зависимости от значения одной или нескольких колонок, указанных в Window-спецификации, которая определяет порядок строк.

Особенность функции rank() в том, что при наличии одинаковых значений (например, если две строки имеют одинаковое значение в колонке, по которой происходит ранжирование) этим строкам присваивается одинаковый ранг, а следующий ранг будет пропущен.

In [None]:
# Определение окна
window_spec = Window.partitionBy("category").orderBy("sales")

In [None]:
# Добавляем rank колонку
df_with_rank = sales_df.withColumn("rank", F.rank().over(window_spec))

In [None]:
# Пример результата работы
df_with_rank.show()

## **lag()**
Функция lag() в PySpark используется для получения значения из предыдущей строки в пределах заданного окна (окна данных). Она позволяет «заглянуть» на определённое количество строк назад и извлечь значение из другой строки, что полезно для выполнения вычислений на основе значений предыдущих строк, например, для расчета разниц или нахождения тенденций.

In [None]:
# Определение окна
window_spec = Window.partitionBy("category").orderBy("date")

In [None]:
# Добавляем lag колонку
df_with_lag = sales_df.withColumn("Previous_Value", F.lag("sales", 1).over(window_spec))

In [None]:
# Пример результата работы
df_with_lag.show()

## **lead()**
Функция lead() в PySpark нужна для получения значения из следующей строки в пределах определённого окна данных. Она позволяет «заглянуть вперёд» на заданное количество строк и извлечь значение из будущей строки. Это полезно для выполнения расчетов, где нужно сравнить текущее значение с последующим, например, для анализа тенденций или прогнозирования.

In [None]:
# Определение окна
window_spec = Window.partitionBy("category").orderBy("date")

In [None]:
# Добавляем lead колонку
df_with_lead = sales_df.withColumn("Next_Value", F.lead("sales", 2).over(window_spec))

In [None]:
df_with_lead.show()

## **rowsBetween**
Метод rowsBetween в PySpark используется для определения диапазона строк в пределах окна (window), с которыми будут производиться вычисления для каждой строки. Это позволяет гибко задавать, сколько строк до и после текущей строки должно быть включено в окно для выполнения агрегаций или вычислений.

**Как использовать rowsBetween в PySpark**

С rowsBetween можно указать:


1.   Начало диапазона (например, Window.unboundedPreceding или отрицательное число для строк перед текущей).
2.   Конец диапазона (например, Window.unboundedFollowing или положительное число для строк после текущей).

**Примеры значений:**



*   Window.unboundedPreceding – включает все строки от начала окна до текущей строки.
*   Window.unboundedFollowing – включает все строки от текущей строки до конца окна.
*   0 – указывает на текущую строку.
*   Отрицательное число (-1, -2, и т.д.) – включает строки перед текущей.
*   Положительное число (1, 2, и т.д.) – включает строки после текущей.










In [None]:
# Определение окна (обратите внимание где мы вызываем rowsBetween)
window_spec = Window.partitionBy("category").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [None]:
# Вычисляем кумулятивные продажи
sales_df = sales_df.withColumn("cumulative_sales", F.sum("sales").over(window_spec))
sales_df.show()

# **pandas_udf**
В PySpark, pandas_udf — это способ написания пользовательских функций с использованием библиотеки Pandas, что позволяет обрабатывать данные более эффективно и с минимальными накладными расходами на сериализацию. Они особенно полезны для операций, требующих быстрого выполнения, потому что они работают с данными векторно, а не построчно, как обычные UDF (User Defined Functions).

**Типы pandas_udf**



*   SCALAR: Принимает Pandas Series и возвращает Pandas Series. Это похоже на использование обычного UDF, но векторизовано.
*   GROUPED_MAP: Применяется к каждой группе данных и возвращает DataFrame.
*   GROUPED_AGG: Используется для агрегации данных в группах.






In [None]:
# Импорт библиотек
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
import pandas as pd

In [None]:
# Инициализация Spark сессии
spark = SparkSession.builder.appName("PandasUDFExample").getOrCreate()

### **Пример 1: pandas_udf типа SCALAR (возведение в квадрат)**

In [None]:
# Пример данных
data = [(1,), (2,), (3,), (4,)]
columns = ["Value"]

# Создаем DataFrame
df = spark.createDataFrame(data, columns)

# Определение pandas UDF функции для возведения в квадрат
@pandas_udf(IntegerType(), functionType=PandasUDFType.SCALAR)
def square(x: pd.Series) -> pd.Series:
    return x ** 2

# Применение pandas UDF
df_with_square = df.withColumn("Square_Value", square(df["Value"]))

# Вывод результата
df_with_square.show()

### **Пример 2: pandas_udf типа SCALAR (умножение двух столбцов)**

In [None]:
# Пример данных
data = [(2.0, 3.0), (4.0, 5.0), (6.0, 7.0)]
columns = ["ColumnA", "ColumnB"]
df = spark.createDataFrame(data, columns)

# Определение pandas UDF функции для умножения двух столбцов
@pandas_udf(DoubleType())
def multiply_columns(colA: pd.Series, colB: pd.Series) -> pd.Series:
    return colA * colB

# Применение pandas UDF для создания нового столбца
df_with_product = df.withColumn("Product", multiply_columns(df["ColumnA"], df["ColumnB"]))

# Вывод результата
df_with_product.show()

### **Пример 3: pandas_udf типа GROUPED_MAP (группировка)**

In [None]:
# Пример данных
data = [("Alice", "Math", 80),
        ("Alice", "Science", 85),
        ("Bob", "Math", 90),
        ("Bob", "Science", 75)]
columns = ["Name", "Subject", "Score"]
df = spark.createDataFrame(data, columns)

# Определение схемы для возвращаемого DataFrame
schema = StructType([
    StructField("Name", StringType()),
    StructField("Average_Score", DoubleType())
])

# Определение pandas UDF для расчета средней оценки по группам
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calculate_average(pdf: pd.DataFrame) -> pd.DataFrame:
    result = pdf.groupby("Name").agg({"Score": "mean"}).reset_index()
    result.columns = ["Name", "Average_Score"]
    return result

# Применение функции к каждой группе
df_avg = df.groupBy("Name").apply(calculate_average)

# Вывод результата
df_avg.show()