Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CRI: Stop following container log when container exited. #44406

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,10 @@ func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessag

// readLastStringFromContainerLogs attempts to read up to the max log length from the end of the CRI log represented
// by path. It reads up to max log lines.
func readLastStringFromContainerLogs(path string) string {
func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string) string {
value := int64(kubecontainer.MaxContainerTerminationMessageLogLines)
buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength)
if err := ReadLogs(path, &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
if err := m.ReadLogs(path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
return fmt.Sprintf("Error on reading termination message from logs: %v", err)
}
return buf.String()
Expand Down Expand Up @@ -414,7 +414,7 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, n
tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
if checkLogs {
path := buildFullContainerLogsPath(uid, labeledInfo.ContainerName, annotatedInfo.RestartCount)
tMessage = readLastStringFromContainerLogs(path)
tMessage = m.readLastStringFromContainerLogs(path)
}
// Use the termination message written by the application is not empty
if len(tMessage) != 0 {
Expand Down Expand Up @@ -688,7 +688,7 @@ func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *v1.Pod, containerID ku
labeledInfo := getContainerInfoFromLabels(status.Labels)
annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
path := buildFullContainerLogsPath(pod.UID, labeledInfo.ContainerName, annotatedInfo.RestartCount)
return ReadLogs(path, logOptions, stdout, stderr)
return m.ReadLogs(path, containerID.ID, logOptions, stdout, stderr)
}

// GetExec gets the endpoint the runtime will serve the exec request from.
Expand Down
71 changes: 46 additions & 25 deletions pkg/kubelet/kuberuntime/kuberuntime_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/golang/glog"

"k8s.io/kubernetes/pkg/api/v1"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/util/tail"
)

Expand All @@ -54,6 +55,11 @@ const (
timeFormat = time.RFC3339Nano
// blockSize is the block size used in tail.
blockSize = 1024

// stateCheckPeriod is the period to check container state while following
// the container log. Kubelet should not keep following the log when the
// container is not running.
stateCheckPeriod = 5 * time.Second
)

var (
Expand Down Expand Up @@ -110,7 +116,9 @@ func newLogOptions(apiOpts *v1.PodLogOptions, now time.Time) *logOptions {
}

// ReadLogs read the container log and redirect into stdout and stderr.
func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
// Note that containerID is only needed when following the log, or else
// just pass in empty string "".
func (m *kubeGenericRuntimeManager) ReadLogs(path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", path, err)
Expand Down Expand Up @@ -166,8 +174,8 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer)
}
}
// Wait until the next log change.
if err := waitLogs(watcher); err != nil {
return fmt.Errorf("failed to wait logs for log file %q: %v", path, err)
if found, err := m.waitLogs(containerID, watcher); !found {
return err
}
continue
}
Expand Down Expand Up @@ -196,6 +204,41 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer)
}
}

// waitLogs wait for the next log write. It returns a boolean and an error. The boolean
// indicates whether a new log is found; the error is error happens during waiting new logs.
func (m *kubeGenericRuntimeManager) waitLogs(id string, w *fsnotify.Watcher) (bool, error) {
errRetry := 5
for {
select {
case e := <-w.Events:
switch e.Op {
case fsnotify.Write:
return true, nil
default:
glog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
}
case err := <-w.Errors:
glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
if errRetry == 0 {
return false, err
}
errRetry--
case <-time.After(stateCheckPeriod):
s, err := m.runtimeService.ContainerStatus(id)
if err != nil {
return false, err
}
// Only keep following container log when it is running.
if s.State != runtimeapi.ContainerState_CONTAINER_RUNNING {
glog.Errorf("Container %q is not running (state=%q)", id, s.State)
// Do not return error because it's normal that the container stops
// during waiting.
return false, nil
}
}
}
}

// parseFunc is a function parsing one log line to the internal log type.
// Notice that the caller must make sure logMessage is not nil.
type parseFunc func([]byte, *logMessage) error
Expand Down Expand Up @@ -267,28 +310,6 @@ func getParseFunc(log []byte) (parseFunc, error) {
return nil, fmt.Errorf("unsupported log format: %q", log)
}

// waitLogs wait for the next log write.
func waitLogs(w *fsnotify.Watcher) error {
errRetry := 5
for {
select {
case e := <-w.Events:
switch e.Op {
case fsnotify.Write:
return nil
default:
glog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
}
case err := <-w.Errors:
glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
if errRetry == 0 {
return err
}
errRetry--
}
}
}

// logWriter controls the writing into the stream based on the log options.
type logWriter struct {
stdout io.Writer
Expand Down