Skip to content

Commit

Permalink
portforward: reimplement current port forward behavior with api objec…
Browse files Browse the repository at this point in the history
…ts [ch11953] (#4499)
  • Loading branch information
Maia McCormick committed May 3, 2021
1 parent 94c4b07 commit 4079428
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 36 deletions.
4 changes: 4 additions & 0 deletions internal/engine/portforward/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type PortForwardDeleteAction struct {
Name string
}

func NewPortForwardDeleteAction(pfName string) PortForwardDeleteAction {
return PortForwardDeleteAction{Name: pfName}
}

func (PortForwardDeleteAction) Action() {}

func (a PortForwardDeleteAction) Summarize(s *store.ChangeSummary) {
Expand Down
107 changes: 73 additions & 34 deletions internal/engine/portforward/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package portforward

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"

"github.com/tilt-dev/tilt/internal/store/k8sconv"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/google/go-cmp/cmp"

"github.com/tilt-dev/tilt/internal/k8s"
"github.com/tilt-dev/tilt/internal/store"
"github.com/tilt-dev/tilt/pkg/logger"
Expand All @@ -22,6 +24,8 @@ import (
type Subscriber struct {
kClient k8s.Client

// We store active forwards here so that we have a way of canceling running
// forwards--soon this will be the responsibility of the PortForwardReconciler
activeForwards map[k8s.PodID]portForwardEntry
}

Expand Down Expand Up @@ -62,38 +66,54 @@ func (s *Subscriber) diff(ctx context.Context, st store.RStore) (toStart []portF

statePods[podID] = true

apiPf := &v1alpha1.PortForward{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("port-forward-%s", podID),
Annotations: map[string]string{
// Name of the manifest that this Port Forward corresponds to
// (we need this to route the logs correctly)
v1alpha1.AnnotationManifest: manifest.Name.String(),
v1alpha1.AnnotationSpanID: string(k8sconv.SpanIDForPod(podID)),
},
},
Spec: PortForwardSpec{
PodName: podID.String(),
Namespace: pod.Namespace,
Forwards: modelForwardsToApiForwards(forwards),
},
}

ctx, cancel := context.WithCancel(ctx)
entry := portForwardEntry{
PortForward: apiPf,
ctx: ctx,
cancel: cancel,
}

oldEntry, isActive := s.activeForwards[podID]
if isActive {
if cmp.Equal(oldEntry.forwards, forwards, cmp.AllowUnexported(model.PortForward{})) {
if equality.Semantic.DeepEqual(oldEntry.Spec, apiPf.Spec) {
// We're already running this port forward, nothing to do
continue
}
toShutdown = append(toShutdown, oldEntry)
}

ctx, cancel := context.WithCancel(ctx)
entry := portForwardEntry{
podID: podID,
name: ms.Name,
namespace: k8s.Namespace(pod.Namespace),
forwards: forwards,
ctx: ctx,
cancel: cancel,
// Tear down the old version so we can start the new one
toShutdown = append(toShutdown, oldEntry)
}

toStart = append(toStart, entry)
s.activeForwards[podID] = entry
}

// Find all the port-forwards that aren't in the manifest anymore
// and need to be shut down.
for key, value := range s.activeForwards {
_, inState := statePods[key]
// Find all the port-forwards that belong to old pods--these need to be shut down.
for podID, entry := range s.activeForwards {
_, inState := statePods[podID]
if inState {
continue
}

toShutdown = append(toShutdown, value)
delete(s.activeForwards, key)
toShutdown = append(toShutdown, entry)
delete(s.activeForwards, podID)
}

return toStart, toShutdown
Expand All @@ -103,25 +123,31 @@ func (s *Subscriber) OnChange(ctx context.Context, st store.RStore, _ store.Chan
toStart, toShutdown := s.diff(ctx, st)
for _, entry := range toShutdown {
entry.cancel()

// TODO(maia): st.Dispatch(NewPortForwardDeleteAction(entry.Name))
// TODO(maia): delete PF object in API
}

for _, entry := range toStart {
// Treat port-forwarding errors as part of the pod log
ctx := logger.CtxWithLogHandler(entry.ctx, PodLogActionWriter{
Store: st,
PodID: entry.podID,
ManifestName: entry.name,
PodID: k8s.PodID(entry.Spec.PodName),
ManifestName: model.ManifestName(entry.ObjectMeta.Annotations[v1alpha1.AnnotationManifest]),
})

for _, forward := range entry.forwards {
for _, forward := range entry.Spec.Forwards {
entry := entry
forward := forward
go s.startPortForwardLoop(ctx, entry, forward)
}

// TODO(maia): st.Dispatch(NewPortForwardCreateAction(entry.PortForward))
// TODO(maia): create PF object in API
}
}

func (s *Subscriber) startPortForwardLoop(ctx context.Context, entry portForwardEntry, forward model.PortForward) {
func (s *Subscriber) startPortForwardLoop(ctx context.Context, entry portForwardEntry, forward v1alpha1.Forward) {
originalBackoff := wait.Backoff{
Steps: 1000,
Duration: 50 * time.Millisecond,
Expand All @@ -142,7 +168,9 @@ func (s *Subscriber) startPortForwardLoop(ctx context.Context, entry portForward

// Otherwise, repeat the loop, maybe logging the error
if err != nil {
logger.Get(ctx).Infof("Reconnecting... Error port-forwarding %s: %v", entry.name, err)
logger.Get(ctx).Infof("Reconnecting... Error port-forwarding %s (%d -> %d): %v",
entry.ObjectMeta.Annotations[v1alpha1.AnnotationManifest],
forward.LocalPort, forward.ContainerPort, err)
}

// If this failed in less than a second, then we should advance the backoff.
Expand All @@ -155,11 +183,11 @@ func (s *Subscriber) startPortForwardLoop(ctx context.Context, entry portForward
}
}

func (s *Subscriber) onePortForward(ctx context.Context, entry portForwardEntry, forward model.PortForward) error {
ns := entry.namespace
podID := entry.podID
func (s *Subscriber) onePortForward(ctx context.Context, entry portForwardEntry, forward v1alpha1.Forward) error {
ns := k8s.Namespace(entry.Spec.Namespace)
podID := k8s.PodID(entry.Spec.PodName)

pf, err := s.kClient.CreatePortForwarder(ctx, ns, podID, forward.LocalPort, forward.ContainerPort, forward.Host)
pf, err := s.kClient.CreatePortForwarder(ctx, ns, podID, int(forward.LocalPort), int(forward.ContainerPort), forward.Host)
if err != nil {
return err
}
Expand All @@ -173,13 +201,12 @@ func (s *Subscriber) onePortForward(ctx context.Context, entry portForwardEntry,

var _ store.Subscriber = &Subscriber{}

// NOTE(maia): this struct is temporary, soon this subscriber won't maintain the state of PF's
// or be responsible for canceling them
type portForwardEntry struct {
name model.ManifestName
namespace k8s.Namespace
podID k8s.PodID
forwards []model.PortForward
ctx context.Context
cancel func()
*v1alpha1.PortForward
ctx context.Context
cancel func()
}

// Extract the port-forward specs from the manifest. If any of them
Expand Down Expand Up @@ -224,3 +251,15 @@ func (w PodLogActionWriter) Write(level logger.Level, fields logger.Fields, p []
w.Store.Dispatch(store.NewLogAction(w.ManifestName, k8sconv.SpanIDForPod(w.PodID), level, fields, p))
return nil
}

func modelForwardsToApiForwards(forwards []model.PortForward) []v1alpha1.Forward {
res := make([]v1alpha1.Forward, len(forwards))
for i, fwd := range forwards {
res[i] = v1alpha1.Forward{
LocalPort: int32(fwd.LocalPort),
ContainerPort: int32(fwd.ContainerPort),
Host: fwd.Host,
}
}
return res
}
2 changes: 1 addition & 1 deletion internal/k8s/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func newClientTestFixture(t *testing.T) *clientTestFixture {
ret.client = K8sClient{
env: EnvUnknown,
core: core,
portForwardClient: &FakePortForwardClient{},
portForwardClient: NewFakePortfowardClient(),
runtimeAsync: runtimeAsync,
registryAsync: registryAsync,
helmKubeClient: helmKube,
Expand Down
2 changes: 1 addition & 1 deletion internal/store/k8sconv/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func PodConditions(conditions []v1.PodCondition) []v1alpha1.PodCondition {
return result
}

// Convert a Kubernetes Pod into a list if simpler Container models to store in the engine state.
// Convert a Kubernetes Pod into a list of simpler Container models to store in the engine state.
func PodContainers(ctx context.Context, pod *v1.Pod, containerStatuses []v1.ContainerStatus) []v1alpha1.Container {
result := make([]v1alpha1.Container, 0, len(containerStatuses))
for _, cStatus := range containerStatuses {
Expand Down

0 comments on commit 4079428

Please sign in to comment.