From 4017dc3a2443e2dc445b77bdb94a754071d043b4 Mon Sep 17 00:00:00 2001 From: Seudin Kasumovic Date: Sat, 25 Apr 2015 00:32:47 +0200 Subject: [PATCH] erlang: Reconnect after lost connection - ensure connect to Erlang node after lost connection or not connected on start --- modules/erlang/cnode.c | 83 +++++++++++++++++++++++++++++++++++++---- modules/erlang/cnode.h | 3 ++ modules/erlang/worker.c | 3 ++ 3 files changed, 82 insertions(+), 7 deletions(-) diff --git a/modules/erlang/cnode.c b/modules/erlang/cnode.c index b4f5f644008..6cc27866830 100644 --- a/modules/erlang/cnode.c +++ b/modules/erlang/cnode.c @@ -45,6 +45,7 @@ static io_wait_h io_h; cnode_handler_t *enode = NULL; +csockfd_handler_t *csocket_handler = NULL; /** * @brief Initialize Kamailo as C node by active connect as client. @@ -239,6 +240,8 @@ int init_cnode_sockets(int cnode_id) void cnode_main_loop(int cnode_id) { char _cnode_name[MAXNODELEN]; + str *enode_name; + int r; if (snprintf(_cnode_name, MAXNODELEN, "%.*s%d@%.*s", STR_FMT(&cnode_alivename), cnode_id+1,STR_FMT(&cnode_host)) >= MAXNODELEN) { @@ -255,50 +258,73 @@ void cnode_main_loop(int cnode_id) return; } + enode_name = erlang_nodename.s?&erlang_nodename:&erlang_node_sname; + /* main loop */ switch(io_h.poll_method){ case POLL_POLL: while(1){ - io_wait_loop_poll(&io_h, IO_LISTEN_TIMEOUT, 0); + r = io_wait_loop_poll(&io_h, IO_LISTEN_TIMEOUT, 0); + if(!r && enode_connect()) { + LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name)); + } } break; #ifdef HAVE_SELECT case POLL_SELECT: while(1){ - io_wait_loop_select(&io_h, IO_LISTEN_TIMEOUT, 0); + r = io_wait_loop_select(&io_h, IO_LISTEN_TIMEOUT, 0); + if(!r && enode_connect()) { + LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name)); + } } break; #endif #ifdef HAVE_SIGIO_RT case POLL_SIGIO_RT: while(1){ - io_wait_loop_sigio_rt(&io_h, IO_LISTEN_TIMEOUT); + r = io_wait_loop_sigio_rt(&io_h, IO_LISTEN_TIMEOUT); + if(!r && enode_connect()) { + LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name)); + } } break; #endif #ifdef HAVE_EPOLL case POLL_EPOLL_LT: while(1){ - io_wait_loop_epoll(&io_h, IO_LISTEN_TIMEOUT, 0); + r = io_wait_loop_epoll(&io_h, IO_LISTEN_TIMEOUT, 0); + if(!r && enode_connect()) { + LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name)); + } } break; case POLL_EPOLL_ET: while(1){ - io_wait_loop_epoll(&io_h, IO_LISTEN_TIMEOUT, 1); + r = io_wait_loop_epoll(&io_h, IO_LISTEN_TIMEOUT, 1); + if(!r && enode_connect()) { + LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name)); + } } break; #endif #ifdef HAVE_KQUEUE case POLL_KQUEUE: while(1){ - io_wait_loop_kqueue(&io_h, IO_LISTEN_TIMEOUT, 0); + r = io_wait_loop_kqueue(&io_h, IO_LISTEN_TIMEOUT, 0); + if(!r && enode_connect()) { + LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name)); + } } break; #endif #ifdef HAVE_DEVPOLL case POLL_DEVPOLL: while(1){ - io_wait_loop_devpoll(&io_h, IO_LISTEN_TIMEOUT, 0); + r = io_wait_loop_devpoll(&io_h, IO_LISTEN_TIMEOUT, 0); + if(!r && enode_connect()) { + LM_ERR("failed reconnect to %.*s\n",STR_FMT(enode_name)); + } } break; #endif @@ -486,6 +512,8 @@ int csockfd_init(csockfd_handler_t *phandler, const ei_cnode *ec) erl_set_nonblock(csockfd); + csocket_handler = phandler; + return 0; } @@ -512,3 +540,44 @@ int handle_csockfd(handler_common_t *phandler_t) return worker_init((worker_handler_t*)phandler->new,fd,&phandler_t->ec); } + +/* + * \brief Connect to Erlang node if not connected + */ +int enode_connect() { + + handler_common_t *phandler; + + if (!csocket_handler) { + return -1; + } + + if (enode) { + return 0; + } + + LM_DBG("not connected, trying to connect...\n"); + + phandler = (handler_common_t*)pkg_malloc(sizeof(cnode_handler_t)); + + if (!phandler) { + LM_CRIT("not enough memory\n"); + return -1; + } + + io_handler_ins(phandler); + + /* connect to remote Erlang node */ + if (cnode_connect_to((cnode_handler_t*)phandler,&csocket_handler->ec, erlang_nodename.s?&erlang_nodename:&erlang_node_sname)) { + /* continue even failed to connect, connection can be established + * from Erlang side too */ + io_handler_del(phandler); + } else if (io_watch_add(&io_h,phandler->sockfd,POLLIN,ERL_CNODE_H,phandler)){ + LM_CRIT("io_watch_add failed\n"); + erl_close_socket(phandler->sockfd); + io_handler_del(phandler); + return -1; + } + + return 0; +} diff --git a/modules/erlang/cnode.h b/modules/erlang/cnode.h index 5e9b45909e4..300f5eba3b4 100644 --- a/modules/erlang/cnode.h +++ b/modules/erlang/cnode.h @@ -84,6 +84,8 @@ typedef struct csockfd_handler_s ei_cnode ec; /* erlang C node (actually it's kamailio node) */ } csockfd_handler_t; +extern csockfd_handler_t *csocket_handler; + int init_cnode_sockets(int idx); void cnode_main_loop(int idx); @@ -95,6 +97,7 @@ int wait_cnode_tmo(handler_common_t *phandler_t); int destroy_cnode(handler_common_t *phandler_t); int cnode_connect_to(cnode_handler_t *phandler, ei_cnode *ec, const str *nodename ); +int enode_connect(); enum erl_handle_type { ERL_EPMD_H = 1, diff --git a/modules/erlang/worker.c b/modules/erlang/worker.c index 56b28fd47af..772dfdab414 100644 --- a/modules/erlang/worker.c +++ b/modules/erlang/worker.c @@ -56,6 +56,9 @@ int handle_worker(handler_common_t *phandler) eapi_t api; int rc; + /* ensure be connected */ + enode_connect(); + memset((void*)&msg,0,sizeof(msg)); /* Kamailio worker PID */