Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unix-socket support for connectTCP for libevent #1547

Merged
merged 1 commit into from Aug 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions examples/unix_socket/dub.json
@@ -0,0 +1,11 @@
{
"name": "unit-socket-example",
"description": "Example for sending http requests to unix sockets",
"dependencies": {
"vibe-d:http": {"path": "../../"},
"vibe-d:web": {"path": "../../"},
"vibe-d:core": {"path": "../../"}
},
"versions": ["VibeNoSSL"],
"subConfigurations": { "vibe-d:core": "libevent"}
}
13 changes: 13 additions & 0 deletions examples/unix_socket/source/app.d
@@ -0,0 +1,13 @@
import vibe.inet.url;
import vibe.http.client;
import vibe.stream.operations;
import std.stdio;

void main()
{
URL url = URL("http+unix://%2Fvar%2Frun%2Fdocker.sock/containers/json");
writeln(url);
requestHTTP(url,(scope req){},(scope res){
writeln(res.bodyReader.readAllUTF8);
});
}
6 changes: 4 additions & 2 deletions source/vibe/core/drivers/libevent2_tcp.d
Expand Up @@ -83,9 +83,11 @@ package final class Libevent2TCPConnection : TCPConnection {

void* ptr;
if( ctx.remote_addr.family == AF_INET ) ptr = &ctx.remote_addr.sockAddrInet4.sin_addr;
else if (ctx.remote_addr.family == AF_UNIX ) ptr = &ctx.remote_addr.sockAddrUnix.sun_path;
else ptr = &ctx.remote_addr.sockAddrInet6.sin6_addr;
evutil_inet_ntop(ctx.remote_addr.family, ptr, m_peerAddressBuf.ptr, m_peerAddressBuf.length);
m_peerAddress = cast(string)m_peerAddressBuf[0 .. m_peerAddressBuf[].indexOf('\0')];

if (evutil_inet_ntop(ctx.remote_addr.family, ptr, m_peerAddressBuf.ptr, m_peerAddressBuf.length) !is null)
m_peerAddress = cast(string)m_peerAddressBuf[0 .. m_peerAddressBuf[].indexOf('\0')];

bufferevent_setwatermark(m_ctx.event, EV_WRITE, 4096, 65536);
bufferevent_setwatermark(m_ctx.event, EV_READ, 0, 65536);
Expand Down
38 changes: 30 additions & 8 deletions source/vibe/core/net.d
Expand Up @@ -24,7 +24,10 @@ version(Windows) {
else
import core.sys.windows.winsock2;
}

version(Posix)
{
import core.sys.posix.sys.un;
}

/**
Resolves the given host name/IP address string.
Expand Down Expand Up @@ -91,25 +94,28 @@ TCPListener listenTCP_s(ushort port, void function(TCPConnection stream) connect
TCPConnection connectTCP(string host, ushort port, string bind_interface = null, ushort bind_port = 0)
{
NetworkAddress addr = resolveHost(host);
addr.port = port;
if (addr.family != AF_UNIX)
addr.port = port;
NetworkAddress bind_address;
if (bind_interface.length) bind_address = resolveHost(bind_interface, addr.family);
else {
bind_address.family = addr.family;
if (addr.family == AF_INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
if (bind_address.family == AF_INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else if (bind_address.family != AF_UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
}
bind_address.port = bind_port;
if (addr.family != AF_UNIX)
bind_address.port = bind_port;
return getEventDriver().connectTCP(addr, bind_address);
}
/// ditto
TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress)
{
if (bind_address.family == AF_UNSPEC) {
bind_address.family = addr.family;
if (addr.family == AF_INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
bind_address.port = 0;
if (bind_address.family == AF_INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else if (bind_address.family != AF_UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
if (bind_address.family != AF_UNIX)
bind_address.port = 0;
}
enforce(addr.family == bind_address.family, "Destination address and bind address have different address families.");
return getEventDriver().connectTCP(addr, bind_address);
Expand Down Expand Up @@ -142,6 +148,7 @@ struct NetworkAddress {

private union {
sockaddr addr;
sockaddr_un addr_unix;
sockaddr_in addr_ip4;
sockaddr_in6 addr_ip6;
}
Expand Down Expand Up @@ -187,6 +194,7 @@ struct NetworkAddress {
const pure nothrow {
switch (this.family) {
default: assert(false, "sockAddrLen() called for invalid address family.");
case AF_UNIX: return addr_unix.sizeof;
case AF_INET: return addr_ip4.sizeof;
case AF_INET6: return addr_ip6.sizeof;
}
Expand All @@ -200,6 +208,10 @@ struct NetworkAddress {
in { assert (family == AF_INET6); }
body { return &addr_ip6; }

@property inout(sockaddr_un)* sockAddrUnix() inout pure nothrow
in { assert (family == AF_UNIX); }
body { return &addr_unix; }

/** Returns a string representation of the IP address
*/
string toAddressString()
Expand Down Expand Up @@ -231,6 +243,13 @@ struct NetworkAddress {
sink.formattedWrite("%x", bigEndianToNative!ushort(_dummy));
}
break;
case AF_UNIX:
import std.traits : hasMember;
static if (hasMember!(sockaddr_un, "sun_len"))
sink.formattedWrite("%s",() @trusted { return cast(char[])addr_unix.sun_path[0..addr_unix.sun_len]; } ());
else
sink.formattedWrite("%s",() @trusted { return (cast(char*)addr_unix.sun_path.ptr).fromStringz; } ());
break;
}
}

Expand Down Expand Up @@ -258,6 +277,9 @@ struct NetworkAddress {
toAddressString(sink);
sink.formattedWrite("]:%s", port);
break;
case AF_UNIX:
toAddressString(sink);
break;
}
}

Expand Down
82 changes: 72 additions & 10 deletions source/vibe/http/client.d
Expand Up @@ -36,7 +36,13 @@ import std.string;
import std.typecons;
import std.datetime;


version(Posix)
{
version(VibeLibeventDriver)
{
version = UnixSocket;
}
}
/**************************************************************************************************/
/* Public functions */
/**************************************************************************************************/
Expand Down Expand Up @@ -67,16 +73,25 @@ HTTPClientResponse requestHTTP(string url, scope void delegate(scope HTTPClientR
/// ditto
HTTPClientResponse requestHTTP(URL url, scope void delegate(scope HTTPClientRequest req) requester = null, const(HTTPClientSettings) settings = defaultSettings)
{
enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s).");
version(UnixSocket) {
enforce(url.schema == "http" || url.schema == "https" || url.schema == "http+unix" || url.schema == "https+unix", "URL schema must be http(s) or http(s)+unix.");
} else {
enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s).");
}
enforce(url.host.length > 0, "URL must contain a host name.");
bool use_tls;

if (settings.proxyURL.schema !is null)
use_tls = settings.proxyURL.schema == "https";
else
use_tls = url.schema == "https";
{
version(UnixSocket)
use_tls = url.schema == "https";
else
use_tls = url.schema == "https" || url.schema == "https+unix";
}

auto cli = connectHTTP(url.host, url.port, use_tls, settings);
auto cli = connectHTTP(url.getFilteredHost, url.port, use_tls, settings);
auto res = cli.request((req){
if (url.localURI.length) {
assert(url.path.absolute, "Request URL path must be absolute.");
Expand Down Expand Up @@ -114,16 +129,25 @@ void requestHTTP(string url, scope void delegate(scope HTTPClientRequest req) re
/// ditto
void requestHTTP(URL url, scope void delegate(scope HTTPClientRequest req) requester, scope void delegate(scope HTTPClientResponse req) responder, const(HTTPClientSettings) settings = defaultSettings)
{
enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s).");
version(UnixSocket) {
enforce(url.schema == "http" || url.schema == "https" || url.schema == "http+unix" || url.schema == "https+unix", "URL schema must be http(s) or http(s)+unix.");
} else {
enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s).");
}
enforce(url.host.length > 0, "URL must contain a host name.");
bool use_tls;

if (settings.proxyURL.schema !is null)
use_tls = settings.proxyURL.schema == "https";
else
use_tls = url.schema == "https";
{
version(UnixSocket)
use_tls = url.schema == "https";
else
use_tls = url.schema == "https" || url.schema == "https+unix";
}

auto cli = connectHTTP(url.host, url.port, use_tls, settings);
auto cli = connectHTTP(url.getFilteredHost, url.port, use_tls, settings);
cli.request((scope req) {
if (url.localURI.length) {
assert(url.path.absolute, "Request URL path must be absolute.");
Expand Down Expand Up @@ -550,9 +574,33 @@ final class HTTPClient {
m_conn = connectTCP(proxyAddr, m_settings.networkInterface);
}
else {
auto addr = resolveHost(m_server, m_settings.dnsAddressFamily);
addr.port = m_port;
m_conn = connectTCP(addr, m_settings.networkInterface);
version(UnixSocket)
{
import core.sys.posix.sys.un;
import core.sys.posix.sys.socket;
import std.regex : regex, Captures, Regex, matchFirst, ctRegex;
import core.stdc.string : strcpy;

NetworkAddress addr;
if (m_server[0] == '/')
{
addr.family = AF_UNIX;
sockaddr_un* s = addr.sockAddrUnix();
enforce(s.sun_path.length > m_server.length, "Unix sockets cannot have that long a name.");
s.sun_family = AF_UNIX;
strcpy(cast(char*)s.sun_path.ptr,m_server.toStringz());
} else
{
addr = resolveHost(m_server, m_settings.dnsAddressFamily);
addr.port = m_port;
}
m_conn = connectTCP(addr, m_settings.networkInterface);
} else
{
auto addr = resolveHost(m_server, m_settings.dnsAddressFamily);
addr.port = m_port;
m_conn = connectTCP(addr, m_settings.networkInterface);
}
}

m_stream = m_conn;
Expand Down Expand Up @@ -1022,5 +1070,19 @@ final class HTTPClientResponse : HTTPResponse {
}
}

/** Returns clean host string. In case of unix socket it performs urlDecode on host. */
private auto getFilteredHost(URL url)
{
version(UnixSocket)
{
import vibe.textfilter.urlencode : urlDecode;
if (url.schema == "https+unix" || url.schema == "http+unix")
return urlDecode(url.host);
else
return url.host;
} else
return url.host;
}

// This object is a placeholder and should to never be modified.
package __gshared HTTPClientSettings defaultSettings = new HTTPClientSettings;
14 changes: 14 additions & 0 deletions source/vibe/inet/url.d
Expand Up @@ -71,6 +71,8 @@ struct URL {
case "ws":
case "wss":
case "file":
case "http+unix":
case "https+unix":
// proto://server/path style
enforce(str.startsWith("//"), "URL must start with proto://...");
requires_host = true;
Expand Down Expand Up @@ -270,6 +272,8 @@ struct URL {
case "ftp":
case "spdy":
case "sftp":
case "http+unix":
case "https+unix":
dst.put("//");
break;
}
Expand Down Expand Up @@ -426,3 +430,13 @@ unittest { // issue #1318
assert(false, "Expected to throw an exception.");
} catch (Exception e) {}
}

unittest {
assert(URL("http+unix://%2Fvar%2Frun%2Fdocker.sock").schema == "http+unix");
assert(URL("https+unix://%2Fvar%2Frun%2Fdocker.sock").schema == "https+unix");
assert(URL("http+unix://%2Fvar%2Frun%2Fdocker.sock").host == "%2Fvar%2Frun%2Fdocker.sock");
assert(URL("http+unix://%2Fvar%2Frun%2Fdocker.sock").pathString == "");
assert(URL("http+unix://%2Fvar%2Frun%2Fdocker.sock/container/json").pathString == "/container/json");
auto url = URL("http+unix://%2Fvar%2Frun%2Fdocker.sock/container/json");
assert(URL(url.toString()) == url);
}