From 2808d4805ee470ecf6611a4dbc2e42c25bc58cac Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Tue, 14 Apr 2015 11:14:29 +0530 Subject: [PATCH] Fixed race-condition in async-action that is to consume mutating messages (it optionally allows message to be copied, rather than just reffered-to, so async-queue has a copy which does not change). It is an optional-param, defaulted to current behaviour of not copying. This is not required for 'call ' statements, because they already copy message. As of now, this is relevant when foreach feeds messages to an async-action. --- action.c | 8 ++++++-- action.h | 1 + tests/testsuites/json_array_looping.conf | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/action.c b/action.c index 77062fc22c..669f7735ae 100644 --- a/action.c +++ b/action.c @@ -188,7 +188,8 @@ static struct cnfparamdescr cnfparamdescr[] = { { "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */ { "action.reportsuspension", eCmdHdlrBinary, 0 }, { "action.reportsuspensioncontinuation", eCmdHdlrBinary, 0 }, - { "action.resumeinterval", eCmdHdlrInt, 0 } + { "action.resumeinterval", eCmdHdlrInt, 0 }, + { "action.copymsg", eCmdHdlrBinary, 0 }, }; static struct cnfparamblk pblk = { CNFPARAMBLK_VERSION, @@ -355,6 +356,7 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->isTransactional = 0; pThis->bReportSuspension = -1; /* indicate "not yet set" */ pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */ + pThis->bCopyMsg = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pThis->iActionNbr = iActionNbr; pthread_mutex_init(&pThis->mutAction, NULL); @@ -1448,7 +1450,7 @@ doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, msg_t *pMsg) } else {/* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ - iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); + iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, pAction->bCopyMsg ? MsgDup(pMsg) : MsgAddRef(pMsg)); } RETiRet; @@ -1676,6 +1678,8 @@ actionApplyCnfParam(action_t * const pAction, struct cnfparamvals * const pvals) pAction->bReportSuspension = (int) pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "action.reportsuspensioncontinuation")) { pAction->bReportSuspensionCont = (int) pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.copymsg")) { + pAction->bCopyMsg = (int) pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "action.resumeinterval")) { pAction->iResumeInterval = pvals[i].val.d.n; } else { diff --git a/action.h b/action.h index b5fc3e5744..0031dff9c5 100644 --- a/action.h +++ b/action.h @@ -49,6 +49,7 @@ struct action_s { sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */ sbool bDisabled; sbool isTransactional; + sbool bCopyMsg; int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */ time_t ttResumeRtry; /* when is it time to retry the resume? */ int iResumeInterval;/* resume interval for this action */ diff --git a/tests/testsuites/json_array_looping.conf b/tests/testsuites/json_array_looping.conf index dce0946385..1d1519b8ea 100644 --- a/tests/testsuites/json_array_looping.conf +++ b/tests/testsuites/json_array_looping.conf @@ -19,7 +19,7 @@ foreach ($.quux in $!foo) do { action(type="omfile" file="./rsyslog.out.log" template="quux") foreach ($.corge in $.quux!bar) do { reset $.grault = $.corge; - action(type="omfile" file="./rsyslog.out.async.log" template="grault" queue.type="linkedlist") + action(type="omfile" file="./rsyslog.out.async.log" template="grault" queue.type="linkedlist" action.copyMsg="on") call prefixed_writer if ($.garply != "") then set $.garply = $.garply & ", ";