### Prerequisites

- **Compatible GPUs** (compute capability 7.0 or higher), for example:
  - NVIDIA Turing Series (RTX 2000)
  - NVIDIA Tesla T4
  - NVIDIA A100
  - NVIDIA Tesla V100
- **CUDA Toolkit**: Version 11.0 or higher installed on your machine.
- **Apache Spark**: Version 3.x.
- **Java**: Version 8 or 11.
- **RAPIDS Accelerator for Apache Spark**.


### Note

Using a GPU from the Maxwell series (compute capability 5.2) results in the following error:

`java.lang.RuntimeException: Device architecture 52 is unsupported. Minimum supported architecture: 70.`

Ensure that your GPU meets the minimum required architecture: compute capability 7.0 or higher (Volta, or Turing RTX 2000 series and newer).
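
The architecture requirement can be checked before Spark is started. The following is a minimal sketch, assuming that `nvidia-smi` is on the `PATH` and that the installed driver supports the `compute_cap` query field. The helper names (`parse_compute_capability`, `is_supported`, `query_gpu_capabilities`) are hypothetical and not part of RAPIDS.

```python
import subprocess

# Minimum architecture taken from the error message above (70 means 7.0)
MIN_COMPUTE_CAPABILITY = (7, 0)


def parse_compute_capability(text):
    """Turn a string such as '7.5' into the tuple (7, 5)."""
    major, minor = text.strip().split(".")
    return int(major), int(minor)


def is_supported(capability, minimum=MIN_COMPUTE_CAPABILITY):
    """Return True when the (major, minor) capability meets the minimum."""
    return capability >= minimum


def query_gpu_capabilities():
    """Ask nvidia-smi for the name and compute capability of every GPU.

    Assumption: nvidia-smi is installed and understands compute_cap.
    """
    output = subprocess.run(
        ["nvidia-smi", "--query-gpu=name,compute_cap", "--format=csv,noheader"],
        capture_output=True, text=True, check=True,
    ).stdout
    gpus = []
    for line in output.strip().splitlines():
        # Split on the last comma so that GPU names containing commas survive
        name, cap = line.rsplit(",", 1)
        gpus.append((name.strip(), parse_compute_capability(cap)))
    return gpus


# Example use on a machine with an NVIDIA driver:
# for name, cap in query_gpu_capabilities():
#     print(name, cap, "supported" if is_supported(cap) else "unsupported")
```

A Maxwell card would report `5.2` and fail the check, which matches the error shown above.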

### Note on RAPIDS Initialization

If you see the following messages in the console:

```bash
24/12/29 20:43:16 WARN RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set spark.rapids.sql.enabled to false.
24/12/29 20:43:16 INFO DriverPluginContainer: Initialized driver component for plugin com.nvidia.spark.SQLPlugin.
```
then the RAPIDS Accelerator is set up and ready to go.
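
If the driver output is captured to a file or a string, the same check can be automated. This is only a sketch: it assumes that the two marker substrings, copied from the log lines above, are stable across plugin versions, which the source does not confirm. The function name `rapids_initialized` is hypothetical.

```python
# Marker substrings copied from the two log lines shown above
RAPIDS_MARKERS = (
    "RAPIDS Accelerator is enabled",
    "Initialized driver component for plugin com.nvidia.spark.SQLPlugin",
)


def rapids_initialized(log_text):
    """Return True when every marker appears somewhere in the captured log."""
    return all(marker in log_text for marker in RAPIDS_MARKERS)
```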

### Important Information

The latest officially supported version of Apache Spark for RAPIDS is **3.5.3** [1].  
During local deployment, version **3.5.2** was used.

#### Issue with GPU Resource Allocation in Spark during Cross-Validation

While setting up cross-validation in Spark, GPU resources were not allocated correctly: Spark could not access the RTX 4000 series graphics card. The task manager showed that the GPU was not being utilized, and execution times were prolonged as a result.

##### Encountered Problem:
- **GPU Resources Not Allocated**: The RTX 4000 GPU resources were not allocated during the process.
- **Slow Execution**: Due to the lack of GPU utilization, the process execution time was significantly extended.
- **Configuration Suspected**: The issue is likely related to the configuration setting `.config("spark.rapids.sql.concurrentGpuTasks", "10")`, which might have affected the GPU resource allocation.

This issue may need further investigation into the GPU configuration and task concurrency settings in Spark.
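
One way to start that investigation is to rerun the job with a lower task concurrency and with fallback reporting switched on. The sketch below builds such a configuration. The default of 2 concurrent tasks is an assumed starting value, not one taken from this notebook (which used 10), and the helper names `rapids_conf` and `apply_conf` are hypothetical. It is also worth checking which stages actually run on the GPU: the plugin targets SQL and DataFrame operators, so an MLlib fit may stay largely on the CPU regardless of this setting (an assumption about this workload, not something the source confirms).

```python
def rapids_conf(concurrent_gpu_tasks=2, explain="NOT_ON_GPU"):
    """Build a dict of Spark settings for a first troubleshooting run.

    The default concurrency of 2 is an assumed starting point for tuning.
    """
    if concurrent_gpu_tasks < 1:
        raise ValueError("concurrent_gpu_tasks must be at least 1")
    return {
        "spark.plugins": "com.nvidia.spark.SQLPlugin",
        "spark.rapids.sql.enabled": "true",
        "spark.rapids.sql.concurrentGpuTasks": str(concurrent_gpu_tasks),
        # Log the operators that could not be placed on the GPU
        "spark.rapids.sql.explain": explain,
    }


def apply_conf(builder, conf):
    """Apply every key/value pair to a SparkSession.builder and return it."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


# Example use (requires pyspark and no session that is already running):
# from pyspark.sql import SparkSession
# spark = apply_conf(SparkSession.builder.appName("SparkRAPIDS"), rapids_conf()).getOrCreate()
```

Changing one setting at a time and comparing GPU utilization between runs makes it easier to tell whether `concurrentGpuTasks` is really the cause.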

---

#### Reference  
[1] NVIDIA Spark RAPIDS Documentation: [https://nvidia.github.io/spark-rapids/docs/download.html](https://nvidia.github.io/spark-rapids/docs/download.html)


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
!tar xf spark-3.5.2-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
!wget https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.1/rapids-4-spark_2.12-24.10.1-cuda11.jar
!wget https://repo1.maven.org/maven2/ai/rapids/cudf/24.10.1/cudf-24.10.1-cuda11.jar

In [None]:
import os

# Define the paths
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ['SPARK_HOME'] = "/home/test/and_t/spark-3.5.2-bin-hadoop3" #Replace /home/test/and_t/ with the current working directory.

import findspark
findspark.init() 

In [2]:
os.environ['SPARK_RAPIDS_DIR'] = '/home/test/and_t'  # Replace with the actual RAPIDS Plugin directory
os.environ['SPARK_RAPIDS_PLUGIN_JAR'] = f"{os.environ['SPARK_RAPIDS_DIR']}/rapids-4-spark_2.12-24.10.1-cuda11.jar"

# --jars takes a comma-separated list, so the two JAR paths are joined with ","
os.environ['PYSPARK_SUBMIT_ARGS'] = "--jars "  \
            + f"{os.environ['SPARK_RAPIDS_DIR']}/rapids-4-spark_2.12-24.10.1-cuda11.jar,"  \
            + f"{os.environ['SPARK_RAPIDS_DIR']}/cudf-24.10.1-cuda11.jar " \
            + "--master local[*] pyspark-shell"
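
The `--jars` option of `spark-submit` expects a single comma-separated list, so a missing separator silently turns two paths into one invalid path. A small helper can build the string and report missing files before Spark starts. This is a sketch only; `build_submit_args` and `missing_jars` are hypothetical names.

```python
import os


def build_submit_args(jar_paths, master="local[*]"):
    """Return a PYSPARK_SUBMIT_ARGS value with the JARs joined by commas."""
    if not jar_paths:
        raise ValueError("at least one JAR path is required")
    return f"--jars {','.join(jar_paths)} --master {master} pyspark-shell"


def missing_jars(jar_paths):
    """Return the JAR paths that do not exist on disk."""
    return [path for path in jar_paths if not os.path.isfile(path)]


# Example use with the directory configured in SPARK_RAPIDS_DIR:
# rapids_dir = os.environ["SPARK_RAPIDS_DIR"]
# jars = [f"{rapids_dir}/rapids-4-spark_2.12-24.10.1-cuda11.jar",
#         f"{rapids_dir}/cudf-24.10.1-cuda11.jar"]
# assert not missing_jars(jars), missing_jars(jars)
# os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(jars)
```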

       

In [None]:
print(os.environ['PYSPARK_SUBMIT_ARGS'])

In [None]:
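from pyspark.sql import SparkSession

# Start a plain session (no RAPIDS plugin) to confirm that Spark itself works
spark = SparkSession.builder \
    .appName("SparkRAPIDS") \
    .master("local[*]") \
    .getOrCreate()

print(spark.sparkContext.getConf().getAll())

# Stop this session so that the next cell creates a new one; getOrCreate()
# would otherwise reuse it, and plugins are only loaded when the context starts
spark.stop()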

In [None]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("SparkRAPIDS") \
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
    .config('spark.rapids.sql.enabled', 'true') \
    .config('spark.rapids.sql.incompatibleOps.enabled', 'true') \
    .config('spark.rapids.sql.format.csv.read.enabled', 'true') \
    .config("spark.rapids.sql.concurrentGpuTasks", "10") \
    .config("spark.sql.files.maxPartitionBytes", "512m") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config('spark.rapids.sql.format.csv.enabled', 'true') \
    .config("spark.rapids.sql.exec.CollectLimitExec", "true")  \
    .config("spark.rapids.sql.explain", "NONE") \
    .getOrCreate()

# The RAPIDS JARs must be on the JVM classpath, which the --jars option in
# PYSPARK_SUBMIT_ARGS handles; addPyFile() only distributes Python dependencies

print(spark.sparkContext.getConf().getAll())
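
The full configuration dump is long, so it can help to filter it down to the plugin-related entries. The sketch below works on the list of `(key, value)` pairs that `getConf().getAll()` returns; the helper names `rapids_settings` and `plugin_registered` are hypothetical.

```python
def rapids_settings(conf_pairs):
    """Keep only the plugin-related entries from getConf().getAll() output."""
    return {
        key: value
        for key, value in conf_pairs
        if key == "spark.plugins" or key.startswith("spark.rapids.")
    }


def plugin_registered(conf_pairs, plugin="com.nvidia.spark.SQLPlugin"):
    """Return True when the plugin class is listed in spark.plugins."""
    plugins = dict(conf_pairs).get("spark.plugins", "")
    return plugin in [name.strip() for name in plugins.split(",")]


# Example use with an active session:
# pairs = spark.sparkContext.getConf().getAll()
# print(plugin_registered(pairs))
# print(rapids_settings(pairs))
```

If `plugin_registered` returns `False`, the session was most likely created before the plugin settings were applied.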

In [None]:
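# Stop the session only when you are finished; the cells below need an active
# session, so re-run the session cell above before continuing
spark.stop()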

For testing purposes, we use the Credit Card Fraud Detection dataset from Kaggle:
https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud

In [None]:
file_path = "/home/test/and_t/creditcard.csv"
fraud_df = spark.read.csv(file_path, header=True, inferSchema=True)

fraud_df.printSchema()
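
With `inferSchema=True`, Spark has to scan the file once just to determine the column types. As an alternative, the schema can be passed explicitly as a DDL string. The sketch below assumes the dataset's column layout (`Time`, `V1` to `V28`, `Amount`, `Class`) and assumes that all feature columns are doubles and `Class` is an integer; compare the result with the `printSchema()` output above before relying on it. The function name `creditcard_schema_ddl` is hypothetical.

```python
def creditcard_schema_ddl():
    """Build a DDL schema string: Time, V1..V28 and Amount as DOUBLE, Class as INT."""
    double_columns = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount"]
    # Backticks quote the column names so they cannot clash with SQL keywords
    fields = [f"`{name}` DOUBLE" for name in double_columns]
    fields.append("`Class` INT")
    return ", ".join(fields)


# Example use with an active session:
# fraud_df = spark.read.csv(file_path, header=True, schema=creditcard_schema_ddl())
```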

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Select features and label
feature_columns = [f"V{i}" for i in range(1, 29)] + ["Amount"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(fraud_df)
data = data.select("features", "Class")
data = data.withColumnRenamed("Class", "label")

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Define the logistic regression model
lr = LogisticRegression(maxIter=10)

# Train the model
model = lr.fit(train_data)

# Make predictions
test_results = model.transform(test_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(test_results)

print(f"Test AUC: {roc_auc:.4f}")
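
To see whether the GPU makes a difference for this step, the fit can be timed with the plugin enabled and then disabled. The following sketch provides a small timing helper. The name `timed` is hypothetical, and the comparison is only indicative, since caching and JVM warm-up can favour whichever run comes second.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Record the wall-clock duration of the enclosed block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


# Example use with an active session and the objects defined above:
# timings = {}
# with timed("gpu", timings):
#     lr.fit(train_data)
# spark.conf.set("spark.rapids.sql.enabled", "false")  # same step without the GPU
# with timed("cpu", timings):
#     lr.fit(train_data)
# spark.conf.set("spark.rapids.sql.enabled", "true")
# print(timings)
```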

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

data = spark.read.csv(file_path, header=True, inferSchema=True)

# Select features and label
feature_columns = [f"V{i}" for i in range(1, 29)] + ["Amount"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)
data = data.select("features", "Class")
data = data.withColumnRenamed("Class", "label")

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Define the logistic regression model
lr = LogisticRegression()

# Create a parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [10, 50, 100]) \
    .build()

# Define the evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Set up cross-validation
crossval = CrossValidator(estimator=lr, 
                          estimatorParamMaps=param_grid, 
                          evaluator=evaluator, 
                          numFolds=5)

# Train the model with cross-validation
cv_model = crossval.fit(train_data)

# Make predictions
test_results = cv_model.transform(test_data)

# Evaluate the model
roc_auc = evaluator.evaluate(test_results)

print(f"Test AUC: {roc_auc:.4f}")
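
The amount of work behind this cell explains part of the long run time. A short calculation shows how many models are trained: every grid point is fitted once per fold, and the best parameters are then refitted once on the full training set. The function name `cross_validation_fits` is hypothetical.

```python
from math import prod


def cross_validation_fits(grid_sizes, num_folds):
    """Return (grid points, total model fits) for k-fold CV over a full grid.

    Each grid point is fitted once per fold, plus one final refit of the
    best parameters on the whole training set.
    """
    grid_points = prod(grid_sizes)
    return grid_points, grid_points * num_folds + 1


# Three values each for regParam, elasticNetParam and maxIter; five folds
points, fits = cross_validation_fits([3, 3, 3], num_folds=5)
print(points, fits)  # 27 136
```

`CrossValidator` also accepts a `parallelism` parameter for evaluating several models concurrently. Whether raising it helps here depends on the available CPU and GPU resources, which would need to be measured.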