# CPU Cluster: Parallel Training of ML Models with Ray

This notebook trains **90 ML models** in parallel on a CPU cluster:
- Logistic Regression (15 models)
- SVM (10 models)
- Random Forest (20 models)
- XGBoost (20 models)
- Gradient Boosting (15 models)
- Naive Bayes (10 models)

**Cluster Configuration**: 8 workers, 32 cores per node (256 total cores)

**Note**: Run this notebook on the CPU cluster. Results for successfully trained models are appended to the Delta table `ryuta.ray.model_training_results`.

## 1. Setup and Imports

In [None]:
# Core libraries
import numpy as np
import pandas as pd
from datetime import datetime
import json
import time

# Ray imports
import ray

# Scikit-learn models and utilities
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, precision_score, recall_score
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif

# XGBoost
import xgboost as xgb

# Optuna for Bayesian optimization
import optuna
from optuna.samplers import TPESampler

# PySpark
from pyspark.sql import functions as F

# Raise Optuna's log threshold to WARNING so that only warnings and errors
# are emitted while the studies run.
optuna.logging.set_verbosity(optuna.logging.WARNING)

# Confirmation output for the notebook run.
print("All imports successful!")
print(f"Notebook type: CPU CLUSTER")

## 2. Configuration

In [None]:
# Run-wide settings, assembled section by section. Keys are inserted in the
# same order in which they are printed at the end of this cell.
CONFIG = dict(
    table_name='ryuta.ray.synthetic_data',
    results_table='ryuta.ray.model_training_results',
    test_size=0.2,
    random_state=42,
    n_trials_per_model=20,  # Bayesian optimization trials
)

# Cluster configuration
CONFIG.update(
    cluster_type='cpu',
    n_workers=8,
    cores_per_node=32,
    total_cores=256,
)

# Model distribution (CPU models only): number of models per model type
CONFIG['model_distribution'] = dict(
    logistic_regression=15,
    svm=10,
    random_forest=20,
    xgboost=20,
    gradient_boosting=15,
    naive_bayes=10,
)

# Model ID offset (CPU models: 0-89)
CONFIG['model_id_start'] = 0

# Total model count is derived from the per-type counts.
CONFIG['n_models_total'] = sum(CONFIG['model_distribution'].values())

print("Configuration loaded successfully!")
print(json.dumps(CONFIG, indent=2))

## 3. Load Data from Delta Table

In [None]:
# Load data from Delta table
# NOTE(review): `spark` is not defined in this notebook; presumably it is the
# SparkSession provided by the notebook runtime - confirm.
print(f"Loading data from {CONFIG['table_name']}...")
df_spark = spark.table(CONFIG['table_name'])

print(f"Total rows: {df_spark.count()}")

# Convert to pandas (the full table is collected into one pandas DataFrame)
df = df_spark.toPandas()
print(f"Data loaded successfully! Shape: {df.shape}")

# Prepare features and labels: every column whose name starts with
# 'feature_' is a model input, and the 'label' column is the target.
feature_columns = [col for col in df.columns if col.startswith('feature_')]
X = df[feature_columns].values
y = df['label'].values

print(f"Features shape: {X.shape}")
print(f"Labels shape: {y.shape}")
# np.bincount only accepts non-negative integers, so this line assumes the
# labels are encoded that way.
print(f"Class distribution: {np.bincount(y)}")

# Split data into train and test sets, stratified on the label so both
# sets keep the class proportions of the full data.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, 
    test_size=CONFIG['test_size'], 
    random_state=CONFIG['random_state'],
    stratify=y
)

print(f"\nTrain set: {X_train.shape}")
print(f"Test set: {X_test.shape}")

## 4. Feature Subset Selection

In [None]:
def generate_feature_subsets(n_features, n_subsets):
    """
    Generate diverse feature subsets using different strategies.

    The strategy rotates with the subset index (``i % 7``): all features, a
    uniform random sample, top-k by ANOVA F-score, top-k by mutual
    information, a contiguous block of columns, and two mixes that take half
    of the subset from a ranking and the other half at random.

    Scores are computed on the module-level ``X_train`` / ``y_train``, and
    NumPy's global RNG is re-seeded with ``CONFIG['random_state']`` on every
    call, so repeated calls return the same subsets.

    Args:
        n_features: Total number of feature columns available.
        n_subsets: Number of subsets to generate.

    Returns:
        A list of ``n_subsets`` dicts with the keys ``feature_indices``
        (sorted list of plain ``int`` column indices), ``n_features`` (length
        of that list) and ``strategy`` (name of the strategy used).
    """
    strategy_names = ['all', 'random', 'top_f', 'top_mi', 'block', 'f_random_mix', 'mi_random_mix']

    np.random.seed(CONFIG['random_state'])

    # Calculate feature importance scores
    f_scores, _ = f_classif(X_train, y_train)
    mi_scores = mutual_info_classif(X_train, y_train, random_state=CONFIG['random_state'])

    # f_classif can return NaN (e.g. for constant features). argsort places
    # NaN last, so after the reversal below NaN-scored features would be
    # ranked *first*; map them to -inf so they are ranked last instead.
    f_scores = np.where(np.isnan(f_scores), -np.inf, f_scores)

    top_f_features = np.argsort(f_scores)[::-1]
    top_mi_features = np.argsort(mi_scores)[::-1]

    all_features = np.arange(n_features)
    # Subsets hold at least 20 features, or all of them when fewer exist
    # (a fixed lower bound of 20 makes randint raise for n_features < 20).
    min_subset_size = min(20, n_features)

    def ranked_random_mix(ranking, subset_size):
        """Take half of the subset from the top of `ranking`, the rest at random."""
        n_top = subset_size // 2
        top_features = ranking[:n_top]
        # Sorted array of the features not already taken from the ranking.
        remaining = np.setdiff1d(all_features, top_features)
        random_features = np.random.choice(remaining, subset_size - n_top, replace=False)
        return np.concatenate([top_features, random_features])

    subsets = []
    for i in range(n_subsets):
        # The size is always drawn first, even for strategies that ignore
        # it, which keeps the random stream aligned across strategies.
        subset_size = np.random.randint(min_subset_size, n_features + 1)
        strategy = i % len(strategy_names)

        if strategy == 0:
            subset = all_features
        elif strategy == 1:
            subset = np.random.choice(n_features, subset_size, replace=False)
        elif strategy == 2:
            subset = top_f_features[:subset_size]
        elif strategy == 3:
            subset = top_mi_features[:subset_size]
        elif strategy == 4:
            start = np.random.randint(0, n_features - subset_size + 1)
            subset = np.arange(start, start + subset_size)
        elif strategy == 5:
            subset = ranked_random_mix(top_f_features, subset_size)
        else:
            subset = ranked_random_mix(top_mi_features, subset_size)

        # Normalise to sorted plain ints so every strategy yields the same type.
        feature_indices = sorted(int(idx) for idx in subset)

        subsets.append({
            'feature_indices': feature_indices,
            'n_features': len(feature_indices),
            'strategy': strategy_names[strategy]
        })

    return subsets

# Generate one feature subset per model to be trained
n_features = X.shape[1]
feature_subsets = generate_feature_subsets(n_features, CONFIG['n_models_total'])

print(f"Generated {len(feature_subsets)} feature subsets")

## 5. Hyperparameter Spaces

In [None]:
# Hyperparameter spaces for CPU models, keyed by model type.
#
# Encoding of each entry (interpreted by suggest_hyperparameters):
#   (low, high, 'log') -> float range sampled on a log scale
#   (low, high)        -> plain numeric range
#   [a, b, ...]        -> categorical choices
HYPERPARAMETER_SPACES = dict(
    logistic_regression=dict(
        C=(1e-4, 1e2, 'log'),
        penalty=['l2'],
        solver=['lbfgs', 'saga'],
        max_iter=[500],
    ),
    svm=dict(
        C=(1e-3, 1e2, 'log'),
        kernel=['rbf', 'linear'],
        gamma=['scale', 'auto'],
    ),
    random_forest=dict(
        n_estimators=(50, 300),
        max_depth=(5, 30),
        min_samples_split=(2, 20),
        min_samples_leaf=(1, 10),
        max_features=['sqrt', 'log2', None],
    ),
    xgboost=dict(
        n_estimators=(50, 300),
        max_depth=(3, 15),
        learning_rate=(1e-3, 0.3, 'log'),
        subsample=(0.6, 1.0),
        colsample_bytree=(0.6, 1.0),
        min_child_weight=(1, 10),
        gamma=(0, 5),
    ),
    gradient_boosting=dict(
        n_estimators=(50, 300),
        max_depth=(3, 15),
        learning_rate=(1e-3, 0.3, 'log'),
        subsample=(0.6, 1.0),
        min_samples_split=(2, 20),
        min_samples_leaf=(1, 10),
    ),
    naive_bayes=dict(
        var_smoothing=(1e-11, 1e-7, 'log'),
    ),
)

print("Hyperparameter spaces defined!")

## 6. Training Functions

In [None]:
def train_sklearn_model(model_type, hyperparams, X_tr, y_tr, X_val, y_val):
    """Train a scikit-learn classifier and score it on a held-out set.

    Features are standardised with a scaler fitted on ``X_tr`` only, the
    model is fitted on the scaled training data, and metrics are computed
    on the scaled ``X_val``.

    Args:
        model_type: One of 'logistic_regression', 'svm', 'random_forest',
            'gradient_boosting' or 'naive_bayes'.
        hyperparams: Keyword arguments forwarded to the estimator constructor.
        X_tr, y_tr: Training features and labels.
        X_val, y_val: Evaluation features and labels.

    Returns:
        Dict with the keys 'accuracy', 'roc_auc', 'f1', 'precision' and
        'recall'. ROC AUC is computed from column 1 of ``predict_proba``,
        so a binary target is assumed.

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """
    seed = CONFIG['random_state']

    if model_type == 'logistic_regression':
        model = LogisticRegression(**hyperparams, random_state=seed)
    elif model_type == 'svm':
        # probability=True is required for predict_proba below.
        model = SVC(**hyperparams, probability=True, random_state=seed)
    elif model_type == 'random_forest':
        model = RandomForestClassifier(**hyperparams, random_state=seed, n_jobs=-1)
    elif model_type == 'gradient_boosting':
        # GradientBoostingClassifier has no n_jobs parameter; passing one
        # raises TypeError, which made every gradient-boosting model fail.
        model = GradientBoostingClassifier(**hyperparams, random_state=seed)
    elif model_type == 'naive_bayes':
        model = GaussianNB(**hyperparams)
    else:
        raise ValueError(f"Unknown model type: {model_type}")

    # Scale features (statistics come from the training split only)
    scaler = StandardScaler()
    X_tr_scaled = scaler.fit_transform(X_tr)
    X_val_scaled = scaler.transform(X_val)

    # Train
    model.fit(X_tr_scaled, y_tr)

    # Predict hard labels and positive-class probabilities
    y_pred = model.predict(X_val_scaled)
    y_pred_proba = model.predict_proba(X_val_scaled)[:, 1]

    # Calculate metrics
    metrics = {
        'accuracy': accuracy_score(y_val, y_pred),
        'roc_auc': roc_auc_score(y_val, y_pred_proba),
        'f1': f1_score(y_val, y_pred),
        'precision': precision_score(y_val, y_pred),
        'recall': recall_score(y_val, y_pred)
    }

    return metrics


def train_xgboost_model(hyperparams, X_tr, y_tr, X_val, y_val):
    """Fit an XGBoost classifier and score it on a held-out set.

    Args:
        hyperparams: Keyword arguments forwarded to ``xgb.XGBClassifier``.
        X_tr, y_tr: Training features and labels.
        X_val, y_val: Evaluation features and labels; also passed to
            ``fit`` as the ``eval_set``.

    Returns:
        Dict with the keys 'accuracy', 'roc_auc', 'f1', 'precision' and
        'recall', all computed on the evaluation data.
    """
    # Standardise with statistics taken from the training split only.
    standardizer = StandardScaler().fit(X_tr)
    train_features = standardizer.transform(X_tr)
    eval_features = standardizer.transform(X_val)

    # Settings that are fixed regardless of the tuned hyperparameters.
    fixed_settings = dict(
        objective='binary:logistic',
        eval_metric='auc',
        random_state=CONFIG['random_state'],
        n_jobs=-1,
        use_label_encoder=False,
    )
    classifier = xgb.XGBClassifier(**hyperparams, **fixed_settings)

    classifier.fit(
        train_features,
        y_tr,
        eval_set=[(eval_features, y_val)],
        verbose=False,
    )

    predicted_labels = classifier.predict(eval_features)
    # Column 1 of predict_proba is the positive-class probability.
    positive_scores = classifier.predict_proba(eval_features)[:, 1]

    return {
        'accuracy': accuracy_score(y_val, predicted_labels),
        'roc_auc': roc_auc_score(y_val, positive_scores),
        'f1': f1_score(y_val, predicted_labels),
        'precision': precision_score(y_val, predicted_labels),
        'recall': recall_score(y_val, predicted_labels),
    }

print("Training functions defined!")

## 7. Bayesian Optimization

In [None]:
def suggest_hyperparameters(trial, model_type):
    """Suggest hyperparameters using Optuna.

    Each entry of ``HYPERPARAMETER_SPACES[model_type]`` is translated into
    one Optuna suggestion:

    * ``(low, high, 'log')`` -> ``suggest_float(..., log=True)``
    * ``(low, high)`` with two ``int`` bounds -> ``suggest_int``
    * ``(low, high)`` with any ``float`` bound -> ``suggest_float``
    * ``[a, b, ...]`` -> ``suggest_categorical``

    Entries of any other type are skipped.

    Args:
        trial: Optuna trial used to draw the values.
        model_type: Key into ``HYPERPARAMETER_SPACES``.

    Returns:
        Dict mapping parameter name to the suggested value.
    """
    space = HYPERPARAMETER_SPACES[model_type]
    hyperparams = {}

    for param_name, param_config in space.items():
        if isinstance(param_config, tuple):
            low, high = param_config[0], param_config[1]
            if len(param_config) == 3 and param_config[2] == 'log':
                hyperparams[param_name] = trial.suggest_float(
                    param_name, low, high, log=True
                )
            elif isinstance(low, int) and isinstance(high, int):
                hyperparams[param_name] = trial.suggest_int(param_name, low, high)
            else:
                # Fractional ranges such as subsample=(0.6, 1.0) must be
                # sampled as floats; an integer suggestion cannot represent
                # values inside such a range.
                hyperparams[param_name] = trial.suggest_float(param_name, low, high)
        elif isinstance(param_config, list):
            hyperparams[param_name] = trial.suggest_categorical(param_name, param_config)

    return hyperparams


def optimize_hyperparameters(model_type, X_tr, y_tr, X_val, y_val, n_trials=20):
    """Optimize hyperparameters using Bayesian optimization.

    Runs an Optuna study (TPE sampler seeded with
    ``CONFIG['random_state']``) that maximises validation ROC AUC.

    Args:
        model_type: Model type key; 'xgboost' is trained with
            ``train_xgboost_model``, everything else with
            ``train_sklearn_model``.
        X_tr, y_tr: Training features and labels for each trial.
        X_val, y_val: Validation features and labels used for scoring.
        n_trials: Number of trials to run.

    Returns:
        Tuple ``(best_params, best_value)`` taken from the study.
    """
    # Local import: the notebook's import cell does not bring in logging.
    import logging

    logger = logging.getLogger("optimize_hyperparameters")

    def objective(trial):
        hyperparams = suggest_hyperparameters(trial, model_type)

        try:
            if model_type == 'xgboost':
                metrics = train_xgboost_model(hyperparams, X_tr, y_tr, X_val, y_val)
            else:
                metrics = train_sklearn_model(model_type, hyperparams, X_tr, y_tr, X_val, y_val)

            return metrics['roc_auc']
        except Exception as exc:
            # Best-effort: a failing configuration is scored as chance-level
            # (0.5) so the study can continue, but the failure is logged so
            # that systematic errors do not stay hidden.
            logger.warning(
                "Trial %d for %s failed with params %s: %r",
                trial.number, model_type, hyperparams, exc,
            )
            return 0.5

    study = optuna.create_study(
        direction='maximize',
        sampler=TPESampler(seed=CONFIG['random_state'])
    )

    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    return study.best_params, study.best_value

print("Bayesian optimization functions defined!")

## 8. Ray Remote Training Function

In [None]:
@ray.remote
def train_model_with_optimization(
    model_id,
    model_type,
    feature_subset,
    X_train_full,
    y_train_full,
    X_test_full,
    y_test_full,
    n_trials=20
):
    """
    Ray remote function to train a single model.

    Steps: select the model's feature columns, carve a stratified 20%
    validation split out of the training data, tune hyperparameters on that
    split, then refit on the full training data and score on the test data.

    Returns a result dict. On success it carries the tuned hyperparameters
    (as a JSON string), the best validation score, the test metrics, the
    elapsed time and ``status='success'``. Any exception is caught and
    reported as ``status='failed'`` with the message under ``error``.
    """
    started_at = time.time()

    try:
        # Restrict both datasets to this model's feature columns.
        columns = feature_subset['feature_indices']
        train_features = X_train_full[:, columns]
        test_features = X_test_full[:, columns]

        # Hold out part of the training data for hyperparameter search.
        fit_features, val_features, fit_labels, val_labels = train_test_split(
            train_features,
            y_train_full,
            test_size=0.2,
            random_state=CONFIG['random_state'],
            stratify=y_train_full,
        )

        best_hyperparams, best_val_score = optimize_hyperparameters(
            model_type, fit_features, fit_labels, val_features, val_labels, n_trials
        )

        # Final fit on the full training data, evaluated on the test data.
        final_data = (train_features, y_train_full, test_features, y_test_full)
        if model_type == 'xgboost':
            test_metrics = train_xgboost_model(best_hyperparams, *final_data)
        else:
            test_metrics = train_sklearn_model(model_type, best_hyperparams, *final_data)

        elapsed = time.time() - started_at

        return {
            'model_id': model_id,
            'model_type': model_type,
            'cluster_type': 'cpu',
            'n_features_used': len(columns),
            'feature_strategy': feature_subset['strategy'],
            'best_hyperparams': json.dumps(best_hyperparams),
            'best_val_score': best_val_score,
            'accuracy': test_metrics['accuracy'],
            'roc_auc': test_metrics['roc_auc'],
            'f1': test_metrics['f1'],
            'precision': test_metrics['precision'],
            'recall': test_metrics['recall'],
            'training_time': elapsed,
            'status': 'success',
        }

    except Exception as e:
        # Report the failure as data instead of raising to the caller.
        return {
            'model_id': model_id,
            'model_type': model_type,
            'cluster_type': 'cpu',
            'status': 'failed',
            'error': str(e),
            'training_time': time.time() - started_at,
        }

print("Ray remote training function defined!")

## 9. Initialize Ray

In [None]:
# Initialize Ray cluster using Databricks utilities
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

# Shutdown any existing Ray instance so the setup below starts clean
if ray.is_initialized():
    ray.shutdown()

try:
    # Setup Ray cluster spanning all Spark nodes, sized from CONFIG
    print("Setting up Ray cluster across all nodes...")
    setup_ray_cluster(
        num_worker_nodes=CONFIG['n_workers'],
        num_cpus_worker_node=CONFIG['cores_per_node'],
        num_gpus_worker_node=0,  # 0 GPUs for CPU cluster
        collect_log_to_path="/Workspace/Users/ryuta.yoshimatsu@databricks.com/ray_logs"
    )

    # Explicitly initialize Ray after cluster setup
    ray.init(address='auto', ignore_reinit_error=True, logging_level='ERROR')

except Exception as e:
    print(f"Warning: setup_ray_cluster failed with: {e}")
    print("Falling back to standard Ray initialization...")
    # Fallback to standard initialization (no Ray-on-Spark cluster is used)
    ray.init(ignore_reinit_error=True, logging_level='ERROR')

# Query the cluster once and reuse the snapshot for all report lines.
cluster_resources = ray.cluster_resources()

print("\nRay initialized successfully on CPU cluster!")
print(f"Available CPUs: {cluster_resources.get('CPU', 0)}")
print(f"Available GPUs: {cluster_resources.get('GPU', 0)}")
# The multiplication sign is written as an escape so that re-encoding the
# notebook cannot garble it, and the counts come from CONFIG rather than
# being hard-coded.
print(f"Expected CPUs: {CONFIG['total_cores']} "
      f"({CONFIG['n_workers']} workers \u00d7 {CONFIG['cores_per_node']} cores)")

# Verify cluster setup
print(f"\nCluster nodes connected: {len(ray.nodes())}")
print(f"Total cluster resources: {cluster_resources}")

## 10. Generate Model Configurations

In [None]:
# Build one configuration per model, assigning consecutive model IDs in the
# order in which the model types appear in CONFIG['model_distribution'].
model_configs = []
model_id = CONFIG['model_id_start']

for model_type, count in CONFIG['model_distribution'].items():
    # CPU allocation: 4 CPUs for the tree ensembles, 1 CPU for everything else
    n_cpus = 4 if model_type in ('random_forest', 'xgboost', 'gradient_boosting') else 1

    for _ in range(count):
        model_configs.append({
            'model_id': model_id,
            'model_type': model_type,
            # Feature subsets are assigned by model ID, wrapping around if
            # there are fewer subsets than IDs.
            'feature_subset': feature_subsets[model_id % len(feature_subsets)],
            'n_cpus': n_cpus,
        })
        model_id += 1

print(f"Generated {len(model_configs)} CPU model configurations")
print(f"Model IDs: {CONFIG['model_id_start']} to {model_id - 1}")
print(f"\nModel distribution:")
for model_type, count in CONFIG['model_distribution'].items():
    print(f"  {model_type}: {count}")

## 11. Launch Parallel Training

In [None]:
# Put data in Ray object store
# Each array is stored once, and the returned references are what get passed
# to the remote training tasks instead of the arrays themselves.
X_train_ref = ray.put(X_train)
y_train_ref = ray.put(y_train)
X_test_ref = ray.put(X_test)
y_test_ref = ray.put(y_test)

print("Data stored in Ray object store")

In [None]:
# Submit one remote training task per model configuration
print(f"\nLaunching {len(model_configs)} training jobs on CPU cluster...\n")
print("="*80)
start_time = time.time()

# Each task requests the CPU count recorded in its config, and the datasets
# are passed as object-store references. Futures keep the order of
# model_configs.
futures = [
    train_model_with_optimization.options(num_cpus=config['n_cpus']).remote(
        model_id=config['model_id'],
        model_type=config['model_type'],
        feature_subset=config['feature_subset'],
        X_train_full=X_train_ref,
        y_train_full=y_train_ref,
        X_test_full=X_test_ref,
        y_test_full=y_test_ref,
        n_trials=CONFIG['n_trials_per_model'],
    )
    for config in model_configs
]

print(f"All {len(futures)} jobs submitted. Waiting for results...\n")

In [None]:
# Collect results as tasks finish, in completion order
results = []
completed = 0
remaining_futures = futures.copy()

# Map each future back to the config it was submitted with (futures were
# created in model_configs order), so a task that fails at the Ray level can
# still be reported against the right model.
config_by_future = dict(zip(futures, model_configs))

while remaining_futures:
    ready_futures, remaining_futures = ray.wait(remaining_futures, num_returns=1)

    for future in ready_futures:
        try:
            result = ray.get(future)
        except ray.exceptions.RayError as e:
            # The remote function catches its own exceptions, so reaching
            # this branch means the task itself was lost (for example the
            # worker process died). Record it as a failure instead of
            # aborting the collection of all other results.
            failed_config = config_by_future[future]
            result = {
                'model_id': failed_config['model_id'],
                'model_type': failed_config['model_type'],
                'cluster_type': 'cpu',
                'status': 'failed',
                'error': str(e),
                'training_time': None
            }

        results.append(result)
        completed += 1

        if result['status'] == 'success':
            print(f"[{completed}/{len(futures)}] Model {result['model_id']} ({result['model_type']}) - "
                  f"ROC AUC: {result['roc_auc']:.4f} ({result['training_time']:.1f}s)")
        else:
            # Include the error text, which the result dict carries but the
            # message previously dropped.
            print(f"[{completed}/{len(futures)}] Model {result['model_id']} FAILED: "
                  f"{result.get('error', 'unknown error')}")

total_time = time.time() - start_time
print(f"\n{'='*80}")
print(f"CPU CLUSTER TRAINING COMPLETE")
print(f"{'='*80}")
print(f"Total time: {total_time:.2f}s ({total_time/60:.2f} minutes)")
print(f"Successful: {sum(1 for r in results if r['status'] == 'success')}")
print(f"Failed: {sum(1 for r in results if r['status'] == 'failed')}")

In [None]:
## 12. Cleanup

In [None]:
# Shutdown Ray cluster
try:
    shutdown_ray_cluster()
except RuntimeError as e:
    # Expected when no cluster started by setup_ray_cluster is active, e.g.
    # after the initialization cell fell back to a plain ray.init().
    print(f"No Ray-on-Spark cluster to shut down: {e}")
finally:
    # Always close this notebook's own Ray connection as well.
    if ray.is_initialized():
        ray.shutdown()

print("Ray cluster shut down successfully!")

## 13. Save Results to Delta Table

In [None]:
# Only successful runs are persisted; failed runs are dropped here
successful_results = [r for r in results if r['status'] == 'success']

if not successful_results:
    print("No successful results to save.")
else:
    # Tabulate the results and stamp every row with the same save time
    results_df = pd.DataFrame(successful_results)
    results_df['training_timestamp'] = datetime.now().isoformat()

    # Convert to Spark DataFrame
    results_spark_df = spark.createDataFrame(results_df)

    # Append to the Delta table named in CONFIG
    print(f"\nSaving {len(results_df)} results to {CONFIG['results_table']}...")
    (
        results_spark_df.write
        .format("delta")
        .mode("append")
        .saveAsTable(CONFIG['results_table'])
    )

    print(f"Results saved successfully!")

    # Summary statistics over the saved rows
    roc_auc = results_df['roc_auc']
    accuracy = results_df['accuracy']
    print(f"\nSummary Statistics:")
    print(f"  Mean ROC AUC: {roc_auc.mean():.4f}")
    print(f"  Best ROC AUC: {roc_auc.max():.4f}")
    print(f"  Mean Accuracy: {accuracy.mean():.4f}")
    print(f"  Best Accuracy: {accuracy.max():.4f}")
    print(f"  Total training time: {results_df['training_time'].sum():.2f}s")

## Summary

This notebook trains **90 traditional ML models** on the CPU cluster:
- Efficient parallel execution using Ray
- Bayesian hyperparameter optimization for each model
- Various feature selection strategies
- Results saved to shared Delta table: `ryuta.ray.model_training_results`

**Next Steps**:
1. Run the GPU notebook for PyTorch models
2. Use the analysis notebook to compare all results