diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java index 76580f7fe..8448d8439 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ByteBufferUtil.java @@ -21,8 +21,7 @@ public class ByteBufferUtil { - private ByteBufferUtil() { - } + private ByteBufferUtil() {} /** * Slice a portion of the {@link ByteBuffer} while preserving the buffers position and limit. diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java index fdb9f57e0..4340faaa1 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ErrorFrameFlyweight.java @@ -39,11 +39,11 @@ private ErrorFrameFlyweight() {} public static final int INVALID_SETUP = 0x0001; public static final int UNSUPPORTED_SETUP = 0x0002; public static final int REJECTED_SETUP = 0x0003; - public static final int CONNECTION_ERROR = 0x0011; - public static final int APPLICATION_ERROR = 0x0021; + public static final int CONNECTION_ERROR = 0x0101; + public static final int APPLICATION_ERROR = 0x0201; public static final int REJECTED = 0x0022; - public static final int CANCEL = 0x0023; - public static final int INVALID = 0x0024; + public static final int CANCEL = 0x0203; + public static final int INVALID = 0x0204; // relative to start of passed offset private static final int ERROR_CODE_FIELD_OFFSET = FrameHeaderFlyweight.FRAME_HEADER_LENGTH; diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java index ae1285bbe..ce8a633ca 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/FrameHeaderFlyweight.java @@ -35,8 +35,7 @@ * * Not thread-safe. Assumed to be used single-threaded */ -public class FrameHeaderFlyweight -{ +public class FrameHeaderFlyweight { private FrameHeaderFlyweight() {} @@ -62,14 +61,10 @@ private FrameHeaderFlyweight() {} public static final int FLAGS_REQUEST_CHANNEL_F = 0b0010_0000_0000_0000; - static - { - if (INCLUDE_FRAME_LENGTH) - { + static { + if (INCLUDE_FRAME_LENGTH) { FRAME_LENGTH_FIELD_OFFSET = 0; - } - else - { + } else { FRAME_LENGTH_FIELD_OFFSET = -BitUtil.SIZE_OF_INT; } @@ -81,8 +76,7 @@ private FrameHeaderFlyweight() {} FRAME_HEADER_LENGTH = PAYLOAD_OFFSET; } - public static int computeFrameHeaderLength(final FrameType frameType, int metadataLength, final int dataLength) - { + public static int computeFrameHeaderLength(final FrameType frameType, int metadataLength, final int dataLength) { return PAYLOAD_OFFSET + computeMetadataLength(metadataLength) + dataLength; } @@ -92,10 +86,9 @@ public static int encodeFrameHeader( final int frameLength, final int flags, final FrameType frameType, - final int streamId) - { - if (INCLUDE_FRAME_LENGTH) - { + final int streamId + ) { + if (INCLUDE_FRAME_LENGTH) { mutableDirectBuffer.putInt(offset + FRAME_LENGTH_FIELD_OFFSET, frameLength, ByteOrder.BIG_ENDIAN); } @@ -110,13 +103,12 @@ public static int encodeMetadata( final MutableDirectBuffer mutableDirectBuffer, final int frameHeaderStartOffset, final int metadataOffset, - final ByteBuffer metadata) - { + final ByteBuffer metadata + ) { int length = 0; final int metadataLength = metadata.remaining(); - if (0 < metadataLength) - { + if (0 < metadataLength) { int flags = mutableDirectBuffer.getShort(frameHeaderStartOffset + FLAGS_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); flags |= FLAGS_M; mutableDirectBuffer.putShort(frameHeaderStartOffset + FLAGS_FIELD_OFFSET, (short)flags, ByteOrder.BIG_ENDIAN); @@ -132,13 +124,12 @@ public static int encodeMetadata( public static int encodeData( final MutableDirectBuffer mutableDirectBuffer, final int dataOffset, - final ByteBuffer data) - { + final ByteBuffer data + ) { int length = 0; final int dataLength = data.remaining(); - if (0 < dataLength) - { + if (0 < dataLength) { mutableDirectBuffer.putBytes(dataOffset, data, dataLength); length += dataLength; } @@ -154,14 +145,13 @@ public static int encode( int flags, final FrameType frameType, final ByteBuffer metadata, - final ByteBuffer data) - { + final ByteBuffer data + ) { final int frameLength = computeFrameHeaderLength(frameType, metadata.remaining(), data.remaining()); final FrameType outFrameType; - switch (frameType) - { + switch (frameType) { case COMPLETE: outFrameType = FrameType.RESPONSE; flags |= FLAGS_RESPONSE_C; @@ -182,30 +172,23 @@ public static int encode( return length; } - public static int flags(final DirectBuffer directBuffer, final int offset) - { + public static int flags(final DirectBuffer directBuffer, final int offset) { return directBuffer.getShort(offset + FLAGS_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } - public static FrameType frameType(final DirectBuffer directBuffer, final int offset) - { + public static FrameType frameType(final DirectBuffer directBuffer, final int offset) { FrameType result = FrameType.from(directBuffer.getShort(offset + TYPE_FIELD_OFFSET, ByteOrder.BIG_ENDIAN)); - if (FrameType.RESPONSE == result) - { + if (FrameType.RESPONSE == result) { final int flags = flags(directBuffer, offset); final int dataLength = dataLength(directBuffer, offset, 0); - if (FLAGS_RESPONSE_C == (flags & FLAGS_RESPONSE_C) && 0 < dataLength) - { + boolean complete = FLAGS_RESPONSE_C == (flags & FLAGS_RESPONSE_C); + if (complete && 0 < dataLength) { result = FrameType.NEXT_COMPLETE; - } - else if (FLAGS_RESPONSE_C == (flags & FLAGS_RESPONSE_C)) - { + } else if (complete) { result = FrameType.COMPLETE; - } - else - { + } else { result = FrameType.NEXT; } } @@ -213,83 +196,71 @@ else if (FLAGS_RESPONSE_C == (flags & FLAGS_RESPONSE_C)) return result; } - public static int streamId(final DirectBuffer directBuffer, final int offset) - { + public static int streamId(final DirectBuffer directBuffer, final int offset) { return directBuffer.getInt(offset + STREAM_ID_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } - public static ByteBuffer sliceFrameData(final DirectBuffer directBuffer, final int offset, final int length) - { + public static ByteBuffer sliceFrameData(final DirectBuffer directBuffer, final int offset, final int length) { final int dataLength = dataLength(directBuffer, offset, length); final int dataOffset = dataOffset(directBuffer, offset); ByteBuffer result = NULL_BYTEBUFFER; - if (0 < dataLength) - { + if (0 < dataLength) { result = preservingSlice(directBuffer.byteBuffer(), dataOffset, dataOffset + dataLength); } return result; } - public static ByteBuffer sliceFrameMetadata(final DirectBuffer directBuffer, final int offset, final int length) - { + public static ByteBuffer sliceFrameMetadata(final DirectBuffer directBuffer, final int offset, final int length) { final int metadataLength = Math.max(0, metadataFieldLength(directBuffer, offset) - BitUtil.SIZE_OF_INT); final int metadataOffset = metadataOffset(directBuffer, offset) + BitUtil.SIZE_OF_INT; ByteBuffer result = NULL_BYTEBUFFER; - if (0 < metadataLength) - { + if (0 < metadataLength) { result = preservingSlice(directBuffer.byteBuffer(), metadataOffset, metadataOffset + metadataLength); } return result; } - private static int frameLength(final DirectBuffer directBuffer, final int offset, final int externalFrameLength) - { + private static int frameLength(final DirectBuffer directBuffer, final int offset, final int externalFrameLength) { int frameLength = externalFrameLength; - if (INCLUDE_FRAME_LENGTH) - { + if (INCLUDE_FRAME_LENGTH) { frameLength = directBuffer.getInt(offset + FRAME_LENGTH_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } return frameLength; } - private static int computeMetadataLength(final int metadataPayloadLength) - { + private static int computeMetadataLength(final int metadataPayloadLength) { return metadataPayloadLength + ((0 == metadataPayloadLength) ? 0 : BitUtil.SIZE_OF_INT); } - private static int metadataFieldLength(final DirectBuffer directBuffer, final int offset) - { + private static int metadataFieldLength(final DirectBuffer directBuffer, final int offset) { int metadataLength = 0; - if (FLAGS_M == (FLAGS_M & directBuffer.getShort(offset + FLAGS_FIELD_OFFSET, ByteOrder.BIG_ENDIAN))) - { + short flags = directBuffer.getShort(offset + FLAGS_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); + if (FLAGS_M == (FLAGS_M & flags)) { metadataLength = directBuffer.getInt(metadataOffset(directBuffer, offset), ByteOrder.BIG_ENDIAN) & 0xFFFFFF; } return metadataLength; } - private static int dataLength(final DirectBuffer directBuffer, final int offset, final int externalLength) - { + private static int dataLength(final DirectBuffer directBuffer, final int offset, final int externalLength) { final int frameLength = frameLength(directBuffer, offset, externalLength); final int metadataLength = metadataFieldLength(directBuffer, offset); return offset + frameLength - metadataLength - payloadOffset(directBuffer, offset); } - private static int payloadOffset(final DirectBuffer directBuffer, final int offset) - { + private static int payloadOffset(final DirectBuffer directBuffer, final int offset) { final FrameType frameType = FrameType.from(directBuffer.getShort(offset + TYPE_FIELD_OFFSET, ByteOrder.BIG_ENDIAN)); int result = offset + PAYLOAD_OFFSET; - switch (frameType) - { + switch (frameType) { case SETUP: result = SetupFrameFlyweight.payloadOffset(directBuffer, offset); break; @@ -317,13 +288,11 @@ private static int payloadOffset(final DirectBuffer directBuffer, final int offs return result; } - private static int metadataOffset(final DirectBuffer directBuffer, final int offset) - { + private static int metadataOffset(final DirectBuffer directBuffer, final int offset) { return payloadOffset(directBuffer, offset); } - private static int dataOffset(final DirectBuffer directBuffer, final int offset) - { + private static int dataOffset(final DirectBuffer directBuffer, final int offset) { return payloadOffset(directBuffer, offset) + metadataFieldLength(directBuffer, offset); } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java index a25350539..0ca96fa8c 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/KeepaliveFrameFlyweight.java @@ -21,22 +21,20 @@ import java.nio.ByteBuffer; -public class KeepaliveFrameFlyweight -{ +public class KeepaliveFrameFlyweight { private KeepaliveFrameFlyweight() {} private static final int PAYLOAD_OFFSET = FrameHeaderFlyweight.FRAME_HEADER_LENGTH; - public static int computeFrameLength(final int dataLength) - { + public static int computeFrameLength(final int dataLength) { return FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.SETUP, 0, dataLength); } public static int encode( final MutableDirectBuffer mutableDirectBuffer, final int offset, - final ByteBuffer data) - { + final ByteBuffer data + ) { final int frameLength = computeFrameLength(data.remaining()); int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, 0, FrameType.KEEPALIVE, 0); @@ -46,8 +44,7 @@ public static int encode( return length; } - public static int payloadOffset(final DirectBuffer directBuffer, final int offset) - { + public static int payloadOffset(final DirectBuffer directBuffer, final int offset) { return offset + PAYLOAD_OFFSET; } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java index ceed7ee62..ff3147e04 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/LeaseFrameFlyweight.java @@ -23,8 +23,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; -public class LeaseFrameFlyweight -{ +public class LeaseFrameFlyweight { private LeaseFrameFlyweight() {} // relative to start of passed offset @@ -32,10 +31,8 @@ private LeaseFrameFlyweight() {} private static final int NUM_REQUESTS_FIELD_OFFSET = TTL_FIELD_OFFSET + BitUtil.SIZE_OF_INT; private static final int PAYLOAD_OFFSET = NUM_REQUESTS_FIELD_OFFSET + BitUtil.SIZE_OF_INT; - public static int computeFrameLength(final int metadataLength) - { + public static int computeFrameLength(final int metadataLength) { int length = FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.SETUP, metadataLength, 0); - return length + BitUtil.SIZE_OF_INT * 2; } @@ -44,8 +41,8 @@ public static int encode( final int offset, final int ttl, final int numRequests, - final ByteBuffer metadata) - { + final ByteBuffer metadata + ) { final int frameLength = computeFrameLength(metadata.remaining()); int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, 0, FrameType.LEASE, 0); @@ -59,18 +56,15 @@ public static int encode( return length; } - public static int ttl(final DirectBuffer directBuffer, final int offset) - { + public static int ttl(final DirectBuffer directBuffer, final int offset) { return directBuffer.getInt(offset + TTL_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } - public static int numRequests(final DirectBuffer directBuffer, final int offset) - { + public static int numRequests(final DirectBuffer directBuffer, final int offset) { return directBuffer.getInt(offset + NUM_REQUESTS_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } - public static int payloadOffset(final DirectBuffer directBuffer, final int offset) - { + public static int payloadOffset(final DirectBuffer directBuffer, final int offset) { return offset + PAYLOAD_OFFSET; } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java index c3222dbc5..e4aa0f0c7 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadBuilder.java @@ -27,8 +27,7 @@ /** * Builder for appending buffers that grows dataCapacity as necessary. Similar to Aeron's PayloadBuilder. */ -public class PayloadBuilder -{ +public class PayloadBuilder { public static final int INITIAL_CAPACITY = Math.max(Frame.DATA_MTU, Frame.METADATA_MTU); private final MutableDirectBuffer dataMutableDirectBuffer; @@ -41,8 +40,7 @@ public class PayloadBuilder private int dataCapacity; private int metadataCapacity; - public PayloadBuilder() - { + public PayloadBuilder() { dataCapacity = BitUtil.findNextPositivePowerOfTwo(INITIAL_CAPACITY); metadataCapacity = BitUtil.findNextPositivePowerOfTwo(INITIAL_CAPACITY); dataBuffer = new byte[dataCapacity]; @@ -51,24 +49,20 @@ public PayloadBuilder() metadataMutableDirectBuffer = new UnsafeBuffer(metadataBuffer); } - public Payload payload() - { - return new Payload() - { + public Payload payload() { + return new Payload() { public ByteBuffer getData() { return ByteBuffer.wrap(dataBuffer, 0, dataLimit); } - public ByteBuffer getMetadata() - { + public ByteBuffer getMetadata() { return ByteBuffer.wrap(metadataBuffer, 0, metadataLimit); } }; } - public void append(final Payload payload) - { + public void append(final Payload payload) { final ByteBuffer payloadMetadata = payload.getMetadata(); final ByteBuffer payloadData = payload.getData(); final int metadataLength = payloadMetadata.remaining(); @@ -83,18 +77,15 @@ public void append(final Payload payload) dataLimit += dataLength; } - private void ensureDataCapacity(final int additionalCapacity) - { + private void ensureDataCapacity(final int additionalCapacity) { final int requiredCapacity = dataLimit + additionalCapacity; - if (requiredCapacity < 0) - { + if (requiredCapacity < 0) { final String s = String.format("Insufficient data capacity: dataLimit=%d additional=%d", dataLimit, additionalCapacity); throw new IllegalStateException(s); } - if (requiredCapacity > dataCapacity) - { + if (requiredCapacity > dataCapacity) { final int newCapacity = findSuitableCapacity(dataCapacity, requiredCapacity); final byte[] newBuffer = Arrays.copyOf(dataBuffer, newCapacity); @@ -104,18 +95,15 @@ private void ensureDataCapacity(final int additionalCapacity) } } - private void ensureMetadataCapacity(final int additionalCapacity) - { + private void ensureMetadataCapacity(final int additionalCapacity) { final int requiredCapacity = metadataLimit + additionalCapacity; - if (requiredCapacity < 0) - { + if (requiredCapacity < 0) { final String s = String.format("Insufficient metadata capacity: metadataLimit=%d additional=%d", metadataLimit, additionalCapacity); throw new IllegalStateException(s); } - if (requiredCapacity > metadataCapacity) - { + if (requiredCapacity > metadataCapacity) { final int newCapacity = findSuitableCapacity(metadataCapacity, requiredCapacity); final byte[] newBuffer = Arrays.copyOf(metadataBuffer, newCapacity); @@ -125,13 +113,10 @@ private void ensureMetadataCapacity(final int additionalCapacity) } } - private static int findSuitableCapacity(int capacity, final int requiredCapacity) - { - do - { + private static int findSuitableCapacity(int capacity, final int requiredCapacity) { + do { capacity <<= 1; - } - while (capacity < requiredCapacity); + } while (capacity < requiredCapacity); return capacity; } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java index 03b1a2c55..ee474bd0d 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadFragmenter.java @@ -27,10 +27,8 @@ * * Not thread-safe */ -public class PayloadFragmenter implements Iterable, Iterator -{ - private enum Type - { +public class PayloadFragmenter implements Iterable, Iterator { + private enum Type { RESPONSE, RESPONSE_COMPLETE, REQUEST_CHANNEL } @@ -44,51 +42,43 @@ private enum Type private int streamId; private int initialRequestN; - public PayloadFragmenter(final int metadataMtu, final int dataMtu) - { + public PayloadFragmenter(final int metadataMtu, final int dataMtu) { this.metadataMtu = metadataMtu; this.dataMtu = dataMtu; } - public void resetForResponse(final int streamId, final Payload payload) - { + public void resetForResponse(final int streamId, final Payload payload) { reset(streamId, payload); type = Type.RESPONSE; } - public void resetForResponseComplete(final int streamId, final Payload payload) - { + public void resetForResponseComplete(final int streamId, final Payload payload) { reset(streamId, payload); type = Type.RESPONSE_COMPLETE; } - public void resetForRequestChannel(final int streamId, final Payload payload, final int initialRequestN) - { + public void resetForRequestChannel(final int streamId, final Payload payload, final int initialRequestN) { reset(streamId, payload); type = Type.REQUEST_CHANNEL; this.initialRequestN = initialRequestN; } - public static boolean requiresFragmenting(final int metadataMtu, final int dataMtu, final Payload payload) - { + public static boolean requiresFragmenting(final int metadataMtu, final int dataMtu, final Payload payload) { final ByteBuffer metadata = payload.getMetadata(); final ByteBuffer data = payload.getData(); return metadata.remaining() > metadataMtu || data.remaining() > dataMtu; } - public Iterator iterator() - { + public Iterator iterator() { return this; } - public boolean hasNext() - { + public boolean hasNext() { return dataOffset < data.remaining() || metadataOffset < metadata.remaining(); } - public Frame next() - { + public Frame next() { final int metadataLength = Math.min(metadataMtu, metadata.remaining() - metadataOffset); final int dataLength = Math.min(dataMtu, data.remaining() - dataOffset); @@ -106,28 +96,21 @@ public Frame next() final boolean isMoreFollowing = metadataOffset < metadata.remaining() || dataOffset < data.remaining(); int flags = 0; - if (Type.RESPONSE == type) - { - if (isMoreFollowing) - { + if (Type.RESPONSE == type) { + if (isMoreFollowing) { flags |= FrameHeaderFlyweight.FLAGS_RESPONSE_F; } result = Frame.Response.from(streamId, FrameType.NEXT, metadataBuffer, dataBuffer, flags); } - if (Type.RESPONSE_COMPLETE == type) - { - if (isMoreFollowing) - { + if (Type.RESPONSE_COMPLETE == type) { + if (isMoreFollowing) { flags |= FrameHeaderFlyweight.FLAGS_RESPONSE_F; } result = Frame.Response.from(streamId, FrameType.NEXT_COMPLETE, metadataBuffer, dataBuffer, flags); - } - else if (Type.REQUEST_CHANNEL == type) - { - if (isMoreFollowing) - { + } else if (Type.REQUEST_CHANNEL == type) { + if (isMoreFollowing) { flags |= FrameHeaderFlyweight.FLAGS_REQUEST_CHANNEL_F; } @@ -138,8 +121,7 @@ else if (Type.REQUEST_CHANNEL == type) return result; } - private void reset(final int streamId, final Payload payload) - { + private void reset(final int streamId, final Payload payload) { data = payload.getData(); metadata = payload.getMetadata(); metadataOffset = 0; diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java index 6d5212028..ab5f41cd8 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/PayloadReassembler.java @@ -22,44 +22,36 @@ import org.reactivestreams.Subscription; -public class PayloadReassembler implements Subscriber -{ +public class PayloadReassembler implements Subscriber { private final Subscriber child; private final Int2ObjectHashMap payloadByStreamId = new Int2ObjectHashMap<>(); - private PayloadReassembler(final Subscriber child) - { + private PayloadReassembler(final Subscriber child) { this.child = child; } - public static PayloadReassembler with(final Subscriber child) - { + public static PayloadReassembler with(final Subscriber child) { return new PayloadReassembler(child); } - public void resetStream(final int streamId) - { + public void resetStream(final int streamId) { payloadByStreamId.remove(streamId); } - public void onSubscribe(Subscription s) - { + public void onSubscribe(Subscription s) { // reset } - public void onNext(Frame frame) - { + public void onNext(Frame frame) { // if frame has no F bit and no waiting payload, then simply pass on final int streamId = frame.getStreamId(); PayloadBuilder payloadBuilder = payloadByStreamId.get(streamId); - if (FrameHeaderFlyweight.FLAGS_RESPONSE_F != (frame.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F)) - { + if (FrameHeaderFlyweight.FLAGS_RESPONSE_F != (frame.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F)) { Payload deliveryPayload = frame; // terminal frame - if (null != payloadBuilder) - { + if (null != payloadBuilder) { payloadBuilder.append(frame); deliveryPayload = payloadBuilder.payload(); payloadByStreamId.remove(streamId); @@ -67,10 +59,8 @@ public void onNext(Frame frame) child.onNext(deliveryPayload); } - else - { - if (null == payloadBuilder) - { + else { + if (null == payloadBuilder) { payloadBuilder = new PayloadBuilder(); payloadByStreamId.put(streamId, payloadBuilder); } @@ -79,13 +69,11 @@ public void onNext(Frame frame) } } - public void onError(Throwable t) - { + public void onError(Throwable t) { // reset and pass through } - public void onComplete() - { + public void onComplete() { // reset and pass through } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java index 6db2ba997..c9e6bcf3a 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/RequestFrameFlyweight.java @@ -23,8 +23,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; -public class RequestFrameFlyweight -{ +public class RequestFrameFlyweight { private RequestFrameFlyweight() {} @@ -34,12 +33,10 @@ private RequestFrameFlyweight() {} // relative to start of passed offset private static final int INITIAL_REQUEST_N_FIELD_OFFSET = FrameHeaderFlyweight.FRAME_HEADER_LENGTH; - public static int computeFrameLength(final FrameType type, final int metadataLength, final int dataLength) - { + public static int computeFrameLength(final FrameType type, final int metadataLength, final int dataLength) { int length = FrameHeaderFlyweight.computeFrameHeaderLength(type, metadataLength, dataLength); - if (type.hasInitialRequestN()) - { + if (type.hasInitialRequestN()) { length += BitUtil.SIZE_OF_INT; } @@ -54,8 +51,8 @@ public static int encode( final FrameType type, final int initialRequestN, final ByteBuffer metadata, - final ByteBuffer data) - { + final ByteBuffer data + ) { final int frameLength = computeFrameLength(type, metadata.remaining(), data.remaining()); flags |= FLAGS_REQUEST_CHANNEL_N; @@ -77,8 +74,8 @@ public static int encode( final int flags, final FrameType type, final ByteBuffer metadata, - final ByteBuffer data) - { + final ByteBuffer data + ) { final int frameLength = computeFrameLength(type, metadata.remaining(), data.remaining()); int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, flags, type, streamId); @@ -89,17 +86,14 @@ public static int encode( return length; } - public static int initialRequestN(final DirectBuffer directBuffer, final int offset) - { + public static int initialRequestN(final DirectBuffer directBuffer, final int offset) { return directBuffer.getInt(offset + INITIAL_REQUEST_N_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } - public static int payloadOffset(final FrameType type, final DirectBuffer directBuffer, final int offset) - { + public static int payloadOffset(final FrameType type, final DirectBuffer directBuffer, final int offset) { int result = offset + FrameHeaderFlyweight.FRAME_HEADER_LENGTH; - if (type.hasInitialRequestN()) - { + if (type.hasInitialRequestN()) { result += BitUtil.SIZE_OF_INT; } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java index 7ff679a90..863be53b0 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/RequestNFrameFlyweight.java @@ -22,15 +22,13 @@ import java.nio.ByteOrder; -public class RequestNFrameFlyweight -{ +public class RequestNFrameFlyweight { private RequestNFrameFlyweight() {} // relative to start of passed offset private static final int REQUEST_N_FIELD_OFFSET = FrameHeaderFlyweight.FRAME_HEADER_LENGTH; - public static int computeFrameLength() - { + public static int computeFrameLength() { int length = FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.REQUEST_N, 0, 0); return length + BitUtil.SIZE_OF_INT; @@ -40,8 +38,8 @@ public static int encode( final MutableDirectBuffer mutableDirectBuffer, final int offset, final int streamId, - final int requestN) - { + final int requestN + ) { final int frameLength = computeFrameLength(); int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, 0, FrameType.REQUEST_N, streamId); @@ -51,13 +49,11 @@ public static int encode( return length + BitUtil.SIZE_OF_INT; } - public static int requestN(final DirectBuffer directBuffer, final int offset) - { + public static int requestN(final DirectBuffer directBuffer, final int offset) { return directBuffer.getInt(offset + REQUEST_N_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } - public static int payloadOffset(final DirectBuffer directBuffer, final int offset) - { + public static int payloadOffset(final DirectBuffer directBuffer, final int offset) { return offset + FrameHeaderFlyweight.FRAME_HEADER_LENGTH + BitUtil.SIZE_OF_INT; } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java index bf7d06896..537230da5 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/SetupFrameFlyweight.java @@ -24,8 +24,7 @@ import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; -public class SetupFrameFlyweight -{ +public class SetupFrameFlyweight { private SetupFrameFlyweight() {} public static final int FLAGS_WILL_HONOR_LEASE = 0b0010_0000; @@ -43,8 +42,8 @@ public static int computeFrameLength( final String metadataMimeType, final String dataMimeType, final int metadataLength, - final int dataLength) - { + final int dataLength + ) { int length = FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.SETUP, metadataLength, dataLength); length += BitUtil.SIZE_OF_INT * 3; @@ -63,8 +62,8 @@ public static int encode( final String metadataMimeType, final String dataMimeType, final ByteBuffer metadata, - final ByteBuffer data) - { + final ByteBuffer data + ) { final int frameLength = computeFrameLength(metadataMimeType, dataMimeType, metadata.remaining(), data.remaining()); int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, flags, FrameType.SETUP, 0); @@ -84,29 +83,24 @@ public static int encode( return length; } - public static int version(final DirectBuffer directBuffer, final int offset) - { + public static int version(final DirectBuffer directBuffer, final int offset) { return directBuffer.getInt(offset + VERSION_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } - public static int keepaliveInterval(final DirectBuffer directBuffer, final int offset) - { + public static int keepaliveInterval(final DirectBuffer directBuffer, final int offset) { return directBuffer.getInt(offset + KEEPALIVE_INTERVAL_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } - public static int maxLifetime(final DirectBuffer directBuffer, final int offset) - { + public static int maxLifetime(final DirectBuffer directBuffer, final int offset) { return directBuffer.getInt(offset + MAX_LIFETIME_FIELD_OFFSET, ByteOrder.BIG_ENDIAN); } - public static String metadataMimeType(final DirectBuffer directBuffer, final int offset) - { + public static String metadataMimeType(final DirectBuffer directBuffer, final int offset) { final byte[] bytes = getMimeType(directBuffer, offset + METADATA_MIME_TYPE_LENGTH_OFFSET); return new String(bytes, StandardCharsets.UTF_8); } - public static String dataMimeType(final DirectBuffer directBuffer, final int offset) - { + public static String dataMimeType(final DirectBuffer directBuffer, final int offset) { int fieldOffset = offset + METADATA_MIME_TYPE_LENGTH_OFFSET; fieldOffset += 1 + directBuffer.getByte(fieldOffset); @@ -115,8 +109,7 @@ public static String dataMimeType(final DirectBuffer directBuffer, final int off return new String(bytes, StandardCharsets.UTF_8); } - public static int payloadOffset(final DirectBuffer directBuffer, final int offset) - { + public static int payloadOffset(final DirectBuffer directBuffer, final int offset) { int fieldOffset = offset + METADATA_MIME_TYPE_LENGTH_OFFSET; final int metadataMimeTypeLength = directBuffer.getByte(fieldOffset); @@ -129,8 +122,7 @@ public static int payloadOffset(final DirectBuffer directBuffer, final int offse } private static int putMimeType( - final MutableDirectBuffer mutableDirectBuffer, final int fieldOffset, final String mimeType) - { + final MutableDirectBuffer mutableDirectBuffer, final int fieldOffset, final String mimeType) { byte[] bytes = mimeType.getBytes(StandardCharsets.UTF_8); mutableDirectBuffer.putByte(fieldOffset, (byte) bytes.length); @@ -139,8 +131,7 @@ private static int putMimeType( return 1 + bytes.length; } - private static byte[] getMimeType(final DirectBuffer directBuffer, final int fieldOffset) - { + private static byte[] getMimeType(final DirectBuffer directBuffer, final int fieldOffset) { final int length = directBuffer.getByte(fieldOffset); final byte[] bytes = new byte[length]; diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java index b7b6e1368..ab3ad4083 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ThreadLocalFramePool.java @@ -22,8 +22,7 @@ import java.nio.ByteBuffer; -public class ThreadLocalFramePool implements FramePool -{ +public class ThreadLocalFramePool implements FramePool { private static final int MAX_CAHED_FRAMES_PER_THREAD = 16; private static final ThreadLocal> PER_THREAD_FRAME_QUEUE = @@ -32,21 +31,18 @@ public class ThreadLocalFramePool implements FramePool private static final ThreadLocal> PER_THREAD_DIRECTBUFFER_QUEUE = ThreadLocal.withInitial(() -> new OneToOneConcurrentArrayQueue<>(MAX_CAHED_FRAMES_PER_THREAD)); - public Frame acquireFrame(int size) - { + public Frame acquireFrame(int size) { final MutableDirectBuffer directBuffer = acquireMutableDirectBuffer(size); Frame frame = pollFrame(); - if (null == frame) - { + if (null == frame) { frame = Frame.allocate(directBuffer); } return frame; } - public Frame acquireFrame(ByteBuffer byteBuffer) - { + public Frame acquireFrame(ByteBuffer byteBuffer) { return Frame.allocate(new UnsafeBuffer(byteBuffer)); } @@ -55,45 +51,36 @@ public void release(Frame frame) PER_THREAD_FRAME_QUEUE.get().offer(frame); } - public Frame acquireFrame(MutableDirectBuffer mutableDirectBuffer) - { + public Frame acquireFrame(MutableDirectBuffer mutableDirectBuffer) { Frame frame = pollFrame(); - if (null == frame) - { + if (null == frame) { frame = Frame.allocate(mutableDirectBuffer); } return frame; } - public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer) - { + public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer) { MutableDirectBuffer directBuffer = pollMutableDirectBuffer(); - if (null == directBuffer) - { + if (null == directBuffer) { directBuffer = new UnsafeBuffer(byteBuffer); } return directBuffer; } - public MutableDirectBuffer acquireMutableDirectBuffer(int size) - { + public MutableDirectBuffer acquireMutableDirectBuffer(int size) { UnsafeBuffer directBuffer = (UnsafeBuffer)pollMutableDirectBuffer(); - if (null == directBuffer || directBuffer.byteBuffer().capacity() < size) - { + if (null == directBuffer || directBuffer.byteBuffer().capacity() < size) { directBuffer = new UnsafeBuffer(ByteBuffer.allocate(size)); - } - else - { + } else { directBuffer.byteBuffer().limit(size).position(0); } return directBuffer; } - public void release(MutableDirectBuffer mutableDirectBuffer) - { + public void release(MutableDirectBuffer mutableDirectBuffer) { PER_THREAD_DIRECTBUFFER_QUEUE.get().offer(mutableDirectBuffer); } @@ -102,8 +89,7 @@ private Frame pollFrame() return PER_THREAD_FRAME_QUEUE.get().poll(); } - private MutableDirectBuffer pollMutableDirectBuffer() - { + private MutableDirectBuffer pollMutableDirectBuffer() { return PER_THREAD_DIRECTBUFFER_QUEUE.get().poll(); } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java index bd0e00cbf..defff1721 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/ThreadSafeFramePool.java @@ -22,107 +22,85 @@ import java.nio.ByteBuffer; -public class ThreadSafeFramePool implements FramePool -{ +public class ThreadSafeFramePool implements FramePool { private static final int MAX_CACHED_FRAMES = 16; private final OneToOneConcurrentArrayQueue frameQueue; private final OneToOneConcurrentArrayQueue directBufferQueue; - public ThreadSafeFramePool() - { + public ThreadSafeFramePool() { this(MAX_CACHED_FRAMES, MAX_CACHED_FRAMES); } - public ThreadSafeFramePool(final int frameQueueLength, final int directBufferQueueLength) - { + public ThreadSafeFramePool(final int frameQueueLength, final int directBufferQueueLength) { frameQueue = new OneToOneConcurrentArrayQueue<>(frameQueueLength); directBufferQueue = new OneToOneConcurrentArrayQueue<>(directBufferQueueLength); } - public Frame acquireFrame(int size) - { + public Frame acquireFrame(int size) { final MutableDirectBuffer directBuffer = acquireMutableDirectBuffer(size); Frame frame = pollFrame(); - if (null == frame) - { + if (null == frame) { frame = Frame.allocate(directBuffer); } return frame; } - public Frame acquireFrame(ByteBuffer byteBuffer) - { + public Frame acquireFrame(ByteBuffer byteBuffer) { return Frame.allocate(new UnsafeBuffer(byteBuffer)); } - public Frame acquireFrame(MutableDirectBuffer mutableDirectBuffer) - { + public Frame acquireFrame(MutableDirectBuffer mutableDirectBuffer) { Frame frame = pollFrame(); - if (null == frame) - { + if (null == frame) { frame = Frame.allocate(mutableDirectBuffer); } return frame; } - public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer) - { + public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer) { MutableDirectBuffer directBuffer = pollMutableDirectBuffer(); - if (null == directBuffer) - { + if (null == directBuffer) { directBuffer = new UnsafeBuffer(byteBuffer); } return directBuffer; } - public MutableDirectBuffer acquireMutableDirectBuffer(int size) - { + public MutableDirectBuffer acquireMutableDirectBuffer(int size) { UnsafeBuffer directBuffer = (UnsafeBuffer)pollMutableDirectBuffer(); - if (null == directBuffer || directBuffer.capacity() < size) - { + if (null == directBuffer || directBuffer.capacity() < size) { directBuffer = new UnsafeBuffer(ByteBuffer.allocate(size)); - } - else - { + } else { directBuffer.byteBuffer().limit(size).position(0); } return directBuffer; } - public void release(Frame frame) - { - synchronized (frameQueue) - { + public void release(Frame frame) { + synchronized (frameQueue) { frameQueue.offer(frame); } } - public void release(MutableDirectBuffer mutableDirectBuffer) - { - synchronized (directBufferQueue) - { + public void release(MutableDirectBuffer mutableDirectBuffer) { + synchronized (directBufferQueue) { directBufferQueue.offer(mutableDirectBuffer); } } - private Frame pollFrame() - { - synchronized (frameQueue) - { + private Frame pollFrame() { + synchronized (frameQueue) { return frameQueue.poll(); } } - private MutableDirectBuffer pollMutableDirectBuffer() - { - synchronized (directBufferQueue) - { + private MutableDirectBuffer pollMutableDirectBuffer() { + synchronized (directBufferQueue) { return directBufferQueue.poll(); } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java index 09f04288f..76883a944 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/frame/UnpooledFrame.java @@ -24,42 +24,32 @@ /** * On demand creation for Frames, MutableDirectBuffer backed by ByteBuffers of required capacity */ -public class UnpooledFrame implements FramePool -{ +public class UnpooledFrame implements FramePool { /* * TODO: have all gneration of UnsafeBuffer and ByteBuffer hidden behind acquire() calls (private for ByteBuffer) */ - public Frame acquireFrame(int size) - { + public Frame acquireFrame(int size) { return Frame.allocate(new UnsafeBuffer(ByteBuffer.allocate(size))); } - public Frame acquireFrame(ByteBuffer byteBuffer) - { + public Frame acquireFrame(ByteBuffer byteBuffer) { return Frame.allocate(new UnsafeBuffer(byteBuffer)); } - public void release(Frame frame) - { - } + public void release(Frame frame) {} - public Frame acquireFrame(MutableDirectBuffer mutableDirectBuffer) - { + public Frame acquireFrame(MutableDirectBuffer mutableDirectBuffer) { return Frame.allocate(mutableDirectBuffer); } - public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer) - { + public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer) { return new UnsafeBuffer(byteBuffer); } - public MutableDirectBuffer acquireMutableDirectBuffer(int size) - { + public MutableDirectBuffer acquireMutableDirectBuffer(int size) { return new UnsafeBuffer(ByteBuffer.allocate(size)); } - public void release(MutableDirectBuffer mutableDirectBuffer) - { - } + public void release(MutableDirectBuffer mutableDirectBuffer) {} }