# `spark-xgboost` PySpark wrapper for XGBoost4J-Spark

This notebook is a brief example of how to use the XGBoost4J-Spark library from Python. In it, we train a model to predict earning potential on the 'Adult' dataset by placing the model in a standard Spark ML pipeline.

Prerequisites:
- Databricks cluster with Databricks Runtime for ML >= 6.6

_or_

- Spark cluster with Spark >= 2.4.5, PySpark >= 2.4.5, XGBoost >= 0.9, MLflow >= 1.8

In [6]:
from sparkxgb import XGBoostClassifier, XGBoostRegressor
from pprint import PrettyPrinter

from pyspark.sql.types import StringType

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Single shared pretty-printer; a later cell calls pp.pprint(...) on a nested dict.
pp = PrettyPrinter()
# Echo the instance so the cell produces visible confirmation that it ran.
print(str(pp))

<pprint.PrettyPrinter object at 0x7fde83dc0fd0>


In [1]:
import os
def get_running_jars(workdir):
    """Return the paths of the ``.jar`` files located directly inside ``workdir``.

    Args:
        workdir: Directory to search (not recursive). A trailing path
            separator is accepted.

    Returns:
        A sorted list of path strings, each prefixed with ``workdir``.
        The list is empty when the directory has no jars or does not exist.
    """
    import glob
    import os

    # Match on the ".jar" extension: a bare "*jar" pattern would also pick up
    # unrelated files whose name merely ends in "jar".
    pattern = os.path.join(workdir, "*.jar")
    # glob gives no ordering guarantee; sort so the result is stable across runs.
    return sorted(glob.glob(pattern))

# A notebook has no __file__, so the (symlink-resolved) working directory is
# used as the notebook's location.
workspace = os.path.realpath(os.getcwd())
print(workspace)

# The XGBoost4J jars are expected in the sibling "jars" directory.
spark_jars = get_running_jars(os.path.join(workspace, "..", "jars"))
print(spark_jars)

# NOTE: the jars have to be handed to Spark when the session is launched,
# e.g. PYSPARK_SUBMIT_ARGS="--jars <a.jar>,<b.jar> pyspark-shell" (comma-separated,
# no spaces). This cell only inspects what the running session was given.
from pyspark.context import SparkContext

# Use .get() so the cell still runs when the variable was never exported.
print(os.environ.get('PYSPARK_SUBMIT_ARGS', '<PYSPARK_SUBMIT_ARGS is not set>'))

# `spark` is not created in this notebook; presumably it is the session
# injected by the PySpark shell/kernel -- verify for other launch modes.
spark.sparkContext.getConf().getAll()


/mnt/e/WorkSpace/github/spark/spark-xgboost/examples
['/mnt/e/WorkSpace/github/spark/spark-xgboost/examples/../jars/xgboost4j-example_2.12-1.5.0-SNAPSHOT.jar', '/mnt/e/WorkSpace/github/spark/spark-xgboost/examples/../jars/xgboost4j-flink_2.12-1.5.0-SNAPSHOT.jar', '/mnt/e/WorkSpace/github/spark/spark-xgboost/examples/../jars/xgboost4j-spark_2.12-1.5.0-SNAPSHOT.jar', '/mnt/e/WorkSpace/github/spark/spark-xgboost/examples/../jars/xgboost4j_2.12-1.5.0-SNAPSHOT-tests.jar', '/mnt/e/WorkSpace/github/spark/spark-xgboost/examples/../jars/xgboost4j_2.12-1.5.0-SNAPSHOT.jar']
"--name" "PySparkShell" "--jars" "jars/xgboost4j-spark_2.12-1.5.0-SNAPSHOT.jar,jars/xgboost4j_2.12-1.5.0-SNAPSHOT.jar" "--py-files" "../jars/xgboost4j_2.12-1.5.0-SNAPSHOT.jar,../jars/xgboost4j_2.12-1.5.0-SNAPSHOT.jar" "pyspark-shell"


[('spark.driver.host', 'host.docker.internal'),
 ('spark.app.id', 'local-1628490682370'),
 ('spark.executor.id', 'driver'),
 ('spark.sql.warehouse.dir',
  'file:/mnt/e/WorkSpace/github/spark/spark-xgboost/examples/spark-warehouse'),
 ('spark.submit.pyFiles',
  '/mnt/e/WorkSpace/github/spark/spark-xgboost/jars/xgboost4j_2.12-1.5.0-SNAPSHOT.jar,/mnt/e/WorkSpace/github/spark/spark-xgboost/jars/xgboost4j_2.12-1.5.0-SNAPSHOT.jar'),
 ('spark.driver.port', '55747'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.files',
  'file:///mnt/e/WorkSpace/github/spark/spark-xgboost/jars/xgboost4j_2.12-1.5.0-SNAPSHOT.jar,file:///mnt/e/WorkSpace/github/spark/spark-xgboost/jars/xgboost4j_2.12-1.5.0-SNAPSHOT.jar'),
 ('spark.repl.local.jars',
  'file:/mnt/e/WorkSpace/github/spark/spark-xgboost/examples/jars/xgboost4j-spark_2.12-1.5.0-SNAPSHOT.jar,file:/mnt/e/WorkSpace/github/spark/spark-xgboost/examples/jars/xgboost4j_2.12-1.5.0-SNAPSHOT.jar'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.app.star

In [10]:
# Column names for the Adult data file. They are applied positionally with
# toDF() below, so the order must match the file's column order.
col_names = [
  "age", "workclass", "fnlwgt",
  "education", "education-num",
  "marital-status", "occupation",
  "relationship", "race", "sex",
  "capital-gain", "capital-loss",
  "hours-per-week", "native-country",
  "label"
]

# Data source: https://archive.ics.uci.edu/ml/machine-learning-databases/adult/
# The split is seeded so that re-running the notebook yields the same
# train/test rows (and therefore comparable metrics) every time.
train_sdf, test_sdf = (
  spark.read.csv(
    path="../sparkxgb/tests/data/adult.data",
    inferSchema=True
  )
  .toDF(*col_names)
  .repartition(200)
  .randomSplit([0.8, 0.2], seed=42)
)

In [11]:
# Walk the schema once, separating string-typed columns (which need indexing)
# from the remaining columns (used as predictors unchanged).
string_columns = []
non_string_columns = []
for field in train_sdf.schema.fields:
  if isinstance(field.dataType, StringType):
    string_columns.append(field.name)
  else:
    non_string_columns.append(field.name)

# Every string column gets an indexed counterpart named "<column>_ix".
string_col_replacements = ["{}_ix".format(name) for name in string_columns]
string_column_map = [
  (original, indexed)
  for original, indexed in zip(string_columns, string_col_replacements)
]

# The last string column is taken as the target; all other indexed columns
# join the non-string columns as predictors.
target = string_col_replacements[-1]
predictors = non_string_columns + string_col_replacements[:-1]

pp.pprint({
  "string_column_map": string_column_map,
  "target_variable": target,
  "predictor_variables": predictors,
})

{'predictor_variables': ['age',
                         'fnlwgt',
                         'education-num',
                         'capital-gain',
                         'capital-loss',
                         'hours-per-week',
                         'workclass_ix',
                         'education_ix',
                         'marital-status_ix',
                         'occupation_ix',
                         'relationship_ix',
                         'race_ix',
                         'sex_ix',
                         'native-country_ix'],
 'string_column_map': [('workclass', 'workclass_ix'),
                       ('education', 'education_ix'),
                       ('marital-status', 'marital-status_ix'),
                       ('occupation', 'occupation_ix'),
                       ('relationship', 'relationship_ix'),
                       ('race', 'race_ix'),
                       ('sex', 'sex_ix'),
                       ('native-country', 'native-country_ix

In [12]:
# One StringIndexer per string column.
# Predictor indexers use handleInvalid="keep" so categories that were not seen
# during fitting are mapped to an extra index instead of raising; this lets the
# pipeline be fitted on the training split only (no test-set leakage) and keeps
# the logged featuriser usable on new data.
# The target indexer keeps the default "error" so an unexpected label fails loudly.
si = [
  StringIndexer(
    inputCol=in_col,
    outputCol=out_col,
    handleInvalid="error" if out_col == target else "keep",
  )
  for in_col, out_col in string_column_map
]

# Assemble all predictor columns into the single vector column the model reads.
va = VectorAssembler(inputCols=predictors, outputCol="features")

pipeline = Pipeline(stages=[*si, va])
fitted_pipeline = pipeline.fit(train_sdf)

                                                                                

In [22]:
# Featurise the training split and mark it for caching, since it is reused
# by the cross-validation fit further down.
train_sdf_prepared = fitted_pipeline.transform(train_sdf).cache()
# Running an action populates the cache; the row count is the cell's output.
train_sdf_prepared.count()

                                                                                

25983

In [7]:
# Featurise the held-out split with the same fitted pipeline and mark it for caching.
test_sdf_prepared = fitted_pipeline.transform(test_sdf).cache()
# Running an action populates the cache; the row count is the cell's output.
test_sdf_prepared.count()

21/08/09 14:12:40 INFO FileSourceStrategy: Pushed Filters: 
21/08/09 14:12:40 INFO FileSourceStrategy: Post-Scan Filters: 
21/08/09 14:12:40 INFO FileSourceStrategy: Output Data Schema: struct<_c0: int, _c1: string, _c2: double, _c3: string, _c4: double ... 13 more fields>
21/08/09 14:12:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
21/08/09 14:12:40 INFO CodeGenerator: Code generated in 6.2161 ms
21/08/09 14:12:40 INFO CodeGenerator: Code generated in 7.0873 ms
21/08/09 14:12:40 INFO CodeGenerator: Code generated in 28.8682 ms
21/08/09 14:12:40 INFO MemoryStore: Block broadcast_40 stored as values in memory (estimated size 175.3 KiB, free 433.7 MiB)
21/08/09 14:12:40 INFO MemoryStore: Block broadcast_40_piece0 stored as bytes in memory (estimated size 27.9 KiB, free 433.7 MiB)
21/08/09 14:12:40 INFO BlockManagerInfo: Added broadcast_40_piece0 in memory on host.docker.

21/08/09 14:12:41 INFO MemoryStore: Block rdd_144_1 stored as values in memory (estimated size 13.9 KiB, free 433.8 MiB)
21/08/09 14:12:41 INFO MemoryStore: Block rdd_144_4 stored as values in memory (estimated size 13.1 KiB, free 433.8 MiB)
21/08/09 14:12:41 INFO MemoryStore: Block rdd_144_7 stored as values in memory (estimated size 13.1 KiB, free 433.7 MiB)
21/08/09 14:12:41 INFO MemoryStore: Block rdd_144_3 stored as values in memory (estimated size 17.1 KiB, free 433.8 MiB)
21/08/09 14:12:41 INFO BlockManagerInfo: Added rdd_144_1 in memory on host.docker.internal:58317 (size: 13.9 KiB, free: 434.3 MiB)
21/08/09 14:12:41 INFO BlockManagerInfo: Added rdd_144_4 in memory on host.docker.internal:58317 (size: 13.1 KiB, free: 434.3 MiB)
21/08/09 14:12:41 INFO BlockManagerInfo: Added rdd_144_7 in memory on host.docker.internal:58317 (size: 13.1 KiB, free: 434.3 MiB)
21/08/09 14:12:41 INFO MemoryStore: Block rdd_144_5 stored as values in memory (estimated size 14.9 KiB, free 433.7 MiB)
21

21/08/09 14:12:41 INFO ShuffleBlockFetcherIterator: Getting 1 (12.2 KiB) non-empty blocks including 1 (12.2 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/08/09 14:12:41 INFO ShuffleBlockFetcherIterator: Getting 1 (12.2 KiB) non-empty blocks including 1 (12.2 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/08/09 14:12:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
21/08/09 14:12:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms
21/08/09 14:12:41 INFO MemoryStore: Block rdd_144_18 stored as values in memory (estimated size 10.5 KiB, free 427.4 MiB)
21/08/09 14:12:41 INFO Executor: Finished task 13.0 in stage 30.0 (TID 3634). 3751 bytes result sent to driver
21/08/09 14:12:41 INFO MemoryStore: Block rdd_144_17 stored as values in memory (estimated size 12.7 KiB, free 427.4 MiB)
21/08/09 14:12:41 INFO Executor: Finished task 15.0 in stage 30.0 (TID 3636). 3751 bytes result sent to driver
21/08/09 14:12:41 I

21/08/09 14:12:42 INFO Executor: Running task 34.0 in stage 30.0 (TID 3655)
21/08/09 14:12:42 INFO TaskSetManager: Starting task 36.0 in stage 30.0 (TID 3657) (host.docker.internal, executor driver, partition 36, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:42 INFO Executor: Running task 36.0 in stage 30.0 (TID 3657)
21/08/09 14:12:42 INFO MemoryStore: Block rdd_144_32 stored as values in memory (estimated size 15.6 KiB, free 431.3 MiB)
21/08/09 14:12:42 INFO Executor: Finished task 27.0 in stage 30.0 (TID 3648). 3794 bytes result sent to driver
21/08/09 14:12:42 INFO TaskSetManager: Starting task 37.0 in stage 30.0 (TID 3658) (host.docker.internal, executor driver, partition 37, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:42 INFO BlockManagerInfo: Added rdd_144_32 in memory on host.docker.internal:58317 (size: 15.6 KiB, free: 433.9 MiB)
21/08/09 14:12:42 INFO ShuffleBlockFetcherIterator: Getting 1 (12.2 KiB) non-empty blocks including 1

21/08/09 14:12:42 INFO Executor: Running task 49.0 in stage 30.0 (TID 3670)
21/08/09 14:12:42 INFO MemoryStore: Block rdd_144_37 stored as values in memory (estimated size 16.5 KiB, free 433.1 MiB)
21/08/09 14:12:42 INFO Executor: Finished task 43.0 in stage 30.0 (TID 3664). 3751 bytes result sent to driver
21/08/09 14:12:42 INFO TaskSetManager: Finished task 40.0 in stage 30.0 (TID 3661) in 125 ms on host.docker.internal (executor driver) (40/200)
21/08/09 14:12:42 INFO TaskSetManager: Finished task 42.0 in stage 30.0 (TID 3663) in 128 ms on host.docker.internal (executor driver) (41/200)
21/08/09 14:12:42 INFO TaskSetManager: Finished task 34.0 in stage 30.0 (TID 3655) in 220 ms on host.docker.internal (executor driver) (42/200)
21/08/09 14:12:42 INFO TaskSetManager: Starting task 50.0 in stage 30.0 (TID 3671) (host.docker.internal, executor driver, partition 50, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:42 INFO TaskSetManager: Finished task 44.0 in stage 3

21/08/09 14:12:42 INFO ShuffleBlockFetcherIterator: Getting 1 (13.5 KiB) non-empty blocks including 1 (13.5 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/08/09 14:12:42 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
21/08/09 14:12:42 INFO TaskSetManager: Starting task 68.0 in stage 30.0 (TID 3689) (host.docker.internal, executor driver, partition 68, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:42 INFO TaskSetManager: Finished task 59.0 in stage 30.0 (TID 3680) in 106 ms on host.docker.internal (executor driver) (60/200)
21/08/09 14:12:42 INFO TaskSetManager: Finished task 61.0 in stage 30.0 (TID 3682) in 83 ms on host.docker.internal (executor driver) (61/200)
21/08/09 14:12:42 INFO MemoryStore: Block rdd_144_62 stored as values in memory (estimated size 14.9 KiB, free 428.9 MiB)
21/08/09 14:12:42 INFO Executor: Running task 68.0 in stage 30.0 (TID 3689)
21/08/09 14:12:42 INFO BlockManagerInfo: Added rdd_144_62 in memory

21/08/09 14:12:42 INFO MemoryStore: Block rdd_144_80 stored as values in memory (estimated size 11.6 KiB, free 430.6 MiB)
21/08/09 14:12:42 INFO MemoryStore: Block rdd_144_81 stored as values in memory (estimated size 9.5 KiB, free 432.7 MiB)
21/08/09 14:12:42 INFO TaskSetManager: Finished task 76.0 in stage 30.0 (TID 3697) in 108 ms on host.docker.internal (executor driver) (76/200)
21/08/09 14:12:42 INFO TaskSetManager: Starting task 83.0 in stage 30.0 (TID 3704) (host.docker.internal, executor driver, partition 83, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:42 INFO TaskSetManager: Starting task 84.0 in stage 30.0 (TID 3705) (host.docker.internal, executor driver, partition 84, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:42 INFO Executor: Running task 83.0 in stage 30.0 (TID 3704)
21/08/09 14:12:42 INFO Executor: Running task 84.0 in stage 30.0 (TID 3705)
21/08/09 14:12:42 INFO BlockManagerInfo: Added rdd_144_78 in memory on host.doc

21/08/09 14:12:42 INFO TaskSetManager: Finished task 93.0 in stage 30.0 (TID 3714) in 118 ms on host.docker.internal (executor driver) (96/200)
21/08/09 14:12:42 INFO TaskSetManager: Finished task 95.0 in stage 30.0 (TID 3716) in 107 ms on host.docker.internal (executor driver) (97/200)
21/08/09 14:12:42 INFO MemoryStore: Block rdd_144_98 stored as values in memory (estimated size 15.6 KiB, free 430.5 MiB)
21/08/09 14:12:42 INFO MemoryStore: Block rdd_144_100 stored as values in memory (estimated size 13.5 KiB, free 432.5 MiB)
21/08/09 14:12:42 INFO ShuffleBlockFetcherIterator: Getting 1 (13.5 KiB) non-empty blocks including 1 (13.5 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/08/09 14:12:42 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
21/08/09 14:12:42 INFO BlockManagerInfo: Added rdd_144_98 in memory on host.docker.internal:58317 (size: 15.6 KiB, free: 433.0 MiB)
21/08/09 14:12:42 INFO ShuffleBlockFetcherIterator: Getting 1 (12.2 KiB) non-e

21/08/09 14:12:43 INFO Executor: Finished task 109.0 in stage 30.0 (TID 3730). 3751 bytes result sent to driver
21/08/09 14:12:43 INFO Executor: Running task 112.0 in stage 30.0 (TID 3733)
21/08/09 14:12:43 INFO MemoryStore: Block rdd_144_110 stored as values in memory (estimated size 13.5 KiB, free 432.4 MiB)
21/08/09 14:12:43 INFO Executor: Finished task 99.0 in stage 30.0 (TID 3720). 3794 bytes result sent to driver
21/08/09 14:12:43 INFO Executor: Finished task 111.0 in stage 30.0 (TID 3732). 3708 bytes result sent to driver
21/08/09 14:12:43 INFO Executor: Running task 113.0 in stage 30.0 (TID 3734)
21/08/09 14:12:43 INFO TaskSetManager: Finished task 101.0 in stage 30.0 (TID 3722) in 235 ms on host.docker.internal (executor driver) (105/200)
21/08/09 14:12:43 INFO BlockManagerInfo: Added rdd_144_110 in memory on host.docker.internal:58317 (size: 13.5 KiB, free: 432.8 MiB)
21/08/09 14:12:43 INFO ShuffleBlockFetcherIterator: Getting 1 (12.2 KiB) non-empty blocks including 1 (12.2 K

21/08/09 14:12:43 INFO BlockManagerInfo: Added rdd_144_113 in memory on host.docker.internal:58317 (size: 15.9 KiB, free: 432.7 MiB)
21/08/09 14:12:43 INFO ShuffleBlockFetcherIterator: Getting 1 (12.2 KiB) non-empty blocks including 1 (12.2 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/08/09 14:12:43 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
21/08/09 14:12:43 INFO BlockManagerInfo: Added rdd_144_121 in memory on host.docker.internal:58317 (size: 13.5 KiB, free: 432.7 MiB)
21/08/09 14:12:43 INFO ShuffleBlockFetcherIterator: Getting 1 (12.2 KiB) non-empty blocks including 1 (12.2 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/08/09 14:12:43 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
21/08/09 14:12:43 INFO TaskSetManager: Starting task 126.0 in stage 30.0 (TID 3747) (host.docker.internal, executor driver, partition 126, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:43 INFO Exec

21/08/09 14:12:43 INFO TaskSetManager: Finished task 113.0 in stage 30.0 (TID 3734) in 541 ms on host.docker.internal (executor driver) (121/200)
21/08/09 14:12:43 INFO TaskSetManager: Starting task 137.0 in stage 30.0 (TID 3758) (host.docker.internal, executor driver, partition 137, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:43 INFO Executor: Running task 137.0 in stage 30.0 (TID 3758)
21/08/09 14:12:43 INFO ShuffleBlockFetcherIterator: Getting 1 (12.2 KiB) non-empty blocks including 1 (12.2 KiB) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
21/08/09 14:12:43 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
21/08/09 14:12:43 INFO Executor: Finished task 130.0 in stage 30.0 (TID 3751). 3708 bytes result sent to driver
21/08/09 14:12:43 INFO BlockManagerInfo: Added rdd_144_128 in memory on host.docker.internal:58317 (size: 12.9 KiB, free: 432.5 MiB)
21/08/09 14:12:43 INFO MemoryStore: Block rdd_144_135 stored as values in memory (

21/08/09 14:12:43 INFO Executor: Finished task 148.0 in stage 30.0 (TID 3769). 3708 bytes result sent to driver
21/08/09 14:12:43 INFO MemoryStore: Block rdd_144_150 stored as values in memory (estimated size 11.2 KiB, free 429.8 MiB)
21/08/09 14:12:43 INFO TaskSetManager: Finished task 140.0 in stage 30.0 (TID 3761) in 187 ms on host.docker.internal (executor driver) (144/200)
21/08/09 14:12:43 INFO TaskSetManager: Finished task 146.0 in stage 30.0 (TID 3767) in 87 ms on host.docker.internal (executor driver) (145/200)
21/08/09 14:12:43 INFO TaskSetManager: Finished task 145.0 in stage 30.0 (TID 3766) in 112 ms on host.docker.internal (executor driver) (146/200)
21/08/09 14:12:43 INFO Executor: Running task 152.0 in stage 30.0 (TID 3773)
21/08/09 14:12:43 INFO MemoryStore: Block rdd_144_147 stored as values in memory (estimated size 13.1 KiB, free 431.8 MiB)
21/08/09 14:12:43 INFO Executor: Running task 151.0 in stage 30.0 (TID 3772)
21/08/09 14:12:43 INFO Executor: Finished task 144.

21/08/09 14:12:44 INFO TaskSetManager: Starting task 171.0 in stage 30.0 (TID 3792) (host.docker.internal, executor driver, partition 171, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:44 INFO Executor: Running task 170.0 in stage 30.0 (TID 3791)
21/08/09 14:12:44 INFO MemoryStore: Block rdd_144_167 stored as values in memory (estimated size 14.8 KiB, free 429.5 MiB)
21/08/09 14:12:44 INFO Executor: Running task 171.0 in stage 30.0 (TID 3792)
21/08/09 14:12:44 INFO Executor: Running task 169.0 in stage 30.0 (TID 3790)
21/08/09 14:12:44 INFO TaskSetManager: Starting task 172.0 in stage 30.0 (TID 3793) (host.docker.internal, executor driver, partition 172, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:44 INFO BlockManagerInfo: Added rdd_144_167 in memory on host.docker.internal:58317 (size: 14.8 KiB, free: 432.1 MiB)
21/08/09 14:12:44 INFO Executor: Finished task 165.0 in stage 30.0 (TID 3786). 3751 bytes result sent to driver
21/08/09 14:12:

21/08/09 14:12:44 INFO Executor: Finished task 173.0 in stage 30.0 (TID 3794). 3751 bytes result sent to driver
21/08/09 14:12:44 INFO BlockManagerInfo: Added rdd_144_177 in memory on host.docker.internal:58317 (size: 15.2 KiB, free: 431.9 MiB)
21/08/09 14:12:44 INFO BlockManagerInfo: Added rdd_144_180 in memory on host.docker.internal:58317 (size: 14.4 KiB, free: 431.9 MiB)
21/08/09 14:12:44 INFO BlockManagerInfo: Added rdd_144_178 in memory on host.docker.internal:58317 (size: 12.3 KiB, free: 431.9 MiB)
21/08/09 14:12:44 INFO MemoryStore: Block rdd_144_179 stored as values in memory (estimated size 13.3 KiB, free 429.4 MiB)
21/08/09 14:12:44 INFO TaskSetManager: Starting task 183.0 in stage 30.0 (TID 3804) (host.docker.internal, executor driver, partition 183, NODE_LOCAL, 4442 bytes) taskResourceAssignments Map()
21/08/09 14:12:44 INFO MemoryStore: Block rdd_144_181 stored as values in memory (estimated size 10.5 KiB, free 431.4 MiB)
21/08/09 14:12:44 INFO BlockManagerInfo: Added rdd

21/08/09 14:12:44 INFO BlockManagerInfo: Added rdd_144_195 in memory on host.docker.internal:58317 (size: 14.3 KiB, free: 431.7 MiB)
21/08/09 14:12:44 INFO TaskSetManager: Finished task 197.0 in stage 30.0 (TID 3818) in 48 ms on host.docker.internal (executor driver) (196/200)
21/08/09 14:12:44 INFO MemoryStore: Block rdd_144_199 stored as values in memory (estimated size 14.5 KiB, free 429.1 MiB)
21/08/09 14:12:44 INFO TaskSetManager: Finished task 196.0 in stage 30.0 (TID 3817) in 60 ms on host.docker.internal (executor driver) (197/200)
21/08/09 14:12:44 INFO BlockManagerInfo: Added rdd_144_199 in memory on host.docker.internal:58317 (size: 14.5 KiB, free: 431.6 MiB)
21/08/09 14:12:44 INFO MemoryStore: Block rdd_144_198 stored as values in memory (estimated size 12.4 KiB, free 431.2 MiB)
21/08/09 14:12:44 INFO Executor: Finished task 199.0 in stage 30.0 (TID 3820). 3751 bytes result sent to driver
21/08/09 14:12:44 INFO BlockManagerInfo: Added rdd_144_198 in memory on host.docker.in

6473

In [16]:
# Baseline XGBoost settings; eta and maxDepth are later overridden by the grid search.
xgbParams = {
  "eta": 0.1,
  "maxDepth": 2,
  "missing": 0.0,
  "objective": "binary:logistic",
  "numRound": 5,
  "numWorkers": 2,
}

# Classifier reading the assembled feature vector and the indexed label column.
xgb = XGBoostClassifier(**xgbParams)
xgb = xgb.setFeaturesCol("features")
xgb = xgb.setLabelCol("label_ix")

# Evaluator scoring the raw predictions against the same indexed label.
bce = BinaryClassificationEvaluator(
  rawPredictionCol="rawPrediction",
  labelCol="label_ix"
)

# Show the generated stage identifiers.
for stage in (xgb, bce):
  print(stage)

XGBoostClassifier_e2a902a4c7ec
BinaryClassificationEvaluator_97ee6fb88ccf


In [15]:
# Search space: 3 learning rates x 3 tree depths = 9 candidate settings.
param_grid = (
  ParamGridBuilder()
  .addGrid(xgb.eta, [1e-1, 1e-2, 1e-3])
  .addGrid(xgb.maxDepth, [2, 4, 8])
  .build()
)

# 5-fold cross-validation over the grid, scored with the binary evaluator.
# The seed fixes the fold assignment so repeated runs compare candidates on
# the same folds and select the same best model.
cv = CrossValidator(
  estimator=xgb,
  estimatorParamMaps=param_grid,
  evaluator=bce,
  numFolds=5,
  seed=42
)
print(cv)

CrossValidator_c7049e7f5fdb


In [14]:
import mlflow
import mlflow.spark

# Artifact path under which the cross-validated model is logged.
spark_model_name = "best_model_spark"

with mlflow.start_run():
  # Run the grid search with cross-validation on the prepared training split.
  model = cv.fit(train_sdf_prepared)

  # Record the hyper-parameters of the winning candidate.
  best_params = {
    "eta_best": model.bestModel.getEta(),
    "maxDepth_best": model.bestModel.getMaxDepth(),
  }
  mlflow.log_params(best_params)

  # Log the feature pipeline first, then the tuned model, as separate artifacts.
  for artifact, artifact_path in (
    (fitted_pipeline, "featuriser"),
    (model, spark_model_name),
  ):
    mlflow.spark.log_model(artifact, artifact_path)

  # Score the held-out split and log the evaluator's result.
  test_scored = model.transform(test_sdf_prepared)
  metrics = {"roc_test": bce.evaluate(test_scored)}
  mlflow.log_metrics(metrics)

ModuleNotFoundError: No module named 'mlflow'