Support for bufferUntil (or windowUntil) with a predicate and a timeout #821

Closed
youngm opened this issue Aug 28, 2017 · 8 comments
Labels
status/declined: We feel we shouldn't currently apply this change/suggestion
status/has-workaround: This has a known workaround described
type/enhancement: A general enhancement

Comments

@youngm

youngm commented Aug 28, 2017

Desired behavior

I have a use case where I'd like to break a Flux up into chunks using a Predicate like bufferUntil. But, if a new buffer is started but not terminated before a given timeout then go ahead and cut the buffer where it is at and continue processing.
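To pin down the requested semantics, here is a minimal plain-Java model (not Reactor code). It is only a sketch under assumptions: events carry an arrival timestamp, a matching element closes the buffer it belongs to (as bufferUntil does by default), and the timeout is measured from the first element of the open buffer. `BufferModel`, `Event` and `split` are hypothetical names used purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class BufferModel {

    /** Hypothetical event type: a value plus its arrival time in milliseconds. */
    static final class Event {
        final long atMillis;
        final int value;

        Event(long atMillis, int value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    /** Cuts a buffer when the predicate matches, or when the open buffer is older than timeoutMillis. */
    static List<List<Integer>> split(List<Event> events, Predicate<Integer> cut, long timeoutMillis) {
        List<List<Integer>> out = new ArrayList<>();
        List<Integer> open = new ArrayList<>();
        long openedAt = 0;
        for (Event e : events) {
            // Timeout: the open buffer has been waiting too long, so emit it before accepting e.
            if (!open.isEmpty() && e.atMillis - openedAt >= timeoutMillis) {
                out.add(open);
                open = new ArrayList<>();
            }
            if (open.isEmpty()) {
                openedAt = e.atMillis;
            }
            open.add(e.value);
            // Predicate: the matching element closes the buffer it was added to.
            if (cut.test(e.value)) {
                out.add(open);
                open = new ArrayList<>();
            }
        }
        // Flush whatever is left when the input ends.
        if (!open.isEmpty()) {
            out.add(open);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(0, 1), new Event(100, 3), new Event(200, 4),
                new Event(300, 5), new Event(2000, 7), new Event(2100, 8));
        // Cut on even values or after 1000 ms; prints [[1, 3, 4], [5], [7, 8]]
        System.out.println(split(events, v -> v % 2 == 0, 1000));
    }
}
```

In this model the buffer [5] is cut by the timeout alone, because no even value arrives within 1000 ms of it being opened.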

Chat log on subject:

Mike Youngstrom @youngm 13:26
Hi all. I'm a bit of a Reactor Noob. If this is the wrong place to ask user questions LMK. I'd like to bufferUntil with a timeout. Any ideas on the best way to go about that?

peter royal @osi 13:28
this is a place to ask questions. you want to buffer items into a collection until a timeout is reached or a Predicate matches for a data item?

Mike Youngstrom @youngm 13:28
Correct

peter royal @osi 13:29
you might need to use bufferWhen

Mike Youngstrom @youngm 13:31
Ok, So, I have an example here:

peter royal @osi 13:31
no, i don't think you can do it with bufferWhen

Stephane Maldini @smaldini 13:31
nope you can't
you might need more reactivefu for this

Mike Youngstrom @youngm 13:31
That's what I was afraid of
Flux<Long> orig = Flux.generate(generator -> {
    try {
        long currentTimeMillis = System.currentTimeMillis();
        // Simulate an irregular gap between messages: long for even timestamps, short for odd ones.
        if (currentTimeMillis % 2 == 0) {
            Thread.sleep(10000);
        } else {
            Thread.sleep(1000);
        }
        generator.next(currentTimeMillis);
    } catch (Exception e) {
        e.printStackTrace();
    }
});
orig.log().bufferUntil(value -> value % 2 == 0).log().subscribe();

I'd like to only ever result in buffers with 1 item.

Stephane Maldini @smaldini 13:33
be careful with blocking call in callbacks
you might need to use subscribeOn() right after generate to be sure you don't block the flows
other than that it's possible but slightly convoluted to do this:

Mike Youngstrom @youngm 13:38
@smaldini yeah, I'm still trying to get a grasp on those kinds of rules. In my real use case there won't be anything blocking. Just trying to simulate some time between messages.

peter royal @osi 13:38
use delayElements to simulate a delay 😀
@youngm there's a built-in buffer by time or max buffer size

Mike Youngstrom @youngm 13:40
Right, I saw those. But, I'd like to break on a predicate when I can and only break on timeout after I don't get anything new after X time.
That'll be a fun exercise for my Noobness to rewrite my test using delayElements.
I can implement max message breaking in the predicate. But the tricky thing is breaking on the lack of a message.
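The suggestion to simulate gaps without Thread.sleep could be sketched as below. This assumes reactor-core is on the classpath; `DelayedSource` and `simulated` are hypothetical names. Since Flux.delayElements applies one fixed delay to every element, the sketch uses concatMap with Mono.delayElement so the gap can differ per element.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class DelayedSource {

    /** Emits 1..count in order; even values arrive after longGap, odd values after shortGap. */
    static Flux<Integer> simulated(int count, Duration shortGap, Duration longGap) {
        return Flux.range(1, count)
                // concatMap keeps the original order and waits for each delayed Mono in turn,
                // so no thread is blocked while "waiting" for the next message.
                .concatMap(v -> Mono.just(v).delayElement(v % 2 == 0 ? longGap : shortGap));
    }

    public static void main(String[] args) {
        List<Integer> values = simulated(4, Duration.ofMillis(100), Duration.ofMillis(1000))
                .collectList()
                .block(); // blocking is acceptable here only because this is a demo main method
        System.out.println(values);
    }
}
```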

Stephane Maldini @smaldini 13:42
orig.publish(f ->
    f.bufferWhen(f.filter(match -> match % 2 == 0), match ->
        Mono.delay(Duration.ofSeconds())
    )
).subscribe();
just for the context
publish lets you reuse the same source of data in several places without subscribing to it twice
bufferWhen is used both to create the buffers and as the condition that generates the timeout

peter royal @osi 13:43
won't that open a new bucket on every item in orig ?

Stephane Maldini @smaldini 13:43
mhhh no actually it will create as many boundaries as f data damn
yes

peter royal @osi 13:44
you need to filter f as the first param to bufferWhen to sync with the match?
ah, but then you won't see every item and be able to do the closeSelector the same

Stephane Maldini @smaldini 13:45
you need at least 1 initial timeout too
tricky
it might come down to use a scoped processor
annoying

Stephane Maldini @smaldini 13:53
or just with window maybe

Mike Youngstrom @youngm 13:53
I don't think it needs to be buffer.
If window makes it easier.

Stephane Maldini @smaldini 13:54
orig.windowUntil(predicate).concatMap(w -> w.take(Duration).collectList())
at least you cut the problem in many with predicate, starting with an initial window
and apply for each of them an individual max duration
new matched predicate will emit new window automatically , so you only need the max duration

peter royal @osi 13:56
that could drop events in a window? but that might be ok? a sequence of:
non-match, non-match, , non-match, match
the last non-match would be dropped?

Stephane Maldini @smaldini 13:56
no it will just reroute in a new window
all the data will be visible in the sum of all windows

Mike Youngstrom @youngm 13:57
K, it will take some time for me to digest and see if that will work for my real problem.

peter royal @osi 13:57
ah, nice. i didn't realize cancelling a window pushed the events into the next window

Stephane Maldini @smaldini 13:58
now you say that we had some issues with that in the past let me check

peter royal @osi 14:02
if i'm reading FluxWindowPredicate right, cancelling a window propagates upstream

Stephane Maldini @smaldini 14:03
if both main and child are cancelled
but if not it will just do nothing
annoying
i thought we had that fixed

peter royal @osi 14:04
ah, i went to the wrong cancel impl, yes, you're correct.

Stephane Maldini @smaldini 14:04
in 3.0 we used to have an argument-less window() fully based on cancel
i was pretty sure we moved away from it because it was unnecessary in other scenarios
@youngm can you open an issue for now so we review that asap before RC1

@youngm
Author

youngm commented Aug 28, 2017

In case it is significant my use case would be using bufferUntil with cutBefore=true

@simonbasle simonbasle modified the milestones: 3.1.0.RC1, 3.1.0.RELEASE Aug 30, 2017
@simonbasle simonbasle removed this from the 3.1.0.RELEASE milestone Sep 20, 2017
@smaldini smaldini added this to the 3.2.0.RELEASE milestone Oct 9, 2017
@simonbasle simonbasle added type/enhancement A general enhancement and removed stretch goal type/enhancement A general enhancement labels Apr 5, 2018
@simonbasle simonbasle modified the milestones: 3.2.0.RELEASE, [backlog] Apr 5, 2018
@DareUrDream

@simonbasle When can this enhancement be expected? I have a similar use case: I would like my buffer to be marked complete either when a predicate matches or when a timeout is reached.

@nonsoiwu

I came across a similar issue and solved it by using a custom Collection implementation that overrides size(), together with bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier).

One implementation could look like:

public class PredicateList<T> extends ArrayList<T> {
     private final Predicate<T> pred;
     private boolean complete = false;

     public PredicateList(Predicate<T> pred) {
          this.pred = pred;
     }
     
     @Override
     public boolean add(T obj) {
          if (!complete) {
               complete = pred.test(obj);
          }
          return super.add(obj);
     }
     
     @Override
     public int size() {
          return complete ? 1 : 0;
     }
     
     public int realSize() {
          return super.size();
     }
}

Then it would be used as so:

...
     .bufferTimeout(1, someDuration, () -> new PredicateList<>(somePredicate))
...

I settled on this work-around because it doesn't require any more "reactivefu" (as @smaldini put it) and takes advantage of how Flux natively handles buffers.
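To see what the size() override does, the following plain-Java sketch exercises a copy of PredicateList outside Reactor. The workaround relies on bufferTimeout deciding that a buffer is full by comparing the collection's size() with maxSize; that is an assumption about Reactor internals rather than documented behaviour, so it may not hold in every version. `PredicateListDemo` and `sizesAfter` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.function.Predicate;

// Compact copy of the class from the comment above, so the example compiles on its own.
class PredicateList<T> extends ArrayList<T> {
    private final Predicate<T> pred;
    private boolean complete = false;

    PredicateList(Predicate<T> pred) {
        this.pred = pred;
    }

    @Override
    public boolean add(T obj) {
        if (!complete) {
            complete = pred.test(obj);
        }
        return super.add(obj);
    }

    @Override
    public int size() {
        // Reports 1 ("full" for maxSize = 1) only once an element has matched the predicate.
        return complete ? 1 : 0;
    }

    public int realSize() {
        return super.size();
    }
}

final class PredicateListDemo {

    /** Adds the values to a list that completes on even numbers; returns {size(), realSize()}. */
    static int[] sizesAfter(int... values) {
        PredicateList<Integer> buffer = new PredicateList<>(v -> v % 2 == 0);
        for (int v : values) {
            buffer.add(v);
        }
        return new int[] {buffer.size(), buffer.realSize()};
    }

    public static void main(String[] args) {
        int[] before = sizesAfter(1, 3);    // no even value yet
        int[] after = sizesAfter(1, 3, 4);  // 4 matches the predicate
        System.out.println(before[0] + " / " + before[1]); // prints 0 / 2
        System.out.println(after[0] + " / " + after[1]);   // prints 1 / 3
    }
}
```

Because size() no longer reflects the real element count, anything downstream that needs the true count should call realSize() instead.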

Hope this helps!

@simonbasle simonbasle added status/declined We feel we shouldn't currently apply this change/suggestion status/has-workaround This has a known workaround described labels Jun 9, 2022
@simonbasle
Member

closing this one as a workaround has been exposed and it doesn't look like we'll have time to work on that any time soon (given the complexity of implementing buffer/window operators as soon as you introduce timeouts)

@DareUrDream

DareUrDream commented Jun 9, 2022 via email

@simonbasle simonbasle closed this as not planned Jun 9, 2022
@Harepitlord

Harepitlord commented Jul 18, 2023

@simonbasle, I have a similar use case where I need to combine time-interval and predicate-based buffering, but the workaround doesn't seem to work: bufferTimeout has an internal event counter, which emits the buffer even before the predicate is satisfied.

@OlegDokuka
Contributor

@Harepitlord can you provide an example that shows it does not work? Also, once you have a reproducer, you can open a new issue.

Thanks

@Harepitlord

Hi @OlegDokuka, I had a use case where events are emitted from a sink (SinkManyUnicast) and processed with map, transform and other operators on sink.getFlux(). I had to cut the flux (window) when an incoming event satisfies a given predicate, or else cut it based on time, since I can't wait forever for a match.

I tried different methods and combinations, such as combining bufferUntil and bufferTimeout, using the workaround mentioned above, and modifying that workaround, but none of them worked.

But combining windowUntil and buffer(Duration) worked.

code:
flux.windowUntil(Predicate).concatMap(window -> window.buffer(Duration))
This also opens up the combination of Predicate, Timeout and Event Count
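A minimal, self-contained sketch of that combination might look like the following. It assumes reactor-core is on the classpath; `WindowUntilWithTimeout` and `bufferUntilOrTimeout` are hypothetical names, and the sample source and durations are only for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Predicate;

import reactor.core.publisher.Flux;

final class WindowUntilWithTimeout {

    /**
     * Opens a new window after each element matching the predicate, then splits every
     * window into time-based buffers, so a slow window is still emitted in pieces.
     */
    static <T> Flux<List<T>> bufferUntilOrTimeout(Flux<T> source, Predicate<T> cut, Duration maxTime) {
        return source
                .windowUntil(cut)                              // predicate-based cut
                .concatMap(window -> window.buffer(maxTime));  // time-based cut inside each window
    }

    public static void main(String[] args) {
        List<List<Integer>> buffers =
                bufferUntilOrTimeout(Flux.range(1, 6), v -> v % 3 == 0, Duration.ofSeconds(1))
                        .collectList()
                        .block(); // blocking only because this is a demo main method
        System.out.println(buffers);
    }
}
```

Unlike the earlier take(Duration) idea, buffer(Duration) does not cancel the window, so elements that arrive after a time-based cut should end up in a later buffer of the same window. Swapping in bufferTimeout(maxSize, maxTime) inside the concatMap would presumably add the event-count limit as well.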

7 participants