# Ray Async Actor

* [Pattern: Using asyncio to run actor methods concurrently](https://docs.ray.io/en/latest/ray-core/patterns/concurrent-operations-async-actor.html)

> By default, a Ray actor runs in a single thread and actor method calls are executed sequentially. We use `await` to yield control from the long-running method call so other method calls can run concurrently. Normally control is yielded when the method is doing I/O operations, but you can also use `await asyncio.sleep(0)` to yield control explicitly.
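
The explicit yield can be observed without Ray, because an async actor runs its methods on an asyncio event loop. The sketch below is plain asyncio rather than Ray code, and `worker`, `main`, and `run_demo` are hypothetical names chosen for illustration. Two coroutines share one thread, and each `await asyncio.sleep(0)` hands control to the other.

```python
import asyncio


async def worker(name: str, steps: int, log: list) -> None:
    """Record each step, then yield so other coroutines get a turn."""
    for i in range(steps):
        log.append(f"{name}{i}")
        # Explicit yield point: without it, this loop would run to
        # completion before any other coroutine could start.
        await asyncio.sleep(0)


async def main() -> list:
    log: list = []
    # Both workers run on the same thread and the same event loop.
    await asyncio.gather(worker("a", 3, log), worker("b", 3, log))
    return log


def run_demo() -> list:
    return asyncio.run(main())


if __name__ == "__main__":
    print(run_demo())  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```

The two workers alternate step by step. Without the `await`, worker `a` would finish all of its steps before worker `b` started.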

* [Scaling Python Asyncio with Ray](https://medium.com/distributed-computing-with-ray/scaling-python-asyncio-with-ray-aaf42ee03a8e)

> This post explains how Ray natively supports Python’s single-threaded asyncio coroutines and enables seamless scaling of coroutines to multiple processes and to a cluster of machines.
> 
> Combining concurrent actor methods and async `ray.get` creates more efficient Ray code. For example, a service load balancer can now be implemented in a few lines of code. In the load balancer actor, many instances of the `proxy_request` method will be executed concurrently.
> ```
> @ray.remote
> class LoadBalancer:
>     async def proxy_request(self, query):
>         actor = self.choose_actor()
>         return await actor.execute.remote(query)
> ```
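
To make the concurrency claim concrete, here is a minimal sketch in plain asyncio rather than Ray. `Backend` is a hypothetical stand-in for a remote actor, the round-robin `choose_actor` is an assumption (the quoted post does not say how a backend is picked), and the `in_flight` counters exist only to show how many requests overlap.

```python
import asyncio
import itertools


class Backend:
    """Hypothetical stand-in for a remote actor that serves queries."""

    def __init__(self, name: str):
        self.name = name

    async def execute(self, query: str) -> str:
        await asyncio.sleep(0.01)  # simulated I/O wait
        return f"{self.name}:{query}"


class LoadBalancer:
    def __init__(self, backends):
        # Round-robin is an assumption made for this sketch.
        self._cycle = itertools.cycle(backends)
        self.in_flight = 0
        self.max_in_flight = 0

    def choose_actor(self) -> Backend:
        return next(self._cycle)

    async def proxy_request(self, query: str) -> str:
        self.in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self.in_flight)
        try:
            # While this request awaits its backend, the event loop is
            # free to start other proxy_request() calls.
            return await self.choose_actor().execute(query)
        finally:
            self.in_flight -= 1


async def demo():
    balancer = LoadBalancer([Backend("b0"), Backend("b1")])
    replies = await asyncio.gather(
        *(balancer.proxy_request(f"q{i}") for i in range(4))
    )
    return replies, balancer.max_in_flight


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

All four requests are in flight at once on a single thread, because each one suspends at its `await` instead of blocking the others.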

In [1]:
import time
from enum import Enum
import ray
from ray.util.queue import Queue, Empty

In [2]:
ray.init()

2023-11-04 18:47:59,748	INFO worker.py:1553 -- Started a local Ray instance.


* Python version: 3.9.13
* Ray version: 2.3.0


# Workflow

1. Start two actors and invoke the method on each that reads from the queue. The method blocks while the queue is empty.
2. Put tasks into the queue.
3. The actor methods unblock and process the tasks from the queue.
4. Kill the actors.
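
The four steps can be rehearsed in a single process before involving Ray. The sketch below is plain asyncio with an `asyncio.Queue`; `LocalConsumer` and `workflow` are hypothetical names, and the wake-up items sent in step 4 are this sketch's way of releasing a consumer that is suspended on an empty queue.

```python
import asyncio
from enum import Enum


class State(Enum):
    INITIALIZED = 0
    RUNNING = 1
    STOPPED = 2


class LocalConsumer:
    """Hypothetical in-process analogue of a queue-reading actor."""

    def __init__(self, worker_id: int, queue: asyncio.Queue):
        self._id = worker_id
        self._queue = queue
        self._state = State.INITIALIZED
        self.seen = []

    async def run(self) -> int:
        self._state = State.RUNNING
        while True:
            item = await self._queue.get()  # suspends while the queue is empty
            if self._state is not State.RUNNING:
                break
            self.seen.append(item)
            await asyncio.sleep(0)  # give the other consumer a turn
        return 1

    def stop(self) -> None:
        self._state = State.STOPPED


async def workflow():
    queue = asyncio.Queue(maxsize=100)
    consumers = [LocalConsumer(i, queue) for i in range(2)]
    # Step 1: start both consumers and let them suspend on the empty queue.
    tasks = [asyncio.create_task(c.run()) for c in consumers]
    await asyncio.sleep(0)
    # Step 2: put tasks into the queue.
    for i in range(10):
        queue.put_nowait(i)
    # Step 3: let the consumers drain the queue.
    while sum(len(c.seen) for c in consumers) < 10:
        await asyncio.sleep(0)
    # Step 4: request a stop, then send one wake-up item per consumer so
    # that a consumer suspended in get() notices the new state.
    for c in consumers:
        c.stop()
    for _ in consumers:
        queue.put_nowait(None)
    results = await asyncio.gather(*tasks)
    return results, sorted(item for c in consumers for item in c.seen)


if __name__ == "__main__":
    print(asyncio.run(workflow()))
```

Each `run()` returns `1` on a clean exit, and the ten work items are split between the two consumers.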

In [13]:
# You can pass this object around to different tasks/actors
queue = Queue(maxsize=100)

In [14]:
class State(Enum):
    # Lifecycle states of a Consumer actor.
    INITIALIZED = 0
    RUNNING = 1
    STOPPED = 2


@ray.remote
class Consumer:
    def __init__(self, worker_id: int, queue: ray.util.queue.Queue):
        self._id: int = worker_id
        self._queue: ray.util.queue.Queue = queue
        # Set in run() and stop(); checked after every queue read.
        self._state: State = State.INITIALIZED

    async def run(self) -> int:
        print(f"{self._id} running.")
        self._state = State.RUNNING
        try:
            while True:
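                # Queue.get() blocks the caller and cannot be awaited; inside
                # an async actor method, await get_async() instead so the
                # event loop stays free while the queue is empty.
                item = await self._queue.get_async(block=True, timeout=None)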
                if self._state == State.RUNNING:
                    print(f"consumer {self._id} got work {item}")
                else:
                    break
            
            print("exiting run()...")
            return 1 
                
        except Exception as e:
            print(f"id: {self._id} exiting due to [{e}].")
            return -1
        
        print("run() unexpected exit.")
        return -2

    def stop(self):
        """Set the state to STOPPED.
        """
        print(f"id: {self._id} stopping...")
        self._state = State.STOPPED
        # ray.actor.exit_actor()

## Start two actors in parallel

In [15]:
consumers = [
    # max_concurrency=2 allows two method calls in flight per actor, so
    # stop() can run while run() is still awaiting the queue.
    Consumer.options(max_concurrency=2).remote(worker_id=worker_id, queue=queue)
    for worker_id in range(2)
]

## Invoke actor method to read from the queue

In [16]:
future_objects = [
        consumer.run.remote() for consumer in consumers
]

[2m[36m(Consumer pid=3174)[0m 1 running.
[2m[36m(Consumer pid=3173)[0m 0 running.


## Put tasks into the queue to get the remote method unblocked

In [17]:
_ = [queue.put(i) for i in range(10)]

[2m[36m(Consumer pid=3174)[0m consumer 1 got work 1
[2m[36m(Consumer pid=3174)[0m consumer 1 got work 3
[2m[36m(Consumer pid=3174)[0m consumer 1 got work 5
[2m[36m(Consumer pid=3174)[0m consumer 1 got work 7
[2m[36m(Consumer pid=3174)[0m consumer 1 got work 9
[2m[36m(Consumer pid=3173)[0m consumer 0 got work 0
[2m[36m(Consumer pid=3173)[0m consumer 0 got work 2
[2m[36m(Consumer pid=3173)[0m consumer 0 got work 4
[2m[36m(Consumer pid=3173)[0m consumer 0 got work 6
[2m[36m(Consumer pid=3173)[0m consumer 0 got work 8


## Stop the actors

In [18]:
for consumer in consumers:
    # consumer.exit.remote()
    consumer.stop.remote()

[2m[36m(Consumer pid=3174)[0m id: 1 stopping...
[2m[36m(Consumer pid=3173)[0m id: 0 stopping...


In [19]:
# Send items to unblock actors if they are blocking on the queue
_ = [queue.put(i) for i in range(10)]

[2m[36m(Consumer pid=3174)[0m exiting run()...
[2m[36m(Consumer pid=3173)[0m exiting run()...


In [20]:
# Wait for the actors to exit
while future_objects:

    # ray.wait returns two lists: the object references that are ready and
    # those that are not. Rebind future_objects to the not-ready list so the
    # loop ends once every result has been collected.
    # timeout=1 means wait at most 1 second; ray.wait returns earlier as
    # soon as an object reference is ready.
    done_objects, future_objects = ray.wait(future_objects, timeout=1)
    
    # ray.get can take an iterable of object references.
    done_values = ray.get(done_objects)

    # Process each result.
    for result in done_values:
        print(f'result: {result}')

result: 1
result: 1


In [21]:
queue.shutdown()

# Cleanup

In [22]:
ray.shutdown()