Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/main/java/io/reactivesocket/DuplexConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

import org.reactivestreams.Publisher;

/**
* Represents a connection with input/output that the protocol uses.
*/
public interface DuplexConnection {
// TODO should we call this 'Connection'? 'SocketConnection'? 'ReactiveSocketConnection'?

Publisher<Message> getInput();

Publisher<Void> write(Message o);
Publisher<Void> write(Publisher<Message> o);

}
263 changes: 134 additions & 129 deletions src/main/java/io/reactivesocket/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,135 +17,140 @@

import java.nio.ByteBuffer;

/**
* Represents a Message sent over a {@link DuplexConnection}.
* <p>
* This provides encoding, decoding and field accessors.
*/
public class Message {

private Message() {
}

// not final so we can reuse this object
private ByteBuffer b;
private int messageId;
private MessageType type;
private String message;

public ByteBuffer getBytes() {
return b;
}

public String getMessage() {
if (type == null) {
decode();
}
return message;
}

public int getMessageId() {
if (type == null) {
decode();
}
return messageId;
}

public MessageType getMessageType() {
if (type == null) {
decode();
}
return type;
}

/**
* Mutates this Frame to contain the given ByteBuffer
*
* @param b
*/
public void wrap(ByteBuffer b) {
this.messageId = -1;
this.type = null;
this.message = null;
this.b = b;
}

/**
* Construct a new Frame from the given ByteBuffer
*
* @param b
* @return
*/
public static Message from(ByteBuffer b) {
Message f = new Message();
f.b = b;
return f;
}

/**
* Mutates this Frame to contain the given message.
*
* @param messageId
* @param type
* @param message
*/
public void wrap(int messageId, MessageType type, String message) {
this.messageId = messageId;
this.type = type;
this.message = message;
this.b = getBytes(messageId, type, message);
}

/**
* Construct a new Frame with the given message.
*
* @param messageId
* @param type
* @param message
* @return
*/
public static Message from(int messageId, MessageType type, String message) {
Message f = new Message();
f.b = getBytes(messageId, type, message);
f.messageId = messageId;
f.type = type;
f.message = message;
return f;
}

private static ByteBuffer getBytes(int messageId, MessageType type, String message) {
// TODO replace with binary
/**
* This is NOT how we want it for real. Just representing the idea for discussion.
*/
String s = "[" + type.ordinal() + "]" + getIdString(messageId) + message;
// TODO stop allocating ... use flywheels
return ByteBuffer.wrap(s.getBytes());
}

private static String getIdString(int id) {
return "[" + id + "]|";
}

private void decode() {
// TODO replace with binary
/**
* This is NOT how we want it for real. Just representing the idea for discussion.
*/
byte[] copy = new byte[b.limit()];
b.get(copy);
String data = new String(copy);
int separator = data.indexOf('|');
String prefix = data.substring(0, separator);
this.type = MessageType.values[Integer.parseInt(prefix.substring(1, data.indexOf(']')))];
this.messageId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1));
this.message = data.substring(separator + 1, data.length());
}

@Override
public String toString() {
if (type == null) {
try {
decode();
} catch (Exception e) {
e.printStackTrace();
}
}
return "Message => ID: " + messageId + " Type: " + type + " Data: " + message;
}
private Message() {
}

// not final so we can reuse this object
private ByteBuffer b;
private int streamId;
private MessageType type;
private String message;

public ByteBuffer getBytes() {
return b;
}

public String getMessage() {
if (type == null) {
decode();
}
return message;
}

public int getStreamId() {
if (type == null) {
decode();
}
return streamId;
}

public MessageType getMessageType() {
if (type == null) {
decode();
}
return type;
}

/**
* Mutates this Frame to contain the given ByteBuffer
*
* @param b
*/
public void wrap(ByteBuffer b) {
this.streamId = -1;
this.type = null;
this.message = null;
this.b = b;
}

/**
* Construct a new Frame from the given ByteBuffer
*
* @param b
* @return
*/
public static Message from(ByteBuffer b) {
Message f = new Message();
f.b = b;
return f;
}

/**
* Mutates this Frame to contain the given message.
*
* @param streamId
* @param type
* @param message
*/
public void wrap(int streamId, MessageType type, String message) {
this.streamId = streamId;
this.type = type;
this.message = message;
this.b = getBytes(streamId, type, message);
}

/**
* Construct a new Frame with the given message.
*
* @param streamId
* @param type
* @param message
* @return
*/
public static Message from(int streamId, MessageType type, String message) {
Message f = new Message();
f.b = getBytes(streamId, type, message);
f.streamId = streamId;
f.type = type;
f.message = message;
return f;
}

private static ByteBuffer getBytes(int messageId, MessageType type, String message) {
// TODO replace with binary
/**
* This is NOT how we want it for real. Just representing the idea for discussion.
*/
String s = "[" + type.getMessageId() + "]" + getIdString(messageId) + message;
// TODO stop allocating ... use flywheels
return ByteBuffer.wrap(s.getBytes());
}

private static String getIdString(int id) {
return "[" + id + "]|";
}

private void decode() {
// TODO replace with binary
/**
* This is NOT how we want it for real. Just representing the idea for discussion.
*/
byte[] copy = new byte[b.limit()];
b.get(copy);
String data = new String(copy);
int separator = data.indexOf('|');
String prefix = data.substring(0, separator);
this.type = MessageType.from(Integer.parseInt(prefix.substring(1, data.indexOf(']'))));
this.streamId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1));
this.message = data.substring(separator + 1, data.length());
}

@Override
public String toString() {
if (type == null) {
try {
decode();
} catch (Exception e) {
e.printStackTrace();
}
}
return "Message => ID: " + streamId + " Type: " + type + " Data: " + message;
}
}
53 changes: 49 additions & 4 deletions src/main/java/io/reactivesocket/MessageType.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,53 @@
*/
package io.reactivesocket;

enum MessageType {
// DO NOT REORDER OR INSERT NEW ELEMENTS. THE ORDINAL IS PART OF THE PROTOCOL
SUBSCRIBE_REQUEST_RESPONSE, SUBSCRIBE_STREAM, STREAM_REQUEST, DISPOSE, NEXT_COMPLETE, NEXT, ERROR, COMPLETE;
public static MessageType[] values = MessageType.values(); // cached for performance
/**
* Types of {@link Message} that can be sent.
*/
public enum MessageType {

SETUP(0x01),
// Messages from Requestor
REQUEST_RESPONSE(0x11),
FIRE_AND_FORGET(0x12),
REQUEST_STREAM(0x13),
REQUEST_SUBSCRIPTION(0x14),
REQUEST_N(0x15),
CANCEL(0x16),
// Messages from Responder
NEXT(0x22),
COMPLETE(0x23),
ERROR(0x24);

private static MessageType[] typesById;

/**
* Index types by id for indexed lookup.
*/
static {
int max = 0;
for (MessageType t : values()) {
if (t.id > max) {
max = t.id;
}
}
typesById = new MessageType[max + 1];
for (MessageType t : values()) {
typesById[t.id] = t;
}
}

private final int id;

private MessageType(int id) {
this.id = id;
}

public int getMessageId() {
return id;
}

public static MessageType from(int id) {
return typesById[id];
}
}
Loading