Skip to content

Commit

Permalink
Merge pull request #947 from lultimouomo/pingPong
Browse files Browse the repository at this point in the history
Add automatic ping on websockets
  • Loading branch information
s-ludwig committed Jan 15, 2015
2 parents 99f9f69 + f241c4d commit 2303f1c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
16 changes: 16 additions & 0 deletions source/vibe/http/server.d
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,19 @@ final class HTTPServerSettings {
*/
bool useCompressionIfPossible = false;


/** Interval between WebSocket ping frames.
The default value is 60 seconds; set to Duration.zero to disable pings.
*/
Duration webSocketPingInterval;// = dur!"seconds"(60);

this()
{
// need to use the contructor because the Ubuntu 13.10 GDC cannot CTFE dur()
maxRequestTime = 0.seconds;
keepAliveTimeout = 10.seconds;
webSocketPingInterval = 60.seconds;
}
}

Expand Down Expand Up @@ -692,6 +700,14 @@ final class HTTPServerRequest : HTTPRequest {
Session session;
}

package {
/** The settings of the server serving this request.
*/
@property const(HTTPServerSettings) serverSettings() const
{
return m_settings;
}
}

this(SysTime time, ushort port)
{
Expand Down
48 changes: 46 additions & 2 deletions source/vibe/http/websockets.d
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ final class WebSocket {
Task m_reader;
TaskMutex m_readMutex, m_writeMutex;
TaskCondition m_readCondition;
Timer m_pingTimer;
uint m_lastPingIndex;
bool m_pongReceived;
bool m_pongSkipped;
}

this(ConnectionStream conn, in HTTPServerRequest request)
Expand All @@ -204,6 +208,10 @@ final class WebSocket {
m_writeMutex = new TaskMutex;
m_readMutex = new TaskMutex;
m_readCondition = new TaskCondition(m_readMutex);
if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true);
m_pongReceived = true;
}
}

/**
Expand Down Expand Up @@ -310,7 +318,7 @@ final class WebSocket {
frame.writeFrame(m_conn);
}
}

if (m_pingTimer) m_pingTimer.stop();
if (Task.getThis() != m_reader) m_reader.join();
}

Expand Down Expand Up @@ -367,7 +375,24 @@ final class WebSocket {
try {
while (!m_conn.empty) {
assert(!m_nextMessage);
if (m_pingTimer) {
if (m_pongSkipped) {
logDebug("Pong not received, closing connection");
synchronized(m_writeMutex) {
m_conn.close();
}
return;
}
if (!m_conn.waitForData(request.serverSettings.webSocketPingInterval))
continue;
}
scope msg = new IncomingWebSocketMessage(m_conn);
if (msg.frameOpcode == FrameOpcode.pong) {
enforce(msg.peek().length == uint.sizeof, "Pong payload has wrong length");
enforce(m_lastPingIndex == littleEndianToNative!uint(msg.peek()[0..uint.sizeof]), "Pong payload has wrong value");
m_pongReceived = true;
continue;
}
if(msg.frameOpcode == FrameOpcode.close) {
logDebug("Got closing frame (%s)", m_sentCloseFrame);
if(!m_sentCloseFrame) close();
Expand All @@ -387,8 +412,26 @@ final class WebSocket {
}
m_conn.close();
}
}

private void sendPing() {
if (!m_pongReceived) {
logDebug("Pong skipped");
m_pongSkipped = true;
m_pingTimer.stop();
return;
}
synchronized(m_writeMutex) {
m_pongReceived = false;
Frame ping;
ping.opcode = FrameOpcode.ping;
ping.fin = true;
ping.payload = nativeToLittleEndian(++m_lastPingIndex);

ping.writeFrame(m_conn);
logDebug("Ping sent");
}
}
}

/**
Represents a single outgoing _WebSocket message as an OutputStream.
Expand Down Expand Up @@ -500,6 +543,7 @@ final class IncomingWebSocketMessage : InputStream {
case FrameOpcode.text:
case FrameOpcode.binary:
case FrameOpcode.close:
case FrameOpcode.pong:
m_currentFrame = frame;
break;
case FrameOpcode.ping:
Expand Down

0 comments on commit 2303f1c

Please sign in to comment.