Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# limitations under the License.
#

version=0.11.16-SNAPSHOT
version=0.12.1-SNAPSHOT
10 changes: 10 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,15 @@ public ByteBuf sliceMetadata() {
public ByteBuf sliceData() {
return SetupFrameFlyweight.data(setupFrame);
}

@Override
public ByteBuf data() {
return sliceData();
}

@Override
public ByteBuf metadata() {
return sliceMetadata();
}
}
}
20 changes: 18 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/Payload.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public interface Payload extends ReferenceCounted {
boolean hasMetadata();

/**
* Returns the Payload metadata. Always non-null, check {@link #hasMetadata()} to differentiate
* null from "".
* Returns a slice Payload metadata. Always non-null, check {@link #hasMetadata()} to
* differentiate null from "".
*
* @return payload metadata.
*/
Expand All @@ -46,6 +46,22 @@ public interface Payload extends ReferenceCounted {
*/
ByteBuf sliceData();

/**
* Returns the Payloads' data without slicing if possible. This is not safe and editing this could
* effect the payload. It is recommended to call sliceData().
*
* @return data as a bytebuf or slice of the data
*/
ByteBuf data();

/**
* Returns the Payloads' metadata without slicing if possible. This is not safe and editing this
* could effect the payload. It is recommended to call sliceMetadata().
*
* @return metadata as a bytebuf or slice of the metadata
*/
ByteBuf metadata();

/** Increases the reference count by {@code 1}. */
@Override
Payload retain();
Expand Down
8 changes: 5 additions & 3 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
Expand Down Expand Up @@ -220,7 +221,6 @@ private Mono<Void> handleFireAndForget(Payload payload) {
false,
payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
payload.sliceData().retain());

payload.release();
sendProcessor.onNext(requestFrame);
}));
Expand Down Expand Up @@ -292,12 +292,12 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
false,
payload.sliceMetadata().retain(),
payload.sliceData().retain());
payload.release();

UnicastMonoProcessor<Payload> receiver = UnicastMonoProcessor.create();
receivers.put(streamId, receiver);

sendProcessor.onNext(requestFrame);

return receiver
.doOnError(
t ->
Expand Down Expand Up @@ -472,8 +472,10 @@ private void handleIncomingFrames(ByteBuf frame) {
} else {
handleFrame(streamId, type, frame);
}
} finally {
frame.release();
} catch (Throwable t) {
ReferenceCountUtil.safeRelease(frame);
throw reactor.core.Exceptions.propagate(t);
}
}

Expand Down
14 changes: 3 additions & 11 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.exceptions.InvalidSetupException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.fragmentation.FragmentationDuplexConnection;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.frame.VersionFlyweight;
Expand Down Expand Up @@ -216,7 +215,7 @@ private class StartClient implements Start<RSocket> {
public Mono<RSocket> start() {
return transportClient
.get()
.connect()
.connect(mtu)
.flatMap(
connection -> {
ByteBuf setupFrame =
Expand All @@ -231,10 +230,6 @@ public Mono<RSocket> start() {
setupPayload.sliceMetadata(),
setupPayload.sliceData());

if (mtu > 0) {
connection = new FragmentationDuplexConnection(connection, mtu);
}

ClientServerInputMultiplexer multiplexer =
new ClientServerInputMultiplexer(connection, plugins);

Expand Down Expand Up @@ -333,10 +328,6 @@ public Mono<T> start() {
.get()
.start(
connection -> {
if (mtu > 0) {
connection = new FragmentationDuplexConnection(connection, mtu);
}

ClientServerInputMultiplexer multiplexer =
new ClientServerInputMultiplexer(connection, plugins);

Expand All @@ -345,7 +336,8 @@ public Mono<T> start() {
.receive()
.next()
.flatMap(setupFrame -> processSetupFrame(multiplexer, setupFrame));
});
},
mtu);
}

private Mono<Void> processSetupFrame(
Expand Down
43 changes: 37 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
Expand Down Expand Up @@ -330,8 +331,10 @@ private void handleFrame(ByteBuf frame) {
new IllegalStateException("ServerRSocket: Unexpected frame type: " + frameType));
break;
}
} finally {
ReferenceCountUtil.safeRelease(frame);
} catch (Throwable t) {
ReferenceCountUtil.safeRelease(frame);
throw Exceptions.propagate(t);
}
}

Expand All @@ -345,11 +348,28 @@ private void handleFireAndForget(int streamId, Mono<Void> result) {
private void handleRequestResponse(int streamId, Mono<Payload> response) {
response
.doOnSubscribe(subscription -> sendingSubscriptions.put(streamId, subscription))
.map(payload -> PayloadFrameFlyweight.encodeNextComplete(allocator, streamId, payload))
.map(
payload -> {
ByteBuf byteBuf = null;
try {
byteBuf = PayloadFrameFlyweight.encodeNextComplete(allocator, streamId, payload);
} catch (Throwable t) {
if (byteBuf != null) {
ReferenceCountUtil.safeRelease(byteBuf);
ReferenceCountUtil.safeRelease(payload);
Copy link
Member

@yschimke yschimke Feb 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this release as well as line 362? I can see this repeated below also, so assuming deliberate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can probably remove the payload one. I might have added it when I was chasing done some memory leaks.

}
}
payload.release();
return byteBuf;
})
.switchIfEmpty(
Mono.fromCallable(() -> PayloadFrameFlyweight.encodeComplete(allocator, streamId)))
.doFinally(signalType -> sendingSubscriptions.remove(streamId))
.subscribe(t1 -> sendProcessor.onNext(t1), t -> handleError(streamId, t));
.subscribe(
t1 -> {
sendProcessor.onNext(t1);
},
t -> handleError(streamId, t));
}

private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
Expand All @@ -364,9 +384,20 @@ private void handleStream(int streamId, Flux<Payload> response, int initialReque
})
.doFinally(signalType -> sendingSubscriptions.remove(streamId))
.subscribe(
payload ->
sendProcessor.onNext(
PayloadFrameFlyweight.encodeNext(allocator, streamId, payload)),
payload -> {
ByteBuf byteBuf = null;
try {
byteBuf = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload);
} catch (Throwable t) {
if (byteBuf != null) {
ReferenceCountUtil.safeRelease(byteBuf);
ReferenceCountUtil.safeRelease(payload);
}
throw Exceptions.propagate(t);
}
payload.release();
sendProcessor.onNext(byteBuf);
},
t -> handleError(streamId, t),
() -> sendProcessor.onNext(PayloadFrameFlyweight.encodeComplete(allocator, streamId)));
}
Expand Down
Loading