From 786b2645d38386a9a1771f7fbd6a151af22b5c64 Mon Sep 17 00:00:00 2001 From: Emmanuel Schmidbauer Date: Fri, 11 Jan 2019 12:47:07 -0500 Subject: [PATCH] janssonrpcc: add new parameter "keep_alive" - send periodic tcp keepalive packets to server in order to prevent connection from closing --- .../janssonrpcc/doc/janssonrpcc_admin.xml | 19 ++++- src/modules/janssonrpcc/janssonrpc_connect.c | 70 ++++++++++++++++++- src/modules/janssonrpcc/janssonrpc_server.h | 1 + src/modules/janssonrpcc/janssonrpc_srv.h | 1 + src/modules/janssonrpcc/janssonrpcc_mod.c | 16 +++++ 5 files changed, 104 insertions(+), 3 deletions(-) diff --git a/src/modules/janssonrpcc/doc/janssonrpcc_admin.xml b/src/modules/janssonrpcc/doc/janssonrpcc_admin.xml index 433a8771bed..0d7e78c7085 100644 --- a/src/modules/janssonrpcc/doc/janssonrpcc_admin.xml +++ b/src/modules/janssonrpcc/doc/janssonrpcc_admin.xml @@ -169,7 +169,24 @@ modparam("janssonrpcc", "server", "conn=user_db;addr=rpc.prod.exmaple.net;port=5 Set <varname>retry_codes</varname> parameter ... -modparam("janssonrpc-s", "retry_codes", "-32603, -32000..-32099"); +modparam("janssonrpcc", "retry_codes", "-32603, -32000..-32099"); +... + + + +
+ <varname>keep_alive</varname> (integer) + + number of seconds to send a tcp keep-alive to the server connection + + + Default is 0 (disabled) + + + Set <varname>keep_alive</varname> parameter + +... +modparam("janssonrpcc", "keep_alive", 10) ... diff --git a/src/modules/janssonrpcc/janssonrpc_connect.c b/src/modules/janssonrpcc/janssonrpc_connect.c index 83fd4530e89..0ba40f16438 100644 --- a/src/modules/janssonrpcc/janssonrpc_connect.c +++ b/src/modules/janssonrpcc/janssonrpc_connect.c @@ -27,6 +27,7 @@ #include #include #include +#include #include "../../core/sr_module.h" #include "../../core/route.h" @@ -44,6 +45,8 @@ #include "janssonrpc_server.h" #include "janssonrpc_connect.h" +unsigned int jsonrpc_keep_alive; + void wait_server_backoff(unsigned int timeout /* seconds */, jsonrpc_server_t* server, bool delay); @@ -79,6 +82,11 @@ void force_disconnect(jsonrpc_server_t* server) server->buffer = NULL; server->status = JSONRPC_SERVER_DISCONNECTED; + if (server->keep_alive_socket_fd >= 0) { + INFO("closing socket"); + close(server->keep_alive_socket_fd); + server->keep_alive_socket_fd = -1; + } // close bufferevent bev_disconnect(server->bev); @@ -245,6 +253,12 @@ void connect_failed(jsonrpc_server_t* server) bev_disconnect(server->bev); server->status = JSONRPC_SERVER_RECONNECTING; + // close socket + if (server->keep_alive_socket_fd >= 0) { + INFO("closing socket"); + close(server->keep_alive_socket_fd); + server->keep_alive_socket_fd = -1; + } wait_server_backoff(JSONRPC_RECONNECT_INTERVAL, server, true); } @@ -283,21 +297,73 @@ void bev_connect_cb(struct bufferevent* bev, short events, void* arg) connect_failed(server); } +int fd_is_valid(int fd) { + return fcntl(fd, F_GETFD) != -1 || errno != EBADF; +} + +int set_linger(int fd, int onoff, int linger) { + struct linger l = { .l_linger = linger, .l_onoff = onoff}; + int res = setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)); + assert(res == 0); + return res; +} + + +int set_keepalive(int fd, int keepalive, int cnt, int idle, int intvl) { + int res = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)); + assert(res == 0); + + res = setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &idle, sizeof(idle)); + assert(res == 0); + + res = setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)); + assert(res == 0); + + res = setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl, sizeof(intvl)); + assert(res == 0); + + return res; +} + void bev_connect(jsonrpc_server_t* server) { if(!server) { ERR("Trying to connect null server\n"); return; } + int fd = -1; + if (jsonrpc_keep_alive > 0) { + if (server->keep_alive_socket_fd > 0) { + fd = server->keep_alive_socket_fd; + } else { + INFO("setting up socket"); + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd <= 0) { + server->keep_alive_socket_fd = -1; + ERR("could not setup socket"); + } else { + server->keep_alive_socket_fd = fd; // track fd to close later + } + } + if (!fd_is_valid(fd)) { // make sure socket is valid + fd = -1; + server->keep_alive_socket_fd = -1; + } + } INFO("Connecting to server %.*s:%d for conn %.*s.\n", STR(server->addr), server->port, STR(server->conn)); + if (fd > 0) { + set_linger(fd, 1, 0); + set_keepalive(fd, 1, 1, jsonrpc_keep_alive, jsonrpc_keep_alive); + } + server->bev = bufferevent_socket_new( global_ev_base, - -1, + fd, BEV_OPT_CLOSE_ON_FREE); - if(!(server->bev)) { + if (!(server->bev)) { ERR("Could not create bufferevent for %.*s:%d\n", STR(server->addr), server->port); connect_failed(server); return; diff --git a/src/modules/janssonrpcc/janssonrpc_server.h b/src/modules/janssonrpcc/janssonrpc_server.h index 700695a85a9..b04e826ca53 100644 --- a/src/modules/janssonrpcc/janssonrpc_server.h +++ b/src/modules/janssonrpcc/janssonrpc_server.h @@ -45,6 +45,7 @@ typedef struct jsonrpc_server { unsigned int req_count; unsigned int priority, weight; bool added; + int keep_alive_socket_fd; struct bufferevent* bev; /* local mem */ netstring_t* buffer; } jsonrpc_server_t; diff --git a/src/modules/janssonrpcc/janssonrpc_srv.h b/src/modules/janssonrpcc/janssonrpc_srv.h index 79c289fd854..711cb4da6fd 100644 --- a/src/modules/janssonrpcc/janssonrpc_srv.h +++ b/src/modules/janssonrpcc/janssonrpc_srv.h @@ -41,6 +41,7 @@ typedef struct srv_cb_params { extern jsonrpc_srv_t* global_srv_list; extern unsigned int jsonrpc_min_srv_ttl; +extern unsigned int jsonrpc_keep_alive; jsonrpc_srv_t* create_srv(str srv, str conn, unsigned int ttl); void addto_srv_list(jsonrpc_srv_t* srv, jsonrpc_srv_t** list); diff --git a/src/modules/janssonrpcc/janssonrpcc_mod.c b/src/modules/janssonrpcc/janssonrpcc_mod.c index 6c71b363616..9076f4a5435 100644 --- a/src/modules/janssonrpcc/janssonrpcc_mod.c +++ b/src/modules/janssonrpcc/janssonrpcc_mod.c @@ -51,6 +51,7 @@ void mod_destroy(void); int parse_server_param(modparam_t type, void* val); int parse_retry_codes_param(modparam_t type, void* val); int parse_min_ttl_param(modparam_t type, void* val); +int parse_keep_alive_param(modparam_t type, void* val); static int fixup_req(void** param, int param_no); static int fixup_req_free(void** param, int param_no); static int fixup_notify(void** param, int param_no); @@ -100,6 +101,7 @@ static param_export_t mod_params[]={ {"retry_codes", STR_PARAM|USE_FUNC_PARAM, (void*)parse_retry_codes_param}, {"min_srv_ttl", INT_PARAM|USE_FUNC_PARAM, (void*)parse_min_ttl_param}, {"result_pv", STR_PARAM, &result_pv_str.s}, + {"keep_alive", INT_PARAM|USE_FUNC_PARAM, (void*)parse_keep_alive_param}, { 0,0,0 } }; @@ -333,6 +335,20 @@ int parse_min_ttl_param(modparam_t type, void* val) return 0; } +int parse_keep_alive_param(modparam_t type, void* val) +{ + if (PARAM_TYPE_MASK(type) != INT_PARAM) { + ERR("keep_alive must be of type %d, not %d!\n", INT_PARAM, type); + return -1; + } + jsonrpc_keep_alive = (int)(long)val; + if (jsonrpc_keep_alive < 0) { + jsonrpc_keep_alive = 0; + } + INFO("jsonrpc_keep_alive set to %d\n", jsonrpc_keep_alive); + return 0; +} + /* Fixup Functions */ static int fixup_req(void** param, int param_no)