Operator: Interval #55

Closed
benjchristensen opened this issue Jan 18, 2013 · 14 comments

Comments

@benjchristensen
Member

http://msdn.microsoft.com/en-us/library/hh229027(v=vs.103).aspx
http://msdn.microsoft.com/en-us/library/hh228911(v=vs.103).aspx

jmhofer added a commit to jmhofer/RxJava that referenced this issue Apr 8, 2013
@jmhofer jmhofer mentioned this issue Apr 8, 2013
@mairbek
Contributor

mairbek commented Apr 14, 2013

It seems like Interval shares logic with the Timer from issue #92.

Interval could be implemented as a Timer call with dueTime == 0.

@jmhofer
Contributor

jmhofer commented Apr 15, 2013

Sure, as soon as Timer is implemented or schedulers can do periodic scheduling, the implementation of Interval will get a lot simpler.
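
To make the "periodic scheduling" idea concrete, here is a minimal sketch of an interval built directly on `java.util.concurrent.ScheduledExecutorService`. It is an illustration under stated assumptions, not RxJava's implementation: the class `PeriodicIntervalSketch` and its methods `interval` and `collect` are hypothetical names. The sketch waits one full period before the first value, which is what the Rx.NET timings reported later in this thread show; passing `0` as the initial delay would give the dueTime == 0 variant instead.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical sketch, not RxJava code: an interval on top of a periodic scheduler.
final class PeriodicIntervalSketch {

    /**
     * Calls onNext with 0, 1, 2, ... once per period, `count` times in total.
     * The returned latch is released after the last value has been delivered.
     */
    static CountDownLatch interval(long period, TimeUnit unit, int count, LongConsumer onNext) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        long[] next = {0}; // per-call state, only touched from the scheduler thread
        scheduler.scheduleAtFixedRate(() -> {
            if (next[0] < count) {
                onNext.accept(next[0]++);
            }
            if (next[0] >= count) {
                done.countDown();
                scheduler.shutdown(); // also cancels the remaining periodic runs
            }
        }, period, period, unit); // initial delay == period (assumption, see lead-in)
        return done;
    }

    /** Blocks until `count` values have arrived and returns them in order. */
    static List<Long> collect(int count) {
        List<Long> seen = new CopyOnWriteArrayList<>();
        try {
            interval(10, TimeUnit.MILLISECONDS, count, seen::add).await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ticks", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because the counter is created inside `interval`, every call gets its own sequence starting at 0.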

@samuelgruetter
Contributor

Is the interval operator intended to work if there are several subscribers? It looks as if this was simply forgotten...

@jmhofer
Contributor

jmhofer commented Sep 12, 2013

Actually, I'm not sure whether interval itself should or shouldn't support this. However, you should always be able to use publish/connect. If that doesn't work, then there's definitely something wrong. I can go check that later. I probably didn't think of that when implementing it...

@samuelgruetter
Contributor

I just tried it out in C#. This code:

static void Main() {
    var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
    oneNumberPerSecond.Subscribe(
        x => Console.WriteLine("subscriber 1 got " + x)
    );
    oneNumberPerSecond.Subscribe(
        x => Console.WriteLine("subscriber 2 got " + x)
    );
    Console.ReadLine();
}

produces this output:

subscriber 2 got 0
subscriber 1 got 0
subscriber 2 got 1
subscriber 1 got 1
subscriber 2 got 2
subscriber 1 got 2
subscriber 2 got 3
subscriber 1 got 3
subscriber 2 got 4
subscriber 1 got 4

So I think we should update the RxJava version to support multiple subscribers.

@samuelgruetter
Contributor

Compare to Java. Code:

public static void main(String[] args) {
    Observable<Long> oneNumberPerSecond = Observable.interval(1, TimeUnit.SECONDS).take(5);
    oneNumberPerSecond.subscribe(new Action1<Long>() {
        public void call(Long x) {
            System.out.println("subscriber 1 got " + x);
        }
    });
    oneNumberPerSecond.subscribe(new Action1<Long>() {
        public void call(Long x) {
            System.out.println("subscriber 2 got " + x);
        }
    });
}

Output:

subscriber 1 got 0
subscriber 2 got 1
subscriber 1 got 2
subscriber 2 got 3
subscriber 1 got 4
subscriber 2 got 5
subscriber 1 got 6
subscriber 2 got 7
subscriber 1 got 8
subscriber 2 got 9

I think this will be used a lot for small examples and so it should be fixed soon. @benjchristensen could you please reopen this issue?
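
One way to read the two outputs: the Java run looks like a single counter shared by both subscriptions, while the C# run looks like one counter per subscription. That is an assumption about the cause, not a reading of the RxJava source. The sketch below models both placements with a manually ticked source so the difference is reproducible without timers; `CounterPlacementSketch` and everything inside it are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical model, not RxJava code: where the counter lives decides what each subscriber sees.
final class CounterPlacementSketch {

    /** One counter on the source itself: every delivery to any subscriber advances it. */
    static final class SharedCounterSource {
        private final List<LongConsumer> subscribers = new ArrayList<>();
        private long counter = 0;

        void subscribe(LongConsumer subscriber) {
            subscribers.add(subscriber);
        }

        void tick() {
            for (LongConsumer s : subscribers) {
                s.accept(counter++);
            }
        }
    }

    /** One counter per subscription: every subscriber counts from 0 on its own. */
    static final class PerSubscriptionSource {
        private final List<LongConsumer> subscribers = new ArrayList<>();
        private final List<long[]> counters = new ArrayList<>();

        void subscribe(LongConsumer subscriber) {
            subscribers.add(subscriber);
            counters.add(new long[] {0});
        }

        void tick() {
            for (int i = 0; i < subscribers.size(); i++) {
                subscribers.get(i).accept(counters.get(i)[0]++);
            }
        }
    }

    static List<String> runShared(int ticks) {
        List<String> log = new ArrayList<>();
        SharedCounterSource source = new SharedCounterSource();
        source.subscribe(x -> log.add("subscriber 1 got " + x));
        source.subscribe(x -> log.add("subscriber 2 got " + x));
        for (int i = 0; i < ticks; i++) {
            source.tick();
        }
        return log;
    }

    static List<String> runPerSubscription(int ticks) {
        List<String> log = new ArrayList<>();
        PerSubscriptionSource source = new PerSubscriptionSource();
        source.subscribe(x -> log.add("subscriber 1 got " + x));
        source.subscribe(x -> log.add("subscriber 2 got " + x));
        for (int i = 0; i < ticks; i++) {
            source.tick();
        }
        return log;
    }

    public static void main(String[] args) {
        runShared(2).forEach(System.out::println);          // 0, 1, 2, 3 alternating between subscribers
        runPerSubscription(2).forEach(System.out::println); // 0, 0, 1, 1
    }
}
```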

@jmhofer
Contributor

jmhofer commented Sep 13, 2013

What does Rx.NET do when you wait a bit before subscribing the second time?

Will it do this?

subscriber 1 got 0
subscriber 1 got 1
subscriber 1 got 2
subscriber 2 got 2
subscriber 1 got 3
subscriber 2 got 3
subscriber 1 got 4
subscriber 2 got 4

Or will it still start at 0 with subscriber 2?

@jmhofer
Contributor

jmhofer commented Sep 13, 2013

I added a first test for that use case here: jmhofer@2fe6da7

It fails, as expected from your comment above.

@jmhofer
Copy link
Contributor

jmhofer commented Sep 13, 2013

The Rx Design Guidelines (5.10) say:

As many observable sequences are cold (see cold vs. hot on Channel 9), each subscription will have a separate set of side-effects. Certain situations require that these side-effects occur only once. The Publish operator provides a mechanism to share subscriptions by broadcasting a single subscription to multiple subscribers.

So I guess it would be okay for the second subscriber to always start at 0 too when subscribing to the same observable later (and not using publish/connect)?

@jmhofer
Copy link
Contributor

jmhofer commented Sep 13, 2013

There are probably other operators that are affected by this, too, because the unit tests currently don't usually cover multiple subscribers...

@samuelgruetter
Contributor

Here's another example from C#:

    static void Main() {
        var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
        var watch = new Stopwatch();
        watch.Start();

        Thread.Sleep(2200);

        Console.WriteLine("subscriber 1 subscribes at t=" + watch.ElapsedMilliseconds);
        oneNumberPerSecond.Subscribe(
            x => Console.WriteLine("subscriber 1 got " + x + " at t=" + watch.ElapsedMilliseconds)
        );

        Thread.Sleep(1300);

        Console.WriteLine("subscriber 2 subscribes at t=" + watch.ElapsedMilliseconds);
        oneNumberPerSecond.Subscribe(
            x => Console.WriteLine("subscriber 2 got " + x + " at t=" + watch.ElapsedMilliseconds)
        );

        Console.ReadLine();
    }

outputs:

subscriber 1 subscribes at t=2200
subscriber 1 got 0 at t=3322
subscriber 2 subscribes at t=3615
subscriber 1 got 1 at t=4319
subscriber 2 got 0 at t=4642
subscriber 1 got 2 at t=5329
subscriber 2 got 1 at t=5643
subscriber 1 got 3 at t=6331
subscriber 2 got 2 at t=6643
subscriber 1 got 4 at t=7344
subscriber 2 got 3 at t=7655
subscriber 2 got 4 at t=8656

So each subscriber starts at 0.
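
The same behavior can be modelled in Java without real sleeps by using a virtual clock, which keeps the timings exact and repeatable. This is only a sketch of the semantics observed above (each subscription measures its periods from its own subscribe time and counts from 0). `VirtualTimeIntervalSketch` is a hypothetical class, not part of RxJava or Rx.NET, and the idealized timestamps it produces differ from the measured ones by the scheduling jitter visible in the C# run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical virtual-time model of a cold interval(period).take(n); not library code.
final class VirtualTimeIntervalSketch {

    /** Per-subscription state: when it subscribed and which number comes next. */
    private static final class Subscription {
        final String name;
        final long subscribedAt;
        long next = 0;

        Subscription(String name, long subscribedAt) {
            this.name = name;
            this.subscribedAt = subscribedAt;
        }
    }

    private final long periodMillis;
    private final int take;
    private final List<Subscription> subscriptions = new ArrayList<>();
    final List<String> log = new ArrayList<>();
    private long now = 0;

    VirtualTimeIntervalSketch(long periodMillis, int take) {
        this.periodMillis = periodMillis;
        this.take = take;
    }

    void subscribe(String name) {
        log.add(name + " subscribes at t=" + now);
        subscriptions.add(new Subscription(name, now));
    }

    /** Moves the virtual clock forward, delivering every value that falls due, earliest first. */
    void advanceTo(long target) {
        while (true) {
            Subscription due = null;
            long dueAt = Long.MAX_VALUE;
            for (Subscription s : subscriptions) {
                long at = s.subscribedAt + (s.next + 1) * periodMillis;
                if (s.next < take && at <= target && at < dueAt) {
                    due = s;
                    dueAt = at;
                }
            }
            if (due == null) {
                break;
            }
            log.add(due.name + " got " + due.next + " at t=" + dueAt);
            due.next++;
        }
        now = target;
    }

    /** Replays the scenario above: sleep 2200 ms, subscribe, sleep 1300 ms, subscribe again. */
    static List<String> demo() {
        VirtualTimeIntervalSketch clock = new VirtualTimeIntervalSketch(1000, 5);
        clock.advanceTo(2200);
        clock.subscribe("subscriber 1");
        clock.advanceTo(3500);
        clock.subscribe("subscriber 2");
        clock.advanceTo(10000);
        return clock.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```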

@jmhofer
Contributor

jmhofer commented Sep 13, 2013

Great, thanks. This means that my PR above should fix this.

@benjchristensen
Member Author

Yes, every new subscriber should start the Observable from the beginning. I have tried to make sure that's the case everywhere but apparently missed this one.

If an Observable does not want that behavior, that is what the various multicast options such as publish, replay, cache, etc. are for.
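
For contrast, here is a rough sketch of what publish/connect-style sharing means: one upstream subscription whose values are broadcast to whoever is subscribed at that moment, so a late subscriber joins mid-sequence instead of restarting at 0. It is a simplified, manually ticked model with hypothetical names (`PublishSketch`, `ColdCounter`, `Published`), not RxJava's `publish` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical model of publish/connect; not RxJava code.
final class PublishSketch {

    /** A cold source: each subscribe call gets its own counter starting at 0. */
    static final class ColdCounter {
        int subscriptions = 0;

        /** Returns a handle; each run() emits the next value to this subscriber only. */
        Runnable subscribe(LongConsumer subscriber) {
            subscriptions++;
            long[] next = {0};
            return () -> subscriber.accept(next[0]++);
        }
    }

    /** Shares a single upstream subscription among all downstream subscribers. */
    static final class Published {
        private final ColdCounter upstream;
        private final List<LongConsumer> downstream = new ArrayList<>();
        private Runnable upstreamTick; // null until connect() is called

        Published(ColdCounter upstream) {
            this.upstream = upstream;
        }

        void subscribe(LongConsumer subscriber) {
            downstream.add(subscriber);
        }

        /** Subscribes to the upstream exactly once, no matter how often it is called. */
        void connect() {
            if (upstreamTick == null) {
                upstreamTick = upstream.subscribe(x -> {
                    for (LongConsumer s : downstream) {
                        s.accept(x);
                    }
                });
            }
        }

        /** Stands in for one timer tick of the shared upstream. */
        void tick() {
            if (upstreamTick != null) {
                upstreamTick.run();
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ColdCounter source = new ColdCounter();
        Published shared = new Published(source);
        shared.subscribe(x -> log.add("subscriber 1 got " + x));
        shared.connect();
        shared.tick(); // 0 reaches subscriber 1 only
        shared.tick(); // 1 reaches subscriber 1 only
        shared.subscribe(x -> log.add("subscriber 2 got " + x)); // late subscriber
        shared.tick(); // 2 reaches both
        if (source.subscriptions != 1) {
            throw new IllegalStateException("upstream should be subscribed exactly once");
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the late subscriber's first value is 2, which is the shape of output jmhofer asked about earlier in the thread; without the sharing layer, each subscriber would start from 0.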

@benjchristensen
Member Author

Merged in #379 so closing again.

rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014