# Advanced Ray Pipeline for Machine Learning

## Overview

This notebook demonstrates an advanced machine learning pipeline using Ray and scikit-learn. We'll build a distributed pipeline that includes:

1. Distributed data loading and preprocessing
2. Parallel feature engineering
3. Distributed cross-validation
4. Hyperparameter tuning with Ray Tune
5. Distributed model training
6. Parallel model evaluation
7. Model serving with Ray Serve

This pipeline will showcase Ray's capabilities in handling large-scale machine learning workflows efficiently.

Let's start by importing the necessary libraries and initializing Ray.

In [None]:
import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, roc_auc_score
import joblib
import time

# Initialize Ray
ray.init(num_cpus=8, num_gpus=0)  # Adjust based on your machine's capabilities

## 1. Distributed Data Loading and Preprocessing

We'll start by creating a large synthetic dataset and distributing its loading and preprocessing across multiple Ray tasks.

In [None]:
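@ray.remote(num_returns=2)
def generate_data_chunk(n_samples, n_features, n_classes, seed):
    # A distinct seed per chunk keeps the chunks from being identical copies.
    # Each seed defines its own synthetic problem, so the combined data is a mixture.
    X, y = make_classification(n_samples=n_samples, n_features=n_features, n_classes=n_classes, random_state=seed)
    return pd.DataFrame(X), pd.Series(y)

@ray.remote
def preprocess_data(X, y):
    # Simulate some preprocessing steps (the scaler is fitted separately on each chunk)
    X = StandardScaler().fit_transform(X)
    return X, y

# Generate and preprocess data in parallel
n_chunks = 10
# With num_returns=2, each call returns an (X_ref, y_ref) pair that can be unpacked
data_refs = [generate_data_chunk.remote(100000, 50, 2, seed=42 + i) for i in range(n_chunks)]
preprocessed_refs = [preprocess_data.remote(*chunk_ref) for chunk_ref in data_refs]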

# Collect results
X_list, y_list = zip(*ray.get(preprocessed_refs))
X = np.concatenate(X_list)
y = np.concatenate(y_list)

print(f"Generated dataset shape: {X.shape}")
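
Note that `preprocess_data` fits a separate `StandardScaler` on each chunk, so every chunk is standardized with its own mean and variance. If one global scaling is preferred, the per-chunk statistics can be merged without gathering the raw data first. The following is a minimal NumPy-only sketch of that idea; `chunk_stats` and `merge_stats` are hypothetical helper names, not part of Ray or scikit-learn.

```python
import numpy as np

def chunk_stats(X_chunk):
    """Return (row count, per-feature mean, per-feature population variance)."""
    return len(X_chunk), X_chunk.mean(axis=0), X_chunk.var(axis=0)

def merge_stats(stats):
    """Combine per-chunk statistics into the global mean and standard deviation."""
    counts = np.array([n for n, _, _ in stats], dtype=float)
    means = np.stack([m for _, m, _ in stats])
    variances = np.stack([v for _, _, v in stats])
    total = counts.sum()
    global_mean = (counts[:, None] * means).sum(axis=0) / total
    # Law of total variance: within-chunk variance plus between-chunk spread.
    spread = variances + (means - global_mean) ** 2
    global_var = (counts[:, None] * spread).sum(axis=0) / total
    return global_mean, np.sqrt(global_var)

# Small demo: statistics merged from 7 uneven chunks match the full-data statistics.
rng = np.random.default_rng(0)
demo = rng.normal(loc=3.0, scale=2.0, size=(1000, 4))
chunks = np.array_split(demo, 7)
mean, std = merge_stats([chunk_stats(c) for c in chunks])
X_scaled = (demo - mean) / std
print(np.allclose(mean, demo.mean(axis=0)), np.allclose(std, demo.std(axis=0)))
```

In a Ray setting, `chunk_stats` could run as a remote task per chunk, with only the small statistics tuples returned to the driver.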

## 2. Parallel Feature Engineering

Next, we'll perform some feature engineering tasks in parallel.

In [None]:
@ray.remote
def engineer_features(X_chunk):
    # Simulate complex feature engineering
    new_features = np.column_stack([
        np.mean(X_chunk, axis=1),
        np.std(X_chunk, axis=1),
        np.max(X_chunk, axis=1),
        np.min(X_chunk, axis=1),
        np.median(X_chunk, axis=1)
    ])
    return np.hstack([X_chunk, new_features])

# Split data into chunks for parallel processing (one chunk per CPU)
# Ray reports CPU counts as floats, so cast to int before using them for slicing
n_workers = max(1, int(ray.cluster_resources().get('CPU', 1)))
X_chunks = np.array_split(X, n_workers)

# Engineer features in parallel
engineered_chunks = ray.get([engineer_features.remote(chunk) for chunk in X_chunks])
X_engineered = np.vstack(engineered_chunks)

print(f"Engineered dataset shape: {X_engineered.shape}")
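
Because every engineered feature is a row-wise statistic, the result should not depend on how the rows are chunked. The sketch below checks that property on a small random array, using plain NumPy and no Ray; the array sizes are arbitrary assumptions chosen so the chunks are uneven.

```python
import numpy as np

def engineer_features(X_chunk):
    """Append five row-wise summary statistics to the original columns."""
    new_features = np.column_stack([
        np.mean(X_chunk, axis=1),
        np.std(X_chunk, axis=1),
        np.max(X_chunk, axis=1),
        np.min(X_chunk, axis=1),
        np.median(X_chunk, axis=1),
    ])
    return np.hstack([X_chunk, new_features])

rng = np.random.default_rng(0)
X_demo = rng.normal(size=(103, 6))

# array_split tolerates row counts that are not divisible by the chunk count.
chunks = np.array_split(X_demo, 8)
chunked = np.vstack([engineer_features(c) for c in chunks])
full = engineer_features(X_demo)

print(chunked.shape, np.allclose(chunked, full))
```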

## 3. Distributed Cross-Validation

We'll implement a distributed cross-validation function to evaluate our model's performance.

In [None]:
@ray.remote
def cross_validate_fold(X_train, y_train, X_val, y_val, params):
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_val)
    return accuracy_score(y_val, y_pred)

def distributed_cross_validation(X, y, params, n_splits=5):
    from sklearn.model_selection import KFold
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    
    tasks = []
    for train_index, val_index in kf.split(X):
        X_train, X_val = X[train_index], X[val_index]
        y_train, y_val = y[train_index], y[val_index]
        tasks.append(cross_validate_fold.remote(X_train, y_train, X_val, y_val, params))
    
    scores = ray.get(tasks)
    return np.mean(scores)

# Example usage
params = {'n_estimators': 100, 'max_depth': 10}
cv_score = distributed_cross_validation(X_engineered, y, params)
print(f"Cross-validation score: {cv_score:.4f}")
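
A quick way to gain confidence in a hand-written fold loop is to compare it with scikit-learn's `cross_val_score` on a small sample. The sketch below does this sequentially, without Ray, under the assumption that the model is given a fixed `random_state` so both paths are deterministic; the dataset size and parameters are illustrative only.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold, cross_val_score

X_small, y_small = make_classification(n_samples=300, n_features=10, random_state=0)
demo_params = {"n_estimators": 20, "max_depth": 5, "random_state": 0}
kf = KFold(n_splits=5, shuffle=True, random_state=42)

# Manual loop with the same structure as cross_validate_fold, run locally.
manual_scores = []
for train_idx, val_idx in kf.split(X_small):
    model = RandomForestClassifier(**demo_params)
    model.fit(X_small[train_idx], y_small[train_idx])
    manual_scores.append(accuracy_score(y_small[val_idx], model.predict(X_small[val_idx])))

# Reference implementation using the same splitter and estimator settings.
reference_scores = cross_val_score(
    RandomForestClassifier(**demo_params), X_small, y_small, cv=kf, scoring="accuracy"
)
print(np.mean(manual_scores), np.mean(reference_scores))
```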

## 4. Hyperparameter Tuning with Ray Tune

Now, we'll use Ray Tune to perform distributed hyperparameter optimization.

In [None]:
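def objective(config, X, y):
    score = distributed_cross_validation(X, y, config)
    # Returning a dict reports it as the trial's final result
    return {"mean_accuracy": score}

search_space = {
    "n_estimators": tune.choice([50, 100, 200]),
    "max_depth": tune.choice([5, 10, 15, 20]),
    "min_samples_split": tune.choice([2, 5, 10]),
    "min_samples_leaf": tune.choice([1, 2, 4])
}

analysis = tune.run(
    # with_parameters places the large arrays in the object store once
    # instead of pickling them into every trial
    tune.with_parameters(objective, X=X_engineered, y=y),
    config=search_space,
    num_samples=20,
    scheduler=ASHAScheduler(metric="mean_accuracy", mode="max"),
    # The first bundle runs the trial itself; the second is left for the
    # fold tasks it launches, so they are not starved of CPUs
    resources_per_trial=tune.PlacementGroupFactory([{"CPU": 1}, {"CPU": 1}])
)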

best_config = analysis.get_best_config(metric="mean_accuracy", mode="max")
print("Best hyperparameters found:", best_config)
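
Since `tune.choice` draws each parameter at random for every trial, 20 samples can cover only part of the search space. The standard-library sketch below counts the full grid and simulates one random draw of 20 configurations; the seed is an arbitrary assumption and the simulation is independent of Ray Tune's own sampler.

```python
import itertools
import random

# Same candidate values as the search space above, as plain lists.
candidates = {
    "n_estimators": [50, 100, 200],
    "max_depth": [5, 10, 15, 20],
    "min_samples_split": [2, 5, 10],
    "min_samples_leaf": [1, 2, 4],
}

# Every combination of the candidate values: 3 * 4 * 3 * 3 = 108.
grid = list(itertools.product(*candidates.values()))

# Simulate 20 independent random draws and count the distinct configurations.
rng = random.Random(0)
sampled = {tuple(rng.choice(values) for values in candidates.values()) for _ in range(20)}

print(f"grid size: {len(grid)}")
print(f"distinct configurations in 20 draws: {len(sampled)}")
```

If exhaustive coverage matters, raising `num_samples` or switching to `tune.grid_search` for some parameters are the usual options.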

## 5. Distributed Model Training

With the best hyperparameters, we'll now train our model using distributed data.

In [None]:
@ray.remote
def fit_sub_forest(X_chunk, y_chunk, params, seed):
    # Each task fits an independent forest on its own chunk of the training data
    model = RandomForestClassifier(random_state=seed, **params)
    return model.fit(X_chunk, y_chunk)

@ray.remote
class DistributedRandomForest:
    def __init__(self, models):
        # Calling fit() repeatedly on one estimator would keep only the last
        # chunk's trees, so the actor holds one fitted sub-forest per chunk
        self.models = models

    def predict(self, X):
        # Average class probabilities across sub-forests, then pick the most likely class
        proba = np.mean([m.predict_proba(X) for m in self.models], axis=0)
        return self.models[0].classes_[np.argmax(proba, axis=1)]

# Split data for training
X_train, X_test, y_train, y_test = train_test_split(X_engineered, y, test_size=0.2, random_state=42)

# Train in parallel: one sub-forest per CPU
n_workers = max(1, int(ray.cluster_resources().get('CPU', 1)))
X_parts = np.array_split(X_train, n_workers)
y_parts = np.array_split(y_train, n_workers)
train_tasks = [
    fit_sub_forest.remote(X_chunk, y_chunk, best_config, seed)
    for seed, (X_chunk, y_chunk) in enumerate(zip(X_parts, y_parts))
]

# Wait for training to complete, then hand the fitted sub-forests to the actor
dist_rf = DistributedRandomForest.remote(ray.get(train_tasks))
print("Distributed training completed")
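
One detail worth keeping in mind when training on chunks: `RandomForestClassifier.fit` discards previously fitted trees unless `warm_start=True`, so calling `fit` once per chunk on the same estimator leaves a model trained on the last chunk only. The sketch below checks this on a small synthetic dataset, without Ray; the sizes and seeds are illustrative assumptions.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X_demo, y_demo = make_classification(n_samples=400, n_features=8, random_state=0)
X_a, y_a = X_demo[:200], y_demo[:200]
X_b, y_b = X_demo[200:], y_demo[200:]

# Fit the same estimator on chunk A, then again on chunk B.
refit = RandomForestClassifier(n_estimators=10, random_state=0)
refit.fit(X_a, y_a)
refit.fit(X_b, y_b)

# Fit a fresh estimator on chunk B only.
only_b = RandomForestClassifier(n_estimators=10, random_state=0).fit(X_b, y_b)

# The refitted model carries no trace of chunk A.
same = np.array_equal(refit.predict_proba(X_demo), only_b.predict_proba(X_demo))
print(f"trees kept: {len(refit.estimators_)}, identical to chunk-B-only model: {same}")
```

This is why chunk-wise training needs an explicit combination step, such as keeping one fitted model per chunk and averaging their predicted probabilities.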

## 6. Parallel Model Evaluation

Now that our model is trained, let's evaluate it using parallel processing.

In [None]:
@ray.remote
def evaluate_chunk(model, X_chunk, y_chunk):
    y_pred = model.predict.remote(X_chunk)
    return ray.get(y_pred), y_chunk

# Evaluate in parallel (Ray reports CPU counts as floats, so cast before chunking)
n_workers = max(1, int(ray.cluster_resources().get('CPU', 1)))
eval_tasks = [
    evaluate_chunk.remote(dist_rf, X_chunk, y_chunk)
    for X_chunk, y_chunk in zip(np.array_split(X_test, n_workers), np.array_split(y_test, n_workers))
]

# Collect results
results = ray.get(eval_tasks)
y_pred, y_true = zip(*results)
y_pred = np.concatenate(y_pred)
y_true = np.concatenate(y_true)

# Calculate metrics
accuracy = accuracy_score(y_true, y_pred)
# y_pred holds hard class labels, so this AUC reflects a single decision threshold
auc = roc_auc_score(y_true, y_pred)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test AUC: {auc:.4f}")
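
Because `predict` returns hard class labels, the AUC above is computed from a single operating point, which for binary labels equals the balanced accuracy. Ranking-based AUC needs scores such as the positive-class probability. The sketch below contrasts the two on a small local model, without Ray; the dataset and model settings are illustrative assumptions.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import balanced_accuracy_score, roc_auc_score
from sklearn.model_selection import train_test_split

X_demo, y_demo = make_classification(n_samples=500, n_features=10, random_state=0)
X_tr, X_te, y_tr, y_te = train_test_split(X_demo, y_demo, test_size=0.3, random_state=0)
model = RandomForestClassifier(n_estimators=25, random_state=0).fit(X_tr, y_tr)

labels = model.predict(X_te)               # hard 0/1 predictions
scores = model.predict_proba(X_te)[:, 1]   # probability of the positive class

auc_from_labels = roc_auc_score(y_te, labels)
auc_from_scores = roc_auc_score(y_te, scores)

print(f"AUC from labels: {auc_from_labels:.4f}")
print(f"Balanced accuracy: {balanced_accuracy_score(y_te, labels):.4f}")
print(f"AUC from probabilities: {auc_from_scores:.4f}")
```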

## 7. Model Serving with Ray Serve

Finally, let's deploy our trained model using Ray Serve for real-time predictions.

In [None]:
from ray import serve

@serve.deployment
class ModelServer:
    def __init__(self, model):
        self.model = model

    async def __call__(self, request):
        data = await request.json()
        features = np.array(data['features']).reshape(1, -1)
        # Awaiting the ObjectRef already yields the prediction array
        prediction = await self.model.predict.remote(features)
        return {'prediction': int(prediction[0])}

# Deploy the model (Ray 2.x API: bind the constructor arguments, then run)
serve.run(ModelServer.bind(dist_rf), route_prefix="/predict")

print("Model deployed and ready to serve predictions.")
# Simulate a prediction request
import requests
sample_input = X_test[0].tolist()
response = requests.post("http://localhost:8000/predict", json={"features": sample_input})
print(f"Prediction for sample input: {response.json()['prediction']}")

In this final section, we've deployed our trained model using Ray Serve. This allows us to make real-time predictions using a simple HTTP API. The `ModelServer` class wraps our distributed random forest model and handles incoming prediction requests.
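
The request handler reshapes whatever arrives under `features` into a single row, so a malformed payload only fails once it reaches the model. A small validation step can reject bad requests earlier. The sketch below is one possible version, assuming 55 features (the 50 generated columns plus the 5 engineered ones); `parse_features` is a hypothetical helper, not part of Ray Serve.

```python
import json
import numpy as np

def parse_features(body, n_features):
    """Turn a JSON request body into a (1, n_features) float array."""
    data = json.loads(body)
    if "features" not in data:
        raise ValueError("request body must contain a 'features' list")
    features = np.asarray(data["features"], dtype=float)
    if features.ndim != 1 or features.size != n_features:
        raise ValueError(f"expected {n_features} features, got shape {features.shape}")
    return features.reshape(1, -1)

# A well-formed payload yields a single-row feature matrix.
payload = json.dumps({"features": [0.1] * 55})
row = parse_features(payload, n_features=55)
print(row.shape)

# A payload with the wrong number of features is rejected before prediction.
try:
    parse_features(json.dumps({"features": [1.0, 2.0]}), n_features=55)
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```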

## Conclusion

This notebook has demonstrated an advanced machine learning pipeline using Ray and scikit-learn. We've covered several key aspects of distributed machine learning:

1. **Distributed Data Loading and Preprocessing**: We generated and preprocessed a large dataset in parallel, showcasing Ray's ability to handle big data efficiently.

2. **Parallel Feature Engineering**: We performed feature engineering tasks across multiple workers, demonstrating how Ray can speed up computationally intensive tasks.

3. **Distributed Cross-Validation**: We implemented a custom distributed cross-validation function, allowing for faster and more scalable model evaluation.

4. **Hyperparameter Tuning with Ray Tune**: We used Ray Tune to perform distributed hyperparameter optimization, efficiently searching the parameter space.

5. **Distributed Model Training**: We created a distributed version of RandomForestClassifier and trained it in parallel across multiple workers.

6. **Parallel Model Evaluation**: We evaluated our trained model on the test set using parallel processing for faster results.

7. **Model Serving with Ray Serve**: Finally, we deployed our trained model as a scalable prediction service using Ray Serve.

This pipeline demonstrates how Ray can be used to scale up machine learning workflows, from data preprocessing to model deployment. By leveraging distributed computing, we can handle larger datasets, perform more extensive hyperparameter searches, and speed up model training and evaluation.

In an MLOps context, this pipeline showcases several important capabilities:

- **Scalability**: The ability to handle large datasets and complex computations by distributing work across multiple machines or cores.
- **Flexibility**: Ray's API allows for easy parallelization of existing scikit-learn code with minimal changes.
- **End-to-End Workflow**: From data preprocessing to model serving, Ray provides tools for each step of the ML lifecycle.
- **Resource Efficiency**: By dynamically allocating tasks to available resources, Ray helps utilize cluster resources effectively.

To further enhance this pipeline for production use, consider integrating it with other MLOps tools:

- Use MLflow or Weights & Biases for experiment tracking and model versioning.
- Implement data version control using DVC or other data versioning tools.
- Set up continuous integration and deployment (CI/CD) pipelines for automating the model update process.
- Implement monitoring and logging for the deployed model to track performance and detect drift.

By combining Ray with these MLOps practices, you can create robust, scalable, and maintainable machine learning systems capable of handling real-world challenges.