diff --git a/internal/controllers/core/kubernetesdiscovery/portforwards.go b/internal/controllers/core/kubernetesdiscovery/portforwards.go
new file mode 100644
index 0000000000..0c0516044e
--- /dev/null
+++ b/internal/controllers/core/kubernetesdiscovery/portforwards.go
@@ -0,0 +1,176 @@
+package kubernetesdiscovery
+
+import (
+	"context"
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	errorutil "k8s.io/apimachinery/pkg/util/errors"
+	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+	"github.com/tilt-dev/tilt/internal/controllers/apicmp"
+	"github.com/tilt-dev/tilt/internal/store"
+	"github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"
+)
+
+// manageOwnedPortForwards reconciles the set of PortForward objects owned by
+// this KubernetesDiscovery: it creates the desired one, updates it if stale,
+// and garbage-collects any others.
+func (r *Reconciler) manageOwnedPortForwards(ctx context.Context, kd *v1alpha1.KubernetesDiscovery) error {
+	var pfList v1alpha1.PortForwardList
+	err := r.ctrlClient.List(ctx, &pfList, ctrlclient.InNamespace(kd.Namespace),
+		ctrlclient.MatchingFields{ownerKey: kd.Name})
+	if err != nil {
+		return fmt.Errorf("failed to fetch managed PortForward objects for KubernetesDiscovery %s: %v",
+			kd.Name, err)
+	}
+
+	pf, err := r.toDesiredPortForward(kd)
+	if err != nil {
+		return fmt.Errorf("creating portforward: %v", err)
+	}
+
+	// Delete all the port-forwards that don't match this one.
+	errs := []error{}
+	foundDesired := false
+	for _, existingPF := range pfList.Items {
+		matchesPF := pf != nil && existingPF.Name == pf.Name
+		if matchesPF {
+			foundDesired = true
+
+			// If this PortForward is already in the APIServer, make sure it's up-to-date.
+			// Compare spec-to-spec (not spec-to-object): comparing pf.Spec against
+			// the whole PortForward object can never be equal, which would force a
+			// no-op Update call on every single reconcile pass.
+			if apicmp.DeepEqual(pf.Spec, existingPF.Spec) {
+				continue
+			}
+
+			updatedPF := existingPF.DeepCopy()
+			updatedPF.Spec = pf.Spec
+			err := r.ctrlClient.Update(ctx, updatedPF)
+			if err != nil && !apierrors.IsNotFound(err) {
+				errs = append(errs, fmt.Errorf("updating portforward %s: %v", existingPF.Name, err))
+			}
+			continue
+		}
+
+		// If this does not match the desired PF, this PF needs to be garbage collected.
+		deletedPF := existingPF.DeepCopy()
+		err := r.ctrlClient.Delete(ctx, deletedPF)
+		if err != nil && !apierrors.IsNotFound(err) {
+			errs = append(errs, fmt.Errorf("deleting portforward %s: %v", existingPF.Name, err))
+		}
+	}
+
+	if !foundDesired && pf != nil {
+		err := r.ctrlClient.Create(ctx, pf)
+		if err != nil && !apierrors.IsAlreadyExists(err) {
+			errs = append(errs, fmt.Errorf("creating portforward %s: %v", pf.Name, err))
+		}
+	}
+
+	return errorutil.NewAggregate(errs)
+}
+
+// Construct the desired port-forward. May be nil.
+func (r *Reconciler) toDesiredPortForward(kd *v1alpha1.KubernetesDiscovery) (*v1alpha1.PortForward, error) {
+	pfTemplate := kd.Spec.PortForwardTemplateSpec
+	if pfTemplate == nil {
+		return nil, nil
+	}
+
+	pod := pickBestPortForwardPod(kd)
+	if pod == nil {
+		return nil, nil
+	}
+
+	pf := &v1alpha1.PortForward{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: fmt.Sprintf("%s-%s", kd.Name, pod.Name),
+			Annotations: map[string]string{
+				v1alpha1.AnnotationManifest: kd.Annotations[v1alpha1.AnnotationManifest],
+				v1alpha1.AnnotationSpanID:   kd.Annotations[v1alpha1.AnnotationSpanID],
+			},
+		},
+		Spec: v1alpha1.PortForwardSpec{
+			PodName:   pod.Name,
+			Namespace: pod.Namespace,
+			Forwards:  pfTemplate.Forwards,
+		},
+	}
+	populateContainerPorts(pf, pod)
+	err := controllerutil.SetControllerReference(kd, pf, r.ctrlClient.Scheme())
+	if err != nil {
+		return nil, err
+	}
+	return pf, nil
+}
+
+// If any of the port-forward specs have ContainerPort = 0, populate them with
+// the documented ports on the pod. If there's no default documented ports for
+// the pod, populate it with the local port.
+//
+// TODO(nick): This is old legacy behavior, and I'm not totally sure it even
+// makes sense. I wonder if we should just insist that ContainerPort is populated.
+func populateContainerPorts(pf *v1alpha1.PortForward, pod *v1alpha1.Pod) {
+	cPorts := store.AllPodContainerPorts(*pod)
+	for i, forward := range pf.Spec.Forwards {
+		if forward.ContainerPort == 0 && len(cPorts) > 0 {
+			forward.ContainerPort = int32(cPorts[0])
+			for _, cPort := range cPorts {
+				if int(forward.LocalPort) == int(cPort) {
+					forward.ContainerPort = int32(cPort)
+					break
+				}
+			}
+		}
+		if forward.ContainerPort == 0 {
+			forward.ContainerPort = forward.LocalPort
+		}
+		pf.Spec.Forwards[i] = forward
+	}
+}
+
+// We can only portforward to one pod at a time.
+// So pick the "best" pod to portforward to.
+// May be nil if there is no eligible pod.
+func pickBestPortForwardPod(kd *v1alpha1.KubernetesDiscovery) *v1alpha1.Pod {
+	var bestPod *v1alpha1.Pod
+	for _, pod := range kd.Status.Pods {
+		pod := pod
+		if pod.Name == "" {
+			continue
+		}
+
+		// Only do port-forwarding if the pod is running or deleting.
+		// NOTE: the comparison must be ==; with != the guard below would skip
+		// exactly the running pods and forward only to non-running ones.
+		isRunning := pod.Phase == string(v1.PodRunning)
+		isDeleting := pod.Deleting
+		if !isRunning && !isDeleting {
+			continue
+		}
+
+		// This pod is eligible! Now compare it to the existing candidate (if there is one).
+		if bestPod == nil || isBetterPortForwardPod(&pod, bestPod) {
+			bestPod = &pod
+		}
+	}
+	return bestPod
+}
+
+// Check if podA is better than podB for port-forwarding.
+func isBetterPortForwardPod(podA, podB *v1alpha1.Pod) bool {
+	// A non-deleting pod is always better than a deleting pod.
+	if podB.Deleting && !podA.Deleting {
+		return true
+	} else if podA.Deleting && !podB.Deleting {
+		return false
+	}
+
+	// Otherwise, a more recent pod is better.
+	if podA.CreatedAt.After(podB.CreatedAt.Time) {
+		return true
+	} else if podB.CreatedAt.After(podA.CreatedAt.Time) {
+		return false
+	}
+
+	// Use the name as a tie-breaker.
+	return podA.Name > podB.Name
+}
diff --git a/internal/controllers/core/kubernetesdiscovery/portforwards_test.go b/internal/controllers/core/kubernetesdiscovery/portforwards_test.go
new file mode 100644
index 0000000000..9c1c76667d
--- /dev/null
+++ b/internal/controllers/core/kubernetesdiscovery/portforwards_test.go
@@ -0,0 +1,101 @@
+package kubernetesdiscovery
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+
+	"github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"
+)
+
+func TestPortForwardCreateAndUpdate(t *testing.T) {
+	f := newFixture(t)
+
+	pod := f.buildPod("pod-ns", "pod", nil, nil)
+	key := types.NamespacedName{Name: "kd"}
+	kd := &v1alpha1.KubernetesDiscovery{
+		ObjectMeta: metav1.ObjectMeta{Name: "kd"},
+		Spec: v1alpha1.KubernetesDiscoverySpec{
+			Watches: []v1alpha1.KubernetesWatchRef{
+				{
+					UID:       string(pod.UID),
+					Namespace: pod.Namespace,
+					Name:      pod.Name,
+				},
+			},
+			PortForwardTemplateSpec: &v1alpha1.PortForwardTemplateSpec{
+				Forwards: []v1alpha1.Forward{
+					v1alpha1.Forward{LocalPort: 4000, ContainerPort: 4000},
+				},
+			},
+		},
+	}
+
+	f.Create(kd)
+	f.kClient.UpsertPod(pod)
+
+	f.requireObservedPods(key, ancestorMap{pod.UID: pod.UID})
+
+	// Simulate the reconcile (which would normally be invoked by the manager on status update).
+	f.MustReconcile(key)
+
+	var pf v1alpha1.PortForward
+	f.MustGet(types.NamespacedName{Name: "kd-pod"}, &pf)
+	require.Equal(t, 1, len(pf.Spec.Forwards))
+	assert.Equal(t, 4000, int(pf.Spec.Forwards[0].LocalPort))
+
+	f.MustGet(key, kd)
+	kd.Spec.PortForwardTemplateSpec.Forwards[0].LocalPort = 4001
+	f.Update(kd)
+
+	f.MustReconcile(key)
+
+	f.MustGet(types.NamespacedName{Name: "kd-pod"}, &pf)
+	require.Equal(t, 1, len(pf.Spec.Forwards))
+	assert.Equal(t, 4001, int(pf.Spec.Forwards[0].LocalPort))
+}
+
+func TestPortForwardCreateAndDelete(t *testing.T) {
+	f := newFixture(t)
+
+	pod := f.buildPod("pod-ns", "pod", nil, nil)
+	key := types.NamespacedName{Name: "kd"}
+	kd := &v1alpha1.KubernetesDiscovery{
+		ObjectMeta: metav1.ObjectMeta{Name: key.Name},
+		Spec: v1alpha1.KubernetesDiscoverySpec{
+			Watches: []v1alpha1.KubernetesWatchRef{
+				{
+					UID:       string(pod.UID),
+					Namespace: pod.Namespace,
+					Name:      pod.Name,
+				},
+			},
+			PortForwardTemplateSpec: &v1alpha1.PortForwardTemplateSpec{
+				Forwards: []v1alpha1.Forward{
+					v1alpha1.Forward{LocalPort: 4000, ContainerPort: 4000},
+				},
+			},
+		},
+	}
+
+	f.Create(kd)
+	f.kClient.UpsertPod(pod)
+
+	f.requireObservedPods(key, ancestorMap{pod.UID: pod.UID})
+
+	// Simulate the reconcile (which would normally be invoked by the manager on status update).
+	f.MustReconcile(key)
+
+	var pf v1alpha1.PortForward
+	f.MustGet(types.NamespacedName{Name: "kd-pod"}, &pf)
+
+	f.MustGet(key, kd)
+	kd.Spec.PortForwardTemplateSpec = nil
+	f.Update(kd)
+
+	f.MustReconcile(key)
+	assert.False(t, f.Get(types.NamespacedName{Name: "kd-pod"}, &pf))
+}
diff --git a/internal/controllers/core/kubernetesdiscovery/reconciler.go b/internal/controllers/core/kubernetesdiscovery/reconciler.go
index 13f4022699..3d8995b5a3 100644
--- a/internal/controllers/core/kubernetesdiscovery/reconciler.go
+++ b/internal/controllers/core/kubernetesdiscovery/reconciler.go
@@ -45,6 +45,17 @@ var (
 	apiGVStr = v1alpha1.SchemeGroupVersion.String()
 )
 
+func ownedByKubernetesDiscoveryIndexFunc(obj ctrlclient.Object) []string {
+	owner := metav1.GetControllerOf(obj)
+	if owner == nil {
+		return nil
+	}
+	if owner.APIVersion != apiGVStr || owner.Kind != "KubernetesDiscovery" {
+		return nil
+	}
+	return []string{owner.Name}
+}
+
 type watcherSet map[watcherID]bool
 
 // watcherID is to disambiguate between K8s object keys and tilt-apiserver KubernetesDiscovery object keys.
@@ -97,23 +108,22 @@ type Reconciler struct {
 func (w *Reconciler) CreateBuilder(mgr ctrl.Manager) (*builder.Builder, error) {
 	// modeled after KubeBuilder example: https://book.kubebuilder.io/cronjob-tutorial/controller-implementation.html#setup
 	// to ensure that KubernetesDiscovery is reconciled whenever one of the objects it creates is modified
-	err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.PodLogStream{}, ownerKey, func(obj ctrlclient.Object) []string {
-		owner := metav1.GetControllerOf(obj)
-		if owner == nil {
-			return nil
-		}
-		if owner.APIVersion != apiGVStr || owner.Kind != "KubernetesDiscovery" {
-			return nil
-		}
-		return []string{owner.Name}
-	})
+	err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.PodLogStream{}, ownerKey,
+		ownedByKubernetesDiscoveryIndexFunc)
+	if err != nil {
+		return nil, err
+	}
+
+	err = mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.PortForward{}, ownerKey,
+		ownedByKubernetesDiscoveryIndexFunc)
 	if err != nil {
 		return nil, err
 	}
 
 	b := ctrl.NewControllerManagedBy(mgr).
 		For(&v1alpha1.KubernetesDiscovery{}).
-		Owns(&v1alpha1.PodLogStream{})
+		Owns(&v1alpha1.PodLogStream{}).
+		Owns(&v1alpha1.PortForward{})
 
 	return b, nil
 }
@@ -172,6 +182,9 @@ func (w *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
 		w.teardown(key)
 		w.cleanupAbandonedNamespaces()
 	}
+
+	// TODO(nick): Delete any orphaned PortForwards or PodLogStreams.
+
 	return ctrl.Result{}, nil
 }
 
@@ -182,7 +195,11 @@
 		}
 	}
 
-	if err := w.ensurePodLogStreamsExist(ctx, *kd); err != nil {
+	if err := w.manageOwnedPodLogStreams(ctx, *kd); err != nil {
+		return ctrl.Result{}, err
+	}
+
+	if err := w.manageOwnedPortForwards(ctx, kd); err != nil {
 		return ctrl.Result{}, err
 	}
 
@@ -568,7 +585,7 @@ func (w *Reconciler) handlePodDelete(ctx context.Context, namespace k8s.Namespac
 	}
 }
 
-func (w *Reconciler) ensurePodLogStreamsExist(ctx context.Context, kd v1alpha1.KubernetesDiscovery) error {
+func (w *Reconciler) manageOwnedPodLogStreams(ctx context.Context, kd v1alpha1.KubernetesDiscovery) error {
 	var managedPodLogStreams v1alpha1.PodLogStreamList
 	err := w.ctrlClient.List(ctx, &managedPodLogStreams, ctrlclient.InNamespace(kd.Namespace),
 		ctrlclient.MatchingFields{ownerKey: kd.Name})