Permalink
Browse files

imjournal: add ratelimiting capability

The original imjournal code did not support ratelimiting at all. We
now have our own ratelimiter. This can mitigate against journal
database corruption, when the journal re-sends old data. This is a
current bug in systemd journal, but we won't outrule this to happen
in the future again. So it is better to have a safeguard in place.
By default, we permit 20,000 messages witin 10 minutes. This may
be a bit restrictive, but given the risk potential it seems reasonable.
Users requiring larger traffic flows can always adjust the value.
  • Loading branch information...
1 parent f0f1a53 commit 9057b1ccfb0a7fb51eb9b8d6c9dfbd81547a7754 @rgerhards committed Jun 15, 2013
Showing with 32 additions and 8 deletions.
  1. +9 −0 ChangeLog
  2. +23 −8 plugins/imjournal/imjournal.c
View
@@ -1,5 +1,14 @@
---------------------------------------------------------------------------
Version 7.4.1 [v7.4-stable] 2013-06-??
+- imjournal: add ratelimiting capability
+ The original imjournal code did not support ratelimiting at all. We
+ now have our own ratelimiter. This can mitigate against journal
+ database corruption, when the journal re-sends old data. This is a
+ current bug in systemd journal, but we won't outrule this to happen
+ in the future again. So it is better to have a safeguard in place.
+ By default, we permit 20,000 messages witin 10 minutes. This may
+ be a bit restrictive, but given the risk potential it seems reasonable.
+ Users requiring larger traffic flows can always adjust the value.
- bugfix: potential loop in rate limiting
if the message that tells about rate-limiting gets rate-limited itself,
it will potentially create and endless loop
@@ -3,7 +3,7 @@
* To test under Linux:
* emmit log message into systemd journal
*
- * Copyright (C) 2008-2012 Adiscon GmbH
+ * Copyright (C) 2008-2013 Adiscon GmbH
*
* This file is part of rsyslog.
*
@@ -33,6 +33,7 @@
#include <sys/poll.h>
#include <sys/socket.h>
#include <errno.h>
+#include <systemd/sd-journal.h>
#include "dirty.h"
#include "cfsysline.h"
@@ -47,7 +48,7 @@
#include "errmsg.h"
#include "srUtils.h"
#include "unicode-helper.h"
-#include <systemd/sd-journal.h>
+#include "ratelimit.h"
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
@@ -64,11 +65,15 @@ DEFobjCurrIf(errmsg)
static struct configSettings_s {
char *stateFile;
int iPersistStateInterval;
+ int ratelimitInterval;
+ int ratelimitBurst;
} cs;
-/* module-gloval parameters */
+/* module-global parameters */
static struct cnfparamdescr modpdescr[] = {
{ "statefile", eCmdHdlrGetWord, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 },
{ "persiststateinterval", eCmdHdlrInt, 0 }
};
static struct cnfparamblk modpblk =
@@ -84,6 +89,7 @@ static int bLegacyCnfModGlobalsPermitted = 0;/* are legacy module-global config
static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */
static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */
+static ratelimit_t *ratelimiter = NULL;
static sd_journal *j;
/* enqueue the the journal message into the message queue.
@@ -121,7 +127,7 @@ enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval *
msgAddJSON(pMsg, (uchar*)"!", json);
}
- CHKiRet(submitMsg2(pMsg));
+ CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg));
finalize_it:
RETiRet;
@@ -249,7 +255,7 @@ readjournal() {
if (equal_sign == NULL) {
errmsg.LogError(0, RS_RET_ERR,"SD_JOURNAL_FOREACH_DATA()"
" returned a malformed field (has no '='): '%s'",
- get);
+ (char*)get);
continue; /* skip the entry */
}
@@ -486,14 +492,16 @@ loadJournalState()
BEGINrunInput
CODESTARTrunInput
- /* this is an endless loop - it is terminated when the thread is
- * signalled to do so. This, however, is handled by the framework.
- */
+ CHKiRet(ratelimitNew(&ratelimiter, "imjournal", NULL));
+ ratelimitSetLinuxLike(ratelimiter, cs.ratelimitInterval, cs.ratelimitBurst);
if (cs.stateFile) {
CHKiRet(loadJournalState());
}
+ /* this is an endless loop - it is terminated when the thread is
+ * signalled to do so. This, however, is handled by the framework.
+ */
while (glbl.GetGlobalInputTermState() == 0) {
int count = 0, r;
@@ -534,6 +542,8 @@ CODESTARTbeginCnfLoad
cs.iPersistStateInterval = DFLT_persiststateinterval;
cs.stateFile = NULL;
+ cs.ratelimitBurst = 20000;
+ cs.ratelimitInterval = 600;
ENDbeginCnfLoad
@@ -578,6 +588,7 @@ ENDafterRun
BEGINmodExit
CODESTARTmodExit
+ ratelimitDestruct(ratelimiter);
if(pInputName != NULL)
prop.Destruct(&pInputName);
if(pLocalHostIP != NULL)
@@ -615,6 +626,10 @@ CODESTARTsetModCnf
cs.iPersistStateInterval = (int) pvals[i].val.d.n;
} else if (!strcmp(modpblk.descr[i].name, "statefile")) {
cs.stateFile = (char *)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(modpblk.descr[i].name, "ratelimit.burst")) {
+ cs.ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "ratelimit.interval")) {
+ cs.ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imjournal: program error, non-handled "
"param '%s' in beginCnfLoad\n", modpblk.descr[i].name);

0 comments on commit 9057b1c

Please sign in to comment.