## Distributed ML Pipeline: Ray Data Processing + Kubeflow Training Demo

This notebook demonstrates how to combine Ray's distributed data processing with the kubeflow-training SDK to build an end-to-end ML pipeline for fine-tuning IBM Granite models.


### Distributed Data Processing with Synthetic Data Generation + Training
```
Phase 1: Ray Data Processing (CodeFlare SDK)
GSM8K →   Ray Cluster →    Synthetic Data Generation
(7.5K)   (Distributed)   (Ray Multi-worker ~ 50K+ samples)

Phase 2: Distributed Training (kubeflow-training SDK)  
Synthetic Dataset → PyTorchJob → Fine-tune Granite Model
(50K+ samples)      (Multi-node)   (LoRA adapters)
```
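
The diagram implies a fan-out of roughly 7x. The GSM8K `train` split has 7,473 problems, so reaching 50K samples means about seven accepted synthetic samples per seed problem, and more raw generations if a quality filter rejects some of them. The sketch below does that arithmetic and shows one hypothetical way to split the seed problems evenly across Ray workers. `samples_per_seed` and `shard_ranges` are illustrative helpers, not part of `ray_sdg_job.py` or the Ray API.

```python
import math

GSM8K_TRAIN_SIZE = 7_473  # problems in the GSM8K "train" split
TARGET_SAMPLES = 50_000   # target size of the synthetic dataset (from the diagram)


def samples_per_seed(num_seeds: int, target: int) -> int:
    """Minimum number of accepted synthetic samples each seed must yield to reach `target`."""
    return math.ceil(target / num_seeds)


def shard_ranges(num_items: int, num_shards: int) -> list[tuple[int, int]]:
    """Split [0, num_items) into `num_shards` contiguous [start, end) ranges whose sizes differ by at most 1."""
    base, extra = divmod(num_items, num_shards)
    ranges, start = [], 0
    for i in range(num_shards):
        # The first `extra` shards take one additional item each
        end = start + base + (1 if i < extra else 0)
        ranges.append((start, end))
        start = end
    return ranges


if __name__ == "__main__":
    print(samples_per_seed(GSM8K_TRAIN_SIZE, TARGET_SAMPLES))  # 7
    print(shard_ranges(GSM8K_TRAIN_SIZE, 2))                   # [(0, 3737), (3737, 7473)]
```

In practice Ray Data partitions a dataset into blocks on its own. Manual sharding like this only matters if a job hands each worker an explicit slice of seed problems.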

### Features Demonstrated

**Phase 1: Ray Data Processing**
- **CodeFlare SDK**: Ray cluster deployment and management on Kubernetes
- **Ray Job Submission**: Distributed synthetic data generation using Ray workers

**Phase 2: Distributed Training**  
- **kubeflow-training SDK**: PyTorchJob creation and management
- **TRL + PEFT**: Modern fine-tuning with LoRA adapters
- **Distributed Training**: Multi-node GPU coordination 

### Prerequisites
- Red Hat OpenShift AI installed
- NVIDIA GPU Operator and Node Feature Discovery (NFD) Operator installed
- CodeFlare and Kubeflow Training Operator (KFTO v1) components enabled (set to `Managed`)
- Shared persistent storage with RWX (ReadWriteMany) access, provided by a PVC named `shared` and mounted at:
    - Workbench notebook pod: `/opt/app-root/src/shared`
    - RayCluster pods: `/shared`
    - PyTorchJob pods: `/shared`
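
Before deploying anything, it can help to confirm that the shared volume is mounted and writable in the workbench pod. This is a minimal sketch: `check_shared_storage` is a hypothetical helper, and it only proves the mount works in the current pod. It cannot confirm that the Ray and PyTorchJob pods see the same volume.

```python
import tempfile
from pathlib import Path

# Mount point of the shared RWX PVC inside the workbench pod (see prerequisites)
WORKBENCH_SHARED_PATH = "/opt/app-root/src/shared"


def check_shared_storage(path: str) -> bool:
    """Return True if `path` is an existing directory that we can write to and read back from."""
    root = Path(path)
    if not root.is_dir():
        return False
    try:
        # Write a throwaway file, read it back, and let the context manager delete it
        with tempfile.NamedTemporaryFile(dir=str(root), mode="w+", delete=True) as probe:
            probe.write("ok")
            probe.flush()
            probe.seek(0)
            return probe.read() == "ok"
    except OSError:
        # Covers permission errors and read-only mounts
        return False


if __name__ == "__main__":
    ok = check_shared_storage(WORKBENCH_SHARED_PATH)
    print(f"{WORKBENCH_SHARED_PATH}: {'writable' if ok else 'NOT writable or missing'}")
```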

### References
- [CodeFlare SDK](https://github.com/project-codeflare/codeflare-sdk): Kubernetes-native distributed computing
- [Ray Documentation](https://docs.ray.io/): Distributed computing framework
- [kubeflow-training SDK](https://github.com/kubeflow/training-operator): Kubernetes-native ML training
- [IBM Granite Models](https://huggingface.co/ibm-granite/granite-3.1-2b-instruct): Enterprise LLM family


### Set Up a Ray Cluster Using the CodeFlare SDK

Configure and deploy the Ray cluster for distributed data processing.


In [None]:
# Set up the Ray cluster using the CodeFlare SDK
from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication

# Replace with your OpenShift API token and API server URL
token = ""
api_server = ""

# Authenticate with the cluster
auth = TokenAuthentication(
    token=token,
    server=api_server,
    skip_tls=False,  # set to True only if the API server certificate is not trusted
)
auth.login()
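
Rather than pasting a token into the cell, the credentials can be read from the environment. This is a minimal sketch. The variable names `OPENSHIFT_TOKEN` and `OPENSHIFT_API_SERVER` are hypothetical and must be set (for example, as workbench environment variables) before use.

```python
import os


def load_cluster_credentials(token_var: str = "OPENSHIFT_TOKEN",
                             server_var: str = "OPENSHIFT_API_SERVER") -> tuple[str, str]:
    """Read the API token and server URL from (hypothetical) environment variables, failing fast if unset."""
    token = os.environ.get(token_var, "").strip()
    server = os.environ.get(server_var, "").strip()
    missing = [name for name, value in ((token_var, token), (server_var, server)) if not value]
    if missing:
        raise RuntimeError(f"Missing required environment variable(s): {', '.join(missing)}")
    return token, server
```

With those variables set, `token, api_server = load_cluster_credentials()` would replace the empty strings in the cell above. The same values are reused later to configure the kubeflow-training client.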

In [107]:
# Configure Ray cluster for distributed synthetic data generation
from kubernetes.client.models import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource

ray_cluster = Cluster(ClusterConfiguration(
    name="test1-cluster",
    num_workers=1,  # increase (e.g. to 2) for more parallel processing
    # Head node configuration (memory values in G)
    head_cpu_requests=1,
    head_cpu_limits=2,
    head_memory_requests=8,
    head_memory_limits=16,
    # Worker node configuration
    worker_cpu_requests=1,
    worker_cpu_limits=2,
    worker_memory_requests=10,
    worker_memory_limits=20,
    # Uncomment to request GPUs for the RayCluster workers
    # worker_extended_resource_requests={'nvidia.com/gpu': 1},
    # Ray runtime image
    image="quay.io/rhoai/ray:2.35.0-py311-cu121-torch24-fa26",
    # Volume mount: shared PVC storage with RWX permissions
    volume_mounts=[
        V1VolumeMount(
            name="shared",
            mount_path="/shared"
        )
    ],
    volumes=[
        V1Volume(
            name="shared",
            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                claim_name="shared"
            )
        )
    ],
))

print("Ray Cluster Configuration:")
print(f"   Name: {ray_cluster.config.name}")
print(f"   Workers: {ray_cluster.config.num_workers}")
# Show the GPU count (0 if none are requested) instead of the raw extended-resources dict
print(f"   Worker Resources: {ray_cluster.config.worker_cpu_requests} CPU, {ray_cluster.config.worker_memory_requests} RAM, {ray_cluster.config.worker_extended_resource_requests.get('nvidia.com/gpu', 0)} GPU")
print(f"   Image: {ray_cluster.config.image}")


Yaml resources loaded for test1-cluster


Ray Cluster Configuration:
   Name: test1-cluster
   Workers: 1
   Worker Resources: 1 CPU, 10G RAM, 0 GPU
   Image: quay.io/rhoai/ray:2.35.0-py311-cu121-torch24-fa26


In [108]:
# Deploy the Ray cluster
ray_cluster.apply()

Ray Cluster: 'test1-cluster' has successfully been applied. For optimal resource management, you should delete this Ray Cluster when no longer in use.


In [44]:
# Wait for Ray cluster to be ready
ray_cluster.wait_ready()

Waiting for requested resources to be set up...
Requested cluster is up and running!
Dashboard is ready!
Ray cluster is ready!



In [109]:
ray_cluster.details()

RayCluster(name='test1-cluster', status=<RayClusterStatus.READY: 'ready'>, head_cpu_requests=1, head_cpu_limits=2, head_mem_requests='8G', head_mem_limits='16G', num_workers=1, worker_mem_requests='10G', worker_mem_limits='20G', worker_cpu_requests=1, worker_cpu_limits=2, namespace='abdhumal-test', dashboard='', worker_extended_resources={}, head_extended_resources={})

In [110]:
# Initialize the Job Submission Client
client = ray_cluster.job_client
print("Ray job client initialized")


Ray job client initialized


### Submit Ray Job for Synthetic Data Generation

Submit the synthetic data generation script (`ray_sdg_job.py`, uploaded from the notebook's working directory) to the Ray cluster:


In [111]:
# Submit the Ray job
submission_id = client.submit_job(
    entrypoint="python ray_sdg_job.py",
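    runtime_env={
        # Point the Hugging Face caches at shared storage so downloads are reused across pods
        "env_vars": {
            "HF_HOME": "/shared/cache",
            "HF_DATASETS_CACHE": "/shared/cache/datasets",
            "TRANSFORMERS_CACHE": "/shared/cache/transformers",
            "TOKENIZERS_PARALLELISM": "false",
        },
        # Extra packages installed for the job on top of the Ray image
        "pip": [
            "ray[data]>=2.8.0",
            "transformers>=4.36.0",
            "torch>=2.0.0",
            "datasets>=2.14.0",
            "accelerate>=0.24.0",
        ],
        # Upload the current directory (it must contain ray_sdg_job.py), skipping notebooks and docs
        "working_dir": "./",
        "excludes": ["*.ipynb", "*.md"],
    },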
)

print(f"Ray job submitted with ID: {submission_id}")

Ray job submitted with ID: raysubmit_aKg8ifY1BQyCtpzM
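
Generation can take a while, and the dataset is only complete once the job reaches a terminal state. The helper below is a minimal polling sketch. It assumes only that the client exposes `get_job_status(submission_id)`, as Ray's `JobSubmissionClient` does. `wait_for_ray_job` and its timeout defaults are hypothetical, not part of the CodeFlare or Ray APIs.

```python
import time

# Terminal values of Ray's JobStatus enum
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_ray_job(job_client, submission_id, timeout_s=3600, poll_s=10):
    """Poll until the Ray job reaches a terminal state and return that state as a string.

    Raises TimeoutError if the job is still pending or running after `timeout_s` seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = job_client.get_job_status(submission_id)
        # Ray's JobStatus is a str-based Enum; use its value, or the raw string if given one
        status = getattr(status, "value", status)
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Ray job {submission_id} still {status} after {timeout_s}s")
        time.sleep(poll_s)
```

In the notebook, `wait_for_ray_job(client, submission_id)` could run right after submission. `client.get_job_logs(submission_id)` then returns the job's log as a single string for inspection.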


In [None]:
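# Stop the job first if it is still running
# client.stop_job(submission_id)
# delete_job() only works on jobs in a terminal state (SUCCEEDED, FAILED, STOPPED)
status = client.get_job_status(submission_id)
if status.is_terminal():
    client.delete_job(submission_id)
else:
    print(f"Ray job is {status.value}; stop it or wait for it to finish first")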

### Cleanup Ray Cluster

Tear down the Ray cluster to release its resources (following the `ray_finetune_llm_deepspeed.ipynb` pattern):


In [None]:
# Clean up the Ray cluster (following the ray_finetune_llm_deepspeed.ipynb pattern)
print("Cleaning up Ray cluster...")

# Tear down the Ray cluster
ray_cluster.down()


### Phase Transition: Ray → kubeflow-training

The synthetic dataset generated by the Ray job should now be on shared storage. Verify it before starting distributed training:


In [None]:
# Verify synthetic data is ready for training
import os
import json

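# Same file as /shared/datasets/synthetic_dataset.json in the Ray and PyTorchJob pods;
# the shared PVC is mounted at /opt/app-root/src/shared in the workbench pod
data_path = "/opt/app-root/src/shared/datasets/synthetic_dataset.json"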

if os.path.exists(data_path):
    with open(data_path, "r") as f:
        dataset_info = json.load(f)
    
    print("Synthetic dataset ready for training!")
    print(f"   Train samples: {len(dataset_info.get('train', []))}")
    print(f"   Test samples: {len(dataset_info.get('test', []))}")
    print(f"   Quality threshold: {dataset_info.get('metadata', {}).get('quality_threshold', 'N/A')}")
    print(f"   Generated by: {dataset_info.get('metadata', {}).get('model_used', 'Local Granite')}")
    print(f"   Data location: {data_path}")
    
    # Show sample data
    if dataset_info.get('train'):
        sample = dataset_info['train'][0]
        print("\nSample synthetic problem:")
        print(f"   Question: {sample.get('question', '')[:100]}...")
        print(f"   Answer: {sample.get('answer', '')[:100]}...")

    print("\nTransitioning to Phase 2: Distributed Training with kubeflow-training SDK")
    
else:
    print("Synthetic dataset not found!")
    print("   Please run the Ray data processing phase first.")
    print(f"   Expected location: {data_path}")
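
Printing one sample does not catch empty or malformed records further down the file. The validator below is a sketch that checks every record before training starts. It assumes records carry `question` and `answer` keys, as the sample printed above suggests. `validate_synthetic_dataset` is a hypothetical helper, and `required_keys` should be adjusted if `ray_sdg_job.py` writes a different schema.

```python
from typing import Any


def validate_synthetic_dataset(dataset_info: dict[str, Any],
                               required_keys: tuple[str, ...] = ("question", "answer")) -> dict[str, int]:
    """Check the train/test splits and per-record keys; return the record count per split.

    Raises ValueError describing the first few problems found.
    """
    counts: dict[str, int] = {}
    problems: list[str] = []
    for split in ("train", "test"):
        records = dataset_info.get(split)
        if not isinstance(records, list):
            problems.append(f"'{split}' is missing or not a list")
            continue
        counts[split] = len(records)
        for idx, record in enumerate(records):
            if not isinstance(record, dict):
                problems.append(f"{split}[{idx}] is not an object")
                continue
            # A key counts as missing if it is absent or only whitespace
            missing = [k for k in required_keys if not str(record.get(k, "")).strip()]
            if missing:
                problems.append(f"{split}[{idx}] missing/empty {missing}")
    if problems:
        raise ValueError("Invalid synthetic dataset: " + "; ".join(problems[:5]))
    return counts
```

Calling `validate_synthetic_dataset(dataset_info)` inside the `if` branch above would stop the notebook before a PyTorchJob is launched on bad data.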


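### Training Configuration

The cell below defines the fine-tuning configuration (model, LoRA, dataset, and TRL/Transformers training arguments) as `parameters`, which is passed to the training function when the PyTorchJob is created.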

In [None]:
%%yaml parameters

# Model configuration
model_name_or_path: ibm-granite/granite-3.1-2b-instruct
model_revision: main
torch_dtype: bfloat16
attn_implementation: flash_attention_2
use_liger: false

# PEFT / LoRA configuration
use_peft: true
lora_r: 16
lora_alpha: 8
lora_dropout: 0.05
lora_target_modules: ["q_proj", "v_proj", "k_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]
lora_modules_to_save: []

# QLoRA (BitsAndBytes)
load_in_4bit: false
load_in_8bit: false

# Dataset configuration (synthetic data from Ray preprocessing)
dataset_name: synthetic_gsm8k
dataset_config: main
dataset_train_split: train
dataset_test_split: test
dataset_text_field: text
dataset_kwargs:
  add_special_tokens: false
  append_concat_token: false

# SFT configuration
max_seq_length: 1024
dataset_batch_size: 1000
packing: false

# Training hyperparameters
num_train_epochs: 3
per_device_train_batch_size: 8
per_device_eval_batch_size: 8
auto_find_batch_size: false
eval_strategy: epoch

# Precision and optimization
bf16: true
tf32: false
learning_rate: 2.0e-4
warmup_steps: 10
lr_scheduler_type: inverse_sqrt
optim: adamw_torch_fused
max_grad_norm: 1.0
seed: 42

# Gradient settings
gradient_accumulation_steps: 1
gradient_checkpointing: false
gradient_checkpointing_kwargs:
  use_reentrant: false

# FSDP for distributed training
fsdp: "full_shard auto_wrap"
fsdp_config:
  activation_checkpointing: true
  cpu_ram_efficient_loading: false
  sync_module_states: true
  use_orig_params: true
  limit_all_gathers: false

# Checkpointing and logging
save_strategy: epoch
save_total_limit: 1
resume_from_checkpoint: false
log_level: warning
logging_strategy: steps
logging_steps: 1
report_to:
- tensorboard

output_dir: /tmp/granite-3.1-2b-instruct-synthetic
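
It is worth checking what this configuration implies once the PyTorchJob below runs it with `num_workers=2` and `num_procs_per_worker=1`. The sketch computes the global batch size, the optimizer step count (assuming 50,000 training samples), and the LoRA scaling factor, which PEFT computes as `lora_alpha / r` when rsLoRA is off. The function names are illustrative only.

```python
import math


def effective_batch_size(per_device_batch: int, grad_accum_steps: int,
                         num_workers: int, procs_per_worker: int) -> int:
    """Samples consumed per optimizer step across all data-parallel ranks."""
    return per_device_batch * grad_accum_steps * num_workers * procs_per_worker


def total_optimizer_steps(num_samples: int, global_batch: int, epochs: int) -> int:
    """Optimizer steps for `epochs` passes, counting a final partial batch as a step."""
    return math.ceil(num_samples / global_batch) * epochs


def lora_scaling(lora_alpha: float, lora_r: int) -> float:
    """Factor applied to the LoRA update in PEFT's default (non-rsLoRA) formulation."""
    return lora_alpha / lora_r


# per_device_train_batch_size=8, gradient_accumulation_steps=1, 2 workers x 1 process each
global_batch = effective_batch_size(8, 1, 2, 1)
# Assumes 50,000 training samples and num_train_epochs=3
steps = total_optimizer_steps(50_000, global_batch, 3)
print(global_batch, steps, lora_scaling(8, 16))  # 16 9375 0.5
```

With `lora_alpha: 8` and `lora_r: 16`, the LoRA update is scaled by 0.5. Raising `lora_alpha` or lowering `lora_r` changes that factor, and therefore the effective step size of the adapter updates.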


### Configure kubeflow-training Client

Set up the kubeflow-training SDK client following the sft.ipynb pattern:


In [None]:
# Configure kubeflow-training client (following sft.ipynb pattern)
# Import the Kubernetes client under an alias so it does not shadow the Ray job `client` above
from kubernetes import client as k8s_client
from kubeflow.training import TrainingClient
from kubeflow.training.models import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource

configuration = k8s_client.Configuration()
configuration.host = api_server
configuration.api_key = {"authorization": f"Bearer {token}"}
# Uncomment if your cluster API server uses a self-signed certificate or an untrusted CA
# configuration.verify_ssl = False

api_client = k8s_client.ApiClient(configuration)
training_client = TrainingClient(client_configuration=api_client.configuration)

print("kubeflow-training client configured")

### Create Training Job using kubeflow-training SDK

Create and submit the distributed training job following the sft.ipynb pattern:


In [None]:
# Submit a PyTorchJob that runs main() from kft_granite_training.py.
# The SDK ships the source code of the function passed as train_func to each worker,
# so main() must be self-contained (do its imports inside the function body).
from kft_granite_training import main

job_name = "test1-training"

training_client.create_job(
    job_kind="PyTorchJob",
    name=job_name,

    # Pass the function object itself, not its name as a string
    train_func=main,

    # YAML parameters defined above, passed to main()
    parameters=parameters,

    # Distributed training configuration
    num_workers=2,
    num_procs_per_worker=1,
    resources_per_worker={
        "nvidia.com/gpu": 1,  # remove for CPU-only training
        "memory": "16Gi",
        "cpu": 4,
    },
    base_image="quay.io/modh/training:py311-cuda124-torch251",

    # Environment variables for training
    env_vars={
        # Hugging Face configuration: use shared storage
        "HF_HOME": "/shared/cache",
        "HF_DATASETS_CACHE": "/shared/cache/datasets",
        "TRANSFORMERS_CACHE": "/shared/cache/transformers",
        "TOKENIZERS_PARALLELISM": "false",

        # Training configuration
        "PYTHONUNBUFFERED": "1",
        "NCCL_DEBUG": "INFO",
    },

    # Package dependencies
    packages_to_install=[
        "transformers>=4.36.0",
        "trl>=0.7.0",
        "datasets>=2.14.0",
        "peft>=0.6.0",
        "accelerate>=0.24.0",
        "torch>=2.0.0",
    ],
    # Mount the shared RWX PVC at /shared in every worker
    volumes=[
        V1Volume(
            name="shared",
            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name="shared")
        ),
    ],
    volume_mounts=[
        V1VolumeMount(name="shared", mount_path="/shared"),
    ],
)

print(f"PyTorchJob '{job_name}' submitted successfully")


### Monitor Training Job

Follow the training progress and logs:


In [None]:
# Monitor training job logs (following sft.ipynb pattern)
training_client.get_job_logs(
    name="test1-training",
    job_kind="PyTorchJob",
    follow=True,
)


In [None]:
# Delete the Training Job
training_client.delete_job("test1-training")