Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions rsocket-core/src/jmh/java/io/rsocket/FragmentationPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ public void setup(Blackhole bh) {
ByteBuffer data = createRandomBytes(1 << 18);
ByteBuffer metadata = createRandomBytes(1 << 18);
largeFrame =
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
Frame.Request.from(
1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
largeFrameFragmenter = new FrameFragmenter(1024);

data = createRandomBytes(16);
metadata = createRandomBytes(16);
smallFrame =
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
Frame.Request.from(
1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
smallFrameFragmenter = new FrameFragmenter(2);
smallFramesIterable =
smallFrameFragmenter
Expand Down
1 change: 0 additions & 1 deletion rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.rsocket.RSocketFactory.Start;
import io.rsocket.perfutil.TestDuplexConnection;
import io.rsocket.util.DefaultPayload;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public ConnectionSetupPayload retain(int increment) {
}

public abstract ConnectionSetupPayload touch();

public abstract ConnectionSetupPayload touch(Object hint);

private static final class DefaultConnectionSetupPayload extends ConnectionSetupPayload {
Expand All @@ -106,11 +107,7 @@ private static final class DefaultConnectionSetupPayload extends ConnectionSetup
private final int flags;

public DefaultConnectionSetupPayload(
String metadataMimeType,
String dataMimeType,
ByteBuf data,
ByteBuf metadata,
int flags) {
String metadataMimeType, String dataMimeType, ByteBuf data, ByteBuf metadata, int flags) {
this.metadataMimeType = metadataMimeType;
this.dataMimeType = dataMimeType;
this.data = data;
Expand Down
3 changes: 2 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ public static Frame from(
String metadataMimeType,
String dataMimeType,
Payload payload) {
final ByteBuf metadata = payload.hasMetadata() ? payload.sliceMetadata() : Unpooled.EMPTY_BUFFER;
final ByteBuf metadata =
payload.hasMetadata() ? payload.sliceMetadata() : Unpooled.EMPTY_BUFFER;
final ByteBuf data = payload.sliceData();

final Frame frame = RECYCLER.get();
Expand Down
28 changes: 11 additions & 17 deletions rsocket-core/src/main/java/io/rsocket/Payload.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
package io.rsocket;

import io.netty.buffer.ByteBuf;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.netty.util.ReferenceCounted;
import io.netty.util.ResourceLeakDetector;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Payload of a {@link Frame}. */
public interface Payload extends ReferenceCounted {
Expand All @@ -47,30 +45,26 @@ public interface Payload extends ReferenceCounted {
*/
ByteBuf sliceData();

/**
* Increases the reference count by {@code 1}.
*/
/** Increases the reference count by {@code 1}. */
@Override
Payload retain();

/**
* Increases the reference count by the specified {@code increment}.
*/
/** Increases the reference count by the specified {@code increment}. */
@Override
Payload retain(int increment);

/**
* Records the current access location of this object for debugging purposes.
* If this object is determined to be leaked, the information recorded by this operation will be provided to you
* via {@link ResourceLeakDetector}. This method is a shortcut to {@link #touch(Object) touch(null)}.
* Records the current access location of this object for debugging purposes. If this object is
* determined to be leaked, the information recorded by this operation will be provided to you via
* {@link ResourceLeakDetector}. This method is a shortcut to {@link #touch(Object) touch(null)}.
*/
@Override
Payload touch();

/**
* Records the current access location of this object with an additional arbitrary information for debugging
* purposes. If this object is determined to be leaked, the information recorded by this operation will be
* provided to you via {@link ResourceLeakDetector}.
* Records the current access location of this object with an additional arbitrary information for
* debugging purposes. If this object is determined to be leaked, the information recorded by this
* operation will be provided to you via {@link ResourceLeakDetector}.
*/
@Override
Payload touch(Object hint);
Expand All @@ -90,4 +84,4 @@ default String getMetadataUtf8() {
default String getDataUtf8() {
return sliceData().toString(StandardCharsets.UTF_8);
}
}
}
14 changes: 8 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.rsocket.exceptions.Exceptions;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;

import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -64,7 +63,8 @@ class RSocketClient implements RSocket {
Function<Frame, ? extends Payload> frameDecoder,
Consumer<Throwable> errorConsumer,
StreamIdSupplier streamIdSupplier) {
this(connection, frameDecoder, errorConsumer, streamIdSupplier, Duration.ZERO, Duration.ZERO, 0);
this(
connection, frameDecoder, errorConsumer, streamIdSupplier, Duration.ZERO, Duration.ZERO, 0);
}

RSocketClient(
Expand Down Expand Up @@ -372,11 +372,13 @@ public Flux<Payload> get() {
public Frame apply(Payload payload) {
final Frame requestFrame;
if (firstPayload.compareAndSet(true, false)) {
requestFrame = Frame.Request.from(
streamId, requestType, payload, l);
requestFrame =
Frame.Request.from(
streamId, requestType, payload, l);
} else {
requestFrame = Frame.PayloadFrame.from(
streamId, FrameType.NEXT, payload);
requestFrame =
Frame.PayloadFrame.from(
streamId, FrameType.NEXT, payload);
}
payload.release();
return requestFrame;
Expand Down
11 changes: 7 additions & 4 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import io.rsocket.util.EmptyPayload;
import reactor.core.publisher.Mono;

/** Factory for creating RSocket clients and servers. */
Expand Down Expand Up @@ -244,7 +243,10 @@ public Mono<RSocket> start() {
.doOnNext(
rSocket ->
new RSocketServer(
multiplexer.asServerConnection(), rSocket, frameDecoder, errorConsumer))
multiplexer.asServerConnection(),
rSocket,
frameDecoder,
errorConsumer))
.then(finalConnection.sendOne(setupFrame))
.then(wrappedRSocketClient);
});
Expand Down Expand Up @@ -359,7 +361,8 @@ private Mono<? extends Void> processSetupFrame(
sender -> acceptor.get().accept(setupPayload, sender).map(plugins::applyServer))
.map(
handler ->
new RSocketServer(multiplexer.asClientConnection(), handler, frameDecoder, errorConsumer))
new RSocketServer(
multiplexer.asClientConnection(), handler, frameDecoder, errorConsumer))
.then();
}
}
Expand Down
15 changes: 8 additions & 7 deletions rsocket-core/src/main/java/io/rsocket/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.rsocket.exceptions.ApplicationException;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;

import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -314,7 +313,8 @@ private Mono<Void> handleRequestResponse(int streamId, Mono<Payload> response) {
if (payload.hasMetadata()) {
flags = Frame.setFlag(flags, FLAGS_M);
}
final Frame frame = Frame.PayloadFrame.from(streamId, FrameType.NEXT_COMPLETE, payload, flags);
final Frame frame =
Frame.PayloadFrame.from(streamId, FrameType.NEXT_COMPLETE, payload, flags);
payload.release();
return frame;
})
Expand All @@ -330,11 +330,12 @@ private Mono<Void> handleRequestResponse(int streamId, Mono<Payload> response) {

private Mono<Void> handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
response
.map(payload -> {
final Frame frame = Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload);
payload.release();
return frame;
})
.map(
payload -> {
final Frame frame = Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload);
payload.release();
return frame;
})
.transform(
frameFlux -> {
LimitableRequestPublisher<Frame> frames = LimitableRequestPublisher.wrap(frameFlux);
Expand Down
124 changes: 62 additions & 62 deletions rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,71 +10,71 @@
import reactor.core.publisher.Operators;

public final class SwitchTransform<T, R> extends Flux<R> {

final Publisher<? extends T> source;

final Publisher<? extends T> source;
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;

public SwitchTransform(
Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
this.source = Objects.requireNonNull(source, "source");
this.transformer = Objects.requireNonNull(transformer, "transformer");
}

@Override
public void subscribe(CoreSubscriber<? super R> actual) {
Flux.from(source).subscribe(new SwitchTransformSubscriber<>(actual, transformer));
}

static final class SwitchTransformSubscriber<T, R> implements CoreSubscriber<T> {
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<SwitchTransformSubscriber> ONCE =
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once");

final CoreSubscriber<? super R> actual;
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;

public SwitchTransform(
Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
this.source = Objects.requireNonNull(source, "source");
this.transformer = Objects.requireNonNull(transformer, "transformer");
final UnboundedProcessor<T> processor = new UnboundedProcessor<>();
Subscription s;
volatile int once;

SwitchTransformSubscriber(
CoreSubscriber<? super R> actual,
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
this.actual = actual;
this.transformer = transformer;
}

@Override
public void subscribe(CoreSubscriber<? super R> actual) {
Flux.from(source).subscribe(new SwitchTransformSubscriber<>(actual, transformer));
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
processor.onSubscribe(s);
}
}

static final class SwitchTransformSubscriber<T, R> implements CoreSubscriber<T> {
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<SwitchTransformSubscriber> ONCE =
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once");

final CoreSubscriber<? super R> actual;
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
final UnboundedProcessor<T> processor = new UnboundedProcessor<>();
Subscription s;
volatile int once;

SwitchTransformSubscriber(
CoreSubscriber<? super R> actual,
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
this.actual = actual;
this.transformer = transformer;
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
processor.onSubscribe(s);
}
}

@Override
public void onNext(T t) {
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
try {
Publisher<? extends R> result =
Objects.requireNonNull(
transformer.apply(t, processor), "The transformer returned a null value");
Flux.from(result).subscribe(actual);
} catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return;
}
}
processor.onNext(t);
}

@Override
public void onError(Throwable t) {
processor.onError(t);
}

@Override
public void onComplete() {
processor.onComplete();

@Override
public void onNext(T t) {
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
try {
Publisher<? extends R> result =
Objects.requireNonNull(
transformer.apply(t, processor), "The transformer returned a null value");
Flux.from(result).subscribe(actual);
} catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return;
}
}
processor.onNext(t);
}

@Override
public void onError(Throwable t) {
processor.onError(t);
}

@Override
public void onComplete() {
processor.onComplete();
}
}
}
}
2 changes: 1 addition & 1 deletion rsocket-core/src/main/java/io/rsocket/lease/Lease.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.rsocket.lease;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;

/** A contract for RSocket lease, which is sent by a request acceptor and is time bound. */
public interface Lease {
Expand Down
Loading