
How to solve Connection reset by remote peer? #985

Open
sunkenQ opened this issue Sep 5, 2023 · 3 comments

Comments

@sunkenQ

sunkenQ commented Sep 5, 2023

I built a simple server using UCX-Py that continuously accepts data. During a stress test I found that some requests were lost due to network fluctuations. I suspect UCX-Py/UCX has some setting that limits how long it keeps trying to connect.

I also don't want the server to send the client a return value to confirm whether the transfer succeeded, because in my experiments returning a value increases the time taken considerably. Is there another way to prevent requests from being lost due to network fluctuations?

Simple Server code:

import asyncio
import time
import ucp
import numpy as np

n_bytes = 2**30
host = ucp.get_address(ifname='ib0')  # network interface name (an InfiniBand device here)
port = 13337

async def send(ep):
    # listener callback: allocate a receive buffer, receive one message, then close
    arr = np.empty(n_bytes, dtype='u1')
    await ep.recv(arr)
    await ep.close()

async def main():
    global lf
    lf = ucp.create_listener(send, port)

    while not lf.closed():
        await asyncio.sleep(0.1)

if __name__ == '__main__':
    asyncio.run(main())
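
For reference, a minimal client sketch that would pair with this server (the hostname below is a placeholder; use the address returned by ucp.get_address() on the server):

import asyncio
import ucp
import numpy as np

n_bytes = 2**30
host = "server-hostname"  # placeholder: the address printed by ucp.get_address() on the server
port = 13337

async def main():
    # connect to the listener and send one buffer, mirroring the server's recv()
    ep = await ucp.create_endpoint(host, port)
    msg = np.zeros(n_bytes, dtype='u1')
    await ep.send(msg)
    await ep.close()

if __name__ == '__main__':
    asyncio.run(main())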

ERROR Message:

Task exception was never retrieved
future: <Task finished name='Task-3536' coro=<_listener_handler_coroutine() done, defined at /home/qiusy/miniconda3/envs/ucx/lib/python3.9/site-packages/ucp/core.py:128> exception=UCXError('<stream_recv>: Connection reset by remote peer')>
Traceback (most recent call last):
  File "/home/qiusy/miniconda3/envs/ucx/lib/python3.9/site-packages/ucp/core.py", line 143, in _listener_handler_coroutine
    peer_info = await exchange_peer_info(
  File "/home/qiusy/miniconda3/envs/ucx/lib/python3.9/site-packages/ucp/core.py", line 52, in exchange_peer_info
    await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
ucp._libs.exceptions.UCXError: <stream_recv>: Connection reset by remote peer
@pentschev
Member

Hi @sunkenQ , apologies for the late reply but I've been out of the office for the past couple of weeks. Could you elaborate on what you mean by "stress test"? Do you run the same code many times in sequence, many times all in parallel, or some other situation?

From the looks of this error it seems you're hitting race conditions during the phase where connections are established. In that situation we require the UCX stream API, which can cause such collisions, and unfortunately there's no good solution for that currently. Exchanging peer information is required so that endpoints know which tag to use to communicate in both directions when using the tag API (Endpoint.recv()/Endpoint.send()). It is not a requirement for the active messages API (Endpoint.am_recv()/Endpoint.am_send()), so you should be able to use that instead, although it is not as well tested as the tag API at the moment. Even with the active messages API the peer-information exchange currently still happens and would probably cause the same issue, but since it is technically not required there, we could resolve that fairly easily if you can confirm the active messages API would work for your use case and clarify what your stress test is doing.
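
For illustration, a minimal sketch of what the same flow could look like with the active messages API (untested; it assumes Endpoint.am_recv()/Endpoint.am_send() as mentioned above):

import asyncio
import ucp
import numpy as np

port = 13337

async def handler(ep):
    # am_recv() returns the received message; no receive buffer has to be
    # pre-allocated and no tag has to be agreed on between the peers
    msg = await ep.am_recv()
    await ep.close()

async def run_server():
    lf = ucp.create_listener(handler, port)
    while not lf.closed():
        await asyncio.sleep(0.1)

async def run_client(host):
    ep = await ucp.create_endpoint(host, port)
    await ep.am_send(np.ones(2**20, dtype='u1'))
    await ep.close()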

@sunkenQ
Author

sunkenQ commented Sep 22, 2023

@pentschev Thanks for your reply!
First, let me clarify what the "stress test" does: 8 clients send requests to the server at the same time. In my test there are two services, A and B, listening on two ports on the server side, and each client sends two requests every 2 seconds as a scheduled task. The first request sends a bytearray of length 278616000 to service A, and the second sends a bytearray of length 7000 to service B; both are sent with await ep.send(buffer).
My first test lasted 4 hours with over 80,000 requests from the 8 clients. No requests were lost for service A, but 20% were lost for service B. I re-ran in TRACE mode and found the following message: wireup_cm.c:303 UCX DEBUG CM rdmacm private data buffer is too small to pack UCP endpoint info, cm max_conn_priv 54, service data version 0, size 11, address length 68. So in a second test I increased the bytearray sent to service B from 7000 to 70000 bytes. The message still appeared in the log, but no requests were lost. That means I haven't actually found a real solution; there are just no errors for now.

In addition, I also tested a single client sending one request to services A and B every two seconds as a scheduled task. The message UCX ERROR failed to create a dummy ud qp. Cannot allocate memory appears after the client has sent a total of about 50,000 requests, and is reliably reproducible. After profiling I found a slow, steady memory leak when creating the Endpoint. What confused me is that my client machine has 256 GB of memory, yet only about 250 MB leaked over 10,000 requests. I tried calling gc.collect() and del ep after each request, but neither helped.

2023-09-04 02:20:42.383 | INFO     | __main__:multiprocessing_test:183 - process-0 feature update time cost:555.520ms
2023-09-04 02:20:42.464 | INFO     | __main__:multiprocessing_test:188 - process-0 inference time cost:80.901ms
[1693765242.624676] [wy-10-41-6-73:141330:0]    rdmacm_cm_ep.c:347  UCX  ERROR failed to create a dummy ud qp. Cannot allocate memory
Traceback (most recent call last):
  File "/home/qiusy/experiment/client.py", line 203, in <module>
    multiprocessing_test(0)
  File "/home/qiusy/experiment/client.py", line 174, in multiprocessing_test
    asyncio.run(
  File "/home/qiusy/.conda/envs/ucx/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/qiusy/.conda/envs/ucx/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/qiusy/experiment/client.py", line 150, in update_request
    await self.send_data(keepalive_request=keepalive_request, infer_type="update")
  File "/home/qiusy/experiment/client.py", line 118, in send_data
    ep = await ucp.create_endpoint(self.server_host, self.update_port)
  File "/home/qiusy/.conda/envs/ucx/lib/python3.9/site-packages/ucp/core.py", line 1004, in create_endpoint
    return await _get_ctx().create_endpoint(
  File "/home/qiusy/.conda/envs/ucx/lib/python3.9/site-packages/ucp/core.py", line 316, in create_endpoint
    peer_info = await exchange_peer_info(
  File "/home/qiusy/.conda/envs/ucx/lib/python3.9/site-packages/ucp/core.py", line 54, in exchange_peer_info
    await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
ucp._libs.exceptions.UCXError: <stream_recv>: Input/output error

The log shows the first memory increase at line 167:
313.1328 MiB 9.7969 MiB 2 ep = await ucp.create_endpoint(self.server_host, self.infer_port)

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
   154 303.3359 MiB 303.3359 MiB           1       @profile(precision=4,stream=open('send_data.log','w+'))
   155                                             async def send_data(self, keepalive_request: bool, request_type: str = "update"):
   156                                                 """
   157                                                 Sends the data to the server.
   158                                         
   159                                                 Args:
   160                                                     keepalive_request: Indicates if it's a keepalive request.
   161                                                     request_type: The type of inference ("update" or "infer").
   162                                                 """
   163 303.3359 MiB   0.0000 MiB           1           if request_type == "update":
   164                                                     ep = await ucp.create_endpoint(self.server_host, self.update_port)
   165                                                     await ep.send(self.update_buffer)
   166                                                 else:
   167 313.1328 MiB   9.7969 MiB           2               ep = await ucp.create_endpoint(self.server_host, self.infer_port)
   168 313.1328 MiB   0.0000 MiB           1               await ep.send(self.infer_buffer)
   169                                         
   170 313.1328 MiB   0.0000 MiB           1           if keepalive_request is True:
   171                                                     ok = await ep.recv_obj()
   172                                                     ok = ok.decode("utf-8")
   173 313.3555 MiB   0.2227 MiB           2           await ep.close()

Below is the memory trend graph:
[image: memory usage over time]

If you need any other information, please let me know at any time, thank you!

@pentschev
Member

> First, let me clarify what the "stress test" does: 8 clients send requests to the server at the same time. […] The first request sends a bytearray of length 278616000 to service A, and the second sends a bytearray of length 7000 to service B. […]

> My first test lasted 4 hours with over 80,000 requests from the 8 clients; no requests were lost for service A, but 20% were lost for service B. […] So in a second test I increased the bytearray sent to service B from 7000 to 70000 bytes, and no requests were lost. […]

This sounds a lot like the problem is related to the transport that gets used for the transfer. Depending on your setup, it's likely that 7000 bytes gets sent as an eager message while 70000 bytes gets sent as a rendezvous message. You could try setting UCX_RNDV_THRESH=1 and rerunning with the 7000-byte messages; if my suspicion is correct, you would then see the same behavior in both cases.
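
For example, one way to try that from Python (a sketch; the variable has to be set before UCX is initialized in the client process, or equivalently on the command line as UCX_RNDV_THRESH=1 python client.py):

import os

# UCX reads its configuration from environment variables, so this must be set
# before UCX is initialized, i.e. before ucp is first used in this process
os.environ["UCX_RNDV_THRESH"] = "1"

import ucp  # imported/initialized only after the environment variable is set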

> In addition, I also tested a single client sending one request to services A and B every two seconds. The message UCX ERROR failed to create a dummy ud qp. Cannot allocate memory appears after the client has sent a total of about 50,000 requests, and is reliably reproducible. […] I tried calling gc.collect() and del ep after each request, but neither helped.

> [client log and traceback quoted from the previous comment]

The failed to create a dummy ud qp error does not refer to system memory, but rather to InfiniBand queue pairs. In short, it means you have too many open endpoints and UCX cannot establish another one. When you say you have 50k requests, do you mean only send/recv requests, or endpoints being established? In the first case I would not expect this to be a problem, but in the latter case it definitely would be.

> The log shows the first memory increase at line 167: 313.1328 MiB 9.7969 MiB 2 ep = await ucp.create_endpoint(self.server_host, self.infer_port)

> [memory profiler output quoted from the previous comment]

> Below is the memory trend graph: […]
>
> If you need any other information, please let me know at any time, thank you!

From what I've heard from UCX developers, we should be OK with up to some 16K endpoints, but I'm not sure now whether you're supposed to have at most 8 clients * 2 listeners = 16 endpoints, or more than 50,000 as written above. Again, 16 endpoints would be totally fine, but 50k+ would definitely be a problem.
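
If it is the latter, one possible mitigation, sketched below with hypothetical names (self._endpoints is assumed to be a dict created in __init__; it is not part of your original client), would be to create one endpoint per (host, port) and reuse it across requests instead of calling ucp.create_endpoint() for every send:

# Sketch only: cache one endpoint per (host, port) and reuse it across requests,
# so the number of live endpoints matches the number of services rather than
# the number of requests.
async def _get_endpoint(self, host, port):
    key = (host, port)
    if key not in self._endpoints:
        self._endpoints[key] = await ucp.create_endpoint(host, port)
    return self._endpoints[key]

async def send_data(self, buffer, host, port):
    ep = await self._get_endpoint(host, port)
    await ep.send(buffer)  # the endpoint is left open for the next request

Note that this only works if the server-side callback does not close the endpoint after a single receive.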

Would you be able to provide a minimal reproducer and/or logs for this? If you can't provide a minimal reproducer, at least a log of the processes for fewer iterations (for now let's say 10 or so) would be very useful; for that, please run UCXPY_LOG_LEVEL=DEBUG python ... > ucxpy.log.

Based on your description of your use case, I believe it may be somewhat similar to this test, therefore I would suggest having a look to see if they are similar, and if not, we would at least like to see what's different between the two cases.
