Skip to content

Commit

Permalink
Reading pod logs returns all container logs
Browse files Browse the repository at this point in the history
This is achieved by issuing an HTTP request to the Kubernetes API for each container, which yields one Reader for the corresponding container.
`logReadCloser` then reads from the above readers in parallel as data becomes available, buffering when necessary and forwarding it to clients by implementing the io.ReadCloser interface.
  • Loading branch information
Roberto Bruggemann committed Jan 3, 2018
1 parent 90cbd8d commit 899b15e
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 15 deletions.
32 changes: 22 additions & 10 deletions probe/kubernetes/client.go
Expand Up @@ -39,7 +39,7 @@ type Client interface {

WatchPods(f func(Event, Pod))

GetLogs(namespaceID, podID string) (io.ReadCloser, error)
GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error)
DeletePod(namespaceID, podID string) error
ScaleUp(resource, namespaceID, id string) error
ScaleDown(resource, namespaceID, id string) error
Expand Down Expand Up @@ -326,15 +326,27 @@ func (c *client) WalkNodes(f func(*apiv1.Node) error) error {
return nil
}

func (c *client) GetLogs(namespaceID, podID string) (io.ReadCloser, error) {
req := c.client.CoreV1().Pods(namespaceID).GetLogs(
podID,
&apiv1.PodLogOptions{
Follow: true,
Timestamps: true,
},
)
return req.Stream()
// GetLogs opens one log stream per container of the given pod and returns a
// single io.ReadCloser multiplexing all of them. The streams follow the logs
// (Follow: true) and include timestamps.
//
// BUG FIX: the previous version pre-sized readClosers with make(..., len(...))
// and, on a Stream() error, ranged over the whole slice calling Close() — the
// not-yet-filled entries were nil io.ReadCloser values, so Close() panicked.
// Accumulating with append ensures only successfully opened streams are closed.
func (c *client) GetLogs(namespaceID, podID string, containerNames []string) (io.ReadCloser, error) {
	readClosers := make([]io.ReadCloser, 0, len(containerNames))
	for _, container := range containerNames {
		req := c.client.CoreV1().Pods(namespaceID).GetLogs(
			podID,
			&apiv1.PodLogOptions{
				Follow:     true,
				Timestamps: true,
				Container:  container,
			},
		)
		readCloser, err := req.Stream()
		if err != nil {
			// Close the streams opened so far so we don't leak connections.
			for _, rc := range readClosers {
				rc.Close()
			}
			return nil, err
		}
		readClosers = append(readClosers, readCloser)
	}
	return NewLogReadCloser(readClosers...), nil
}

func (c *client) DeletePod(namespaceID, podID string) error {
Expand Down
10 changes: 5 additions & 5 deletions probe/kubernetes/controls.go
Expand Up @@ -18,8 +18,8 @@ const (
)

// GetLogs is the control to get the logs for a kubernetes pod
func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Response {
readCloser, err := r.client.GetLogs(namespaceID, podID)
func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string, containerNames []string) xfer.Response {
readCloser, err := r.client.GetLogs(namespaceID, podID, containerNames)
if err != nil {
return xfer.ResponseError(err)
}
Expand All @@ -43,7 +43,7 @@ func (r *Reporter) GetLogs(req xfer.Request, namespaceID, podID string) xfer.Res
}
}

func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.Response {
func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string, _ []string) xfer.Response {
if err := r.client.DeletePod(namespaceID, podID); err != nil {
return xfer.ResponseError(err)
}
Expand All @@ -53,7 +53,7 @@ func (r *Reporter) deletePod(req xfer.Request, namespaceID, podID string) xfer.R
}

// CapturePod is exported for testing
func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response) func(xfer.Request) xfer.Response {
func (r *Reporter) CapturePod(f func(xfer.Request, string, string, []string) xfer.Response) func(xfer.Request) xfer.Response {
return func(req xfer.Request) xfer.Response {
uid, ok := report.ParsePodNodeID(req.NodeID)
if !ok {
Expand All @@ -70,7 +70,7 @@ func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response
if pod == nil {
return xfer.ResponseErrorf("Pod not found: %s", uid)
}
return f(req, pod.Namespace(), pod.Name())
return f(req, pod.Namespace(), pod.Name(), pod.ContainerNames())
}
}

Expand Down
148 changes: 148 additions & 0 deletions probe/kubernetes/logreadcloser.go
@@ -0,0 +1,148 @@
package kubernetes

import (
"bytes"
"io"

log "github.com/Sirupsen/logrus"
)

const (
	// internalBufferSize is the chunk size used when reading from each
	// underlying container log stream.
	internalBufferSize = 1024
)

// logReadCloser multiplexes several io.ReadClosers into a single one.
// One reader goroutine per source pushes data chunks onto dataChannel;
// Read drains them through an internal buffer.
type logReadCloser struct {
	readClosers []io.ReadCloser // one stream per container
	eof         []bool          // eof[i] set once readClosers[i] reported io.EOF
	buffer      bytes.Buffer    // data received but not yet returned by Read
	dataChannel chan []byte     // chunks sent by the reader goroutines
	stopChannels []chan struct{} // per-goroutine "will not send again" handshake, consumed by Close
	eofChannel  chan int        // carries the index of a stream that reached EOF
}

// NewLogReadCloser combines multiple io.ReadClosers into a single one that
// yields data from whichever source has it available. One goroutine is
// spawned per source; Close tears them down.
func NewLogReadCloser(readClosers ...io.ReadCloser) io.ReadCloser {
	n := len(readClosers)

	l := logReadCloser{
		readClosers:  readClosers,
		eof:          make([]bool, n),
		dataChannel:  make(chan []byte),
		eofChannel:   make(chan int),
		stopChannels: make([]chan struct{}, n),
	}
	for i := range l.stopChannels {
		l.stopChannels[i] = make(chan struct{})
	}

	// One reader goroutine per source stream.
	for i := range readClosers {
		go l.readInput(i)
	}

	return &l
}

// Read implements io.Reader. It first serves bytes already buffered; if the
// buffer is empty it blocks until some reader goroutine delivers data or all
// sources reach EOF, then opportunistically drains any further ready chunks
// before copying into p.
//
// BUG FIX: when the internal buffer was non-empty, the previous version wrote
// byteCount bytes into p but then returned only the count of the *second*
// drain (`return l.readInternalBuffer(p[byteCount:])`), silently dropping the
// first byteCount bytes from the caller's perspective. The total is now
// returned.
func (l *logReadCloser) Read(p []byte) (int, error) {
	if len(p) <= l.buffer.Len() {
		return l.readInternalBuffer(p)
	}

	// if there's data available to read, read it,
	// otherwise block
	byteCount := 0
	if l.buffer.Len() > 0 {
		n, err := l.readInternalBuffer(p)
		if err != nil {
			return n, err
		}
		byteCount += n
	} else {
		// block on read or EOF
		received := false
		for !received && !l.isEOF() {
			select {
			case data := <-l.dataChannel:
				l.buffer.Write(data)
				received = true
			case idx := <-l.eofChannel:
				l.eof[idx] = true
			}
		}
	}

	// check if there's more data to read, without blocking
	empty := false
	for !empty && l.buffer.Len() < len(p) {
		select {
		case data := <-l.dataChannel:
			l.buffer.Write(data)
		case idx := <-l.eofChannel:
			l.eof[idx] = true
		default:
			empty = true
		}
	}

	n, err := l.readInternalBuffer(p[byteCount:])
	return byteCount + n, err
}

// Close closes every underlying stream, waits for each reader goroutine to
// signal that it will no longer send, and finally closes the shared channels.
func (l *logReadCloser) Close() error {
	for i, rc := range l.readClosers {
		err := rc.Close()
		if err != nil {
			// NOTE(review): returning here leaves the remaining streams open
			// and dataChannel/eofChannel unclosed — confirm callers treat a
			// Close error as fatal and never retry.
			return err
		}

		// synchronous stop:
		// the routines write to dataChannel which will be closed by this thread
		// NOTE(review): a one-case select is just a blocking receive; it waits
		// for readInput(i)'s stop signal. If that goroutine is itself blocked
		// sending on dataChannel (no Read in progress), this receive could
		// block indefinitely — verify Close is not called while readers are
		// still producing and nobody is consuming.
		select {
		case <-l.stopChannels[i]:
			break
		}
		close(l.stopChannels[i])
	}

	close(l.dataChannel)
	close(l.eofChannel)
	return nil
}

// readInternalBuffer copies buffered bytes into p. bytes.Buffer reports
// io.EOF whenever it is empty, so that EOF is suppressed while any
// underlying stream may still deliver data.
func (l *logReadCloser) readInternalBuffer(p []byte) (int, error) {
	n, err := l.buffer.Read(p)
	if err != io.EOF {
		return n, err
	}
	if l.isEOF() {
		return n, io.EOF
	}
	// Buffer is empty but some streams are still open: not a real EOF.
	return n, nil
}

// readInput continuously reads from readClosers[idx], forwarding each chunk
// on dataChannel and reporting EOF on eofChannel. Before exiting it performs
// the stop handshake so Close knows no further sends will happen.
//
// BUG FIX: the previous version sent slices of a single reused tmpBuffer on
// the channel and then read into that same buffer on the next iteration.
// With an unbuffered channel the sender resumes as soon as the receiver takes
// the value, so the next Read could overwrite bytes the consumer was still
// copying into its buffer — a data race. Each chunk is now copied before it
// is sent.
func (l *logReadCloser) readInput(idx int) {
	tmpBuffer := make([]byte, internalBufferSize)
	for {
		n, err := l.readClosers[idx].Read(tmpBuffer)
		if err == io.EOF {
			if n > 0 {
				chunk := make([]byte, n)
				copy(chunk, tmpBuffer)
				l.dataChannel <- chunk
			}
			l.eofChannel <- idx
			break
		}
		if err != nil {
			log.Errorf("Failed to read: %v", err)
			break
		}
		// Copy so the consumer never aliases our scratch buffer.
		chunk := make([]byte, n)
		copy(chunk, tmpBuffer)
		l.dataChannel <- chunk
	}

	// signal the routine won't write to dataChannel
	l.stopChannels[idx] <- struct{}{}
}

// isEOF reports whether every underlying stream has reached EOF.
func (l *logReadCloser) isEOF() bool {
	for i := range l.eof {
		if !l.eof[i] {
			return false
		}
	}
	return true
}
77 changes: 77 additions & 0 deletions probe/kubernetes/logreadcloser_test.go
@@ -0,0 +1,77 @@
package kubernetes_test

import (
"bytes"
"io"
"io/ioutil"
"testing"

"github.com/weaveworks/scope/probe/kubernetes"
)

// TestLogReadCloser reads three concatenated sources through the multiplexer
// and verifies that every byte from every source arrives exactly once
// (order across sources is unspecified, so we compare byte multisets).
func TestLogReadCloser(t *testing.T) {
	inputs := [][]byte{
		[]byte("abcdefghijklmnopqrstuvwxyz"),
		[]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ"),
		[]byte("0123456789012345"),
	}

	l := kubernetes.NewLogReadCloser(
		ioutil.NopCloser(bytes.NewReader(inputs[0])),
		ioutil.NopCloser(bytes.NewReader(inputs[1])),
		ioutil.NopCloser(bytes.NewReader(inputs[2])),
	)

	buf := make([]byte, 3000)
	count := 0
	for {
		n, err := l.Read(buf[count:])
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Error(err)
		}
		count += n
	}

	total := len(inputs[0]) + len(inputs[1]) + len(inputs[2])
	if count != total {
		t.Errorf("Must read %v characters, but got %v", total, count)
	}

	// Tally the expected occurrences of each byte, then subtract what we read;
	// every counter must come back to zero.
	byteCounter := map[byte]int{}
	for _, s := range inputs {
		byteCount(byteCounter, s)
	}
	for _, b := range buf[:count] {
		if _, ok := byteCounter[b]; ok {
			byteCounter[b]--
		}
	}
	for b, c := range byteCounter {
		if c != 0 {
			t.Errorf("%v should be 0 instead of %v", b, c)
		}
	}

	if err := l.Close(); err != nil {
		t.Errorf("Close must not return an error: %v", err)
	}
}

// byteCount adds the number of occurrences of each byte in s to accumulator.
func byteCount(accumulator map[byte]int, s []byte) {
	for _, b := range s {
		// Indexing a map yields the zero value for missing keys, so the
		// comma-ok lookup and explicit initialization were redundant.
		accumulator[b]++
	}
}
9 changes: 9 additions & 0 deletions probe/kubernetes/pod.go
Expand Up @@ -24,6 +24,7 @@ type Pod interface {
NodeName() string
GetNode(probeID string) report.Node
RestartCount() uint
ContainerNames() []string
}

type pod struct {
Expand Down Expand Up @@ -86,3 +87,11 @@ func (p *pod) GetNode(probeID string) report.Node {
WithParents(p.parents).
WithLatestActiveControls(GetLogs, DeletePod)
}

// ContainerNames returns the name of every container declared in the pod spec.
func (p *pod) ContainerNames() []string {
	names := make([]string, len(p.Pod.Spec.Containers))
	for i := range p.Pod.Spec.Containers {
		names[i] = p.Pod.Spec.Containers[i].Name
	}
	return names
}

0 comments on commit 899b15e

Please sign in to comment.