In [1]:
import findspark
import pyspark
import pyspark.sql
import pyspark.sql.functions as F
import pyspark.ml as ml

from itertools import chain

findspark.init('/opt/spark')

In [2]:
spark = (
    pyspark
    .sql
    .SparkSession
    .builder
    .config('spark.jars', '/usr/share/java/mysql-connector-java.jar')
    .master('local[8]')
    .appName('sber')
    .getOrCreate()
)

spark

22/09/02 03:00:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/02 03:00:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
df = (
    spark
    .read
    .format('jdbc')
    .option('url', 'jdbc:mysql://localhost:3306/sber')
    .option('driver', 'com.mysql.cj.jdbc.Driver')
    .option('dbtable', 'neo')
    .option('user', 'sber')
    .option('password', 'sber65537')
    .load()
)

df.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('est_diameter_min', 'double'),
 ('est_diameter_max', 'double'),
 ('relative_velocity', 'double'),
 ('miss_distance', 'double'),
 ('orbiting_body', 'string'),
 ('sentry_object', 'boolean'),
 ('absolute_magnitude', 'double'),
 ('hazardous', 'boolean')]

Избавляюсь от неинформативных признаков

In [4]:
df = df.drop('name', 'orbiting_body', 'sentry_object')

In [5]:
feature_columns = df.columns
feature_columns.remove('hazardous')

In [6]:
vecAssembler = ml.feature.VectorAssembler(outputCol='features')
vecAssembler.setInputCols(feature_columns)

df = vecAssembler.transform(df)

In [7]:
df = df.withColumn('hazardous', df.hazardous.cast('int'))

In [8]:
total_count = df.count()
class_weights = {x['hazardous']: x['count'] / total_count for x in df.groupBy('hazardous').count().collect()}

In [9]:
mapping_expr = F.create_map([F.lit(x) for x in chain(*class_weights.items())])
df = df.withColumn('weight', mapping_expr[F.col('hazardous')])

In [10]:
df = df.withColumn('val', F.when(F.rand() > 0.9, 1).otherwise(0))
df = df.withColumn('val', df.val.astype('boolean'))

In [11]:
df = df.repartition(8)

In [12]:
clf = ml.classification.GBTClassifier(
    featuresCol='features',
    labelCol='hazardous',
    weightCol='weight',
    validationIndicatorCol='val',
    validationTol=0.01
)

paramGrid = (
    ml
    .tuning
    .ParamGridBuilder()
    .addGrid(clf.maxDepth, [2, 4, 8])
    .addGrid(clf.maxIter, [512, 1024, 2048])
    .addGrid(clf.stepSize, [0.01, 0.1, 0.5])
    .build()
)

crossval = ml.tuning.CrossValidator(
    estimator=clf,
    estimatorParamMaps=paramGrid,
    evaluator=ml.evaluation.BinaryClassificationEvaluator(labelCol='hazardous'),
    numFolds=3,
    parallelism=8
)

cv = crossval.fit(df)

[Stage 6:>                                                          (0 + 1) / 1]

22/09/02 03:01:30 WARN BlockManager: Block rdd_25_0 already exists on this machine; not re-adding it
22/09/02 03:01:30 WARN BlockManager: Block rdd_25_0 already exists on this machine; not re-adding it
22/09/02 03:01:30 WARN BlockManager: Block rdd_25_0 already exists on this machine; not re-adding it
22/09/02 03:01:30 WARN BlockManager: Block rdd_25_0 already exists on this machine; not re-adding it
22/09/02 03:01:30 WARN BlockManager: Block rdd_25_0 already exists on this machine; not re-adding it
22/09/02 03:01:30 WARN BlockManager: Block rdd_25_0 already exists on this machine; not re-adding it
22/09/02 03:01:30 WARN BlockManager: Block rdd_25_0 already exists on this machine; not re-adding it


                                                                                

In [13]:
df_test = df.where(df.val == True)

In [21]:
df_test = cv.bestModel.transform(df_test)

In [27]:
evaluator = ml.evaluation.BinaryClassificationEvaluator()
evaluator.setLabelCol('hazardous')
evaluator.setMetricName('areaUnderROC')
evaluator.evaluate(df_test)

0.9252390679310267

CatBoost победил

В заключении хочу сказать, что в идеале модель нужно выбирать, бустрапируя из тестовой выборки и строя доверительный интервал. В идеале тестовая выборка должна быть одной и той же для всех моделей. Метрика roc_auc считай что метрика по умолчанию. Но для бизнеса чаще актуальнее precision и recall. Так же early_stopping использует в данном случае тестовую выборку, что является небольшим даталиком. Нужно было создавать 3 датасета: Train, Val и Test и использовать Val для early_stopping. Но так как пример игрушечный, я решил пренебречь такими деталями