Skip to content

Commit

Permalink
Merge #2943 into 3.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Mar 4, 2022
2 parents ff383ff + 62aa692 commit 36b3d86
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 6 deletions.
4 changes: 2 additions & 2 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -162,7 +162,7 @@ task japicmp(type: JapicmpTask) {
// TODO after a .0 release, bump the gradle.properties baseline
// TODO after a .0 release, remove the reactor-core exclusions below if any
classExcludes = [ ]
methodExcludes = [ ]
methodExcludes = [ 'reactor.core.publisher.Sinks$EmitFailureHandler#busyLooping(java.time.Duration)' ]
}

gradle.taskGraph.afterTask { task, state ->
Expand Down
38 changes: 36 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Sinks.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,6 @@
import java.util.Queue;

import org.reactivestreams.Subscriber;

import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
Expand Down Expand Up @@ -222,6 +221,24 @@ public EmitResult getReason() {
}
}

/**
*
* @author Animesh Chaturvedi
*/
static class OptimisticEmitFailureHandler implements EmitFailureHandler {

private final long deadline;

OptimisticEmitFailureHandler(Duration duration){
this.deadline = System.nanoTime() + duration.toNanos();
}

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED) && (System.nanoTime() < this.deadline);
}
}

/**
* A handler supporting the emit API (eg. {@link Many#emitNext(Object, Sinks.EmitFailureHandler)}),
* checking non-successful emission results from underlying {@link Many#tryEmitNext(Object) tryEmit}
Expand All @@ -242,6 +259,23 @@ public interface EmitFailureHandler {
*/
EmitFailureHandler FAIL_FAST = (signalType, emission) -> false;

/**
* Create an {@link EmitFailureHandler} which will busy loop in case of concurrent use
* of the sink ({@link EmitResult#FAIL_NON_SERIALIZED}, up to a deadline.
* The deadline is computed immediately from the current time (construction time)
* + provided {@link Duration}.
* <p>
* As a result there will always be some delay between this computation and the actual first
* use of the handler (at a minimum, the time it takes for the first sink emission attempt).
* Consider this when choosing the {@link Duration}, and probably prefer something above 100ms.
*
* @param duration {@link Duration} for the deadline
* @return an optimistic and bounded busy-looping {@link EmitFailureHandler}
*/
static EmitFailureHandler busyLooping(Duration duration){
return new OptimisticEmitFailureHandler(duration);
}

/**
* Decide whether the emission should be retried, depending on the provided {@link EmitResult}
* and the type of operation that was attempted (represented as a {@link SignalType}).
Expand Down
56 changes: 54 additions & 2 deletions reactor-core/src/test/java/reactor/core/publisher/SinksTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,7 +34,6 @@
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;

import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;
Expand Down Expand Up @@ -125,6 +124,59 @@ Stream<DynamicContainer> checkSemantics() {
}
}

@Nested
class OptimisticEmitFailureHandlerTest {
@Test
void shouldRetryOptimistically() {
Sinks.One<Object> sink = new InternalOneSinkTest.InternalOneSinkAdapter<Object>() {
final long duration = Duration.ofMillis(1000).toNanos() + System.nanoTime();
@Override
public Sinks.EmitResult tryEmitValue(Object value) {
return System.nanoTime() > duration ? Sinks.EmitResult.OK : Sinks.EmitResult.FAIL_NON_SERIALIZED;
}

@Override
public Sinks.EmitResult tryEmitEmpty() {
throw new IllegalStateException();
}

@Override
public Sinks.EmitResult tryEmitError(Throwable error) {
throw new IllegalStateException();
}
};
assertThatNoException().isThrownBy(() -> {
sink.emitValue("Hello",
Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(1000)));
});
}

@Test
void shouldFailRetryOptimistically() {
Sinks.One<Object> sink = new InternalOneSinkTest.InternalOneSinkAdapter<Object>() {
final long duration = Duration.ofMillis(1000).toNanos() + System.nanoTime();
@Override
public Sinks.EmitResult tryEmitValue(Object value) {
return System.nanoTime() > duration ? Sinks.EmitResult.OK : Sinks.EmitResult.FAIL_NON_SERIALIZED;
}

@Override
public Sinks.EmitResult tryEmitEmpty() {
throw new IllegalStateException();
}

@Override
public Sinks.EmitResult tryEmitError(Throwable error) {
throw new IllegalStateException();
}
};
assertThatExceptionOfType(Sinks.EmissionException.class).isThrownBy(() -> {
sink.emitValue("Hello",
Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
});
}
}

@Nested
class MulticastReplayN {

Expand Down

0 comments on commit 36b3d86

Please sign in to comment.