Is it necessary to make refresh_lease one long-lived coroutine? #33

garvenlee opened this issue Dec 5, 2023 · 7 comments

@garvenlee

Here is my code:

    async def refresh_lease(self, lease_id: int, *, period: int):
        refresh = self.aetcd_client.refresh_lease
        while True:
            # refresh_lease is a stream_stream method, but it only sends one
            # request with request_timeout, so it may raise
            # ConnectionTimeoutError(ClientError); that means the service is
            # down in etcd's view, and the service supplier must register
            # itself again.
            reply = await refresh(lease_id)  # let the registered key be closed along with the lease itself
            self.last_refresh_time = monotonic()  # updated only on success
            logger.debug(f"[EtcdLease] refresh_lease got a reply: {reply.ID} / {self.lease_id}")
            await sleep(period)

About two hours into the periodic execution of refresh_lease, I got the following exception:

Traceback (most recent call last):
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 21, in handler
    return await f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 42, in handler
    return await f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 1033, in refresh_lease
    async for reply in self.leasestub.LeaseKeepAlive(
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/grpc/aio/_call.py", line 356, in _fetch_stream_responses
    await self._raise_for_status()
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/grpc/aio/_call.py", line 263, in _raise_for_status
    raise _create_rpc_error(
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "Internal error from Core"
	debug_error_string = "Failed "execute_batch": (<grpc._cython.cygrpc.SendMessageOperation object at 0x7ff7890dd030>,)"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/chatp-0.0.1-py3.11.egg/chatp/discovery/etcd.py", line 62, in refresh_lease
    reply = await refresh(
            ^^^^^^^^^^^^^^
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/client.py", line 23, in handler
    exceptions._handle_exception(e)
  File "/home/garvenlee/anaconda3/envs/py311/lib/python3.11/site-packages/aetcd/exceptions.py", line 80, in _handle_exception
    raise e(error_details) from error
aetcd.exceptions.InternalError: Internal error from Core
@martyanov
Owner

@garvenlee aetcd is mostly a thin wrapper over the native gRPC transport used by etcd. In this particular case, what refresh_lease does is just iterate over replies (https://github.com/martyanov/aetcd/blob/v1.0.0a4/aetcd/client.py#L1032C6-L1038C25); you can see this in your traceback as well. I have no idea what the reason behind the internal error is, but it came from gRPC.
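
Roughly, it boils down to something like this (a simplified paraphrase using only names visible in your traceback and snippet, not the exact source; see the link above):

# Simplified paraphrase of aetcd's refresh_lease, not the real implementation.
async def refresh_lease(self, lease_id):
    # Open a LeaseKeepAlive stream, send a single keep-alive request
    # for this lease ID and return the first reply.
    async for reply in self.leasestub.LeaseKeepAlive(
        [rpc.LeaseKeepAliveRequest(ID=lease_id)],
        timeout=self.timeout,
    ):
        return reply

So every call opens a fresh stream, sends one request and closes it again; the stream itself is not kept alive between calls.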

@martyanov
Owner

martyanov commented Dec 5, 2023

If this is a transient issue, maybe just emit a warning and continue; otherwise check the server side, and probably the network, for issues.
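
Something along these lines, as a sketch around the loop from your snippet above (InternalError and ConnectionTimeoutError are the exceptions already mentioned in this thread; logger, monotonic and sleep are the names from your snippet, adjust to your actual error handling):

from aetcd import exceptions

async def refresh_lease(self, lease_id: int, *, period: int):
    refresh = self.aetcd_client.refresh_lease
    while True:
        try:
            reply = await refresh(lease_id)
        except exceptions.InternalError as exc:
            # Possibly transient (as in the traceback above): warn and retry on the next tick.
            logger.warning(f"[EtcdLease] refresh failed, will retry: {exc}")
        except exceptions.ConnectionTimeoutError:
            # etcd considers the service down: the supplier must register itself again.
            raise
        else:
            self.last_refresh_time = monotonic()
        await sleep(period)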

@martyanov
Owner

Also, the thread #31 may be helpful.

@garvenlee
Author

garvenlee commented Dec 5, 2023

@martyanov

aetcd is mostly a thin wrapper over native gRPC transport used by etcd

Yeah, but I suspect that frequently creating stream-stream requests puts more stress on the etcd server. So can you evaluate this:

from asyncio import get_running_loop, wait_for

async def refresh_lease(self, id, timeout=None):
    # Hand the lease ID to the long-lived keep-alive task and wait for its reply.
    await self.id_queue.put(id)
    waiter = get_running_loop().create_future()
    self.waiter = waiter
    return await wait_for(waiter, timeout)

async def grpc_lease_keepalive(self, id_queue):
    # One long-lived bidirectional stream; every queued ID becomes a keep-alive request.
    async def request_iterator():
        while True:
            id = await id_queue.get()
            yield LeaseKeepAliveRequest(ID=id)

    async for reply in stub.LeaseKeepAlive(request_iterator()):
        if not self.waiter.done():
            self.waiter.set_result(reply)

@martyanov
Owner

@garvenlee I think you don't need to perform this operation continuously. Usually there is a sane TTL, for example 10 seconds, and then you don't need to refresh more often than every TTL / 2 to maintain the lease. It highly depends on your particular case, but even a 1 second interval seems unreasonable.

All the operations share one single channel and connection, so there should not be much overhead except for extreme cases.
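
To put the TTL / 2 point in code, a minimal sketch (client.lease(ttl) returning an object with an .id attribute is an assumption about the client API here; refresh_lease(lease_id) is the call discussed in this thread):

import asyncio

TTL = 10  # seconds the lease survives without a keep-alive

async def maintain_lease(client):
    lease = await client.lease(TTL)  # assumed helper, see note above
    while True:
        await asyncio.sleep(TTL / 2)  # refreshing twice per TTL is plenty
        await client.refresh_lease(lease.id)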

@martyanov
Owner

The lease machinery by itself solves the original KV TTL problem: when you have many entities and want to maintain a TTL for all of them, with a lease it is as easy as attaching the same lease ID to all of them. I don't argue that there are no projects that will be limited by the one-stream-stream-call-per-refresh approach, but it's not an ordinary situation. When you have to maintain a large number of keys with different TTLs, leases are not the biggest concern.
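
For illustration, a sketch of attaching one lease to many keys (the lease(ttl) helper and the lease= keyword on put() are assumptions about the client API; check the actual signatures):

async def register_endpoints(client, endpoints: dict[bytes, bytes], ttl: int):
    # One lease, many keys: when the lease expires or is revoked,
    # every key attached to it disappears together.
    lease = await client.lease(ttl)  # assumed helper
    for key, value in endpoints.items():
        await client.put(key, value, lease=lease.id)  # assumed keyword
    return lease.id  # keep this single lease refreshed; all keys share its TTL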

@garvenlee
Author

All the operations share one single channel and connection

Indeed, all requests share a channel, but since etcd designed LeaseKeepAlive as a bidirectional streaming method, I think reusing the same stream can avoid the cost of creating new streams to a certain extent.
