Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add automatic ping on websockets #947

Merged
merged 4 commits into from
Jan 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions source/vibe/http/server.d
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,19 @@ final class HTTPServerSettings {
*/
bool useCompressionIfPossible = false;


/** Interval between WebSocket ping frames.

The default value is 60 seconds; set to Duration.zero to disable pings.
*/
Duration webSocketPingInterval;// = dur!"seconds"(60);

this()
{
// need to use the contructor because the Ubuntu 13.10 GDC cannot CTFE dur()
maxRequestTime = 0.seconds;
keepAliveTimeout = 10.seconds;
webSocketPingInterval = 60.seconds;
}
}

Expand Down Expand Up @@ -692,6 +700,14 @@ final class HTTPServerRequest : HTTPRequest {
Session session;
}

package {
/** The settings of the server serving this request.
*/
@property const(HTTPServerSettings) serverSettings() const
{
return m_settings;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd leave this as package for now to avoid any potential necessity for API breakage later. If it turns out that this is useful for external libraries/applications, we can still make it public.

}

this(SysTime time, ushort port)
{
Expand Down
48 changes: 46 additions & 2 deletions source/vibe/http/websockets.d
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ final class WebSocket {
Task m_reader;
TaskMutex m_readMutex, m_writeMutex;
TaskCondition m_readCondition;
Timer m_pingTimer;
uint m_lastPingIndex;
bool m_pongReceived;
bool m_pongSkipped;
}

this(ConnectionStream conn, in HTTPServerRequest request)
Expand All @@ -204,6 +208,10 @@ final class WebSocket {
m_writeMutex = new TaskMutex;
m_readMutex = new TaskMutex;
m_readCondition = new TaskCondition(m_readMutex);
if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true);
m_pongReceived = true;
}
}

/**
Expand Down Expand Up @@ -310,7 +318,7 @@ final class WebSocket {
frame.writeFrame(m_conn);
}
}

if (m_pingTimer) m_pingTimer.stop();
if (Task.getThis() != m_reader) m_reader.join();
}

Expand Down Expand Up @@ -367,7 +375,24 @@ final class WebSocket {
try {
while (!m_conn.empty) {
assert(!m_nextMessage);
if (m_pingTimer) {
if (m_pongSkipped) {
logDebug("Pong not received, closing connection");
synchronized(m_writeMutex) {
m_conn.close();
}
return;
}
if (!m_conn.waitForData(request.serverSettings.webSocketPingInterval))
continue;
}
scope msg = new IncomingWebSocketMessage(m_conn);
if (msg.frameOpcode == FrameOpcode.pong) {
enforce(msg.peek().length == uint.sizeof, "Pong payload has wrong length");
enforce(m_lastPingIndex == littleEndianToNative!uint(msg.peek()[0..uint.sizeof]), "Pong payload has wrong value");
m_pongReceived = true;
continue;
}
if(msg.frameOpcode == FrameOpcode.close) {
logDebug("Got closing frame (%s)", m_sentCloseFrame);
if(!m_sentCloseFrame) close();
Expand All @@ -387,8 +412,26 @@ final class WebSocket {
}
m_conn.close();
}
}

private void sendPing() {
if (!m_pongReceived) {
logDebug("Pong skipped");
m_pongSkipped = true;
m_pingTimer.stop();
return;
}
synchronized(m_writeMutex) {
m_pongReceived = false;
Frame ping;
ping.opcode = FrameOpcode.ping;
ping.fin = true;
ping.payload = nativeToLittleEndian(++m_lastPingIndex);

ping.writeFrame(m_conn);
logDebug("Ping sent");
}
}
}

/**
Represents a single outgoing _WebSocket message as an OutputStream.
Expand Down Expand Up @@ -500,6 +543,7 @@ final class IncomingWebSocketMessage : InputStream {
case FrameOpcode.text:
case FrameOpcode.binary:
case FrameOpcode.close:
case FrameOpcode.pong:
m_currentFrame = frame;
break;
case FrameOpcode.ping:
Expand Down