Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throttleLatest to Observable #1396

Merged
merged 8 commits into from
Apr 24, 2021
Merged

Conversation

Wosin
Copy link
Contributor

@Wosin Wosin commented Mar 23, 2021

This closes #1385.

This PR introduces throttleLatest as defined in here. PR introduces some tests to verify the correctness of the behaviour implemented.

Comment on lines 117 to 121
out.onNext(lastEvent).syncTryFlatten.syncOnContinue {
isDone = true
out.onComplete()
task.cancel()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why hasValue and shouldEmitNext are @volatile?

This seems to be the only place where something can be written outside of self.synchronized block (if Ack fails to be synchronous) and it might theoretically be possible that the last event gets emitted twice if that block runs in parallel with the one in def run since the isDone = true here might not be propagated to the thread doing def run. Otherwise, JSR-133 should guarantee memory barriers when entering and exiting synchronized blocks.

The simple fix I can think of is to just do

if (!isDone) {
  isDone = true
  ... rest of the branching code ...
}

to both tell the def run to stop itself and keep the write in synchronized block and not in a Future continuation so everything can see it.

I'm going to tag @alexandru so he can tell me that I'm wrong there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh! I think it doesn't anymore, I've added that when I was thinking about synchronization. But I will verify this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right @oleg-py and I think we could even do:

if (!isDone) { 
  isDone = true 
  task.cancel()
  ... rest of the branching code ...
}

Comment on lines 36 to 39
val o = Observable
.intervalAtFixedRate(2.second)
.take(sourceCount.toLong)
.throttleLatest(1.second, true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the rate of output is at 2 seconds and you're throttling by 1 second after first element (as per RxJava doc you linked), isn't that a noop?

Copy link
Contributor Author

@Wosin Wosin Mar 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, probably true. I will think about something, but as far as I know we can't control the exact elements that are going to be throttled in this test, same as in SampleOnceSuite.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to do something like

Observable(1).delayOnNext(200.millis) ++ Observable(2).delayOnNext(100.millis) ++ ...

to create a predictable chain.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also concerned about Sample two lines below, as that is literally saying it's not dropping anything from the input (2nd parameter of sample is expected output size). The purpose of throttling is to drop some so we should check this if possible. I think ThrottleFirstSuite has an example where it drops half.

Copy link
Contributor Author

@Wosin Wosin Mar 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to do something like

Observable(1).delayOnNext(200.millis) ++ Observable(2).delayOnNext(100.millis) ++ ...

to create a predictable chain.

Not sure what's the difference between that and just doing the intervalAtFixedRate to be honest :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think something like that can be useful if we would like to have a specific test case in a test outside BaseOperatorSuite, e.g. ReactiveX example

Copy link
Collaborator

@oleg-py oleg-py left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are few suspicious spots. I'd also like to see test that shows exactly which elements have been retained when using a producer that's significantly more frequent than the throttle interval.

def createObservable(sourceCount: Int) = Some {
if (sourceCount == 1) {
val o = Observable.now(100L).delayExecution(500.millis).throttleLatest(1.second, true)
Sample(o, 1, 100, 500.millis, 1.second)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please test with something with more than one element in the output :D That might prove that emitLast handling is correct but does very little to actually prove that throttling is working as expected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we are tsting with one element on the output here :) I think createObservable gets called from outside and we pass the sourceCount fromBaseOperatorSuite, so that is randomized.
I've added this sourceCount ==1 check because I've noticed that in several suites, but now that I've started chcecking I think it's not needed :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Disregard that line comment but please adjust your additional tests to have more than 1-2 elements :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, sounds reasonable I will add few more tests.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could still test cases from SampleOnceSuite

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, gonna take a look at them.

Copy link
Collaborator

@Avasil Avasil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for the PR @Wosin and sorry for the delay with review

Comment on lines 117 to 121
out.onNext(lastEvent).syncTryFlatten.syncOnContinue {
isDone = true
out.onComplete()
task.cancel()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right @oleg-py and I think we could even do:

if (!isDone) { 
  isDone = true 
  task.cancel()
  ... rest of the branching code ...
}

private[this] var lastEvent: A = _
@volatile private[this] var hasValue = false
@volatile private[this] var shouldEmitNext = true
private[this] var lastAck: Future[Ack] = _
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems to be unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, will remove that.

override def onComplete(): Unit = self.synchronized {
if (!isDone) {
if (emitLast && hasValue) {
out.onNext(lastEvent).syncTryFlatten.syncOnContinue {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we guaranteed that the last onNext has returned?

It seems like shouldEmitNext handles back-pressure for onNext but I'm not so sure for onComplete

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixd :)

def createObservable(sourceCount: Int) = Some {
if (sourceCount == 1) {
val o = Observable.now(100L).delayExecution(500.millis).throttleLatest(1.second, true)
Sample(o, 1, 100, 500.millis, 1.second)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could still test cases from SampleOnceSuite

Comment on lines 36 to 39
val o = Observable
.intervalAtFixedRate(2.second)
.take(sourceCount.toLong)
.throttleLatest(1.second, true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think something like that can be useful if we would like to have a specific test case in a test outside BaseOperatorSuite, e.g. ReactiveX example

@Wosin Wosin requested review from oleg-py and Avasil April 15, 2021 20:59
* no matter if interval has passed or not
*/
final def throttleLatest(period: FiniteDuration, emitLast: Boolean): Observable[A] =
new ThrottleLatestObservable[A](self, period, emitLast)
/** Emit the most recent items emitted by the source within
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Wosin shouldn't be a line break between the two methods? :P

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which methods do You mean ?:P

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean line 3375 and 3376, the end of throttleLatest and the scaladoc for sample.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah You are right ofc. I thought scalafmt should get that.

Copy link
Collaborator

@Avasil Avasil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM 👍

Copy link
Collaborator

@oleg-py oleg-py left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, looks good 👍

@Avasil Avasil merged commit 02ee9b9 into monix:series/3.x Apr 24, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Observable.throttleLatest support
5 participants