### Environment config

In [None]:
#CoLab Only
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark==1.4.2 catboost==1.0.3

[K     |████████████████████████████████| 76.3 MB 51 kB/s 
[?25h

In [None]:
#CoLab Only
# Point findspark at the JDK and at the Spark distribution unpacked by the
# install cell, then let findspark put pyspark on sys.path.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.3-bin-hadoop2.7",
})

import findspark
findspark.init()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import StructField, StructType

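<a href='https://catboost.ai/en/docs/concepts/spark-cluster-configuration'>CatBoost Spark cluster configuration</a>: CatBoost runs one training task per executor, so `spark.task.cpus` must equal `spark.executor.cores`.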

In [None]:
# Local Spark session that pulls the catboost-spark package from Maven.
# spark.task.cpus is kept equal to spark.executor.cores, as the CatBoost
# Spark cluster-configuration guide asks.
# NOTE(review): with master local[*] everything runs inside the driver JVM, so
# the spark.executor.* / memoryOverhead settings likely have no effect — confirm.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('CatBoostWithSpark')
    .config("spark.jars.packages", "ai.catboost:catboost-spark_3.0_2.12:1.0.3")
    .config("spark.executor.cores", "2")
    .config("spark.task.cpus", "2")
    .config("spark.driver.memory", "2g")
    .config("spark.driver.memoryOverhead", "2g")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.memoryOverhead", "2g")
    .getOrCreate()
)

In [None]:
# Render the session object as the cell output, as a quick check that it started.
spark

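Docs: <a href='https://catboost.ai/docs/catboost-spark/3.0_2.12/latest/api/python/'>catboost-spark Python API</a>. The `catboost_spark` module ships inside the `catboost-spark` package loaded through `spark.jars.packages`, so it can only be imported after the Spark session has been created.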

In [None]:
import catboost_spark

### Prepare dataset

Download `driver-stat.parquet`:

In [None]:
!wget -q https://github.com/AlexKbit/pyspark-catboost-example/raw/main/driver-stat.parquet

In [None]:
# Load the raw driver statistics downloaded above.
df = spark.read.format('parquet').load('driver-stat.parquet')

In [None]:
# Summary statistics for the feature and label columns. driver_id is an opaque
# identifier: describe() treats it as a value and reports mean=Infinity and
# stddev=NaN for it, which says nothing about the data, so it is dropped first.
df.drop('driver_id').describe().show()

+-------+--------------------+------------------+------+---------+------------------+------------------+------------------+------------------+-------------------+
|summary|           driver_id|               age|   sex|car_class|driving_experience|speeding_penalties| parking_penalties|total_car_accident|   has_car_accident|
+-------+--------------------+------------------+------+---------+------------------+------------------+------------------+------------------+-------------------+
|  count|              231000|            231000|231000|   231000|            231000|            231000|            231000|            231000|             231000|
|   mean|            Infinity| 36.98296536796537|  null|     null|10.591186147186146|  7.56551948051948|2.1913549783549784|0.6427619047619048|  0.577922077922078|
| stddev|                 NaN|15.520878392934657|  null|     null| 8.797198158271309| 6.378106846000693|2.3833479092331666|0.8109388912838871|0.49389189681841367|
|    min|00002225cb8a4

In [None]:
# Peek at the first 20 rows (long strings such as driver_id are truncated).
df.show(n=20, truncate=True)

+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+
|           driver_id|age|   sex|car_class|driving_experience|speeding_penalties|parking_penalties|total_car_accident|has_car_accident|
+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+
|ab3e7f31ace042fd8...| 29|  male|        B|                 2|                 0|                0|                 0|               0|
|3be9de3ea32045888...| 25|female|        B|                 1|                 2|                0|                 0|               0|
|4b520b87244c4e8db...| 24|female|        D|                 2|                 1|                0|                 0|               0|
|643b83b2860349e28...| 22|  male|        B|                 1|                 2|                0|                 0|               0|
|ec08a7f8c2ce4940b...| 20|female|        C|     

In [None]:
# Column every model below predicts; it holds 0/1 values in the rows shown above.
TARGET_LABEL = 'has_car_accident'

In [None]:
# Scores the `prediction` column against TARGET_LABEL. For
# MulticlassClassificationEvaluator the 'f1' metric is the class-weighted F1.
evaluator = MulticlassClassificationEvaluator(
    metricName='f1',
    labelCol=TARGET_LABEL,
    predictionCol='prediction',
)

In [None]:
# 75/25 train/test split. The fixed seed makes the split (and therefore every
# F1 score below) reproducible across kernel restarts; without it each run
# draws a different set of rows.
train_df, test_df = df.randomSplit([0.75, 0.25], seed=42)

### Train CatBoost with Pool

In [None]:
# Columns packed into the feature vector, in vector-slot order.
features = [
    "age", "sex_index", "car_class_index", "driving_experience",
    "speeding_penalties", "parking_penalties", "total_car_accident",
]

# The two string columns are turned into numeric indices before assembly.
sex_indexer = StringIndexer(inputCol='sex', outputCol='sex_index')
car_class_indexer = StringIndexer(inputCol='car_class', outputCol='car_class_index')
assembler = VectorAssembler(inputCols=features, outputCol='features')

In [None]:
def prepare_vector(df: DataFrame, fit_df: "DataFrame | None" = None) -> DataFrame:
  """Index the categorical columns of `df` and assemble the `features` vector.

  The StringIndexers learn their label -> index mapping from `fit_df`, not from
  `df`. StringIndexer orders labels by their frequency in the data it is fitted
  on, so fitting separately on the train and test splits can give the two splits
  different encodings (e.g. `male` -> 0.0 in one and 1.0 in the other). In that
  case the model would be evaluated on mislabelled categories.

  Args:
    df: frame with the raw `sex`, `car_class` and numeric feature columns.
    fit_df: frame the indexers are fitted on. Defaults to the notebook's
      training split `train_df`, so train and test share one encoding.

  Returns:
    `df` with `sex_index`, `car_class_index` and `features` columns appended.
  """
  if fit_df is None:
    fit_df = train_df
  result_df = sex_indexer.fit(fit_df).transform(df)
  result_df = car_class_indexer.fit(fit_df).transform(result_df)
  return assembler.transform(result_df)

In [None]:
# Apply the same preprocessing to both splits.
train, test = (prepare_vector(split_df) for split_df in (train_df, test_df))

In [None]:
# Inspect the indexed columns and the assembled feature vector.
train.show(n=20, truncate=True)

+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+---------+---------------+--------------------+
|           driver_id|age|   sex|car_class|driving_experience|speeding_penalties|parking_penalties|total_car_accident|has_car_accident|sex_index|car_class_index|            features|
+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+---------+---------------+--------------------+
|00002225cb8a44ebb...| 31|  male|        S|                15|                 2|                0|                 0|               0|      0.0|            4.0|[31.0,0.0,4.0,15....|
|0000c3a3477346d09...| 23|female|        B|                 5|                15|                3|                 1|               0|      1.0|            0.0|[23.0,1.0,0.0,5.0...|
|0000d27b07fa4cdfb...| 24|  male|        B|                 0|                12|    

In [None]:
# Wrap only the columns CatBoost needs in a Pool, then name the label and
# feature columns explicitly.
train_pool = catboost_spark.Pool(train.select('features', TARGET_LABEL))
train_pool.setLabelCol(TARGET_LABEL)
train_pool.setFeaturesCol('features')

Pool_a697a621cd08

In [None]:
# CatBoost classifier on the plain feature vector: iterations=50, depth=5.
classifier = catboost_spark.CatBoostClassifier(featuresCol='features', labelCol=TARGET_LABEL)
classifier.setIterations(50).setDepth(5)

CatBoostClassifier_67ea50afe67e

In [None]:
# Train on the Pool, score the held-out split and report the evaluator's F1.
model = classifier.fit(train_pool)
predict = model.transform(test)
print('Model F1 =', evaluator.evaluate(predict))

Model F1 = 0.8682583196654725


In [None]:
# Inspect raw scores, probabilities and predicted class per row.
predict.show(n=20, truncate=True)

+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+---------+---------------+--------------------+--------------------+--------------------+----------+
|           driver_id|age|   sex|car_class|driving_experience|speeding_penalties|parking_penalties|total_car_accident|has_car_accident|sex_index|car_class_index|            features|       rawPrediction|         probability|prediction|
+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+---------+---------------+--------------------+--------------------+--------------------+----------+
|0002bb0c520b4145b...| 20|  male|        D|                 9|                13|                4|                 1|               1|      1.0|            3.0|[20.0,1.0,3.0,9.0...|[-1.1225048477517...|[0.09578078839621...|       1.0|
|0002e524012748159...| 29|female|        C|             

In [None]:
# Export the trained model with catboost-spark's native-format writer.
# NOTE(review): presumably loadable by the pip-installed `catboost` package — confirm.
model.saveNativeModel('catboost_native')

In [None]:
# Persist the Spark ML model; overwrite() replaces a copy left by a previous run.
model.write().overwrite().save('catboost_spark')

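### Create metadata for the feature vector

Spark ML can attach per-slot attribute metadata to a vector column. Marking `sex_index` and `car_class_index` as *nominal* lets CatBoost treat them as categorical features instead of plain numbers.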

In [None]:
def create_metadata(features, categ_nums):
    """Build Spark ML attribute metadata for an assembled feature vector.

    Args:
        features: column names in vector-slot order (slot i == features[i]).
        categ_nums: mapping of categorical feature name -> number of distinct
            values; every feature not in this mapping is marked numeric.

    Returns:
        ``{"ml_attr": {"attrs": {...}, "num_attrs": len(features)}}`` where
        ``attrs`` holds a "numeric" and/or "nominal" list (each key only when
        non-empty) describing each slot's index and name.
    """
    indexed = list(enumerate(features))
    numeric_attrs = [{"idx": idx, "name": name}
                     for idx, name in indexed if name not in categ_nums]
    nominal_attrs = [{"num_vals": categ_nums[name], "idx": idx, "name": name}
                     for idx, name in indexed if name in categ_nums]

    attrs = {}
    if numeric_attrs:
        attrs["numeric"] = numeric_attrs
    if nominal_attrs:
        attrs["nominal"] = nominal_attrs

    return {"ml_attr": {"attrs": attrs, "num_attrs": len(features)}}

In [None]:
# Number of distinct values of each indexed categorical column, taken from the
# training data instead of hard-coded, so the metadata cannot silently drift
# out of sync with the dataset (a wrong num_vals mis-describes the feature).
categorical_cardinality = {
    column: train.select(column).distinct().count()
    for column in ('sex_index', 'car_class_index')
}
feature_metadata = create_metadata(features, categorical_cardinality)
feature_metadata

{'ml_attr': {'attrs': {'nominal': [{'idx': 1,
     'name': 'sex_index',
     'num_vals': 2},
    {'idx': 2, 'name': 'car_class_index', 'num_vals': 5}],
   'numeric': [{'idx': 0, 'name': 'age'},
    {'idx': 3, 'name': 'driving_experience'},
    {'idx': 4, 'name': 'speeding_penalties'},
    {'idx': 5, 'name': 'parking_penalties'},
    {'idx': 6, 'name': 'total_car_accident'}]},
  'num_attrs': 7}}

### Train with categorical-feature metadata

In [None]:
# Copy of the `features` column carrying the nominal/numeric slot metadata;
# the same column expression is attached to both splits.
features_meta_col = col("features").alias("", metadata=feature_metadata)
train_meta = train.withColumn("features_with_meta", features_meta_col)
test_meta = test.withColumn("features_with_meta", features_meta_col)

In [None]:
# List every field of the schema, including the new features_with_meta column.
list(train_meta.schema)

[StructField(driver_id,StringType,true),
 StructField(age,IntegerType,true),
 StructField(sex,StringType,true),
 StructField(car_class,StringType,true),
 StructField(driving_experience,IntegerType,true),
 StructField(speeding_penalties,IntegerType,true),
 StructField(parking_penalties,IntegerType,true),
 StructField(total_car_accident,IntegerType,true),
 StructField(has_car_accident,IntegerType,true),
 StructField(sex_index,DoubleType,false),
 StructField(car_class_index,DoubleType,false),
 StructField(features,VectorUDT,true),
 StructField(features_with_meta,VectorUDT,true)]

In [None]:
# Confirm the metadata survived withColumn, looking the field up by name.
train_meta.schema['features_with_meta'].metadata

{'ml_attr': {'attrs': {'nominal': [{'idx': 1,
     'name': 'sex_index',
     'num_vals': 2},
    {'idx': 2, 'name': 'car_class_index', 'num_vals': 5}],
   'numeric': [{'idx': 0, 'name': 'age'},
    {'idx': 3, 'name': 'driving_experience'},
    {'idx': 4, 'name': 'speeding_penalties'},
    {'idx': 5, 'name': 'parking_penalties'},
    {'idx': 6, 'name': 'total_car_accident'}]},
  'num_attrs': 7}}

In [None]:
# Same hyper-parameters (iterations=50, depth=5), but trained on the vector
# that carries categorical-slot metadata.
classifier = catboost_spark.CatBoostClassifier(featuresCol='features_with_meta', labelCol=TARGET_LABEL)
classifier.setIterations(50).setDepth(5)

CatBoostClassifier_880c96b1da1a

In [None]:
# Train directly on the DataFrame (no Pool), score the test split, report F1.
model = classifier.fit(train_meta)
predict = model.transform(test_meta)
print('Model F1 =', evaluator.evaluate(predict))

Model F1 = 0.9065764528064353


### Pipeline model with CatBoost
<a href='https://catboost.ai/docs/catboost-spark/3.0_2.12/latest/api/python/api/catboost_spark.CatBoostClassifier.html?highlight=catboostclassifier#catboostclassifier'>CatBoostClassifier</a>

In [None]:
# Fresh, unfitted stages for the pipeline (same definitions as in the Pool
# section). The pipeline is fitted on train_df, so its indexers learn one
# label -> index mapping that is then reused for the test split.
features = [
    "age", "sex_index", "car_class_index", "driving_experience",
    "speeding_penalties", "parking_penalties", "total_car_accident",
]
sex_indexer, car_class_indexer = (
    StringIndexer(inputCol=raw_col, outputCol=f'{raw_col}_index')
    for raw_col in ('sex', 'car_class')
)
assembler = VectorAssembler(inputCols=features, outputCol='features')

In [None]:
# Final pipeline stage: CatBoost on the assembled vector, iterations=50, depth=5.
classifier = catboost_spark.CatBoostClassifier(featuresCol='features', labelCol=TARGET_LABEL)
classifier.setIterations(50).setDepth(5)

CatBoostClassifier_3cba290f028e

In [None]:
# Indexing, assembly and training as a single estimator, so fitting and
# scoring always apply identical preprocessing.
pipeline = Pipeline(stages=[
    sex_indexer,
    car_class_indexer,
    assembler,
    classifier,
])

In [None]:
# Fit every stage on the raw training split.
p_model = pipeline.fit(dataset=train_df)

In [None]:
# The raw test split, before any pipeline stage is applied.
test_df.show(n=20, truncate=True)

+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+
|           driver_id|age|   sex|car_class|driving_experience|speeding_penalties|parking_penalties|total_car_accident|has_car_accident|
+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+
|00002225cb8a44ebb...| 31|  male|        S|                15|                 2|                0|                 0|               0|
|0000c3a3477346d09...| 23|female|        B|                 5|                15|                3|                 1|               0|
|0002932e07054b12b...| 29|female|        C|                 6|                 6|                2|                 0|               0|
|0002e524012748159...| 29|female|        C|                 6|                15|                5|                 1|               1|
|00032b6458bc42b3b...| 23|  male|        D|     

In [None]:
# Run the fitted pipeline end to end on the raw test split.
predictions = p_model.transform(dataset=test_df)

In [None]:
# Indexed columns, feature vector, scores and predicted class per row.
predictions.show(n=20, truncate=True)

+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+---------+---------------+--------------------+--------------------+--------------------+----------+
|           driver_id|age|   sex|car_class|driving_experience|speeding_penalties|parking_penalties|total_car_accident|has_car_accident|sex_index|car_class_index|            features|       rawPrediction|         probability|prediction|
+--------------------+---+------+---------+------------------+------------------+-----------------+------------------+----------------+---------+---------------+--------------------+--------------------+--------------------+----------+
|00002225cb8a44ebb...| 31|  male|        S|                15|                 2|                0|                 0|               0|      1.0|            4.0|[31.0,1.0,4.0,15....|[1.15140076465038...|[0.90910879480527...|       0.0|
|0000c3a3477346d09...| 23|female|        B|             

In [None]:
# F1 of the pipeline model on the held-out split.
print('Model F1 =', evaluator.evaluate(predictions))

Model F1 = 0.9065764528064353


In [None]:
# Confirm the fitted pipeline is a regular Spark ML PipelineModel.
type(p_model)

pyspark.ml.pipeline.PipelineModel

In [None]:
# Persist the whole fitted pipeline (indexers, assembler and CatBoost model);
# overwrite() replaces a copy left by a previous run.
p_model.write().overwrite().save('catboost_pipeline')

### CatBoost limitations
<a href='https://catboost.ai/en/docs/concepts/spark-known-limitations'>List of limitations</a>