Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reading pod logs returns all container logs #3013

Merged
merged 3 commits into from Jan 9, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 23 additions & 10 deletions probe/kubernetes/client.go
Expand Up @@ -39,7 +39,7 @@ type Client interface {

WatchPods(f func(Event, Pod))

GetLogs(namespaceID, podID string) (io.ReadCloser, error)
GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error)
DeletePod(namespaceID, podID string) error
ScaleUp(resource, namespaceID, id string) error
ScaleDown(resource, namespaceID, id string) error
Expand Down Expand Up @@ -326,15 +326,28 @@ func (c *client) WalkNodes(f func(*apiv1.Node) error) error {
return nil
}

func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) {
req := c.client.CoreV1().Pods(namespaceID).GetLogs(
podID,
&apiv1.PodLogOptions{
Follow: true,
Timestamps: true,
},
)
return req.Stream()
func (c *client) GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) {
readClosersWithLabel := map[io.ReadCloser]string{}
for _, container := range containerNames {
req := c.client.CoreV1().Pods(namespaceID).GetLogs(
podID,
&apiv1.PodLogOptions{
Follow: true,
Timestamps: true,
Container: container,
},
)
readCloser, err := req.Stream()
if err != nil {
for rc := range readClosersWithLabel {
rc.Close()
}
return nil, err
}
readClosersWithLabel[readCloser] = container
}

return NewLogReadCloser(readClosersWithLabel), nil
}

func (c *client) DeletePod(namespaceID, podID string) error {
Expand Down
10 changes: 5 additions & 5 deletions probe/kubernetes/controls.go
Expand Up @@ -18,8 +18,8 @@ const (
)

// GetLogs is the control to get the logs for a kubernetes pod
func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Response {
readCloser, err := r.client.GetLogs(namespaceID, podID)
func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string, containerNames []string) xfer.Response {
readCloser, err := r.client.GetLogs(namespaceID, podID, containerNames)
if err != nil {
return xfer.ResponseError(err)
}
Expand All @@ -43,7 +43,7 @@ func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Res
}
}

func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.Response {
func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string, _ []string) xfer.Response {
if err := r.client.DeletePod(namespaceID, podID); err != nil {
return xfer.ResponseError(err)
}
Expand All @@ -53,7 +53,7 @@ func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.R
}

// CapturePod is exported for testing
func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response {
func (r *Reporter) CapturePod(f func(xfer.Request, string, string, []string) xfer.Response) func(xfer.Request) xfer.Response {
return func(req xfer.Request) xfer.Response {
uid, ok := report.ParsePodNodeID(req.NodeID)
if !ok {
Expand All @@ -70,7 +70,7 @@ func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response
if pod == nil {
return xfer.ResponseErrorf("Pod not found: %s", uid)
}
return f(req, pod.Namespace(), pod.Name())
return f(req, pod.Namespace(), pod.Name(), pod.ContainerNames())
}
}

Expand Down
166 changes: 166 additions & 0 deletions probe/kubernetes/logreadcloser.go
@@ -0,0 +1,166 @@
package kubernetes

import (
"bufio"
"bytes"
"fmt"
"io"
"math"
)

type logReadCloser struct {
labels []string
labelLength int
readClosers []io.ReadCloser
eof []bool
buffer bytes.Buffer
dataChannel chan []byte
stopChannels []chan struct{}
eofChannel chan int

This comment was marked as abuse.

}

// NewLogReadCloser reads from multiple io.ReadCloser, where data is available,
// and annotates each line with the reader's label
func NewLogReadCloser(readClosersWithLabel map[io.ReadCloser]string) io.ReadCloser {
stopChannels := make([]chan struct{}, len(readClosersWithLabel))

This comment was marked as abuse.

This comment was marked as abuse.

labels := make([]string, len(readClosersWithLabel))
readClosers := make([]io.ReadCloser, len(readClosersWithLabel))

i := 0
labelLength := 0
for readCloser, label := range readClosersWithLabel {
stopChannels[i] = make(chan struct{})
readClosers[i] = readCloser
labels[i] = label
labelLength = int(math.Max(float64(labelLength), float64(len(label))))
i++
}

l := logReadCloser{
readClosers: readClosers,
labels: labels,
labelLength: labelLength,
dataChannel: make(chan []byte),
stopChannels: stopChannels,
eofChannel: make(chan int),
eof: make([]bool, len(readClosers)),
}

for idx := range l.readClosers {
go l.readInput(idx)
}

return &l
}

func (l *logReadCloser) Read(p []byte) (int, error) {
if len(p) <= l.buffer.Len() {
return l.readInternalBuffer(p)
}

// if there's data available to read, read it,
// otherwise block
byteCount := 0
if l.buffer.Len() > 0 {
n, err := l.readInternalBuffer(p)
if err != nil {
return n, err
}
byteCount += n
} else {
// block on read or EOF
received := false
for !received && !l.isEOF() {
select {
case data := <-l.dataChannel:
l.buffer.Write(data)
received = true
case idx := <-l.eofChannel:
l.eof[idx] = true
}
}
}

// check if there's more data to read, without blocking
empty := false
for !empty && l.buffer.Len() < len(p) {
select {
case data := <-l.dataChannel:
l.buffer.Write(data)
case idx := <-l.eofChannel:
l.eof[idx] = true
default:
empty = true
}
}

return l.readInternalBuffer(p[byteCount:])
}

func (l *logReadCloser) Close() error {
for i, rc := range l.readClosers {
err := rc.Close()
if err != nil {
return err
}

// synchronous stop:
// the routines write to dataChannel which will be closed by this thread

This comment was marked as abuse.

This comment was marked as abuse.

select {
case <-l.stopChannels[i]:
break
}

This comment was marked as abuse.

This comment was marked as abuse.

close(l.stopChannels[i])
}

close(l.dataChannel)
close(l.eofChannel)
return nil
}

func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) {
n, err := l.buffer.Read(p)
if err == io.EOF && !l.isEOF() {
return n, nil
}
return n, err
}

func (l *logReadCloser) readInput(idx int) {
reader := bufio.NewReader(l.readClosers[idx])
for {
line, err := reader.ReadBytes('\n')
if err == io.EOF {
if len(line) > 0 {
l.dataChannel <- l.annotateLine(idx, line)
}
l.eofChannel <- idx
break
}
if err != nil {
// error, exit
break
}
l.dataChannel <- l.annotateLine(idx, line)
}

// signal the routine won't write to dataChannel
l.stopChannels[idx] <- struct{}{}
}

func (l *logReadCloser) annotateLine(idx int, line []byte) []byte {
// do not annotate if it's the only reader
if len(l.labels) == 1 {
return line
}
return []byte(fmt.Sprintf("[%-*s] %v", l.labelLength, l.labels[idx], string(line)))
}

func (l *logReadCloser) isEOF() bool {
for _, e := range l.eof {
if !e {
return false
}
}
return true
}
87 changes: 87 additions & 0 deletions probe/kubernetes/logreadcloser_test.go
@@ -0,0 +1,87 @@
package kubernetes_test

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"strings"
"testing"

"github.com/weaveworks/scope/probe/kubernetes"
)

func TestLogReadCloser(t *testing.T) {
data0 := []byte("abcdefghijklmno\npqrstuvwxyz\n")
data1 := []byte("ABCDEFGHI\nJKLMNOPQRSTUVWXYZ\n")
data2 := []byte("012345678901\n2345\n\n678\n")

label0 := "zero"
label1 := "one"
label2 := "two"
longestlabelLength := len(label0)

readClosersWithLabel := map[io.ReadCloser]string{}
r0 := ioutil.NopCloser(bytes.NewReader(data0))
readClosersWithLabel[r0] = label0
r1 := ioutil.NopCloser(bytes.NewReader(data1))
readClosersWithLabel[r1] = label1
r2 := ioutil.NopCloser(bytes.NewReader(data2))
readClosersWithLabel[r2] = label2

l := kubernetes.NewLogReadCloser(readClosersWithLabel)

buf := make([]byte, 3000)
count := 0
for {
n, err := l.Read(buf[count:])
if err == io.EOF {
break
}
if err != nil {
t.Error(err)
}
count += n
}

// convert to string for easier comparison
result := map[string]int{}
lineCounter(result, longestlabelLength, label0, data0)
lineCounter(result, longestlabelLength, label1, data1)
lineCounter(result, longestlabelLength, label2, data2)

str := string(buf[:count])
for _, line := range strings.SplitAfter(str, "\n") {
v, ok := result[line]
if ok {
result[line] = v - 1
}
}

for line, v := range result {
if v != 0 {
t.Errorf("Line %v has not be read from reader", line)
}
}

err := l.Close()
if err != nil {
t.Errorf("Close must not return an error: %v", err)
}
}

func lineCounter(counter map[string]int, pad int, label string, data []byte) {
for _, str := range strings.SplitAfter(string(data), "\n") {
if len(str) == 0 {
// SplitAfter ends with an empty string if the last character is '\n'
continue
}
line := fmt.Sprintf("[%-*s] %v", pad, label, str)
v, ok := counter[line]
if !ok {
v = 0
}
v++
counter[line] = v
}
}
9 changes: 9 additions & 0 deletions probe/kubernetes/pod.go
Expand Up @@ -24,6 +24,7 @@ type Pod interface {
NodeName() string
GetNode(probeID string) report.Node
RestartCount() uint
ContainerNames() []string
}

type pod struct {
Expand Down Expand Up @@ -86,3 +87,11 @@ func (p *pod) GetNode(probeID string) report.Node {
WithParents(p.parents).
WithLatestActiveControls(GetLogs, DeletePod)
}

func (p *pod) ContainerNames() []string {
containerNames := make([]string, 0, len(p.Pod.Spec.Containers))
for _, c := range p.Pod.Spec.Containers {
containerNames = append(containerNames, c.Name)
}
return containerNames
}
2 changes: 1 addition & 1 deletion probe/kubernetes/reporter_test.go
Expand Up @@ -155,7 +155,7 @@ func (*mockClient) WalkNodes(f func(*apiv1.Node) error) error {
return nil
}
func (*mockClient) WatchPods(func(kubernetes.Event, kubernetes.Pod)) {}
func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) {
func (c *mockClient) GetLogs(namespaceID, podName string, _ []string) (io.ReadCloser, error) {
r, ok := c.logs[namespaceID+";"+podName]
if !ok {
return nil, fmt.Errorf("Not found")
Expand Down