In [None]:
import ray
import numpy as np
import pandas as pd
import joblib

from ray.util.joblib import register_ray
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

from cloud_data_cockpit import DataCockpit

In [None]:
# Start Ray. ignore_reinit_error=True keeps this cell safe to re-run in a live
# kernel: a second plain ray.init() call would otherwise raise a RuntimeError.
ray.init(ignore_reinit_error=True)
# Register the "ray" joblib backend that joblib.parallel_backend("ray")
# selects later in the notebook.
register_ray()

## Data Loading and Partitioning with DataCockpit

In the next cells, we initialize the data loader and prepare the dataset for distributed processing with Ray. You should:

1. **Select a CSV file**  
   - For example, use `iris.csv`, which is already available in this workspace.  
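   - If you want to use a different CSV, upload it first. Note that the cells below expect the iris columns (`sepal_length`, `sepal_width`, `petal_length`, `petal_width`, `species`).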

2. **Define the number of _chunks_**  
   - Specify how many partitions (_chunks_) the dataset should be split into.  
   - This allows Ray to distribute the workload efficiently.

3. **Partition the dataset**  
   - Use DataCockpit to divide the CSV into the defined number of _chunks_.  

4. **Run the rest of the notebook with Ray**  
   - Once the dataset is partitioned, Ray will handle parallel processing.  
   - Make sure you have initialized your Ray cluster before proceeding.

In [None]:
# Create the DataCockpit data loader with its default configuration.
# NOTE(review): presumably the CSV file and the number of chunks described in
# the instructions are chosen through this object before the next cell is run
# — confirm against the DataCockpit documentation.
data_loader = DataCockpit()

In [None]:
# Fetch the data slices from the loader; each one is later handed to its own
# Ray task, which calls get_as_pandas() on it.
slices = data_loader.get_data_slices()

In [None]:
# Remote task: turn one data slice into a DataFrame with engineered features
@ray.remote
def process_slice(slice_id, data_slice):
    """Load a slice as a pandas DataFrame and add two derived columns.

    Args:
        slice_id: Index of the slice in the submitted list; not used by the body.
        data_slice: Object exposing ``get_as_pandas()``. The resulting frame
            must contain the ``sepal_length``, ``sepal_width``,
            ``petal_length`` and ``petal_width`` columns.

    Returns:
        The same DataFrame, extended in place with ``sepal_area``
        (length * width) and ``petal_ratio`` (length / width).
    """
    frame: pd.DataFrame = data_slice.get_as_pandas()

    # Both derived series depend only on the original columns.
    sepal_area = frame["sepal_length"] * frame["sepal_width"]
    petal_ratio = frame["petal_length"] / frame["petal_width"]

    frame["sepal_area"] = sepal_area
    frame["petal_ratio"] = petal_ratio
    return frame

In [None]:

# Submit one Ray task per slice, then block until every DataFrame has arrived
futures = [
    process_slice.remote(slice_id, data_slice)
    for slice_id, data_slice in enumerate(slices)
]
dfs = ray.get(futures)

In [None]:
# Stack the per-slice frames into one table with a fresh 0..n-1 index
full_df = pd.concat(dfs, ignore_index=True)
# Feature matrix: the two engineered columns as a NumPy array
X = full_df.loc[:, ["sepal_area", "petal_ratio"]].to_numpy()
# Target vector: integer category codes for the species labels
y = pd.Categorical(full_df["species"]).codes

In [None]:
# Define the hyperparameter search space and RandomizedSearchCV
# Fixed seed so the randomly sampled candidates (and therefore the reported
# best parameters/score) are the same on every Restart & Run All.
RANDOM_SEED = 42

param_space = {
    "C": np.logspace(-3, 3, 20),      # 20 log-spaced values from 1e-3 to 1e3
    "gamma": np.logspace(-4, 1, 20),  # 20 log-spaced values from 1e-4 to 1e1
    "kernel": ["rbf", "poly"],
}
svc = SVC()
search = RandomizedSearchCV(
    svc,
    param_space,
    n_iter=10,   # number of parameter combinations sampled from param_space
    cv=3,        # 3-fold cross-validation for each candidate
    verbose=2,
    scoring="accuracy",
    random_state=RANDOM_SEED,
)

In [None]:
# Run the search using Ray as the Joblib backend: the "ray" backend was
# registered by register_ray() earlier, and it is active only for the
# duration of this `with` block.
with joblib.parallel_backend("ray"):
    search.fit(X, y)

In [None]:
# Report the winning hyperparameter combination and its cross-validated
# score (scoring="accuracy" was requested when the search was configured)
print("Best parameters:", search.best_params_)
print(f"Best CV accuracy: {search.best_score_:.4f}")  # four decimal places

In [None]:
# Shut down the Ray runtime that ray.init() started at the top of the notebook.
ray.shutdown()