Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule the first element in intervals asynchronously #1186

Merged
merged 6 commits into from
May 31, 2020

Conversation

ghostbuster91
Copy link
Contributor

This fixes #1185

@ghostbuster91
Copy link
Contributor Author

There is a problem with one test - monix.reactive.internal.operators.SwitchMapSuite it passes on JVM but fails on JS and I have no idea why.

@Avasil
Copy link
Collaborator

Avasil commented May 19, 2020

Thanks for PR, I will investigate SwitchMapSuite issue

@Avasil
Copy link
Collaborator

Avasil commented May 19, 2020

The test runs

Observable.interval(mainPeriod).take(sourceCount).endWithError(ex)
    .switchMap(i => Observable.interval(1.second))
    .bufferTimed(mainPeriod)
    .map(_.sum)

and it seems like there is a race condition - onError is called on switchMap before the onNext on the last child in switchMap due to extra async boundary. I think it also happens on JVM, it's just more likely on JS.

Anyway, I wouldn't consider it a bug.

I'm not sure what's the best way to fix the test, maybe change

createObservableEndingInError(Observable.interval(mainPeriod).take(sourceCount)), ex)

to

createObservableEndingInError(Observable.interval(mainPeriod).take(sourceCount).doOnComplete(Task.shift), ex)

Hopefully it will negate the change and won't be flaky :D

else
task := s.scheduleOnce(initialDelay.length, initialDelay.unit, runnable)

task := s.scheduleOnce(initialDelay.length, initialDelay.unit, runnable)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we could do a small optimization and call

s.execute(runnable) in case of initialDelay.length <= 0.

It makes the following test fail:

  test("issue #468 - concat is cancellable") { implicit sc =>
    var items = 0

    val a = Observable.now(1L)
    val b = Observable.interval(1.second)
    val c = (a ++ b).doOnNextF(_ => Task { items += 1 })
    val d = c.take(10).subscribe()

    assert(items > 0, "items > 0")
    assert(sc.state.tasks.nonEmpty, "tasks.nonEmpty")

    d.cancel()
    assert(sc.state.tasks.isEmpty, "tasks.isEmpty")
  }

But I believe it is because the test needs to be adjusted.

Before the change it processes a then immediately the first element from b
Now it processes a then schedules b which is waiting to be executed so it's stuck in tasks queue.
In both cases the b is cancelled.

I think in this case it would be OK to do s.tick(); d.cancel() in the test

WDYT? Any thoughts @alexandru ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s.execute(runnable) in case of initialDelay.length <= 0.

I think that optimization should be done in the Scheduler implementations, not here. We could double check if it is, but I remember the optimization is being applied.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, doing the optimization here as well is fine too, I personally don't mind it.

@alexandru
Copy link
Member

alexandru commented May 26, 2020

That switchMap test is so annoying. It's probably not your fault. Or the switchMap's implementation.

@Avasil
Copy link
Collaborator

Avasil commented May 26, 2020

That switchMap test is so annoying. It's probably not your fault. Or the switchMap's implementation.

Another idea to fix it: Observable.eval(firstElem) ++ interval with initialDelay

@ghostbuster91
Copy link
Contributor Author

ghostbuster91 commented May 28, 2020

That switchMap test is so annoying. It's probably not your fault. Or the switchMap's implementation.

Another idea to fix it: Observable.eval(firstElem) ++ interval with initialDelay

I tried with

Observable.eval(1L) ++ 
  Observable.intervalWithFixedDelay(mainPeriod, mainPeriod).take(sourceCount - 1), ex)

and run it couple of times locally, unfortunately it still fails sometimes.

@Avasil
Copy link
Collaborator

Avasil commented May 30, 2020

I have forgotten the problem was with the interval inside switchMap - I've pushed a fix to your branch - hopefully it will pass consistently. 😅 I've ran it 10/10 times locally so far

@Avasil Avasil merged commit 46059d1 into monix:master May 31, 2020
@Avasil
Copy link
Collaborator

Avasil commented May 31, 2020

It seems to fix the test, thank you for the contribution!

@ghostbuster91
Copy link
Contributor Author

Awesome, thanks for your invaluable help! :)

@ghostbuster91 ghostbuster91 deleted the schedule-intervaly-async branch June 1, 2020 08:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Schedule first tick in Observable.intervalAtFixed rate asynchronously
3 participants