In [0]:
%pip install xgboost==2.0.3 sklearn2pmml==0.124.0

In [0]:
import xgboost
import sklearn2pmml

# Report the installed version of each pinned package so the run is traceable.
for _pkg in (xgboost, sklearn2pmml):
    print(f"{_pkg.__name__} version: {_pkg.__version__}")

# Training an XGBoost SparkXGBClassifier

Runtime: DBR 16.4 LTS

In [0]:
from sklearn.datasets import load_iris
import pandas as pd

# Human-readable species name for each integer class id in the iris target.
species_by_target = {0: 'setosa', 1: 'versicolor', 2: 'virginica'}

# Fetch the iris dataset bundled with scikit-learn.
iris = load_iris()

# Build the pandas frame in a single chain: the measurement columns first,
# then the numeric label, then the species name derived from that label.
iris_df = (
    pd.DataFrame(data=iris.data, columns=iris.feature_names)
    .assign(target=iris.target)
    .assign(species=lambda frame: frame['target'].map(species_by_target))
)

# Hand the frame to Spark so it can feed the distributed trainer.
iris_spark_df = spark.createDataFrame(iris_df)

# Render the Spark DataFrame in the notebook.
display(iris_spark_df)

In [0]:
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import mlflow
import mlflow.spark

# Assemble the four measurement columns into the single vector column that
# SparkXGBClassifier reads as its feature input.
feature_cols = ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
iris_features = assembler.transform(iris_spark_df)

# Split data into train (80%) and test (20%); the split is seeded.
train_df, test_df = iris_features.randomSplit([0.8, 0.2], seed=42)

print(f"Training set size: {train_df.count()}")
print(f"Test set size: {test_df.count()}")

# Single source of truth for the hyperparameters: the same dict configures the
# estimator and is logged to MLflow, so the logged values cannot drift from
# the values the model was actually trained with.
xgb_params = {
    "n_estimators": 100,
    "max_depth": 5,
    "learning_rate": 0.1,
    "num_workers": 2,
}

# Everything logged inside this context is attached to one MLflow run; `run`
# is kept so later cells can add artifacts to the same run.
with mlflow.start_run(run_name="iris_spark_xgboost") as run:
    # Configure the distributed XGBoost classifier.
    spark_xgb = SparkXGBClassifier(
        features_col="features",
        label_col="target",
        seed=42,
        **xgb_params,
    )

    # Train on the training split.
    model = spark_xgb.fit(train_df)

    # Score the held-out split.
    predictions = model.transform(test_df)

    # Accuracy of the predicted class against the integer label.
    evaluator = MulticlassClassificationEvaluator(
        labelCol="target",
        predictionCol="prediction",
        metricName="accuracy"
    )
    accuracy = evaluator.evaluate(predictions)

    # Log parameters and metrics.
    mlflow.log_params(xgb_params)
    mlflow.log_metric("accuracy", accuracy)

    # Log the fitted Spark model under the run's artifacts.
    mlflow.spark.log_model(model, "spark_xgboost_model")

    print(f"\nModel Accuracy: {accuracy:.4f}")
    print(f"MLflow Run ID: {run.info.run_id}")

# Display predictions with species names
display(predictions.select("features", "target", "prediction", "probability", "species"))

# Turn the SparkXGBClassifier model into an XGBClassifier


In [0]:
# Pull the trained native Booster out of the fitted Spark model. Despite the
# variable name, this is the Booster itself rather than an estimator; a later
# cell attaches it to a scikit-learn style XGBClassifier.
xgboost_model = model.get_booster()

In [0]:
from xgboost import XGBClassifier
import numpy as np

# Rebuild a scikit-learn style XGBClassifier around the Booster trained on
# Spark, so that sklearn-only tooling can consume the model.

# Iris has three species; this one constant drives both the estimator's
# `num_class` and the fitted attribute `n_classes_` so they cannot disagree.
NUM_CLASSES = 3

# Create an XGBClassifier with the same parameters as the Spark estimator.
xgb_classifier = XGBClassifier(
    n_estimators=100,
    max_depth=5,
    learning_rate=0.1,
    random_state=42,
    eval_metric='mlogloss',
    objective='multi:softprob',
    num_class=NUM_CLASSES
)

# Set the underlying booster from the Spark model.
xgb_classifier._Booster = xgboost_model

# Set the fitted attributes that `fit` would normally populate.
# `n_classes_` must match the booster's class count: `classes_` is not
# assigned here because in xgboost 2.x it is derived from `n_classes_`, so a
# wrong count would surface as a bogus extra class downstream.
xgb_classifier.n_classes_ = NUM_CLASSES
xgb_classifier._n_features = len(feature_cols)

print("✓ Converted SparkXGBClassifierModel to XGBClassifier")
print(f"  Model type: {type(xgb_classifier)}")
print(f"  Number of classes: {xgb_classifier.n_classes_}")
print(f"  Classes: {xgb_classifier.classes_}")
print(f"  Number of features: {xgb_classifier._n_features}")

# Sanity check on a few held-out rows. The Spark predictions and the raw
# feature values are collected in ONE action so both models are scored on
# exactly the same rows in the same order. `limit` is used instead of an
# unseeded random sample, which was non-reproducible and could be empty on a
# test split this small.
sample_pdf = (
    model.transform(test_df.limit(10))
    .select(*feature_cols, "prediction")
    .toPandas()
)

# Distinct names are used so the `predictions` DataFrame produced by the
# training cell is not overwritten.
sklearn_sample_preds = xgb_classifier.predict(sample_pdf[feature_cols])
spark_sample_preds = sample_pdf[["prediction"]]

print(f"\nSample predictions: {sklearn_sample_preds}")
print(f"\nSample predictions: {spark_sample_preds}")

# Spark reports the predicted class as a double; cast before comparing.
n_agree = int((sklearn_sample_preds == sample_pdf["prediction"].to_numpy().astype(int)).sum())
print(f"\nXGBClassifier agrees with Spark model on {n_agree}/{len(sample_pdf)} sample rows")

# Save the intermediate XGBClassifier to PMML

In [0]:
from sklearn2pmml import sklearn2pmml
from sklearn2pmml.pipeline import PMMLPipeline

import os

# Wrap the converted classifier in a one-step PMMLPipeline for export.
pipeline = PMMLPipeline([
    ("classifier", xgb_classifier)
])

# Export to PMML file
pmml_file_path = "xgboost_iris_model.pmml"

try:
    sklearn2pmml(pipeline, pmml_file_path, with_repr=True)
except Exception as e:
    print(f"Error exporting to PMML: {e}")
    print(f"\nError details: {type(e).__name__}")
    # Re-raise so the notebook stops here: a later cell uploads this file, and
    # continuing after a failed export would either fail there with a
    # confusing error or upload a stale file left over from an earlier run.
    raise

# Verify the file was actually written before reporting success.
if not os.path.exists(pmml_file_path):
    raise FileNotFoundError(
        f"PMML export reported no error but {pmml_file_path!r} was not created"
    )

print(f"✓ Model successfully exported to PMML: {pmml_file_path}")
print(f"  File size: {os.path.getsize(pmml_file_path):,} bytes")

In [0]:
import os

import mlflow

# Identify the training run so the PMML file lands next to the logged model.
run_id = run.info.run_id
experiment_id = run.info.experiment_id

# Artifact directory inside the run; the printed path below is derived from
# this value and from the file name so it cannot drift from what is uploaded.
artifact_dir = "spark_xgboost_model"

# Re-open the existing run and attach the PMML file to it as an artifact.
with mlflow.start_run(run_id=run_id):
    mlflow.log_artifact(pmml_file_path, artifact_path=artifact_dir)
    print(f"✓ PMML file uploaded to MLflow run: {run_id}")
    print(f"  Artifact path: {artifact_dir}/{os.path.basename(pmml_file_path)}")
    print(f"  File: {pmml_file_path}")

# Display the MLflow run URL. The experiment id is read directly from the run
# info rather than fetched again from the tracking server.
print(f"\nView in MLflow: #mlflow/experiments/{experiment_id}/runs/{run_id}")