Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
* An abstract implementation of {@link RSocket}. All request handling methods emit {@link
* UnsupportedOperationException} and hence must be overridden to provide a valid implementation.
*
* <p>{@link #close()} and {@link #onClose()} returns a {@code Publisher} that never terminates.
* <p>{@link #close()} returns a {@code Publisher} that immediately terminates. That same Publisher
* is returned by the {@link #onClose()} method.
*/
public abstract class AbstractRSocket implements RSocket {

Expand Down
3 changes: 2 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/Closeable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public interface Closeable {
* <p><em>This method is idempotent and hence can be called as many times at any point with same
* outcome.</em>
*
* @return A {@code Publisher} that completes when this {@code RSocket} close is complete.
* @return A {@code Publisher} that triggers the close when subscribed to and that completes when
* this {@code RSocket} close is complete.
*/
Mono<Void> close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package io.rsocket.frame;

import static io.rsocket.frame.FrameHeaderFlyweight.decodeMetadataLength;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.Frame;
import io.rsocket.FrameType;
import org.reactivestreams.Subscriber;

import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -49,11 +49,21 @@ private FrameHeaderFlyweight() {}
private static final int STREAM_ID_FIELD_OFFSET;
private static final int PAYLOAD_OFFSET;

/** (I)gnore flag: a value of 0 indicates the protocol can't ignore this frame */
public static final int FLAGS_I = 0b10_0000_0000;
/** (M)etadata flag: a value of 1 indicates the frame contains metadata */
public static final int FLAGS_M = 0b01_0000_0000;

/**
* (F)ollows: More fragments follow this fragment (in case of fragmented REQUEST_x or PAYLOAD
* frames)
*/
public static final int FLAGS_F = 0b00_1000_0000;
/** (C)omplete: bit to indicate stream completion ({@link Subscriber#onComplete()}) */
public static final int FLAGS_C = 0b00_0100_0000;
/**
* (N)ext: bit to indicate payload or metadata present ({@link Subscriber#onNext(Object)})
*/
public static final int FLAGS_N = 0b00_0010_0000;

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.rsocket.FrameType;

public class KeepaliveFrameFlyweight {
/**
* (R)espond: Set by the sender of the KEEPALIVE, to which the responder MUST reply with a
* KEEPALIVE without the R flag set
*/
public static final int FLAGS_KEEPALIVE_R = 0b00_1000_0000;

private KeepaliveFrameFlyweight() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,20 @@
public class SetupFrameFlyweight {
private SetupFrameFlyweight() {}

/**
* A flag used to indicate that the client requires connection resumption, if possible (the frame
* contains a Resume Identification Token)
*/
public static final int FLAGS_RESUME_ENABLE = 0b00_1000_0000;
/** A flag used to indicate that the client will honor LEASE sent by the server */
public static final int FLAGS_WILL_HONOR_LEASE = 0b00_0100_0000;
/**
* (obsolete) flag indicating that the server should reject the SETUP if it finds anything
* in the data or metadata that it doesn't understand
*
* @deprecated removed between protocol version 0.2 and 1.0RC
*/
@Deprecated
public static final int FLAGS_STRICT_INTERPRETATION = 0b00_0010_0000;

public static final int VALID_FLAGS =
Expand Down