Skip to content

Commit

Permalink
Merge pull request #1848 from rejectedsoftware/issue1471_websocket_ping
Browse files Browse the repository at this point in the history
Fix WebSocket ping logic. Fixes #1471.
  • Loading branch information
s-ludwig committed Jul 17, 2017
2 parents f960427 + 9c11b67 commit e3d6217
Showing 1 changed file with 59 additions and 104 deletions.
163 changes: 59 additions & 104 deletions http/vibe/http/websockets.d
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ final class WebSocket {
Timer m_pingTimer;
uint m_lastPingIndex;
bool m_pongReceived;
bool m_pongSkipped;
short m_closeCode;
const(char)[] m_closeReason;
/// The entropy generator to use
Expand Down Expand Up @@ -508,15 +507,12 @@ final class WebSocket {
assert(reason.length <= 123);

if (connected) {
m_writeMutex.performLocked!({
send((scope msg) {
m_sentCloseFrame = true;
Frame frame;
frame.opcode = FrameOpcode.close;
if(code != 0)
frame.payload = std.bitmanip.nativeToBigEndian(code) ~ cast(const ubyte[])reason;
frame.fin = true;
frame.writeFrame(m_conn, m_rng);
});
if (code != 0)
msg.write(std.bitmanip.nativeToBigEndian(code));
msg.write(cast(const ubyte[])reason);
}, FrameOpcode.close);
}
if (m_pingTimer) m_pingTimer.stop();
if (Task.getThis() != m_reader) m_reader.join();
Expand Down Expand Up @@ -576,49 +572,51 @@ final class WebSocket {
try {
while (!m_conn.empty) {
assert(!m_nextMessage);
if (m_pingTimer) {
if (m_pongSkipped) {
logDebug("Pong not received, closing connection");
m_writeMutex.performLocked!({
m_conn.close();
});
return;
}
if (!m_conn.waitForData(request.serverSettings.webSocketPingInterval))
continue;
}
/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);
if (msg.frameOpcode == FrameOpcode.pong) {
enforce(msg.peek().length == uint.sizeof, "Pong payload has wrong length");
enforce(m_lastPingIndex == littleEndianToNative!uint(msg.peek()[0..uint.sizeof]), "Pong payload has wrong value");
m_pongReceived = true;
continue;
}
if(msg.frameOpcode == FrameOpcode.close) {
logDebug("Got closing frame (%s)", m_sentCloseFrame);

// If no close code was passed, we default to 1005
this.m_closeCode = 1005;

// If provided in the frame, attempt to parse the close code/reason
if (msg.peek().length >= short.sizeof) {
this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

if (msg.peek().length > short.sizeof) {
this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
switch (msg.frameOpcode) {
default: throw new WebSocketException("unknown frame opcode");
case FrameOpcode.ping:
send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
break;
case FrameOpcode.pong:
// test if pong matches previous ping
if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
logDebugV("Received PONG that doesn't match previous ping.");
break;
}
logDebugV("Received matching PONG.");
m_pongReceived = true;
break;
case FrameOpcode.close:
logDebug("Got closing frame (%s)", m_sentCloseFrame);

// If no close code was passed, we default to 1005
this.m_closeCode = 1005;

// If provided in the frame, attempt to parse the close code/reason
if (msg.peek().length >= short.sizeof) {
this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

if (msg.peek().length > short.sizeof) {
this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
}
}
}

if(!m_sentCloseFrame) close();
logDebug("Terminating connection (%s)", m_sentCloseFrame);
m_conn.close();
return;
if(!m_sentCloseFrame) close();
logDebug("Terminating connection (%s)", m_sentCloseFrame);
m_conn.close();
return;
case FrameOpcode.text:
case FrameOpcode.binary:
case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
m_readMutex.performLocked!({
m_nextMessage = msg;
m_readCondition.notifyAll();
while (m_nextMessage) m_readCondition.wait();
});
break;
}
m_readMutex.performLocked!({
m_nextMessage = msg;
m_readCondition.notifyAll();
while (m_nextMessage) m_readCondition.wait();
});
}
} catch (Exception e) {
logDiagnostic("Error while reading websocket message: %s", e.msg);
Expand All @@ -628,27 +626,22 @@ final class WebSocket {
// If no close code was passed, e.g. this was an unclean termination
// of our websocket connection, set the close code to 1006.
if (this.m_closeCode == 0) this.m_closeCode = 1006;
m_conn.close();
m_writeMutex.performLocked!({ m_conn.close(); });
}

private void sendPing()
nothrow {
if (!m_pongReceived) {
logDebug("Pong skipped");
m_pongSkipped = true;
m_pingTimer.stop();
return;
}
try m_writeMutex.performLocked!({
try {
if (!m_pongReceived) {
logDebug("Pong skipped. Closing connection.");
m_writeMutex.performLocked!({ m_conn.close(); });
m_pingTimer.stop();
return;
}
m_pongReceived = false;
Frame ping;
ping.opcode = FrameOpcode.ping;
ping.fin = true;
ping.payload = nativeToLittleEndian(++m_lastPingIndex);
ping.writeFrame(m_conn, m_rng);
logDebug("Ping sent");
});
catch (Exception e) {
send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
logDebugV("Ping sent");
} catch (Exception e) {
logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
}
}
Expand All @@ -667,7 +660,7 @@ final class OutgoingWebSocketMessage : OutputStream {
bool m_finalized = false;
}

this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
{
assert(conn !is null);
m_conn = conn;
Expand Down Expand Up @@ -736,12 +729,12 @@ final class IncomingWebSocketMessage : InputStream {
Frame m_currentFrame;
}

this(Stream conn, RandomNumberStream rng)
private this(Stream conn, RandomNumberStream rng)
{
assert(conn !is null);
m_conn = conn;
m_rng = rng;
readFrame();
skipFrame(); // reads the first frame
}

@property bool empty() const { return m_currentFrame.payload.length == 0; }
Expand Down Expand Up @@ -802,32 +795,6 @@ final class IncomingWebSocketMessage : InputStream {
}

alias read = InputStream.read;

private void readFrame() {
Frame frame;
do {
frame = Frame.readFrame(m_conn);
switch(frame.opcode) {
case FrameOpcode.continuation:
case FrameOpcode.text:
case FrameOpcode.binary:
case FrameOpcode.close:
case FrameOpcode.pong:
m_currentFrame = frame;
break;
case FrameOpcode.ping:
Frame pong;
pong.opcode = FrameOpcode.pong;
pong.fin = true;
pong.payload = frame.payload;

pong.writeFrame(m_conn, m_rng);
break;
default:
throw new WebSocketException("unknown frame opcode");
}
} while( frame.opcode == FrameOpcode.ping );
}
}

/// Magic string defined by the RFC for challenging the server during upgrade
Expand Down Expand Up @@ -926,18 +893,6 @@ private struct Frame {
}
}

void writeFrame(OutputStream stream, RandomNumberStream sys_rng)
{
import vibe.stream.wrapper;

auto rng = StreamOutputRange(stream);
ubyte[maxHeaderSize] hdr;
writeHeader(hdr[], sys_rng);
rng.put(hdr[0 .. getHeaderSize(sys_rng !is null)]);
rng.flush();
stream.flush();
}

static Frame readFrame(InputStream stream)
{
Frame frame;
Expand Down

0 comments on commit e3d6217

Please sign in to comment.