
[Serve] Rewrite Router to be Embeddable #12019

Merged
merged 5 commits into ray-project:master on Nov 17, 2020

Conversation

@simon-mo (Contributor) commented Nov 14, 2020

Why are these changes needed?

This PR refactors the router into an embeddable object. It:

  • uses long polling to fetch config from the controller
  • has a simpler job (assigning queries to replicas)

Now you can just call await router.assign_query(query) to retrieve an object ref. The method is async and waits until a replica is free. The ServeHandle now uses this embeddable router (instead of calling the HTTPProxy actor). To make the handle accessible in a sync context, we start a thread hosting the asyncio loop that runs the router.
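The flow described above can be sketched as follows. This is a minimal illustration of the embeddable-router pattern, not the PR's actual implementation; the `Router` internals here (a semaphore standing in for per-replica in-flight tracking) are made up:

```python
import asyncio

class Router:
    """Illustrative embeddable router: assign_query waits until a
    replica slot is free, then returns a handle for the result."""

    def __init__(self, max_concurrent_queries: int = 2):
        # A semaphore stands in for the per-replica in-flight bookkeeping.
        self._slots = asyncio.Semaphore(max_concurrent_queries)

    async def assign_query(self, query):
        async with self._slots:
            # A real router would forward the query to a replica actor
            # and return an ObjectRef; we just echo the query back.
            await asyncio.sleep(0)
            return f"handled:{query}"

async def main():
    router = Router()
    return await asyncio.gather(*(router.assign_query(i) for i in range(3)))

print(asyncio.run(main()))
# → ['handled:0', 'handled:1', 'handled:2']
```

Because `assign_query` is a plain coroutine, the same object can be driven either from an async caller or, as the PR does for sync handles, from a dedicated thread running an event loop.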

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@edoakes (Contributor) left a comment:

Looks good overall, this is significantly simpler, which is a great sign :)

router_chosen,
endpoint_name,
)
if endpoint_name not in self._handle_cache:
edoakes (Contributor):

When do we remove from this cache?

edoakes (Contributor):

Can we do it when an endpoint is deleted? Should be some kind of hook for that.

simon-mo (Author):

Good catch. I also added a comment in the constructor.


import numpy as np

from ray.serve.utils import logger


@lru_cache(maxsize=128)
edoakes (Contributor):

Good idea! It's probably worth benchmarking this.

simon-mo (Author):

stdlib is 10x faster than numpy.

simon-mo (Author):

and this generally takes ~10us on a cache miss


import numpy as np

from ray.serve.utils import logger


@lru_cache(maxsize=128)
def deterministic_hash(key: bytes) -> float:
    """Given an arbitrary string, return a value between 0 and 1"""
edoakes (Contributor):

Suggested change
"""Given an arbitrary string, return a value between 0 and 1"""
"""Given an arbitrary string, return a deterministic value between 0 and 1."""
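For reference, the hashing discussed in this thread can be done with the stdlib alone. The following is a sketch consistent with the docstring, assuming an md5-based normalization; the PR's actual digest choice isn't shown in this excerpt:

```python
import hashlib
from functools import lru_cache

@lru_cache(maxsize=128)
def deterministic_hash(key: bytes) -> float:
    """Given an arbitrary string, return a deterministic value between 0 and 1."""
    # Python's built-in hash() is salted per process, so use a stable
    # digest and normalize its first 8 bytes to [0, 1).
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:8], "big") / 2**64

print(deterministic_hash(b"endpoint-1"))
```

The `lru_cache` means repeated lookups for the same key skip the digest entirely, which is where the cache-hit/cache-miss timing difference mentioned above comes from.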

Comment on lines 31 to 32
def flush(self, query) -> List[str]:
    """Assign a query to a list of backends.
edoakes (Contributor):

So much nicer!

from the endpoint queue and pushing them onto a backend queue. The
method must also return a set of all backend tags so that the caller
knows which backend_queues to flush.
def flush(self, query) -> List[str]:
edoakes (Contributor):

Should we rename this method btw? Something like "assign_query"?

simon-mo (Author):

yeah i changed it to assign. good catch!

def create_or_get_async_loop_in_thread():
global global_async_loop
if global_async_loop is None:
# We have to asyncio's loop because uvloop will segfault when there
edoakes (Contributor):

forgot a word

edoakes (Contributor):

this seems pretty hacky/dangerous. seems like a red flag that we're doing something wrong to me?

Comment on lines +80 to +82
asyncio.run_coroutine_threadsafe(
self.router.setup_in_async_loop(),
self.async_loop,
edoakes (Contributor):

why do we use run_coroutine_threadsafe here but create_task down below?

simon-mo (Author):

create_task isn't thread safe :-)
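This distinction is standard asyncio behavior and easy to demonstrate with a loop running in a background thread (the names below are illustrative, not the PR's code):

```python
import asyncio
import threading

# Start an event loop in a background thread, as the sync ServeHandle does.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def setup():
    return "ready"

# loop.create_task may only be called from the loop's own thread;
# run_coroutine_threadsafe is the thread-safe way to submit from outside.
result = asyncio.run_coroutine_threadsafe(setup(), loop).result(timeout=5)
print(result)
# → ready

loop.call_soon_threadsafe(loop.stop)
```

Inside a coroutine already running on the loop, `create_task` is fine, which is why the PR can use it "down below" but not here.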

future = asyncio.run_coroutine_threadsafe(coro, self.async_loop)
return future.result()

async def remote_async(self, request_data, **kwargs):
edoakes (Contributor):

please mark this experimental for now so that we can change the API in the near future if we want to

Comment on lines 174 to 175
# NOTE(simon): we separated this method to run all the setup in asyncio
# loop.
edoakes (Contributor):

this comment doesn't make sense to me out of context. Separated it from what?

self.backend_replicas[backend_tag].set_max_concurrent_queries(
config.max_concurrent_queries)

async def enqueue_request(
edoakes (Contributor):

there aren't really queues anymore, maybe we should rename this :) --> assign_request? send_request?

@edoakes self-assigned this Nov 16, 2020
kwargs = pickle.loads(value)
return Query(**kwargs)
class ReplicaSet:
    """Data structure represents a set of replica actor handles"""
edoakes (Contributor):

Suggested change
"""Data structure represents a set of replica actor handles"""
"""Data structure representing a set of replica actor handles."""

"""Data structure represents a set of replica actor handles"""

def __init__(self):
# The initialization setup before we receive them from long polling
edoakes (Contributor):

before we receive what from long polling?

simon-mo (Author):

removed. the NOTE below already explained it.

self.max_concurrent_queries: int = 8
self.in_flight_queries: Dict[ActorHandle, set] = dict()

# The iterator used for load balancing among replicas.
edoakes (Contributor):

please summarize the existing policy
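The existing policy being asked about is a round-robin cycle over the replica handles; a toy summary (the replica names are made up):

```python
import itertools

# Round-robin: cycle through the replica set, advancing one step per query.
in_flight_queries = {"replica-a": set(), "replica-b": set(), "replica-c": set()}
replica_iterator = itertools.cycle(in_flight_queries.keys())

assignments = [next(replica_iterator) for _ in range(5)]
print(assignments)
# → ['replica-a', 'replica-b', 'replica-c', 'replica-a', 'replica-b']
```

Note that `itertools.cycle` repeats the sequence it saw on the first pass, which is why the PR rebuilds the iterator whenever the replica set changes.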

# the same node.
self.replica_iterator = itertools.cycle(self.in_flight_queries.keys())

# Used to unblock the `await all_in_flight_queries` wait. A newly added
edoakes (Contributor):

don't know what that wait is out of context, please describe it in higher level terms rather than code

self.in_flight_queries.keys())
self.config_updated_event.set()

def _try_assign_replica(self, query: Query) -> Optional[ray.ObjectRef]:
edoakes (Contributor):

please comment when this returns a ref vs. None and what None signals

return len(done)

async def assign_replica(self, query: Query) -> ray.ObjectRef:
"""Given a query, submit it to a replica and returns the object ref.
edoakes (Contributor):

Suggested change
"""Given a query, submit it to a replica and returns the object ref.
"""Given a query, submit it to a replica and return the object ref.

if assigned_ref is not None:
return assigned_ref

# Maybe there exist a free replica, we just need to refresh our queries
edoakes (Contributor):

Suggested change
# Maybe there exist a free replica, we just need to refresh our queries
# Maybe there exists a free replica, we just need to refresh our query

# Maybe there exist a free replica, we just need to refresh our queries
# tracker.
num_finished = self._drain_completed_object_refs()
# All replicas are really busy, let's wait for one query to complete or
edoakes (Contributor):

Suggested change
# All replicas are really busy, let's wait for one query to complete or
# All replicas are really busy, wait for a query to complete or

# tracker.
num_finished = self._drain_completed_object_refs()
# All replicas are really busy, let's wait for one query to complete or
# the config is updated.
edoakes (Contributor):

Suggested change
# the config is updated.
# the config to be updated.

self.config_updated_event.clear()
# We are pretty sure a free replica is ready now, let's recurse and
# assign this query a replica.
return await self.assign_replica(query)
edoakes (Contributor):

I'd prefer not to use recursion here to avoid any issues with Python stack limits in case of weird behavior. The logic is potentially easier to follow that way too.
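A loop-based version of this logic, as suggested, might look like the following sketch. The helpers here are simplified stand-ins for the PR's bookkeeping, not its real internals:

```python
import asyncio
from typing import Optional

class ReplicaSet:
    def __init__(self):
        self.config_updated_event = asyncio.Event()
        self._free_slots = 1  # stand-in for per-replica in-flight tracking

    def _try_assign_replica(self, query) -> Optional[str]:
        # Returns a ref when some replica has capacity; None means all busy.
        if self._free_slots > 0:
            self._free_slots -= 1
            return f"ref:{query}"
        return None

    async def assign_replica(self, query) -> str:
        # Loop (rather than recurse) until the query is assigned, so deep
        # retry chains can't hit Python's recursion limit.
        while True:
            ref = self._try_assign_replica(query)
            if ref is not None:
                return ref
            # Wait for a completed query or a config update, then retry.
            await self.config_updated_event.wait()
            self.config_updated_event.clear()

async def demo():
    return await ReplicaSet().assign_replica("q")

print(asyncio.run(demo()))
# → ref:q
```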


def create_or_get_async_loop_in_thread():
global global_async_loop
if global_async_loop is None:
simon-mo (Author):

fyi, the resetting of the default event loop policy is gone (probably a false positive during testing and debugging)

@edoakes (Contributor) left a comment:

LGTM

assert self.sync, "handle.remote() should be called from sync handle."
coro = self._remote(request_data, kwargs)
future = asyncio.run_coroutine_threadsafe(coro, self.async_loop)
return future.result()
edoakes (Contributor):

Hmm, this doesn't seem like the interface we want -- this will throw an exception if the future's result isn't yet available

simon-mo (Author) commented Nov 16, 2020:

Actually, this future is a concurrent.futures.Future object, which will block until the result is ready when you call future.result().

edoakes (Contributor):

Wow, that's quite confusing...
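The behavior simon-mo describes is standard asyncio: `run_coroutine_threadsafe` returns a `concurrent.futures.Future` (not an `asyncio.Future`), and its `.result()` blocks the calling thread until the coroutine finishes:

```python
import asyncio
import concurrent.futures
import threading

# Event loop running in a background thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def compute():
    await asyncio.sleep(0.05)
    return 42

future = asyncio.run_coroutine_threadsafe(compute(), loop)
# It is a concurrent.futures.Future, so .result() blocks until done
# (an asyncio.Future's result() would instead raise if not yet done).
assert isinstance(future, concurrent.futures.Future)
print(future.result())
# → 42

loop.call_soon_threadsafe(loop.stop)
```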

@edoakes (Contributor) commented Nov 16, 2020:

@simon-mo please add a description to the PR before merging.

@simon-mo merged commit d7c95a4 into ray-project:master Nov 17, 2020