In [None]:
# Working
import subprocess
import json
import sys

# Configuration: namespace and name of the RayCluster this notebook talks to
NS = "ray-finetune-llm-deepspeed002"
CLUSTER_NAME = "ray"

def run_cmd(cmd):
    """Execute ``cmd`` (an argv list) and return its stdout, stripped of surrounding whitespace.

    A non-zero exit status raises ``subprocess.CalledProcessError``.
    """
    completed = subprocess.run(cmd, stdout=subprocess.PIPE, text=True, check=True)
    return completed.stdout.strip()

# Banner
print("=" * 70, "ENVIRONMENT VERIFICATION", "=" * 70, sep="\n")

# Step 1 - confirm there is an active `oc` login (run_cmd raises if the command fails)
print("\n1. OpenShift Authentication:")
whoami = run_cmd(["oc", "whoami"])
print(f"   User: {whoami}")

# Step 2 - fetch the RayCluster resource as JSON and summarise its status
print("\n2. Ray Cluster Status:")
try:
    rc_json = run_cmd(["oc", "get", "raycluster", CLUSTER_NAME, "-n", NS, "-o", "json"])
    rc_data = json.loads(rc_json)

    # Absent status fields fall back to "unknown" / 0
    rc_status = rc_data.get("status", {})
    state = rc_status.get("state", "unknown")
    ready_workers = rc_status.get("availableWorkerReplicas", 0)
    desired_workers = rc_status.get("desiredWorkerReplicas", 0)

    for label, value in (
        ("Cluster", CLUSTER_NAME),
        ("State", state),
        ("Workers", f"{ready_workers}/{desired_workers}"),
    ):
        print(f"   {label}: {value}")

    if state != "ready":
        print("   ‚ö†Ô∏è  WARNING: Cluster not ready yet. Wait a few minutes.")

    # Does the head pod template tolerate the nvidia.com/gpu NoSchedule taint?
    head_tolerations = rc_data["spec"]["headGroupSpec"]["template"]["spec"].get("tolerations", [])
    has_gpu_tol = any(
        (t.get("key"), t.get("effect")) == ("nvidia.com/gpu", "NoSchedule")
        for t in head_tolerations
    )
    print(f"   GPU Toleration: {has_gpu_tol}")

except subprocess.CalledProcessError:
    # The `oc get raycluster` call exited non-zero: report, hint at the fix, re-raise
    print(f"   ‚ùå ERROR: RayCluster '{CLUSTER_NAME}' not found!")
    print(f"   ‚Üí Apply: oc apply -f 02_ray_localqueue_and_cluster.yaml")
    raise

# Check Ray pods (best-effort: a failure here is reported but does not abort the cell)
print("\n3. Ray Pods:")
try:
    # Select pods through the cluster label so the check follows CLUSTER_NAME
    pods_output = run_cmd([
        "oc", "get", "pods", "-n", NS,
        "-l", f"ray.io/cluster={CLUSTER_NAME}", "--no-headers",
    ])
except (subprocess.CalledProcessError, OSError) as pod_error:
    # Only command failures are tolerated; KeyboardInterrupt and programming
    # errors are no longer swallowed, and the reason is shown to the user.
    print("   ‚ö†Ô∏è  Could not retrieve pod status")
    print(f"      Reason: {pod_error}")
else:
    if pods_output:
        pods = pods_output.splitlines()
        print(f"   Found {len(pods)} pod(s):")
        for pod in pods:
            # presumably columns are NAME, READY, STATUS, ... - verify against `oc get pods`
            parts = pod.split()
            if len(parts) >= 3:
                print(f"     - {parts[0]}: {parts[2]}")
    else:
        print("   ‚ö†Ô∏è  No Ray pods found (cluster may be suspended)")

print("\n" + "=" * 70)
print("‚úÖ Prerequisites verified!")
print("=" * 70)

In [None]:
# Install the pinned client-side packages needed for this notebook.
# pip is invoked as `<sys.executable> -m pip`, so `sys` must already have been
# imported by an earlier cell.
# NOTE(review): the confirmation below is printed unconditionally; it does not
# check whether the pip command succeeded.
!{sys.executable} -m pip install --quiet --upgrade \
    "numpy==1.26.4" \
    "pyarrow==15.0.2" \
    "datasets==2.18.0"

print("‚úÖ Dependencies installed")

In [None]:
from codeflare_sdk import TokenAuthentication

# Reuse the current `oc` session: read its bearer token, then its API server URL
token, server = (
    subprocess.check_output(["oc", "whoami", flag]).decode().strip()
    for flag in ("-t", "--show-server=true")
)

# Log in through the CodeFlare SDK with those credentials
# skip_tls=True: Set False if your cluster has proper TLS certs
auth = TokenAuthentication(token=token, server=server, skip_tls=True)
auth.login()

print(f"‚úÖ Authenticated to: {server}")

In [None]:
from ray.job_submission import JobSubmissionClient
import subprocess
import json, os

NS = "ray-finetune-llm-deepspeed002"

print("Connecting to Ray cluster...")
print("=" * 70)

# Auto-discover dashboard URL from the head service name in the RayCluster status
try:
    result = subprocess.run(
        ["oc", "get", "raycluster", "ray", "-n", NS, "-o", "jsonpath={.status.head.serviceName}"],
        capture_output=True, text=True, check=True
    )
    head_service = result.stdout.strip()

    if head_service:
        ray_dashboard_url = f"http://{head_service}.{NS}.svc.cluster.local:8265"
        print(f"‚úÖ Auto-discovered: {ray_dashboard_url}")
    else:
        raise Exception("No service name found")

except Exception as e:
    print(f"‚ö†Ô∏è Auto-discovery failed, trying common names...")
    # Surface why discovery failed instead of silently discarding the error
    print(f"   Reason: {e}")

    # Try common service names; the first one that answers list_jobs() wins
    for svc_name in ["ray-head-svc", "ray-cluster-head-svc", "raycluster-head-svc"]:
        ray_dashboard_url = f"http://{svc_name}.{NS}.svc.cluster.local:8265"
        try:
            test_client = JobSubmissionClient(ray_dashboard_url)
            test_client.list_jobs()
        except Exception:
            # `except Exception` (not a bare except) so Ctrl-C still interrupts the probing
            continue
        print(f"‚úÖ Found working URL: {ray_dashboard_url}")
        break
    else:
        # No candidate answered: keep the discovery error as the cause
        raise RuntimeError("Could not connect to Ray dashboard!") from e

# Connect
client = JobSubmissionClient(ray_dashboard_url)

# Verify (also validates an auto-discovered URL, which was not probed above)
jobs = client.list_jobs()
print(f"‚úÖ Connected successfully!")
print(f"   URL: {ray_dashboard_url}")
print(f"   Existing jobs: {len(jobs)}")

print("=" * 70)

In [None]:
# Create requirements.txt for Ray runtime environment (one requirement per line)
requirements_content = "".join(
    f"{requirement}\n"
    for requirement in (
        "torch>=2.0.0",
        "transformers>=4.30.0",
        "datasets>=2.18.0",
        "accelerate>=0.20.0",
        "deepspeed>=0.9.0",
        "peft>=0.4.0",
        "bitsandbytes>=0.39.0",
        "scipy",
    )
)

# Write the file into the current working directory, replacing any previous copy
with open("requirements.txt", "w") as req_file:
    req_file.write(requirements_content)

print("‚úÖ Created requirements.txt:", requirements_content, sep="\n")
# Know your Current Working Directory for any references
print(f"\nüìç Current directory: {os.getcwd()}")

In [None]:
import os

MLFORENG_ROOT = "/opt/app-root/src/MLforEng"

# Source text of the packaging script that is written into the project root
setup_content = '''from setuptools import setup, find_packages

setup(
    name="mlforeng",
    version="0.1.0",
    packages=find_packages(),
    install_requires=[
        "torch>=2.0.0",
        "transformers>=4.30.0",
        "datasets>=2.18.0",
        "accelerate>=0.20.0",
        "deepspeed>=0.9.0",
        "peft>=0.4.0",
        "bitsandbytes>=0.39.0",
        "scipy",
    ],
    python_requires=">=3.9",
)
'''

setup_path = os.path.join(MLFORENG_ROOT, "setup.py")

# Mode "w" replaces any setup.py already present at that location
with open(setup_path, "w") as setup_file:
    setup_file.write(setup_content)

print(f"‚úÖ Created: {setup_path}", "\nContents:", setup_content, sep="\n")
print(
    "\n‚úÖ Now you can submit the job with:",
    '   "pip": ["requirements.txt", "."]',
    sep="\n",
)

In [None]:
import os
import json

import base64

MLFORENG_ROOT = "/opt/app-root/src/MLforEng"
STORAGE_PATH = "/opt/app-root/src"

# Read DeepSpeed config as dictionary
ds_config_path = f"{MLFORENG_ROOT}/mlforeng/llm_finetune/deepspeed_configs/zero_3_offload_optim_param.json"

if not os.path.exists(ds_config_path):
    # Stop here: continuing would base64-encode the JSON text "null" and hand
    # the job a config that only fails much later, inside the training run.
    print(f"‚ùå DeepSpeed config not found: {ds_config_path}")
    raise FileNotFoundError(f"DeepSpeed config not found: {ds_config_path}")

with open(ds_config_path, 'r') as f:
    ds_config_dict = json.load(f)
print("‚úÖ DeepSpeed config loaded as dictionary")

# Convert to base64 encoded string for Ray: the JSON travels on the command
# line as a single token with no spaces or quotes in it
ds_config_base64 = base64.b64encode(json.dumps(ds_config_dict).encode()).decode()

TRAINING_CONFIG = {
    "model_name": "meta-llama/Meta-Llama-3.1-8B",
    "use_lora": True,
    "num_devices": 2,
    "num_epochs": 1,
    "batch_size_per_device": 1,
    "eval_batch_size_per_device": 1,
    "storage_path": f"{STORAGE_PATH}/ray_finetune_output/",
    "ds_config": ds_config_base64,  # base64-encoded JSON string
    "ctx_len": 256,
    "lora_config": "mlforeng/llm_finetune/lora_configs/lora.json"
}

print("\nTraining Configuration:")
print("=" * 70)
for key, value in TRAINING_CONFIG.items():
    if key == "ds_config":
        # The encoded blob is long; show only its size
        print(f"  {key}: <base64 encoded> ({len(value)} chars)")
    else:
        print(f"  {key}: {value}")
print("=" * 70)



In [None]:
import shlex

print("Submitting training job to Ray cluster...")
print("=" * 70)

# Build entrypoint command
# CRITICAL: Use -m flag because script has relative imports
entrypoint_args = [
    "python", "-m", "mlforeng.llm_finetune.ray_finetune_llm_deepspeed",
    f"--model-name={TRAINING_CONFIG['model_name']}",
]
if TRAINING_CONFIG["use_lora"]:
    entrypoint_args.append("--lora")
entrypoint_args += [
    # Read from the config instead of repeating the path literal here
    f"--lora-config={TRAINING_CONFIG['lora_config']}",
    f"--num-devices={TRAINING_CONFIG['num_devices']}",
    f"--num-epochs={TRAINING_CONFIG['num_epochs']}",
    # "--max-steps" is intentionally not passed (TRAINING_CONFIG has no "max_steps")
    f"--ds-config={TRAINING_CONFIG['ds_config']}",
    f"--storage-path={TRAINING_CONFIG['storage_path']}",
    f"--batch-size-per-device={TRAINING_CONFIG['batch_size_per_device']}",
    f"--eval-batch-size-per-device={TRAINING_CONFIG['eval_batch_size_per_device']}",
    f"--ctx-len={TRAINING_CONFIG['ctx_len']}",  # Memory optimization
    "--as-test",  # Quick Test Mode
]

# shlex.join quotes any argument containing shell-special characters, so a
# value with a space or metacharacter cannot split or alter the command.
entrypoint_cmd = shlex.join(entrypoint_args)

print("Entrypoint command:")
print(f"  {entrypoint_cmd}")
print("\n" + "=" * 70)

# Runtime environment configuration handed to the Ray job
runtime_env = dict(
    # Working directory: MLforEng root
    working_dir=MLFORENG_ROOT,
    pip=[
        # Core ML/DL packages
        "torch>=2.0.0",
        "transformers>=4.30.0",
        "datasets>=2.18.0",
        "accelerate>=0.20.0",
        "deepspeed>=0.9.0",
        "peft>=0.4.0",
        "bitsandbytes>=0.39.0",
        # Data science packages
        "scikit-learn",
        "scipy",
        "pandas",
        "numpy",
        "joblib",
        "awscliv2",
        "boto3",
        # Other utilities
        "tqdm",
        "sentencepiece",
        "protobuf",
    ],
    env_vars=dict(
        # Add MLforEng to Python path
        PYTHONPATH=MLFORENG_ROOT,
        # HuggingFace cache location
        HF_HOME=f"{STORAGE_PATH}/.cache",
        TRANSFORMERS_CACHE=f"{STORAGE_PATH}/.cache/transformers",
    ),
    # Exclude unnecessary files
    excludes=[
        "/docs/",
        "*.ipynb",
        "*.md",
        ".git/",
        "__pycache__/",
        "/workshops/",
        "*.pyc",
    ],
)

print(
    "Runtime environment:",
    f"  Working dir: {runtime_env['working_dir']}",
    f"  Pip install: {runtime_env['pip']}",
    f"  PYTHONPATH: {runtime_env['env_vars']['PYTHONPATH']}",
    sep="\n",
)
print("\n" + "=" * 70)

try:
    # Hand the entrypoint and runtime environment to the Ray job server
    submission_id = client.submit_job(entrypoint=entrypoint_cmd, runtime_env=runtime_env)

    print(f"\n‚úÖ Job submitted successfully!")
    print(f"\nüìã Job ID: {submission_id}")
    print(f"\n‚è±Ô∏è  Initial status: {client.get_job_status(submission_id)}")
    print(f"\nüí° Monitor progress in the next cell")

except Exception as submit_error:
    # Show the error with a short checklist, then propagate it unchanged
    print(
        f"\n‚ùå Job submission failed!",
        f"\nError: {submit_error}",
        f"\nüí° Troubleshooting:",
        "   1. Check MLforEng has setup.py or pyproject.toml",
        "   2. Verify all paths in config cell show ‚úÖ",
        "   3. Check Ray cluster is running",
        sep="\n",
    )
    raise

print("\n" + "=" * 70)

In [None]:
import time
from IPython.display import clear_output

print("Monitoring training job...")
print("=" * 70)
print(f"Job ID: {submission_id}")
print("=" * 70)

prev_log_length = 0   # number of log lines already processed
check_interval = 30   # Check every 30 seconds
max_checks = 120      # Max 60 minutes (120 * 30 seconds)

# Only log lines containing one of these words (case-insensitive) are echoed
important_keywords = [
    'step', 'epoch', 'loss', 'loading', 'downloading',
    'error', 'training', 'started', 'completed', 'saving'
]

for iteration in range(max_checks):
    try:
        # Poll status first, then the accumulated log text
        status = client.get_job_status(submission_id)
        logs = client.get_job_logs(submission_id)
        lines = logs.split('\n') if logs else []

        # Echo the interesting part of whatever arrived since the last poll
        if len(lines) > prev_log_length:
            for line in lines[prev_log_length:]:
                lowered = line.lower()
                if any(keyword in lowered for keyword in important_keywords):
                    print(line)
            prev_log_length = len(lines)

        # Terminal states end the loop
        if status in ("SUCCEEDED", "FAILED", "STOPPED"):
            print("\n" + "=" * 70)
            print(f"‚úÖ Job finished with status: {status}")
            print("=" * 70)

            if status == "SUCCEEDED":
                print("\nüéâ Training completed successfully!")
                print(f"\nüìÅ Model saved to: {TRAINING_CONFIG['storage_path']}")
                print("\n‚úÖ Proceed to next cell to check results")
            elif status == "FAILED":
                print("\n‚ùå Training failed!")
                print("\nüìã Check the full logs above for error details")
                print("\nüí° Common issues:")
                print("   - Out of GPU memory (reduce batch size)")
                print("   - Model download failed (check network/auth)")
                print("   - Missing dependencies (check requirements.txt)")
            break

        # Heartbeat on every 10th poll (10 * 30 s = 5 minutes)
        if not iteration % 10:
            elapsed = iteration * check_interval
            print(f"\n[{elapsed}s elapsed] Status: {status} - Still monitoring...")

        time.sleep(check_interval)

    except KeyboardInterrupt:
        # Interrupting only ends the monitoring loop; the job itself is not stopped here
        print("\n‚ö†Ô∏è  Monitoring interrupted by user")
        print(f"\nJob is still running. Current status: {client.get_job_status(submission_id)}")
        print(f"\nTo resume monitoring, re-run this cell")
        print(f"To stop the job, run: client.stop_job('{submission_id}')")
        break

    except Exception as e:
        print(f"\n‚ùå Error monitoring job: {e}")
        break

else:
    # Reached only when every poll ran without hitting a `break`
    print("\n‚ö†Ô∏è  Monitoring timeout reached")
    print(f"Job status: {client.get_job_status(submission_id)}")
    print(f"\nThe job may still be running. Check Ray dashboard or re-run this cell.")

In [None]:
import os

print("Checking training results...")
print("=" * 70)

# Get final job status
final_status = client.get_job_status(submission_id)
print(f"Final job status: {final_status}\n")

# Check if output directory exists
output_dir = TRAINING_CONFIG['storage_path']

# The configured path ends with "/", which made basename() return "" for the
# root and put first-level folders at the root's indent. Walk a normalised
# path and derive depth from the relative path instead.
listing_root = os.path.normpath(output_dir)
max_listing_depth = 3  # directories below this depth are not descended into

if os.path.exists(output_dir):
    print(f"‚úÖ Output directory exists: {output_dir}\n")

    # List files in output directory
    print("Files in output directory:")
    try:
        for root, dirs, files in os.walk(listing_root):
            rel_path = os.path.relpath(root, listing_root)
            level = 0 if rel_path == os.curdir else rel_path.count(os.sep) + 1
            indent = ' ' * 2 * level
            print(f"{indent}{os.path.basename(root)}/")
            subindent = ' ' * 2 * (level + 1)
            for file in sorted(files):
                file_path = os.path.join(root, file)
                size = os.path.getsize(file_path)
                size_mb = size / (1024 * 1024)
                print(f"{subindent}{file} ({size_mb:.2f} MB)")
            if level >= max_listing_depth:
                # Really limit depth: clearing `dirs` in place stops os.walk
                # from descending further (the old `break` only cut file lists).
                if dirs:
                    print(f"{subindent}... ({len(dirs)} subdirectories not shown)")
                dirs[:] = []
            else:
                dirs.sort()  # deterministic traversal order
    except Exception as e:
        print(f"  Error listing files: {e}")
else:
    print(f"‚ùå Output directory not found: {output_dir}")
    print("\nThis could mean:")
    print("  - Training hasn't completed yet")
    print("  - Training failed before saving")
    print("  - Storage path is incorrect")

print("\n" + "=" * 70)

# Show full logs if job failed
if final_status == "FAILED":
    print("\nüìã Full job logs:")
    print("=" * 70)
    full_logs = client.get_job_logs(submission_id)
    print((full_logs or "")[-5000:])  # Last 5000 characters; tolerate missing logs
    print("=" * 70)

In [None]:
# Optional manual step: uncomment the next two lines to stop the job referred
# to by `submission_id` using the existing `client`.
# client.stop_job(submission_id)
# print(f"‚úÖ Job {submission_id} stopped")

print("üí° Uncomment the lines above to stop the job")

In [None]:
import subprocess, json

NAMESPACE = "ray-finetune-llm-deepspeed002"
RAY_CLUSTER_NAME = "ray"   # what you used in the RayCluster yaml

def get_ray_dashboard_url(
    namespace: str = NAMESPACE,
    cluster_name: str = RAY_CLUSTER_NAME,
    page: str = "overview",
) -> str:
    """
    Look up the Route for the Ray dashboard and return an HTTPS URL to it.

    The route queried is named 'ray-dashboard-<cluster_name>' in `namespace`.

    Args:
        namespace: Namespace that contains the route.
        cluster_name: RayCluster name used to build the route name.
        page: Dashboard page placed after '/#/' (default "overview";
            "jobs" jumps straight to the Jobs tab).

    Returns:
        'https://<route host>/#/<page>'.

    Raises:
        subprocess.CalledProcessError: if `oc get route` exits non-zero
            (for example when the route does not exist).
        KeyError: if the returned JSON has no spec.host.
    """
    route_name = f"ray-dashboard-{cluster_name}"

    raw = subprocess.check_output(
        ["oc", "get", "route", route_name, "-n", namespace, "-o", "json"]
    ).decode()
    host = json.loads(raw)["spec"]["host"]
    return f"https://{host}/#/{page}"

# Resolve the dashboard URL with the default namespace/cluster and show it
dash_url = get_ray_dashboard_url()
print("‚úÖ Ray dashboard URL:", dash_url, sep="\n")


In [None]:
# Module 06b: Llama 3 Fine-Tuning with Ray on OpenShift AI

## 🎯 **Overview**

This notebook submits a distributed training job to an existing Ray cluster for fine-tuning Llama 3.

**Prerequisites:**
- Ray cluster deployed (via `02_ray_localqueue_and_cluster.yaml`)
- Kueue configured (via `01_gpu_flavor_and_queue.yaml`)
- RBAC permissions (via `03_rbac_notebook_ray.yaml`)
- Training code and data uploaded to `/opt/app-root/src`

**Expected Time:** 1.5-2 hours for full training

**Steps:**
1. Verify prerequisites
2. Configure training parameters
3. Submit training job to Ray
4. Monitor training progress
5. Retrieve results

## 1️⃣ Verify Environment & Prerequisites

Check that we're authenticated and the Ray cluster exists.
import subprocess
import json
import sys

# Configuration: namespace and name of the RayCluster this notebook talks to
NS = "ray-finetune-llm-deepspeed002"
CLUSTER_NAME = "ray"

def run_cmd(cmd):
    """Execute ``cmd`` (an argv list) and return its stdout, stripped of surrounding whitespace.

    A non-zero exit status raises ``subprocess.CalledProcessError``.
    """
    completed = subprocess.run(cmd, stdout=subprocess.PIPE, text=True, check=True)
    return completed.stdout.strip()

# Banner
print("=" * 70, "ENVIRONMENT VERIFICATION", "=" * 70, sep="\n")

# Step 1 - confirm there is an active `oc` login (run_cmd raises if the command fails)
print("\n1. OpenShift Authentication:")
whoami = run_cmd(["oc", "whoami"])
print(f"   User: {whoami}")

# Step 2 - fetch the RayCluster resource as JSON and summarise its status
print("\n2. Ray Cluster Status:")
try:
    rc_json = run_cmd(["oc", "get", "raycluster", CLUSTER_NAME, "-n", NS, "-o", "json"])
    rc_data = json.loads(rc_json)

    # Absent status fields fall back to "unknown" / 0
    rc_status = rc_data.get("status", {})
    state = rc_status.get("state", "unknown")
    ready_workers = rc_status.get("availableWorkerReplicas", 0)
    desired_workers = rc_status.get("desiredWorkerReplicas", 0)

    for label, value in (
        ("Cluster", CLUSTER_NAME),
        ("State", state),
        ("Workers", f"{ready_workers}/{desired_workers}"),
    ):
        print(f"   {label}: {value}")

    if state != "ready":
        print("   ‚ö†Ô∏è  WARNING: Cluster not ready yet. Wait a few minutes.")

    # Does the head pod template tolerate the nvidia.com/gpu NoSchedule taint?
    head_tolerations = rc_data["spec"]["headGroupSpec"]["template"]["spec"].get("tolerations", [])
    has_gpu_tol = any(
        (t.get("key"), t.get("effect")) == ("nvidia.com/gpu", "NoSchedule")
        for t in head_tolerations
    )
    print(f"   GPU Toleration: {has_gpu_tol}")

except subprocess.CalledProcessError:
    # The `oc get raycluster` call exited non-zero: report, hint at the fix, re-raise
    print(f"   ‚ùå ERROR: RayCluster '{CLUSTER_NAME}' not found!")
    print(f"   ‚Üí Apply: oc apply -f 02_ray_localqueue_and_cluster.yaml")
    raise

# Check Ray pods (best-effort: a failure here is reported but does not abort the cell)
print("\n3. Ray Pods:")
try:
    # Select pods through the cluster label so the check follows CLUSTER_NAME
    pods_output = run_cmd([
        "oc", "get", "pods", "-n", NS,
        "-l", f"ray.io/cluster={CLUSTER_NAME}", "--no-headers",
    ])
except (subprocess.CalledProcessError, OSError) as pod_error:
    # Only command failures are tolerated; KeyboardInterrupt and programming
    # errors are no longer swallowed, and the reason is shown to the user.
    print("   ‚ö†Ô∏è  Could not retrieve pod status")
    print(f"      Reason: {pod_error}")
else:
    if pods_output:
        pods = pods_output.splitlines()
        print(f"   Found {len(pods)} pod(s):")
        for pod in pods:
            # presumably columns are NAME, READY, STATUS, ... - verify against `oc get pods`
            parts = pod.split()
            if len(parts) >= 3:
                print(f"     - {parts[0]}: {parts[2]}")
    else:
        print("   ‚ö†Ô∏è  No Ray pods found (cluster may be suspended)")

print("\n" + "=" * 70)
print("‚úÖ Prerequisites verified!")
print("=" * 70)
## 2️⃣ Install Dependencies

Install required Python packages for job submission.
# Install the pinned client-side packages needed for this notebook.
# pip is invoked as `<sys.executable> -m pip`, so `sys` must already have been
# imported by earlier code.
# NOTE(review): the confirmation below is printed unconditionally; it does not
# check whether the pip command succeeded.
!{sys.executable} -m pip install --quiet --upgrade \
    "numpy==1.26.4" \
    "pyarrow==15.0.2" \
    "datasets==2.18.0"

print("‚úÖ Dependencies installed")
## 3️⃣ Authenticate to OpenShift API

Required by the CodeFlare SDK (even though we don't create the cluster from this notebook).
from codeflare_sdk import TokenAuthentication

# Reuse the current `oc` session: read its bearer token, then its API server URL
token, server = (
    subprocess.check_output(["oc", "whoami", flag]).decode().strip()
    for flag in ("-t", "--show-server=true")
)

# Log in through the CodeFlare SDK with those credentials
# skip_tls=True: Set False if your cluster has proper TLS certs
auth = TokenAuthentication(token=token, server=server, skip_tls=True)
auth.login()

print(f"‚úÖ Authenticated to: {server}")
## 4️⃣ Connect to Ray Cluster

Connect to the existing Ray cluster (created from YAML files).
from ray.job_submission import JobSubmissionClient

# Ray dashboard URL (internal cluster service)
ray_dashboard_url = f"http://ray-head-svc.{NS}.svc.cluster.local:8265"

print("Connecting to Ray cluster...")
print(f"  Cluster: {CLUSTER_NAME}")
print(f"  Namespace: {NS}")
print(f"  Dashboard: {ray_dashboard_url}")

# Create job submission client
client = JobSubmissionClient(ray_dashboard_url)

# Verify connection by listing existing jobs
try:
    existing_jobs = client.list_jobs()
    print(f"\n‚úÖ Connected! Found {len(existing_jobs)} existing job(s).")

    if existing_jobs:
        print("\nExisting jobs:")
        for job in existing_jobs:
            # list_jobs() may yield job-detail objects rather than plain IDs:
            # take the ID and status from the object when it carries them, and
            # only fall back to treating the item itself as the ID otherwise.
            job_id = getattr(job, "submission_id", None) or getattr(job, "job_id", None) or job
            status = getattr(job, "status", None) or client.get_job_status(job_id)
            print(f"  - {job_id}: {status}")
except Exception as e:
    # Report the failure with a short checklist, then propagate it
    print(f"\n‚ùå Connection failed: {e}")
    print("\nüí° Troubleshooting:")
    print("   1. Check Ray cluster is running: oc get pods -n", NS)
    print("   2. Check service exists: oc get svc ray-head-svc -n", NS)
    print("   3. Wait a few minutes if cluster just started")
    raise
## 5️⃣ Configure Training Parameters

Set up the training configuration and storage.
import os

# Storage configuration
# Use local persistent storage in the notebook pod
STORAGE_PATH = "/opt/app-root/src"

# Training parameters
TRAINING_CONFIG = {
    "model_name": "meta-llama/Meta-Llama-3.1-8B",
    "use_lora": True,
    "num_devices": 2,  # 2 GPUs (head + 1 worker)
    "num_epochs": 1,
    # "max_steps": 5,  # Quick test - increase for full training
    "batch_size_per_device": 1,
    "eval_batch_size_per_device": 1,
    "storage_path": f"{STORAGE_PATH}/ray_finetune_llm_deepspeed/",
    "ds_config": "./deepspeed_configs/zero_3_offload_optim_param.json"
}

print("Training Configuration:")
print("=" * 70)
for key, value in TRAINING_CONFIG.items():
    print(f"  {key}: {value}")
print("=" * 70)

print(f"\nüìÅ Storage: {STORAGE_PATH}")

# "max_steps" is optional (commented out above), so read it with .get();
# indexing it directly raised KeyError while the key is disabled.
max_steps = TRAINING_CONFIG.get("max_steps")
if max_steps is None:
    print(f"No max_steps limit set: training runs for {TRAINING_CONFIG['num_epochs']} epoch(s)")
else:
    print(f"üìä Expected training time: ~10-15 minutes ({max_steps} steps)")
    print(f"üí° For full training, increase max_steps to 100+")
## 6️⃣ Prepare Runtime Environment

Create requirements file for the Ray job.
# Create requirements.txt for Ray runtime environment (one requirement per line)
requirements_content = "".join(
    f"{requirement}\n"
    for requirement in (
        "torch>=2.0.0",
        "transformers>=4.30.0",
        "datasets>=2.18.0",
        "accelerate>=0.20.0",
        "deepspeed>=0.9.0",
        "peft>=0.4.0",
        "bitsandbytes>=0.39.0",
        "scipy",
    )
)

# Write the file into the current working directory, replacing any previous copy
with open("requirements.txt", "w") as req_file:
    req_file.write(requirements_content)

print("‚úÖ Created requirements.txt:", requirements_content, sep="\n")
## 7️⃣ Submit Training Job to Ray

Submit the distributed training job to the Ray cluster.

**Expected behavior:**
- Job submission takes ~1-2 minutes
- Training starts on GPU workers
- Progress can be monitored in next cell
import shlex

print("Submitting training job to Ray cluster...")
print("=" * 70)

# Build entrypoint command
entrypoint_args = [
    "python", "ray_finetune_llm_deepspeed.py",
    f"--model-name={TRAINING_CONFIG['model_name']}",
]
if TRAINING_CONFIG["use_lora"]:
    entrypoint_args.append("--lora")
entrypoint_args += [
    f"--num-devices={TRAINING_CONFIG['num_devices']}",
    f"--num-epochs={TRAINING_CONFIG['num_epochs']}",
]
# "max_steps" is an optional config key; indexing it unconditionally raised
# KeyError whenever it was left out of TRAINING_CONFIG.
if TRAINING_CONFIG.get("max_steps") is not None:
    entrypoint_args.append(f"--max-steps={TRAINING_CONFIG['max_steps']}")
entrypoint_args += [
    f"--ds-config={TRAINING_CONFIG['ds_config']}",
    f"--storage-path={TRAINING_CONFIG['storage_path']}",
    f"--batch-size-per-device={TRAINING_CONFIG['batch_size_per_device']}",
    f"--eval-batch-size-per-device={TRAINING_CONFIG['eval_batch_size_per_device']}",
]

# shlex.join quotes any argument containing shell-special characters
entrypoint_cmd = shlex.join(entrypoint_args)

print("Entrypoint command:")
print(f"  {entrypoint_cmd}")
print("\n" + "=" * 70)

# Runtime environment configuration: env vars, pip requirements file,
# working directory and the patterns excluded from it
runtime_env = dict(
    env_vars=dict(
        HF_HOME=f"{STORAGE_PATH}/.cache",
        TRANSFORMERS_CACHE=f"{STORAGE_PATH}/.cache/transformers",
    ),
    pip="requirements.txt",
    working_dir="./",
    excludes=["/docs/", "*.ipynb", "*.md", ".git/", "__pycache__/"],
)

try:
    # Hand the entrypoint and runtime environment to the Ray job server
    submission_id = client.submit_job(entrypoint=entrypoint_cmd, runtime_env=runtime_env)

    print(f"\n‚úÖ Job submitted successfully!")
    print(f"\nüìã Job ID: {submission_id}")
    print(f"\n‚è±Ô∏è  Initial status: {client.get_job_status(submission_id)}")
    print(f"\nüí° Monitor progress in the next cell")

except Exception as submit_error:
    # Show the error with a short checklist, then propagate it unchanged
    print(
        f"\n‚ùå Job submission failed!",
        f"\nError: {submit_error}",
        f"\nüí° Common issues:",
        "   - Missing training script: ray_finetune_llm_deepspeed.py",
        "   - Missing DeepSpeed config: deepspeed_configs/zero_3_offload_optim_param.json",
        "   - Ray cluster not ready (check previous cells)",
        sep="\n",
    )
    raise
## 8️⃣ Monitor Training Progress

Monitor job status and training logs in real-time.

**What to look for:**
- Job status: PENDING → RUNNING → SUCCEEDED
- Training logs showing epoch/step progress
- Loss decreasing over time

**Expected timeline:**
- Setup: 2-5 minutes (downloading model, installing packages)
- Training: 10-15 minutes (5 steps with test config)
- Cleanup: 1-2 minutes
import time
from IPython.display import clear_output

print("Monitoring training job...")
print("=" * 70)
print(f"Job ID: {submission_id}")
print("=" * 70)

prev_log_length = 0   # number of log lines already processed
check_interval = 30   # Check every 30 seconds
max_checks = 120      # Max 60 minutes (120 * 30 seconds)

# Only log lines containing one of these words (case-insensitive) are echoed
important_keywords = [
    'step', 'epoch', 'loss', 'loading', 'downloading',
    'error', 'training', 'started', 'completed', 'saving'
]

for iteration in range(max_checks):
    try:
        # Poll status first, then the accumulated log text
        status = client.get_job_status(submission_id)
        logs = client.get_job_logs(submission_id)
        lines = logs.split('\n') if logs else []

        # Echo the interesting part of whatever arrived since the last poll
        if len(lines) > prev_log_length:
            for line in lines[prev_log_length:]:
                lowered = line.lower()
                if any(keyword in lowered for keyword in important_keywords):
                    print(line)
            prev_log_length = len(lines)

        # Terminal states end the loop
        if status in ("SUCCEEDED", "FAILED", "STOPPED"):
            print("\n" + "=" * 70)
            print(f"‚úÖ Job finished with status: {status}")
            print("=" * 70)

            if status == "SUCCEEDED":
                print("\nüéâ Training completed successfully!")
                print(f"\nüìÅ Model saved to: {TRAINING_CONFIG['storage_path']}")
                print("\n‚úÖ Proceed to next cell to check results")
            elif status == "FAILED":
                print("\n‚ùå Training failed!")
                print("\nüìã Check the full logs above for error details")
                print("\nüí° Common issues:")
                print("   - Out of GPU memory (reduce batch size)")
                print("   - Model download failed (check network/auth)")
                print("   - Missing dependencies (check requirements.txt)")
            break

        # Heartbeat on every 10th poll (10 * 30 s = 5 minutes)
        if not iteration % 10:
            elapsed = iteration * check_interval
            print(f"\n[{elapsed}s elapsed] Status: {status} - Still monitoring...")

        time.sleep(check_interval)

    except KeyboardInterrupt:
        # Interrupting only ends the monitoring loop; the job itself is not stopped here
        print("\n‚ö†Ô∏è  Monitoring interrupted by user")
        print(f"\nJob is still running. Current status: {client.get_job_status(submission_id)}")
        print(f"\nTo resume monitoring, re-run this cell")
        print(f"To stop the job, run: client.stop_job('{submission_id}')")
        break

    except Exception as e:
        print(f"\n‚ùå Error monitoring job: {e}")
        break

else:
    # Reached only when every poll ran without hitting a `break`
    print("\n‚ö†Ô∏è  Monitoring timeout reached")
    print(f"Job status: {client.get_job_status(submission_id)}")
    print(f"\nThe job may still be running. Check Ray dashboard or re-run this cell.")
## 9️⃣ Check Training Results

Verify the trained model was saved successfully.
import os

print("Checking training results...")
print("=" * 70)

# Get final job status
final_status = client.get_job_status(submission_id)
print(f"Final job status: {final_status}\n")

# Check if output directory exists
output_dir = TRAINING_CONFIG['storage_path']

# The configured path ends with "/", which made basename() return "" for the
# root and put first-level folders at the root's indent. Walk a normalised
# path and derive depth from the relative path instead.
listing_root = os.path.normpath(output_dir)
max_listing_depth = 3  # directories below this depth are not descended into

if os.path.exists(output_dir):
    print(f"‚úÖ Output directory exists: {output_dir}\n")

    # List files in output directory
    print("Files in output directory:")
    try:
        for root, dirs, files in os.walk(listing_root):
            rel_path = os.path.relpath(root, listing_root)
            level = 0 if rel_path == os.curdir else rel_path.count(os.sep) + 1
            indent = ' ' * 2 * level
            print(f"{indent}{os.path.basename(root)}/")
            subindent = ' ' * 2 * (level + 1)
            for file in sorted(files):
                file_path = os.path.join(root, file)
                size = os.path.getsize(file_path)
                size_mb = size / (1024 * 1024)
                print(f"{subindent}{file} ({size_mb:.2f} MB)")
            if level >= max_listing_depth:
                # Really limit depth: clearing `dirs` in place stops os.walk
                # from descending further (the old `break` only cut file lists).
                if dirs:
                    print(f"{subindent}... ({len(dirs)} subdirectories not shown)")
                dirs[:] = []
            else:
                dirs.sort()  # deterministic traversal order
    except Exception as e:
        print(f"  Error listing files: {e}")
else:
    print(f"‚ùå Output directory not found: {output_dir}")
    print("\nThis could mean:")
    print("  - Training hasn't completed yet")
    print("  - Training failed before saving")
    print("  - Storage path is incorrect")

print("\n" + "=" * 70)

# Show full logs if job failed
if final_status == "FAILED":
    print("\nüìã Full job logs:")
    print("=" * 70)
    full_logs = client.get_job_logs(submission_id)
    print((full_logs or "")[-5000:])  # Last 5000 characters; tolerate missing logs
    print("=" * 70)
## 🔟 Optional: Stop Running Job

Use this cell if you need to stop a running job.
# Optional manual step: uncomment the next two lines to stop the job referred
# to by `submission_id` using the existing `client`.
# client.stop_job(submission_id)
# print(f"‚úÖ Job {submission_id} stopped")

print("üí° Uncomment the lines above to stop the job")
## 1️⃣1️⃣ Optional: View Ray Dashboard

Get the URL to the Ray dashboard for detailed monitoring.
print("Getting Ray Dashboard URL...")
print("=" * 70)

# Try to get the route that points at the ray-head-svc service
try:
    # check=False: a non-zero exit does not raise; only stdout is inspected
    result = subprocess.run(
        [
            "oc", "get", "route", "-n", NS,
            "-o", "jsonpath={.items[?(@.spec.to.name=='ray-head-svc')].spec.host}",
        ],
        capture_output=True,
        text=True,
        check=False,
    )
    route_host = result.stdout.strip()

    if not route_host:
        print("‚ö†Ô∏è  No route found for Ray dashboard")
        print("\nTo create one:")
        print(f"   oc expose svc ray-head-svc -n {NS} --port=dashboard")
    else:
        dashboard_url = f"https://{route_host}/#/overview"
        print(f"‚úÖ Ray Dashboard URL:\n\n   {dashboard_url}\n")
        print("üìä Dashboard shows:")
        for topic in (
            "Cluster status",
            "Running jobs",
            "GPU utilization",
            "Worker nodes",
            "Logs and metrics",
        ):
            print(f"   ‚Ä¢ {topic}")

except Exception as e:
    print(f"‚ùå Error getting route: {e}")

print("\n" + "=" * 70)
## 1️⃣2️⃣ Summary & Next Steps

**What you accomplished:**
- ✅ Verified Ray cluster prerequisites
- ✅ Connected to Ray cluster
- ✅ Submitted distributed training job
- ✅ Monitored training progress
- ✅ Retrieved training results

**Next Steps:**

1. **Evaluate the Model:**
   - Load the fine-tuned model
   - Test on evaluation dataset
   - Compare with base model

2. **Deploy the Model:**
   - Package model for serving
   - Deploy with KServe (Module 05)
   - Create inference API

3. **Iterate:**
   - Adjust hyperparameters
   - Train for more epochs
   - Try different LoRA configs

**Resources:**
- Ray Documentation: https://docs.ray.io/
- DeepSpeed: https://www.deepspeed.ai/
- Llama 3: https://ai.meta.com/llama/

## 🧹 Cleanup (Optional)

**Note:** The Ray cluster is managed by YAML files and should NOT be deleted from this notebook.

To delete the Ray cluster, run this command in a terminal:
```bash
oc delete raycluster ray -n ray-finetune-llm-deepspeed002
```

Or delete all resources:
```bash
oc delete -f 02_ray_localqueue_and_cluster.yaml
oc delete -f 01_gpu_flavor_and_queue.yaml
oc delete -f 03_rbac_notebook_ray.yaml
```

In [None]:
# cluster.details()

In [None]:
from ray.job_submission import JobSubmissionClient

# Match what we set in the previous cell
namespace, cluster_name = "ray-finetune-llm-deepspeed002", "ray"

# Dashboard endpoint of the head service inside the cluster network
ray_url = f"http://ray-head-svc.{namespace}.svc.cluster.local:8265"

print(
    f"Cluster:  {cluster_name}",
    f"Namespace:{namespace}",
    f"Ray URL:  {ray_url}",
    sep="\n",
)

client = JobSubmissionClient(ray_url)


In [None]:
from ray.job_submission import JobSubmissionClient

# Use the cluster config when a `cluster` object exists in this kernel.
# Nothing in this notebook defines `cluster`, so fall back to the values used
# by the other cells instead of failing with NameError.
try:
    namespace = cluster.config.namespace
    cluster_name = cluster.config.name
except NameError:
    namespace = "ray-finetune-llm-deepspeed002"
    cluster_name = "ray"

# Construct URL dynamically
ray_url = f"http://ray-head-svc.{namespace}.svc.cluster.local:8265"

print(f"Cluster: {cluster_name}")
print(f"Namespace: {namespace}")
print(f"Ray URL: {ray_url}")

# Create client
client = JobSubmissionClient(ray_url)
print("‚úì Client connected!")

# Verify by listing the jobs known to the cluster
jobs = client.list_jobs()
print(f"‚úì Found {len(jobs)} existing jobs")

In [None]:
# Storage configuration: root directory for job output and caches
storage_path = "/opt/app-root/src"

# The S3 bucket in which to store checkpoints.
# It can be set manually; otherwise it would be retrieved from the configured data connection.
s3_bucket = ""  # Empty string for local storage

# S3 logic is disabled to keep things simple:
# if not s3_bucket:
#     s3_bucket = os.environ.get('AWS_S3_BUCKET')
# if s3_bucket:
#     storage_path = f's3://{s3_bucket}'

print("Using local storage: {}".format(storage_path))

In [None]:
# Submit Ray job
# The entrypoint is built from adjacent string literals, so every fragment
# must end with a space. The first fragment previously lacked one, fusing the
# script path with "--model-name". It also named the script without ".py";
# it is now run as a module with -m, because the script has relative imports.
# NOTE(review): confirm the script accepts --max-steps and that the relative
# --ds-config path resolves from working_dir.
submission_id = client.submit_job(
    entrypoint="python -m mlforeng.llm_finetune.ray_finetune_llm_deepspeed "
               "--model-name=meta-llama/Meta-Llama-3.1-8B "
               "--lora "
               "--num-devices=2 "
               "--num-epochs=1 "
               "--max-steps=5 "
               "--ds-config=./deepspeed_configs/zero_3_offload_optim_param.json "
               f"--storage-path={storage_path}/ray_finetune_llm_deepspeed/ "
               "--batch-size-per-device=1 "
               "--eval-batch-size-per-device=1 ",
    runtime_env={
        "env_vars": {
            # Set the following variables if using AWS S3 as storage
            # 'AWS_ACCESS_KEY_ID': os.environ.get('AWS_ACCESS_KEY_ID'),
            # 'AWS_SECRET_ACCESS_KEY': os.environ.get('AWS_SECRET_ACCESS_KEY'),
            # 'AWS_DEFAULT_REGION': os.environ.get('AWS_DEFAULT_REGION'),
            'HF_HOME': f'{storage_path}/.cache'
        },
        'pip': 'requirements.txt',
        'working_dir': './',
        "excludes": ["/docs/", "*.ipynb", "*.md"]
    },
)
print(submission_id)

In [None]:
import time

print("Monitoring training progress...")
print("-" * 60)

prev_log_length = 0
for i in range(60):  # Check for 60 iterations (30 minutes)
    logs = client.get_job_logs(submission_id)
    # Guard against an empty or missing log payload before splitting
    lines = logs.split('\n') if logs else []

    # Only show new lines that mention one of the progress keywords
    if len(lines) > prev_log_length:
        new_lines = lines[prev_log_length:]
        for line in new_lines:
            if any(keyword in line.lower() for keyword in ['step', 'epoch', 'loss', 'loading', 'error', 'training']):
                print(line)
        prev_log_length = len(lines)

    status = client.get_job_status(submission_id)
    if status in ["SUCCEEDED", "FAILED", "STOPPED"]:
        print(f"\n‚úì Job finished with status: {status}")
        break

    time.sleep(30)  # Check every 30 seconds
else:
    # All polls ran without reaching a terminal status: say so instead of ending silently
    print(f"\nMonitoring window elapsed; last status: {status}")

In [None]:
# Ask the Ray job server to stop the job referred to by `submission_id`.
# NOTE(review): this cell is not guarded, so a "Run All" executes it too.
client.stop_job(submission_id)

In [None]:
# cluster.down()
# We no longer tear down the RayCluster from the notebook.
# The RayCluster "ray" is managed via YAML and can be deleted by an admin with:
#   oc delete raycluster ray -n ray-finetune-llm-deepspeed002
# (The print call was missing its closing parenthesis, which was a SyntaxError.)
print("Skipping cluster.down(); RayCluster is managed by YAML.")