zipWith requests data too eagerly #19

Closed
nebhale opened this issue Feb 8, 2016 · 1 comment
Labels
type/enhancement A general enhancement

Comments

@nebhale
Member

nebhale commented Feb 8, 2016

Currently, when using zipWith (and perhaps other operators), data is always requested in batches of PlatformDependent.XS_BUFFER_SIZE. In some cases, this is too much data. For example, when implementing an exponential backoff, you might want a Stream.range() concatenated with a Mono.error in order to flag when too many retries have happened. If the number of elements in the range is smaller than XS_BUFFER_SIZE, the whole range is requested up front and the error is generated immediately, not when the end of the range is reached. An example of this behavior would look like:

import reactor.core.publisher.Mono;
import reactor.rx.Stream;

import java.util.concurrent.CountDownLatch;

import static java.util.concurrent.TimeUnit.SECONDS;

public final class Test {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Stream
            .interval(0, 1, SECONDS)
            .zipWith(Stream
                .range(0, 5)
                .concatWith(Mono.error(new IllegalStateException("A request too far"))))
            .consume(
                System.out::println,

                (throwable) -> {
                    throwable.printStackTrace();
                    latch.countDown();
                },

                latch::countDown);

        latch.await(); // Should print 0,0, 1,1, 2,2, and 3,3 before printing an IllegalStateException
    }

}

There should be a way to ensure that a subscriber can request as much data as it wants, while the data is only ever requested in batches of a certain size, with producers able to specify that size.
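To make the request concrete, here is a rough, dependency-free model of how a bounded batch size would change the outcome. It is only a sketch and not Reactor's implementation: PrefetchSketch, RangeThenError, ZipSide and run(prefetch, rangeCount, ticks) are hypothetical names, the value 8 merely stands in for a buffer size larger than the range, and the model assumes that an error on one side of the zip wins over values that are already buffered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public final class PrefetchSketch {

    /** Hypothetical source: emits 0..count-1 on demand, then fails once the range is exhausted. */
    static final class RangeThenError {
        private final int count;
        private int next;
        private boolean failed;

        RangeThenError(int count) {
            this.count = count;
        }

        /** Delivers up to n values; the error follows the last value without waiting for demand. */
        void request(int n, ZipSide sink) {
            for (int i = 0; i < n && next < count; i++) {
                sink.onNext(next++);
            }
            if (next == count && !failed) {
                failed = true;
                sink.onError(new IllegalStateException("A request too far"));
            }
        }
    }

    /** Hypothetical inner side of the zip: buffers values and remembers an error. */
    static final class ZipSide {
        final Queue<Integer> buffer = new ArrayDeque<>();
        Throwable error;

        void onNext(int value) {
            buffer.add(value);
        }

        void onError(Throwable t) {
            error = t;
        }
    }

    /** Pairs each tick with one buffered value, requesting from the source in bounded steps. */
    public static List<String> run(int prefetch, int rangeCount, int ticks) {
        List<String> out = new ArrayList<>();
        RangeThenError source = new RangeThenError(rangeCount);
        ZipSide side = new ZipSide();

        source.request(prefetch, side); // initial request made at subscription time

        for (int tick = 0; tick < ticks; tick++) {
            if (side.error != null) { // assumption: the error wins over buffered values
                out.add("error: " + side.error.getMessage());
                return out;
            }
            Integer value = side.buffer.poll();
            if (value == null) {
                break; // nothing buffered and no error: nothing more to pair
            }
            out.add(tick + "," + value);
            source.request(1, side); // replenish the slot that was just consumed
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("prefetch 8: " + run(8, 5, 10));
        System.out.println("prefetch 1: " + run(1, 5, 10));
    }
}
```

In this model, a prefetch larger than the range drains the whole range at subscription time, so the error is the first thing the zip sees. With a prefetch of 1, four pairs are emitted before the error surfaces, which is the behavior the example above is hoping for.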

@smaldini smaldini added the type/enhancement A general enhancement label Feb 9, 2016
@smaldini smaldini modified the milestone: 2.5.0.RC1 Feb 18, 2016
@smaldini smaldini modified the milestones: 2.5.0.RC1, 2.5.0.M2 Feb 28, 2016
@smaldini
Contributor

Fixed with new zipWith / zip signatures
