Skip to content

Commit

Permalink
podlogstream: move the reconciler into its own package (#4410)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicks committed Apr 7, 2021
1 parent 3ecb71d commit 89be6cb
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 43 deletions.
3 changes: 2 additions & 1 deletion internal/cli/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/tilt-dev/tilt/internal/controllers/core/filewatch/fsevent"
"github.com/tilt-dev/tilt/internal/controllers/core/podlogstream"

"github.com/google/wire"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -87,7 +88,7 @@ var BaseWireSet = wire.NewSet(
clockwork.NewRealClock,
engine.DeployerWireSet,
runtimelog.NewPodLogManager,
runtimelog.NewPodLogStreamController,
podlogstream.NewController,
portforward.NewController,
engine.NewBuildController,
cmd.WireSet,
Expand Down
15 changes: 8 additions & 7 deletions internal/cli/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package runtimelog
package podlogstream

import (
"context"
Expand All @@ -16,6 +16,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/tilt-dev/tilt/internal/container"
"github.com/tilt-dev/tilt/internal/engine/runtimelog"
"github.com/tilt-dev/tilt/internal/k8s"
"github.com/tilt-dev/tilt/internal/store"
"github.com/tilt-dev/tilt/internal/store/k8sconv"
Expand All @@ -28,7 +29,7 @@ var podLogReconnectGap = 2 * time.Second
// Reconciles the PodLogStream API object.
//
// Collects logs from deployed containers.
type PodLogStreamController struct {
type Controller struct {
ctx context.Context
client ctrlclient.Client
st store.RStore
Expand All @@ -45,11 +46,11 @@ type PodLogStreamController struct {
now func() time.Time
}

var _ reconcile.Reconciler = &PodLogStreamController{}
var _ store.TearDowner = &PodLogStreamController{}
var _ reconcile.Reconciler = &Controller{}
var _ store.TearDowner = &Controller{}

func NewPodLogStreamController(ctx context.Context, client ctrlclient.Client, st store.RStore, kClient k8s.Client) *PodLogStreamController {
return &PodLogStreamController{
func NewController(ctx context.Context, client ctrlclient.Client, st store.RStore, kClient k8s.Client) *Controller {
return &Controller{
ctx: ctx,
client: client,
st: st,
Expand All @@ -65,7 +66,7 @@ func NewPodLogStreamController(ctx context.Context, client ctrlclient.Client, st
}

// Filter containers based on the inclusions/exclusions in the PodLogStream spec.
func (m *PodLogStreamController) filterContainers(stream *PodLogStream, containers []store.Container) []store.Container {
func (m *Controller) filterContainers(stream *PodLogStream, containers []store.Container) []store.Container {
if len(stream.Spec.OnlyContainers) > 0 {
only := make(map[container.Name]bool, len(stream.Spec.OnlyContainers))
for _, name := range stream.Spec.OnlyContainers {
Expand Down Expand Up @@ -98,15 +99,15 @@ func (m *PodLogStreamController) filterContainers(stream *PodLogStream, containe
return containers
}

func (c *PodLogStreamController) SetClient(client ctrlclient.Client) {
func (c *Controller) SetClient(client ctrlclient.Client) {
c.client = client
}

func (c *PodLogStreamController) TearDown(ctx context.Context) {
func (c *Controller) TearDown(ctx context.Context) {
c.podSource.TearDown()
}

func (m *PodLogStreamController) shouldStreamContainerLogs(c store.Container, key podLogKey) bool {
func (m *Controller) shouldStreamContainerLogs(c store.Container, key podLogKey) bool {
if c.ID == "" {
return false
}
Expand All @@ -124,7 +125,7 @@ func (m *PodLogStreamController) shouldStreamContainerLogs(c store.Container, ke
}

// Reconcile the given stream against what we're currently tracking.
func (r *PodLogStreamController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
func (r *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
stream := &PodLogStream{}
streamName := req.NamespacedName
err := r.client.Get(ctx, req.NamespacedName, stream)
Expand Down Expand Up @@ -248,7 +249,7 @@ func (r *PodLogStreamController) Reconcile(ctx context.Context, req reconcile.Re
}

// Delete all the streams generated by the named API object
func (c *PodLogStreamController) deleteStreams(streamName types.NamespacedName) {
func (c *Controller) deleteStreams(streamName types.NamespacedName) {
for k, watch := range c.watches {
if k.streamName != streamName {
continue
Expand All @@ -262,7 +263,7 @@ func (c *PodLogStreamController) deleteStreams(streamName types.NamespacedName)
c.mu.Unlock()
}

func (m *PodLogStreamController) consumeLogs(watch PodLogWatch, st store.RStore) {
func (m *Controller) consumeLogs(watch PodLogWatch, st store.RStore) {
pID := watch.podID
ctx := watch.ctx
containerName := watch.cName
Expand Down Expand Up @@ -308,8 +309,8 @@ func (m *PodLogStreamController) consumeLogs(watch PodLogWatch, st store.RStore)
return
}

reader := NewHardCancelReader(ctx, readCloser)
reader.now = m.now
reader := runtimelog.NewHardCancelReader(ctx, readCloser)
reader.Now = m.now

// A hacky workaround for
// https://github.com/tilt-dev/tilt/issues/3908
Expand Down Expand Up @@ -365,7 +366,7 @@ func (m *PodLogStreamController) consumeLogs(watch PodLogWatch, st store.RStore)
}

// Set up the status object for a particular stream, tracking each container individually.
func (r *PodLogStreamController) ensureStatus(streamName types.NamespacedName, containers []store.Container) {
func (r *Controller) ensureStatus(streamName types.NamespacedName, containers []store.Container) {
r.mu.Lock()
defer r.mu.Unlock()

Expand Down Expand Up @@ -400,7 +401,7 @@ func (r *PodLogStreamController) ensureStatus(streamName types.NamespacedName, c
}

// Modify the status of a container log stream.
func (r *PodLogStreamController) mutateStatus(streamName types.NamespacedName, containerName container.Name, mutate func(*ContainerLogStreamStatus)) {
func (r *Controller) mutateStatus(streamName types.NamespacedName, containerName container.Name, mutate func(*ContainerLogStreamStatus)) {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -420,7 +421,7 @@ func (r *PodLogStreamController) mutateStatus(streamName types.NamespacedName, c
}

// Update the server with the current container status.
func (r *PodLogStreamController) updateStatus(streamName types.NamespacedName) {
func (r *Controller) updateStatus(streamName types.NamespacedName) {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -439,7 +440,7 @@ func (r *PodLogStreamController) updateStatus(streamName types.NamespacedName) {
_ = r.client.Status().Update(r.ctx, stream)
}

func (c *PodLogStreamController) SetupWithManager(mgr ctrl.Manager) error {
func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&PodLogStream{}).
Watches(c.podSource, handler.Funcs{}).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package runtimelog
package podlogstream

import (
"context"
Expand All @@ -19,6 +19,7 @@ import (

"github.com/tilt-dev/tilt/internal/container"
"github.com/tilt-dev/tilt/internal/controllers/fake"
"github.com/tilt-dev/tilt/internal/engine/runtimelog"
"github.com/tilt-dev/tilt/internal/k8s"
"github.com/tilt-dev/tilt/internal/store"
"github.com/tilt-dev/tilt/internal/store/k8sconv"
Expand Down Expand Up @@ -357,8 +358,8 @@ func TestIstioContainerLogs(t *testing.T) {

state := f.store.LockMutableStateForTesting()

istioInit := IstioInitContainerName
istioSidecar := IstioSidecarContainerName
istioInit := runtimelog.IstioInitContainerName
istioSidecar := runtimelog.IstioSidecarContainerName
cNormal := container.Name("cNameNormal")
pb := newPodBuilder(podID).
addTerminatedInitContainer(istioInit, "cID-init").
Expand Down Expand Up @@ -419,11 +420,11 @@ func (s *plmStore) Dispatch(action store.Action) {
defer s.UnlockMutableState()

switch action := action.(type) {
case PodLogStreamCreateAction:
case runtimelog.PodLogStreamCreateAction:
state.PodLogStreams[action.PodLogStream.Name] = action.PodLogStream
action.Summarize(&s.summary)
return
case PodLogStreamDeleteAction:
case runtimelog.PodLogStreamDeleteAction:
delete(state.PodLogStreams, action.Name)
action.Summarize(&s.summary)
return
Expand All @@ -445,8 +446,8 @@ type plmFixture struct {
ctx context.Context
client ctrlclient.Client
kClient *k8s.FakeK8sClient
plm *PodLogManager
plsc *PodLogStreamController
plm *runtimelog.PodLogManager
plsc *Controller
cancel func()
out *bufsync.ThreadSafeBuffer
store *plmStore
Expand All @@ -463,8 +464,8 @@ func newPLMFixture(t *testing.T) *plmFixture {

st := newPLMStore(t, out)
fc := fake.NewTiltClient()
plm := NewPodLogManager(fc)
plsc := NewPodLogStreamController(ctx, fc, st, kClient)
plm := runtimelog.NewPodLogManager(fc)
plsc := NewController(ctx, fc, st, kClient)

return &plmFixture{
TempDirFixture: f,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package runtimelog
package podlogstream

import (
"context"
Expand Down
13 changes: 13 additions & 0 deletions internal/controllers/core/podlogstream/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package podlogstream

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"
)

type PodLogStream = v1alpha1.PodLogStream
type PodLogStreamSpec = v1alpha1.PodLogStreamSpec
type PodLogStreamStatus = v1alpha1.PodLogStreamStatus
type ContainerLogStreamStatus = v1alpha1.ContainerLogStreamStatus
type ObjectMeta = metav1.ObjectMeta
4 changes: 2 additions & 2 deletions internal/controllers/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/tilt-dev/tilt/internal/controllers/core/cmd"
"github.com/tilt-dev/tilt/internal/controllers/core/filewatch"
"github.com/tilt-dev/tilt/internal/engine/runtimelog"
"github.com/tilt-dev/tilt/internal/controllers/core/podlogstream"
"github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"
)

Expand All @@ -19,7 +19,7 @@ var controllerSet = wire.NewSet(
func ProvideControllers(
fileWatch *filewatch.Controller,
cmds *cmd.Controller,
podlogstreams *runtimelog.PodLogStreamController) []Controller {
podlogstreams *podlogstream.Controller) []Controller {
return []Controller{
fileWatch,
cmds,
Expand Down
6 changes: 3 additions & 3 deletions internal/engine/runtimelog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
type HardCancelReader struct {
ctx context.Context
reader io.Reader
now func() time.Time
Now func() time.Time
mu sync.Mutex
lastRead time.Time
}

func NewHardCancelReader(ctx context.Context, reader io.Reader) *HardCancelReader {
return &HardCancelReader{ctx: ctx, reader: reader, now: time.Now}
return &HardCancelReader{ctx: ctx, reader: reader, Now: time.Now}
}

func (r *HardCancelReader) LastReadTime() time.Time {
Expand All @@ -40,7 +40,7 @@ func (r *HardCancelReader) Read(b []byte) (int, error) {

r.mu.Lock()
defer r.mu.Unlock()
r.lastRead = r.now()
r.lastRead = r.Now()

return n, err
}
3 changes: 2 additions & 1 deletion internal/engine/upper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/tilt-dev/tilt/internal/controllers/core/filewatch/fsevent"
"github.com/tilt-dev/tilt/internal/controllers/core/podlogstream"

"github.com/docker/distribution/reference"
dockertypes "github.com/docker/docker/api/types"
Expand Down Expand Up @@ -3766,7 +3767,7 @@ func newTestFixture(t *testing.T) *testFixture {
env := k8s.EnvDockerDesktop
cdc := controllers.ProvideDeferredClient()
plm := runtimelog.NewPodLogManager(cdc)
plsc := runtimelog.NewPodLogStreamController(ctx, cdc, st, kCli)
plsc := podlogstream.NewController(ctx, cdc, st, kCli)
ccb := controllers.NewClientBuilder(cdc).WithUncached(&v1alpha1.FileWatch{})
fwms := fswatch.NewManifestSubscriber(cdc)
pfc := portforward.NewController(kCli)
Expand Down

0 comments on commit 89be6cb

Please sign in to comment.