Skip to content

Commit

Permalink
[Project] More work towards heartbeating logic implementation
Browse files (browse the repository at this point in the history)
  • Loading branch information
vstakhov committed Sep 22, 2019
1 parent 261c549 commit 884a962
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/libserver/cfg_rcl.c
Expand Up @@ -2191,7 +2191,7 @@ rspamd_rcl_config_init (struct rspamd_config *cfg, GHashTable *skip_sections)
rspamd_rcl_add_default_handler (sub,
"heartbeats_loss_max",
rspamd_rcl_parse_struct_integer,
G_STRUCT_OFFSET (struct rspamd_config, heartbeat_interval),
G_STRUCT_OFFSET (struct rspamd_config, heartbeats_loss_max),
RSPAMD_CL_FLAG_INT_32,
"Maximum count of heartbeats to be lost before trying to "
"terminate a worker (default: 0 - disabled)");
Expand Down
33 changes: 22 additions & 11 deletions src/libserver/worker_util.c
Expand Up @@ -761,23 +761,23 @@ rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents)
-(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max) {


if (-(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max + 1) {
msg_err_main ("terminate worker type %s with pid %P, "
"last beat on: %s; %L heartbeat loast",
if (-(wrk->hb.nbeats) > rspamd_main->cfg->heartbeats_loss_max + 1) {
msg_err_main ("force kill worker type %s with pid %P, "
"last beat on: %s; %L heartbeat lost",
g_quark_to_string (wrk->type),
wrk->pid,
timebuf,
-(wrk->hb.nbeats));
kill (wrk->pid, SIGTERM);
kill (wrk->pid, SIGKILL);
}
else {
msg_err_main ("force kill worker type %s with pid %P, "
"last beat on: %s; %L heartbeat loast",
msg_err_main ("terminate worker type %s with pid %P, "
"last beat on: %s; %L heartbeat lost",
g_quark_to_string (wrk->type),
wrk->pid,
timebuf,
-(wrk->hb.nbeats));
kill (wrk->pid, SIGKILL);
kill (wrk->pid, SIGTERM);
}

}
Expand Down Expand Up @@ -1363,10 +1363,21 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main,

if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
/* Normal worker termination, do not fork one more */
msg_info_main ("%s process %P terminated normally",
g_quark_to_string (wrk->type),
wrk->pid);
need_refork = FALSE;

if (wrk->hb.nbeats < 0) {
msg_info_main ("%s process %P terminated normally, but lost %L "
"heartbeats, refork it",
g_quark_to_string (wrk->type),
wrk->pid,
-(wrk->hb.nbeats));
need_refork = TRUE;
}
else {
msg_info_main ("%s process %P terminated normally",
g_quark_to_string (wrk->type),
wrk->pid);
need_refork = FALSE;
}
}
else {
if (WIFSIGNALED (res)) {
Expand Down
8 changes: 8 additions & 0 deletions src/rspamd.c
Expand Up @@ -59,6 +59,7 @@
#ifdef HAVE_OPENSSL
#include <openssl/err.h>
#include <openssl/evp.h>
#include <src/libserver/rspamd_control.h>

#endif

Expand Down Expand Up @@ -1030,6 +1031,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
struct rspamd_worker *wrk)
{
gboolean need_refork;
static struct rspamd_control_command cmd;

/* Turn off locking for logger */
ev_child_stop (EV_A_ w);
Expand All @@ -1052,6 +1054,12 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
close (wrk->srv_pipe[0]);
}

cmd.type = RSPAMD_CONTROL_CHILD_CHANGE;
cmd.cmd.child_change.what = rspamd_child_terminated;
cmd.cmd.child_change.pid = wrk->pid;
cmd.cmd.child_change.additional = w->rstatus;
rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid);

REF_RELEASE (wrk->cf);

if (wrk->finish_actions) {
Expand Down
4 changes: 2 additions & 2 deletions src/rspamd.h
Expand Up @@ -78,8 +78,8 @@ typedef void (*rspamd_worker_term_cb) (EV_P_ ev_child *, struct rspamd_main *,

struct rspamd_worker_heartbeat {
ev_timer heartbeat_ev; /**< used by main for checking heartbeats and by workers to send heartbeats */
ev_tstamp last_event;
gint64 nbeats; /* positive for beats received, negative for beats missed */
ev_tstamp last_event; /**< last heartbeat received timestamp */
gint64 nbeats; /**< positive for beats received, negative for beats missed */
};

/**
Expand Down

0 comments on commit 884a962

Please sign in to comment.