Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Proposed fix for handling journal correctly
The fix is to immediately setup the inotify file descriptor via
`sd_journal_get_fd()` right after a journal open, and then
periodically call `sd_journal_process()` to give the client API
library a chance to detect deleted journal files on disk that need to
be closed so they can be properly erased by the file system.

We remove the open/close dance and simplify that code as a result.

Fixes issue #2436.
  • Loading branch information
portante committed Feb 1, 2018
1 parent f1191db commit 34b2d73
Showing 1 changed file with 34 additions and 37 deletions.
71 changes: 34 additions & 37 deletions plugins/imjournal/imjournal.c
Expand Up @@ -114,6 +114,9 @@ static const char *pidFieldName; /* read-only after startup */
static int bPidFallBack;
static ratelimit_t *ratelimiter = NULL;
static sd_journal *j;
static int j_inotify_fd;

#define J_PROCESS_PERIOD 1024 /* Call sd_journal_process() every 1,024 records */

static rsRetVal persistJournalState(void);
static rsRetVal loadJournalState(void);
Expand All @@ -126,6 +129,12 @@ static rsRetVal openJournal(void) {
LogError(-r, RS_RET_IO_ERROR, "imjournal: sd_journal_open() failed");
iRet = RS_RET_IO_ERROR;
}
if ((r = sd_journal_get_fd(j)) < 0) {
LogError(-r, RS_RET_IO_ERROR, "imjournal: sd_journal_get_fd() failed");
iRet = RS_RET_IO_ERROR;
} else {
j_inotify_fd = r;
}
RETiRet;
}

Expand All @@ -134,6 +143,7 @@ static void closeJournal(void) {
persistJournalState();
}
sd_journal_close(j);
j_inotify_fd = 0;
}


Expand Down Expand Up @@ -424,7 +434,7 @@ persistJournalState(void)
char *cursor;
int ret = 0;

/* On success, sd_journal_get_cursor() returns 1 in systemd
/* On success, sd_journal_get_cursor() returns 1 in systemd
197 or older and 0 in systemd 198 or newer */
if ((ret = sd_journal_get_cursor(j, &cursor)) >= 0) {
/* we create a temporary name by adding a ".tmp"
Expand Down Expand Up @@ -471,11 +481,8 @@ pollJournal(void)
struct pollfd pollfd;
int err; // journal error code to process
int pr = 0;
#ifdef NEW_JOURNAL
int jr = 0;
#endif

pollfd.fd = sd_journal_get_fd(j);
pollfd.fd = j_inotify_fd;
pollfd.events = sd_journal_get_events(j);
#ifdef NEW_JOURNAL
pr = poll(&pollfd, 1, POLL_TIMEOUT);
Expand All @@ -487,38 +494,19 @@ pollJournal(void)
/* EINTR is also received during termination
* so return now to check the term state.
*/
ABORT_FINALIZE(RS_RET_OK);
ABORT_FINALIZE(RS_RET_OK);
} else {
LogError(errno, RS_RET_ERR, "imjournal: poll() failed");
ABORT_FINALIZE(RS_RET_ERR);
}
}
#ifndef NEW_JOURNAL
assert(pr == 1);

pr = sd_journal_process(j);
err = pr;
if (pr < 0) {
#else
jr = sd_journal_process(j);

if (pr == 1 && jr == SD_JOURNAL_INVALIDATE) {
/* do not persist stateFile sd_journal_get_cursor will fail! */
char* tmp = cs.stateFile;
cs.stateFile = NULL;
closeJournal();
cs.stateFile = tmp;

CHKiRet(openJournal());

if(cs.stateFile != NULL){
iRet = loadJournalState(); // TODO: CHECK
}
LogMsg(0, RS_RET_OK, LOG_NOTICE, "imjournal: journal reloaded...");
} else if (jr < 0) {
err = jr;
#endif
LogError(err, RS_RET_ERR, "imjournal: sd_journal_process() failed");

err = sd_journal_process(j);
if (err < 0) {
LogError(-err, RS_RET_ERR, "imjournal: sd_journal_process() failed");
ABORT_FINALIZE(RS_RET_ERR);
}

Expand Down Expand Up @@ -567,7 +555,6 @@ loadJournalState(void)
cs.stateFile = new_stateFile;
}


if ((r_sf = fopen(cs.stateFile, "rb")) != NULL) {
char readCursor[128 + 1];
if (fscanf(r_sf, "%128s\n", readCursor) != EOF) {
Expand Down Expand Up @@ -642,7 +629,7 @@ tryRecover(void) {


BEGINrunInput
int count = 0;
uint64_t count = 0;
CODESTARTrunInput
CHKiRet(ratelimitNew(&ratelimiter, "imjournal", NULL));
dbgprintf("imjournal: ratelimiting burst %d, interval %d\n", cs.ratelimitBurst,
Expand All @@ -667,7 +654,6 @@ CODESTARTrunInput
"\"usepidfromsystem\" is depricated, use \"usepid\" instead");
}


if (cs.usePid && (strcmp(cs.usePid, "system") == 0)) {
pidFieldName = "_PID";
bPidFallBack = 0;
Expand Down Expand Up @@ -698,21 +684,32 @@ CODESTARTrunInput

if (r == 0) {
/* No new messages, wait for activity. */
if(pollJournal() != RS_RET_OK) {
if (pollJournal() != RS_RET_OK) {
tryRecover();
}
continue;
}

if(readjournal() != RS_RET_OK) {
if (readjournal() != RS_RET_OK) {
tryRecover();
continue;
}

count++;

if ((count % J_PROCESS_PERIOD) == 0) {
/* Give the journal a periodic chance to detect rotated journal files to be cleaned up. */
r = sd_journal_process(j);
if (r < 0) {
LogError(-r, RS_RET_ERR, "imjournal: sd_journal_process() failed");
tryRecover();
continue;
}
}

if (cs.stateFile) { /* can't persist without a state file */
/* TODO: This could use some finer metric. */
count++;
if (count == cs.iPersistStateInterval) {
count = 0;
if ((count % cs.iPersistStateInterval) == 0) {
persistJournalState();
}
}
Expand Down

0 comments on commit 34b2d73

Please sign in to comment.