Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/io/reactivesocket/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ public void wrap(final int streamId, final FrameType type, final ByteBuffer data
// SETUP specific getters
public static class Setup
{

private Setup() {}

public static Frame from(
int flags,
int keepaliveInterval,
Expand Down Expand Up @@ -317,6 +320,9 @@ public static String dataMimeType(final Frame frame)

public static class Error
{

private Error() {}

public static Frame from(
int streamId,
final Throwable throwable,
Expand Down Expand Up @@ -360,6 +366,8 @@ public static int errorCode(final Frame frame)

public static class Lease
{
private Lease() {}

public static Frame from(int ttl, int numberOfRequests, ByteBuffer metadata)
{
final Frame frame = POOL.acquireFrame(LeaseFrameFlyweight.computeFrameLength(metadata.remaining()));
Expand All @@ -383,6 +391,8 @@ public static int numberOfRequests(final Frame frame)

public static class RequestN
{
private RequestN() {}

public static Frame from(int streamId, int requestN)
{
final Frame frame = POOL.acquireFrame(RequestNFrameFlyweight.computeFrameLength());
Expand All @@ -400,6 +410,8 @@ public static long requestN(final Frame frame)

public static class Request
{
private Request() {}

public static Frame from(int streamId, FrameType type, Payload payload, int initialRequestN)
{
final ByteBuffer d = payload.getData() != null ? payload.getData() : NULL_BYTEBUFFER;
Expand Down Expand Up @@ -473,6 +485,9 @@ public static boolean isRequestChannelComplete(final Frame frame)

public static class Response
{

private Response() {}

public static Frame from(int streamId, FrameType type, Payload payload)
{
final ByteBuffer data = payload.getData() != null ? payload.getData() : NULL_BYTEBUFFER;
Expand Down Expand Up @@ -507,6 +522,9 @@ public static Frame from(int streamId, FrameType type)

public static class Cancel
{

private Cancel() {}

public static Frame from(int streamId)
{
final Frame frame =
Expand All @@ -520,6 +538,9 @@ public static Frame from(int streamId)

public static class Keepalive
{

private Keepalive() {}

public static Frame from(ByteBuffer data, boolean respond)
{
final Frame frame =
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivesocket/FrameType.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public enum FrameType

private static class Flags
{
private Flags() {}

private static final int CAN_HAVE_DATA = 0b0001;
private static final int CAN_HAVE_METADATA = 0b0010;
private static final int CAN_HAVE_METADATA_AND_DATA = 0b0011;
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/reactivesocket/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import static java.nio.charset.StandardCharsets.UTF_8;

public class Exceptions {

private Exceptions() {}

public static Throwable from(Frame frame) {
final int errorCode = Frame.Error.errorCode(frame);
String message = "<empty message>";
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivesocket/internal/PublisherUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

public class PublisherUtils {

private PublisherUtils() {}

// TODO: be better about using scheduler for this
public static final ScheduledExecutorService SCHEDULER_THREAD = Executors.newScheduledThreadPool(1,
(r) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

public class ByteBufferUtil
{

private ByteBufferUtil() {}

/**
* Slice a portion of the {@link ByteBuffer} while preserving the buffers position and limit.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import java.nio.ByteOrder;

public class ErrorFrameFlyweight {

private ErrorFrameFlyweight() {}

// defined error codes
public static final int INVALID_SETUP = 0x0001;
public static final int UNSUPPORTED_SETUP = 0x0002;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
*/
public class FrameHeaderFlyweight
{

private FrameHeaderFlyweight() {}

public static final ByteBuffer NULL_BYTEBUFFER = ByteBuffer.allocate(0);

public static final int FRAME_HEADER_LENGTH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

public class KeepaliveFrameFlyweight
{
private KeepaliveFrameFlyweight() {}

private static final int PAYLOAD_OFFSET = FrameHeaderFlyweight.FRAME_HEADER_LENGTH;

public static int computeFrameLength(final int dataLength)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

public class LeaseFrameFlyweight
{
private LeaseFrameFlyweight() {}

// relative to start of passed offset
private static final int TTL_FIELD_OFFSET = FrameHeaderFlyweight.FRAME_HEADER_LENGTH;
private static final int NUM_REQUESTS_FIELD_OFFSET = TTL_FIELD_OFFSET + BitUtil.SIZE_OF_INT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

public class RequestFrameFlyweight
{

private RequestFrameFlyweight() {}

public static final int FLAGS_REQUEST_CHANNEL_C = 0b0001_0000_0000_0000;
public static final int FLAGS_REQUEST_CHANNEL_N = 0b0000_1000_0000_0000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

public class RequestNFrameFlyweight
{
private RequestNFrameFlyweight() {}

// relative to start of passed offset
private static final int REQUEST_N_FIELD_OFFSET = FrameHeaderFlyweight.FRAME_HEADER_LENGTH;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

public class SetupFrameFlyweight
{
private SetupFrameFlyweight() {}

public static final int FLAGS_WILL_HONOR_LEASE = 0b0010_0000;
public static final int FLAGS_STRICT_INTERPRETATION = 0b0001_0000;

Expand Down
2 changes: 2 additions & 0 deletions src/test/java/io/reactivesocket/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

public class TestUtil
{
private TestUtil() {}

public static Frame utf8EncodedRequestFrame(final int streamId, final FrameType type, final String data, final int initialRequestN)
{
return Frame.Request.from(streamId, type, new Payload()
Expand Down