Skip to content

Commit

Permalink
Merge branch 'release/0.12.1-RC4'
Browse files Browse the repository at this point in the history
  • Loading branch information
robertroeser committed Apr 16, 2019
2 parents 04b82b3 + d835b14 commit 9a831f1
Show file tree
Hide file tree
Showing 121 changed files with 7,479 additions and 1,503 deletions.
12 changes: 8 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ plugins {
id 'com.github.sherter.google-java-format' version '0.7.1' apply false
id 'com.jfrog.artifactory' version '4.7.3' apply false
id 'com.jfrog.bintray' version '1.8.4' apply false
id 'me.champeau.gradle.jmh' version '0.4.7' apply false
id 'io.spring.dependency-management' version '1.0.6.RELEASE' apply false
id 'me.champeau.gradle.jmh' version '0.4.8' apply false
id 'io.spring.dependency-management' version '1.0.7.RELEASE' apply false
id 'io.morethan.jmhreport' version '0.9.0' apply false
}

Expand All @@ -35,7 +35,7 @@ subprojects {
ext['netty.version'] = '4.1.31.Final'
ext['netty-boringssl.version'] = '2.0.18.Final'
ext['hdrhistogram.version'] = '2.1.10'
ext['mockito.version'] = '2.23.0'
ext['mockito.version'] = '2.25.1'
ext['slf4j.version'] = '1.7.25'
ext['jmh.version'] = '1.21'
ext['junit.version'] = '5.1.0'
Expand All @@ -60,8 +60,12 @@ subprojects {
dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}"
dependency "org.assertj:assertj-core:${ext['assertj.version']}"
dependency "org.hdrhistogram:HdrHistogram:${ext['hdrhistogram.version']}"
dependency "org.mockito:mockito-core:${ ext['mockito.version']}"
dependency "org.slf4j:slf4j-api:${ext['slf4j.version']}"

dependencySet(group: 'org.mockito', version: ext['mockito.version']) {
entry 'mockito-junit-jupiter'
entry 'mockito-core'
}

dependencySet(group: 'org.junit.jupiter', version: ext['junit.version']) {
entry 'junit-jupiter-api'
Expand Down
3 changes: 1 addition & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

version=0.12.1-RC2
version=0.12.1-RC4
2 changes: 2 additions & 0 deletions rsocket-core/jmh.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ dependencies {
jmh configurations.implementation
jmh 'org.openjdk.jmh:jmh-core'
jmh 'org.openjdk.jmh:jmh-generator-annprocess'
jmh 'io.projectreactor:reactor-test'
jmh project(':rsocket-transport-local')
}

jmhCompileGeneratedClasses.enabled = false
Expand Down
38 changes: 38 additions & 0 deletions rsocket-core/src/jmh/java/io/rsocket/MaxPerfSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.rsocket;

import java.util.concurrent.CountDownLatch;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;

public class MaxPerfSubscriber implements CoreSubscriber<Payload> {

final CountDownLatch latch = new CountDownLatch(1);
final Blackhole blackhole;

public MaxPerfSubscriber(Blackhole blackhole) {
this.blackhole = blackhole;
}

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Payload payload) {
payload.release();
blackhole.consume(payload);
}

@Override
public void onError(Throwable t) {
blackhole.consume(t);
latch.countDown();
}

@Override
public void onComplete() {
latch.countDown();
}
}
42 changes: 42 additions & 0 deletions rsocket-core/src/jmh/java/io/rsocket/PerfSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.rsocket;

import java.util.concurrent.CountDownLatch;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;

public class PerfSubscriber implements CoreSubscriber<Payload> {

final CountDownLatch latch = new CountDownLatch(1);
final Blackhole blackhole;

Subscription s;

public PerfSubscriber(Blackhole blackhole) {
this.blackhole = blackhole;
}

@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(1);
}

@Override
public void onNext(Payload payload) {
payload.release();
blackhole.consume(payload);
s.request(1);
}

@Override
public void onError(Throwable t) {
blackhole.consume(t);
latch.countDown();
}

@Override
public void onComplete() {
latch.countDown();
}
}
142 changes: 142 additions & 0 deletions rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package io.rsocket;

import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.local.LocalClientTransport;
import io.rsocket.transport.local.LocalServerTransport;
import io.rsocket.util.EmptyPayload;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@BenchmarkMode(Mode.Throughput)
@Fork(
value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"}
)
@Warmup(iterations = 10)
@Measurement(iterations = 10, time = 20)
@State(Scope.Benchmark)
public class RSocketPerf {

static final Payload PAYLOAD = EmptyPayload.INSTANCE;
static final Mono<Payload> PAYLOAD_MONO = Mono.just(PAYLOAD);
static final Flux<Payload> PAYLOAD_FLUX =
Flux.fromArray(IntStream.range(0, 100000).mapToObj(__ -> PAYLOAD).toArray(Payload[]::new));

RSocket client;
Closeable server;

@Setup
public void setUp() {
server =
RSocketFactory.receive()
.acceptor(
(setup, sendingSocket) ->
Mono.just(
new AbstractRSocket() {

@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.empty();
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
payload.release();
return PAYLOAD_MONO;
}

@Override
public Flux<Payload> requestStream(Payload payload) {
payload.release();
return PAYLOAD_FLUX;
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads);
}
}))
.transport(LocalServerTransport.create("server"))
.start()
.block();

client =
RSocketFactory.connect()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(LocalClientTransport.create("server"))
.start()
.block();
}

@Benchmark
@SuppressWarnings("unchecked")
public PerfSubscriber fireAndForget(Blackhole blackhole) throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
client.fireAndForget(PAYLOAD).subscribe((CoreSubscriber) subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public PerfSubscriber requestResponse(Blackhole blackhole) throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
client.requestResponse(PAYLOAD).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public PerfSubscriber requestStreamWithRequestByOneStrategy(Blackhole blackhole)
throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
client.requestStream(PAYLOAD).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public MaxPerfSubscriber requestStreamWithRequestAllStrategy(Blackhole blackhole)
throws InterruptedException {
MaxPerfSubscriber subscriber = new MaxPerfSubscriber(blackhole);
client.requestStream(PAYLOAD).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public PerfSubscriber requestChannelWithRequestByOneStrategy(Blackhole blackhole)
throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public MaxPerfSubscriber requestChannelWithRequestAllStrategy(Blackhole blackhole)
throws InterruptedException {
MaxPerfSubscriber subscriber = new MaxPerfSubscriber(blackhole);
client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}
}
Loading

0 comments on commit 9a831f1

Please sign in to comment.