Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 76 additions & 30 deletions pkg/gather/clusterconfig/clusterconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ import (
imageregistryv1 "github.com/openshift/client-go/imageregistry/clientset/versioned/typed/imageregistry/v1"

"github.com/openshift/insights-operator/pkg/record"
"github.com/openshift/insights-operator/pkg/record/diskrecorder"
)

const (
// logCompressionRatio defines a multiplier applied to uncompressed logs.
// The disk recorder refuses to write files larger than MaxLogSize, so
// GatherClusterOperators has to limit the expected size of the log buffer.
logCompressionRatio = 2
)

var (
Expand All @@ -47,7 +55,7 @@ var (
registrySerializer serializer.CodecFactory
registryScheme = runtime.NewScheme()

// logTailLines defines a number lines to keep when fetching pod logs
// logTailLines sets maximum number of lines to fetch from pod logs
logTailLines = int64(100)
)

Expand Down Expand Up @@ -160,6 +168,7 @@ func GatherClusterOperators(i *Gatherer) func() ([]record.Record, []error) {
}
namespaceEventsCollected := sets.NewString()
now := time.Now()
unhealthyPods := []*corev1.Pod{}
for _, item := range config.Items {
if isHealthyOperator(&item) {
continue
Expand All @@ -176,34 +185,7 @@ func GatherClusterOperators(i *Gatherer) func() ([]record.Record, []error) {
continue
}
records = append(records, record.Record{Name: fmt.Sprintf("config/pod/%s/%s", pod.Namespace, pod.Name), Item: PodAnonymizer{pod}})

// Fetch previous and current pod logs
for _, previous := range []bool{true, false} {
for _, container := range pod.Spec.Containers {
currentOrPrevious := fmt.Sprintf("%s_current.log", container.Name)
if previous {
currentOrPrevious = fmt.Sprintf("%s_previous.log", container.Name)
}

req := i.coreClient.Pods(namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Previous: previous, TailLines: &logTailLines, Container: container.Name})
readCloser, err := req.Stream()
if err != nil {
klog.V(2).Infof("Failed to fetch %s for %s pod in namespace %s for failing operator %s: %q", currentOrPrevious, pod.Name, namespace, item.Name, err)
continue
}

defer readCloser.Close()

buf := bytes.NewBufferString("")
_, err = io.Copy(buf, readCloser)
if err != nil {
klog.V(2).Infof("Failed to write %s for %s pod in namespace %s for failing operator %s: %q", currentOrPrevious, pod.Name, namespace, item.Name, err)
continue
}

records = append(records, record.Record{Name: fmt.Sprintf("config/pod/%s/logs/%s/%s", pod.Namespace, pod.Name, currentOrPrevious), Item: Raw{buf.String()}})
}
}
unhealthyPods = append(unhealthyPods, pod)
}
if namespaceEventsCollected.Has(namespace) {
continue
Expand All @@ -217,11 +199,75 @@ func GatherClusterOperators(i *Gatherer) func() ([]record.Record, []error) {
namespaceEventsCollected.Insert(namespace)
}
}

// Exit early if no unhealthy pods found
if len(unhealthyPods) == 0 {
return records, nil
}

// Fetch a list of containers in unhealthy pods and calculate a log size quota
// Total log size must not exceed maxLogsSize multiplied by logCompressionRatio
klog.V(2).Infof("Found %d unhealthy pods", len(unhealthyPods))
totalUnhealthyContainers := 0
for _, pod := range unhealthyPods {
totalUnhealthyContainers += len(pod.Spec.InitContainers) + len(pod.Spec.Containers)
}
bufferSize := int64(diskrecorder.MaxLogSize * logCompressionRatio / totalUnhealthyContainers / 2)
klog.V(2).Infof("Maximum buffer size: %v bytes", bufferSize)
buf := bytes.NewBuffer(make([]byte, 0, bufferSize))

// Fetch previous and current container logs
for _, isPrevious := range []bool{true, false} {
for _, pod := range unhealthyPods {
allContainers := pod.Spec.InitContainers
allContainers = append(allContainers, pod.Spec.Containers...)
for _, c := range allContainers {
logName := fmt.Sprintf("%s_current.log", c.Name)
if isPrevious {
logName = fmt.Sprintf("%s_previous.log", c.Name)
}
buf.Reset()
klog.V(2).Infof("Fetching logs for %s container %s pod in namespace %s (previous: %v): %q", c.Name, pod.Name, pod.Namespace, isPrevious, err)
// Collect container logs and continue on error
err = collectContainerLogs(i, pod, buf, c.Name, isPrevious, &bufferSize)
if err != nil {
klog.V(2).Infof("Error: %q", err)
continue
}
records = append(records, record.Record{Name: fmt.Sprintf("config/pod/%s/logs/%s/%s", pod.Namespace, pod.Name, logName), Item: Raw{buf.String()}})
}
}
}

return records, nil
}
}

// GatherNodes collects all Nodes.
// collectContainerLogs fetches up to logTailLines lines (and at most
// *maxBytes bytes) of the current or previous logs of a single container
// in the given pod and appends them to buf.
//
// A short read caused by the server-side LimitBytes quota
// (io.ErrShortBuffer) is not treated as a failure: the truncated log
// collected so far is kept and nil is returned. Any other stream or copy
// error is logged at verbosity 2 and returned to the caller.
func collectContainerLogs(i *Gatherer, pod *corev1.Pod, buf *bytes.Buffer, containerName string, isPrevious bool, maxBytes *int64) error {
	req := i.coreClient.Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Previous: isPrevious, Container: containerName, LimitBytes: maxBytes, TailLines: &logTailLines})
	readCloser, err := req.Stream()
	if err != nil {
		// NOTE: the format previously printed the container name labeled as
		// "failing operator"; the verbs now match the arguments.
		klog.V(2).Infof("Failed to fetch log for container %s of pod %s in namespace %s (previous: %v): %q", containerName, pod.Name, pod.Namespace, isPrevious, err)
		return err
	}

	defer readCloser.Close()

	_, err = io.Copy(buf, readCloser)
	if err != nil && err != io.ErrShortBuffer {
		klog.V(2).Infof("Failed to write log for container %s of pod %s in namespace %s (previous: %v): %q", containerName, pod.Name, pod.Namespace, isPrevious, err)
		return err
	}
	return nil
}

// GatherNodes collects all unhealthy Nodes.
//
// A node is considered unhealthy when one of the
// operator.Status.Conditions entries has Type
// OperatorDegraded with Status True, or
// OperatorAvailable with Status False.
//
// The Kubernetes api https://github.com/kubernetes/client-go/blob/master/kubernetes/typed/core/v1/node.go#L78
// Response see https://docs.openshift.com/container-platform/4.3/rest_api/index.html#nodelist-v1core
Expand Down
5 changes: 4 additions & 1 deletion pkg/record/diskrecorder/diskrecorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ func (r memoryRecords) Less(i, j int) bool { return r[i].name < r[j].name }
func (r memoryRecords) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
func (r memoryRecords) Len() int { return len(r) }

// MaxLogSize defines the maximum allowed tarball size.
const MaxLogSize = 8 * 1024 * 1024

type Recorder struct {
basePath string
flushCh chan struct{}
Expand All @@ -53,7 +56,7 @@ func New(basePath string, interval time.Duration) *Recorder {
maxAge: interval * 6 * 24,
records: make(map[string]*memoryRecord),
flushCh: make(chan struct{}, 1),
flushSize: 8 * 1024 * 1024,
flushSize: MaxLogSize,
}
}

Expand Down