
vLLM ignores my requests when I increase the number of concurrent requests #2752

Open
savannahfung opened this issue Feb 5, 2024 · 7 comments

I am using a runpod container to run vLLM.
Template: runpod/pytorch:2.1.1-py3.10-cuda12.1.1-devel-ubuntu22.04
GPU Cloud: 1 x RTX 3090 | 12 vCPU 31 GB RAM

It works perfectly fine when I send 9 concurrent requests, but it starts to hang when I increase the number to 10.

python -m vllm.entrypoints.openai.api_server --model openchat/openchat_3.5 --tensor-parallel-size 1

...
INFO:     127.0.0.1:46228 - "POST /v1/chat/completions HTTP/1.1" 200 OK
INFO:     127.0.0.1:46230 - "POST /v1/chat/completions HTTP/1.1" 200 OK
INFO 02-05 04:53:20 async_llm_engine.py:111] Finished request cmpl-672a8058f6cb4d1d8f5ba5397af93575.
INFO 02-05 04:53:20 async_llm_engine.py:111] Finished request cmpl-4314994fe17a4b708bdbc0570668107b.
INFO 02-05 04:53:20 async_llm_engine.py:111] Finished request cmpl-85089ac09b6241f781d49b2b05fec1c6.
INFO 02-05 04:53:20 async_llm_engine.py:111] Finished request cmpl-b66387e22ebb4b33a010835b5d31f499.
INFO 02-05 04:53:21 llm_engine.py:706] Avg prompt throughput: 1137.2 tokens/s, Avg generation throughput: 193.7 tokens/s, Running: 5 reqs, Swapped: 0 reqs, Pending: 0 reqs, GPU KV cache usage: 4.2%, CPU KV cache usage: 0.0%
INFO 02-05 04:53:21 async_llm_engine.py:111] Finished request cmpl-e9f50d97a01148308ccb3e8626b6feb6.
INFO 02-05 04:53:21 async_llm_engine.py:111] Finished request cmpl-b0e0c9e0b76c450c8d4990fbab5f9fa6.
INFO 02-05 04:53:21 async_llm_engine.py:111] Finished request cmpl-90c28be5bdb44d079f8e3bc4b281cc29.
INFO 02-05 04:53:21 async_llm_engine.py:111] Finished request cmpl-55440ec21be24922b2ef820fdead76bc.
INFO 02-05 04:53:21 async_llm_engine.py:111] Finished request cmpl-e2a78285e0814d518750ef67f28a7af4.
INFO 02-05 04:53:21 async_llm_engine.py:111] Finished request cmpl-e14764b23e38400393e22015ea6c6fd7.

It just stops processing the last input and hangs there.

Processing files:  67%|█████████████████████▎          | 2/3 [01:34<00:50, 50.91s/files]
User 0: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:06<00:01,  1.88s/lines]
User 1: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:07<00:02,  2.13s/lines]
User 2: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:07<00:02,  2.08s/lines]
User 5: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:07<00:02,  2.00s/lines]
User 3: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:07<00:02,  2.02s/lines]
User 9: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:07<00:02,  2.01s/lines]
User 8: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:07<00:02,  2.16s/lines]
User 4: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:07<00:01,  1.99s/lines]
User 7: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:07<00:01,  1.91s/lines]
User 6: Processing intent.txt:  80%|███████████████▏   | 4/5 [00:07<00:02,  2.06s/lines]

I tried to include --swap-space 0, but the error still exists; nothing changes.
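
For reference, that launch is the same command as above with the flag added:

python -m vllm.entrypoints.openai.api_server --model openchat/openchat_3.5 --tensor-parallel-size 1 --swap-space 0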

hmellor (Collaborator) commented Feb 7, 2024

There could be an issue with your processing script; I've managed to have hundreds of concurrent requests in flight without seeing issues like this. Would you be able to share your processing script?

nehalvaghasiya commented Feb 7, 2024

@savannahfung @WoosukKwon @hmellor I'm experiencing a similar issue as described above, including with "vllm.entrypoints.api_server". Initially, it handles concurrent requests effectively, but eventually, it starts to hang. My assumption is that this might be related to the GPU KV cache steadily increasing to 99.4%, leading to crashes and hang-ups, thereby leaving other requests in a "pending" state. It's highly probable that this issue is related to #2731.

savannahfung (Author) commented

@hmellor I am testing the performance of different LLMs, and the script works for smaller numbers of concurrent requests. For openchat/openchat_3.5 it hangs at around 10 concurrent requests, but with larger models (e.g. Nexusflow/NexusRaven-13B, mistralai/Mixtral-8x7B-Instruct-v0.1) it hangs earlier, at around 2-3 concurrent requests.

# Imports needed by the excerpts below (these are methods excerpted from my
# benchmarking classes; self.client is an OpenAI client pointed at the vLLM server):
import json
import os
import threading
import time

import numpy as np
from nltk.translate.bleu_score import SmoothingFunction, sentence_bleu
from rouge_score import rouge_scorer
from tqdm import tqdm

def create(self, messages):
      res = self.client.chat.completions.create(
          model=self.model,
          messages=messages,
          stream=True
      )
      return res

def generate(self, messages):
      start_time = time.time()
      stream = self.create(messages)
      res_content = ""
      for chunk in stream:
          res = chunk
          if res.choices[0].finish_reason is not None:
              end_time = time.time()
              break
          if res.choices[0].delta.role is not None:
              created_time = time.time()
              res_role = res.choices[0].delta.role
          if res.choices[0].delta.content is not None:
              res_content += res.choices[0].delta.content
          
      if res.choices[0].finish_reason == "stop":
          res_status = "success"
      elif res.choices[0].finish_reason == "length":
          print("Warning: response reached max tokens.")
          res_status = "max_tokens"
      else:
          print("Error: response finished unexpectedly.")
          res_status = "error"

      response = {
          "status": res_status,
          "object": res
      }
      total_duration = end_time - start_time
      ttft = created_time - start_time
      tpot = total_duration / res.usage["completion_tokens"] if res.usage["completion_tokens"] > 0 else 0
      throughput = 1 / (ttft + tpot)
      res_message = {
          "role": res_role,
          "content": res_content
      }

      return {
          "ttft": ttft,
          "tpot": tpot,
          "throughput": throughput,
          "latency": total_duration,
          "message": res_message,
          "response": response
      }
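
# evaluate(): one simulated user's pass over an input file. Each chat turn is sent
# to the model, the reply is scored against the matching reference line
# (BLEU, ROUGE-1, ROUGE-L), and per-turn TTFT/TPOT/throughput/latency are recorded.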
def evaluate(self, input_file, output_file, reference_file, user_id = None, avg_res_time = None, results = None):
      chat_history = []
      with open(input_file, "r") as inf, open(output_file, "w") as outf, open(reference_file, "r") as ref:
          if user_id is not None:
              outf.write("User ID: " + str(user_id) + "\n")
          outf.write("Model: " + self.llm.model + "\n")
          outf.write("Input file: " + input_file + "\n\n")
          metrics = {
              "bleu": 0,
              "rouge1": 0,
              "rougeL": 0,
              "ttft": 0,
              "tpot": 0,
              "throughput": 0,
              "latency": 0,
          }

          total_lines = sum(1 for line in inf)
          inf.seek(0)
          desc = f"{'User ' + str(user_id) + ': ' if user_id is not None else ''}Processing {input_file.split('/')[-1]}"
          for line_num, (input_line, ref_line) in enumerate(zip(tqdm(inf, total=total_lines, desc=desc, unit="lines"), ref)):
              message = json.loads(input_line)
              chat_history.append(message)

              if message["role"] == "system":
                  outf.write("> " + message["content"] + "\n\n")
                  continue

              if avg_res_time is not None:
                  res_time = np.random.weibull(1) / avg_res_time
                  time.sleep(res_time)

              res = self.llm.generate(chat_history)
              chat_history.append(res["message"])

              smoothing = SmoothingFunction().method1
              bleu = sentence_bleu([ref_line.split()], res["message"]["content"].split(), smoothing_function=smoothing)
              metrics["bleu"] += bleu

              scorer = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
              rouge = scorer.score(ref_line, res["message"]["content"])
              metrics["rouge1"] += rouge["rouge1"].fmeasure
              metrics["rougeL"] += rouge["rougeL"].fmeasure

              metrics["ttft"] += res["ttft"]
              metrics["tpot"] += res["tpot"]
              metrics["throughput"] += res["throughput"]
              metrics["latency"] += res["latency"]

              outf.write(">>> " + message["content"] + "\n\n")
              outf.write(res["message"]["content"] + "\n\n")
              outf.write("BLEU: " + str(bleu) + "\n")
              outf.write("Rouge-1: " + str(rouge["rouge1"].fmeasure) + "\n")
              outf.write("Rouge-L: " + str(rouge["rougeL"].fmeasure) + "\n")
              outf.write("TTFT: " + str(res["ttft"]) + "\n")
              outf.write("TPOT: " + str(res["tpot"]) + "\n")
              outf.write("Throughput: " + str(res["throughput"]) + "\n")
              outf.write("Latency: " + str(res["latency"]) + "\n\n")

              if res["response"]["status"] != "success":
                  outf.write("Stop request.\n")
                  outf.write("Status: " + res["response"]["status"] + "\n")
                  outf.write("Object: " + str(res["response"]["object"]) + "\n")
                  break
          
          metrics["bleu"] /= (total_lines - 1)
          metrics["rouge1"] /= (total_lines - 1)
          metrics["rougeL"] /= (total_lines - 1)
          metrics["ttft"] /= (total_lines - 1)
          metrics["tpot"] /= (total_lines - 1)
          metrics["throughput"] /= (total_lines - 1)
          metrics["latency"] /= (total_lines - 1)

          outf.write("Average BLEU: " + str(metrics["bleu"]) + "\n")
          outf.write("Average Rouge-1: " + str(metrics["rouge1"]) + "\n")
          outf.write("Average Rouge-L: " + str(metrics["rougeL"]) + "\n")
          outf.write("Average TTFT: " + str(metrics["ttft"]) + "\n")
          outf.write("Average TPOT: " + str(metrics["tpot"]) + "\n")
          outf.write("Average Throughput: " + str(metrics["throughput"]) + "\n")
          outf.write("Average Latency: " + str(metrics["latency"]) + "\n")

          if results is not None:
              results.append(metrics)

          return metrics

def simulate_user(self, num_users, avg_res_time):
      metrics = {
          "bleu": 0,
          "rouge1": 0,
          "rougeL": 0,
          "ttft": 0,
          "tpot": 0,
          "throughput": 0,
          "latency": 0,
      }
      total_files = len(self.files)
      metrics_file = self.simulation_metrics_base + self.model_name + ".txt"
      with open(metrics_file, "w") as mf:
          mf.write("Model: " + self.llm.model + "\n\n")
          mf.write("Number of users: " + str(num_users) + "\n")
          mf.write("Average response time: " + str(avg_res_time) + "\n\n")
          for file_num, file in tqdm(enumerate(self.files, start=1), total=total_files, desc="Processing files",
                                     unit="files"):
              input_file = self.input_base + file + ".txt"
              reference_file = self.reference_base + file + ".txt"
              output_file = self.simulation_output_base + file + "/"
              os.makedirs(output_file, exist_ok=True)
              fmetrics = {
                  "bleu": 0,
                  "rouge1": 0,
                  "rougeL": 0,
                  "ttft": 0,
                  "tpot": 0,
                  "throughput": 0,
                  "latency": 0,
              }

              threads = []
              results = []
              for user_id in range(num_users):
                  usr_output_file = output_file + str(user_id) + ".txt"
                  thread = threading.Thread(target=self.evaluate,
                                            args=(input_file, usr_output_file, reference_file, user_id, avg_res_time, results))
                  thread.start()
                  threads.append(thread)

              for thread in threads:
                  thread.join()

              for res in results:
                  fmetrics["bleu"] += res["bleu"]
                  fmetrics["rouge1"] += res["rouge1"]
                  fmetrics["rougeL"] += res["rougeL"]
                  fmetrics["ttft"] += res["ttft"]
                  fmetrics["tpot"] += res["tpot"]
                  fmetrics["throughput"] += res["throughput"]
                  fmetrics["latency"] += res["latency"]

              fmetrics["bleu"] /= num_users
              fmetrics["rouge1"] /= num_users
              fmetrics["rougeL"] /= num_users
              fmetrics["ttft"] /= num_users
              fmetrics["tpot"] /= num_users
              fmetrics["throughput"] /= num_users
              fmetrics["latency"] /= num_users

              metrics["bleu"] += fmetrics["bleu"]
              metrics["rouge1"] += fmetrics["rouge1"]
              metrics["rougeL"] += fmetrics["rougeL"]
              metrics["ttft"] += fmetrics["ttft"]
              metrics["tpot"] += fmetrics["tpot"]
              metrics["throughput"] += fmetrics["throughput"]
              metrics["latency"] += fmetrics["latency"]

              mf.write("File: " + file + "\n")
              mf.write("BLEU: " + str(fmetrics["bleu"]) + "\n")
              mf.write("Rouge-1: " + str(fmetrics["rouge1"]) + "\n")
              mf.write("Rouge-L: " + str(fmetrics["rougeL"]) + "\n")
              mf.write("TTFT: " + str(fmetrics["ttft"]) + "\n")
              mf.write("TPOT: " + str(fmetrics["tpot"]) + "\n")
              mf.write("Throughput: " + str(fmetrics["throughput"]) + "\n")
              mf.write("Latency: " + str(fmetrics["latency"]) + "\n\n")

          metrics["bleu"] /= total_files
          metrics["rouge1"] /= total_files
          metrics["rougeL"] /= total_files
          metrics["ttft"] /= total_files
          metrics["tpot"] /= total_files
          metrics["throughput"] /= total_files
          metrics["latency"] /= total_files

          mf.write("Average BLEU: " + str(metrics["bleu"]) + "\n")
          mf.write("Average Rouge-1: " + str(metrics["rouge1"]) + "\n")
          mf.write("Average Rouge-L: " + str(metrics["rougeL"]) + "\n")
          mf.write("Average TTFT: " + str(metrics["ttft"]) + "\n")
          mf.write("Average TPOT: " + str(metrics["tpot"]) + "\n")
          mf.write("Average Throughput: " + str(metrics["throughput"]) + "\n")
          mf.write("Average Latency: " + str(metrics["latency"]) + "\n")

          return metrics

savannahfung (Author) commented

@nehalvaghasiya That's weird, because just before it hung the GPU KV cache usage was only 4.2% and the CPU KV cache usage was 0.0%. Unless it suddenly spiked to 99.9%.

INFO 02-05 04:53:21 llm_engine.py:706] Avg prompt throughput: 1137.2 tokens/s, Avg generation throughput: 193.7 tokens/s, Running: 5 reqs, Swapped: 0 reqs, Pending: 0 reqs, GPU KV cache usage: 4.2%, CPU KV cache usage: 0.0%

nehalvaghasiya commented

[screenshot: server logs showing GPU KV cache usage spiking to 99.4%]
Hey @savannahfung, just wanted to let you know that I've been running 100 concurrent requests smoothly. However, when I send the 101st request, the GPU KV cache usage spikes up to 99.4%, and all subsequent requests end up in a 'pending' state.

hmellor (Collaborator) commented Mar 5, 2024

@savannahfung I meant the part of your script that makes the requests; you've provided too much extra code to find where the problem is. Can you make a minimal reproducer?
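
Something minimal and self-contained along these lines would be enough (just a sketch, assuming the server is running locally on the default port 8000 with the same model and that you keep streaming on):

import concurrent.futures
from openai import OpenAI

client = OpenAI(api_key="EMPTY", base_url="http://localhost:8000/v1")

def one_request(i):
    # Stream a single chat completion and collect the generated text.
    stream = client.chat.completions.create(
        model="openchat/openchat_3.5",
        messages=[{"role": "user", "content": f"Request {i}: say hello"}],
        stream=True,
    )
    return "".join(chunk.choices[0].delta.content or "" for chunk in stream)

# Fire 10 requests at once, the concurrency where the hang reportedly starts.
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(one_request, range(10)))

print(f"Finished {len(results)} requests")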

I'm following up because I am also now seeing this issue where 100 concurrent requests will indefinitely be swapped in and out of the pending queue as described by @nehalvaghasiya.

tilmanbeck commented Apr 19, 2024

Hi all,
I am experiencing the same issue.
Python==3.11.5
vllm==0.4.0.post1
openai==1.23.1

This is how I start the OpenAI-compatible server:
CUDA_VISIBLE_DEVICES=0 python -m vllm.entrypoints.openai.api_server --model mistralai/Mistral-7B-Instruct-v0.2 --uvicorn-log-level debug --port 8001 > vllm_server_log.txt 2>&1 &

This is my Python code to reproduce the error:

import asyncio
from openai import AsyncOpenAI

model_name='mistralai/Mistral-7B-Instruct-v0.2'
client=AsyncOpenAI(api_key="EMPTY",base_url=f"http://localhost:8001/v1/")

async def _send_chat_completion(messages):
    completion = await client.chat.completions.create(model=model_name, messages=messages, temperature=0.0)
    return completion.choices[0].message.content.strip()

async def _send_async_requests(prompts_messages):
    tasks = [_send_chat_completion(msgs) for msgs in prompts_messages]
    responses = await asyncio.gather(*tasks)
    return responses

prompts_msgs = [{'role': 'user', 'content': 'suggest a dinner meal'}]
print('Starting first run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))
print('Starting second run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))

The second run never finishes, and the server logs don't even show any incoming requests for it.

I want to point other users facing a similar issue to the corresponding issue on the openai GitHub page, where they report that they are actively working on a fix, but it seems to be a deeper problem related to other modules used by openai (see openai/openai-python#769).

My workaround was to use raw requests, where I did not see this error happening (although openai reports in the issue linked above that you might encounter the same problem there, too). The adjusted code looks like this:

import asyncio
import aiohttp
async def _send_chat_completion(messages):
    print('starting openai request')
    async with aiohttp.ClientSession() as session:
        response = await session.post(url="http://localhost:8001/v1/chat/completions",
                                      json={"messages": messages, "model": "mistralai/Mistral-7B-Instruct-v0.2"},
                                      headers={"Content-Type": "application/json"})
        return await response.json()

async def _send_async_requests(prompts_messages):
    tasks = [_send_chat_completion(msgs) for msgs in prompts_messages]
    responses = await asyncio.gather(*tasks)
    responses = [resp['choices'][0]['message']['content'].strip() for resp in responses]
    return responses

prompts_msgs = [{'role': 'user', 'content': 'suggest a dinner meal'}]
print('Starting first run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))
print('Starting second run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))
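
If you want to keep the openai client, another option that may work (I haven't verified it) is to create the AsyncOpenAI client inside each asyncio.run call, so it isn't bound to an event loop that has already been closed:

import asyncio
from openai import AsyncOpenAI

model_name = 'mistralai/Mistral-7B-Instruct-v0.2'

async def _send_async_requests(prompts_messages):
    # Construct the client inside the running loop instead of at module level.
    client = AsyncOpenAI(api_key="EMPTY", base_url="http://localhost:8001/v1/")
    tasks = [
        client.chat.completions.create(model=model_name, messages=msgs, temperature=0.0)
        for msgs in prompts_messages
    ]
    completions = await asyncio.gather(*tasks)
    return [c.choices[0].message.content.strip() for c in completions]

prompts_msgs = [{'role': 'user', 'content': 'suggest a dinner meal'}]
print('Starting first run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))
print('Starting second run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))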

Let's hope it gets fixed quickly.
