RxJava's PublishSubject equivalent in reactor-core #79

Closed
codependent opened this issue May 3, 2016 · 17 comments
Labels
for/stackoverflow Questions are best asked on SO or Gitter type/enhancement A general enhancement
Milestone

Comments

@codependent

codependent commented May 3, 2016

As seen in a Stack Overflow question, there is apparently no equivalent Publisher in reactor-core.

So far, I haven't been able to migrate this behaviour from RxJava:

  • PublishSubject that observes a real time sensor (using subscribeOn).
  • Several Subscribers that subscribe to that PublishSubject. Since they get real time data they don't want any prefetched values.

This is one of the ways I tried (among others):

        Flux<Float> randomNumberGenerator = Flux.<Float>yield( consumer -> {
            SecureRandom sr = new SecureRandom();
            int i = 1;
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                Emission emission = consumer.emit(sr.nextFloat());
            }
        });
        randomNumberGenerator.log().subscribeOn(Computations.concurrent()).publishOn(Computations.concurrent()).subscribe();    

        Thread.sleep(6000);
        System.out.println("WAKE UP");
        RnApp app = new RnApp("APP");
        RnApp xxx = new RnApp("XXX");
        randomNumberGenerator.subscribe(app);
        randomNumberGenerator.subscribe(xxx);
        Thread.sleep(6000);
        System.out.println("WAKE UP 2"); 
        app.request(5);
        xxx.request(5);

In this case, the subscriber xxx never gets notified.

@smaldini
Contributor

smaldini commented May 3, 2016

What about the bounded equivalent, EmitterProcessor? It is then up to the SignalEmitter to drop or not, as you are doing:

EmitterProcessor<Float> randomNumberGenerator = EmitterProcessor.create();

randomNumberGenerator.log().subscribeOn(Computations.concurrent()).publishOn(Computations.concurrent()).subscribe();    


SignalEmitter<Float> emitter = randomNumberGenerator.connectEmitter();

SecureRandom sr = new SecureRandom();
int i = 1;
while(true){
     try {
             Thread.sleep(1000);
     } catch (Exception e) {
             e.printStackTrace();
     }
     Emission emission = emitter.emit(sr.nextFloat());
}

@smaldini
Contributor

smaldini commented May 3, 2016

Also note that, unlike the Schedulers equivalent in RxJava, we don't have static registries of Schedulers: Computations only creates a specific kind of Scheduler with a new thread pool. You should share a reference to it and let the operators call createWorker when a Subscriber subscribes.

@smaldini smaldini added this to the 2.5.0.M4 milestone May 3, 2016
@smaldini smaldini added type/enhancement A general enhancement for/stackoverflow Questions are best asked on SO or Gitter labels May 3, 2016
@smaldini
Contributor

smaldini commented May 3, 2016

Note that yield is now called create.

@codependent
Author

@smaldini I take note of the correct way of using Computations.

Regarding the use of EmitterProcessor, the sample code doesn't compile: randomNumberGenerator.log()... can never be reached because it is located after a while(true). Shouldn't that loop be placed in a callback somewhere else?

@smaldini
Contributor

smaldini commented May 4, 2016

Edited indeed :

@codependent
Author

codependent commented May 4, 2016

I'm afraid I still don't see the whole picture :-(

What I am missing is how to subscribe some subscribers to the randomNumberGenerator.

On the one hand I have:

    public class Generator{

        private EmitterProcessor<Float> randomNumberGenerator;

        public void start(){
            randomNumberGenerator = EmitterProcessor.create();
            randomNumberGenerator.log().subscribeOn(Computations.concurrent()).publishOn(Computations.concurrent()).subscribe();    

            SignalEmitter<Float> emitter = randomNumberGenerator.connectEmitter();
            SecureRandom sr = new SecureRandom();
            int i = 1;
            while(true){
                 try {
                         Thread.sleep(1000);
                 } catch (Exception e) {
                         e.printStackTrace();
                 }
                 Emission emission = emitter.emit(sr.nextFloat());
            }
        }

        public EmitterProcessor<Float> getRandomNumberGenerator() {
            return randomNumberGenerator;
        }
    }

On the other:

               ...
        Generator generator = new Generator();
        generator.start();
        Thread.sleep(6000);
        System.out.println("WAKE UP");
        RnApp app = new RnApp("APP");
        RnApp xxx = new RnApp("XXX");
        generator.getRandomNumberGenerator().subscribe(app);
        generator.getRandomNumberGenerator().subscribe(xxx);
        Thread.sleep(6000);
        System.out.println("WAKE UP 2"); 
        app.request(5);
        xxx.request(5);

        Thread.sleep(30000);
        System.out.println("WAKE UP 3");
        app.request(5);
        xxx.request(5);

        latch.await();
    }

The problem here is that generator.start() blocks the thread, so the RnApp instances never subscribe.

This is how I did something similar in RxJava:

    public void testRealTimeSubject() throws InterruptedException{
        CountDownLatch latch = new CountDownLatch(1);

        ConnectableObservable<Integer> observable = Observable.range(1, 1000)
        .doOnCompleted(() -> {
            latch.countDown();
        })
        .subscribeOn(Schedulers.io())
        .publish();

        PublishSubject<Integer> realTimeSubject = PublishSubject.<Integer>create();

        SensorReaderObserver sensor1 = new SensorReaderObserver();
        SensorReaderObserver sensor2 = new SensorReaderObserver();

        observable.subscribe(sensor1);
        observable.subscribe(realTimeSubject);
        observable.connect();
        Thread.sleep(40);
        realTimeSubject.subscribe(sensor2);

        latch.await();

        Assert.assertEquals(sensor1.getValues().size(), 1000);
        Assert.assertNotEquals(sensor2.getValues().size(), 1000);
    }

@artembilan

artembilan commented May 4, 2016

The problem here is that generator.start() blocks the thread, so the RnApp instances never subscribe.

Your code in the start() method:

 while(true){
...
}

Indeed, it will never exit!
Run that infinite loop on a different thread:

Computations.single().schedule(() -> {
     while(true){
      ...
     }    
});
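
To make the idea concrete without depending on the reactor-core milestone API used in this thread, here is a minimal sketch built only on JDK executors. The class name BackgroundLoop and the method startAndCollect are hypothetical, and the loop is bounded (count iterations) instead of while(true) so that the example terminates. It shows that handing the loop to a single-threaded executor returns control to the caller immediately.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BackgroundLoop {

    // Hypothetical helper: runs an emitting loop on a dedicated thread,
    // then waits for it to finish and returns what it produced.
    static List<Integer> startAndCollect(int count) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService single = Executors.newSingleThreadExecutor();

        // execute() only hands the task over; the loop runs on the executor's thread.
        single.execute(() -> {
            for (int i = 1; i <= count; i++) {
                received.add(i);
            }
            done.countDown();
        });

        // The calling thread is free here; this is where subscribers could be attached.
        single.shutdown(); // already submitted work still runs to completion

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("background loop did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(startAndCollect(3)); // prints [1, 2, 3]
    }
}
```

In the thread's scenario the loop never ends, so the latch would be replaced by whatever shutdown signal the application uses.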

@codependent
Author

codependent commented May 5, 2016

@artembilan thanks, I just didn't know where to put that loop so that it didn't block the main thread. With the following modifications both RnApp instances receive the messages, but they keep receiving prefetched values:

Generator class:

public class Generator{

    private EmitterProcessor<Float> randomNumberGenerator;
    private Scheduler concurrent = Computations.concurrent();

    public void start(){
        randomNumberGenerator = EmitterProcessor.create();
        randomNumberGenerator.log().subscribeOn(concurrent).publishOn(concurrent).subscribe();    

        Computations.single().schedule( () -> {
            SignalEmitter<Float> emitter = randomNumberGenerator.connectEmitter();
            SecureRandom sr = new SecureRandom();
            int i = 1;
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                Emission emission = emitter.emit(sr.nextFloat());
            }
        });
    }

    public EmitterProcessor<Float> getRandomNumberGenerator() {
        return randomNumberGenerator;
    }
}

Test:

Generator generator = new Generator();
generator.start();
Thread.sleep(6000);
System.out.println("WAKE UP");
RnApp app = new RnApp("APP");
RnApp xxx = new RnApp("XXX");
generator.getRandomNumberGenerator().subscribe(app);
generator.getRandomNumberGenerator().subscribe(xxx);
Thread.sleep(6000);
System.out.println("WAKE UP 2"); 
app.request(5);
xxx.request(5);

Thread.sleep(30000);
System.out.println("WAKE UP 3");
app.request(5);
xxx.request(5);

latch.await();

As you can see in the following logs, everything that is emitted between the subscribe() call and the request() call is cached and delivered later, so it still doesn't behave like a PublishSubject:

...
08:58:09.790 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.3271616)
WAKE UP
08:58:10.790 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.24437588)
08:58:11.791 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.10515916)
08:58:12.791 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.9924408)
08:58:13.792 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.63165975)
08:58:14.793 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.6097309)
08:58:15.793 [single-1] INFO  reactor.core.publisher.FluxLog -  onNext(0.46791637)
WAKE UP 2
main-APP got ------> 0.24437588
main-APP got ------> 0.10515916
main-APP got ------> 0.9924408
...
main-XXX got ------> 0.10515916
main-XXX got ------> 0.9924408

@artembilan

Not sure where the problem is in your code, but for me it works as expected:

    @Test
    public void testPublishSubscribe() throws InterruptedException {
        Scheduler concurrent = Computations.concurrent();

        EmitterProcessor<Long> timeGenerator = EmitterProcessor.create();

        timeGenerator
                .subscribeOn(concurrent)
                .publishOn(concurrent)
                .subscribe(v -> System.out.println("0: " + v));

        SignalEmitter<Long> emitter = timeGenerator.connectEmitter();

        Computations.single().schedule(() -> {

            while (true) {
                try {
                    Thread.sleep(100);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("NEW VALUE");
                emitter.emit(System.currentTimeMillis());
            }

        });

        Thread.sleep(100);

        System.out.println("WAKE UP");

        timeGenerator.subscribe(v -> System.out.println("1: " + v));
        timeGenerator.subscribe(v -> System.out.println("2: " + v));

        Thread.sleep(100);

        System.out.println("ONE MORE SUBSCRIBER");

        timeGenerator.subscribe(v -> System.out.println("3: " + v));

        Thread.sleep(2000);
    }

The logs look like:

NEW VALUE
0: 1462472571887
NEW VALUE
0: 1462472571988
NEW VALUE
0: 1462472572088
NEW VALUE
0: 1462472572217
WAKE UP
NEW VALUE
1: 1462472572317
2: 1462472572317
0: 1462472572317
NEW VALUE
1: 1462472572417
0: 1462472572417
2: 1462472572417
NEW VALUE
0: 1462472572523
1: 1462472572523
2: 1462472572523
NEW VALUE
1: 1462472572625
2: 1462472572625
0: 1462472572625
NEW VALUE
1: 1462472572725
0: 1462472572725
2: 1462472572725
ONE MORE SUBSCRIBER
NEW VALUE
1: 1462472572825
0: 1462472572825
2: 1462472572825
3: 1462472572825
NEW VALUE
1: 1462472572925
0: 1462472572925
2: 1462472572925
3: 1462472572925

So I really do see a new value on each emission, and new subscribers don't receive any old (cached?) values. Plus, all subscribers received the same value.
IMO the PublishSubject goal is achieved.

@smaldini
Contributor

smaldini commented May 6, 2016

Just added a buffer-less Processor called DirectProcessor, see 9b381c8. @akarnokd can confirm, but I think it's the direct equivalent.
So this works with unbounded demand, and what you are seeing is correct for the RS (Reactive Streams) semantics, as subscribers will see all events from the time they subscribed. It will not drop automatically when demand is insufficient.
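
The "see all events from the time they subscribed" behaviour can be sketched in a few lines of plain Java. This is not DirectProcessor's implementation; HotSubjectSketch and HotSubject are hypothetical names, and the sketch ignores demand, completion and errors entirely. It only shows that a buffer-less multicaster hands each value to whoever is subscribed at that instant and keeps nothing for latecomers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class HotSubjectSketch {

    // Hypothetical buffer-less multicaster: no value is stored after emission.
    static final class HotSubject<T> {
        private final List<Consumer<? super T>> consumers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<? super T> consumer) {
            consumers.add(consumer);
        }

        void emit(T value) {
            // Only consumers registered right now receive the value.
            for (Consumer<? super T> consumer : consumers) {
                consumer.accept(value);
            }
        }
    }

    // Returns what an early and a late subscriber each observed.
    static List<List<Integer>> demo() {
        HotSubject<Integer> subject = new HotSubject<>();
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();

        subject.subscribe(early::add);
        subject.emit(1);
        subject.emit(2);
        subject.subscribe(late::add); // joins after 1 and 2 were emitted
        subject.emit(3);

        return List.of(early, late);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[1, 2, 3], [3]]
    }
}
```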

@codependent
Author

codependent commented May 6, 2016

@artembilan You subscribe with a consumer that prints every value that is emitted since the moment of the subscription:

timeGenerator.subscribe(v -> System.out.println("1: " + v));
timeGenerator.subscribe(v -> System.out.println("2: " + v));

In my case I subscribe with a Subscriber:

public class RnApp implements Subscriber<Float>{
    ...
    @Override
    public void onNext(Float f) {
        System.out.println(Thread.currentThread().getName()+"-"+name+ " got ------> "+f);
    }
}
...
RnApp app = new RnApp("APP");
RnApp xxx = new RnApp("XXX");
generator.getRandomNumberGenerator().subscribe(app);
generator.getRandomNumberGenerator().subscribe(xxx);

Thread.sleep(6000);

System.out.println("WAKE UP 2"); 
app.request(5);
xxx.request(5);

@smaldini What got me mixed up is that I was expecting the subscriber to get the values from the moment it invoked request(n). However, as you say, every value emitted between the moment of the subscription and the invocation of request(n) is buffered and delivered when the subscriber actually calls request(n). Speaking from complete ignorance, couldn't this be a problem? I mean, somewhere there's a buffer of values that won't be delivered until request(n) is invoked. What if it is never called?

@smaldini
Contributor

smaldini commented May 6, 2016

@codependent I think that's the same problem hit by operators such as cache, and the classic solution is usually to attach a simple onBackpressureDrop() right after. The emitter is bounded, so if you call onNext on it directly, you first need to know what you're doing, as opposed to using connectEmitter() and the emit methods. The onNext will then spin-wait if the buffer is full until room frees up downstream. It's still better than uncontrolled buffer growth.

Now you want to deal with individual "suspect" behavior from a subscriber perspective, not in the processor itself, e.g. timeout(), onBackpressureDrop... You can even wrap the chain as a single Processor if you need it:

FluxProcessor<X, X> droppingProcessor = FluxProcessor.wrap(emitterProcessor, emitterProcessor.onBackpressureDrop());

Or just return generator.onBackpressureDrop() as a consuming-side Flux.
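
The difference between the two behaviours discussed here (values queued until request(n) versus values dropped while there is no demand) can be sketched with plain JDK types. BufferVersusDrop, BufferingGate and DroppingGate are hypothetical names, not reactor-core classes. The sketch is single-threaded, its buffer is unbounded for brevity (the thread describes EmitterProcessor as bounded), and demand overflow is not handled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class BufferVersusDrop {

    // Strategy 1: keep values that arrive without demand and replay them on request.
    static final class BufferingGate<T> {
        private final Queue<T> buffer = new ArrayDeque<>();
        private final Consumer<? super T> downstream;
        private long demand;

        BufferingGate(Consumer<? super T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T value) {
            buffer.add(value);
            drain();
        }

        void request(long n) {
            demand += n;
            drain();
        }

        private void drain() {
            while (demand > 0 && !buffer.isEmpty()) {
                demand--;
                downstream.accept(buffer.poll());
            }
        }
    }

    // Strategy 2: discard values that arrive while demand is zero.
    static final class DroppingGate<T> {
        private final Consumer<? super T> downstream;
        private long demand;

        DroppingGate(Consumer<? super T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T value) {
            if (demand == 0) {
                return; // dropped, never delivered later
            }
            demand--;
            downstream.accept(value);
        }

        void request(long n) {
            demand += n;
        }
    }

    static List<Integer> buffered() {
        List<Integer> got = new ArrayList<>();
        BufferingGate<Integer> gate = new BufferingGate<Integer>(got::add);
        gate.onNext(1);
        gate.onNext(2);
        gate.onNext(3);
        gate.request(2); // old values 1 and 2 are delivered now
        gate.onNext(4);  // queued behind 3, demand is used up
        return got;
    }

    static List<Integer> dropped() {
        List<Integer> got = new ArrayList<>();
        DroppingGate<Integer> gate = new DroppingGate<Integer>(got::add);
        gate.onNext(1);
        gate.onNext(2);
        gate.onNext(3);  // all three dropped: no demand yet
        gate.request(2);
        gate.onNext(4);
        gate.onNext(5);
        gate.onNext(6);  // dropped: demand is used up
        return got;
    }

    public static void main(String[] args) {
        System.out.println(buffered()); // prints [1, 2]
        System.out.println(dropped());  // prints [4, 5]
    }
}
```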

@codependent
Author

@smaldini That's it, with the additional FluxProcessor the subscribers won't get buffered values.

Thank you everyone for looking into this.

Final code:

     @Test
     public void testPublishSubscribe() throws InterruptedException {

        Scheduler concurrent = Computations.concurrent();

        EmitterProcessor<Float> timeGenerator = EmitterProcessor.create();
        timeGenerator
            .subscribeOn(concurrent)
            .publishOn(concurrent);

        SignalEmitter<Float> emitter = timeGenerator.connectEmitter();

        Computations.single().schedule(() -> {
            SecureRandom sr = new SecureRandom();
            while (true) {
                try {
                    Thread.sleep(1000);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                float random = sr.nextFloat();
                System.out.println("NEW VALUE ---------------"+random);
                emitter.emit(random);
            }
        });

        FluxProcessor<Float,Float> droppingProcessor = FluxProcessor.<Float,Float>wrap(timeGenerator, timeGenerator.onBackpressureDrop());

        Thread.sleep(4000);

        System.out.println("WAKE UP");

        RnApp aaa = new RnApp("AAA");
        RnApp zzz = new RnApp("ZZZ");
        droppingProcessor.subscribe(aaa);
        droppingProcessor.subscribe(zzz);

        Thread.sleep(4000);

        System.out.println("REQUESTING 5");
        aaa.request(5);
        zzz.request(5);
     }

And the log sequence:

NEW VALUE ---------------0.7953491
NEW VALUE ---------------0.09977782
NEW VALUE ---------------0.15835232
WAKE UP
NEW VALUE ---------------0.28263623
NEW VALUE ---------------0.513252
NEW VALUE ---------------0.8830035
NEW VALUE ---------------0.31092662
REQUESTING 5
NEW VALUE ---------------0.42508847
single-1-AAA got ------> 0.42508847
single-1-ZZZ got ------> 0.42508847

@smaldini
Contributor

smaldini commented May 6, 2016

@codependent if you intend to drop, I advise the new DirectProcessor, which internally is simpler and buffer-less but fails if downstream demand != Long.MAX_VALUE. Because you use an onBackpressureXxx operator right after it, we know the demand will always be Long.MAX_VALUE.

@codependent
Author

codependent commented May 6, 2016

@smaldini Since the requests won't happen immediately it fails and doesn't recover from it. In the end the subscribers never get anything.

Maybe I'm not using it right:

     @Test
     public void testPublishSubscribe() throws InterruptedException {

        Scheduler concurrent = Computations.concurrent();

        DirectProcessor<Float> timeGenerator = DirectProcessor.create();
        timeGenerator
            .subscribeOn(concurrent)
            .publishOn(concurrent);

        SignalEmitter<Float> emitter = timeGenerator.connectEmitter();

        Computations.single().schedule(() -> {
            SecureRandom sr = new SecureRandom();
            while (true) {
                try {
                    Thread.sleep(1000);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                float random = sr.nextFloat();
                System.out.println("NEW VALUE ---------------"+random);
                emitter.emit(random);
            }
        });

        Thread.sleep(4000);

        System.out.println("WAKE UP");

        RnApp aaa = new RnApp("AAA");
        RnApp zzz = new RnApp("ZZZ");
        timeGenerator.subscribe(aaa);
        timeGenerator.subscribe(zzz);

        Thread.sleep(4000);

        System.out.println("REQUESTING 5");
        aaa.request(5);
        zzz.request(5);

        Thread.sleep(4000);
     }

After the failure the subscribers request five values but don't get anything:

NEW VALUE ---------------0.6337121
NEW VALUE ---------------0.998461
NEW VALUE ---------------0.6341994
WAKE UP
NEW VALUE ---------------0.2999627
java.lang.IllegalStateException: Can't deliver value due to lack of requests
    at reactor.core.publisher.DirectProcessor$DirectProcessorSubscription.onNext(DirectProcessor.java:314)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:100)
    at reactor.core.subscriber.SubmissionEmitter.emit(SubmissionEmitter.java:150)
    at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:258)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:918)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:882)
    at reactor.core.publisher.TopicProcessor$TopicSubscriberLoop.run(TopicProcessor.java:877)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
java.lang.IllegalStateException: Can't deliver value due to lack of requests
    at reactor.core.publisher.DirectProcessor$DirectProcessorSubscription.onNext(DirectProcessor.java:314)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:100)
    at reactor.core.subscriber.SubmissionEmitter.emit(SubmissionEmitter.java:150)
    at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:258)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:918)
    at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:882)
    at reactor.core.publisher.TopicProcessor$TopicSubscriberLoop.run(TopicProcessor.java:877)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
NEW VALUE ---------------0.961773
NEW VALUE ---------------0.08782846
NEW VALUE ---------------0.22994196
REQUESTING 5
NEW VALUE ---------------0.3262939
NEW VALUE ---------------0.14760607
NEW VALUE ---------------0.6409442
NEW VALUE ---------------0.97169876

@smaldini
Contributor

smaldini commented May 6, 2016

You would still need onBackpressureDrop, as with the other Processor; it forces a request of Long.MAX_VALUE on subscribe.
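
Why the dropping operator cures the "lack of requests" failure can be sketched as two chained stages in plain Java. This is an illustration under stated assumptions, not reactor-core code: StrictThenDrop, StrictStage and DroppingStage are hypothetical names, the sketch is single-threaded, and the strict stage throws directly from onNext, which only roughly imitates the exception seen in the log above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StrictThenDrop {

    // Buffer-less stage: a value arriving with zero demand is an error.
    static final class StrictStage<T> {
        private final Consumer<? super T> downstream;
        private long demand;

        StrictStage(Consumer<? super T> downstream) {
            this.downstream = downstream;
        }

        void request(long n) {
            long sum = demand + n;
            demand = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
        }

        void onNext(T value) {
            if (demand == 0) {
                throw new IllegalStateException("no demand for " + value);
            }
            if (demand != Long.MAX_VALUE) {
                demand--; // Long.MAX_VALUE is treated as unbounded
            }
            downstream.accept(value);
        }
    }

    // Dropping stage: silently discards values while its own demand is zero.
    static final class DroppingStage<T> {
        private final Consumer<? super T> downstream;
        private long demand;

        DroppingStage(Consumer<? super T> downstream) {
            this.downstream = downstream;
        }

        void request(long n) {
            demand += n;
        }

        void onNext(T value) {
            if (demand == 0) {
                return;
            }
            demand--;
            downstream.accept(value);
        }
    }

    // The strict stage alone fails as soon as a value arrives before any request.
    static boolean failsWithoutDemand() {
        StrictStage<Integer> strict = new StrictStage<Integer>(v -> { });
        try {
            strict.onNext(1);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // With a dropping stage in between, the strict stage always sees unbounded demand.
    static List<Integer> demo() {
        List<Integer> got = new ArrayList<>();
        DroppingStage<Integer> drop = new DroppingStage<Integer>(got::add);
        StrictStage<Integer> strict = new StrictStage<Integer>(drop::onNext);
        strict.request(Long.MAX_VALUE); // what the dropping stage asks for on subscribe

        strict.onNext(1); // accepted upstream, dropped: subscriber has not requested
        drop.request(1);  // the final subscriber asks for one value
        strict.onNext(2); // delivered
        strict.onNext(3); // dropped again
        return got;
    }

    public static void main(String[] args) {
        System.out.println(failsWithoutDemand()); // prints true
        System.out.println(demo());               // prints [2]
    }
}
```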

@codependent
Author

codependent commented May 6, 2016

Got it, changed from
EmitterProcessor<Float> timeGenerator = EmitterProcessor.create();
to
DirectProcessor<Float> timeGenerator = DirectProcessor.create();

keeping the dropping processor:
FluxProcessor<Float,Float> droppingProcessor = FluxProcessor.<Float,Float>wrap(timeGenerator, timeGenerator.onBackpressureDrop());

Works like a charm :-)
