Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WebSocket ping logic. Fixes #1471. #1848

Merged
merged 1 commit into from
Jul 17, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 59 additions & 104 deletions http/vibe/http/websockets.d
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ final class WebSocket {
Timer m_pingTimer;
uint m_lastPingIndex;
bool m_pongReceived;
bool m_pongSkipped;
short m_closeCode;
const(char)[] m_closeReason;
/// The entropy generator to use
Expand Down Expand Up @@ -508,15 +507,12 @@ final class WebSocket {
assert(reason.length <= 123);

if (connected) {
m_writeMutex.performLocked!({
send((scope msg) {
m_sentCloseFrame = true;
Frame frame;
frame.opcode = FrameOpcode.close;
if(code != 0)
frame.payload = std.bitmanip.nativeToBigEndian(code) ~ cast(const ubyte[])reason;
frame.fin = true;
frame.writeFrame(m_conn, m_rng);
});
if (code != 0)
msg.write(std.bitmanip.nativeToBigEndian(code));
msg.write(cast(const ubyte[])reason);
}, FrameOpcode.close);
}
if (m_pingTimer) m_pingTimer.stop();
if (Task.getThis() != m_reader) m_reader.join();
Expand Down Expand Up @@ -576,49 +572,51 @@ final class WebSocket {
try {
while (!m_conn.empty) {
assert(!m_nextMessage);
if (m_pingTimer) {
if (m_pongSkipped) {
logDebug("Pong not received, closing connection");
m_writeMutex.performLocked!({
m_conn.close();
});
return;
}
if (!m_conn.waitForData(request.serverSettings.webSocketPingInterval))
continue;
}
/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);
if (msg.frameOpcode == FrameOpcode.pong) {
enforce(msg.peek().length == uint.sizeof, "Pong payload has wrong length");
enforce(m_lastPingIndex == littleEndianToNative!uint(msg.peek()[0..uint.sizeof]), "Pong payload has wrong value");
m_pongReceived = true;
continue;
}
if(msg.frameOpcode == FrameOpcode.close) {
logDebug("Got closing frame (%s)", m_sentCloseFrame);

// If no close code was passed, we default to 1005
this.m_closeCode = 1005;

// If provided in the frame, attempt to parse the close code/reason
if (msg.peek().length >= short.sizeof) {
this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

if (msg.peek().length > short.sizeof) {
this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
switch (msg.frameOpcode) {
default: throw new WebSocketException("unknown frame opcode");
case FrameOpcode.ping:
send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
break;
case FrameOpcode.pong:
// test if pong matches previous ping
if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
logDebugV("Received PONG that doesn't match previous ping.");
break;
}
logDebugV("Received matching PONG.");
m_pongReceived = true;
break;
case FrameOpcode.close:
logDebug("Got closing frame (%s)", m_sentCloseFrame);

// If no close code was passed, we default to 1005
this.m_closeCode = 1005;

// If provided in the frame, attempt to parse the close code/reason
if (msg.peek().length >= short.sizeof) {
this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

if (msg.peek().length > short.sizeof) {
this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
}
}
}

if(!m_sentCloseFrame) close();
logDebug("Terminating connection (%s)", m_sentCloseFrame);
m_conn.close();
return;
if(!m_sentCloseFrame) close();
logDebug("Terminating connection (%s)", m_sentCloseFrame);
m_conn.close();
return;
case FrameOpcode.text:
case FrameOpcode.binary:
case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
m_readMutex.performLocked!({
m_nextMessage = msg;
m_readCondition.notifyAll();
while (m_nextMessage) m_readCondition.wait();
});
break;
}
m_readMutex.performLocked!({
m_nextMessage = msg;
m_readCondition.notifyAll();
while (m_nextMessage) m_readCondition.wait();
});
}
} catch (Exception e) {
logDiagnostic("Error while reading websocket message: %s", e.msg);
Expand All @@ -628,27 +626,22 @@ final class WebSocket {
// If no close code was passed, e.g. this was an unclean termination
// of our websocket connection, set the close code to 1006.
if (this.m_closeCode == 0) this.m_closeCode = 1006;
m_conn.close();
m_writeMutex.performLocked!({ m_conn.close(); });
}

private void sendPing()
nothrow {
if (!m_pongReceived) {
logDebug("Pong skipped");
m_pongSkipped = true;
m_pingTimer.stop();
return;
}
try m_writeMutex.performLocked!({
try {
if (!m_pongReceived) {
logDebug("Pong skipped. Closing connection.");
m_writeMutex.performLocked!({ m_conn.close(); });
m_pingTimer.stop();
return;
}
m_pongReceived = false;
Frame ping;
ping.opcode = FrameOpcode.ping;
ping.fin = true;
ping.payload = nativeToLittleEndian(++m_lastPingIndex);
ping.writeFrame(m_conn, m_rng);
logDebug("Ping sent");
});
catch (Exception e) {
send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
logDebugV("Ping sent");
} catch (Exception e) {
logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
}
}
Expand All @@ -667,7 +660,7 @@ final class OutgoingWebSocketMessage : OutputStream {
bool m_finalized = false;
}

this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
{
assert(conn !is null);
m_conn = conn;
Expand Down Expand Up @@ -736,12 +729,12 @@ final class IncomingWebSocketMessage : InputStream {
Frame m_currentFrame;
}

this(Stream conn, RandomNumberStream rng)
private this(Stream conn, RandomNumberStream rng)
{
assert(conn !is null);
m_conn = conn;
m_rng = rng;
readFrame();
skipFrame(); // reads the first frame
}

@property bool empty() const { return m_currentFrame.payload.length == 0; }
Expand Down Expand Up @@ -802,32 +795,6 @@ final class IncomingWebSocketMessage : InputStream {
}

alias read = InputStream.read;

private void readFrame() {
Frame frame;
do {
frame = Frame.readFrame(m_conn);
switch(frame.opcode) {
case FrameOpcode.continuation:
case FrameOpcode.text:
case FrameOpcode.binary:
case FrameOpcode.close:
case FrameOpcode.pong:
m_currentFrame = frame;
break;
case FrameOpcode.ping:
Frame pong;
pong.opcode = FrameOpcode.pong;
pong.fin = true;
pong.payload = frame.payload;

pong.writeFrame(m_conn, m_rng);
break;
default:
throw new WebSocketException("unknown frame opcode");
}
} while( frame.opcode == FrameOpcode.ping );
}
}

/// Magic string defined by the RFC for challenging the server during upgrade
Expand Down Expand Up @@ -926,18 +893,6 @@ private struct Frame {
}
}

void writeFrame(OutputStream stream, RandomNumberStream sys_rng)
{
import vibe.stream.wrapper;

auto rng = StreamOutputRange(stream);
ubyte[maxHeaderSize] hdr;
writeHeader(hdr[], sys_rng);
rng.put(hdr[0 .. getHeaderSize(sys_rng !is null)]);
rng.flush();
stream.flush();
}

static Frame readFrame(InputStream stream)
{
Frame frame;
Expand Down