Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/io/reactivesocket/ConnectionSetupPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ public int getFlags()

public boolean willClientHonorLease()
{
return (HONOR_LEASE == (getFlags() & HONOR_LEASE));
return HONOR_LEASE == (getFlags() & HONOR_LEASE);
}

public boolean doesClientRequestStrictInterpretation()
{
return (STRICT_INTERPRETATION == (getFlags() & STRICT_INTERPRETATION));
return STRICT_INTERPRETATION == (getFlags() & STRICT_INTERPRETATION);
}
}
4 changes: 2 additions & 2 deletions src/main/java/io/reactivesocket/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public static Frame from(
final Throwable throwable,
ByteBuffer metadata
) {
String data = (throwable.getMessage() == null ? "" : throwable.getMessage());
String data = throwable.getMessage() == null ? "" : throwable.getMessage();
byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
final ByteBuffer dataBuffer = ByteBuffer.wrap(bytes);

Expand Down Expand Up @@ -525,7 +525,7 @@ public static Frame from(ByteBuffer data, boolean respond)
final Frame frame =
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.KEEPALIVE, 0, data.remaining()));

final int flags = (respond ? FrameHeaderFlyweight.FLAGS_KEEPALIVE_R : 0);
final int flags = respond ? FrameHeaderFlyweight.FLAGS_KEEPALIVE_R : 0;

frame.length = FrameHeaderFlyweight.encode(
frame.directBuffer, frame.offset, 0, flags, FrameType.KEEPALIVE, Frame.NULL_BYTEBUFFER, data);
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/reactivesocket/FrameType.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,22 @@ public int getEncodedType() {

public boolean isRequestType()
{
return (Flags.IS_REQUEST_TYPE == (flags & Flags.IS_REQUEST_TYPE));
return Flags.IS_REQUEST_TYPE == (flags & Flags.IS_REQUEST_TYPE);
}

public boolean hasInitialRequestN()
{
return (Flags.HAS_INITIAL_REQUEST_N == (flags & Flags.HAS_INITIAL_REQUEST_N));
return Flags.HAS_INITIAL_REQUEST_N == (flags & Flags.HAS_INITIAL_REQUEST_N);
}

public boolean canHaveData()
{
return (Flags.CAN_HAVE_DATA == (flags & Flags.CAN_HAVE_DATA));
return Flags.CAN_HAVE_DATA == (flags & Flags.CAN_HAVE_DATA);
}

public boolean canHaveMetadata()
{
return (Flags.CAN_HAVE_METADATA == (flags & Flags.CAN_HAVE_METADATA));
return Flags.CAN_HAVE_METADATA == (flags & Flags.CAN_HAVE_METADATA);
}

// TODO: offset of metadata and data (simplify parsing) naming: endOfFrameHeaderOffset()
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/reactivesocket/RequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@
public abstract class RequestHandler {

public static final Function<Payload, Publisher<Payload>> NO_REQUEST_RESPONSE_HANDLER =
(payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestResponse' handler")));
payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestResponse' handler"));

public static final Function<Payload, Publisher<Payload>> NO_REQUEST_STREAM_HANDLER =
(payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestStream' handler")));
payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestStream' handler"));

public static final Function<Payload, Publisher<Payload>> NO_REQUEST_SUBSCRIPTION_HANDLER =
(payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestSubscription' handler")));
payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestSubscription' handler"));

public static final Function<Payload, Publisher<Void>> NO_FIRE_AND_FORGET_HANDLER =
(payload -> PublisherUtils.errorVoid(new RuntimeException("No 'fireAndForget' handler")));
payload -> PublisherUtils.errorVoid(new RuntimeException("No 'fireAndForget' handler"));

public static final Function<Publisher<Payload>, Publisher<Payload>> NO_REQUEST_CHANNEL_HANDLER =
(payloads) -> PublisherUtils.errorPayload(new RuntimeException("No 'requestChannel' handler"));
payloads -> PublisherUtils.errorPayload(new RuntimeException("No 'requestChannel' handler"));

public static final Function<Payload, Publisher<Void>> NO_METADATA_PUSH_HANDLER =
(payload -> PublisherUtils.errorVoid(new RuntimeException("No 'metadataPush' handler")));
payload -> PublisherUtils.errorVoid(new RuntimeException("No 'metadataPush' handler"));

public abstract Publisher<Payload> handleRequestResponse(final Payload payload);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void cancel()
}

public static final Publisher<Frame> fromIterable(Iterable<Frame> is) {
return new PublisherIterableSource<Frame>(is);
return new PublisherIterableSource<>(is);
}

public static final class PublisherIterableSource<T> extends AtomicBoolean implements Publisher<T> {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/reactivesocket/internal/UnicastSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class UnicastSubject<T> implements Subscriber<T>, Publisher<T> {
private boolean subscribedTo = false;

public static <T> UnicastSubject<T> create() {
return new UnicastSubject<T>(null, r -> {});
return new UnicastSubject<>(null, r -> {});
}

/**
Expand All @@ -46,15 +46,15 @@ public static <T> UnicastSubject<T> create() {
* @return
*/
public static <T> UnicastSubject<T> create(BiConsumer<UnicastSubject<T>, Long> onConnect, Consumer<Long> onRequest) {
return new UnicastSubject<T>(onConnect, onRequest);
return new UnicastSubject<>(onConnect, onRequest);
}

/**
* @param onConnect Called when first requestN > 0 occurs.
* @return
*/
public static <T> UnicastSubject<T> create(BiConsumer<UnicastSubject<T>, Long> onConnect) {
return new UnicastSubject<T>(onConnect, r -> {});
return new UnicastSubject<>(onConnect, r -> {});
}

private UnicastSubject(BiConsumer<UnicastSubject<T>, Long> onConnect, Consumer<Long> onRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private static int metadataFieldLength(final DirectBuffer directBuffer, final in

if (FLAGS_M == (FLAGS_M & directBuffer.getShort(offset + FLAGS_FIELD_OFFSET, ByteOrder.BIG_ENDIAN)))
{
metadataLength = (directBuffer.getInt(metadataOffset(directBuffer, offset), ByteOrder.BIG_ENDIAN) & 0xFFFFFF);
metadataLength = directBuffer.getInt(metadataOffset(directBuffer, offset), ByteOrder.BIG_ENDIAN) & 0xFFFFFF;
}

return metadataLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static boolean requiresFragmenting(final int metadataMtu, final int dataM
final ByteBuffer metadata = payload.getMetadata();
final ByteBuffer data = payload.getData();

return (metadata.remaining() > metadataMtu || data.remaining() > dataMtu);
return metadata.remaining() > metadataMtu || data.remaining() > dataMtu;
}

public Iterator<Frame> iterator()
Expand All @@ -84,7 +84,7 @@ public Iterator<Frame> iterator()

public boolean hasNext()
{
return (dataOffset < data.capacity() || metadataOffset < metadata.remaining());
return dataOffset < data.capacity() || metadataOffset < metadata.remaining();
}

public Frame next()
Expand All @@ -103,7 +103,7 @@ public Frame next()
metadataOffset += metadataLength;
dataOffset += dataLength;

final boolean isMoreFollowing = (metadataOffset < metadata.remaining() || dataOffset < data.remaining());
final boolean isMoreFollowing = metadataOffset < metadata.remaining() || dataOffset < data.remaining();
int flags = 0;

if (Type.RESPONSE == type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public final class PerfUnicastSubjectNoBackpressure<T> implements Observable<T>,
private boolean subscribedTo = false;

public static <T> PerfUnicastSubjectNoBackpressure<T> create() {
return new PerfUnicastSubjectNoBackpressure<T>(null);
return new PerfUnicastSubjectNoBackpressure<>(null);
}

/**
* @param onConnect Called when first requestN > 0 occurs.
* @return
*/
public static <T> PerfUnicastSubjectNoBackpressure<T> create(Consumer<PerfUnicastSubjectNoBackpressure<T>> onConnect) {
return new PerfUnicastSubjectNoBackpressure<T>(onConnect);
return new PerfUnicastSubjectNoBackpressure<>(onConnect);
}

private PerfUnicastSubjectNoBackpressure(Consumer<PerfUnicastSubjectNoBackpressure<T>> onConnect) {
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/reactivesocket/ReactiveSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public class ReactiveSocketTest {
private ReactiveSocket socketServer;
private ReactiveSocket socketClient;
private AtomicBoolean helloSubscriptionRunning = new AtomicBoolean(false);
private AtomicReference<String> lastFireAndForget = new AtomicReference<String>();
private AtomicReference<String> lastMetadataPush = new AtomicReference<String>();
private AtomicReference<Throwable> lastServerError = new AtomicReference<Throwable>();
private AtomicReference<String> lastFireAndForget = new AtomicReference<>();
private AtomicReference<String> lastMetadataPush = new AtomicReference<>();
private AtomicReference<Throwable> lastServerError = new AtomicReference<>();
private CountDownLatch lastServerErrorCountDown;
private CountDownLatch fireAndForgetOrMetadataPush;

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/reactivesocket/TestTransportRequestN.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void testRequestChannelUpstreamWithNFromTransport() throws InterruptedExc
private ReactiveSocket socketServer;
private ReactiveSocket socketClient;
private AtomicBoolean helloSubscriptionRunning = new AtomicBoolean(false);
private AtomicReference<Throwable> lastServerError = new AtomicReference<Throwable>();
private AtomicReference<Throwable> lastServerError = new AtomicReference<>();
private CountDownLatch lastServerErrorCountDown;

public void setup(TestConnectionWithControlledRequestN clientConnection, TestConnectionWithControlledRequestN serverConnection) throws InterruptedException {
Expand Down