#  Масштабируемое прогнозирование с использованием scikit-learn и PySpark Pandas UDFs -   User-Defined Functions

Одна из проблем заключается в том, что передача данных между а) процессами Spark Scala на основе Scala, которые отправляют данные между машинами и могут эффективно выполнять преобразования, и б) процесс Python (например, для прогнозирования с помощью scikit-learn) несет некоторые накладные расходы из-за сериализации и межпроцессной коммуникации. Одним из решений для этого является пользовательские функции (UDF) в API DataFrame от PySpark. Вы можете использовать API DataFrame для эффективного выполнения большинства операций на Scala (без необходимости писать на Scala!), Но затем вызывать Python UDF, которые несут накладные расходы на Scala-Python только тогда, когда это необходимо.

Обычные UDF PySpark работают с одним значением каждый момент времени, что приводит к большому объему накладных расходов на Scala-Python. Недавно PySpark добавил Pandas UDFs, которые эффективно конвертируют куски столбцов DataFrame в объекты Pandas Series, через Apache Arrow, чтобы избежать значительных иoverhead обычных UDF. Имея специальные UDF, Pandas Series сохраняют преобразование между представлениями с плавающей запятой Python и NumPy для scikit-learn, как это было бы необходимо для обычного UDF.

### Начальные установки 

In [1]:
!pip install pyarrow



In [2]:
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
import pandas as pd
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StringType, ArrayType

### Генерирование синтетических данных

In [3]:
# Make some fake data and train a model.
n_samples_test = 100000
n_samples_train = 1000
n_samples_all = n_samples_train + n_samples_test
n_features = 50

X, y = make_classification(n_samples=n_samples_all, n_features=n_features, random_state=123)
X_train, X_test, y_train, y_test = \
    train_test_split(X, y, test_size=n_samples_test, random_state=45)

# Use pandas to put the test data in parquet format to illustrate how to load it up later.
# In real usage, the data might be on S3, Azure Blog Storage, HDFS, etc.
column_names = [f'feature{i}' for i in range(n_features)]
(
    pd.DataFrame(X_test, columns=column_names)
    .reset_index()
    .rename(columns={'index': 'id'})
    .to_parquet('unlabeled_data')
)

### Обучение модели на  scikit-learn

In [4]:
param_grid = {'n_estimators': [100], 'max_depth': [2, 4, None]}
gs_rf = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid=param_grid,
    scoring='roc_auc'
).fit(X_train, y_train)
print('ROC AUC: %.3f' % gs_rf.best_score_)



ROC AUC: 0.959


### Создание Spark драйвера

In [5]:
sc = pyspark.SparkContext(appName="foo")
sqlContext = pyspark.SQLContext(sc)

### Загрузим данные и посмотрим как они выглядят в формате parquet.

В реальном применении обычно приходится делать целый набор ETL процедур после чтения сырых данных, но здесь  это просто загрузка.

In [6]:
df_unlabeled = sqlContext.read.parquet('unlabeled_data')
df_unlabeled

DataFrame[id: bigint, feature0: double, feature1: double, feature2: double, feature3: double, feature4: double, feature5: double, feature6: double, feature7: double, feature8: double, feature9: double, feature10: double, feature11: double, feature12: double, feature13: double, feature14: double, feature15: double, feature16: double, feature17: double, feature18: double, feature19: double, feature20: double, feature21: double, feature22: double, feature23: double, feature24: double, feature25: double, feature26: double, feature27: double, feature28: double, feature29: double, feature30: double, feature31: double, feature32: double, feature33: double, feature34: double, feature35: double, feature36: double, feature37: double, feature38: double, feature39: double, feature40: double, feature41: double, feature42: double, feature43: double, feature44: double, feature45: double, feature46: double, feature47: double, feature48: double, feature49: double, __index_level_0__: bigint]

### Вычисление предикций с использование regular UDF -

Сначала попробуем regular UDF.  При этом будет десериализована одна строка (например, instance, sample, record) в каждый момент времени, сделано прогнозирование, и  prediction будет возвращена, затем будет сериализована послана  обратно в Spark, чтобы скомбинироваться с со всеми другими predictions.

In [7]:
@F.udf(returnType=DoubleType())
def predict_udf(*cols):
    # cols will be a tuple of floats here.
    return float(gs_rf.predict_proba((cols,))[0, 1])


In [8]:
%%time
df_pred_a = df_unlabeled.select(
    F.col('id'),
    predict_udf(*column_names).alias('prediction')
)
df_pred_a.take(5)

CPU times: user 132 ms, sys: 16 ms, total: 148 ms
Wall time: 39.7 s


### Вычисление предикций с использованием Pandas UDF

Теперь используем Pandas UDF (например, vectorized UDF).  В этом случае Spark будет посылать кортеж (tuple) из pandas Series objects с многими строками в один момент времени.  Кортеж будет иметь одну  Series per column/feature, для того чтобы они могли быть переданы UDF.  Обратите внимание, что один из этих Series objects не будут содержать features для всех строк сразу, потому что Spark распределяет  datasets по многим workers.  Мы будем здесь использовать partition size по умолчанию, но эта величина может быть настроена.

In [9]:
@F.pandas_udf(returnType=DoubleType())
def predict_pandas_udf(*cols):
    # cols will be a tuple of pandas.Series here.
    X = pd.concat(cols, axis=1)
    return pd.Series(gs_rf.predict_proba(X)[:, 1])


In [10]:
%%time
df_pred_b = df_unlabeled.select(
    F.col('id'),
    predict_pandas_udf(*column_names).alias('prediction')
)
df_pred_b.take(5)

CPU times: user 124 ms, sys: 12 ms, total: 136 ms
Wall time: 3.78 s


### Вычисление  multiclass predictions

Выше мы просто возвращали одну серию предсказаний для положительного класса, который работает для одной бинарной или зависимых переменных. В Pandas UDF можно также разместить многоклассовые или многосегментные модели. При этом будет возвращен набор списков чисел вместо одного ряда чисел.

In [11]:
@F.pandas_udf(returnType=ArrayType(DoubleType()))
def predict_pandas_udf(*cols):
    X = pd.concat(cols, axis=1)
    return pd.Series(row.tolist() for row in gs_rf.predict_proba(X))


In [12]:
%%time
df_pred_multi = (
    df_unlabeled.select(
        F.col('id'),
        predict_pandas_udf(*column_names).alias('predictions')
    )
    # Select each item of the prediction array into its own column.
    .select(
        F.col('id'),
        *[F.col('predictions')[i].alias(f'prediction_{c}')
          for i, c in enumerate(gs_rf.classes_)]
    )
)
df_pred_multi.take(5)

CPU times: user 116 ms, sys: 16 ms, total: 132 ms
Wall time: 1.98 s


In [13]:
!pip install psutil



In [14]:
import psutil
psutil.cpu_count()

12

In [15]:
psutil.virtual_memory()

svmem(total=33569599488, available=14229962752, percent=57.6, used=18906247168, free=11311476736, active=19825520640, inactive=1309306880, buffers=1173848064, cached=2178027520, shared=16244736, slab=880906240)

In [16]:
sc.parallelize(range(0,10)).count()
sc.parallelize(range(0,20)).count()

20

# End of Slides