Skip to content

Commit

Permalink
Fixed race-condition in async-action that is to consume mutating mess…
Browse files Browse the repository at this point in the history
…ages (it optionally allows message to be copied, rather than just reffered-to, so async-queue has a copy which does not change). It is an optional-param, defaulted to current behaviour of not copying.

This is not required for 'call <ruleset>' statements, because they already copy message.

As of now, this is relevant when foreach feeds messages to an async-action.
  • Loading branch information
janmejay committed Apr 14, 2015
1 parent c1419bc commit 2808d48
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
8 changes: 6 additions & 2 deletions action.c
Expand Up @@ -188,7 +188,8 @@ static struct cnfparamdescr cnfparamdescr[] = {
{ "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */
{ "action.reportsuspension", eCmdHdlrBinary, 0 },
{ "action.reportsuspensioncontinuation", eCmdHdlrBinary, 0 },
{ "action.resumeinterval", eCmdHdlrInt, 0 }
{ "action.resumeinterval", eCmdHdlrInt, 0 },
{ "action.copymsg", eCmdHdlrBinary, 0 },
};
static struct cnfparamblk pblk =
{ CNFPARAMBLK_VERSION,
Expand Down Expand Up @@ -355,6 +356,7 @@ rsRetVal actionConstruct(action_t **ppThis)
pThis->isTransactional = 0;
pThis->bReportSuspension = -1; /* indicate "not yet set" */
pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */
pThis->bCopyMsg = 0;
pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */
pThis->iActionNbr = iActionNbr;
pthread_mutex_init(&pThis->mutAction, NULL);
Expand Down Expand Up @@ -1448,7 +1450,7 @@ doSubmitToActionQ(action_t * const pAction, wti_t * const pWti, msg_t *pMsg)
} else {/* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, pAction->bCopyMsg ? MsgDup(pMsg) : MsgAddRef(pMsg));
}

RETiRet;
Expand Down Expand Up @@ -1676,6 +1678,8 @@ actionApplyCnfParam(action_t * const pAction, struct cnfparamvals * const pvals)
pAction->bReportSuspension = (int) pvals[i].val.d.n;
} else if(!strcmp(pblk.descr[i].name, "action.reportsuspensioncontinuation")) {
pAction->bReportSuspensionCont = (int) pvals[i].val.d.n;
} else if(!strcmp(pblk.descr[i].name, "action.copymsg")) {
pAction->bCopyMsg = (int) pvals[i].val.d.n;
} else if(!strcmp(pblk.descr[i].name, "action.resumeinterval")) {
pAction->iResumeInterval = pvals[i].val.d.n;
} else {
Expand Down
1 change: 1 addition & 0 deletions action.h
Expand Up @@ -49,6 +49,7 @@ struct action_s {
sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */
sbool bDisabled;
sbool isTransactional;
sbool bCopyMsg;
int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */
time_t ttResumeRtry; /* when is it time to retry the resume? */
int iResumeInterval;/* resume interval for this action */
Expand Down
2 changes: 1 addition & 1 deletion tests/testsuites/json_array_looping.conf
Expand Up @@ -19,7 +19,7 @@ foreach ($.quux in $!foo) do {
action(type="omfile" file="./rsyslog.out.log" template="quux")
foreach ($.corge in $.quux!bar) do {
reset $.grault = $.corge;
action(type="omfile" file="./rsyslog.out.async.log" template="grault" queue.type="linkedlist")
action(type="omfile" file="./rsyslog.out.async.log" template="grault" queue.type="linkedlist" action.copyMsg="on")
call prefixed_writer
if ($.garply != "") then
set $.garply = $.garply & ", ";
Expand Down

0 comments on commit 2808d48

Please sign in to comment.