## Example 1: Stateful Hello World

This example shows how to use routed services to create a simple stateful "Hello World" service: each request is routed to a worker, which returns a "Hello World" greeting to the client.



In [1]:
get_ipython().cache_size = 0  # Disables caching completely

import ray

ray.init()


2023-09-04 14:27:55,472	INFO worker.py:1431 -- Connecting to existing Ray cluster at address: 192.168.178.154:6379...
2023-09-04 14:27:55,512	INFO worker.py:1612 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265


Python version: 3.11.4
Ray version: 2.6.3
Dashboard: http://127.0.0.1:8265


# First define the service
Define a sticky-session worker by subclassing `StatefulWorker` and decorating it with `routed_service`. The `routed_service` decorator takes two arguments: `expiry_period` and `max_concurrent_sessions`. The `expiry_period` is how long a session is kept alive after its last request. The `max_concurrent_sessions` is the maximum number of sessions that can be active at any one time. If a new session is requested while the maximum number of sessions is already active, a new worker is spun up and sessions are balanced across the workers.
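
The routing rule described above can be sketched in plain Python without Ray. This is only an illustration under stated assumptions: `ToyRouter` and its `route` method are hypothetical names, not part of `stick_ray`, and the least-loaded placement policy is an assumption about what "balanced" means here.

```python
from dataclasses import dataclass, field


@dataclass
class ToyRouter:
    """Hypothetical in-memory router; not part of stick_ray."""

    max_concurrent_sessions: int
    # Each worker is modelled as the set of session ids pinned to it.
    workers: list = field(default_factory=list)

    def route(self, session_id: str) -> int:
        """Return the index of the worker that owns `session_id`."""
        # Sticky: a known session always goes back to the same worker.
        for index, sessions in enumerate(self.workers):
            if session_id in sessions:
                return index
        # New session: pick the least-loaded worker that still has room.
        candidates = [
            (len(sessions), index)
            for index, sessions in enumerate(self.workers)
            if len(sessions) < self.max_concurrent_sessions
        ]
        if candidates:
            _, index = min(candidates)
        else:
            # Every worker is full (or none exists yet), so spin up a new one.
            self.workers.append(set())
            index = len(self.workers) - 1
        self.workers[index].add(session_id)
        return index


router = ToyRouter(max_concurrent_sessions=2)
assignments = {name: router.route(name) for name in ["Bill", "Ann", "Cat"]}
print(assignments)           # the third session spills over to a second worker
print(router.route("Bill"))  # an existing session stays on its worker
```

The sketch keeps the two properties the text relies on: a session id always maps to the same worker, and a new worker appears only when all existing ones are full.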

## Our serialisable session

To define the session state we use `SerialisableBaseModel` from the `stick_ray.utils` module. This is a subclass of `pydantic.BaseModel` that Ray can serialise efficiently. We then define the session state as a class with some attributes. In this case there are two: `count`, which we increment each time the session is handled, and `world_name`, which for now is simply derived from the `session_id`.

**Note**: we are not required to store the `session_id` ourselves at any point. The only thing we need the `session_id` for is to set and retrieve the session state. The `session_id` is automatically stored in the session state by the `StatefulWorker` class.


In [2]:
from datetime import timedelta

from stick_ray import routed_service
from stick_ray.stateful_worker import StatefulWorker
from stick_ray.utils import SerialisableBaseModel


class MySessionState(SerialisableBaseModel):
    """
    This is the state that will be stored for each session.
    """
    world_name: str
    count: int = 0


@routed_service(expiry_period=timedelta(seconds=10))
class ToyWorker(StatefulWorker):
    async def _create_session(self, session_id: str):
        state = MySessionState(world_name=f"{session_id} World", count=0)
        # Use API to store the session state
        await self.set_session_state(session_id, state)

    async def say_hello(self, *, session_id):
        # Use API to get the session state
        state = await self.get_session_state(session_id=session_id)
        state.count += 1
        return f"Hello {state.world_name} for the {state.count}-th time!"
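
To see the set/get flow of the worker above without a Ray cluster, here is a minimal stand-in that keeps session state in a dictionary. `ToyState` and `ToyStatefulWorker` are hypothetical names, and creating the session lazily on first access is an assumption about how `StatefulWorker` behaves, not something confirmed by the library.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyState:
    world_name: str
    count: int = 0


class ToyStatefulWorker:
    """Hypothetical stand-in for StatefulWorker; stores state in a dict."""

    def __init__(self):
        self._states = {}

    async def set_session_state(self, session_id, state):
        self._states[session_id] = state

    async def get_session_state(self, session_id):
        # Assumption: a session is created lazily on first access.
        if session_id not in self._states:
            await self._create_session(session_id)
        return self._states[session_id]

    async def _create_session(self, session_id):
        state = ToyState(world_name=f"{session_id} World")
        await self.set_session_state(session_id, state)

    async def say_hello(self, *, session_id):
        state = await self.get_session_state(session_id)
        # The stored object is mutated in place, so the count persists.
        state.count += 1
        return f"Hello {state.world_name} for the {state.count}-th time!"


async def main():
    worker = ToyStatefulWorker()
    first = await worker.say_hello(session_id="Bill")
    second = await worker.say_hello(session_id="Bill")
    other = await worker.say_hello(session_id="Ann")
    return first, second, other


greetings = asyncio.run(main())
print(greetings)
```

Inside a notebook, which already runs an event loop, `await main()` would replace `asyncio.run(main())`.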






# Starting the service

To start the service, we just create an instance of the class. This automatically starts the service and registers it with the Ray backend. We can then fetch the service with the class method `fetch`, which checks whether the service is running: if it is, `fetch` returns it; if not, `fetch` starts it first and then returns it.
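
The fetch-or-start behaviour can be pictured as a small registry lookup. The sketch below is not how `stick_ray` implements it; `ToyService` and its process-local `_registry` are hypothetical, and the real service is registered with Ray rather than in a Python dictionary.

```python
class ToyService:
    """Hypothetical service with a process-local registry; not stick_ray."""

    _registry = {}

    def __init__(self):
        # Creating an instance "starts" the service and registers it by class name.
        type(self)._registry[type(self).__name__] = self

    @classmethod
    def fetch(cls):
        # Return the running service if there is one, otherwise start it first.
        existing = cls._registry.get(cls.__name__)
        return existing if existing is not None else cls()


started = ToyService.fetch()   # nothing is running yet, so this starts the service
fetched = ToyService.fetch()   # now the running instance is returned
print(started is fetched)
```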

In [3]:
service = ToyWorker()

# After the service is created, we can fetch it simply by calling the class method `fetch`
assert service == ToyWorker.fetch()


(Router:ToyWorker pid=12852) Using blocking ray.get inside async actor. This blocks the event loop. Please use `await` on object ref with asyncio.gather if you want to yield execution to the event loop instead.
(raylet) [2023-09-04 14:27:58,531 E 12455 12470] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2023-09-04_14-27-46_650200_12356 is over 95% full, available space: 15171612672; capacity: 502392610816. Object creation will fail if spilling is required.



# Getting a handle

To call into the service we need a `RoutedServiceHandle`, which we get by calling the `get_handle` method on the service.

## Calling into the service

The handle exposes all the public methods of the `StatefulWorker` (those without a leading underscore). Calling a method on the handle returns a `ray.ObjectRef`, which is a handle to the result of the call. We retrieve the result by calling `ray.get` on the `ray.ObjectRef`, which blocks until the result is ready. With a `sync=False` handle we would instead use the double `await` pattern of Ray Serve deployments.
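
For readers new to object refs, the call-then-get shape is loosely analogous to futures in the standard library. The sketch below uses `concurrent.futures` purely as an analogy; it does not involve Ray, and the local `say_hello` function is a hypothetical stand-in for the remote method.

```python
from concurrent.futures import ThreadPoolExecutor


def say_hello(session_id: str) -> str:
    # Hypothetical local stand-in for the remote method.
    return f"Hello {session_id} World!"


with ThreadPoolExecutor(max_workers=1) as pool:
    # submit() returns immediately with a Future, much as a handle call
    # returns an object ref.
    future = pool.submit(say_hello, "Bill")
    # result() blocks until the value is ready, playing the role of ray.get here.
    greeting = future.result()

print(greeting)
```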

## Worker expiry

Since we set `expiry_period` to 10 seconds, the session expires 10 seconds after its last request. If we call into the service after this time, a new session is created. We can see this by calling into the service twice, waiting 11 seconds, and then calling into the service again. The first two calls show the count being incremented, and the third call shows the count reset to 1.

In [10]:
from time import sleep

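# ignore_reinit_error avoids an error if Ray is already initialised in this process
ray.init(address="auto", ignore_reinit_error=True)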

# Deploy / Update the service
ToyWorker()

# Get a handle
handle = service.get_handle(sync=True)

print(ray.get(handle.say_hello(session_id="Bill")))
# "Hello Bill World for the 1-th time!"

print(ray.get(handle.say_hello(session_id="Bill")))
# "Hello Bill World for the 2-th time!"

# Let the session expire
sleep(11)

print(ray.get(handle.say_hello(session_id="Bill")))
# "Hello Bill World for the 1-th time!"



Hello World Bill! You've been handled 1 times!
Hello World Bill! You've been handled 1 times!


(raylet) [2023-09-04 14:33:58,967 E 12455 12470] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2023-09-04_14-27-46_650200_12356 is over 95% full, available space: 15170351104; capacity: 502392610816. Object creation will fail if spilling is required.


Hello World Bill! You've been handled 1 times!
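
The expiry behaviour in this section can also be reproduced without sleeping by passing the clock in explicitly. This is a sketch only: `ExpiringSessions` is a hypothetical class, and resetting the count once the idle time exceeds `expiry_period` is an assumption about the semantics, not the `stick_ray` implementation.

```python
from datetime import timedelta


class ExpiringSessions:
    """Hypothetical session table with idle expiry; not the stick_ray implementation."""

    def __init__(self, expiry_period: timedelta):
        self.expiry_seconds = expiry_period.total_seconds()
        self._counts = {}     # session_id -> request count
        self._last_seen = {}  # session_id -> time of last request, in seconds

    def handle(self, session_id: str, now: float) -> int:
        """Record a request at time `now` and return the session's count."""
        last = self._last_seen.get(session_id)
        if last is not None and now - last > self.expiry_seconds:
            # Idle for longer than the expiry period: start a fresh session.
            del self._counts[session_id]
        self._counts[session_id] = self._counts.get(session_id, 0) + 1
        self._last_seen[session_id] = now
        return self._counts[session_id]


sessions = ExpiringSessions(expiry_period=timedelta(seconds=10))
counts = [
    sessions.handle("Bill", now=0.0),   # first request
    sessions.handle("Bill", now=1.0),   # within 10 s of the last request
    sessions.handle("Bill", now=12.0),  # 11 s later, so the session has expired
]
print(counts)  # [1, 2, 1]
```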


# Async handles
Async handles are obtained the same way, by passing `sync=False` to `get_handle`. Awaiting a method call returns an object ref, which must itself be awaited to get the result. This is the double `await` pattern seen in other parts of Ray.

In [11]:
async_handle = service.get_handle(sync=False)
obj_ref = await async_handle.say_hello(session_id="Bill")
result = await obj_ref
print(result)

obj_ref = await async_handle.say_hello(session_id="Bill")
result = await obj_ref
print(result)

Hello World Bill! You've been handled 1 times!
Hello World Bill! You've been handled 2 times!
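
The double `await` shape can be mimicked with plain `asyncio`, which may help when reasoning about it outside a cluster. In this sketch `ToyAsyncHandle` is hypothetical and an `asyncio.Task` stands in for Ray's object ref; it shows only the control flow, not Ray's behaviour.

```python
import asyncio


class ToyAsyncHandle:
    """Hypothetical async handle; it only mimics the call-then-await shape."""

    async def say_hello(self, *, session_id: str) -> "asyncio.Task":
        # The first await resolves once the call has been scheduled and
        # yields a task that stands in for an object ref.
        return asyncio.create_task(self._run(session_id))

    async def _run(self, session_id: str) -> str:
        await asyncio.sleep(0)  # pretend the work happens elsewhere
        return f"Hello {session_id} World!"


async def main() -> str:
    handle = ToyAsyncHandle()
    task = await handle.say_hello(session_id="Bill")  # first await: get the reference
    return await task                                  # second await: get the result


result = asyncio.run(main())
print(result)
```

In a notebook cell, `await main()` would be used in place of `asyncio.run(main())`, since the notebook already runs an event loop.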


