Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Observable.whileBusyAggregateEvents #1320

Merged
merged 5 commits into from
Nov 28, 2020

Conversation

lukestephenson
Copy link
Contributor

This is useful for when a downstream consumer of events is slower than the upstream consumer
and events can be aggregated.

Note the implementation of the operator is largely based on https://github.com/monix/monix/blob/5c24f19ea805d5223c9e38961703aa955d691fdd/monix-reactive/shared/src/main/scala/monix/reactive/internal/operators/BufferWithSelectorObservable.scala

Related issue: #1319

I'd appreciate any feedback!

@larsrh
Copy link
Contributor

larsrh commented Nov 18, 2020

btw, build fails because of some changes in GitHub Actions. #1321 should fix it.

This is useful for when a downstream consumer of events is slower than the upstream consumer
and events can be aggregated.
@lukestephenson
Copy link
Contributor Author

Thanks @larsrh . Just rebased

Copy link
Collaborator

@Avasil Avasil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, that's a nice operator!

Very well done, scala docs are great!

I've left some general comments.
I'm also wondering if we could relax synchronization a bit but I'll have to look a bit more carefully into it.

if (downstreamIsDone) Stop
else {
if (!pendingAck) {
val downstreamAck = downstream.onNext(seed(elem))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually assume that user-provided methods could throw an exception and we should protect against that. There is an example in StateActionObservable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed and enabled the tests to verify this.

}
else {
pendingAck = true
downstreamAck.map { ack =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use onComplete instead of a map to handle Failure case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched this to use onComplete (as well as the instance in emitAggregated

if (!downstreamIsDone) {
lastAck = downstream.onNext(agg)

lastAck.map { _ =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we ignore Stop. We could also look into using some of Ack extensions, e.g. syncOnContinue to prevent redundant async boundaries

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}

private def emitAggregated(): Unit = {
upstreamSubscriber.synchronized {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some redundancy in synchronized calls to this method - I think we could remove synchronized here and make sure each caller synchronizes which is already done in some places like onComplete and lastAck.map in this method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have removed. It was redundant

def onComplete(): Unit =
// Can't emit the aggregated element immediately as the previous `onNext` may not yet have been acked.
upstreamSubscriber.synchronized {
lastAck.map {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use lastAck.syncOnContinue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seemed like a good suggestion, but when I tried changing to make use of syncOnContinue some of my tests started to break (the last element which should have been emitted here wasn't received).

@lukestephenson
Copy link
Contributor Author

@Avasil Thanks for reviewing. I've addressed the feedback, but note I didn't have success using syncOnContinue

Copy link
Collaborator

@Avasil Avasil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you:)

@Avasil Avasil marked this pull request as ready for review November 28, 2020 19:15
@Avasil Avasil merged commit 03db44d into monix:series/3.x Nov 28, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants