Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for websocket clients #1332

Merged
merged 6 commits into from Dec 2, 2015
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
132 changes: 124 additions & 8 deletions source/vibe/http/websockets.d
Expand Up @@ -35,6 +35,9 @@ import vibe.core.log;
import vibe.core.net;
import vibe.stream.operations;
import vibe.http.server;
import vibe.http.client;
import vibe.core.connectionpool;
import vibe.utils.array;

import core.time;
import std.array;
Expand All @@ -45,7 +48,10 @@ import std.bitmanip;
import std.digest.sha;
import std.string;
import std.functional;

import std.uuid;
import std.base64;
import std.digest.sha;
import vibe.crypto.cryptorand;

/// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
Expand All @@ -63,6 +69,59 @@ class WebSocketException: Exception
}
}

/**
Returns a WebSocket client object that is connected to the specified host.
*/
WebSocket connectWebSocket(URL url, HTTPClientSettings settings = defaultSettings)
{
auto host = url.host;
auto port = url.port;
bool use_tls = (url.schema == "wss") ? true : false;

if (port == 0)
port = (use_tls) ? 443 : 80;

static struct ConnInfo { string host; ushort port; bool useTLS; string proxyIP; ushort proxyPort; }
static FixedRingBuffer!(Tuple!(ConnInfo, ConnectionPool!HTTPClient), 16) s_connections;
auto ckey = ConnInfo(host, port, use_tls, settings ? settings.proxyURL.host : null, settings ? settings.proxyURL.port : 0);

ConnectionPool!HTTPClient pool;
foreach (c; s_connections)
if (c[0].host == host && c[0].port == port && c[0].useTLS == use_tls && ((c[0].proxyIP == settings.proxyURL.host && c[0].proxyPort == settings.proxyURL.port) || settings is null))
pool = c[1];

if (!pool)
{
logDebug("Create HTTP client pool %s:%s %s proxy %s:%d", host, port, use_tls, (settings) ? settings.proxyURL.host : string.init, (settings) ? settings.proxyURL.port : 0);
pool = new ConnectionPool!HTTPClient({
auto ret = new HTTPClient;
ret.connect(host, port, use_tls, settings);
return ret;
});
if (s_connections.full)
s_connections.popFront();
s_connections.put(tuple(ckey, pool));
}

auto challengeKey = generateChallengeKey();
auto answerKey = computeAcceptKey(challengeKey);
auto cl = pool.lockConnection();
auto res = cl.request((scope req){
req.requestURL = (url.localURI == "") ? "/" : url.localURI;
req.method = HTTPMethod.GET;
req.headers["Upgrade"] = "websocket";
req.headers["Connection"] = "Upgrade";
req.headers["Sec-WebSocket-Version"] = "13";
req.headers["Sec-WebSocket-Key"] = challengeKey;
});

auto key = "sec-websocket-accept" in res.headers;
assert(key, answerKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't want an assert here, rather an enforce.

auto ws = new WebSocket(res.switchProtocol("websocket"), null, false);
return ws;
}


/**
Establishes a web socket conection and passes it to the $(D on_handshake) delegate.
*/
Expand Down Expand Up @@ -187,12 +246,14 @@ final class WebSocket {
uint m_lastPingIndex;
bool m_pongReceived;
bool m_pongSkipped;
bool m_isServer = true;
}

this(ConnectionStream conn, in HTTPServerRequest request)
this(ConnectionStream conn, in HTTPServerRequest request, bool is_server = true)
{
m_conn = conn;
m_request = request;
m_isServer = is_server;
assert(m_conn);
m_reader = runTask(&startReader);
m_writeMutex = new InterruptibleTaskMutex;
Expand Down Expand Up @@ -288,7 +349,7 @@ final class WebSocket {
{
m_writeMutex.performLocked!({
enforceEx!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
scope message = new OutgoingWebSocketMessage(m_conn, frameOpcode);
scope message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_isServer);
scope(exit) message.finalize();
sender(message);
});
Expand All @@ -310,6 +371,7 @@ final class WebSocket {
m_writeMutex.performLocked!({
m_sentCloseFrame = true;
Frame frame;
frame.isServer = m_isServer;
frame.opcode = FrameOpcode.close;
if(code != 0)
frame.payload = std.bitmanip.nativeToBigEndian(code) ~ cast(ubyte[])reason;
Expand Down Expand Up @@ -422,6 +484,7 @@ final class WebSocket {
m_writeMutex.performLocked!({
m_pongReceived = false;
Frame ping;
ping.isServer = m_isServer;
ping.opcode = FrameOpcode.ping;
ping.fin = true;
ping.payload = nativeToLittleEndian(++m_lastPingIndex);
Expand All @@ -440,13 +503,15 @@ final class OutgoingWebSocketMessage : OutputStream {
FrameOpcode m_frameOpcode;
Appender!(ubyte[]) m_buffer;
bool m_finalized = false;
bool m_isServer;
}

this( Stream conn, FrameOpcode frameOpcode )
this( Stream conn, FrameOpcode frameOpcode, bool is_server = true )
{
assert(conn !is null);
m_conn = conn;
m_frameOpcode = frameOpcode;
m_isServer = is_server;
}

void write(in ubyte[] bytes)
Expand All @@ -459,6 +524,7 @@ final class OutgoingWebSocketMessage : OutputStream {
{
assert(!m_finalized);
Frame frame;
frame.isServer = m_isServer;
frame.opcode = m_frameOpcode;
frame.fin = false;
frame.payload = m_buffer.data;
Expand All @@ -473,6 +539,7 @@ final class OutgoingWebSocketMessage : OutputStream {
m_finalized = true;

Frame frame;
frame.isServer = m_isServer;
frame.fin = true;
frame.opcode = m_frameOpcode;
frame.payload = m_buffer.data;
Expand Down Expand Up @@ -576,6 +643,7 @@ struct Frame {
bool fin;
FrameOpcode opcode;
ubyte[] payload;
bool isServer = true;


void writeFrame(OutputStream stream)
Expand All @@ -588,16 +656,31 @@ struct Frame {
if (fin) firstByte |= 0x80;
rng.put(firstByte);

auto b1 = 0;
if (!isServer) {
b1 = 0x80;
}

if( payload.length < 126 ) {
rng.put(std.bitmanip.nativeToBigEndian(cast(ubyte)payload.length));
rng.put(std.bitmanip.nativeToBigEndian(cast(ubyte)(b1 | payload.length)));
} else if( payload.length <= 65536 ) {
rng.put(cast(ubyte[])[126]);
rng.put(cast(ubyte[])[(b1 | 126)]);
rng.put(std.bitmanip.nativeToBigEndian(cast(ushort)payload.length));
} else {
rng.put(cast(ubyte[])[127]);
rng.put(cast(ubyte[])[(b1 | 127)]);
rng.put(std.bitmanip.nativeToBigEndian(payload.length));
}
rng.put(payload);

if (!isServer) {
auto key = generateNewMaskKey();
rng.put(key);
for (size_t i = 0; i < payload.length; i++) {
payload[i] ^= key[i % 4];
}
rng.put(payload);
}else {
rng.put(payload);
}
rng.flush();
stream.flush();
}
Expand Down Expand Up @@ -641,3 +724,36 @@ struct Frame {
return frame;
}
}


// This object is a placeholder and should to never be modified.
// copied from client.d not sure how to make visible for websockets.d so we avoid creating a new object
private __gshared HTTPClientSettings defaultSettings = new HTTPClientSettings;

private ubyte[] generateNewMaskKey()
{
auto rng = new SystemRNG();
auto buffer = new ubyte[4];
rng.read(buffer);
return buffer;
}

private string generateChallengeKey()
{
auto uuid = randomUUID().toString();
immutable(ubyte)[] b = uuid.representation;
auto result = Base64.encode(b);
return to!(string)(result);
}

private string computeAcceptKey(string challengekey)
{
immutable(ubyte)[] b = challengekey.representation;
immutable(ubyte)[] a = s_webSocketGuid.representation;
SHA1 hash;
hash.start();
hash.put(b);
hash.put(a);
auto result = Base64.encode(hash.finish());
return to!(string)(result);
}
10 changes: 10 additions & 0 deletions source/vibe/inet/url.d
Expand Up @@ -68,6 +68,8 @@ struct URL {
case "ftp":
case "spdy":
case "sftp":
case "ws":
case "wss":
case "file":
// proto://server/path style
enforce(str.startsWith("//"), "URL must start with proto://...");
Expand Down Expand Up @@ -325,6 +327,14 @@ unittest { // issue #1044
assert(url.pathString == "/q");
}

//websocket unittest
unittest {
URL url = URL("ws://127.0.0.1:8080/echo");
assert(url.host == "127.0.0.1");
assert(url.port == 8080);
assert(url.localURI == "/echo");
}

unittest {
Path p = Path("/foo bar/boo oom/");
URL url = URL("http", "example.com", 0, p); // constructor test
Expand Down