Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multiple concurrent websocket connects failing #2265

Merged
merged 10 commits into from
Feb 18, 2019
2 changes: 1 addition & 1 deletion core/vibe/core/drivers/libevent2_tcp.d
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ package final class Libevent2TCPConnection : TCPConnection {

@property ulong leastSize()
{
if (!m_ctx || !m_ctx.event) return 0;
if (!m_ctx || !m_ctx.event || m_ctx.shutdown) return 0;
if (m_readBuffer.length) {
checkReader();
return m_readBuffer.length;
Expand Down
14 changes: 8 additions & 6 deletions http/vibe/http/client.d
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,11 @@ final class HTTPClient {
scope(exit) m_requesting = false;

if (!m_conn || !m_conn.connected) {
if (m_conn) m_conn.close(); // make sure all resources are freed
if (m_conn) {
m_conn.close(); // make sure all resources are freed
m_conn = TCPConnection.init;
}

if (m_settings.proxyURL.host !is null){

enum AddressType {
Expand Down Expand Up @@ -640,6 +644,7 @@ final class HTTPClient {
try m_tlsStream = createTLSStream(m_conn, m_tls, TLSStreamState.connecting, m_server, m_conn.remoteAddress);
catch (Exception e) {
m_conn.close();
m_conn = TCPConnection.init;
throw e;
}
m_stream = m_tlsStream;
Expand Down Expand Up @@ -1106,8 +1111,6 @@ final class HTTPClientResponse : HTTPResponse {
enforce(!new_protocol.length || !icmp(*resNewProto, new_protocol),
"Expected Upgrade: " ~ new_protocol ~", received Upgrade: " ~ *resNewProto);
auto stream = createConnectionProxyStream!(typeof(m_client.m_stream), typeof(m_client.m_conn))(m_client.m_stream, m_client.m_conn);
m_client.m_responding = false;
m_client = null;
m_closeConn = true; // cannot reuse connection for further requests!
return stream;
}
Expand All @@ -1119,9 +1122,8 @@ final class HTTPClientResponse : HTTPResponse {
enforce(resNewProto, "Server did not send an Upgrade header");
enforce(!new_protocol.length || !icmp(*resNewProto, new_protocol),
"Expected Upgrade: " ~ new_protocol ~", received Upgrade: " ~ *resNewProto);
scope stream = createConnectionProxyStream(m_client.m_stream, m_client.m_conn);
m_client.m_responding = false;
m_client = null;
auto stream = createConnectionProxyStream(m_client.m_stream, m_client.m_conn);
scope (exit) () @trusted { destroy(stream); } ();
m_closeConn = true;
del(stream);
}
Expand Down
23 changes: 15 additions & 8 deletions http/vibe/http/server.d
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,7 @@ final class HTTPServerResponse : HTTPResponse {
bool m_headerWritten = false;
bool m_isHeadResponse = false;
bool m_tls;
bool m_requiresConnectionClose;
SysTime m_timeFinalized;
}

Expand Down Expand Up @@ -1510,6 +1511,7 @@ final class HTTPServerResponse : HTTPResponse {
statusCode = HTTPStatus.SwitchingProtocols;
if (protocol.length) headers["Upgrade"] = protocol;
writeVoidBody();
m_requiresConnectionClose = true;
return createConnectionProxyStream(m_conn, m_rawConnection);
}
/// ditto
Expand All @@ -1518,13 +1520,12 @@ final class HTTPServerResponse : HTTPResponse {
statusCode = HTTPStatus.SwitchingProtocols;
if (protocol.length) headers["Upgrade"] = protocol;
writeVoidBody();
m_requiresConnectionClose = true;
() @trusted {
auto conn = createConnectionProxyStreamFL(m_conn, m_rawConnection);
del(conn);
} ();
finalize();
if (m_rawConnection && m_rawConnection.connected)
m_rawConnection.close(); // connection not reusable after a protocol upgrade
}

/** Special method for handling CONNECT proxy tunnel
Expand All @@ -1545,7 +1546,6 @@ final class HTTPServerResponse : HTTPResponse {
del(conn);
} ();
finalize();
m_rawConnection.close(); // connection not reusable after a protocol upgrade
}

/** Sets the specified cookie value.
Expand Down Expand Up @@ -1665,8 +1665,9 @@ final class HTTPServerResponse : HTTPResponse {
catch (Exception e) logDebug("Failed to flush connection after finishing HTTP response: %s", e.msg);
if (!isHeadResponse && bytesWritten < headers.get("Content-Length", "0").to!long) {
logDebug("HTTP response only written partially before finalization. Terminating connection.");
m_rawConnection.close();
m_requiresConnectionClose = true;
}

m_rawConnection = InterfaceProxy!ConnectionStream.init;
}

Expand Down Expand Up @@ -1994,7 +1995,7 @@ private HTTPListener listenHTTPPlain(HTTPServerSettings settings, HTTPServerRequ
import vibe.core.core : runWorkerTaskDist;
import std.algorithm : canFind, find;

static TCPListener doListen(HTTPServerContext listen_info, bool dist, bool reusePort)
static TCPListener doListen(HTTPServerContext listen_info, bool dist, bool reusePort, bool is_tls)
@safe {
try {
TCPListenOptions options = TCPListenOptions.defaults;
Expand All @@ -2013,7 +2014,7 @@ private HTTPListener listenHTTPPlain(HTTPServerSettings settings, HTTPServerRequ
if (listen_info.bindPort == 0)
listen_info.m_bindPort = ret.bindAddress.port;

auto proto = listen_info.tlsContext ? "https" : "http";
auto proto = is_tls ? "https" : "http";
auto urladdr = listen_info.bindAddress;
if (urladdr.canFind(':')) urladdr = "["~urladdr~"]";
logInfo("Listening for requests on %s://%s:%s/", proto, urladdr, listen_info.bindPort);
Expand All @@ -2035,7 +2036,10 @@ private HTTPListener listenHTTPPlain(HTTPServerSettings settings, HTTPServerRequ
if (!l.empty) linfo = l.front;
else {
auto li = new HTTPServerContext(addr, settings.port);
if (auto tcp_lst = doListen(li, (settings.options & HTTPServerOptionImpl.distribute) != 0, (settings.options & HTTPServerOption.reusePort) != 0)) // DMD BUG 2043
if (auto tcp_lst = doListen(li,
(settings.options & HTTPServerOptionImpl.distribute) != 0,
(settings.options & HTTPServerOption.reusePort) != 0,
settings.tlsContext !is null)) // DMD BUG 2043
{
li.m_listener = tcp_lst;
s_listeners ~= li;
Expand Down Expand Up @@ -2267,7 +2271,7 @@ private bool handleRequest(InterfaceProxy!Stream http_stream, TCPConnection tcp_
if (!parsed || res.headerWritten || !cast(Exception)e) keep_alive = false;
}

if (tcp_connection.connected) {
if (tcp_connection.connected && keep_alive) {
if (req.bodyReader && !req.bodyReader.empty) {
req.bodyReader.pipe(nullSink);
logTrace("dropped body");
Expand All @@ -2277,6 +2281,9 @@ private bool handleRequest(InterfaceProxy!Stream http_stream, TCPConnection tcp_
// finalize (e.g. for chunked encoding)
res.finalize();

if (res.m_requiresConnectionClose)
keep_alive = false;

foreach (k, v ; req._files) {
if (existsFile(v.tempPath)) {
removeFile(v.tempPath);
Expand Down
91 changes: 45 additions & 46 deletions http/vibe/http/websockets.d
Original file line number Diff line number Diff line change
Expand Up @@ -85,43 +85,10 @@ class WebSocketException: Exception
*/
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
import std.typecons : Tuple, tuple;

auto host = url.host;
auto port = url.port;
bool use_tls = (url.schema == "wss") ? true : false;

if (port == 0)
port = (use_tls) ? 443 : 80;

static struct ConnInfo { string host; ushort port; bool useTLS; string proxyIP; ushort proxyPort; }
static vibe.utils.array.FixedRingBuffer!(Tuple!(ConnInfo, ConnectionPool!HTTPClient), 16) s_connections;
auto ckey = ConnInfo(host, port, use_tls, settings ? settings.proxyURL.host : null, settings ? settings.proxyURL.port : 0);

ConnectionPool!HTTPClient pool;
foreach (c; s_connections)
if (c[0].host == host && c[0].port == port && c[0].useTLS == use_tls && (settings is null || (c[0].proxyIP == settings.proxyURL.host && c[0].proxyPort == settings.proxyURL.port)))
pool = c[1];

if (!pool)
{
logDebug("Create HTTP client pool %s:%s %s proxy %s:%d", host, port, use_tls, (settings) ? settings.proxyURL.host : string.init, (settings) ? settings.proxyURL.port : 0);
pool = new ConnectionPool!HTTPClient({
auto ret = new HTTPClient;
ret.connect(host, port, use_tls, settings);
return ret;
});
if (s_connections.full)
s_connections.popFront();
s_connections.put(tuple(ckey, pool));
}

auto rng = secureRNG();
auto challengeKey = generateChallengeKey(rng);
auto answerKey = computeAcceptKey(challengeKey);
auto cl = pool.lockConnection();
auto res = cl.request((scope req){
req.requestURL = (url.localURI == "") ? "/" : url.localURI;
auto res = requestHTTP(url, (scope req){
req.method = HTTPMethod.GET;
req.headers["Upgrade"] = "websocket";
req.headers["Connection"] = "Upgrade";
Expand All @@ -135,8 +102,7 @@ WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = default
enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
enforce(*key == answerKey, "Response has wrong accept key");
auto conn = res.switchProtocol("websocket");
auto ws = new WebSocket(conn, null, rng);
return ws;
return new WebSocket(conn, rng, res);
}

/// ditto
Expand All @@ -163,8 +129,9 @@ void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPC
enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
enforce(*key == answerKey, "Response has wrong accept key");
res.switchProtocol("websocket", (scope conn) @trusted {
scope ws = new WebSocket(conn, null, rng);
scope ws = new WebSocket(conn, rng, res);
del(ws);
if (ws.connected) ws.close();
});
}
);
Expand Down Expand Up @@ -233,7 +200,7 @@ void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPSe
res.headers["Connection"] = "Upgrade";
ConnectionStream conn = res.switchProtocol("websocket");

WebSocket socket = new WebSocket(conn, req, null);
WebSocket socket = new WebSocket(conn, req, res);
try {
on_handshake(socket);
} catch (Exception e) {
Expand Down Expand Up @@ -309,7 +276,7 @@ HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handsh
res.headers["Connection"] = "Upgrade";
res.switchProtocol("websocket", (scope conn) {
// TODO: put back 'scope' once it is actually enforced by DMD
/*scope*/ auto socket = new WebSocket(conn, req, null);
/*scope*/ auto socket = new WebSocket(conn, req, res);
try on_handshake(socket);
catch (Exception e) {
logDiagnostic("WebSocket handler failed: %s", e.msg);
Expand Down Expand Up @@ -527,7 +494,10 @@ final class WebSocket {
bool m_sentCloseFrame = false;
IncomingWebSocketMessage m_nextMessage = null;
const HTTPServerRequest m_request;
HTTPServerResponse m_serverResponse;
HTTPClientResponse m_clientResponse;
Task m_reader;
Task m_ownerTask;
InterruptibleTaskMutex m_readMutex, m_writeMutex;
InterruptibleTaskCondition m_readCondition;
Timer m_pingTimer;
Expand All @@ -547,12 +517,15 @@ final class WebSocket {
* conn = Underlying connection string
* request = HTTP request used to establish the connection
* rng = Source of entropy to use. If null, assume we're a server socket
* client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
*/
private this(ConnectionStream conn, in HTTPServerRequest request,
RandomNumberStream rng)
private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
{
m_ownerTask = Task.getThis();
m_conn = conn;
m_request = request;
m_clientResponse = client_res;
m_serverResponse = server_res;
assert(m_conn);
m_rng = rng;
m_writeMutex = new InterruptibleTaskMutex;
Expand All @@ -567,6 +540,16 @@ final class WebSocket {
});
}

private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
{
this(conn, null, null, rng, client_res);
}

private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res)
{
this(conn, request, res, null, null);
}

/**
Determines if the WebSocket connection is still alive and ready for sending.

Expand All @@ -576,7 +559,7 @@ final class WebSocket {

See_also: $(D waitForData)
*/
@property bool connected() { return m_conn.connected && !m_sentCloseFrame; }
@property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }

/**
Returns the close code sent by the remote end.
Expand Down Expand Up @@ -721,7 +704,25 @@ final class WebSocket {
}, FrameOpcode.close);
}
if (m_pingTimer) m_pingTimer.stop();
if (Task.getThis() != m_reader) m_reader.join();


if (Task.getThis() == m_ownerTask) {
m_writeMutex.performLocked!({
if (m_clientResponse) {
m_clientResponse.disconnect();
m_clientResponse = HTTPClientResponse.init;
}
if (m_serverResponse) {
m_serverResponse.finalize();
m_serverResponse = HTTPServerResponse.init;
}
});

m_reader.join();

() @trusted { destroy(m_conn); } ();
m_conn = ConnectionStream.init;
}
}

/**
Expand Down Expand Up @@ -811,7 +812,6 @@ final class WebSocket {

if(!m_sentCloseFrame) close();
logDebug("Terminating connection (%s)", m_sentCloseFrame);
m_conn.close();
return;
case FrameOpcode.text:
case FrameOpcode.binary:
Expand All @@ -832,15 +832,14 @@ final class WebSocket {
// If no close code was passed, e.g. this was an unclean termination
// of our websocket connection, set the close code to 1006.
if (this.m_closeCode == 0) this.m_closeCode = WebSocketCloseReason.abnormalClosure;
m_writeMutex.performLocked!({ m_conn.close(); });
}

private void sendPing()
nothrow {
try {
if (!m_pongReceived) {
logDebug("Pong skipped. Closing connection.");
m_writeMutex.performLocked!({ m_conn.close(); });
close();
m_pingTimer.stop();
return;
}
Expand Down
2 changes: 2 additions & 0 deletions tests/vibe.http.websocket.2169/dub.sdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name "test"
dependency "vibe-d:http" path="../../"
Loading