diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 6aff21f441f5d..a01a8bde98a07 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -140,9 +140,13 @@ package journald // /* The close notification pipe was closed. */ // return 0; // } -// if (sd_journal_process(j) == SD_JOURNAL_APPEND) { +// switch (sd_journal_process(j)) { +// case SD_JOURNAL_APPEND: // /* Data, which we might care about, was appended. */ // return 1; +// case SD_JOURNAL_INVALIDATE: +// /* Journal files were added or removed. */ +// return 2; // } // } while ((fds[0].revents & POLLHUP) == 0); // return 0; @@ -251,7 +255,7 @@ drain: return cursor, done } -func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char { +func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, cmatch *C.char, untilUnixMicro uint64) (*C.sd_journal, *C.char) { s.mu.Lock() s.readers.readers[logWatcher] = logWatcher if s.closed { @@ -279,9 +283,37 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, } var done bool - cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro) + switch status { + case 1: + // Read new messages from the journal. + cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro) + case 2: + // Close this handle to the journal. + C.sd_journal_close(j) + j = nil + // Open a new handle to the journal. + rc := C.sd_journal_open(&j, C.int(0)) + if rc != 0 { + done = true + break + } + // Remove limits on the size of data items that we'll retrieve. + rc = C.sd_journal_set_data_threshold(j, C.size_t(0)) + if rc != 0 { + done = true + break + } + // Add a match to have the library do the searching for us. + rc = C.sd_journal_add_match(j, unsafe.Pointer(cmatch), C.strlen(cmatch)) + if rc != 0 { + done = true + break + } + default: + done = true + } - if status != 1 || done { + if done { // We were notified to stop break } @@ -305,7 +337,7 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor = <-newCursor } - return cursor + return j, cursor } func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { @@ -328,12 +360,12 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon // here, potentially while the goroutine that uses them is still // running. Otherwise, close them when we return from this function. following := false - defer func(pfollowing *bool) { - if !*pfollowing { + defer func() { + if following { close(logWatcher.Msg) } C.sd_journal_close(j) - }(&following) + }() // Remove limits on the size of data items that we'll retrieve. rc = C.sd_journal_set_data_threshold(j, C.size_t(0)) if rc != 0 { @@ -422,7 +454,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon if C.pipe(&pipes[0]) == C.int(-1) { logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe") } else { - cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro) + j, cursor = s.followJournal(logWatcher, j, pipes, cursor, cmatch, untilUnixMicro) // Let followJournal handle freeing the journal context // object and closing the channel. following = true