In [None]:
!pip install pyspark findspark

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

conf = SparkConf().set('spark.ui.port', '4050').set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')\
                  .set('spark.dynamicAllocation.enabled', 'true')\
                  .set('spark.shuffle.service.enabled', 'true') #трекер, чтобы возвращать ресурсы
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master('local[*]').getOrCreate()

Анализировать будет датасет отсюда https://www.kaggle.com/shelvigarg/credit-card-buyers

Definition

ID - Unique Identifier for a row

Gender - Gender of the Customer

Age - Age of the Customer (in Years)

Region_Code - Code of the Region for the customers

Occupation - Occupation Type for the customer

Channel_Code - Acquisition Channel Code for the Customer (Encoded)

Vintage - Vintage for the Customer (In Months)

Credit_Product - If the Customer has any active credit product (Home loan Personal loan, Credit Card etc.)

AvgAccountBalance - Average Account Balance for the Customer in last 12 Months

Is_Active - If the Customer is Active in last 3 Months

Загрузим данные и посмотрим, что там внутри

In [None]:
data = spark.read.csv('credit_card_data.csv', header=True, inferSchema=True)

In [None]:
data.printSchema()

In [None]:
data.show()

Посмотрим различные базовые вещи

In [None]:
from pyspark.sql.functions import col,isnan, when, count

In [None]:
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

Пропуски только в кредитном продукте, логично заметь на тип, что кредита нет

In [None]:
data.select(col('Credit_Product')).groupBy('Credit_Product').count().show()

In [None]:
data = data.fillna({'Credit_Product': 'No'})

Проверим

In [None]:
data.select(col('Credit_Product')).groupBy('Credit_Product').count().show()

Посмотри на данные с точки зрения дисбаланса классов

In [None]:
data.count()

In [None]:
import pyspark.sql.functions as F

In [None]:
data.select(col('Is_Lead'))\
    .groupBy('Is_Lead')\
    .count()\
    .withColumn('count', F.round(col('count') / data.count(), 2))\
    .show()

Ладно, достаточно, мы тут сейчас говорим про MLlib, всякие анализы - тема прошлого семинара

**Некоторые преобразования данных**

Начнем с простой обработки категориальных переменных

In [None]:
from pyspark.ml.feature import StringIndexer, IndexToString, OneHotEncoder

In [None]:
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
gender_indexer = gender_indexer.fit(data)
data = gender_indexer.transform(data)

data.show()

In [None]:
gender_indexer.labels

Обратная трансформация доступна через метод

In [None]:
converter = IndexToString(inputCol="GenderIndex", outputCol="originalGender")
data = converter.transform(data)
data.show()

Давайте аналогично поступим с каналом продаж и типом занятости

In [None]:
occupation_indexer = StringIndexer(inputCol="Occupation", outputCol="OccupationIndex")
occupation_indexer = occupation_indexer.fit(data)
data = occupation_indexer.transform(data)

channel_indexer = StringIndexer(inputCol="Channel_Code", outputCol="ChannelIndex")
channel_indexer = channel_indexer.fit(data)
data = channel_indexer.transform(data)

data.show()

In [None]:
print(f'Occupation len = {len(occupation_indexer.labels)}, Channel_code len = {len(channel_indexer.labels)}')

Тут по 4 категории, что самое простое, что приходит в голову? Правильно - OHE

In [None]:
ohe_encoder = OneHotEncoder(inputCols=["OccupationIndex", "ChannelIndex"],
                        outputCols=["OccupationVector", "ChannelVec"])
ohe_encoder = ohe_encoder.fit(data)
data = ohe_encoder.transform(data)

data.show()

In [None]:
ohe_encoder.categorySizes

Странный формат, не правда ли? Все из-за того, что тут у нас SparseVector

 На 4 категории нужен вектор размерности 3, а дальше храним позицию и 1 там, где нужная категория

In [None]:
data.select(col('OccupationVector')).head()

Теперь все надо собрать в одну структуру, чтобы можно было анализировать данные и строить модели

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
data.show(5)

In [None]:
feature_columns = [
                   'Age',
                   'Vintage',
                   'Avg_Account_Balance',
                   'GenderIndex',
                   'OccupationVector',
                   'ChannelVec'                 
]

In [None]:
df_va = VectorAssembler(inputCols = feature_columns, outputCol = 'features')
data = df_va.transform(data)

In [None]:
data.printSchema()

In [None]:
data.show()

В полученном features можно автоматичеки проанализировать все переменные и если у кого-то уникальных значений меньше заданного вами порога, то они автоматичсеки переведутся в индексы при помощи pyspark.ml.feature import VectorIndexer

**Статистика**

В ml pyspark есть некоторые статистические методы, которые можно использовать для анализа

Корреляция

In [None]:
from pyspark.ml.stat import Correlation

In [None]:
corr = Correlation.corr(data, 'features', method='pearson').collect()[0][0]

In [None]:
corr

In [None]:
corr.toArray()

Можно вычислить корреляцию спирмена

In [None]:
corr = Correlation.corr(data, 'features', method='spearman').collect()[0][0]
corr.toArray()

Можно использовать хи-квадрат тест для оценки независимости каждой переменной в features относительно целевого признака, но этот тест для категориальных переменных, поэтому для примера на одной фиче

In [None]:
from pyspark.ml.stat import ChiSquareTest, KolmogorovSmirnovTest, Summarizer

In [None]:
r = ChiSquareTest.test(data, "OccupationVector", "Is_Lead")

In [None]:
r.show()

KS-тест

In [None]:
data.select(
    F.mean(col('Age')).alias('mean_Age'),
    F.stddev(col('Age')).alias('std_Age')
).collect()

In [None]:
ks = KolmogorovSmirnovTest.test(data, 'Age', 'norm', 44, 15).first()

In [None]:
ks

Еще можно посчитать разные статистики

In [None]:
summarizer = Summarizer.metrics("mean", "count")
data.select(summarizer.summary(data.features)).show(truncate=False)

**Работа с фичами**

Квантизация

In [None]:
from pyspark.ml.feature import QuantileDiscretizer

Обучаем

In [None]:
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="Age", outputCol="Age_quant")
discretizer = discretizer.fit(data)

In [None]:
data = discretizer.transform(data)

In [None]:
data.select('Age', 'Age_quant')\
    .groupby('Age_quant').agg(
        F.min('Age').alias('min_age'),
        F.max('Age').alias('max_age'),
        F.count('Age').alias('count')
    )\
    .orderBy('Age_quant')\
    .show(5)

Заполнить пропуски можно через Imputer

Заполнять пропуски умеет только для числовых переменных, поэтому попробуем на игрушечном примере



In [None]:
from pyspark.ml.feature import Imputer

In [None]:
df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

#стратегия может быть 'mean', 'median', 'mode'
#через setMissingValue(0.0) можно сказать, что пропуски - это 0
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"], strategy='mean')
imputer = imputer.fit(df)
imputer.transform(df).show()

**Pipeline**

Как и в scikit-learn можно создавать пайплайны обработки данных

Мы много делали преобразований, давайте соберем все в 1 пайплайн

In [None]:
from pyspark.ml import Pipeline

In [None]:
#string в индесы
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
occupation_indexer = StringIndexer(inputCol="Occupation", outputCol="OccupationIndex")
channel_indexer = StringIndexer(inputCol="Channel_Code", outputCol="ChannelIndex")

#OHE
ohe_encoder = OneHotEncoder(inputCols=["OccupationIndex", "ChannelIndex"],
                        outputCols=["OccupationVector", "ChannelVec"])

#квантизация
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="Age", outputCol="Age_quant")

#собираем все в вектор
feature_columns = [
                   'Age',
                   'Vintage',
                   'Avg_Account_Balance',
                   'GenderIndex',
                   'OccupationVector',
                   'ChannelVec',
                   'Age_quant'                 
]
vector_assembler = VectorAssembler(inputCols = feature_columns, outputCol = 'features')

собираем все в пайплайн

In [None]:
pipeline = Pipeline(stages=[
                           gender_indexer,
                           occupation_indexer,
                           channel_indexer,
                           ohe_encoder,
                           discretizer,
                           vector_assembler,
])

Давайте заново загрузим данные и сделаем трансформацию

---



In [None]:
data = spark.read.csv('credit_card_data.csv', header=True, inferSchema=True)
data = data.fillna({'Credit_Product': 'No'})
pipeline = pipeline.fit(data)

In [None]:
transformed_data = pipeline.transform(data)

In [None]:
transformed_data.show()

In [None]:
transformed_data.select('Is_Lead', 'features').show(5)

**Модельки**

Пора нам уже что-то обучить, начнем с логрега

In [None]:
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel

In [None]:
lr = LogisticRegression(featuresCol='features', labelCol='Is_Lead', predictionCol='prediction',
                        maxIter=100, probabilityCol='proba')

lr = lr.fit(transformed_data)

Сохраним

In [None]:
lr.save('logreg_model')

Загрузка

In [None]:
lr2 = LogisticRegressionModel.load('logreg_model')

Коэффициенты и метрики

In [None]:
print("Coefficients: " + str(lr.coefficients))
print("Intercept: " + str(lr.intercept))

In [None]:
print("Coefficients: " + str(lr2.coefficients))
print("Intercept: " + str(lr2.intercept))

In [None]:
print(f'ROC_AUC = {lr.summary.areaUnderROC}')

In [None]:
lr.summary.recallByLabel

In [None]:
lr.params

In [None]:
lr.transform(transformed_data.select('Is_Lead', 'features')).show()

**Подбор параметров**

Тут нет всяких hyperopt, optuna...есть стандартная кросс-валидация и поиск по сетке

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

Для этого соберем все в пайплайн. Можно было "вложить" старый пайплайн в новый, но соберем все с самого начала

In [None]:
#string в индесы
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
occupation_indexer = StringIndexer(inputCol="Occupation", outputCol="OccupationIndex")
channel_indexer = StringIndexer(inputCol="Channel_Code", outputCol="ChannelIndex")

#OHE
ohe_encoder = OneHotEncoder(inputCols=["OccupationIndex", "ChannelIndex"],
                        outputCols=["OccupationVector", "ChannelVec"])

#квантизация
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="Age", outputCol="Age_quant")

#собираем все в вектор
feature_columns = [
                   'Age',
                   'Vintage',
                   'Avg_Account_Balance',
                   'GenderIndex',
                   'OccupationVector',
                   'ChannelVec',
                   'Age_quant'                 
]
vector_assembler = VectorAssembler(inputCols = feature_columns, outputCol = 'features')

lr = LogisticRegression(featuresCol='features', labelCol='Is_Lead', predictionCol='prediction',
                        maxIter=100, probabilityCol='proba')

In [None]:
pipeline = Pipeline(stages=[
                           gender_indexer,
                           occupation_indexer,
                           channel_indexer,
                           ohe_encoder,
                           discretizer,
                           vector_assembler,
                           lr
])

Сетка параметров

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(discretizer.numBuckets, [5, 10]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()

Разобьем данные на train, test

In [None]:
train, test = data.randomSplit([0.7, 0.3], seed=7)

Описываем стратегию кросс-валидации

In [None]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                                                  labelCol='Is_Lead', metricName='areaUnderROC'),
                          numFolds=2,
                          parallelism=2)

Гоняем сетку. Знаю, перебор по сетке прошлый век, но что поделать)

In [None]:
cvModel = crossval.fit(train)

In [None]:
cvModel.avgMetrics

Параметры

In [None]:
import numpy as np
print(cvModel.getEstimatorParamMaps()[np.argmax(cvModel.avgMetrics)])

Сделаем предикт

In [None]:
test_pred = cvModel.transform(test)

In [None]:
test_pred.show()

Проверим модель

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                          labelCol='Is_Lead', metricName='areaUnderROC')

In [None]:
evaluator.evaluate(test_pred)

Сохраним пайплайн

In [None]:
cvModel.write().save('model')

вместо кросс-валидации можно взять TrainValidationSplit для подбора параметров, это train_test_split

**Ваша любимая домашка**

Кто проходил курс GPU прекрасно знают датасет.
Данные находятся в файле Train_Set_90621.csv
Amount Defaulted - эту переменную нужно удалить=)

Что ожидается? - творчество)

    1) Начните с анализа баланса классов, пропусков, статистик при помощи DataFrame API
    2) Посомтрите статистики, заполните пропуски при помощи уже MLlib
    3) Соберите пайплайн, похожий на наш, где будет обработка данных, обучение моделей и все при помощи Spark
    4) Разбейте данные на train/test + реализуйте подбор параметров одним из способов спарка
    5) Cохраниет пайплайн на диск
    6) Проверьте качество модели на отложенной test выборке


