Try to avoid getting stuck when draining the journal #298

Merged
152 changes: 135 additions & 17 deletions daemon/logger/journald/read.go
@@ -140,7 +140,9 @@ package journald
// /* The close notification pipe was closed. */
// return 0;
// }
// switch (sd_journal_process(j)) {
// case SD_JOURNAL_APPEND:
// case SD_JOURNAL_INVALIDATE:
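// /* SD_JOURNAL_INVALIDATE also fires when journal files are added or
//    removed (e.g. on rotation), which likewise can mean that there are
//    new entries to pick up. */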
// /* Data, which we might care about, was appended. */
// return 1;
// }
@@ -169,28 +171,95 @@ func (s *journald) Close() error {
return nil
}

func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) (*C.char, bool) {
var msg, data, cursor *C.char
var length C.size_t
var stamp C.uint64_t
var priority, partial C.int

// Give the journal handle an opportunity to close any open descriptors
// for files that have been removed.
C.sd_journal_process(j)

// Seek to the location of the last entry that we sent.
if oldCursor != nil {
// We know which entry was read last, so try to go to that
// location.
rc := C.sd_journal_seek_cursor(j, oldCursor)
if rc != 0 {
return oldCursor, false
}
// Go forward to the first unsent message.
rc = C.sd_journal_next(j)
if rc < 0 {
return oldCursor, false
}
// We want to avoid sending a given entry twice (or more), so
// attempt to advance to the first unread entry in the journal
// so long as "this" one matches the last entry that we read.
for C.sd_journal_test_cursor(j, oldCursor) > 0 {
if C.sd_journal_next(j) <= 0 {
return oldCursor, false
}
}
}

// Walk the journal from here forward until we run out of new entries.
sent := uint64(0)
eof := false
drain:
for {
// If we're not keeping up with journald writing to the journal, some of the
// files between where we are and "now" may have been deleted since we started
// walking the set of entries. If that's happened, the inotify descriptor in
// the journal handle will have accumulated pending deletion events while we
// were reading. Letting the journal library process those events closes its
// descriptors for the already-deleted files, so that we'll skip over them and
// the disk space they occupied can actually be reclaimed.
if sent > 0 && sent%1024 == 0 {
if status := C.sd_journal_process(j); status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to process journal events for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
// Attempt to rewind the last-read cursor to the
// entry that we last sent.
if status = C.sd_journal_previous(j); status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to rewind journal by 1 for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
}
break
}
}
// If the output channel is full, stop here, so that we don't block indefinitely
// waiting until we can output another message, which won't ever happen if the
// client has already disconnected.
if len(logWatcher.Msg) >= cap(logWatcher.Msg) {
// Attempt to rewind the last-read cursor to the entry
// that we last sent.
if status := C.sd_journal_previous(j); status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to rewind journal by 1 for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
}
break
}
// Read and send the current message, if there is one to read.
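// (get_message is presumably this file's cgo helper wrapping
// sd_journal_get_data(); in systemd's API, -ENOENT means the entry has
// no such field, and -EADDRNOTAVAIL means the read pointer isn't
// positioned at a valid entry.)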
i := C.get_message(j, &msg, &length, &partial)
if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
// Read the entry's timestamp.
if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
// Attempt to rewind the last-read
// cursor to the entry that we last
// sent.
if status := C.sd_journal_previous(j); status < 0 {
cerrstr := C.strerror(C.int(-status))
errstr := C.GoString(cerrstr)
fmtstr := "error %q while attempting to rewind journal by 1 for container %q"
logrus.Errorf(fmtstr, errstr, s.vars["CONTAINER_ID_FULL"])
}
break
}
// Set up the time and text of the entry.
@@ -229,19 +298,26 @@ drain:
Attrs: attrs,
}
}
// If we've hit the end of the journal, we're done (for now).
sent++
if C.sd_journal_next(j) <= 0 {
eof = true
break
}
}

// If we didn't send any entries, just return the same cursor value.
if oldCursor != nil && sent == 0 {
return oldCursor, eof
}
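// From here on we own the cursor strings: free the one the caller passed
// in and hand back a freshly allocated one from sd_journal_get_cursor()
// (which the caller, in turn, must eventually free).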
// free(NULL) is safe
C.free(unsafe.Pointer(oldCursor))
// Take note of which entry we most recently sent.
if C.sd_journal_get_cursor(j, &cursor) != 0 {
// ensure that we won't be freeing an address that's invalid
cursor = nil
}
return cursor, eof
}
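The drain loop above deliberately stops, rather than blocks, when logWatcher.Msg is already full; the len()/cap() probe is only a momentary snapshot, which is why the code also rewinds the read pointer so the unsent entry is picked up again on the next pass. A minimal standalone sketch of the same non-blocking idea, using a select with a default case (the names here are illustrative, not part of the patch):

package main

import "fmt"

// trySend is a non-blocking send on a buffered channel: it reports false
// instead of blocking when the buffer is full. The patch gets a similar
// effect by checking len(ch) >= cap(ch) before reading an entry, so that
// it can rewind the journal cursor instead of losing the message.
func trySend(ch chan string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "a")) // true: the buffer had room
	fmt.Println(trySend(ch, "b")) // false: a send would have blocked
}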

func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char {
@@ -256,7 +332,7 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
// or we hit an error.
status := C.wait_for_data_cancelable(j, pfd[0])
for status == 1 {
cursor, _ = s.drainJournal(logWatcher, config, j, cursor)
status = C.wait_for_data_cancelable(j, pfd[0])
}
if status < 0 {
Expand All @@ -270,7 +346,7 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
// exited. Try to drain the journal one more time to pick up any last-minute journal entries.
// Note that this isn't foolproof: there's no guarantee that we'll get all the
// trailing entries, but it's better than nothing, and it yields entries more often than not.
cursor, _ = s.drainJournal(logWatcher, config, j, cursor)
}
// Clean up.
C.close(pfd[0])
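wait_for_data_cancelable (excerpted in the C preamble above) waits on the journal's descriptors and on the pfd pipe at the same time, so closing the pipe's write end wakes the loop when the client goes away. A rough Go-native analogue of that cancellation pattern, with channels standing in for the pipe and the journal descriptor (assumed names, not from the patch):

package main

import (
	"fmt"
	"time"
)

// waitForData blocks until new data is signaled, cancellation is requested,
// or a short timeout passes, mirroring the C helper's returns
// (0 = cancelled, 1 = go check the journal again).
func waitForData(data, cancel <-chan struct{}) int {
	select {
	case <-data:
		return 1
	case <-cancel:
		return 0
	case <-time.After(250 * time.Millisecond):
		return 1 // periodic wake-up: poll the journal even without a signal
	}
}

func main() {
	cancel := make(chan struct{})
	close(cancel) // simulate the client disconnecting
	fmt.Println(waitForData(nil, cancel)) // 0: the wait loop should exit
}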
@@ -296,27 +372,46 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
var j *C.sd_journal
var cmatch, cursor *C.char
var stamp C.uint64_t
var initiated C.uint64_t = 0xffffffffffffffff
var sinceUnixMicro uint64
var pipes [2]C.int
var ts C.struct_timespec

// Get the current time in journal-style microseconds, so that we know
// when to stop in non-follow mode.
if C.clock_gettime(C.CLOCK_REALTIME, &ts) == 0 {
initiated = C.uint64_t(ts.tv_sec)*1000000 + C.uint64_t(ts.tv_nsec)/1000
}
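// (If clock_gettime() fails, initiated keeps its all-ones sentinel value,
// so the stamp < initiated catch-up bound below never cuts the drain
// short and we simply read until we run out of entries.)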
// Get a handle to the journal.
rc := C.sd_journal_open(&j, C.int(0))
if rc != 0 {
logWatcher.Err <- fmt.Errorf("error opening journal")
close(logWatcher.Msg)
return
}
// The journal library uses an inotify descriptor to notice when
// journal files are removed, but it isn't allocated until our first
// call to sd_journal_get_fd(), which means that it will not notice the
// removal of any files that happens after we open the journal and
// before the first time we try to read that descriptor. Do it now,
// even though we don't need its value just yet, to try to make that
// window smaller.
rc = C.sd_journal_get_fd(j)
if rc < 0 {
logWatcher.Err <- fmt.Errorf("error opening journal inotify descriptor")
C.sd_journal_close(j)
close(logWatcher.Msg)
return
}
// If we end up following the log, the goroutine that does the following
// takes over the message channel, so use a flag to avoid closing the
// channel here while that goroutine may still be writing to it.
// Otherwise, close it when we return from this function.
following := false
defer func() {
if !following {
close(logWatcher.Msg)
}
C.sd_journal_close(j)
}()
// Remove limits on the size of data items that we'll retrieve.
rc = C.sd_journal_set_data_threshold(j, C.size_t(0))
if rc != 0 {
Expand Down Expand Up @@ -385,7 +480,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
return
}
}
cursor, eof := s.drainJournal(logWatcher, config, j, nil)
if config.Follow {
// Allocate a descriptor for following the journal, if we'll
// need one. Do it here so that we can report if it fails.
@@ -403,6 +498,29 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
following = true
}
}
} else {
// If we stopped reading early because the output channel was
// temporarily full, keep draining until the entries' timestamps
// pass the moment we started reading the log. Bounding the
// catch-up this way keeps us from chasing the end of the journal
// forever when we simply can't keep up with the writer.
duration := 10 * time.Millisecond
timer := time.NewTimer(duration)
drainCatchup:
for !eof && stamp < initiated {
timer.Stop()
cursor, eof = s.drainJournal(logWatcher, config, j, cursor)
if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
break drainCatchup
}
timer.Reset(duration)
select {
case <-logWatcher.WatchClose():
break drainCatchup
case <-timer.C:
}
}
timer.Stop()
}

C.free(unsafe.Pointer(cursor))
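Both sides of the stamp < initiated comparison above must use the same units: sd_journal_get_realtime_usec() reports microseconds since the epoch, so the timespec captured at startup is reduced to microseconds as well. A minimal sketch of that conversion (a hypothetical helper, not part of the patch):

package main

import "fmt"

// usecFromTimespec converts a CLOCK_REALTIME timespec into the
// microseconds-since-the-epoch representation that the journal uses.
func usecFromTimespec(sec, nsec int64) uint64 {
	return uint64(sec)*1000000 + uint64(nsec)/1000
}

func main() {
	// 2017-01-01T00:00:00.5Z expressed as a timespec.
	fmt.Println(usecFromTimespec(1483228800, 500000000)) // 1483228800500000
}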