Skip to content

Commit

Permalink
controllers/portforward: portforwards are run by the reconciler, read…
Browse files Browse the repository at this point in the history
…ing off the engine state [ch11908] (#4523)
  • Loading branch information
Maia McCormick committed May 7, 2021
1 parent bcd5127 commit cae72de
Show file tree
Hide file tree
Showing 8 changed files with 553 additions and 4 deletions.
2 changes: 2 additions & 0 deletions internal/cli/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tilt-dev/tilt/internal/container"
"github.com/tilt-dev/tilt/internal/controllers"
"github.com/tilt-dev/tilt/internal/controllers/core/cmd"
apiportforward "github.com/tilt-dev/tilt/internal/controllers/core/portforward"
"github.com/tilt-dev/tilt/internal/docker"
"github.com/tilt-dev/tilt/internal/dockercompose"
"github.com/tilt-dev/tilt/internal/engine"
Expand Down Expand Up @@ -114,6 +115,7 @@ var BaseWireSet = wire.NewSet(
k8srollout.NewPodMonitor,
telemetry.NewStartTracker,
session.NewController,
apiportforward.NewReconciler, // TODO(maia): remove as subscriber once this is a true reconciler

build.ProvideClock,
provideClock,
Expand Down
9 changes: 6 additions & 3 deletions internal/cli/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

184 changes: 184 additions & 0 deletions internal/controllers/core/portforward/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package portforward

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/equality"

"github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/tilt-dev/tilt/internal/k8s"
"github.com/tilt-dev/tilt/internal/store"
"github.com/tilt-dev/tilt/pkg/logger"
)

type Reconciler struct {
// until PortForwarding behavior is moved off the PortForwardSubscriber and onto
// the reconciler, we set the reconciler to active/not with a flag (there should
// be no PortForwards on the EngineState anyway, but this is a failsafe).
isActive bool

kClient k8s.Client

// map of PortForward object name --> running forward(s)
activeForwards map[string]portForwardEntry
}

func NewReconciler(kClient k8s.Client) *Reconciler {
return &Reconciler{
kClient: kClient,
activeForwards: make(map[string]portForwardEntry),
}
}

func (r *Reconciler) SetActive() {
// Should only be used for tests; should be deprecated when migration is complete
r.isActive = true
}

// Figure out the diff between what's in the data store and
// what port-forwarding is currently active.
func (r *Reconciler) diff(ctx context.Context, st store.RStore) (toStart []portForwardEntry, toShutdown []portForwardEntry) {
state := st.RLockState()
defer st.RUnlockState()

statePFs := state.PortForwards

for name, existing := range r.activeForwards {
if _, onState := statePFs[name]; !onState {
// This port forward is no longer on the state, shut it down.
toShutdown = r.addToShutdown(toShutdown, existing)
continue
}
}

for name, desired := range statePFs {
existing, isActive := r.activeForwards[name]
if isActive {
// We're already running this PortForward -- do we need to do anything further?
// NOTE(maia): we compare the ManifestName annotation so that if a user changes
// just the manifest name, the PF logs will go to the correct place.
if equality.Semantic.DeepEqual(existing.Spec, desired.Spec) &&
existing.ObjectMeta.Annotations[v1alpha1.AnnotationManifest] ==
desired.ObjectMeta.Annotations[v1alpha1.AnnotationManifest] {
// Nothing has changed, nothing to do
continue
}

// There's been a change to the spec for this PortForward, so tear down the old version
toShutdown = r.addToShutdown(toShutdown, existing)
}

// We're not running this PortForward(/the current version of this PortForward), so spin it up
entry := newEntry(ctx, desired)
toStart = r.addToStart(toStart, entry)
}
return toStart, toShutdown
}

func (r *Reconciler) addToStart(toStart []portForwardEntry, entry portForwardEntry) []portForwardEntry {
r.activeForwards[entry.Name] = entry
return append(toStart, entry)
}

func (r *Reconciler) addToShutdown(toShutdown []portForwardEntry, entry portForwardEntry) []portForwardEntry {
delete(r.activeForwards, entry.Name)
return append(toShutdown, entry)
}

func (r *Reconciler) OnChange(ctx context.Context, st store.RStore, summary store.ChangeSummary) {
if !r.isActive {
return
}

if summary.IsLogOnly() {
return
}

toStart, toShutdown := r.diff(ctx, st)
for _, entry := range toShutdown {
entry.cancel()
}

for _, entry := range toStart {
// Treat port-forwarding errors as part of the pod log
ctx := store.MustObjectLogHandler(entry.ctx, st, entry.PortForward)
for _, forward := range entry.Spec.Forwards {
entry := entry
forward := forward
go r.startPortForwardLoop(ctx, entry, forward)
}
}
}

func (r *Reconciler) startPortForwardLoop(ctx context.Context, entry portForwardEntry, forward Forward) {
originalBackoff := wait.Backoff{
Steps: 1000,
Duration: 50 * time.Millisecond,
Factor: 2.0,
Jitter: 0.1,
Cap: 15 * time.Second,
}
currentBackoff := originalBackoff

for {
start := time.Now()
err := r.onePortForward(ctx, entry, forward)
if ctx.Err() != nil {
// If the context was canceled, we're satisfied.
// Ignore any errors.
return
}

// Otherwise, repeat the loop, maybe logging the error
if err != nil {
logger.Get(ctx).Infof("Reconnecting... Error port-forwarding %s (%d -> %d): %v",
entry.ObjectMeta.Annotations[v1alpha1.AnnotationManifest],
forward.LocalPort, forward.ContainerPort, err)
}

// If this failed in less than a second, then we should advance the backoff.
// Otherwise, reset the backoff.
if time.Since(start) < time.Second {
time.Sleep(currentBackoff.Step())
} else {
currentBackoff = originalBackoff
}
}
}

func (r *Reconciler) onePortForward(ctx context.Context, entry portForwardEntry, forward Forward) error {
ns := k8s.Namespace(entry.Spec.Namespace)
podID := k8s.PodID(entry.Spec.PodName)

pf, err := r.kClient.CreatePortForwarder(ctx, ns, podID, int(forward.LocalPort), int(forward.ContainerPort), forward.Host)
if err != nil {
return err
}

err = pf.ForwardPorts()
if err != nil {
return err
}
return nil
}

var _ store.Subscriber = &Reconciler{}

type portForwardEntry struct {
*PortForward
ctx context.Context
cancel func()
}

func newEntry(ctx context.Context, pf *PortForward) portForwardEntry {
ctx, cancel := context.WithCancel(ctx)
return portForwardEntry{
PortForward: pf,
ctx: ctx,
cancel: cancel,
}
}

0 comments on commit cae72de

Please sign in to comment.