In [1]:
import ray
import time
import random
import logging

logging.basicConfig(level=logging.INFO)

2025-12-27 00:28:54,092	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [2]:
use_existing_kuberay = True
kuberay_api_port = 10001

def ray_init():
    # Shutdown any existing Ray connection
    if ray.is_initialized():
        ray.shutdown()

    if not use_existing_kuberay:
        # Setup and connect to a local Ray cluster
        ray.init()  # creates a local Ray cluster
    else:
        # Connect to existing KubeRay cluster
        # logging.basicConfig(level=logging.INFO)
        ray.init(address=f"ray://localhost:{kuberay_api_port}")
        # ray.init(address="ray://localhost:65374", configure_logging=True, log_to_driver=True)

def ray_shutdown():
    # Shutdown Ray client
    ray.shutdown()

In [3]:
ray_init()

2025-12-27 00:29:00,910	INFO client_builder.py:241 -- Passing the following kwargs to ray.init() on the server: log_to_driver
SIGTERM handler is not set because current thread is not the main thread.
    Ray: 2.53.0
    Python: 3.12.9
This process on Ray Client was started with:
    Ray: 2.53.0
    Python: 3.12.12



In [4]:
# run a task

@ray.remote
def square(x):
    time.sleep(0.1)  # Simulate work
    return x**2


numbers = [1, 2, 3, 4, 5]
futures = [square.remote(x) for x in numbers]
results = ray.get(futures)
print(f"Squared: {results}")

Squared: [1, 4, 9, 16, 25]


In [5]:
# run task with params

@ray.remote
def compute_task(task_id, multiplier, base_value):
    # time.sleep(0.2)  # 200ms sleep
    return task_id * multiplier + base_value

# Create tasks with different parameters
futures = []
for i in range(50):
    task_id = i + 1
    multiplier = random.randint(2, 10)
    base_value = random.randint(1, 50)
    future = compute_task.remote(task_id, multiplier, base_value)
    futures.append(future)

# Wait for all tasks to complete
results = ray.get(futures)
print(f"Results: {results}")

Results: [54, 30, 29, 50, 46, 49, 81, 55, 53, 69, 103, 27, 87, 120, 144, 165, 160, 107, 179, 154, 203, 136, 109, 54, 268, 258, 232, 118, 90, 97, 219, 302, 259, 147, 368, 118, 239, 205, 360, 124, 249, 445, 277, 179, 378, 325, 205, 365, 177, 419]


In [6]:
# serialization


# Variable - gets serialized
bonus_points = 5


# Helper function - gets serialized
def add_one(x):
    return x + 1


# Helper class - gets serialized
class Multiplier:
    def __init__(self, factor):
        self.factor = factor

    def multiply(self, x):
        return x * self.factor


@ray.remote
def process(x, multiplier_obj):
    # Ray serializes the function, object, and variable to workers
    incremented = add_one(x)  # function call
    result = multiplier_obj.multiply(incremented)  # object method call
    return result + bonus_points  # variable usage


# Usage - Ray handles serialization automatically
numbers = [1, 2, 3, 4, 5]
mult = Multiplier(factor=10)
futures = [process.remote(x, mult) for x in numbers]
results = ray.get(futures)
print(f"Results: {results}")  # [25, 35, 45, 55, 65]

Results: [25, 35, 45, 55, 65]


In [7]:
# Python envs with dependencies

@ray.remote(runtime_env={"pip": ["pandas"]})
# this needs 'pip' available in the environment
# you could also add packages to the ray cluster itself:
# ray.init(runtime_env={"pip": ["pandas"]})
def process_with_pandas(data):
    import pandas as pd

    df = pd.DataFrame(data)
    return df.sum().tolist()


pandas_data = {"col1": [1, 2, 3], "col2": [4, 5, 6]}
pandas_result = ray.get(process_with_pandas.remote(pandas_data))
print(f"Pandas result: {pandas_result}")

Pandas result: [6, 15]


In [8]:
# passing data between pipeline steps in a master step
# steps returning only data refs, only end result is transferred.


@ray.remote
def square(x):
    logging.info(f"Starting square of {x}")
    return x * x
 

@ray.remote
def sum_two(a, b):
    logging.info(f"Starting sum of {a} + {b}")
    return a + b


@ray.remote
def calculate_mean(total, count):
    logging.info(f"Starting mean calculation: {total} / {count}")
    return total / count


@ray.remote
def run_pipeline(input_data):
    # Step 1: Square each number with named tasks
    squared_futures = [square.options(name=f"square_{x}").remote(x) for x in input_data]

    # Step 2: Sum using reduce pattern with named tasks
    total_future = squared_futures[0]
    for i, future in enumerate(squared_futures[1:], 1):
        total_future = sum_two.options(name=f"sum_step_{i}").remote(
            total_future, future
        )

    # Step 3: Calculate mean with named task
    count = len(input_data)
    mean_future = calculate_mean.options(name="final_mean").remote(total_future, count)

    # Resolve graph
    final_result = ray.get(mean_future)
    print(f"Mean of squares: {final_result}")


input_data = [1, 2, 3, 4, 5]
pipeline_future = run_pipeline.remote(input_data)
final_result = ray.get(pipeline_future)
print(f"Mean of squares: {final_result}")


# Computation graph: square(1), square(2), square(3), square(4), square(5)
#                   → sum_two reduction tree → calculate_mean → final result
# Ray schedules all tasks optimally with lazy execution!


Mean of squares: None


[36m(run_pipeline pid=16575)[0m Mean of squares: 11.0


In [9]:
# passing data between pipeline steps in a master step
# steps returning only data refs, only end result is transferred.


@ray.remote
def square(x):
    logging.info(f"Starting square of {x}")
    return x * x


@ray.remote
def sum_two(a, b):
    logging.info(f"Starting sum of {a} + {b}")
    return a + b


@ray.remote
def calculate_mean(total, count):
    logging.info(f"Starting mean calculation: {total} / {count}")
    return total / count


# Execute tasks at top level for dashboard visibility
input_data = [1, 2, 3, 4, 5]

# Step 1: Square each number with named tasks
squared_futures = [square.options(name=f"square_{x}").remote(x) for x in input_data]

# Step 2: Sum using reduce pattern with named tasks  
total_future = squared_futures[0]
for i, future in enumerate(squared_futures[1:], 1):
    total_future = sum_two.options(name=f"sum_step_{i}").remote(total_future, future)

# Step 3: Calculate mean with named task
count = len(input_data)
mean_future = calculate_mean.options(name="final_mean").remote(total_future, count)

# Resolve graph
final_result = ray.get(mean_future)
print(f"Mean of squares: {final_result}")

# Computation graph: square(1), square(2), square(3), square(4), square(5)
#                   → sum_two reduction tree → calculate_mean → final result
# Ray schedules all tasks optimally with lazy execution!


Mean of squares: 11.0


In [21]:
# Detached Actor - survives disconnection

@ray.remote
class Worker:
    def __init__(self):
        self.results = {}
    
    def run_task(self, task_id, data):
        time.sleep(3)  # Simulate long-running task
        result = sum(x * x for x in data)
        self.results[task_id] = result
    
    def get_result(self, task_id):
        return self.results.get(task_id)

# Create detached actor
worker = Worker.options(
    name="my_worker",
    namespace="my_namespace",
    lifetime="detached"
).remote()

# Submit work
worker.run_task.remote("job1", [1, 2, 3, 4, 5])
time.sleep(0.5)  # Ensure task starts

# Disconnect
ray.shutdown()
time.sleep(4)

# Reconnect
ray.init(address=f"ray://localhost:10001", namespace="my_namespace")
worker = ray.get_actor("my_worker", namespace="my_namespace")

# Get result
result = ray.get(worker.get_result.remote("job1"))
print(f"Result: {result}")

# Cleanup
ray.kill(worker)

2025-12-27 02:31:21,809	INFO client_builder.py:241 -- Passing the following kwargs to ray.init() on the server: log_to_driver
SIGTERM handler is not set because current thread is not the main thread.
    Ray: 2.53.0
    Python: 3.12.9
This process on Ray Client was started with:
    Ray: 2.53.0
    Python: 3.12.12



Result: 55


In [22]:
ray_shutdown()