[Python] Fix duplicate event on drain callback #3119
Conversation
Also:
* Exit the python wrapper following a termination signal.
* Await a coroutine if returned by the drain or termination callback (relates to nuclio/nuclio-sdk-py#60).

[NUC-119](https://jira.iguazeng.com/browse/NUC-119)
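The second bullet (awaiting a coroutine returned by a callback) can be sketched roughly as follows. This is a minimal illustration, not the actual wrapper code; `invoke_callback` and the two drain functions are hypothetical names:

```python
import asyncio
import inspect


def invoke_callback(callback):
    # Hypothetical helper: call a user callback and, if it returned a
    # coroutine (i.e. the callback is async), run it to completion so the
    # drain/termination logic actually executes before we proceed.
    result = callback()
    if inspect.isawaitable(result):
        result = asyncio.run(result)
    return result


def sync_drain():
    return "drained (sync)"


async def async_drain():
    return "drained (async)"
```

With this pattern, `invoke_callback(sync_drain)` and `invoke_callback(async_drain)` both return the callback's result, whether the user registered a plain function or a coroutine function.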
Looking really good!
I'm not convinced this will fix all of the event duplication issues (as we discussed, there are other possible race conditions that can cause duplication, e.g. the drain was done but an event is already on its way to the runtime), but those should be handled in a WIP fix by @rokatyy.
Other than that - was this tested somehow?
See our drain/termination tests, and add a testcase with an async termination handler.
I'm pretty sure it does fix the race condition, because the callback is only ever called within the finally clause.
I tested it using the repro function attached to NUC-119.
Edit: actually, I was able to simplify the repro function, because it's no longer necessary to commit the offsets from another thread:
```python
import asyncio

from nuclio_sdk import QualifiedOffset


def init_context(context):
    async def drain_callback():
        context.logger.info("Drain callback called")
        for qualified_offset in context.qualified_offsets.values():
            context.logger.info(
                f"111 Committing offset: worker={context.worker_id}, topic={qualified_offset.topic}, "
                f"partition={qualified_offset.partition}, offset={qualified_offset.offset}"
            )
            await context.platform.explicit_ack(qualified_offset)
        context.qualified_offsets = {}
        context.shard_dict = {}
        context.logger.info("Drain callback done")

    context.qualified_offsets = {}
    context.platform.set_drain_callback(drain_callback)
    context.shard_dict = {}


async def handler(context, event):
    if event.shard_id not in context.shard_dict:
        context.shard_dict[event.shard_id] = True
        context.logger.info(
            f"111 First event: worker={context.worker_id}, topic={event.path}, "
            f"shard={event.shard_id}, offset={event.offset}"
        )
    context.qualified_offsets[(event.path, event.shard_id)] = QualifiedOffset(
        event.path, event.shard_id, event.offset
    )
    await asyncio.sleep(0.01)
```
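To illustrate why invoking the callback only in the `finally` clause avoids the race, here is a minimal self-contained sketch (assumed names, not the actual nuclio wrapper): the drain callback cannot run at an arbitrary `await` point, only after the current event's handler has fully completed.

```python
import asyncio

processed = []
drained = []


async def handle(event):
    await asyncio.sleep(0)  # an await point where a context switch can occur
    processed.append(event)


async def process_until_drain(events, drain_requested, drain_callback):
    # The drain callback runs only in the finally clause, i.e. after the
    # in-flight handler has finished, so draining can never interleave
    # with a partially processed event.
    try:
        for event in events:
            if drain_requested.is_set():
                break
            await handle(event)
    finally:
        await drain_callback()


async def main():
    drain_requested = asyncio.Event()

    async def drain_callback():
        # Record what had been processed at the moment of draining.
        drained.append(list(processed))

    await process_until_drain([1, 2, 3], drain_requested, drain_callback)


asyncio.run(main())
```

Since the callback only fires in `finally`, the snapshot it records always contains fully processed events and nothing half-done.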
This PR indeed fixes the event duplication, because it doesn't allow draining to happen on a context switch. But we definitely still need to cover another issue: processing events after draining has completed.
I'm not sure I understand the point of adding a `return` statement to the handlers. @gtopper could you please clarify the flow where it is useful?
@rokatyy, please see the second bullet in the description regarding your question. It allows the user callback(s) to be coroutines. Without it, the coroutines won't be awaited (in the `finally` block).
last minor comment
Co-authored-by: TomerShor <90552140+TomerShor@users.noreply.github.com>
🚀