Skip to content

Commit

Permalink
journald logger: fix race condition
Browse files Browse the repository at this point in the history
Fix a race in journald driver.  Following the logs implies streaming
until the container is dead.  Streaming happened in one goroutine,
waiting for the container to exit/die and signaling that event happened
in another goroutine.

The nature of having two goroutines running simultaneously is pretty
much the core of the race condition.  When the streaming goroutines
received the signal that the container has exitted, the routine may not
have read and written all of the container's logs.

Fix this race by reading both, the logs and the events, of the container
and stop streaming when the died/exited event has been read.  The died
event is guaranteed to be after all logs in the journal which guarantees
not only consistencty but also a deterministic behavior.

Note that the journald log driver now requires the journald event
backend to be set.

Fixes: containers#10323
Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
  • Loading branch information
vrothberg committed May 26, 2021
1 parent e81457d commit 10569c9
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 117 deletions.
276 changes: 162 additions & 114 deletions libpod/container_log_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ package libpod
import (
"context"
"fmt"
"io"
"math"
"strings"
"time"

"github.com/containers/podman/v3/libpod/define"
"github.com/containers/podman/v3/libpod/events"
"github.com/containers/podman/v3/libpod/logs"
journal "github.com/coreos/go-systemd/v22/sdjournal"
"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand All @@ -24,122 +22,187 @@ const (

// journaldLogErr is the journald priority signifying stderr
journaldLogErr = "3"

// bufLen is the length of the buffer to read from a k8s-file
// formatted log line
// let's set it as 2k just to be safe if k8s-file format ever changes
bufLen = 16384
)

func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
var config journal.JournalReaderConfig
if options.Tail < 0 {
config.NumFromTail = 0
} else if options.Tail == 0 {
config.NumFromTail = math.MaxUint64
} else {
config.NumFromTail = uint64(options.Tail)
journal, err := sdjournal.NewJournal()
if err != nil {
return err
}
if options.Multi {
config.Formatter = journalFormatterWithID
} else {
config.Formatter = journalFormatter
}
defaultTime := time.Time{}
if options.Since != defaultTime {
// coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future
// return nothing instead of falsely printing
if time.Now().Before(options.Since) {
return nil
}
// coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past
config.Since = -time.Since(options.Since)
// While logs are written to the `logChannel`, we inspect each event
// and stop once the container has died. Having logs and events in one
// stream prevents a race condition that we faced in #10323.

// Add the filters for events.
match := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
match = sdjournal.Match{Field: "PODMAN_ID", Value: c.ID()}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
config.Matches = append(config.Matches, journal.Match{
Field: "CONTAINER_ID_FULL",
Value: c.ID(),
})
options.WaitGroup.Add(1)

r, err := journal.NewJournalReader(config)
if err != nil {
// Add the filter for logs. Note the disjunction so that we match
// either the events or the logs.
if err := journal.AddDisjunction(); err != nil {
return errors.Wrap(err, "adding filter disjunction to journald logger")
}
match = sdjournal.Match{Field: "CONTAINER_ID_FULL", Value: c.ID()}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}

if err := journal.SeekHead(); err != nil {
return err
}
if r == nil {
return errors.Errorf("journal reader creation failed")
// API requires Next() immediately after SeekHead().
if _, err := journal.Next(); err != nil {
return errors.Wrap(err, "initial journal cursor")
}
if options.Tail == math.MaxInt64 {
r.Rewind()

// API requires a next|prev before getting a cursor.
if _, err := journal.Previous(); err != nil {
return errors.Wrap(err, "initial journal cursor")
}
state, err := c.State()
if err != nil {
return err

// Note that the initial cursor may not yet be ready, so we'll do an
// exponential backoff.
var cursor string
var cursorError error
for i := 1; i <= 3; i++ {
cursor, cursorError = journal.GetCursor()
if err != nil {
continue
}
time.Sleep(time.Duration(i*100) * time.Millisecond)
break
}
if cursorError != nil {
return errors.Wrap(cursorError, "inital journal cursor")
}

// We need the container's events in the same journal to guarantee
// consistency, see #10323.
if options.Follow && c.runtime.config.Engine.EventsLogger != "journald" {
return errors.Errorf("using --follow with the journald --log-driver but without the journald --events-backend (%s) is not supported", c.runtime.config.Engine.EventsLogger)
}

if options.Follow && state == define.ContainerStateRunning {
go func() {
done := make(chan bool)
until := make(chan time.Time)
go func() {
select {
case <-ctx.Done():
until <- time.Time{}
case <-done:
// nothing to do anymore
options.WaitGroup.Add(1)
go func() {
defer func() {
options.WaitGroup.Done()
if err := journal.Close(); err != nil {
logrus.Errorf("Unable to close journal: %v", err)
}
}()

afterTimeStamp := false // needed for options.Since
tailQueue := []*logs.LogLine{} // needed for options.Tail
doTail := options.Tail > 0
for {
select {
case <-ctx.Done():
// Remote client may have closed/lost the connection.
return
default:
// Fallthrough
}

if _, err := journal.Next(); err != nil {
logrus.Errorf("Failed to move journal cursor to next entry: %v", err)
return
}
latestCursor, err := journal.GetCursor()
if err != nil {
logrus.Errorf("Failed to get journal cursor: %v", err)
return
}

// Hit the end of the journal.
if cursor == latestCursor {
if doTail {
// Flush *once* we hit the end of the journal.
startIndex := int64(len(tailQueue)-1) - options.Tail
if startIndex < 0 {
startIndex = 0
}
for i := startIndex; i < int64(len(tailQueue)); i++ {
logChannel <- tailQueue[i]
}
tailQueue = nil
doTail = false
}
// Unless we follow, quit.
if !options.Follow {
return
}
}()
go func() {
// FIXME (#10323): we are facing a terrible
// race condition here. At the time the
// container dies and `c.Wait()` has returned,
// we may not have received all journald logs.
// So far there is no other way than waiting
// for a second. Ultimately, `r.Follow` is
// racy and we may have to implement our custom
// logic here.
c.Wait(ctx)
time.Sleep(time.Second)
until <- time.Time{}
}()
follower := journaldFollowBuffer{logChannel, options.Multi}
err := r.Follow(until, follower)
// Sleep until something's happening on the journal.
journal.Wait(sdjournal.IndefiniteWait)
continue
}
cursor = latestCursor

entry, err := journal.GetEntry()
if err != nil {
logrus.Debugf(err.Error())
logrus.Errorf("Failed to get journal entry: %v", err)
return
}
r.Close()
options.WaitGroup.Done()
done <- true
return
}()
return nil
}

go func() {
bytes := make([]byte, bufLen)
// /me complains about no do-while in go
ec, err := r.Read(bytes)
for ec != 0 && err == nil {
// because we are reusing bytes, we need to make
// sure the old data doesn't get into the new line
bytestr := string(bytes[:ec])
logLine, err2 := logs.NewJournaldLogLine(bytestr, options.Multi)
if err2 != nil {
logrus.Error(err2)
if !afterTimeStamp {
entryTime := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
if entryTime.Before(options.Since) {
continue
}
afterTimeStamp = true
}

// If we're reading an event and the container exited/died,
// then we're done and can return.
event, ok := entry.Fields["PODMAN_EVENT"]
if ok {
status, err := events.StringToStatus(event)
if err != nil {
logrus.Errorf("Failed to translate event: %v", err)
return
}
if status == events.Exited {
return
}
continue
}

var message string
var formatError error

if options.Multi {
message, formatError = journalFormatterWithID(entry)
} else {
message, formatError = journalFormatter(entry)
}

if formatError != nil {
logrus.Errorf("Failed to parse journald log entry: %v", err)
return
}

logLine, err := logs.NewJournaldLogLine(message, options.Multi)
if err != nil {
logrus.Errorf("Failed parse log line: %v", err)
return
}
if doTail {
tailQueue = append(tailQueue, logLine)
continue
}
logChannel <- logLine
ec, err = r.Read(bytes)
}
if err != nil && err != io.EOF {
logrus.Error(err)
}
r.Close()
options.WaitGroup.Done()
}()

return nil
}

func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
func journalFormatterWithID(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
Expand All @@ -162,7 +225,7 @@ func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
return output, nil
}

func journalFormatter(entry *journal.JournalEntry) (string, error) {
func journalFormatter(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
Expand All @@ -176,7 +239,7 @@ func journalFormatter(entry *journal.JournalEntry) (string, error) {
return output, nil
}

func formatterPrefix(entry *journal.JournalEntry) (string, error) {
func formatterPrefix(entry *sdjournal.JournalEntry) (string, error) {
usec := entry.RealtimeTimestamp
tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat)
output := fmt.Sprintf("%s ", tsString)
Expand All @@ -202,7 +265,7 @@ func formatterPrefix(entry *journal.JournalEntry) (string, error) {
return output, nil
}

func formatterMessage(entry *journal.JournalEntry) (string, error) {
func formatterMessage(entry *sdjournal.JournalEntry) (string, error) {
// Finally, append the message
msg, ok := entry.Fields["MESSAGE"]
if !ok {
Expand All @@ -211,18 +274,3 @@ func formatterMessage(entry *journal.JournalEntry) (string, error) {
msg = strings.TrimSuffix(msg, "\n")
return msg, nil
}

type journaldFollowBuffer struct {
logChannel chan *logs.LogLine
withID bool
}

func (f journaldFollowBuffer) Write(p []byte) (int, error) {
bytestr := string(p)
logLine, err := logs.NewJournaldLogLine(bytestr, f.withID)
if err != nil {
return -1, err
}
f.logChannel <- logLine
return len(p), nil
}
2 changes: 1 addition & 1 deletion test/e2e/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var _ = Describe("Podman logs", func() {
})

It("podman logs on a created container should result in 0 exit code: "+log, func() {
session := podmanTest.Podman([]string{"create", "-t", "--name", "log", ALPINE})
session := podmanTest.Podman([]string{"create", "--log-driver", log, "-t", "--name", "log", ALPINE})
session.WaitWithDefaultTimeout()
Expect(session).To(Exit(0))

Expand Down
Loading

0 comments on commit 10569c9

Please sign in to comment.