From 426f979f84c055e5b8866b998c7e74f4dcfb5f92 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 8 Aug 2018 14:53:52 +0300 Subject: [PATCH 1/4] net: remove listen_options constructor listen_option can be initialized with list initialization "listen_option{val}" just as well. --- net/socket_defs.hh | 5 +---- rpc/rpc.cc | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/net/socket_defs.hh b/net/socket_defs.hh index 0822ee08a1a..b3abc7411e1 100644 --- a/net/socket_defs.hh +++ b/net/socket_defs.hh @@ -62,11 +62,8 @@ class inet_address; } struct listen_options { - transport proto = transport::TCP; bool reuse_address = false; - listen_options(bool rua = false) - : reuse_address(rua) - {} + transport proto = transport::TCP; }; struct ipv4_addr { diff --git a/rpc/rpc.cc b/rpc/rpc.cc index cf6d4c424e3..7fac51f261d 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -917,11 +917,11 @@ namespace rpc { thread_local std::unordered_map server::_servers; server::server(protocol_base* proto, ipv4_addr addr, resource_limits limits) - : server(proto, engine().listen(addr, listen_options(true)), limits, server_options{}) + : server(proto, engine().listen(addr, listen_options{true}), limits, server_options{}) {} server::server(protocol_base* proto, server_options opts, ipv4_addr addr, resource_limits limits) - : server(proto, engine().listen(addr, listen_options(true)), limits, opts) + : server(proto, engine().listen(addr, listen_options{true}), limits, opts) {} server::server(protocol_base* proto, server_socket ss, resource_limits limits, server_options opts) From f51eea8e9cefdb14bc1bed1f01bc48557bc2d8fd Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 5 Aug 2018 13:39:05 +0300 Subject: [PATCH 2/4] posix-stack: Introduce new, port based, load balancing algorithm for connections New algorithm distributes connection according to peer's port address. Designated shard is determined by ip_port modulo smp. This allow client to connect to specific shard by choosing its local port carefully. --- doc/network-connection-load-balancing.md | 42 ++++++++++++++++++++++++ net/api.hh | 17 ++++++++++ net/posix-stack.cc | 7 ++-- net/posix-stack.hh | 10 +++++- net/socket_defs.hh | 5 --- 5 files changed, 72 insertions(+), 9 deletions(-) create mode 100644 doc/network-connection-load-balancing.md diff --git a/doc/network-connection-load-balancing.md b/doc/network-connection-load-balancing.md new file mode 100644 index 00000000000..45b9d1660d7 --- /dev/null +++ b/doc/network-connection-load-balancing.md @@ -0,0 +1,42 @@ +# Motivation + +In sharded systems like seastar it is important for work to be +distributed equally between all shards to achieve maximum performance +from the system. Networking subsystem has its part in distributing work +equally. For instance if on a server all connections will be served by +single shard only, the system will be working with the speed of this +one shard and all other shards will be underutilized. + +# Common ways to distribute work received over network between shards + +Two common ways to distribute work between shards are: + - do the work at a shard that received it + - shard that does actual work depends on a data been processed + (one way to do it is to hash(data) % smp_count = shard) + +# Load Balancing + +Those two ways asks for different strategy to distribute connections +between shards. The first one will work best if each cpu will have the +same amount of connections (assuming each connection gets same amount of +works) the second will work best if data will arrive to a shard where +it is going to be processed and actual connection distribution does +not matter. + +Seastar's posix stack supports both of those strategies. Desired +one can be chosen by specifying load balancing algorithm in +listen_options provided to reactor::listen() call. Available options +are: + +- load_balancing_algorithm::connection_distribution + + Make sure that new connection will be placed to a shard with smallest + amount of connections of the same type. + +- load_balancing_algorithm::port + + Destination shard is chosen as a function of client's local port: + shard = port_number % num_shards. This allows a client to make sure that + a connection will be processed by a specific shard by choosing its local + port accordingly (the knowledge about amount of shards in the server is + needed and can be negotiated by different channel). diff --git a/net/api.hh b/net/api.hh index 60fb186ff3d..6c4a56cc0b3 100644 --- a/net/api.hh +++ b/net/api.hh @@ -243,6 +243,17 @@ class server_socket { std::unique_ptr _ssi; bool _aborted = false; public: + enum class load_balancing_algorithm { + // This algorithm tries to distribute all connections equally between all shards. + // It does this by sending new connections to a shard with smallest amount of connections. + connection_distribution, + // This algorithm distributes new connection based on peer's tcp port. Destination shard + // is calculated as a port number modulo number of shards. This allows a client to connect + // to a specific shard in a server given it knows how many shards server has by choosing + // src port number accordingly. + port, + default_ = connection_distribution + }; /// Constructs a \c server_socket not corresponding to a connection server_socket(); /// \cond internal @@ -271,6 +282,12 @@ public: }; /// @} +struct listen_options { + bool reuse_address = false; + server_socket::load_balancing_algorithm lba = server_socket::load_balancing_algorithm::default_; + transport proto = transport::TCP; +}; + class network_stack { public: virtual ~network_stack() {} diff --git a/net/posix-stack.cc b/net/posix-stack.cc index 36df944d33e..2b4b93cd6d5 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -187,7 +187,8 @@ template future posix_server_socket_impl::accept() { return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) { - auto cth = _conntrack.get_handle(); + auto cth = _lba == server_socket::load_balancing_algorithm::connection_distribution ? + _conntrack.get_handle() : _conntrack.get_handle(ntoh(sa.as_posix_sockaddr_in().sin_port) % smp::count); auto cpu = cth.cpu(); if (cpu == engine().cpu_id()) { std::unique_ptr csi( @@ -334,12 +335,12 @@ posix_network_stack::listen(socket_address sa, listen_options opt) { return _reuseport ? server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))) : - server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))); + server_socket(std::make_unique(sa, engine().posix_listen(sa, opt), opt.lba)); } else { return _reuseport ? server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))) : - server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))); + server_socket(std::make_unique(sa, engine().posix_listen(sa, opt), opt.lba)); } } diff --git a/net/posix-stack.hh b/net/posix-stack.hh index bc1f39b87de..757abb11dd6 100644 --- a/net/posix-stack.hh +++ b/net/posix-stack.hh @@ -54,6 +54,10 @@ class conntrack { _cpu_load[cpu]++; return cpu; } + shard_id force_cpu(shard_id cpu) { + _cpu_load[cpu]++; + return cpu; + } }; lw_shared_ptr _lb; @@ -92,6 +96,9 @@ public: handle get_handle() { return handle(_lb->next_cpu(), _lb); } + handle get_handle(shard_id cpu) { + return handle(_lb->force_cpu(cpu), _lb); + } }; class posix_data_source_impl final : public data_source_impl { @@ -139,8 +146,9 @@ class posix_server_socket_impl : public server_socket_impl { socket_address _sa; pollable_fd _lfd; conntrack _conntrack; + server_socket::load_balancing_algorithm _lba; public: - explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {} + explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd, server_socket::load_balancing_algorithm lba) : _sa(sa), _lfd(std::move(lfd)), _lba(lba) {} virtual future accept(); virtual void abort_accept() override; }; diff --git a/net/socket_defs.hh b/net/socket_defs.hh index b3abc7411e1..da26006acae 100644 --- a/net/socket_defs.hh +++ b/net/socket_defs.hh @@ -61,11 +61,6 @@ namespace net { class inet_address; } -struct listen_options { - bool reuse_address = false; - transport proto = transport::TCP; -}; - struct ipv4_addr { uint32_t ip; uint16_t port; From 4fc3c402bcf643d831f1ab9ffef68303bea5aade Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 6 Aug 2018 13:15:59 +0300 Subject: [PATCH 3/4] posix-stack: Choose local port based on the shard number connect() runs on If local address provided to connect() has wildcard port try to use a port number that satisfy equation: port modulo number_of_shards = current_shard. If a server uses port based load balancing and has the same amount of shards as a client it will result in connection going to the same shard number in the server, otherwise no harm will be done. Connecting to the same shard number is beneficial for applications that distribute work based on a hash of the data been processed since in such systems this will eliminate internal cross-cpu hop from the pipeline. --- net/posix-stack.cc | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/net/posix-stack.cc b/net/posix-stack.cc index 2b4b93cd6d5..fd7e129eeb6 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -19,6 +19,7 @@ * Copyright (C) 2014 Cloudius Systems, Ltd. */ +#include #include "posix-stack.hh" #include "net.hh" #include "packet.hh" @@ -154,12 +155,33 @@ using posix_connected_sctp_socket_impl = posix_connected_socket_impl _fd; + + future<> find_port_and_connect(socket_address sa, socket_address local, transport proto = transport::TCP) { + static thread_local std::default_random_engine random_engine{std::random_device{}()}; + static thread_local std::uniform_int_distribution u(49152/smp::count + 1, 65535/smp::count - 1); + return repeat([this, sa, local, proto, attempts = 0, requested_port = ntoh(local.as_posix_sockaddr_in().sin_port)] () mutable { + uint16_t port = attempts++ < 5 && requested_port == 0 && proto == transport::TCP ? u(random_engine) * smp::count + engine().cpu_id() : requested_port; + local.as_posix_sockaddr_in().sin_port = hton(port); + return futurize_apply([this, sa, local] { return engine().posix_connect(_fd, sa, local); }).then_wrapped([] (future<> f) { + try { + f.get(); + return stop_iteration::yes; + } catch (std::system_error& err) { + if (err.code().value() == EADDRINUSE) { + return stop_iteration::no; + } + throw; + } + }); + }); + } + public: posix_socket_impl() = default; virtual future connect(socket_address sa, socket_address local, transport proto = transport::TCP) override { _fd = engine().make_pollable_fd(sa, proto); - return engine().posix_connect(_fd, sa, local).then([fd = _fd, proto]() mutable { + return find_port_and_connect(sa, local, proto).then([fd = _fd, proto] () mutable { std::unique_ptr csi; if (proto == transport::TCP) { csi.reset(new posix_connected_tcp_socket_impl(std::move(fd))); From 8c58cd7c88f673e43aea498353500841d3bdeab4 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 5 Aug 2018 18:16:53 +0300 Subject: [PATCH 4/4] rpc: provide and option to enable port based load balancing for rpc server --- rpc/rpc.cc | 2 +- rpc/rpc.hh | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 7fac51f261d..053b175c096 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -921,7 +921,7 @@ namespace rpc { {} server::server(protocol_base* proto, server_options opts, ipv4_addr addr, resource_limits limits) - : server(proto, engine().listen(addr, listen_options{true}), limits, opts) + : server(proto, engine().listen(addr, listen_options{true, opts.load_balancing_algorithm}), limits, opts) {} server::server(protocol_base* proto, server_socket ss, resource_limits limits, server_options opts) diff --git a/rpc/rpc.hh b/rpc/rpc.hh index 46b670fea23..f7925968950 100644 --- a/rpc/rpc.hh +++ b/rpc/rpc.hh @@ -111,6 +111,7 @@ struct server_options { compressor::factory* compressor_factory = nullptr; bool tcp_nodelay = true; compat::optional streaming_domain; + server_socket::load_balancing_algorithm load_balancing_algorithm = server_socket::load_balancing_algorithm::default_; }; inline