I have the following code in a Databricks notebook running on a cluster of 8 machines:
```python
import random

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBRanker

# Create the Spark session
spark = SparkSession.builder \
    .appName("ExamplePySparkDataFrame") \
    .getOrCreate()

# This list will become the dataframe
data = []

# Add some training data
for _ in range(1000):
    row = (1973869211, random.randint(0, 1), random.random(), 0)
    data.append(row)

# Add some validation data
for _ in range(1000):
    row = (2973869211, random.randint(0, 1), random.random(), 1)
    data.append(row)

# Create a dataframe from the list of synthetic data
columns = ["qid", "label", "feature", "is_validation"]
df = spark.createDataFrame(data, columns)

# Create the vector assembler
assembler = VectorAssembler(
    inputCols=["feature"],
    outputCol="features",
)

# Define the model
xgb_ranker = SparkXGBRanker(
    features_col="features",
    label_col="label",
    qid_col="qid",
    validation_indicator_col="is_validation",
    # `sc` is predefined in Databricks notebooks; this form is self-contained
    num_workers=spark.sparkContext.defaultParallelism,
    objective="rank:ndcg",
    eval_metric="ndcg@5",
)

# Fit the model
pipeline = Pipeline(stages=[assembler, xgb_ranker])
pipeline_model = pipeline.fit(df)
```
The code produces this error:
```
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(59, 37) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/python/lib/python3.12/site-packages/xgboost/spark/core.py", line 1082, in _train_booster
    dtrain, dvalid = create_dmatrix_from_partitions(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/xgboost/spark/data.py", line 312, in create_dmatrix_from_partitions
    cache_partitions(iterator, append_fn)
  File "/databricks/python/lib/python3.12/site-packages/xgboost/spark/data.py", line 59, in cache_partitions
    train = part.loc[~part[alias.valid], :]
            ~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 1067, in __getitem__
    return self._getitem_tuple(key)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 1256, in _getitem_tuple
    return self._getitem_tuple_same_dim(tup)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 924, in _getitem_tuple_same_dim
    retval = getattr(retval, self.name)._getitem_axis(key, axis=i)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 1301, in _getitem_axis
    return self._getitem_iterable(key, axis=axis)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 1239, in _getitem_iterable
    keyarr, indexer = self._get_listlike_indexer(key, axis)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexing.py", line 1432, in _get_listlike_indexer
    keyarr, indexer = ax._get_indexer_strict(key, axis_name)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexes/base.py", line 6070, in _get_indexer_strict
    self._raise_if_missing(keyarr, indexer, axis_name)
  File "/databricks/python/lib/python3.12/site-packages/pandas/core/indexes/base.py", line 6130, in _raise_if_missing
    raise KeyError(f"None of [{key}] are in the [{axis_name}]")
KeyError: "None of [Int64Index([-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,\n -1, -1, -1, -1, -1, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,\n -2, -2, -2, -2, -2, -2, -2, -2, -2],\n dtype='int64')] are in the [index]"
```
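The -1/-2 values in the KeyError look consistent with bitwise negation of an integer column: on an int64 pandas Series, `~` computes the bitwise NOT (`~0 == -1`, `~1 == -2`) rather than a boolean mask, and `.loc` then treats those integers as row labels that don't exist. A minimal standalone sketch of that behavior (plain pandas, no Spark or XGBoost; the column name `valid` is just for illustration):

```python
import pandas as pd

# Mimic the partition that cache_partitions sees: an integer 0/1
# validation indicator column instead of a boolean one.
part = pd.DataFrame({"valid": [0, 1, 0, 1], "feature": [0.1, 0.2, 0.3, 0.4]})

mask = ~part["valid"]     # bitwise NOT on int64
print(mask.tolist())      # [-1, -2, -1, -2]

# .loc interprets -1/-2 as row labels, none of which exist, so this
# raises: KeyError: "None of [...Index([-1, -2, ...])] are in the [index]"
try:
    part.loc[mask, :]
except KeyError as exc:
    print(exc)

# With a boolean dtype, ~ is logical negation and .loc filters as intended.
bool_mask = ~part["valid"].astype(bool)
print(part.loc[bool_mask, :])
```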
Other Details:
- xgboost: 2.0.3
- Databricks runtime: 16.4 ML (the same error also occurs on other runtime versions)
- Worker type: i3.2xlarge (61 GB Memory, 8 Cores)
- Worker quantity: 8
- Spark: 3.5.2
- Scala: 2.12
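
For what it's worth, a possible workaround consistent with the trace above (untested here, and assuming the integer 0/1 indicator column is the trigger) is to make `is_validation` an actual boolean column before fitting:

```python
from pyspark.sql import functions as F

# Cast the 0/1 indicator to boolean so that ~ inside xgboost.spark acts as
# logical negation rather than bitwise NOT on int64.
df = df.withColumn("is_validation", F.col("is_validation").cast("boolean"))
pipeline_model = pipeline.fit(df)
```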