Skip to content

Commit

Permalink
Scheduler: new REUSEPORT balancing method/architecture.
Browse files Browse the repository at this point in the history
Starting from Linux Kernel 3.9, there is a new TCP/UDP socket option which allows
to bind same port and address from multiples threads (or any instance under the
same process context).

This patch implements the SO_REUSEPORT TCP Flag if the running Kernel is >= 3.9,
so on that mode each working thread create it own socket that bind the same address,
with this implementation we reduce the number of system calls involved when a new
connection arrives, avoid lookup the lowest loaded thread and also we allow the
Kernel to perform a better Scheduling on SMP systems that requires to scale.

If the detected Kernel is lower than 3.9, it will use the old Fair Balancing
mechanism.

Signed-off-by: Eduardo Silva <edsiper@gmail.com>
  • Loading branch information
edsiper committed Apr 27, 2014
1 parent 2787854 commit d1da249
Show file tree
Hide file tree
Showing 14 changed files with 134 additions and 22 deletions.
12 changes: 9 additions & 3 deletions plugins/liana/liana.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ int _mkp_network_io_bind(int socket_fd, const struct sockaddr *addr, socklen_t a

ret = bind(socket_fd, addr, addrlen);
if( ret == -1 ) {
perror("bind");
mk_warn("Error binding socket");
return ret;
}
Expand Down Expand Up @@ -211,7 +212,7 @@ int _mkp_network_io_bind(int socket_fd, const struct sockaddr *addr, socklen_t a
return ret;
}

int _mkp_network_io_server(int port, char *listen_addr)
int _mkp_network_io_server(int port, char *listen_addr, int reuse_port)
{
int socket_fd = -1;
int ret;
Expand Down Expand Up @@ -244,8 +245,14 @@ int _mkp_network_io_server(int port, char *listen_addr)

mk_api->socket_set_tcp_nodelay(socket_fd);
mk_api->socket_reset(socket_fd);
ret = _mkp_network_io_bind(socket_fd, rp->ai_addr, rp->ai_addrlen, MK_SOMAXCONN);

/* Check if reuse port can be enabled on this socket */
if (reuse_port == MK_TRUE &&
mk_kernel_runver >= MK_KERNEL_VERSION(3, 9, 0)) {
mk_api->socket_set_tcp_reuseport(socket_fd);
}

ret = _mkp_network_io_bind(socket_fd, rp->ai_addr, rp->ai_addrlen, MK_SOMAXCONN);
if(ret == -1) {
mk_err("Cannot listen on %s:%i\n", listen_addr, port);
continue;
Expand All @@ -259,4 +266,3 @@ int _mkp_network_io_server(int port, char *listen_addr)

return socket_fd;
}

3 changes: 2 additions & 1 deletion src/include/MKPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "mk_http.h"
#include "mk_file.h"
#include "mk_socket.h"
#include "mk_kernel.h"
#include "mk_macros.h"

/* global vars */
Expand Down Expand Up @@ -77,7 +78,7 @@ int MK_EXPORT _mkp_network_io_send_file(int socket_fd, int file_fd, off_t *file_
int MK_EXPORT _mkp_network_io_create_socket(int domain, int type, int protocol);
int MK_EXPORT _mkp_network_io_bind(int socket_fd, const struct sockaddr *addr,
socklen_t addrlen, int backlog);
int MK_EXPORT _mkp_network_io_server(int port, char *listen_addr);
int MK_EXPORT _mkp_network_io_server(int port, char *listen_addr, int reuse_port);
int MK_EXPORT _mkp_event_read(int sockfd);
int MK_EXPORT _mkp_event_write(int sockfd);
int MK_EXPORT _mkp_event_error(int sockfd);
Expand Down
3 changes: 2 additions & 1 deletion src/include/mk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#define O_NOATIME 01000000
#endif

#define M_DEFAULT_CONFIG_FILE "monkey.conf"
#define MK_DEFAULT_CONFIG_FILE "monkey.conf"
#define MK_DEFAULT_MIMES_CONF_FILE "monkey.mime"
#define MK_DEFAULT_PLUGIN_LOAD_CONF_FILE "plugins.load"
#define MK_DEFAULT_SITES_CONF_DIR "sites/"
Expand Down Expand Up @@ -86,6 +86,7 @@ struct server_config
int8_t fdt; /* is FDT enabled ? */
int8_t is_daemon;
int8_t is_seteuid;
int8_t scheduler_mode; /* Scheduler balancing mode */

char *serverconf; /* path to configuration files */
char *listen_addr;
Expand Down
2 changes: 1 addition & 1 deletion src/include/mk_epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct epoll_state_index

/* Monkey epoll calls */
int mk_epoll_create();
void *mk_epoll_init(int efd, int max_events);
void *mk_epoll_init(int server_fd, int efd, int max_events);
struct epoll_state *mk_epoll_state_get(int fd);

int mk_epoll_add(int efd, int fd, int mode, unsigned int behavior);
Expand Down
3 changes: 2 additions & 1 deletion src/include/mk_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ struct plugin_network_io
int (*send_file) (int, int, off_t *, size_t);
int (*create_socket) (int, int, int);
int (*bind) (int, const struct sockaddr *addr, socklen_t, int);
int (*server) (int, char *);
int (*server) (int, char *, int);
};

struct plugin
Expand Down Expand Up @@ -177,6 +177,7 @@ struct plugin_api
int (*socket_reset) (int);
int (*socket_set_tcp_fastopen) (int);
int (*socket_set_tcp_nodelay) (int);
int (*socket_set_tcp_reuseport) (int);
int (*socket_connect) (char *, int);
int (*socket_set_nonblocking) (int);
int (*socket_create) ();
Expand Down
20 changes: 20 additions & 0 deletions src/include/mk_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@
#define MK_SCHEDULER_CONN_PROCESS 1
#define MK_SCHEDULER_SIGNAL_DEADBEEF 0xDEADBEEF

/*
* Scheduler balancing mode:
*
* - Fair Balancing: use a single socket and upon accept
* new connections, lookup the less loaded thread and
* assign the socket to that specific epoll queue.
*
* - ReusePort: Use new Linux Kernel 3.14 feature that
* allows thread to share binded address on a lister
* socket. We let the Kernel to decide how to balance.
*/
#define MK_SCHEDULER_FAIR_BALANCING 0
#define MK_SCHEDULER_REUSEPORT 1

extern __thread struct rb_root *cs_list;

struct sched_connection
Expand All @@ -55,6 +69,9 @@ struct sched_list_node
unsigned long long accepted_connections;
unsigned long long closed_connections;

/* Just used on MK_SCHEDULER_REUSEPORT mode */
int server_fd;

/*
* Red-Black tree queue to perform fast lookup over
* the scheduler busy queue
Expand Down Expand Up @@ -93,6 +110,7 @@ struct sched_list_node *sched_list;
/* Struct under thread context */
typedef struct
{
int server_fd;
int epoll_fd;
int epoll_max_events;
int max_events;
Expand All @@ -104,6 +122,7 @@ typedef struct

pthread_key_t MK_EXPORT worker_sched_node;
extern pthread_mutex_t mutex_worker_init;
pthread_mutex_t mutex_port_init;

void mk_sched_init();
int mk_sched_launch_thread(int max_events, pthread_t *tout, mklib_ctx ctx);
Expand All @@ -129,6 +148,7 @@ void mk_sched_update_thread_status(struct sched_list_node *sched,

int mk_sched_check_timeouts(struct sched_list_node *sched);
int mk_sched_add_client(int remote_fd);
int mk_sched_add_client_reuseport(int remote_fd, struct sched_list_node *sched);
int mk_sched_register_client(int remote_fd, struct sched_list_node *sched);
int mk_sched_remove_client(struct sched_list_node *sched, int remote_fd);
struct sched_connection *mk_sched_get_connection(struct sched_list_node
Expand Down
7 changes: 6 additions & 1 deletion src/include/mk_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
#define SOCK_NONBLOCK 04000
#endif

#ifndef SO_REUSEPORT
#define SO_REUSEPORT 15
#endif

/*
* TCP_FASTOPEN: as this is a very new option in the Linux Kernel, the value is
* not yet exported and can be missing, lets make sure is available for all
Expand All @@ -52,14 +56,15 @@ int mk_socket_set_cork_flag(int fd, int state);
int mk_socket_set_tcp_fastopen(int sockfd);
int mk_socket_set_tcp_nodelay(int sockfd);
int mk_socket_set_tcp_defer_accept(int sockfd);
int mk_socket_set_tcp_reuseport(int sockfd);
int mk_socket_set_nonblocking(int sockfd);

int mk_socket_close(int socket);

int mk_socket_create(void);
int mk_socket_connect(char *host, int port);
int mk_socket_reset(int socket);
int mk_socket_server(int port, char *listen_addr);
int mk_socket_server(int port, char *listen_addr, int reuse_port);

int mk_socket_accept(int server_fd);
int mk_socket_sendv(int socket_fd, struct mk_iov *mk_io);
Expand Down
10 changes: 9 additions & 1 deletion src/mk_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,15 @@ void mk_config_set_init_values(void)
config->index_files = NULL;
config->user_dir = NULL;

/* TCP Auto Corking: only available on Linux >= 3.14.0 */
/* TCP REUSEPORT: available on Linux >= 3.9 */
if (mk_kernel_runver >= MK_KERNEL_VERSION(3, 9, 0)) {
config->scheduler_mode = MK_SCHEDULER_REUSEPORT;
}
else {
config->scheduler_mode = MK_SCHEDULER_FAIR_BALANCING;
}

/* TCP Auto Corking: only available on Linux >= 3.14 */
if (mk_kernel_runver >= MK_KERNEL_VERSION(3, 14, 0) &&
mk_socket_tcp_autocorking() == MK_TRUE) {
config->corking = MK_FALSE;
Expand Down
22 changes: 20 additions & 2 deletions src/mk_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "mk_utils.h"
#include "mk_macros.h"
#include "mk_linuxtrace.h"
#include "mk_scheduler.h"

static __thread struct epoll_state_index mk_epoll_state_k;

Expand Down Expand Up @@ -213,12 +214,12 @@ int mk_epoll_create()
return efd;
}

void *mk_epoll_init(int efd, int max_events)
void *mk_epoll_init(int server_fd, int efd, int max_events)
{
int i, fd, ret = -1;
int num_fds;
int fds_timeout;

int remote_fd;
struct epoll_event *events;
struct sched_list_node *sched;

Expand Down Expand Up @@ -254,6 +255,23 @@ void *mk_epoll_init(int efd, int max_events)
}
}

/* New connection under MK_SCHEDULER_REUSEPORT mode */
if (fd == server_fd) {
remote_fd = mk_socket_accept(server_fd);
if (mk_unlikely(remote_fd == -1)) {
#ifdef TRACE
MK_TRACE("Could not accept connection");
#endif
continue;
}
#ifdef TRACE
MK_TRACE("New connection arrived: FD %i", remote_fd);
#endif
/* Register new connection into the scheduler */
mk_sched_add_client_reuseport(remote_fd, sched);
mk_sched_register_client(remote_fd, sched);
fd = remote_fd;
}
ret = mk_conn_read(fd);
}
else if (events[i].events & EPOLLOUT) {
Expand Down
1 change: 1 addition & 0 deletions src/mk_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ void mk_plugin_init()
api->socket_connect = mk_socket_connect;
api->socket_reset = mk_socket_reset;
api->socket_set_tcp_fastopen = mk_socket_set_tcp_fastopen;
api->socket_set_tcp_reuseport = mk_socket_set_tcp_reuseport;
api->socket_set_tcp_nodelay = mk_socket_set_tcp_nodelay;
api->socket_set_nonblocking = mk_socket_set_nonblocking;
api->socket_create = mk_socket_create;
Expand Down
42 changes: 37 additions & 5 deletions src/mk_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ __thread struct rb_root *cs_list;

/*
* Returns the worker id which should take a new incomming connection,
* it returns the worker id with less active connections
* it returns the worker id with less active connections. Just used
* if config->scheduler_mode is MK_SCHEDULER_FAIR_BALANCING.
*/
static inline int _next_target()
{
Expand Down Expand Up @@ -206,7 +207,6 @@ int mk_sched_add_client(int remote_fd)
r = mk_epoll_add(sched->epoll_fd, remote_fd, MK_EPOLL_WRITE,
MK_EPOLL_LEVEL_TRIGGERED);

/* If epoll has failed, decrement the active connections counter */
if (mk_likely(r == 0)) {
sched->accepted_connections++;
}
Expand All @@ -215,6 +215,21 @@ int mk_sched_add_client(int remote_fd)
return r;
}

int mk_sched_add_client_reuseport(int remote_fd, struct sched_list_node *sched)
{
int r;

r = mk_epoll_add(sched->epoll_fd, remote_fd, MK_EPOLL_READ,
MK_EPOLL_LEVEL_TRIGGERED);

if (mk_likely(r == 0)) {
sched->accepted_connections++;
}

MK_LT_SCHED(remote_fd, "ADD_CLIENT_REUSEPORT");
return r;
}

/*
* Register a new client connection into the scheduler, this call takes place
* inside the worker/thread context.
Expand Down Expand Up @@ -282,7 +297,7 @@ static void mk_sched_thread_lists_init()
}

/* Register thread information. The caller thread is the thread information's owner */
static int mk_sched_register_thread(int efd)
static int mk_sched_register_thread(int server_fd, int efd)
{
unsigned int i;
struct sched_connection *sched_conn, *array;
Expand All @@ -298,6 +313,7 @@ static int mk_sched_register_thread(int efd)
sl = &sched_list[wid];
sl->idx = wid++;
sl->tid = pthread_self();
sl->server_fd = server_fd;

/*
* Under Linux does not exists the difference between process and
Expand Down Expand Up @@ -337,6 +353,7 @@ static int mk_sched_register_thread(int efd)
void *mk_sched_launch_worker_loop(void *thread_conf)
{
int ret;
int server_fd = -1;
char *thread_name = 0;
unsigned long len;
sched_thread_conf *thconf = thread_conf;
Expand All @@ -354,7 +371,15 @@ void *mk_sched_launch_worker_loop(void *thread_conf)
mk_cache_thread_init();

/* Register working thread */
wid = mk_sched_register_thread(thconf->epoll_fd);
if (config->scheduler_mode == MK_SCHEDULER_REUSEPORT) {
pthread_mutex_lock(&mutex_port_init);
server_fd = mk_socket_server(config->serverport,
config->listen_addr,
MK_TRUE);
pthread_mutex_unlock(&mutex_port_init);
}
wid = mk_sched_register_thread(server_fd, thconf->epoll_fd);


/* Plugin thread context calls */
mk_epoll_state_init();
Expand All @@ -376,6 +401,13 @@ void *mk_sched_launch_worker_loop(void *thread_conf)
exit(EXIT_FAILURE);
}


if (server_fd > 0) {
event.data.fd = server_fd;
event.events = EPOLLIN;
ret = epoll_ctl(thinfo->epoll_fd, EPOLL_CTL_ADD, server_fd, &event);
}

/*
* ULONG_MAX BUG test only
* =======================
Expand Down Expand Up @@ -403,7 +435,7 @@ void *mk_sched_launch_worker_loop(void *thread_conf)
__builtin_prefetch(&worker_sched_node);

/* Init epoll_wait() loop */
mk_epoll_init(thinfo->epoll_fd, epoll_max_events);
mk_epoll_init(thinfo->server_fd, thinfo->epoll_fd, epoll_max_events);

return 0;
}
Expand Down
5 changes: 5 additions & 0 deletions src/mk_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ void mk_server_loop(int server_fd)
int ret;
int remote_fd;

/*
*/
while (1) sleep(60);

/* Activate TCP_DEFER_ACCEPT */
if (mk_socket_set_tcp_defer_accept(server_fd) != 0) {
mk_warn("TCP_DEFER_ACCEPT failed");
Expand Down
11 changes: 8 additions & 3 deletions src/mk_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ int mk_socket_set_tcp_defer_accept(int sockfd)
return setsockopt(sockfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &timeout, sizeof(int));
}

int mk_socket_set_tcp_reuseport(int sockfd)
{
int on = 1;
return setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
}

int mk_socket_close(int socket)
{
return plg_netiomap->close(socket);
Expand Down Expand Up @@ -152,12 +158,11 @@ int mk_socket_reset(int socket)
}

/* Just IPv4 for now... */
int mk_socket_server(int port, char *listen_addr)
int mk_socket_server(int port, char *listen_addr, int reuse_port)
{
int socket_fd;

socket_fd = plg_netiomap->server(port, listen_addr);

socket_fd = plg_netiomap->server(port, listen_addr, reuse_port);
if (socket_fd < 0) {
exit(EXIT_FAILURE);
}
Expand Down
Loading

0 comments on commit d1da249

Please sign in to comment.