
Aggregation of events from same transaction. #32

Open
cend-gh opened this issue Sep 29, 2023 · 9 comments

Comments

@cend-gh

cend-gh commented Sep 29, 2023

If a transaction is made, e.g. putting 3 key changes at once, it is not possible via a watcher to receive these changes as one unit. Therefore it is not possible to determine when related changes have been fully received.

Each event of the same response is processed on its own, so a watcher cannot detect the end of a transaction this way.
watcher.py:

for event in response.events:
    await self._process_callback(
        watcher_callback,
        rtypes.Event(
            event.type,
            event.kv,
            event.prev_kv if watcher_callback.prev_kv else None,
        ),
    )
@martyanov
Owner

@cend-gh thanks for the report! What do you suggest?

@martyanov
Owner

martyanov commented Oct 3, 2023

Could you please describe what you are trying to achieve from a client perspective?

@martyanov
Owner

As far as I know there is no indication in etcd RPC responses that changes somehow belong to the specific transaction, but you always can track revision changes.

@cend-gh
Author

cend-gh commented Oct 4, 2023

I have to generate config files for a downstream program. The "sender" puts data into etcd, and the "receiver" gets signalled about these changes via watches. For some KVs it is possible to generate the config without dependencies; other KVs must be processed as a set, otherwise the config is incomplete.
If, e.g., someone puts 3 KVs that belong together and must be processed at once into the same transaction, I cannot detect as "receiver" when they have been received completely.
So, when I do this:

watch = await client.watch_prefix(b'input_config/')

async for event in watch:
    event # process event

After receiving an event, it is not clear whether more events are to be expected. The receiver does not know how many KVs are in a transaction or share the same mod_revision.

Non-acceptable solution: wait some time, and when nothing else happens, or new events arrive with a different mod_revision (last + 1), interpret that as "done" and start generating configs.
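The revision-based workaround just described could be sketched roughly like this (pure Python; `Event` here is a stand-in for aetcd's watch event type, only `mod_revision` matters):

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List


@dataclass
class Event:
    """Stand-in for aetcd's watch event; only mod_revision matters here."""
    key: bytes
    mod_revision: int


def group_by_revision(events: Iterable[Event]) -> Iterator[List[Event]]:
    """Yield batches of consecutive events sharing one mod_revision.

    A batch is only emitted once an event with a *different* revision
    arrives (or the stream ends), which is exactly the weakness noted
    above: the last batch cannot be flushed until something else happens.
    """
    batch: List[Event] = []
    for event in events:
        if batch and event.mod_revision != batch[0].mod_revision:
            yield batch
            batch = []
        batch.append(event)
    if batch:
        yield batch
```

On a live watch stream the final `if batch` flush never fires, since the stream does not end; that is why this remains a non-solution.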

From ETCD docs:
https://etcd.io/docs/v3.4/learning/api/:

Transaction
For modifications to the key-value store, this means the store’s revision is incremented only once for the transaction and all
events generated by the transaction will have the same revision.

Watch streams
Atomic - a list of events is guaranteed to encompass complete revisions; updates in the same revision over multiple keys will not
be split over several lists of events.

From this I assume that KVs from the same transaction are not split over several WatchResponses.

To avoid breaking existing user code, a solution might be:

watch = await client.watch_prefix(b'input_config/', aggregated=True)

async for events in watch:
    # always yields all events from the same response
    for event in events:
        pass
    # signal downstream tooling to start work

Now you get all events from the same transaction (plus possibly more KV changes with a different mod_revision) and can immediately start processing the changes (my use case).
In general, the information about which events were received at once (in the same response) should not be lost by aetcd.

@martyanov
Owner

I think the easiest thing we could do is to annotate distinct KVs from watch responses with a unique auto-incrementing ID to distinguish watch events; this way all events from a single watch response would be annotated with the same ID.

I'm not aware of etcd internals with respect to how it handles transactional events with a huge number of KVs, though.
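The annotation idea could look roughly like this. A sketch only; `annotate_response` and the `batch_id` tag are hypothetical names, not part of aetcd's API:

```python
import itertools

# One shared counter: every watch response gets the next ID, and all
# events from that response carry the same ID.
_batch_ids = itertools.count(1)


def annotate_response(events):
    """Return (batch_id, event) pairs; one ID per watch response."""
    batch_id = next(_batch_ids)
    return [(batch_id, event) for event in events]
```

A consumer would then know a batch is complete as soon as it sees a pair with a different `batch_id`, or the API could expose the grouping directly.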

@cend-gh
Author

cend-gh commented Oct 4, 2023

I'm not sure I understand you. The client code can already tell which events belong together via KeyValue.mod_revision. The only thing currently missing is knowing when all events (of a specific watch) with the same mod_revision have been received.
It is not clear to me what the additional ID is needed for.

I'm not aware of etcd internals in respect to how it handles transactional events with huge number of KVs, though.

It's a TCP stream, so size should not be a problem. The number of events in a response is also not limited.

https://etcd.io/docs/v3.4/learning/api/

message WatchResponse {
  ResponseHeader header = 1;
  int64 watch_id = 2;
  bool created = 3;
  bool canceled = 4;
  int64 compact_revision = 5;

  repeated mvccpb.Event events = 11;
}

I patched it locally in a simple way (but this breaks client code: 'async for event in watch' vs. 'async for events in watch'):

--- aetcd/watcher.py	2023-09-29 10:39:50.805351520 +0200
+++ patches/watcher.py	2023-09-29 10:38:45.190774406 +0200
@@ -148,15 +148,15 @@
             await self.cancel(response.watch_id)
             return
 
-        for event in response.events:
-            await self._process_callback(
-                watcher_callback,
-                rtypes.Event(
-                    event.type,
-                    event.kv,
-                    event.prev_kv if watcher_callback.prev_kv else None,
-                ),
-            )
+        # aggregate all events from same response into one callback call
+        await self._process_callback(
+            watcher_callback,
+            [rtypes.Event(
+                event.type,
+                event.kv,
+                event.prev_kv if watcher_callback.prev_kv else None,
+            ) for event in response.events],
+        )

@martyanov
Owner

I've already mentioned revision in one of my previous replies, and you're right: the revision itself is not enough to identify the last event of a watch response. That is why we could annotate it somehow, to avoid a wait and a sort of manual tracking of whether there are more subsequent watch responses. How to implement the API is an open question, and PRs are welcome.

repeated is not a stream; it is limited by the gRPC message length and available resources. Please see this PR for example: https://github.com/etcd-io/etcd/pull/4556/files. I'm not sure how it is handled nowadays.

@cend-gh
Author

cend-gh commented Oct 4, 2023

I did a test and packed a large number of KV changes into one transaction; then I got:
aetcd.exceptions.ClientError: grpc: received message larger than max (6266019 vs. 2097152)

So it seems it's not handled at all (or it's wrong usage); in other words, the normal case is that all KV changes must fit into a single gRPC response.
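If raising the cap is acceptable, the limit is a standard gRPC channel option; whether aetcd forwards channel options to its underlying channel is an assumption here, not a confirmed API:

```python
# The 2097152 in the error above is a 2 MiB gRPC receive limit; plain
# gRPC lets you raise it via channel options.
MAX_MESSAGE_BYTES = 16 * 1024 * 1024  # 16 MiB, chosen arbitrarily

GRPC_OPTIONS = [
    ('grpc.max_receive_message_length', MAX_MESSAGE_BYTES),
    ('grpc.max_send_message_length', MAX_MESSAGE_BYTES),
]
# With plain grpc.aio this would be passed at channel creation, e.g.:
# channel = grpc.aio.insecure_channel('localhost:2379', options=GRPC_OPTIONS)
```

This only moves the ceiling, of course; a sufficiently large transaction would still exceed any fixed limit.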

Regarding your annotation suggestion, a new is_last field or similar could be helpful:

class Event(_Slotted):
    """Represents a watch event."""

    __slots__ = [
        'kind',
        'kv',
        'prev_kv',
        'is_last',
    ]

It's not perfect; is_last is a bit confusing for normal use, because it only has a reasonable meaning in the context of transaction-based changes.

@martyanov
Owner

Agree with you about is_last. The issue needs more thought.
