Skip to content

Commit

Permalink
OMHIREDIS::FIXED:: Correctly suspend module in case of failure
Browse files Browse the repository at this point in the history
  • Loading branch information
frikilax committed May 23, 2023
1 parent 2775f4f commit d59ba56
Showing 1 changed file with 48 additions and 5 deletions.
53 changes: 48 additions & 5 deletions contrib/omhiredis/omhiredis.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,38 @@ static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent)
RETiRet;
}

static rsRetVal isMaster(wrkrInstanceData_t *pWrkrData) {
DEFiRet;
redisReply *reply = NULL;

assert(pWrkrData->conn != NULL);

reply = redisCommand(pWrkrData->conn, "ROLE");
if (reply == NULL) {
DBGPRINTF("omhiredis: could not get reply from ROLE command\n");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else if (reply->type == REDIS_REPLY_ERROR) {
LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "omhiredis: got an error while querying role -> "
"%s\n", reply->str);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else if (reply->type != REDIS_REPLY_ARRAY || reply->element[0]->type != REDIS_REPLY_STRING) {
LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "omhiredis: did not get a proper reply from ROLE command");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else {
if (strncmp(reply->element[0]->str, "master", 6)) {
LogMsg(0, RS_RET_OK, LOG_WARNING, "omhiredis: current connected node is not a master");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
}

finalize_it:
free(reply);
RETiRet;
}

static rsRetVal writeHiredis(uchar* key, uchar *message, wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
Expand Down Expand Up @@ -248,8 +280,13 @@ static rsRetVal writeHiredis(uchar* key, uchar *message, wrkrInstanceData_t *pWr
* try to restablish our connection to redis */
BEGINtryResume
CODESTARTtryResume
if(pWrkrData->conn == NULL)
iRet = initHiredis(pWrkrData, 0);
closeHiredis(pWrkrData);
CHKiRet(initHiredis(pWrkrData, 0));
// Must get a master node for all modes, except 'publish'
if(pWrkrData->pData->mode != OMHIREDIS_MODE_PUBLISH) {
CHKiRet(isMaster(pWrkrData));
}
finalize_it:
ENDtryResume

/* begin a transaction.
Expand Down Expand Up @@ -289,13 +326,19 @@ CODESTARTendTransaction
redisReply *reply;
int i;
for ( i = 0; i < pWrkrData->count; i++ ) {
redisGetReply ( pWrkrData->conn, (void*)&reply);
if( pWrkrData->conn->err ){
if( REDIS_OK != redisGetReply( pWrkrData->conn, (void*)&reply) || pWrkrData->conn->err ) {
dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr);
LogError(0, RS_RET_REDIS_ERROR, "Error while processing replies: %s", pWrkrData->conn->errstr);
closeHiredis(pWrkrData);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else {
if (reply->type == REDIS_REPLY_ERROR) {
LogError(0, RS_RET_REDIS_ERROR, "Received error from redis -> %s", reply->str);
closeHiredis(pWrkrData);
freeReplyObject(reply);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
freeReplyObject(reply);
}
}
Expand Down Expand Up @@ -398,7 +441,7 @@ CODESTARTnewActInst
dbgprintf("omhiredis: using default RSYSLOG_ForwardFormat template\n");
CHKmalloc(pData->tplName = ustrdup("RSYSLOG_ForwardFormat"));
}
if (pData->expiration && strcmp(pData->modeDescription, "set")) {
if (pData->expiration && pData->mode != OMHIREDIS_MODE_SET) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: expiration set but mode is not "\
"'set', expiration will be ignored");
}
Expand Down

0 comments on commit d59ba56

Please sign in to comment.