
[Serve][Streaming] Intermediate deployment throughput drops when serving concurrent users.  #39705

Description

@sihanwang41

What happened + What you expected to happen

Observed that the intermediate deployment's throughput drops when serving 20 concurrent users (i.e., Locust users).
Model throughput is around ~600 chunks/s, yet Router throughput is only 100 - 200 chunks/s.

Throughput here means the numbers printed by the reproduction script below.
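
For scale, a back-of-envelope reading of those numbers (arithmetic only; the attribution of the gap to per-chunk overhead is an inference, not a measurement):

# 100 chunks at a nominal 1 ms async sleep each caps the model side at
# 1000 chunks/s; the printed ~600 implies ~1.7 ms per chunk at the replica.
# The Router's 100-200 chunks/s implies 5-10 ms per chunk end to end, i.e.
# several extra ms of per-chunk overhead in the handle/streaming path.
model_tput, router_tput = 600, 150  # chunks/s, as reported
print(1 / model_tput, 1 / router_tput)  # ~0.0017 s vs ~0.0067 s per chunk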

Notes:

  • SimpleGenerator sleeps with asyncio.sleep (an async sleep) between chunks; with a blocking time.sleep instead, the replica crashes (see the inline comment in the reproduction script, and the sketch after this list).
  • Failed to get queue length from replica default#SimpleGenerator#Rndqlx within 0.1s appears very often in the Router replica log. That might be reasonable given the huge number of requests hitting the model replica (it happens very often with 100 users).
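
For context on the first note, a minimal standalone sketch (not from the issue) of why the two sleeps behave so differently inside a streaming replica: a blocking time.sleep stalls the replica's whole event loop, freezing every other coroutine in the process (other requests, the queue-length probe), while await asyncio.sleep suspends only the current generator:

import asyncio
import time


async def blocking_gen():
    for _ in range(3):
        time.sleep(0.001)  # blocks the event loop: nothing else runs meanwhile
        yield "chunk"


async def async_gen():
    for _ in range(3):
        await asyncio.sleep(0.001)  # yields control; other coroutines can progress
        yield "chunk"


async def main():
    async for c in async_gen():
        print(c)


asyncio.run(main())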

Versions / Dependencies

master

Reproduction script

stream.py

import asyncio
import time

from fastapi import FastAPI
from ray import serve
from starlette.requests import Request
from starlette.responses import StreamingResponse

from data2 import AviaryModelResponse  # local helper defining the streamed response model


app = FastAPI()


@serve.deployment(max_concurrent_queries=1000)
@serve.ingress(app)
class Router:
    def __init__(self, handle) -> None:
        self._h = handle.options(stream=True)
        self.total_received = 0

    @app.get("/")
    def stream_hi(self, request: Request) -> StreamingResponse:
        async def consume_obj_ref_gen():
            # Stream chunks from the downstream SimpleGenerator replica and
            # measure per-request throughput on the Router side.
            obj_ref_gen = await self._h.hi_gen.remote()
            start = time.time()
            num_received = 0
            async for chunk in obj_ref_gen:
                chunk = await chunk
                num_received += 1
                yield str(chunk.json())
            delta = time.time() - start
            print(f"**request throughput: {num_received / delta}")
        return StreamingResponse(consume_obj_ref_gen(), media_type="text/plain")


@serve.deployment(max_concurrent_queries=1000)
class SimpleGenerator:
    async def hi_gen(self):
        start = time.time()
        for i in range(100):
            await asyncio.sleep(0.001)
            # time.sleep(0.001)  # with this blocking sleep instead of the async one above, the replica crashes
            yield AviaryModelResponse(generated_text="abcd")
        delta = time.time() - start
        print(f"**model throughput: {100 / delta}")


serve.run(Router.bind(SimpleGenerator.bind()))

time.sleep(3600)
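
The issue doesn't state how the script is launched; presumably it is run directly (python stream.py), after which Serve accepts requests on http://localhost:8000 and the trailing sleep keeps the process alive.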

locust

import time

import requests

from locust import HttpUser, task, constant, events

class CodeGenClient(HttpUser):

    wait_time = constant(1)

    @task
    def send_serve_requests(self):
        request_meta = {
            "request_type": "InvokeEndpoint",
            "name": "Streamtest",
            "start_time": time.time(),
            "response_length": 0,
            "response": None,
            "context": {},
            "exception": None,
        }
        start_perf_counter = time.perf_counter()
        # r = self.client.get("/", stream=True)
        r = requests.get("http://localhost:8000", stream=True)
        if r.status_code != 200:
            print(r)
        else:
            # Drain the streamed response chunk by chunk.
            for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
                pass
            request_meta["response_time"] = (
                time.perf_counter() - start_perf_counter
            ) * 1000
            events.request.fire(**request_meta)
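
The report doesn't include the Locust invocation; for the 20-user scenario it was presumably something like the following (the file name locust_test.py is assumed):

locust -f locust_test.py --headless -u 20 -r 20 --host http://localhost:8000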

Issue Severity

None

Labels

  • P2: Important issue, but not time-critical
  • bug: Something that is supposed to be working; but isn't
  • core: Issues that should be addressed in Ray Core
  • performance
