Skip to content

Commit

Permalink
Merge pull request #1547 from skoppe/unix-sockets
Browse files Browse the repository at this point in the history
Add unix-socket support for connectTCP for libevent
  • Loading branch information
s-ludwig committed Aug 22, 2016
2 parents 1b17907 + 33027ee commit d97a89c
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 20 deletions.
11 changes: 11 additions & 0 deletions examples/unix_socket/dub.json
@@ -0,0 +1,11 @@
{
"name": "unit-socket-example",
"description": "Example for sending http requests to unix sockets",
"dependencies": {
"vibe-d:http": {"path": "../../"},
"vibe-d:web": {"path": "../../"},
"vibe-d:core": {"path": "../../"}
},
"versions": ["VibeNoSSL"],
"subConfigurations": { "vibe-d:core": "libevent"}
}
13 changes: 13 additions & 0 deletions examples/unix_socket/source/app.d
@@ -0,0 +1,13 @@
import vibe.inet.url;
import vibe.http.client;
import vibe.stream.operations;
import std.stdio;

void main()
{
URL url = URL("http+unix://%2Fvar%2Frun%2Fdocker.sock/containers/json");
writeln(url);
requestHTTP(url,(scope req){},(scope res){
writeln(res.bodyReader.readAllUTF8);
});
}
6 changes: 4 additions & 2 deletions source/vibe/core/drivers/libevent2_tcp.d
Expand Up @@ -83,9 +83,11 @@ package final class Libevent2TCPConnection : TCPConnection {

void* ptr;
if( ctx.remote_addr.family == AF_INET ) ptr = &ctx.remote_addr.sockAddrInet4.sin_addr;
else if (ctx.remote_addr.family == AF_UNIX ) ptr = &ctx.remote_addr.sockAddrUnix.sun_path;
else ptr = &ctx.remote_addr.sockAddrInet6.sin6_addr;
evutil_inet_ntop(ctx.remote_addr.family, ptr, m_peerAddressBuf.ptr, m_peerAddressBuf.length);
m_peerAddress = cast(string)m_peerAddressBuf[0 .. m_peerAddressBuf[].indexOf('\0')];

if (evutil_inet_ntop(ctx.remote_addr.family, ptr, m_peerAddressBuf.ptr, m_peerAddressBuf.length) !is null)
m_peerAddress = cast(string)m_peerAddressBuf[0 .. m_peerAddressBuf[].indexOf('\0')];

bufferevent_setwatermark(m_ctx.event, EV_WRITE, 4096, 65536);
bufferevent_setwatermark(m_ctx.event, EV_READ, 0, 65536);
Expand Down
38 changes: 30 additions & 8 deletions source/vibe/core/net.d
Expand Up @@ -24,7 +24,10 @@ version(Windows) {
else
import core.sys.windows.winsock2;
}

version(Posix)
{
import core.sys.posix.sys.un;
}

/**
Resolves the given host name/IP address string.
Expand Down Expand Up @@ -91,25 +94,28 @@ TCPListener listenTCP_s(ushort port, void function(TCPConnection stream) connect
TCPConnection connectTCP(string host, ushort port, string bind_interface = null, ushort bind_port = 0)
{
NetworkAddress addr = resolveHost(host);
addr.port = port;
if (addr.family != AF_UNIX)
addr.port = port;
NetworkAddress bind_address;
if (bind_interface.length) bind_address = resolveHost(bind_interface, addr.family);
else {
bind_address.family = addr.family;
if (addr.family == AF_INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
if (bind_address.family == AF_INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else if (bind_address.family != AF_UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
}
bind_address.port = bind_port;
if (addr.family != AF_UNIX)
bind_address.port = bind_port;
return getEventDriver().connectTCP(addr, bind_address);
}
/// ditto
TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_address = anyAddress)
{
if (bind_address.family == AF_UNSPEC) {
bind_address.family = addr.family;
if (addr.family == AF_INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
bind_address.port = 0;
if (bind_address.family == AF_INET) bind_address.sockAddrInet4.sin_addr.s_addr = 0;
else if (bind_address.family != AF_UNIX) bind_address.sockAddrInet6.sin6_addr.s6_addr[] = 0;
if (bind_address.family != AF_UNIX)
bind_address.port = 0;
}
enforce(addr.family == bind_address.family, "Destination address and bind address have different address families.");
return getEventDriver().connectTCP(addr, bind_address);
Expand Down Expand Up @@ -142,6 +148,7 @@ struct NetworkAddress {

private union {
sockaddr addr;
sockaddr_un addr_unix;
sockaddr_in addr_ip4;
sockaddr_in6 addr_ip6;
}
Expand Down Expand Up @@ -187,6 +194,7 @@ struct NetworkAddress {
const pure nothrow {
switch (this.family) {
default: assert(false, "sockAddrLen() called for invalid address family.");
case AF_UNIX: return addr_unix.sizeof;
case AF_INET: return addr_ip4.sizeof;
case AF_INET6: return addr_ip6.sizeof;
}
Expand All @@ -200,6 +208,10 @@ struct NetworkAddress {
in { assert (family == AF_INET6); }
body { return &addr_ip6; }

@property inout(sockaddr_un)* sockAddrUnix() inout pure nothrow
in { assert (family == AF_UNIX); }
body { return &addr_unix; }

/** Returns a string representation of the IP address
*/
string toAddressString()
Expand Down Expand Up @@ -231,6 +243,13 @@ struct NetworkAddress {
sink.formattedWrite("%x", bigEndianToNative!ushort(_dummy));
}
break;
case AF_UNIX:
import std.traits : hasMember;
static if (hasMember!(sockaddr_un, "sun_len"))
sink.formattedWrite("%s",() @trusted { return cast(char[])addr_unix.sun_path[0..addr_unix.sun_len]; } ());
else
sink.formattedWrite("%s",() @trusted { return (cast(char*)addr_unix.sun_path.ptr).fromStringz; } ());
break;
}
}

Expand Down Expand Up @@ -258,6 +277,9 @@ struct NetworkAddress {
toAddressString(sink);
sink.formattedWrite("]:%s", port);
break;
case AF_UNIX:
toAddressString(sink);
break;
}
}

Expand Down
82 changes: 72 additions & 10 deletions source/vibe/http/client.d
Expand Up @@ -36,7 +36,13 @@ import std.string;
import std.typecons;
import std.datetime;


version(Posix)
{
version(VibeLibeventDriver)
{
version = UnixSocket;
}
}
/**************************************************************************************************/
/* Public functions */
/**************************************************************************************************/
Expand Down Expand Up @@ -67,16 +73,25 @@ HTTPClientResponse requestHTTP(string url, scope void delegate(scope HTTPClientR
/// ditto
HTTPClientResponse requestHTTP(URL url, scope void delegate(scope HTTPClientRequest req) requester = null, const(HTTPClientSettings) settings = defaultSettings)
{
enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s).");
version(UnixSocket) {
enforce(url.schema == "http" || url.schema == "https" || url.schema == "http+unix" || url.schema == "https+unix", "URL schema must be http(s) or http(s)+unix.");
} else {
enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s).");
}
enforce(url.host.length > 0, "URL must contain a host name.");
bool use_tls;

if (settings.proxyURL.schema !is null)
use_tls = settings.proxyURL.schema == "https";
else
use_tls = url.schema == "https";
{
version(UnixSocket)
use_tls = url.schema == "https";
else
use_tls = url.schema == "https" || url.schema == "https+unix";
}

auto cli = connectHTTP(url.host, url.port, use_tls, settings);
auto cli = connectHTTP(url.getFilteredHost, url.port, use_tls, settings);
auto res = cli.request((req){
if (url.localURI.length) {
assert(url.path.absolute, "Request URL path must be absolute.");
Expand Down Expand Up @@ -114,16 +129,25 @@ void requestHTTP(string url, scope void delegate(scope HTTPClientRequest req) re
/// ditto
void requestHTTP(URL url, scope void delegate(scope HTTPClientRequest req) requester, scope void delegate(scope HTTPClientResponse req) responder, const(HTTPClientSettings) settings = defaultSettings)
{
enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s).");
version(UnixSocket) {
enforce(url.schema == "http" || url.schema == "https" || url.schema == "http+unix" || url.schema == "https+unix", "URL schema must be http(s) or http(s)+unix.");
} else {
enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s).");
}
enforce(url.host.length > 0, "URL must contain a host name.");
bool use_tls;

if (settings.proxyURL.schema !is null)
use_tls = settings.proxyURL.schema == "https";
else
use_tls = url.schema == "https";
{
version(UnixSocket)
use_tls = url.schema == "https";
else
use_tls = url.schema == "https" || url.schema == "https+unix";
}

auto cli = connectHTTP(url.host, url.port, use_tls, settings);
auto cli = connectHTTP(url.getFilteredHost, url.port, use_tls, settings);
cli.request((scope req) {
if (url.localURI.length) {
assert(url.path.absolute, "Request URL path must be absolute.");
Expand Down Expand Up @@ -550,9 +574,33 @@ final class HTTPClient {
m_conn = connectTCP(proxyAddr, m_settings.networkInterface);
}
else {
auto addr = resolveHost(m_server, m_settings.dnsAddressFamily);
addr.port = m_port;
m_conn = connectTCP(addr, m_settings.networkInterface);
version(UnixSocket)
{
import core.sys.posix.sys.un;
import core.sys.posix.sys.socket;
import std.regex : regex, Captures, Regex, matchFirst, ctRegex;
import core.stdc.string : strcpy;

NetworkAddress addr;
if (m_server[0] == '/')
{
addr.family = AF_UNIX;
sockaddr_un* s = addr.sockAddrUnix();
enforce(s.sun_path.length > m_server.length, "Unix sockets cannot have that long a name.");
s.sun_family = AF_UNIX;
strcpy(cast(char*)s.sun_path.ptr,m_server.toStringz());
} else
{
addr = resolveHost(m_server, m_settings.dnsAddressFamily);
addr.port = m_port;
}
m_conn = connectTCP(addr, m_settings.networkInterface);
} else
{
auto addr = resolveHost(m_server, m_settings.dnsAddressFamily);
addr.port = m_port;
m_conn = connectTCP(addr, m_settings.networkInterface);
}
}

m_stream = m_conn;
Expand Down Expand Up @@ -1022,5 +1070,19 @@ final class HTTPClientResponse : HTTPResponse {
}
}

/** Returns clean host string. In case of unix socket it performs urlDecode on host. */
private auto getFilteredHost(URL url)
{
version(UnixSocket)
{
import vibe.textfilter.urlencode : urlDecode;
if (url.schema == "https+unix" || url.schema == "http+unix")
return urlDecode(url.host);
else
return url.host;
} else
return url.host;
}

// This object is a placeholder and should to never be modified.
package __gshared HTTPClientSettings defaultSettings = new HTTPClientSettings;
14 changes: 14 additions & 0 deletions source/vibe/inet/url.d
Expand Up @@ -71,6 +71,8 @@ struct URL {
case "ws":
case "wss":
case "file":
case "http+unix":
case "https+unix":
// proto://server/path style
enforce(str.startsWith("//"), "URL must start with proto://...");
requires_host = true;
Expand Down Expand Up @@ -270,6 +272,8 @@ struct URL {
case "ftp":
case "spdy":
case "sftp":
case "http+unix":
case "https+unix":
dst.put("//");
break;
}
Expand Down Expand Up @@ -426,3 +430,13 @@ unittest { // issue #1318
assert(false, "Expected to throw an exception.");
} catch (Exception e) {}
}

unittest {
assert(URL("http+unix://%2Fvar%2Frun%2Fdocker.sock").schema == "http+unix");
assert(URL("https+unix://%2Fvar%2Frun%2Fdocker.sock").schema == "https+unix");
assert(URL("http+unix://%2Fvar%2Frun%2Fdocker.sock").host == "%2Fvar%2Frun%2Fdocker.sock");
assert(URL("http+unix://%2Fvar%2Frun%2Fdocker.sock").pathString == "");
assert(URL("http+unix://%2Fvar%2Frun%2Fdocker.sock/container/json").pathString == "/container/json");
auto url = URL("http+unix://%2Fvar%2Frun%2Fdocker.sock/container/json");
assert(URL(url.toString()) == url);
}

0 comments on commit d97a89c

Please sign in to comment.