Merge "rpc shard to shard connection" from Gleb
"
If a server and a client have the same number of shards, this series allows
establishing a connection from shard X on the client to shard X on the
server. This is beneficial for systems that distribute work based on a hash
of the data being processed, since in such a system this eliminates the
internal cross-cpu hop from the pipeline.

Using this together with a shard-aware CQL driver, cross-cpu communication in
storage_proxy drops almost to zero for a regular cassandra-stress workload.
"

* 'gleb/shard-to-shard' of github.com:cloudius-systems/seastar-dev:
  rpc: provide an option to enable port based load balancing for rpc server
  posix-stack: Choose local port based on the shard number connect() runs on
  posix-stack: Introduce new, port based, load balancing algorithm for connections
  net: remove listen_options constructor
avikivity committed Oct 2, 2018
2 parents 873f95f + 8c58cd7 commit 71e914e
Showing 7 changed files with 98 additions and 15 deletions.
42 changes: 42 additions & 0 deletions doc/network-connection-load-balancing.md
@@ -0,0 +1,42 @@
# Motivation

In sharded systems like Seastar it is important for work to be distributed
equally between all shards to achieve maximum performance from the system.
The networking subsystem has its part in distributing work equally. For
instance, if all connections on a server are served by a single shard, the
system will run at the speed of that one shard and all other shards will be
underutilized.

# Common ways to distribute work received over network between shards

Two common ways to distribute work between shards are:
- do the work on the shard that received it
- choose the shard that does the actual work based on the data being
  processed, for instance as shard = hash(data) % smp_count (see the sketch
  below)
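For illustration only (not part of this commit), the second strategy can be
as small as a hash of the item's key; the helper name and key type here are
assumptions:

```cpp
#include <functional>
#include <string>

// Every item with the same key is processed on the same shard, so the
// state for that key never needs a cross-cpu hop inside the pipeline.
unsigned owner_shard(const std::string& key, unsigned smp_count) {
    return std::hash<std::string>{}(key) % smp_count;
}
```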

# Load Balancing

These two ways call for different strategies for distributing connections
between shards. The first works best if each cpu has the same number of
connections (assuming each connection brings the same amount of work); the
second works best if data arrives at the shard where it is going to be
processed, in which case the actual connection distribution does not matter.

Seastar's posix stack supports both of these strategies. The desired one can
be chosen by specifying a load balancing algorithm in the listen_options
provided to the reactor::listen() call. The available options are:

- load_balancing_algorithm::connection_distribution

Makes sure that a new connection is placed on the shard with the smallest
number of connections of the same type.

- load_balancing_algorithm::port

The destination shard is chosen as a function of the client's local port:
shard = port_number % num_shards. This allows a client to make sure that a
connection will be processed by a specific shard by choosing its local port
accordingly (knowledge of the number of shards in the server is required,
and can be negotiated over a different channel).
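Concretely, here is a hedged sketch of the client-side arithmetic (the names
local_port_for_shard, target_shard and server_shard_count are illustrative;
the shard count must be learned over a side channel as noted above):

```cpp
#include <cstdint>

// Returns the first port at or above the start of the ephemeral range
// (49152-65535) whose value modulo the server's shard count selects the
// desired shard; the server then serves the connection on
// shard = port % server_shard_count.
uint16_t local_port_for_shard(unsigned target_shard, unsigned server_shard_count) {
    const unsigned base = 49152;
    return base + (target_shard + server_shard_count - base % server_shard_count)
                % server_shard_count;
}
```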
17 changes: 17 additions & 0 deletions net/api.hh
@@ -243,6 +243,17 @@ class server_socket {
std::unique_ptr<net::server_socket_impl> _ssi;
bool _aborted = false;
public:
enum class load_balancing_algorithm {
// This algorithm tries to distribute all connections equally between all shards.
// It does this by sending new connections to the shard with the smallest number of connections.
connection_distribution,
// This algorithm distributes new connections based on the peer's tcp port. The destination
// shard is calculated as the port number modulo the number of shards. This allows a client
// to connect to a specific shard in a server, given that it knows how many shards the server
// has, by choosing its source port number accordingly.
port,
default_ = connection_distribution
};
/// Constructs a \c server_socket not corresponding to a connection
server_socket();
/// \cond internal
@@ -271,6 +282,12 @@ public:
};
/// @}

struct listen_options {
bool reuse_address = false;
server_socket::load_balancing_algorithm lba = server_socket::load_balancing_algorithm::default_;
transport proto = transport::TCP;
};

class network_stack {
public:
virtual ~network_stack() {}
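A minimal usage sketch for the new option (an illustration, assuming a
running Seastar reactor; the port number is a placeholder):

```cpp
// Opt a listening socket into port-based load balancing; with a
// default-constructed listen_options the connection_distribution
// algorithm is used instead.
listen_options lo;
lo.reuse_address = true;
lo.lba = server_socket::load_balancing_algorithm::port;
server_socket ss = engine().listen(make_ipv4_address({10000}), lo);
```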
31 changes: 27 additions & 4 deletions net/posix-stack.cc
@@ -19,6 +19,7 @@
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/

#include <random>
#include "posix-stack.hh"
#include "net.hh"
#include "packet.hh"
@@ -154,12 +155,33 @@ using posix_connected_sctp_socket_impl = posix_connected_socket_impl<transport::

class posix_socket_impl final : public socket_impl {
lw_shared_ptr<pollable_fd> _fd;

future<> find_port_and_connect(socket_address sa, socket_address local, transport proto = transport::TCP) {
static thread_local std::default_random_engine random_engine{std::random_device{}()};
static thread_local std::uniform_int_distribution<uint16_t> u(49152/smp::count + 1, 65535/smp::count - 1);
return repeat([this, sa, local, proto, attempts = 0, requested_port = ntoh(local.as_posix_sockaddr_in().sin_port)] () mutable {
uint16_t port = attempts++ < 5 && requested_port == 0 && proto == transport::TCP ? u(random_engine) * smp::count + engine().cpu_id() : requested_port;
local.as_posix_sockaddr_in().sin_port = hton(port);
return futurize_apply([this, sa, local] { return engine().posix_connect(_fd, sa, local); }).then_wrapped([] (future<> f) {
try {
f.get();
return stop_iteration::yes;
} catch (std::system_error& err) {
if (err.code().value() == EADDRINUSE) {
return stop_iteration::no;
}
throw;
}
});
});
}

public:
posix_socket_impl() = default;

virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override {
_fd = engine().make_pollable_fd(sa, proto);
-return engine().posix_connect(_fd, sa, local).then([fd = _fd, proto]() mutable {
+return find_port_and_connect(sa, local, proto).then([fd = _fd, proto] () mutable {
std::unique_ptr<connected_socket_impl> csi;
if (proto == transport::TCP) {
csi.reset(new posix_connected_tcp_socket_impl(std::move(fd)));
@@ -187,7 +209,8 @@ template <transport Transport>
future<connected_socket, socket_address>
posix_server_socket_impl<Transport>::accept() {
return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) {
-auto cth = _conntrack.get_handle();
+auto cth = _lba == server_socket::load_balancing_algorithm::connection_distribution ?
+        _conntrack.get_handle() : _conntrack.get_handle(ntoh(sa.as_posix_sockaddr_in().sin_port) % smp::count);
auto cpu = cth.cpu();
if (cpu == engine().cpu_id()) {
std::unique_ptr<connected_socket_impl> csi(
@@ -334,12 +357,12 @@ posix_network_stack::listen(socket_address sa, listen_options opt) {
return _reuseport ?
server_socket(std::make_unique<posix_reuseport_server_tcp_socket_impl>(sa, engine().posix_listen(sa, opt)))
:
-server_socket(std::make_unique<posix_server_tcp_socket_impl>(sa, engine().posix_listen(sa, opt)));
+server_socket(std::make_unique<posix_server_tcp_socket_impl>(sa, engine().posix_listen(sa, opt), opt.lba));
} else {
return _reuseport ?
server_socket(std::make_unique<posix_reuseport_server_sctp_socket_impl>(sa, engine().posix_listen(sa, opt)))
:
-server_socket(std::make_unique<posix_server_sctp_socket_impl>(sa, engine().posix_listen(sa, opt)));
+server_socket(std::make_unique<posix_server_sctp_socket_impl>(sa, engine().posix_listen(sa, opt), opt.lba));
}
}

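The port arithmetic in find_port_and_connect() is subtle enough to deserve a
worked check. This standalone sketch (an illustration, not part of the
commit) verifies that every port the generator can produce stays inside the
ephemeral range and maps back to the connecting shard:

```cpp
#include <cassert>
#include <cstdint>

int main() {
    // Mirror find_port_and_connect() with, e.g., 4 shards, running on shard 2.
    const uint32_t smp_count = 4, cpu_id = 2;
    for (uint32_t u = 49152 / smp_count + 1; u <= 65535 / smp_count - 1; ++u) {
        uint32_t port = u * smp_count + cpu_id; // the candidate local port
        assert(port >= 49152 && port <= 65535); // stays in the ephemeral range
        assert(port % smp_count == cpu_id);     // a port-balancing server picks shard 2
    }
}
```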
10 changes: 9 additions & 1 deletion net/posix-stack.hh
@@ -54,6 +54,10 @@ class conntrack {
_cpu_load[cpu]++;
return cpu;
}
shard_id force_cpu(shard_id cpu) {
_cpu_load[cpu]++;
return cpu;
}
};

lw_shared_ptr<load_balancer> _lb;
@@ -92,6 +96,9 @@ public:
handle get_handle() {
return handle(_lb->next_cpu(), _lb);
}
handle get_handle(shard_id cpu) {
return handle(_lb->force_cpu(cpu), _lb);
}
};

class posix_data_source_impl final : public data_source_impl {
@@ -139,8 +146,9 @@ class posix_server_socket_impl : public server_socket_impl {
socket_address _sa;
pollable_fd _lfd;
conntrack _conntrack;
server_socket::load_balancing_algorithm _lba;
public:
-explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {}
+explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd, server_socket::load_balancing_algorithm lba) : _sa(sa), _lfd(std::move(lfd)), _lba(lba) {}
virtual future<connected_socket, socket_address> accept();
virtual void abort_accept() override;
};
8 changes: 0 additions & 8 deletions net/socket_defs.hh
@@ -61,14 +61,6 @@ namespace net {
class inet_address;
}

-struct listen_options {
-    transport proto = transport::TCP;
-    bool reuse_address = false;
-    listen_options(bool rua = false)
-        : reuse_address(rua)
-    {}
-};

struct ipv4_addr {
uint32_t ip;
uint16_t port;
4 changes: 2 additions & 2 deletions rpc/rpc.cc
@@ -917,11 +917,11 @@ namespace rpc {
thread_local std::unordered_map<streaming_domain_type, server*> server::_servers;

server::server(protocol_base* proto, ipv4_addr addr, resource_limits limits)
-: server(proto, engine().listen(addr, listen_options(true)), limits, server_options{})
+: server(proto, engine().listen(addr, listen_options{true}), limits, server_options{})
{}

server::server(protocol_base* proto, server_options opts, ipv4_addr addr, resource_limits limits)
-: server(proto, engine().listen(addr, listen_options(true)), limits, opts)
+: server(proto, engine().listen(addr, listen_options{true, opts.load_balancing_algorithm}), limits, opts)
{}

server::server(protocol_base* proto, server_socket ss, resource_limits limits, server_options opts)
1 change: 1 addition & 0 deletions rpc/rpc.hh
@@ -111,6 +111,7 @@ struct server_options {
compressor::factory* compressor_factory = nullptr;
bool tcp_nodelay = true;
compat::optional<streaming_domain_type> streaming_domain;
server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_;
};

inline
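A hedged sketch of the rpc-side usage (the serializer type and the protocol
instance myrpc follow the usual Seastar rpc setup and are assumptions, as is
the server constructor overload taking server_options):

```cpp
// Enable port-based balancing on an rpc server so that a client with the
// same shard count can reach server shard X from its own shard X by
// choosing its source port accordingly.
rpc::server_options so;
so.load_balancing_algorithm = server_socket::load_balancing_algorithm::port;
rpc::protocol<serializer>::server srv(myrpc, so, ipv4_addr{7000});
```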
