Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncEventQueue (AsyncEventBus replacement). #275

Merged
merged 23 commits into from
Jun 15, 2022
Merged

AsyncEventQueue (AsyncEventBus replacement). #275

merged 23 commits into from
Jun 15, 2022

Conversation

cheatfate
Copy link
Collaborator

Because its impossible to achieve goals which was initially set for AsyncEventBus. So this new structure will become as replacement for AsyncEventBus.

Copy link
Member

@arnetheduck arnetheduck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, as I see it, the principal difference between AsyncEventBus and AsyncEventQueue is that the former gets a single piece of data to users whereas the latter gets many.

Both are useful - ie it's often the case that one wants to notify some listeners about a change, but only the latest version of the change is interesting (ie a counter that changed and has to be displayed on a screen - if it changed many times before it's displayed, the latest value should be "read" in the event - is that the current semantic of AsyncEventBus? If yes, I'd think we should keep both primitives rather than this being a replacement.

In terms of this queue, we'll need the option to set bounds on it so that we can deal with slow subscribers (by kicking them out, more or less).

chronos/asyncsync.nim Outdated Show resolved Hide resolved
var retFuture = newFuture[void]("AsyncEventQueue.closeWait()")
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be a cancel?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? to raise CancelledError from closeWait() procedure?
The idea here is to ensure that all event consumers will receive notification about that AsyncEventQueue is get closed, and because all this callbacks get schedule inside ab.close() call, it means that continuation procedure will be scheduled after all the consumer notification callbacks will be already processed.


let length = len(ab.queue) + ab.offset
doAssert(length >= ab.readers[index].offset)
if length == ab.readers[index].offset:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all this offset stuff is to avoid copying the list of readers?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, offset indicates from which point in the queue you should start reading events.

@cheatfate
Copy link
Collaborator Author

AsyncEventBus is not flowed only when producer emits event with emitWait(). But this means that in our use-case processing queue could be blocked until consumer will not receive event with waitEvent().

AsyncEventBus issue happens when producer emits events with emit(). Inside emit() consumers list is notified with first event and this consumers list is got emptied. So until consumer will not be scheduled all calls to emit() just discarding events...

So as AsyncEventBus it does not provide the core functionality, and its why it should be deprecated/removed.

@cheatfate
Copy link
Collaborator Author

Because every consumer has its own offset field, which tracks its position in the shared queue, its unlikely to be happened in such case. But on other side there could be very slow event's emitter.

chronos/asyncsync.nim Outdated Show resolved Hide resolved
@cheatfate cheatfate merged commit f403b06 into master Jun 15, 2022
@cheatfate cheatfate deleted the eventqueue branch June 15, 2022 21:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants