# RayDP - Distributed Spark MLLib Model Training

This notebook demonstrates how to properly distribute RayDP workloads across Ray cluster nodes.

In [1]:
# Import necessary libraries
import ray
import raydp
import time
import logging
import warnings
import os
import sys
import numpy as np
sys.path.insert(0, '/home/artifacts/')

from snowflake.ml.data.data_connector import DataConnector
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, when, round as spark_round
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import snowflake.connector
from snowflake.snowpark import Session

# Record the Ray and RayDP versions used for this run alongside the outputs.
print(f"Ray version: {ray.__version__}")
print(f"RayDP version: {raydp.__version__}")


  from .autonotebook import tqdm as notebook_tqdm
2025-07-08 03:05:41,941	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2025-07-08 03:05:42,431	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


Ray version: 2.42.0
RayDP version: 1.6.2


In [2]:
def create_distributed_ray_dataset(train_snowdf, nodes, partitions_per_node=4):
    """Convert a Snowpark DataFrame into a repartitioned Ray dataset.

    Args:
        train_snowdf: DataFrame handed to ``DataConnector.from_dataframe``.
        nodes: Sized collection describing the cluster nodes (the notebook
            passes ``ray.nodes()``). Only its length is used here.
        partitions_per_node: Number of partitions to request per node.
            Defaults to 4, the value that was previously hard-coded.

    Returns:
        The repartitioned Ray dataset after an identity ``map_batches`` pass
        with ``batch_format="pyarrow"``.
    """
    print("Converting to Ray dataset...")
    ray_ds = DataConnector.from_dataframe(train_snowdf).to_ray_dataset()

    # Clamp to at least one partition: an empty node list (or a zero
    # multiplier) would otherwise ask Ray to repartition into 0 blocks.
    num_partitions = max(1, len(nodes) * partitions_per_node)
    print(f"Distributing data across {num_partitions} partitions (was 1 partition)")
    ray_ds_distributed = ray_ds.repartition(num_partitions)

    # Identity transform requesting pyarrow batches.
    # NOTE(review): presumably this is here so the later ``to_spark`` call
    # receives Arrow-formatted blocks -- confirm against RayDP requirements.
    ray_ds_arrow = ray_ds_distributed.map_batches(lambda b: b, batch_format="pyarrow")
    return ray_ds_arrow

def get_distributed_spark_config(cluster_resources, nodes):
    """Derive executor count, cores and memory sizes from Ray cluster information.

    Args:
        cluster_resources: Mapping of resource name to total amount, as
            returned by ``ray.cluster_resources()``. ``'CPU'`` defaults to 24
            and ``'memory'`` (bytes) defaults to 72 GiB when absent.
        nodes: List of node-description dicts, as returned by ``ray.nodes()``.
            Entries whose ``'Alive'`` flag is false are ignored, because the
            resource totals only reflect live nodes; entries without the
            flag are treated as alive.

    Returns:
        dict with keys ``'num_executors'`` (int), ``'executor_cores'`` (int),
        ``'executor_memory'`` (str such as ``"73g"``) and ``'driver_memory'``
        (str in the same format).
    """
    executors_per_node = 2
    reserved_cpus_per_node = 2  # Reserve for Ray and OS
    reserved_memory_per_node = 8  # GB, reserve for Ray and OS

    total_cpus = int(cluster_resources.get('CPU', 24))
    total_memory_gb = cluster_resources.get('memory', 72 * 1024**3) / (1024**3)

    # Only live nodes contribute to cluster_resources, so only they may be
    # used as the divisor for the per-node figures below.
    alive_nodes = [n for n in nodes if n.get('Alive', True)]
    # Fall back to a single node so an empty list cannot divide by zero.
    num_nodes = max(1, len(alive_nodes))
    worker_nodes = [
        n for n in alive_nodes
        if 'node:__internal_head__' not in n.get('Resources', {})
    ]
    num_worker_nodes = len(worker_nodes)

    print("Enhanced Cluster Analysis:")
    print(f"  Total nodes: {num_nodes}")
    print(f"  Worker nodes: {num_worker_nodes}")
    print(f"  Total CPUs: {total_cpus}")
    print(f"  Total Memory: {total_memory_gb:.1f} GB")

    num_executors = num_nodes * executors_per_node

    # Cores per executor: split what is left on a node after the reservation.
    cpus_per_node = total_cpus // num_nodes
    available_cpus_per_node = max(1, cpus_per_node - reserved_cpus_per_node)
    executor_cores = max(1, available_cpus_per_node // executors_per_node)

    # Memory per executor/driver: a third of the usable per-node memory each
    # (deliberately conservative), never below 2 GB.
    memory_per_node = total_memory_gb / num_nodes
    available_memory_per_node = max(4, memory_per_node - reserved_memory_per_node)
    executor_memory_gb = max(2, int(available_memory_per_node // 3))
    driver_memory_gb = max(2, int(available_memory_per_node // 3))

    config = {
        'num_executors': num_executors,
        'executor_cores': executor_cores,
        'executor_memory': f"{executor_memory_gb}g",
        'driver_memory': f"{driver_memory_gb}g"
    }

    print("Optimized Spark Configuration:")
    print(f"  Executors: {config['num_executors']} (was {num_nodes})")
    print(f"  Executor cores: {config['executor_cores']} per executor")
    print(f"  Executor memory: {config['executor_memory']}")
    print(f"  Driver memory: {config['driver_memory']}")

    return config


In [3]:
def get_distributed_spark_configs():
    """Build the Spark property overrides used when starting the RayDP session.

    Returns:
        A new dict (safe for the caller to mutate) mapping Spark property
        names to string values, grouped below by concern.
    """
    # Java serializer, chosen by the author for compatibility.
    serialization = {
        "spark.serializer": "org.apache.spark.serializer.JavaSerializer",
    }

    # Adaptive query execution.
    adaptive_execution = {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB",
    }

    # Network and timeout settings.
    timeouts = {
        "spark.network.timeout": "800s",
        "spark.executor.heartbeatInterval": "60s",
        "spark.storage.blockManagerSlaveTimeoutMs": "600s",
    }

    # High shuffle/default parallelism so work is spread over many tasks.
    parallelism = {
        "spark.sql.shuffle.partitions": "400",
        "spark.default.parallelism": "400",
    }

    # Memory and execution settings.
    execution = {
        "spark.sql.execution.arrow.pyspark.enabled": "true",
        "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
        "spark.task.maxFailures": "3",
        "spark.ml.tree.maxMemoryInMB": "1024",
    }

    # Compression and broadcast settings.
    compression = {
        "spark.rdd.compress": "true",
        "spark.io.compression.codec": "snappy",
        "spark.broadcast.blockSize": "4m",
        "spark.sql.autoBroadcastJoinThreshold": "10MB",
    }

    # Settings the author labelled as RayDP specific.
    raydp_specific = {
        "spark.sql.adaptive.localShuffleReader.enabled": "false",
        "spark.scheduler.blacklist.enabled": "false",
    }

    # Uniform 1s locality wait at every level.
    # NOTE(review): the author's stated intent was to force executor
    # placement across nodes -- confirm these keys actually achieve that.
    locality = {
        f"spark.locality.wait{level}": "1s"
        for level in ("", ".process", ".node", ".rack")
    }

    spark_properties = {}
    for section in (
        serialization,
        adaptive_execution,
        timeouts,
        parallelism,
        execution,
        compression,
        raydp_specific,
        locality,
    ):
        spark_properties.update(section)
    return spark_properties

In [5]:
def configure_ray_logger() -> None:
    """Quiet Ray-related logging and Ray Data progress reporting.

    Sets the "ray" logger, the "ray.data" logger and the root logger to
    CRITICAL, then disables verbose progress and per-operator progress bars
    on the current Ray Data context.
    """
    # A name of None selects the root logger, exactly like getLogger().
    for logger_name in ("ray", "ray.data", None):
        logging.getLogger(logger_name).setLevel(logging.CRITICAL)

    # Adjust the process-wide Ray Data context in place.
    data_ctx = ray.data.DataContext.get_current()
    data_ctx.execution_options.verbose_progress = False
    data_ctx.enable_operator_progress_bars = False



In [6]:
# Step 1: Connect to the existing Ray cluster and quiet its logging.
print("1. Connecting to Ray cluster...")
cli = ray.init(address="raydpheadservice:6379", ignore_reinit_error=True, log_to_driver=False)
configure_ray_logger()

# Step 2: Analyze cluster resources.
# The step banners use a real newline ("\n"); the previous "\\n" printed a
# literal backslash-n in the cell output.
print("\n2. Analyzing cluster resources...")
cluster_resources = ray.cluster_resources()
nodes = ray.nodes()
print(f"Found {len(nodes)} nodes with {cluster_resources.get('CPU', 0)} total CPUs")

# Step 3: Derive executor sizing and collect the Spark property overrides.
print("\n3. Calculating distributed Spark configuration...")
spark_config = get_distributed_spark_config(cluster_resources, nodes)
spark_configs = get_distributed_spark_configs()
# Driver memory is passed as a Spark property; executor sizing goes through
# the dedicated init_spark arguments below.
spark_configs["spark.driver.memory"] = spark_config['driver_memory']

# Step 4: Initialize RayDP with the distributed configuration.
print("\n4. Initializing RayDP with distributed configuration...")
spark = raydp.init_spark(
    app_name="RayDP_Distributed_MLLib_Training",
    num_executors=spark_config['num_executors'],
    executor_cores=spark_config['executor_cores'],
    executor_memory=spark_config['executor_memory'],
    configs=spark_configs
)
spark.sparkContext.setLogLevel("ERROR")
print(f"RayDP initialized with {spark_config['num_executors']} executors")

# Step 5: Load and prepare data properly
print("\n5. Loading and distributing data...")
def get_session():
    """Create a Snowpark ``Session`` authenticated with the mounted OAuth token.

    Connection parameters (host, port, account, database, schema) come from
    ``SNOWFLAKE_*`` environment variables; the warehouse is fixed to
    ``"LARGE_WH"``.

    Returns:
        A Snowpark ``Session`` wrapping a new ``snowflake.connector`` connection.

    Raises:
        OSError: If the token file at ``/snowflake/session/token`` cannot be read.
    """
    def get_token():
        # Use a context manager so the token file handle is closed
        # deterministically instead of being left to the garbage collector.
        with open('/snowflake/session/token', 'r') as token_file:
            return token_file.read()

    def connection():
        creds = {
            'host': os.getenv('SNOWFLAKE_HOST'),
            'port': os.getenv('SNOWFLAKE_PORT'),
            'protocol': "https",
            'account': os.getenv('SNOWFLAKE_ACCOUNT'),
            'authenticator': "oauth",
            'token': get_token(),
            'warehouse': "LARGE_WH",
            'database': os.getenv('SNOWFLAKE_DATABASE'),
            'schema': os.getenv('SNOWFLAKE_SCHEMA'),
            'client_session_keep_alive': True
        }
        return snowflake.connector.connect(**creds)

    return Session.builder.configs({"connection": connection()}).create()

# Open the Snowflake session and reference the training table.
session = get_session()
train_snowdf = session.table("TRAIN_SPARK_MLLIB_DATASET")

ray_ds_distributed = create_distributed_ray_dataset(train_snowdf, nodes)

# Step 6: Hand the Ray dataset to Spark with one partition per executor core.
# The banners use a real newline ("\n"); the previous "\\n" printed a literal
# backslash-n in the cell output.
print("\n6. Converting to Spark DataFrame with proper partitioning...")
spark_partitions = spark_config['num_executors'] * spark_config['executor_cores']
df = ray_ds_distributed.to_spark(spark).repartition(spark_partitions)
df.cache()  # Cache across executors
row_count = df.count()  # Trigger caching
print(f"✅ DataFrame created with {df.rdd.getNumPartitions()} partitions, {row_count:,} rows")

print("\n=== SETUP COMPLETE - READY FOR DISTRIBUTED TRAINING ===")

2025-07-08 03:05:52,689	INFO worker.py:1654 -- Connecting to existing Ray cluster at address: raydpheadservice:6379...
2025-07-08 03:05:52,792	INFO worker.py:1832 -- Connected to Ray cluster. View the dashboard at [1m[32m10.244.24.9:8265 [39m[22m


=== APPLYING CRITICAL FIXES FOR DISTRIBUTED RAYDP ===\n
1. Connecting to Ray cluster...


[2025-07-08 03:05:52,839 I 31347 31347] logging.cc:293: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to 1


\n2. Analyzing cluster resources...
Found 4 nodes with 112.0 total CPUs
\n3. Calculating distributed Spark configuration...
Enhanced Cluster Analysis:
  Total nodes: 4
  Worker nodes: 3
  Total CPUs: 112
  Total Memory: 908.1 GB
Optimized Spark Configuration:
  Executors: 8 (was 4)
  Executor cores: 13 per executor
  Executor memory: 73g
  Driver memory: 73g
\n4. Initializing RayDP with distributed configuration...


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/08 03:05:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/08 03:05:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


✅ RayDP initialized with 8 executors
\n5. Loading and distributing data...
Converting to Ray dataset...
Distributing data across 16 partitions (was 1 partition)
\n6. Converting to Spark DataFrame with proper partitioning...


Running 0: 0.00 row [00:00, ? row/s]

Split Repartition 2:   0%|          | 0.00/1.00 [00:00<?, ? row/s][A[A

  *- Split Repartition:   0%|          | 0.00/1.00 [00:01<?, ? row/s][A[A

  *- Split Repartition:   0%|          | 0.00/438k [00:01<?, ? row/s][A[A

  *- Split Repartition: 100%|██████████| 438k/438k [00:01<00:00, 339k row/s][A[A

                                                                                                               

✔️  Dataset execution finished in 2.58 seconds: 100%|██████████| 1.00/1.00 [00:02<00:00, 2.58s/ row]


  *- Split Repartition: 100%|██████████| 438k/438k [00:02<00:00, 339k row/s][A[A

  *- Split Repartition:  63%|██████▎   | 438k/700k [00:02<00:00, 339k row/s][A[A

  *- Split Repartition: 100%|██████████| 700k/700k [00:02<00:00, 260k row/s][A[A

  *- Split Repartition: 100%|██████████| 700k/700k [00:02<00:00, 260k row/s][A[A

  *- Split Repartition: 100%|██████████| 700k/700k [00:02<00:00, 271k row/s]
Running 0: 0.00 row [

✅ DataFrame created with 104 partitions, 700,465 rows
\n=== SETUP COMPLETE - READY FOR DISTRIBUTED TRAINING ===
Before training, please verify in Ray Dashboard that:
- All nodes show activity in the Ray Dashboard
- Spark UI shows executors across multiple hosts
- Data is properly partitioned and cached


                                                                                

In [7]:
def train_distributed_model(df, spark_config):
    """Fit and evaluate a random-forest pipeline on a Spark DataFrame.

    The pipeline assembles columns ``FEATURE_0`` .. ``FEATURE_19`` into a
    vector, standardizes it, and trains a ``RandomForestClassifier`` against
    the ``TARGET`` column. The input is split 80/20 (seed 42) into train and
    test sets; both are cached for the duration of the call and unpersisted
    before returning.

    Args:
        df: Spark DataFrame containing the feature columns and ``TARGET``.
        spark_config: Unused by this function; kept so existing callers
            continue to work.

    Returns:
        Tuple ``(model, metrics)`` where ``model`` is the fitted pipeline
        model and ``metrics`` is a dict with keys ``'auc'``, ``'accuracy'``
        and ``'training_time'`` (seconds spent in ``pipeline.fit``).
    """
    # Real newlines here: the previous "\\n" printed a literal backslash-n.
    print("=== STARTING DISTRIBUTED TRAINING ===\n")

    # Feature preparation
    feature_cols = [f"FEATURE_{i}" for i in range(20)]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="RAW_FEATURES")
    scaler = StandardScaler(inputCol="RAW_FEATURES", outputCol="FEATURES", withStd=True, withMean=True)

    # Model settings chosen by the author for distributed training
    rf = RandomForestClassifier(
        labelCol="TARGET",
        featuresCol="FEATURES",
        numTrees=200,
        maxDepth=12,
        maxBins=32,
        minInstancesPerNode=5,
        subsamplingRate=0.8,
        seed=42
    )

    pipeline = Pipeline(stages=[assembler, scaler, rf])

    # Split and cache data
    train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
    train_data.cache()
    test_data.cache()

    try:
        # The counts also materialize both caches before timing starts.
        train_count = train_data.count()
        test_count = test_data.count()
        print(f"Training set: {train_count:,} rows")
        print(f"Test set: {test_count:,} rows")

        start_time = time.time()
        model = pipeline.fit(train_data)
        training_time = time.time() - start_time

        print(f"\n✅ Training completed in {training_time:.2f} seconds")

        # Evaluate model
        print("\nEvaluating model...")
        predictions = model.transform(test_data)

        evaluator_auc = BinaryClassificationEvaluator(labelCol="TARGET", metricName="areaUnderROC")
        evaluator_acc = MulticlassClassificationEvaluator(labelCol="TARGET", predictionCol="prediction", metricName="accuracy")

        auc = evaluator_auc.evaluate(predictions)
        accuracy = evaluator_acc.evaluate(predictions)

        print("\n📊 Model Performance:")
        print(f"  AUC: {auc:.4f}")
        print(f"  Accuracy: {accuracy:.4f}")
        print(f"  Training time: {training_time:.2f}s")

        return model, {'auc': auc, 'accuracy': accuracy, 'training_time': training_time}
    finally:
        # Release the cached splits whether or not training succeeded; they
        # were previously left pinned in executor memory.
        train_data.unpersist()
        test_data.unpersist()

# Execute distributed training.
# The banners use a real newline ("\n"); the previous "\\n" printed a literal
# backslash-n in the cell output.
print("\n8. Running distributed training...")
model, metrics = train_distributed_model(df, spark_config)

# Final verification checklist for the reader (manual checks, not automated).
print("\n=== FINAL VERIFICATION ===")
print("After training, verify that:")
print("1. All Ray worker nodes showed high CPU utilization during training")
print("2. Memory usage was distributed across all nodes")
print("3. Training completed faster than single-node execution")
print(f"\nFinal Results: {metrics}")

# Cleanup: drop the cached DataFrame and close the Snowflake session.
# The Spark session and Ray connection are left running.
print("\n9. Cleaning up resources...")
df.unpersist()
session.close()
print("✅ Cleanup completed")


\n8. Running distributed training...
=== STARTING DISTRIBUTED TRAINING ===\n


                                                                                

Training set: 560,277 rows
Test set: 140,188 rows
\n🚀 Starting distributed training...
Monitor resource utilization in:
- Ray Dashboard: Check CPU/memory usage across nodes
- Spark UI: Check executor activity and task distribution


                                                                                

\n✅ Training completed in 133.09 seconds
\nEvaluating model...


                                                                                

\n📊 Model Performance:
  AUC: 0.9033
  Accuracy: 0.8169
  Training time: 133.09s
\n=== FINAL VERIFICATION ===
After training, verify that:
1. All Ray worker nodes showed high CPU utilization during training
2. Memory usage was distributed across all nodes
3. Spark UI showed tasks executing on multiple hosts
4. Training completed faster than single-node execution
\nFinal Results: {'auc': 0.9032950657088992, 'accuracy': 0.8168887493936714, 'training_time': 133.09215712547302}
\n9. Cleaning up resources...
✅ Cleanup completed
