# Ray Features Demo Notebook

This notebook demonstrates key features of the Ray cluster including:
- Basic Ray tasks and actors
- GPU utilization
- Autoscaling behavior
- Resource management
- Common patterns and best practices

## 1. Connect to Ray Cluster

In [None]:
import ray
import time
import numpy as np
import torch

# Connect to the Ray cluster through the Ray Client endpoint on the head service.
# The ray.is_initialized() guard keeps this cell safe to re-run: calling
# ray.init() again on an already-connected kernel raises a RuntimeError.
if not ray.is_initialized():
    ray.init(address='ray://ray-cluster-head-svc.ray-system.svc:10001')

# Print cluster resources
print("Cluster Resources:")
print(ray.cluster_resources())

## 2. Basic Ray Tasks

Compare running the same remote task one call at a time against submitting all calls at once.

In [None]:
@ray.remote
def compute_heavy(n):
    """Return ``n * n`` after a one-second pause that stands in for real work."""
    # The sleep is the "heavy" part of this demo task.
    time.sleep(1)
    squared = n * n
    return squared

# Sequential execution: submit one task and block on its result before
# submitting the next, so the ten one-second tasks run back to back.
# (.bind() would only build lazy DAG nodes and never execute anything.)
start_time = time.time()
regular_result = [ray.get(compute_heavy.remote(i)) for i in range(10)]
print(f"Sequential time: {time.time() - start_time:.2f} seconds")

# Parallel execution: submit every task first, then wait for all of them at
# once so the cluster can overlap them as far as its free CPUs allow.
start_time = time.time()
futures = [compute_heavy.remote(i) for i in range(10)]
parallel_result = ray.get(futures)
print(f"Parallel time: {time.time() - start_time:.2f} seconds")

# Both lists now hold plain integers, so the comparison is meaningful.
print("\nResults are the same:", regular_result == parallel_result)

## 3. GPU Tasks

Demonstrate GPU utilization with PyTorch

In [None]:
@ray.remote(num_gpus=1)
def gpu_task():
    """Run a matrix multiply on the GPU and report which CUDA device was used.

    Returns:
        dict: the keys ``gpu_available``, ``gpu_device``, ``gpu_name`` and
        ``tensor_device`` as reported by ``torch.cuda`` and the tensor itself.
    """
    # Allocate a 1000x1000 operand directly on the GPU and multiply it by itself.
    matrix = torch.randn(1000, 1000, device='cuda')
    # The product is discarded; only the device information is returned.
    product = torch.mm(matrix, matrix)

    report = {}
    report['gpu_available'] = torch.cuda.is_available()
    report['gpu_device'] = torch.cuda.current_device()
    report['gpu_name'] = torch.cuda.get_device_name()
    report['tensor_device'] = matrix.device
    return report

# Launch the GPU task and block until its device report comes back
result = ray.get(gpu_task.remote())
print("GPU Task Results:")
for key, value in result.items():
    print(f"{key}: {value}")

## 4. Ray Actors

Demonstrate stateful computations with actors

In [None]:
@ray.remote
class Counter:
    """Actor that holds a single integer count, starting at zero."""

    def __init__(self):
        # State lives inside the actor and persists between method calls.
        self.value = 0

    def increment(self):
        """Add one to the count and return the updated value."""
        self.value = self.value + 1
        return self.value

    def get_value(self):
        """Return the current count without changing it."""
        return self.value

# Create actor instances
counters = [Counter.remote() for _ in range(4)]

# Increment counters in parallel: 5 rounds, one increment per counter per round
futures = [
    counter.increment.remote()
    for _ in range(5)
    for counter in counters
]

# Resolve the increment futures so that a failure in any increment is raised
# here instead of being silently dropped with the unused object refs.
ray.get(futures)

# Get final values
final_values = ray.get([counter.get_value.remote() for counter in counters])
print("Final counter values:", final_values)

## 5. Testing Autoscaling

Create enough tasks to trigger autoscaling

In [None]:
@ray.remote(num_cpus=1)
def cpu_intensive_task(task_id):
    """Sleep for two seconds in place of CPU-bound work, then report completion.

    Args:
        task_id: identifier echoed back in the completion message.

    Returns:
        str: ``"Task <task_id> completed"``.
    """
    time.sleep(2)
    message = f"Task {task_id} completed"
    return message

# Snapshot of the cluster before any load is applied
print("Initial cluster resources:")
print(ray.cluster_resources())

# Queue up 20 one-CPU tasks at once so demand builds on the cluster
futures = [cpu_intensive_task.remote(task_id) for task_id in range(20)]

# Drain the queue one finished task at a time, printing the cluster's
# resources after each completion
while len(futures) > 0:
    # ray.wait returns (ready, not_ready); exactly one ready ref is requested
    done_id, futures = ray.wait(futures, num_returns=1)
    result = ray.get(done_id[0])
    print(result)
    print("Current cluster resources:")
    print(ray.cluster_resources())
    time.sleep(0.5)  # Brief pause between resource printouts

## 6. Resource Management

Demonstrate proper resource specification

In [None]:
@ray.remote(num_cpus=0.5, num_gpus=0.5)
def partial_resource_task():
    """Sleep for one second while reserving half a CPU and half a GPU.

    Returns:
        str: a fixed completion message.
    """
    time.sleep(1)
    outcome = "Completed with partial resources"
    return outcome

@ray.remote(num_cpus=1, num_gpus=1)
def full_resource_task():
    """Sleep for one second while reserving one whole CPU and one whole GPU.

    Returns:
        str: a fixed completion message.
    """
    time.sleep(1)
    outcome = "Completed with full resources"
    return outcome

# Submit four half-resource tasks followed by two full-resource tasks
partial_futures = [partial_resource_task.remote() for _ in range(4)]
full_futures = [full_resource_task.remote() for _ in range(2)]

# Block on each batch in submission order
partial_results, full_results = ray.get(partial_futures), ray.get(full_futures)

print("Partial resource task results:", partial_results)
print("Full resource task results:", full_results)

## 7. Clean Up

Proper cleanup of Ray resources

In [None]:
# Close this notebook's connection to the Ray cluster, then confirm it
ray.shutdown()
print("Disconnected from Ray cluster")