In [None]:
import os
import sys
import pickle
from typing import List, Callable, TypeVar

import numpy as np
import pandas as pd


os.environ["JAVA_HOME"] = "<insert path>"
os.environ["SPARK_HOME"] = "<insert path>"
os.environ["HADOOP_CONF_DIR"] = "<insert path>"
os.environ["PYSPARK_PYTHON"] = "<insert path>"
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python", "lib", "pyspark.zip"))
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python", "lib", "py4j-0.10.9.7-src.zip"))

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import Window

In [None]:
conf = (SparkConf()
    .setAll([
        ("spark.executor.cores", "2"),
        ("spark.executor.memory", "15G"),
        ("spark.dynamicAllocation.enabled", "true"),
        ("spark.dynamicAllocation.maxExecutors", "10"),
        ("spark.dynamicAllocation.cachedExecutorIdleTimeout", "30m"),
        ("spark.shuffle.service.enabled", "true"),
        ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ("spark.sql.catalogImplementation", "hive"),
    ])
)

In [None]:
spark = (SparkSession.builder
    .master("yarn")
    .appName("spark-uniform-shuffle")
    .config(conf=conf)
    .getOrCreate()
)

In [None]:
KEY_TYPE = "integer"

TYPE_MAPPING = {
    "long": "int64",
    "integer": "int32",
    "short": "int16",
}

def get_key_list(spark: SparkSession,
                 num_keys: int,
                 key_type: str = KEY_TYPE) -> List[int]:
    """
    Generating key list for uniform shuffle
    Генерация списка ключей для равномерного перемешивания

    Arguments
    _________
    spark: SparkSession object
    num_keys: Number of desired keys for shuffle
    key_type: Any type of integer containing values of result list

    Returns
    _______
    key_list: List of generated keys
    """
    win_spec = (Window
        .partitionBy("mod")
        .orderBy("id")
    )
    
    key_list = (spark
        .range(1_000_000, numPartitions=2)
        .select(
            F.col("id").cast(key_type)
        )
        .select(
            F.col("id"),
            
            F.when(
                F.hash("id") % num_keys >= 0,                 
                F.hash("id") % num_keys
            ).otherwise(
                F.hash("id") % num_keys + num_keys
            ).alias("mod"),
        )
        .select(
            F.col("id"),
            F.row_number().over(win_spec).alias("rn"),
        )
        .where(
            F.col("rn") == 1
        )
        .rdd.map(
            lambda row: row["id"]
        )
        .collect()
    )

    return key_list

### Объяснение

##### Блок кода 1
```python
.range(1_000_000, numPartitions=2)
.select(
    F.col("id").cast(key_type)
)
```
Значение hash-функции в spark зависит от типа аргумента и вычисляется по алгоритму Murmur3 (имплементирован в классе `org.apache.spark.unsafe.hash.Murmur3_x86_32`), поэтому необходимо зафиксировать тип генерируемой последовательности "на берегу". Тип `key_type` должен вмещать значения последовательности из выражения `spark.range`

In [7]:
Murmur3_x86_32 = spark._jvm.org.apache.spark.unsafe.hash.Murmur3_x86_32

In [8]:
Murmur3_x86_32.hashInt(101, 42)

-818933188

In [9]:
(spark
    .createDataFrame(
        [(101,)],
        schema="id integer"
    )
    .select(
        F.col("id"),
        F.hash("id"),
    )
    .show(1, False)
)

+---+----------+
|id |hash(id)  |
+---+----------+
|101|-818933188|
+---+----------+



##### Блок кода 2
```python
F.when(
    F.hash("id") % num_keys >= 0,                 
    F.hash("id") % num_keys
).otherwise(
    F.hash("id") % num_keys + num_keys
).alias("mod")
```
Значение хэш-функции является беззнаковым 4-х байтным целым числом (`uint32`), в то время как Spark поддерживает только знаковые 4-х байтные целые числа (`int32`), поэтому результат хэш-функции может быть отрицательным из-за переполнения разрядов `int32`.  
Вычисление остатка от деления в Java не является беззнаковой операцией, т.к. не является строго арифметической, как в Python, поэтому поле `mod`, задуманное как остаток от деления результата хэш-функции на число партиций, требует добавления числа партиций в случае отрицательного значения хэш-функции.

##### Блок кода 3
```python
win_spec = (Window
    .partitionBy("mod")
    .orderBy("id")
)

.select(
    F.col("id"),
    F.row_number().over(win_spec).alias("rn"),
)
.where(
    F.col("rn") == 1
)
```
Отбор по одному значению поля `id` на каждое уникальное значение поля `mod`, т.о. формируется результирующий список сгенерированных значений для равномерного шаффлинга

## Мульти-класс скоринг на Spark с применением равномерного перемешивания

In [None]:
DEFAULT_SHUFFLE_PARTS = int(spark.conf.get("spark.sql.shuffle.partitions"))

scoring_df = spark.table("<insert scoring table name here>")

with open("<path to model>", "rb") as f:
    model = pickle.load(f)

with open("<path to model features list>", "rb") as f:
    features = pickle.load(f)

In [None]:
ModelType = TypeVar("ModelObject")
PandasUDFType = Callable[[pd.DataFrame], pd.DataFrame]

def predict(model: ModelType,
            features: List[str],
            num_classes: int,
            score_column_name: str) -> PandasUDFType:
    """
    Функция-обёртка, параметризирующая функцию pandas-udf

    Arguments
    _________
    model: ML-model object
    features: ML-model features list
    num_classes: Number of scoring classes
    score_column_name: Resulting class index

    Returns
    _______
    predict_udf: Parametrized pandas udf
    """
    schema = f"""
        client_id long,
        report_dt string,
        {',\n'.join([f'class{i + 1}_proba float' for i in range(num_classes)])},
        {score_column_name} byte

    """

    @F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
    def predict_udf(pdf: pd.DataFrame) -> pd.DataFrame:
        """
        Функция мульти-класс инференса ML-модели

        Arguments
        _________
        pdf: Dataframe with client id, client features, report_dt etc.

        Returns
        _______
        result_pdf: Dataframe with scoring results
        """
        X = pd.DataFrame(
            pdf["features"].tolist(),
            columns=features,
        )

        pred = pd.DataFrame(
            model.predict_proba(X),
            columns=[f"class{i + 1}_proba" for i in range(num_classes)],
        ).astype("float32")

        pred_class = pd.Series(
            model.predict(X),
            name=score_column_name,
        ).astype("int8")

        result_pdf = pd.concat(
            objs=[
                feats.loc[:, ["client_id", "report_dt"]],
                pred,
                pred_class,
            ],
            axis=1,
        )
        return result_pdf
    return predict_udf

In [None]:
def scoring(spark: SparkSession,
            df: DataFrame,
            model: ModelType,
            features: List[str],
            num_classes: int,
            num_parts: int = DEFAULT_SHUFFLE_PARTS,
            score_column_name: str = "score") -> DataFrame:
    """
    Функция возвращает spark DataFrame с результатами инференса
    и сервисными полями (i.e. id клиента, дата скоринга, etc.)

    Arguments
    _________
    spark: SparkSession object
    df: DataFrame to score
    model: ML-model object
    features: ML-model features list
    num_classes: Number of scoring classes
    num_parts: Desired number of parts to split scoring dataframe
    score_column_name: Resulting class index

    Returns
    _______
    result_pdf: Dataframe with scoring results
    """
    predict_udf = predict(
        model,
        features,
        num_classes,
        score_column_name,
    )
    # Shuffle keys generation
    key_list = get_key_list(spark, num_parts, KEY_TYPE)
    # Shuffle keys mapping
    key_mapping = (spark
        .createDataFrame(
            [(i, key) for i, key in enumerate(key_list)],
            schema=f"""
                part_num integer, 
                key {KEY_TYPE}
            """
        )
    )

    result_df = (df
        .repartition(num_parts)
        .select(
            F.col("client_id"),
            F.col("report_dt"),
            F.array(features).alias("features"),
            F.spark_partition_id().alias("part_num"),
        )
        .join(
            F.broadcast(key_mapping),
            ["part_num"]
        )
        .select(
            F.col("client_id"),
            F.col("report_dt"),
            F.col("features"),
            F.col("key"),
        )
        .repartition(num_parts, "key")
        .groupBy("key")
        .apply(predict_udf)
    )

    return result_df

In [None]:
result_df = scoring(
    spark,
    df=scoring_df,
    model=model,
    features=features,
    num_classes=5,
    num_parts=101,
)

In [None]:
(result_df.write
    .mode("append")
    .format("parquet")
    .partitionBy("report_dt")
    .saveAsTable("<insert result table here>")
)