In [1]:
import os, warnings, sys, logging
import pprint
from datetime import date

import mlflow
import numpy as np
import pandas as pd
# Force-clear any hanging Py4J connections
# NOTE(review): java_import is only imported here; it is never called in the visible cells.
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# use the GPU-native implementation
from spark_rapids_ml.classification import RandomForestClassifier

In [2]:
# --- Project identity -------------------------------------------------------
# Both environment lookups use item access, so a missing variable raises
# KeyError right here rather than failing later.
USERNAME = os.environ["PROJECT_OWNER"]
DBNAME = f"DEMO_{USERNAME}"
CONNECTION_NAME = "go01-aw-dl"
STORAGE = os.environ["DATA_STORAGE"]
DATE = date.today()

# --- RAPIDS / CUDA locations ------------------------------------------------
# Accelerator plugin jar handed to Spark through `spark.jars`.
RAPIDS_JAR = "/home/cdsw/rapids-4-spark_2.12-25.10.0.jar"

# User-level site-packages directory.
LOCAL_PACKAGES = "/home/cdsw/.local/lib/python3.10/site-packages"
# Directory holding the CUDA 12 NVRTC library inside LOCAL_PACKAGES.
NVRTC_LIB_PATH = "/".join((LOCAL_PACKAGES, "nvidia", "cuda_nvrtc", "lib"))
# Cache directory passed to CuPy via CUPY_CACHE_DIR.
WRITABLE_CACHE_DIR = "/tmp/cupy_cache"



In [3]:
# Build the GPU-enabled Spark session.
#
# Spark only documents `spark.executorEnv.<NAME>` for injecting environment
# variables; there is no `spark.driverEnv.<NAME>` prefix, so the previous
# `spark.driverEnv.CUPY_CACHE_DIR` entry was silently ignored. The driver-side
# variable is therefore set directly in this process's environment.
# NOTE(review): assumes the driver is this notebook's Python process (client mode) - confirm.
os.environ["CUPY_CACHE_DIR"] = WRITABLE_CACHE_DIR

# All session settings, applied in insertion order.
spark_session_conf = {
    # GPU discovery and executor environment
    "spark.executor.resource.gpu.vendor": "nvidia.com",
    "spark.executor.resource.gpu.discoveryScript": "/home/cdsw/spark-rapids-ml/getGpusResources.sh",
    "spark.executorEnv.LD_LIBRARY_PATH": f"{NVRTC_LIB_PATH}:{os.environ.get('LD_LIBRARY_PATH', '')}",
    "spark.executorEnv.PYTHONPATH": LOCAL_PACKAGES,
    "spark.executorEnv.CUPY_CACHE_DIR": WRITABLE_CACHE_DIR,
    # Driver sizing
    "spark.driver.memory": "12g",
    "spark.driver.extraJavaOptions": f"-Djava.library.path={NVRTC_LIB_PATH}",
    "spark.driver.maxResultSize": "4g",
    # Executor sizing: one executor with 3 cores and 1 GPU
    "spark.dynamicAllocation.enabled": "false",
    "spark.executor.cores": 3,
    "spark.executor.instances": 1,
    "spark.executor.heartbeatInterval": "60s",
    "spark.executor.memory": "10g",
    "spark.executor.resource.gpu.amount": 1,
    "spark.executor.memoryOverhead": "10g",
    # SQL behaviour
    "spark.sql.autoBroadcastJoinThreshold": -1,
    "spark.sql.broadcastTimeout": "1200",
    "spark.sql.cache.serializer": "com.nvidia.spark.ParquetCachedBatchSerializer",
    "spark.sql.shuffle.partitions": "200",
    "spark.network.timeout": "800s",
    # RAPIDS accelerator
    "spark.rapids.sql.enabled": "true",
    "spark.rapids.shims-provider-override": "com.nvidia.spark.rapids.shims.spark351.SparkShimServiceProvider",
    "spark.rapids.memory.pinnedPool.size": "4g",
    # 0.33 of a GPU per task pairs with the 3 executor cores above
    "spark.task.resource.gpu.amount": 0.33,
    "spark.jars": RAPIDS_JAR,
    # NOTE(review): bucket and user path are hard-coded although STORAGE is read
    # from the environment in the config cell - confirm whether it should be used.
    "spark.kerberos.access.hadoopFileSystems": "s3a://go01-demo/user/jprosser/spark-rapids-ml/",
    "spark.plugins": "com.nvidia.spark.SQLPlugin",
    # Shuffle
    "spark.shuffle.service.enabled": "false",
    "spark.shuffle.file.buffer": "64k",
    "spark.shuffle.spill.compress": "true",
    "spark.hadoop.fs.defaultFS": "s3a://go01-demo/",
}

session_builder = SparkSession.builder.appName("Spark-Rapids-32GB-Final")
for conf_key, conf_value in spark_session_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# View the underlying Java Spark Context
pprint.pprint(f"Java Context Object: {spark.sparkContext._jsc}")

# View the Spark Master (in CML, this usually points to the local container or YARN)
pprint.pprint(f"Master: {spark.sparkContext.master}")

# View the User running the session
pprint.pprint(f"Spark User: {spark.sparkContext.sparkUser()}")


Setting spark.hadoop.yarn.resourcemanager.principal to jprosser


'Java Context Object: org.apache.spark.api.java.JavaSparkContext@7a7278f3'
'Master: k8s://https://172.20.0.1:443'
'Spark User: jprosser'


In [4]:
from pyspark.sql import functions as F

# Runtime SQL / RAPIDS switches, applied one by one in the order listed.
runtime_settings = {
    # Let CollectLimit run on the GPU; the author notes this pays off for
    # large datasets and is not worth it for small ones.
    "spark.rapids.sql.exec.CollectLimitExec": "true",
    # Let the GPU handle random row sampling on large datasets.
    "spark.rapids.sql.exec.SampleExec": "true",
    # Allow more time for large broadcast joins (1200 s = 20 minutes).
    "spark.sql.broadcastTimeout": "1200",
    # Only log when/why the GPU was not selected (alternative value: "ALL").
    "spark.rapids.sql.explain": "NOT_ON_GPU",
    # Allow float math.
    "spark.rapids.sql.variable.float.allow": "true",
    # Let the GPU cast instead of pushing back to the CPU just for a cast.
    "spark.rapids.sql.castFloatToDouble.enabled": "true",
    "spark.rapids.sql.format.parquet.enabled": "true",
    # Adaptive Query Execution is switched off; per the author this keeps the
    # entire SQL plan on the GPU.
    "spark.sql.adaptive.enabled": "false",
}

for setting_name, setting_value in runtime_settings.items():
    spark.conf.set(setting_name, setting_value)

In [5]:
# Test if the JVM can actually talk to the CUDA driver by calling the cuDF
# `Cuda` class through the Py4J bridge, then echo the executor settings.
cuda_manager = spark._jvm.ai.rapids.cudf.Cuda
print(f"CUDA Driver Version: {cuda_manager.getDriverVersion()}")
print(f"Device Count: {cuda_manager.getDeviceCount()}")

# Each setting is reported once (the dynamic-allocation flag used to be printed twice).
print(f"Dynamic Allocation: {spark.conf.get('spark.dynamicAllocation.enabled')}")
print(f"Executor Instances: {spark.conf.get('spark.executor.instances')}")

CUDA Driver Version: 12080
Device Count: 1
Dynamic Allocation: false
Executor Instances: 1
Dynamic Allocation Enabled: false


In [6]:
# Test access to the SQLPlugin: instantiate it on the JVM side and fetch its
# driver component through the Py4J bridge.
sql_plugin = spark._jvm.com.nvidia.spark.SQLPlugin()

driver_comp = sql_plugin.driverPlugin()

log_manager = spark._jvm.org.apache.log4j.LogManager
level_debug = spark._jvm.org.apache.log4j.Level.DEBUG

logger = driver_comp.log()

# Raise the RAPIDS package logger to DEBUG and keep a handle on it so the
# result can be verified below.
rapids_logger = log_manager.getLogger("com.nvidia.spark.rapids")
rapids_logger.setLevel(level_debug)

# Report the logger's real state. The earlier `isTraceEnabled() or True`
# expression always evaluated to True, so it could never reveal a failure.
print(f"Debug enabled for RAPIDS: {rapids_logger.isDebugEnabled()}")


Debug enabled for RAPIDS: True


In [7]:
# Smoke-test the catalog connection: load the table, summarise its shape,
# and print the plan of a small limit query.
from pyspark.sql import functions as F

df = spark.read.table("DataLakeTable")

column_count = len(df.columns)
print(f"Columns: {column_count}")
print(f"Schema: {df.schema}")

# Operators prefixed with 'Gpu' in the printed plan are the ones placed on the GPU.
preview = df.limit(5)
preview.explain(mode="formatted")

26/02/02 13:27:50 WARN  client.HiveClientImpl: [Thread-6]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 957c8015-78b1-4e53-b297-fc5206e0537b


Columns: 15
Schema: StructType([StructField('age', FloatType(), True), StructField('credit_card_balance', FloatType(), True), StructField('bank_account_balance', FloatType(), True), StructField('mortgage_balance', FloatType(), True), StructField('sec_bank_account_balance', FloatType(), True), StructField('savings_account_balance', FloatType(), True), StructField('sec_savings_account_balance', FloatType(), True), StructField('total_est_nworth', FloatType(), True), StructField('primary_loan_balance', FloatType(), True), StructField('secondary_loan_balance', FloatType(), True), StructField('uni_loan_balance', FloatType(), True), StructField('longitude', FloatType(), True), StructField('latitude', FloatType(), True), StructField('transaction_amount', FloatType(), True), StructField('fraud_trx', IntegerType(), True)])
== Physical Plan ==
GpuColumnarToRow (6)
+- GpuGlobalLimit (5)
   +- GpuShuffleCoalesce (4)
      +- GpuColumnarExchange (3)
         +- GpuLocalLimit (2)
            +- GpuSc

In [8]:
# Reach the JVM-side SessionState through the private `_jsparkSession` handle.
jvm_session_state = spark._jsparkSession.sessionState()

# Show the session's ExperimentalMethods object (the author uses it to check
# whether the Catalyst optimizer picked up the RAPIDS extensions).
pprint.pprint(f"Experimental Methods: {jvm_session_state.experimentalMethods()}")

# Keep a handle on the experimental methods obtained via the JVM bridge.
experimental = spark._jsparkSession.sessionState().experimentalMethods()

# Re-apply two runtime settings: SampleExec (described by the author as
# commonly disabled by default) and a 20-minute broadcast timeout.
for conf_name, conf_value in (
    ("spark.rapids.sql.exec.SampleExec", "true"),
    ("spark.sql.broadcastTimeout", "1200"),
):
    spark.conf.set(conf_name, conf_value)

'Experimental Methods: org.apache.spark.sql.ExperimentalMethods@1c310484'


In [9]:
# Synthetic join + aggregation workload to exercise the GPU.

# Left side: 10 million rows, each tagged with one of 1000 join keys and a
# seeded random value scaled to the range [0, 100).
left_df = (
    spark.range(0, 10_000_000)
    .withColumn("join_key", F.col("id") % 1000)
    .withColumn("data_value", F.rand(seed=42) * 100)
)

# Right side: one row per join key carrying a "Category_<key>" label.
category_label = F.concat(F.lit("Category_"), F.col("join_key"))
right_df = (
    spark.range(0, 1000)
    .withColumnRenamed("id", "join_key")
    .withColumn("category", category_label)
)

# Inner join on 'join_key'; the SHUFFLE_HASH hint asks for a hash join,
# which the author notes GPUs prefer.
hinted_left_df = left_df.hint("SHUFFLE_HASH")
joined_df = hinted_left_df.join(right_df, on="join_key", how="inner")

# Aggregating per category forces a shuffle; results are sorted by the mean, descending.
mean_value = F.avg("data_value").alias("avg_val")
final_result = (
    joined_df
    .groupBy("category")
    .agg(mean_value)
    .orderBy(F.desc("avg_val"))
)

final_result.show(10)

# Inspect the physical plan: look for 'GpuHashJoin' and 'GpuColumnarExchange'.
final_result.explain(mode="formatted")

26/02/02 13:28:24 WARN  scheduler.TaskSchedulerImpl: [task-starvation-timer]: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
26/02/02 13:28:39 WARN  scheduler.TaskSchedulerImpl: [task-starvation-timer]: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
26/02/02 13:28:54 WARN  scheduler.TaskSchedulerImpl: [task-starvation-timer]: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

+------------+------------------+
|    category|           avg_val|
+------------+------------------+
|Category_493| 51.01314531748956|
|Category_425| 50.92732004901775|
|Category_312|50.856966235932894|
|Category_388| 50.82908321026169|
|Category_335| 50.77918679311496|
|Category_414| 50.77466769977464|
|Category_758| 50.76249040325846|
|Category_688|50.729797095056455|
|Category_421| 50.71255550280795|
|Category_754| 50.71146305713495|
+------------+------------------+
only showing top 10 rows

== Physical Plan ==
GpuColumnarToRow (18)
+- GpuSort (17)
   +- GpuShuffleCoalesce (16)
      +- GpuColumnarExchange (15)
         +- GpuHashAggregate (14)
            +- GpuShuffleCoalesce (13)
               +- GpuColumnarExchange (12)
                  +- GpuHashAggregate (11)
                     +- GpuProject (10)
                        +- GpuShuffledSymmetricHashJoin (9)
                           :- GpuColumnarExchange (5)
                           :  +- GpuCoalesceBatches (4)
       

In [10]:
# Import from spark_rapids_ml to use the GPU-native implementation
from spark_rapids_ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Input feature columns, passed to the classifier by name.
feature_cols = [
    "age",
    "credit_card_balance",
    "bank_account_balance",
    "mortgage_balance",
    "sec_bank_account_balance",
    "savings_account_balance",
    "sec_savings_account_balance",
    "total_est_nworth",
    "primary_loan_balance",
    "secondary_loan_balance",
    "uni_loan_balance",
    "longitude",
    "latitude",
    "transaction_amount",
]

# VectorAssembler is deliberately avoided: per the author it creates VectorUDT
# columns that are not GPU friendly. The raw frame is split directly instead.
# 80/20 train/test split with a fixed seed.
(training_data, test_data) = df.randomSplit([0.8, 0.2], seed=1234)

# Define the RAPIDS-native classifier.
# Using 'featuresCols' (a list of column names) avoids VectorAssembler.
# The seed is fixed, like the split's, so a Restart & Run All trains with the
# same random state.
# NOTE(review): bit-for-bit identical forests may also depend on GPU-side
# settings not visible here - confirm if exact reproducibility matters.
rf_classifier = RandomForestClassifier(
    labelCol="fraud_trx",
    featuresCols=feature_cols,
    numTrees=20,
    seed=1234,
)

In [11]:
# Fit the random forest on the training split. Per the author, the training
# logic runs in C++ on the GPU via cuML.
start_message = "Training Spark RAPIDS ML model..."
done_message = "Model training complete."

print(start_message)
rf_model = rf_classifier.fit(training_data)
print(done_message)

2026-02-02 13:29:18,431 - spark_rapids_ml.classification.RandomForestClassifier - INFO - Training spark-rapids-ml with 1 worker(s) ...


Training Spark RAPIDS ML model...


2026-02-02 13:29:19,132 - spark_rapids_ml.classification.RandomForestClassifier - INFO - Training tasks require the resource(cores=3, gpu=1.0)
2026-02-02 13:30:22,152 - spark_rapids_ml.classification.RandomForestClassifier - INFO - Finished training


Model training complete.


In [13]:
# Score the held-out split, then strip the vector-typed output columns.
# 'probability' and 'rawPrediction' are VectorUDT columns; the author drops
# them so Spark SQL does not fall back to the CPU to format them.
vector_output_cols = ("probability", "rawPrediction")
scored_test_data = rf_model.transform(test_data)
predictions = scored_test_data.drop(*vector_output_cols)

# Peek at predicted vs. actual labels.
label_comparison = predictions.select("prediction", "fraud_trx")
label_comparison.show(5)

# Verify the GPU plan: the author expects 'GpuProject' and 'GpuFilter' nodes
# and no VectorUDT warning.
predictions.explain(mode="formatted")




26/02/02 13:36:02 WARN  util.SparkStringUtils: [Thread-6]: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 7:>                                                          (0 + 2) / 2]

+----------+---------+
|prediction|fraud_trx|
+----------+---------+
|       0.0|        0|
|       0.0|        0|
|       0.0|        0|
|       0.0|        0|
|       0.0|        1|
+----------+---------+
only showing top 5 rows

== Physical Plan ==
GpuColumnarToRow (8)
+- GpuProject (7)
   +- GpuCoalesceBatches (6)
      +- GpuArrowEvalPython (5)
         +- GpuCoalesceBatches (4)
            +- GpuSample (3)
               +- GpuSort (2)
                  +- GpuScan parquet spark_catalog.default.datalaketable (1)


(1) GpuScan parquet spark_catalog.default.datalaketable
Output [15]: [age#0, credit_card_balance#1, bank_account_balance#2, mortgage_balance#3, sec_bank_account_balance#4, savings_account_balance#5, sec_savings_account_balance#6, total_est_nworth#7, primary_loan_balance#8, secondary_loan_balance#9, uni_loan_balance#10, longitude#11, latitude#12, transaction_amount#13, fraud_trx#14]
Batched: true
Eager_IO_Prefetch: false
Location: InMemoryFileIndex [s3a://go01-demo/wareho

                                                                                

In [29]:
# Inspect what the spark-rapids-ml metrics module provides.
# help() writes its text directly to stdout and returns None, so it must not
# be embedded in an f-string (that printed "Available in metrics: None").
import spark_rapids_ml.metrics.MulticlassMetrics as mm

print("Available in metrics:")
help(mm)

Help on module spark_rapids_ml.metrics.MulticlassMetrics in spark_rapids_ml.metrics:

NAME
    spark_rapids_ml.metrics.MulticlassMetrics

DESCRIPTION
    # Copyright (c) 2024, NVIDIA CORPORATION.
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #

CLASSES
    builtins.object
        MulticlassMetrics
    
    class MulticlassMetrics(builtins.object)
     |  MulticlassMetrics(tp: Dict[float, float] = {}, fp: Dict[float, float] = {}, label: Dict[flo

In [33]:
# Accuracy = rows where prediction equals the label / all rows.
# Both counts come from ONE aggregation, so the prediction pipeline is
# evaluated once instead of once per count() action.
# F.when(...) without otherwise() yields NULL for non-matching rows (including
# NULL comparisons) and count() skips NULLs, matching the earlier filter.
accuracy_counts = predictions.agg(
    F.count(F.when(F.col("prediction") == F.col("fraud_trx"), 1)).alias("n_correct"),
    F.count(F.lit(1)).alias("n_total"),
).first()

n_correct = accuracy_counts["n_correct"]
n_total = accuracy_counts["n_total"]

# Guard against an empty prediction set instead of raising ZeroDivisionError.
accuracy = n_correct / n_total if n_total else float("nan")

# NOTE(review): accuracy alone can look good on an imbalanced label; consider
# also reporting per-class metrics for 'fraud_trx'.
print(f"GPU-Accelerated Accuracy: {accuracy:.4f}")

                                                                                

GPU-Accelerated Accuracy: 0.8999


In [34]:
# Import the evaluator from its public home. It used to be pulled in through
# spark_rapids_ml.metrics.MulticlassMetrics, whose help output lists only the
# MulticlassMetrics class - presumably the name was reachable there only
# because that module imports it itself (TODO confirm).
# NOTE(review): the recorded GpuOverrides warning for this cell shows part of
# the evaluation plan (DeserializeToObjectExec) running on the CPU.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="fraud_trx",
    predictionCol="prediction",
    metricName="accuracy",
)

# Fraction of rows in `predictions` whose prediction matches the label.
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}")



26/02/02 14:27:33 WARN  rapids.GpuOverrides: [Thread-6]: 
! <DeserializeToObjectExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.DeserializeToObjectExec
  ! <CreateExternalRow> createexternalrow(staticinvoke(class java.lang.Double, ObjectType(class java.lang.Double), valueOf, prediction#238, true, false, true), staticinvoke(class java.lang.Double, ObjectType(class java.lang.Double), valueOf, fraud_trx#1720, true, false, true), staticinvoke(class java.lang.Double, ObjectType(class java.lang.Double), valueOf, 1.0#1721, true, false, true), StructField(prediction,DoubleType,true), StructField(fraud_trx,DoubleType,true), StructField(1.0,DoubleType,false)) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow
    !Expression <StaticInvoke> staticinvoke(class java.lang.Double, ObjectType(class java.lang.D

Test Accuracy: 0.8999
