Skip to content

Commit

Permalink
reworks FluxPublish internals to relay on predictable state machine (#…
Browse files Browse the repository at this point in the history
…3538)

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
  • Loading branch information
OlegDokuka committed Aug 9, 2023
1 parent 5e66d9a commit 4399fc5
Show file tree
Hide file tree
Showing 4 changed files with 496 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;

@BenchmarkMode({Mode.AverageTime})
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class FluxPublishBenchmark {
@Param({"0", "10", "1000", "100000"})
int rangeSize;

Flux<Integer> source;

@Setup(Level.Invocation)
public void setup() {
source = Flux.range(0, rangeSize)
.hide()
.publish()
.autoConnect(Runtime.getRuntime()
.availableProcessors());
}


@State(Scope.Thread)
public static class JmhSubscriber<T> extends CountDownLatch implements CoreSubscriber<T> {

Blackhole blackhole;

Subscription s;

public JmhSubscriber() {
super(1);
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
s.request(Long.MAX_VALUE);
}
}

@Override
public void onNext(T t) {
blackhole.consume(t);
}

@Override
public void onError(Throwable t) {
blackhole.consume(t);
countDown();
}

@Override
public void onComplete() {
countDown();
}
}

@SuppressWarnings("unused")
@Benchmark
@Threads(Threads.MAX)
public Object measureThroughput(Blackhole blackhole, JmhSubscriber<Integer> subscriber) throws InterruptedException {
subscriber.blackhole = blackhole;
source.subscribe(subscriber);
subscriber.await();
return subscriber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.openjdk.jcstress.infra.results.IIIIII_Result;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
import org.openjdk.jcstress.infra.results.III_Result;
import reactor.core.Disposable;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

Expand Down Expand Up @@ -445,4 +447,50 @@ public void arbiter(IIIIII_Result r) {
r.r6 = subscriber2.onErrorCalls.get();
}
}

// TODO: uncomment me. Proper discard is not supported yet since we dont have stable
// downstream context available all the time. This should be uncommented once we have
// an explicitly passed onDiscard handler
// @JCStressTest
// @Outcome(id = {"10, 1, 0"}, expect = ACCEPTABLE, desc = "all values and completion delivered")
// @Outcome(id = {"10, 0, 1"}, expect = ACCEPTABLE, desc = "some values are delivered some dropped since overflow")
// @State
// public static class ConcurrentDisposeAndProduceStressTest {
//
// final Sinks.Many<Integer> producer = Sinks.unsafe().many().multicast().directAllOrNothing();
//
// final ConnectableFlux<Integer> sharedSource = producer.asFlux().publish(5);
//
// final StressSubscriber<Integer> subscriber = new StressSubscriber<>();
//
// final Disposable disposable;
//
// {
// sharedSource.subscribe(subscriber);
// disposable = sharedSource.connect();
// }
//
// @Actor
// public void dispose() {
// disposable.dispose();
// }
//
// @Actor
// public void emitValues() {
// for (int i = 0; i < 10; i++) {
// if (producer.tryEmitNext(i) != Sinks.EmitResult.OK) {
// Operators.onDiscard(i, subscriber.context);
// }
// }
//
// producer.tryEmitComplete();
// }
//
// @Arbiter
// public void arbiter(III_Result r) {
// r.r1 = subscriber.onNextCalls.get() + subscriber.onNextDiscarded.get();
// r.r2 = subscriber.onCompleteCalls.get();
// r.r3 = subscriber.onErrorCalls.get();
// }
// }
}
Loading

0 comments on commit 4399fc5

Please sign in to comment.