In [0]:
# Databricks notebook source

from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from xgboost.spark import SparkXGBClassifier

# Sample data
data = [
    (1, 1.0, 0.1, 0.2, 0.3),
    (0, 0.2, 0.1, 0.4, 0.4),
    (1, 0.3, 0.2, 0.5, 0.5),
    (0, 0.4, 0.3, 0.6, 0.6),
    (1, 0.5, 0.4, 0.7, 0.7),
]

# Create DataFrame
columns = ["label", "feature1", "feature2", "feature3", "feature4"]
df = spark.createDataFrame(data, columns)

# Assemble features into a single vector column
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3", "feature4"],
    outputCol="features"
)

# Define the XGBoost classifier
xgb_classifier = SparkXGBClassifier(
    features_Col="features",
    label_Col="label"
)

# Create a pipeline
pipeline = Pipeline(stages=[assembler, xgb_classifier])

# Train the model
model = pipeline.fit(df)

# Make predictions
predictions = model.transform(df)

# Show predictions
display(predictions.select("label", "features", "prediction"))

# COMMAND ----------

# print(model.stages)

# COMMAND ----------

2024-08-19 08:24:48,992 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'features_Col': 'features', 'label_Col': 'label', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'features_Col': 'features', 'label_Col': 'label', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-4179265285182555>, line 36[0m
[1;32m     33[0m pipeline [38;5;241m=[39m Pipeline(stages[38;5;241m=[39m[assembler, xgb_classifier])
[1;32m     35[0m [38;5;66;03m# Train the model[39;00m
[0;32m---> 36[0m model [38;5;241m=[39m pipeline[38;5;241m.[39mfit(df)
[1;32m     38[0m [38;5;66;03m# Make predictions[39;00m
[1;32m     39[0m predictions [38;5;241m=[39m model[38;5;241m.[39mtransform(df)

File [0;32m/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30[0m, in [0;36m_create_patch_function.<locals>.patched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m call_succeeded [38;5;241m=[39m [38;5;28;01mFalse[39;00m
[1;32m     29[0m [38;5;28;01mtry[39;00m:
[0;32m---> 30[0m     result [38;5;241m=[39m original_method([38;5;28mself

In [0]:
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.linalg import Vectors
df_train = spark.createDataFrame([
    (Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
    (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1, False, 2.0),
    (Vectors.dense(4.0, 5.0, 6.0), 0, True, 1.0),
    (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, True, 2.0),
], ["features", "label", "isVal", "weight"])
df_test = spark.createDataFrame([
    (Vectors.dense(1.0, 2.0, 3.0), ),
], ["features"])
xgb_classifier = SparkXGBClassifier(max_depth=5, missing=0.0,
    validation_indicator_col='isVal', weight_col='weight',
    early_stopping_rounds=1, eval_metric='logloss')
xgb_clf_model = xgb_classifier.fit(df_train)
xgb_clf_model.transform(df_test).show()

2024-08-19 06:44:50,216 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 5, 'eval_metric': 'logloss', 'nthread': 1}
	train_call_kwargs_params: {'early_stopping_rounds': 1, 'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': 0.0}


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-2551999439718109>, line 15[0m
[1;32m      9[0m df_test [38;5;241m=[39m spark[38;5;241m.[39mcreateDataFrame([
[1;32m     10[0m     (Vectors[38;5;241m.[39mdense([38;5;241m1.0[39m, [38;5;241m2.0[39m, [38;5;241m3.0[39m), ),
[1;32m     11[0m ], [[38;5;124m"[39m[38;5;124mfeatures[39m[38;5;124m"[39m])
[1;32m     12[0m xgb_classifier [38;5;241m=[39m SparkXGBClassifier(max_depth[38;5;241m=[39m[38;5;241m5[39m, missing[38;5;241m=[39m[38;5;241m0.0[39m,
[1;32m     13[0m     validation_indicator_col[38;5;241m=[39m[38;5;124m'[39m[38;5;124misVal[39m[38;5;124m'[39m, weight_col[38;5;241m=[39m[38;5;124m'[39m[38;5;124mweight[39m[38;5;124m'[39m,
[1;32m     14[0m     early_stopping_rounds[38;5;241m=[39m[38;5;241m1[39m, eval_metric[38;5;241m=[39m[38;5;12

In [0]:
# from pyspark.ml.feature import VectorAssembler
# from pyspark.ml import Pipeline
# from pyspark.ml.classification import RandomForestClassifier


# data = [
#     (1, 1.0, 0.1, 0.2, 0.3),
#     (0, 0.2, 0.1, 0.4, 0.4),
#     (1, 0.3, 0.2, 0.5, 0.5),
#     (0, 0.4, 0.3, 0.6, 0.6),
#     (1, 0.5, 0.4, 0.7, 0.7)
# ]

# columns = ["label", "feature1", "feature2", "feature3", "feature4"]
# df = spark.createDataFrame(data, columns)

# assembler = VectorAssembler(
#     inputCols=["feature1", "feature2", "feature3", "feature4"], 
#     outputCol="features"
# )

# rf = RandomForestClassifier(
#     featuresCol="features",
#     labelCol="label",
#     seed=1712
# )

# pipeline = Pipeline(stages=[assembler, rf])
# model = pipeline.fit(df)

# predictions = model.transform(df)

# display(predictions.select("label", "prediction", "probability"))

Downloading artifacts:   0%|          | 0/25 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

label,prediction,probability
1,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.15, 0.85))"
0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.65, 0.35))"
1,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.0, 1.0))"
0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8, 0.2))"
1,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.25, 0.75))"
