Skip to content

Commit

Permalink
engine: add status for PodLogStream (#4395)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicks committed Apr 5, 2021
1 parent e0eb3bd commit 8aeb180
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 8 deletions.
4 changes: 2 additions & 2 deletions internal/engine/runtimelog/podlogmanager.go
Expand Up @@ -116,7 +116,7 @@ func (m *PodLogManager) deletePls(ctx context.Context, st store.RStore, pls *Pod
err := m.client.Delete(ctx, pls)
if err != nil &&
!apierrors.IsNotFound(err) {
st.Dispatch(store.NewErrorAction(fmt.Errorf("syncing to apiserver: %v", err)))
st.Dispatch(store.NewErrorAction(fmt.Errorf("deleting PodLogStream from apiserver: %v", err)))
return
}
st.Dispatch(PodLogStreamDeleteAction{Name: pls.Name})
Expand All @@ -129,7 +129,7 @@ func (m *PodLogManager) createPls(ctx context.Context, st store.RStore, pls *Pod
!apierrors.IsNotFound(err) &&
!apierrors.IsConflict(err) &&
!apierrors.IsAlreadyExists(err) {
st.Dispatch(store.NewErrorAction(fmt.Errorf("syncing to apiserver: %v", err)))
st.Dispatch(store.NewErrorAction(fmt.Errorf("creating PodLogStream on apiserver: %v", err)))
return
}
st.Dispatch(PodLogStreamCreateAction{PodLogStream: pls})
Expand Down
21 changes: 20 additions & 1 deletion internal/engine/runtimelog/podlogmanager_test.go
Expand Up @@ -10,9 +10,11 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/tilt-dev/tilt/internal/container"
Expand Down Expand Up @@ -54,8 +56,9 @@ func TestLogs(t *testing.T) {

// Check to make sure that we're enqueuing pod changes as Reconcile() calls.
podNN := types.NamespacedName{Name: string(podID), Namespace: "default"}
streamNN := types.NamespacedName{Name: fmt.Sprintf("default-%s", podID)}
assert.Equal(t, []reconcile.Request{
reconcile.Request{NamespacedName: types.NamespacedName{Name: fmt.Sprintf("default-%s", podID)}},
reconcile.Request{NamespacedName: streamNN},
}, f.plsc.podSource.mapPodNameToEnqueue(podNN))
}

Expand Down Expand Up @@ -95,6 +98,20 @@ func TestLogsFailed(t *testing.T) {
f.onChange(podID)
f.AssertOutputContains("Error streaming pod-id logs")
assert.Contains(t, f.out.String(), "my-error")

// Check to make sure the status has an error.
stream := &PodLogStream{}
streamNN := types.NamespacedName{Name: fmt.Sprintf("default-%s", podID)}
err := f.client.Get(f.ctx, streamNN, stream)
require.NoError(t, err)
assert.Equal(t, stream.Status, PodLogStreamStatus{
ContainerStatuses: []ContainerLogStreamStatus{
ContainerLogStreamStatus{
Name: "cname",
Error: "my-error",
},
},
})
}

func TestLogsCanceledUnexpectedly(t *testing.T) {
Expand Down Expand Up @@ -422,6 +439,7 @@ func (s *plmStore) Dispatch(action store.Action) {
type plmFixture struct {
*tempdir.TempDirFixture
ctx context.Context
client ctrlclient.Client
kClient *k8s.FakeK8sClient
plm *PodLogManager
plsc *PodLogStreamController
Expand All @@ -447,6 +465,7 @@ func newPLMFixture(t *testing.T) *plmFixture {
return &plmFixture{
TempDirFixture: f,
kClient: kClient,
client: fc,
plm: plm,
plsc: plsc,
ctx: ctx,
Expand Down
120 changes: 117 additions & 3 deletions internal/engine/runtimelog/podlogstreamcontroller.go
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/google/go-cmp/cmp"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -27,13 +29,16 @@ var podLogReconnectGap = 2 * time.Second
//
// Collects logs from deployed containers.
type PodLogStreamController struct {
ctx context.Context
client ctrlclient.Client
st store.RStore
kClient k8s.Client
podSource *PodSource
mu sync.Mutex

watches map[podLogKey]PodLogWatch
hasClosedStream map[podLogKey]bool
statuses map[types.NamespacedName]*PodLogStreamStatus

newTicker func(d time.Duration) *time.Ticker
since func(t time.Time) time.Duration
Expand All @@ -45,12 +50,14 @@ var _ store.TearDowner = &PodLogStreamController{}

func NewPodLogStreamController(ctx context.Context, client ctrlclient.Client, st store.RStore, kClient k8s.Client) *PodLogStreamController {
return &PodLogStreamController{
ctx: ctx,
client: client,
st: st,
kClient: kClient,
podSource: NewPodSource(ctx, kClient),
watches: make(map[podLogKey]PodLogWatch),
hasClosedStream: make(map[podLogKey]bool),
statuses: make(map[types.NamespacedName]*PodLogStreamStatus),
newTicker: time.NewTicker,
since: time.Since,
now: time.Now,
Expand Down Expand Up @@ -119,7 +126,7 @@ func (m *PodLogStreamController) shouldStreamContainerLogs(c store.Container, ke
// Reconcile the given stream against what we're currently tracking.
func (r *PodLogStreamController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
stream := &PodLogStream{}
streamName := req.Name
streamName := req.NamespacedName
err := r.client.Get(ctx, req.NamespacedName, stream)
if apierrors.IsNotFound(err) {
r.podSource.handleReconcileRequest(ctx, req.NamespacedName, stream)
Expand Down Expand Up @@ -151,6 +158,7 @@ func (r *PodLogStreamController) Reconcile(ctx context.Context, req reconcile.Re
containers := []store.Container{}
containers = append(containers, initContainers...)
containers = append(containers, runContainers...)
r.ensureStatus(streamName, containers)

containerWatches := make(map[podLogKey]bool)
for i, c := range containers {
Expand Down Expand Up @@ -200,12 +208,18 @@ func (r *PodLogStreamController) Reconcile(ctx context.Context, req reconcile.Re
startWatchTime = <-existing.terminationTime
r.hasClosedStream[key] = true
if c.Terminated {
r.mutateStatus(streamName, c.Name, func(cs *ContainerLogStreamStatus) {
cs.Terminated = true
cs.Active = false
cs.Error = ""
})
continue
}
}

ctx, cancel := context.WithCancel(ctx)
w := PodLogWatch{
streamName: streamName,
ctx: ctx,
cancel: cancel,
podID: k8s.PodID(podNN.Name),
Expand All @@ -228,18 +242,24 @@ func (r *PodLogStreamController) Reconcile(ctx context.Context, req reconcile.Re
}
}

r.updateStatus(streamName)

return reconcile.Result{}, nil
}

// Delete all the streams generated by the named API object
func (c *PodLogStreamController) deleteStreams(streamName string) {
func (c *PodLogStreamController) deleteStreams(streamName types.NamespacedName) {
for k, watch := range c.watches {
if k.streamName != streamName {
continue
}
watch.cancel()
delete(c.watches, k)
}

c.mu.Lock()
delete(c.statuses, streamName)
c.mu.Unlock()
}

func (m *PodLogStreamController) consumeLogs(watch PodLogWatch, st store.RStore) {
Expand All @@ -266,6 +286,12 @@ func (m *PodLogStreamController) consumeLogs(watch PodLogWatch, st store.RStore)
if err != nil {
cancel()

m.mutateStatus(watch.streamName, containerName, func(cs *ContainerLogStreamStatus) {
cs.Active = false
cs.Error = err.Error()
})
m.updateStatus(watch.streamName)

// TODO(nick): Should this be Warnf/Errorf?
logger.Get(ctx).Infof("Error streaming %s logs: %v", pID, err)
return
Expand Down Expand Up @@ -309,19 +335,106 @@ func (m *PodLogStreamController) consumeLogs(watch PodLogWatch, st store.RStore)
}
}()

m.mutateStatus(watch.streamName, containerName, func(cs *ContainerLogStreamStatus) {
cs.Active = true
cs.Error = ""
})
m.updateStatus(watch.streamName)

_, err = io.Copy(logger.Get(ctx).Writer(logger.InfoLvl), reader)
_ = readCloser.Close()
close(done)
cancel()

if !retry && err != nil && ctx.Err() == nil {
m.mutateStatus(watch.streamName, containerName, func(cs *ContainerLogStreamStatus) {
cs.Active = false
cs.Error = err.Error()
})
m.updateStatus(watch.streamName)

// TODO(nick): Should this be Warnf/Errorf?
logger.Get(ctx).Infof("Error streaming %s logs: %v", pID, err)
return
}
}
}

// Set up the status object for a particular stream, tracking each container individually.
func (r *PodLogStreamController) ensureStatus(streamName types.NamespacedName, containers []store.Container) {
r.mu.Lock()
defer r.mu.Unlock()

status, ok := r.statuses[streamName]
if !ok {
status = &PodLogStreamStatus{}
r.statuses[streamName] = status
}

// Make sure the container names are right. If they're not, delete everything and recreate.
isMatching := len(containers) == len(status.ContainerStatuses)
if isMatching {
for i, cs := range status.ContainerStatuses {
if string(containers[i].Name) != cs.Name {
isMatching = false
break
}
}
}

if isMatching {
return
}

statuses := make([]ContainerLogStreamStatus, 0, len(containers))
for _, c := range containers {
statuses = append(statuses, ContainerLogStreamStatus{
Name: string(c.Name),
})
}
status.ContainerStatuses = statuses
}

// Modify the status of a container log stream.
func (r *PodLogStreamController) mutateStatus(streamName types.NamespacedName, containerName container.Name, mutate func(*ContainerLogStreamStatus)) {
r.mu.Lock()
defer r.mu.Unlock()

status, ok := r.statuses[streamName]
if !ok {
return
}

for i, cs := range status.ContainerStatuses {
if cs.Name != string(containerName) {
continue
}

mutate(&cs)
status.ContainerStatuses[i] = cs
}
}

// Update the server with the current container status.
func (r *PodLogStreamController) updateStatus(streamName types.NamespacedName) {
r.mu.Lock()
defer r.mu.Unlock()

status, ok := r.statuses[streamName]
if !ok {
return
}

stream := &PodLogStream{}
err := r.client.Get(r.ctx, streamName, stream)
if err != nil || cmp.Equal(stream.Status, status) {
return
}

status.DeepCopyInto(&stream.Status)
_ = r.client.Status().Update(r.ctx, stream)
}

func (c *PodLogStreamController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&PodLogStream{}).
Expand All @@ -333,6 +446,7 @@ type PodLogWatch struct {
ctx context.Context
cancel func()

streamName types.NamespacedName
podID k8s.PodID
namespace k8s.Namespace
cName container.Name
Expand All @@ -343,7 +457,7 @@ type PodLogWatch struct {
}

type podLogKey struct {
streamName string
streamName types.NamespacedName
podID k8s.PodID
cID container.ID
}
2 changes: 2 additions & 0 deletions internal/engine/runtimelog/types.go
Expand Up @@ -8,4 +8,6 @@ import (

type PodLogStream = v1alpha1.PodLogStream
type PodLogStreamSpec = v1alpha1.PodLogStreamSpec
type PodLogStreamStatus = v1alpha1.PodLogStreamStatus
type ContainerLogStreamStatus = v1alpha1.ContainerLogStreamStatus
type ObjectMeta = metav1.ObjectMeta
23 changes: 22 additions & 1 deletion pkg/apis/core/v1alpha1/podlogstream_types.go
Expand Up @@ -142,12 +142,33 @@ func (in *PodLogStreamList) GetListMeta() *metav1.ListMeta {
//
// TODO(nick): rewrite this status field, i don't think this is quite right.
type PodLogStreamStatus struct {
// A list of containers being watched.
//
// +optional
ContainerStatuses []ContainerLogStreamStatus `json:"containerStatuses,omitempty"`
}

// ContainerLogStreamStatus defines the current status of each individual
// container log stream.
type ContainerLogStreamStatus struct {
// The name of the container.
Name string `json:"name,omitempty"`

// True when the stream is set up and streaming logs properly.
//
// +optional
Active bool `json:"active,omitempty"`

// True when the logs are done stream and the container is terminated.
//
// +optional
Terminated bool `json:"terminated,omitempty"`

// The last error message encountered while streaming.
//
// Empty when the stream is active and healthy.
// Empty when the stream is actively streaming or successfully terminated.
//
// +optional
Error string `json:"error,omitempty"`
}

Expand Down

0 comments on commit 8aeb180

Please sign in to comment.