Permalink
Browse files

Extend Address to support a list of addresses

This in turn allows clients to connect to a semicolon-delimited list of
addresses, making it possible to run without a DNS name.

Fix #5: Need a way for clients to connect to a random server
  • Loading branch information...
ongardie committed Nov 21, 2014
1 parent 35bc680 commit 83733fe8ed86fa6c33f53f7f1f06d47c86b6d3f2
Showing with 121 additions and 70 deletions.
  1. +2 −1 Client/Client.h
  2. +36 −23 RPC/Address.cc
  3. +16 −15 RPC/Address.h
  4. +60 −24 RPC/AddressTest.cc
  5. +1 −1 RPC/ClientServerTest.cc
  6. +6 −6 RPC/ClientSessionTest.cc
@@ -449,7 +449,8 @@ class Cluster {
* \param hosts
* A string describing the hosts in the cluster. This should be of the
* form host:port, where host is usually a DNS name that resolves to
* multiple IP addresses.
* multiple IP addresses. Alternatively, you can pass a list of hosts
* as host1:port1;host2:port2;host3:port3.
*/
explicit Cluster(const std::string& hosts);
~Cluster();
@@ -1,4 +1,5 @@
/* Copyright (c) 2012 Stanford University
* Copyright (c) 2014 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -25,45 +26,53 @@
#include "Core/Debug.h"
#include "Core/Endian.h"
#include "Core/Random.h"
#include "Core/StringUtil.h"
#include "RPC/Address.h"

namespace LogCabin {
namespace RPC {

Address::Address(const std::string& str, uint16_t defaultPort)
: originalString(str)
, host(str)
, port()
, hosts()
, storage()
, len(0)
{
memset(&storage, 0, sizeof(storage));
size_t lastColon = host.rfind(':');
if (lastColon != host.npos &&
host.find(']', lastColon) == host.npos) {
// following lastColon is a port number
port = host.substr(lastColon + 1);
host.erase(lastColon);
} else {
// use default port
std::stringstream buf;
buf << defaultPort;
port = buf.str();
originalString += ":" + port;
}

// IPv6 hosts are surrounded in brackets. These need to be stripped off.
if (!host.empty() && host[0] == '[' && host[host.length() - 1] == ']') {
host = host.substr(1, host.length() - 2);
std::vector<std::string> hostsList = Core::StringUtil::split(str, ';');
for (auto it = hostsList.begin(); it != hostsList.end(); ++it) {
std::string host = *it;
std::string port;
if (host.empty())
continue;

size_t lastColon = host.rfind(':');
if (lastColon != host.npos &&
host.find(']', lastColon) == host.npos) {
// following lastColon is a port number
port = host.substr(lastColon + 1);
host.erase(lastColon);
} else {
// use default port
port = Core::StringUtil::toString(defaultPort);
}

// IPv6 hosts are surrounded in brackets. These need to be stripped.
if (host.at(0) == '[' &&
host.at(host.length() - 1) == ']') {
host = host.substr(1, host.length() - 2);
}

hosts.push_back({host, port});
}

refresh();
}

Address::Address(const Address& other)
: originalString(other.originalString)
, host(other.host)
, port(other.port)
, hosts(other.hosts)
, storage()
, len(other.len)
{
@@ -74,8 +83,7 @@ Address&
Address::operator=(const Address& other)
{
originalString = other.originalString;
host = other.host;
port = other.port;
hosts = other.hosts;
memcpy(&storage, &other.storage, sizeof(storage));
len = other.len;
return *this;
@@ -143,6 +151,11 @@ Address::toString() const
void
Address::refresh()
{
if (hosts.empty())
return;
size_t hostIdx = Core::Random::random32() % hosts.size();
const std::string& host = hosts.at(hostIdx).first;
const std::string& port = hosts.at(hostIdx).second;
VERBOSE("Running getaddrinfo for host %s with port %s",
host.c_str(), port.c_str());

@@ -169,7 +182,7 @@ Address::refresh()
if (!candidates.empty()) {
// Select one randomly and hope it works.
size_t idx = Core::Random::random32() % candidates.size();
addrinfo* addr = candidates[idx];
addrinfo* addr = candidates.at(idx);
memcpy(&storage, addr->ai_addr, addr->ai_addrlen);
len = addr->ai_addrlen;
}
@@ -1,4 +1,5 @@
/* Copyright (c) 2012 Stanford University
* Copyright (c) 2014 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -18,13 +19,15 @@

#include <sys/socket.h>
#include <string>
#include <vector>

namespace LogCabin {
namespace RPC {

/**
* An Address specifies a host and a port.
* This class also handles DNS lookups for addressing hosts by name.
* This class resolves user-friendly addresses for services into socket-level
* addresses. It supports DNS lookups for addressing hosts by name, and it
* supports multiple (alternative) addresses.
*/
class Address {
public:
@@ -38,6 +41,7 @@ class Address {
* - IPv4Address
* - [IPv6Address]:port
* - [IPv6Address]
* Or a semicolon-delimited list of these to represent multiple hosts.
* \param defaultPort
* The port number to use if none is specified in str.
*/
@@ -88,7 +92,7 @@ class Address {
std::string toString() const;

/**
* Convert the host and port to a sockaddr.
* Convert (a random one of) the host(s) and port(s) to a sockaddr.
* If the host is a name instead of numeric, this will run a DNS query and
* select a random result. If this query fails, any previous sockaddr will
* be left intact.
@@ -98,23 +102,20 @@ class Address {
private:

/**
* The host name or numeric address as passed into the constructor.
* The host name(s) or numeric address(es) as passed into the constructor.
*/
std::string originalString;

/**
* The host name or numeric address as parsed from the string passed into
* the constructor. This has brackets stripped out of IPv6 addresses and is
* in the form needed by getaddrinfo().
* A list of (host, port) pairs as parsed from originalString.
* - First component: the host name or numeric address as parsed from the
* string passed into the constructor. This has brackets stripped out of
* IPv6 addresses and is in the form needed by getaddrinfo().
* - Second component: an ASCII representation of the port number to use.
* It is stored in string form because that's sometimes how it comes into
* the constructor and always what refresh() needs to call getaddrinfo().
*/
std::string host;

/**
* An ASCII representation of the port number to use. It is stored in
* string form because that's sometimes how it comes into the constructor
* and always what refresh() needs to call getaddrinfo().
*/
std::string port;
std::vector<std::pair<std::string, std::string>> hosts;

/**
* Storage for the sockaddr returned by getSockAddr.
@@ -1,4 +1,5 @@
/* Copyright (c) 2012 Stanford University
* Copyright (c) 2014 Diego Ongaro
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -23,50 +24,73 @@ namespace RPC {
namespace {

TEST(RPCAddressTest, constructor) {
EXPECT_EQ(":90 (resolved to Unspecified)",
EXPECT_EQ(" (resolved to Unspecified)",
Address("", 90).toString());

// hostname
Address name("example.com", 80);
EXPECT_EQ("example.com", name.host);
EXPECT_EQ("80", name.port);
EXPECT_EQ("example.com:80", name.originalString);
EXPECT_EQ("example.com", name.hosts.at(0).first);
EXPECT_EQ("80", name.hosts.at(0).second);
EXPECT_EQ("example.com", name.originalString);
Address namePort("example.com:80", 90);
EXPECT_EQ("example.com", namePort.host);
EXPECT_EQ("80", namePort.port);
EXPECT_EQ("example.com", namePort.hosts.at(0).first);
EXPECT_EQ("80", namePort.hosts.at(0).second);
EXPECT_EQ("example.com:80", namePort.originalString);

// IPv4
Address ipv4("1.2.3.4", 80);
EXPECT_EQ("1.2.3.4", ipv4.host);
EXPECT_EQ("80", ipv4.port);
EXPECT_EQ("1.2.3.4:80", ipv4.originalString);
EXPECT_EQ("1.2.3.4", ipv4.hosts.at(0).first);
EXPECT_EQ("80", ipv4.hosts.at(0).second);
EXPECT_EQ("1.2.3.4", ipv4.originalString);
Address ipv4Port("1.2.3.4:80", 90);
EXPECT_EQ("1.2.3.4", ipv4Port.host);
EXPECT_EQ("80", ipv4Port.port);
EXPECT_EQ("1.2.3.4", ipv4Port.hosts.at(0).first);
EXPECT_EQ("80", ipv4Port.hosts.at(0).second);
EXPECT_EQ("1.2.3.4:80", ipv4Port.originalString);

// IPv6
Address ipv6("[1:2:3:4:5:6:7:8]", 80);
EXPECT_EQ("1:2:3:4:5:6:7:8", ipv6.host);
EXPECT_EQ("80", ipv6.port);
EXPECT_EQ("[1:2:3:4:5:6:7:8]:80", ipv6.originalString);
EXPECT_EQ("1:2:3:4:5:6:7:8", ipv6.hosts.at(0).first);
EXPECT_EQ("80", ipv6.hosts.at(0).second);
EXPECT_EQ("[1:2:3:4:5:6:7:8]", ipv6.originalString);
Address ipv6Port("[1:2:3:4:5:6:7:8]:80", 90);
EXPECT_EQ("1:2:3:4:5:6:7:8", ipv6Port.host);
EXPECT_EQ("80", ipv6Port.port);
EXPECT_EQ("1:2:3:4:5:6:7:8", ipv6Port.hosts.at(0).first);
EXPECT_EQ("80", ipv6Port.hosts.at(0).second);
EXPECT_EQ("[1:2:3:4:5:6:7:8]:80", ipv6Port.originalString);
Address ipv6Short("[::1]", 80);
EXPECT_EQ("::1", ipv6Short.host);
EXPECT_EQ("80", ipv6Short.port);
EXPECT_EQ("[::1]:80", ipv6Short.originalString);

EXPECT_EQ("::1", ipv6Short.hosts.at(0).first);
EXPECT_EQ("80", ipv6Short.hosts.at(0).second);
EXPECT_EQ("[::1]", ipv6Short.originalString);

// multiple hosts
Address all("example.com;"
"example.com:80;"
"1.2.3.4;"
"1.2.3.4:80;"
"[1:2:3:4:5:6:7:8];"
"[1:2:3:4:5:6:7:8]:80;"
"[::1]", 80);
EXPECT_EQ((std::vector<std::pair<std::string, std::string>> {
{"example.com", "80"},
{"example.com", "80"},
{"1.2.3.4", "80"},
{"1.2.3.4", "80"},
{"1:2:3:4:5:6:7:8", "80"},
{"1:2:3:4:5:6:7:8", "80"},
{"::1", "80"},
}),
all.hosts);

Address semicolons(";;;example.com;;;;", 80);
EXPECT_EQ((std::vector<std::pair<std::string, std::string>> {
{"example.com", "80"},
}),
semicolons.hosts);
}

TEST(RPCAddressTest, constructor_copy) {
Address a("127.0.0.1", 80);
Address b(a);
EXPECT_EQ(a.host, b.host);
EXPECT_EQ(a.port, b.port);
EXPECT_EQ(a.hosts, b.hosts);
EXPECT_EQ(a.len, b.len);
EXPECT_EQ(a.toString(), b.toString());
EXPECT_EQ(a.getResolvedString(), b.getResolvedString());
@@ -76,8 +100,7 @@ TEST(RPCAddressTest, assignment) {
Address a("127.0.0.1", 80);
Address b("127.0.0.2", 81);
b = a;
EXPECT_EQ(a.host, b.host);
EXPECT_EQ(a.port, b.port);
EXPECT_EQ(a.hosts, b.hosts);
EXPECT_EQ(a.len, b.len);
EXPECT_EQ(a.toString(), b.toString());
EXPECT_EQ(a.getResolvedString(), b.getResolvedString());
@@ -102,6 +125,19 @@ TEST(RPCAddressTest, toString) {
}

TEST(RPCAddressTest, refresh) {
Address empty("", 80);
empty.refresh();
EXPECT_FALSE(empty.isValid());

// should be random, but should eventually refresh to all addresses
Address multi("1.2.3.4;5.6.7.8", 80);
std::set<std::string> resolved;
for (uint64_t i = 0; i < 20; ++i) {
multi.refresh();
resolved.insert(multi.getResolvedString());
}
EXPECT_EQ(2U, resolved.size());

// This should be a pretty stable IP address, since it is supposed to be
// easy to be memorize (at least for IPv4).
std::string googleDNS =
@@ -129,7 +129,7 @@ TEST_F(RPCClientServerTest, timeout) {
Event::Loop::Lock blockPings(serverEventLoop);
RPC::OpaqueClientRPC rpc2 = clientSession->sendRequest(RPC::Buffer());
rpc2.waitForReply();
EXPECT_EQ("Server 127.0.0.1:61023 (resolved to 127.0.0.1:61023) timed out",
EXPECT_EQ("Server 127.0.0.1 (resolved to 127.0.0.1:61023) timed out",
rpc2.getErrorMessage());

}
@@ -114,7 +114,7 @@ TEST_F(RPCClientSessionTest, onReceiveMessage_ping) {

TEST_F(RPCClientSessionTest, onDisconnect) {
session->messageSocket->onDisconnect();
EXPECT_EQ("Disconnected from server 127.0.0.1:0 (resolved to 127.0.0.1:0)",
EXPECT_EQ("Disconnected from server 127.0.0.1 (resolved to 127.0.0.1:0)",
session->errorMessage);
}

@@ -140,7 +140,7 @@ TEST_F(RPCClientSessionTest, handleTimerEvent) {
// need to time out session
session->numActiveRPCs = 1;
session->timer.handleTimerEvent();
EXPECT_EQ("Server 127.0.0.1:0 (resolved to 127.0.0.1:0) timed out",
EXPECT_EQ("Server 127.0.0.1 (resolved to 127.0.0.1:0) timed out",
session->errorMessage);
session->numActiveRPCs = 0;
}
@@ -149,12 +149,12 @@ TEST_F(RPCClientSessionTest, constructor) {
auto session2 = ClientSession::makeSession(eventLoop,
Address("127.0.0.1", 0),
1024);
EXPECT_EQ("127.0.0.1:0 (resolved to 127.0.0.1:0)",
EXPECT_EQ("127.0.0.1 (resolved to 127.0.0.1:0)",
session2->address.toString());
EXPECT_EQ("Failed to connect socket to 127.0.0.1:0 "
EXPECT_EQ("Failed to connect socket to 127.0.0.1 "
"(resolved to 127.0.0.1:0)",
session2->errorMessage);
EXPECT_EQ("Closed session: Failed to connect socket to 127.0.0.1:0 "
EXPECT_EQ("Closed session: Failed to connect socket to 127.0.0.1 "
"(resolved to 127.0.0.1:0)",
session2->toString());
EXPECT_FALSE(session2->messageSocket);
@@ -193,7 +193,7 @@ TEST_F(RPCClientSessionTest, getErrorMessage) {
}

TEST_F(RPCClientSessionTest, toString) {
EXPECT_EQ("Active session to 127.0.0.1:0 (resolved to 127.0.0.1:0)",
EXPECT_EQ("Active session to 127.0.0.1 (resolved to 127.0.0.1:0)",
session->toString());
}

0 comments on commit 83733fe

Please sign in to comment.