Skip to content

Commit

Permalink
Merge pull request #1836 from rejectedsoftware/1534_websockets
Browse files Browse the repository at this point in the history
Improve web sockets and add autobahn test suite client
merged-on-behalf-of: Sönke Ludwig <s-ludwig@users.noreply.github.com>
  • Loading branch information
dlang-bot committed Jul 12, 2017
2 parents e317840 + f8bbddc commit 3ca4a94
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 46 deletions.
161 changes: 117 additions & 44 deletions http/vibe/http/websockets.d
@@ -1,7 +1,7 @@
/**
Implements WebSocket support and fallbacks for older browsers.
Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
Copyright: © 2012-2014 RejectedSoftware e.K.
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
Authors: Jan Krüger
Expand Down Expand Up @@ -62,7 +62,7 @@ alias WebSocketHandshakeDelegate = void delegate(scope WebSocket);
/// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
@safe:
@safe pure nothrow:

///
this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
Expand Down Expand Up @@ -337,7 +337,9 @@ final class WebSocket {
bool m_pongSkipped;
short m_closeCode;
const(char)[] m_closeReason;
RandomNumberStream m_rng;
/// The entropy generator to use
/// If not null, it means this is a server socket.
RandomNumberStream m_rng;
}

/**
Expand Down Expand Up @@ -450,7 +452,9 @@ final class WebSocket {
*/
void send(scope const(char)[] data)
{
send((scope message){ message.write(cast(const ubyte[])data); });
send(
(scope message) { message.write(cast(const ubyte[])data); },
FrameOpcode.text);
}

/**
Expand All @@ -462,7 +466,7 @@ final class WebSocket {
A `WebSocketException` is thrown if the connection gets closed
before or during the transfer of the message.
*/
void send(ubyte[] data)
void send(in ubyte[] data)
{
send((scope message){ message.write(data); }, FrameOpcode.binary);
}
Expand All @@ -474,7 +478,7 @@ final class WebSocket {
A `WebSocketException` is thrown if the connection gets closed
before or during the transfer of the message.
*/
void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode = FrameOpcode.text)
void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
{
m_writeMutex.performLocked!({
enforceEx!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
Expand All @@ -484,6 +488,13 @@ final class WebSocket {
});
}

/// Compatibility overload - will be removed soon.
deprecated("Call the overload which requires an explicit FrameOpcode.")
void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender)
{
send(sender, FrameOpcode.text);
}

/**
Actively closes the connection.
Expand Down Expand Up @@ -649,7 +660,7 @@ final class WebSocket {
final class OutgoingWebSocketMessage : OutputStream {
@safe:
private {
RandomNumberStream m_rng;
RandomNumberStream m_rng;
Stream m_conn;
FrameOpcode m_frameOpcode;
Appender!(ubyte[]) m_buffer;
Expand All @@ -661,7 +672,7 @@ final class OutgoingWebSocketMessage : OutputStream {
assert(conn !is null);
m_conn = conn;
m_frameOpcode = frameOpcode;
m_rng = rng;
m_rng = rng;
}

size_t write(in ubyte[] bytes, IOMode mode)
Expand Down Expand Up @@ -720,7 +731,7 @@ final class OutgoingWebSocketMessage : OutputStream {
final class IncomingWebSocketMessage : InputStream {
@safe:
private {
RandomNumberStream m_rng;
RandomNumberStream m_rng;
Stream m_conn;
Frame m_currentFrame;
}
Expand All @@ -729,8 +740,8 @@ final class IncomingWebSocketMessage : InputStream {
{
assert(conn !is null);
m_conn = conn;
m_rng = rng;
readFrame();
m_rng = rng;
readFrame();
}

@property bool empty() const { return m_currentFrame.payload.length == 0; }
Expand All @@ -744,6 +755,27 @@ final class IncomingWebSocketMessage : InputStream {

const(ubyte)[] peek() { return m_currentFrame.payload; }

/**
* Retrieve the next websocket frame of the stream and discard the current
* one
*
* This function is helpful if one wish to process frames by frames,
* or minimize memory allocation, as `peek` will only return the current
* frame data, and read requires a pre-allocated buffer.
*
* Returns:
* `false` if the current frame is the final one, `true` if a new frame
* was read.
*/
bool skipFrame()
{
if (m_currentFrame.fin)
return false;

m_currentFrame = Frame.readFrame(m_conn);
return true;
}

size_t read(scope ubyte[] dst, IOMode mode)
{
size_t nread = 0;
Expand All @@ -759,10 +791,10 @@ final class IncomingWebSocketMessage : InputStream {
m_currentFrame.payload = m_currentFrame.payload[sz .. $];
nread += sz;

if (leastSize == 0 && !m_currentFrame.fin) {
if (leastSize == 0) {
if (mode == IOMode.immediate || mode == IOMode.once && nread > 0)
break;
m_currentFrame = Frame.readFrame(m_conn);
this.skipFrame();
}
}

Expand Down Expand Up @@ -798,24 +830,26 @@ final class IncomingWebSocketMessage : InputStream {
}
}

/// Magic string defined by the RFC for challenging the server during upgrade
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

private immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/**
* The Opcode is 4 bytes, as defined in Section 5.2
* The Opcode is 4 bits, as defined in Section 5.2
*
* Values are defined in section 11.8
* Currently only 6 values are defined, however the opcode is defined as
* taking 4 bytes.
* taking 4 bits.
*/
private enum FrameOpcode : uint {
private enum FrameOpcode : ubyte {
continuation = 0x0,
text = 0x1,
binary = 0x2,
close = 0x8,
ping = 0x9,
pong = 0xA
}
static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");


private struct Frame {
Expand All @@ -826,6 +860,32 @@ private struct Frame {
FrameOpcode opcode;
ubyte[] payload;

/**
* Return the header length encoded with the expected amount of bits
*
* The WebSocket RFC define a variable-length payload length.
* In short, it means that:
* - If the length is <= 125, it is stored as the 7 least significant
* bits of the second header byte. The first bit is reserved for MASK.
* - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of
* 126 is stored in the aforementioned 7 bits, and the actual length
* is stored in the next two bytes, resulting in a 4 bytes header
* ( + masking key, if any).
* - If the length is > 65_536, a magic value of 127 will be used for
* the 7-bit field, and the next 8 bytes are expected to be the length,
* resulting in a 10 bytes header ( + masking key, if any).
*
* Those functions encapsulate all this logic and allow to just get the
* length with the desired size.
*
* Return:
* - For `ubyte`, the value to store in the 7 bits field, either the
* length or a magic value (126 or 127).
* - For `ushort`, a value in the range [126; 65_536].
* If payload.length is not in this bound, an assertion will be triggered.
* - For `ulong`, a value in the range [65_537; size_t.max].
* If payload.length is not in this bound, an assertion will be triggered.
*/
size_t getHeaderSize(bool mask)
{
size_t ret = 1;
Expand All @@ -838,7 +898,7 @@ private struct Frame {

void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
{
ubyte[4] buff;
ubyte[4] buff;
ubyte firstByte = cast(ubyte)opcode;
if (fin) firstByte |= 0x80;
dst[0] = firstByte;
Expand Down Expand Up @@ -881,38 +941,51 @@ private struct Frame {
static Frame readFrame(InputStream stream)
{
Frame frame;
ubyte[2] data2;
ubyte[8] data8;
stream.read(data2);
//enforceEx!WebSocketException( (data[0] & 0x70) != 0, "reserved bits must be unset" );
frame.fin = (data2[0] & 0x80) == 0x80;
bool masked = (data2[1] & 0x80) == 0x80;
frame.opcode = cast(FrameOpcode)(data2[0] & 0xf);

logDebug("Read frame: %s %s", frame.opcode, frame.fin);
//parsing length
ulong length = data2[1] & 0x7f;
if( length == 126 ) {
stream.read(data2);
length = bigEndianToNative!ushort(data2);
} else if( length == 127 ) {
stream.read(data8);
length = bigEndianToNative!ulong(data8);
}
ubyte[8] data;

//masking key
ubyte[4] maskingKey;
if( masked ) stream.read(maskingKey);
stream.read(data[0 .. 2]);
frame.fin = (data[0] & 0x80) != 0;
frame.opcode = cast(FrameOpcode)(data[0] & 0x0F);

//payload
bool masked = !!(data[1] & 0b1000_0000);

//parsing length
ulong length = data[1] & 0b0111_1111;
if (length == 126) {
stream.read(data[0 .. 2]);
length = bigEndianToNative!ushort(data[0 .. 2]);
} else if (length == 127) {
stream.read(data);
length = bigEndianToNative!ulong(data);

// RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
// interpreted as a 64-bit unsigned integer (the most significant
// bit MUST be 0)
enforceEx!WebSocketException(!(length >> 63),
"Received length has a non-zero most significant bit");

}
logDebug("Read frame: %s %s %s length=%d",
frame.opcode,
frame.fin ? "final frame" : "continuation",
masked ? "masked" : "not masked",
length);

// Masking key is 32 bits / uint
if (masked)
stream.read(data[0 .. 4]);

// Read payload
// TODO: Provide a way to limit the size read, easy
// DOS for server code here (rejectedsoftware/vibe.d#1496).
enforceEx!WebSocketException(length <= size_t.max);
frame.payload = new ubyte[cast(size_t)length];
frame.payload = new ubyte[](cast(size_t)length);
stream.read(frame.payload);

//de-masking
for( size_t i = 0; i < length; ++i ) {
frame.payload[i] = frame.payload[i] ^ maskingKey[i % 4];
}
if (masked)
foreach (size_t i; 0 .. cast(size_t)length)
frame.payload[i] = frame.payload[i] ^ data[i % 4];

return frame;
}
Expand Down
@@ -0,0 +1,6 @@
websockets-autobahn-client
.dub
docs.json
__dummy.html
*.o
*.obj
@@ -0,0 +1,9 @@
{
"name": "websockets-autobahn-client",
"dependencies": {
"vibe-d": { "path": "../../" }
},
"versions": [
"VibeDefaultMain"
]
}
@@ -0,0 +1,41 @@
import vibe.d;

shared static this ()
{
runTask(() => runTestSuite());
}

void runTestSuite ()
{
auto count = getCaseCount();
logInfo("We're going to run %d test cases...", count);

foreach (currCase; 1 .. count)
{
auto url = URL("ws://127.0.0.1:9001/runCase?agent=vibe.d&case="
~ to!string(currCase));
logInfo("Running test case %d/%d", currCase, count);
connectWebSocket(
url, (scope ws) {
while (ws.waitForData) {
ws.receive((scope message) {
ws.send(message.readAll);
});
}
});
}
}


size_t getCaseCount (string base_addr = "ws://127.0.0.1:9001")
{
size_t ret;
auto url = URL(base_addr ~ "/getCaseCount");
connectWebSocket(
url, (scope ws) {
while (ws.waitForData) {
ret = ws.receiveText.to!size_t;
}
});
return ret;
}
6 changes: 4 additions & 2 deletions travis-ci.sh
Expand Up @@ -44,7 +44,9 @@ if [ ${BUILD_EXAMPLE=1} -eq 1 ]; then
fi
if [ ${RUN_TEST=1} -eq 1 ]; then
for ex in `\ls -1 tests/`; do
echo "[INFO] Running test $ex"
(cd tests/$ex && dub --compiler=$DC --override-config=vibe-d:core/$VIBED_DRIVER $DUB_ARGS && dub clean)
if [ -r test/$ex/dub.json ] || [ -r test/$ex/dub.sdl ]; then
echo "[INFO] Running test $ex"
(cd tests/$ex && dub --compiler=$DC --override-config=vibe-d:core/$VIBED_DRIVER $DUB_ARGS && dub clean)
fi
done
fi

0 comments on commit 3ca4a94

Please sign in to comment.