From b000cac16c13fb34c0dd88f9506a2df251d64c03 Mon Sep 17 00:00:00 2001 From: "elvandlie@gmail.com" Date: Thu, 21 May 2026 02:58:00 +0700 Subject: [PATCH] fix(knative): clean up stale triggers on deploy createTriggers only creates triggers but never removes stale ones. If a user removes a subscription from func.yaml and redeploys, old triggers with higher indices remain orphaned on the cluster. Added a cleanup step after trigger creation that lists existing triggers matching the naming pattern and deletes any whose index exceeds the current subscription count. This mirrors the deleteStaleTriggers pattern used in the k8s deployer. Fixes #3799 --- pkg/knative/deployer.go | 41 ++++++++++++++++++ pkg/knative/deployer_test.go | 84 ++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index 1bd011d142..4cb2f931d5 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "os" + "strconv" "strings" "time" @@ -410,6 +411,46 @@ func createTriggers(ctx context.Context, f fn.Function, client clientservingv1.K return err } } + + // Clean up stale triggers that are no longer in the desired subscription set. + triggerPrefix := fmt.Sprintf("%s-function-trigger-", ksvc.GetName()) + existingTriggers, err := eventingClient.ListTriggers(ctx) + if err != nil { + if errors.IsNotFound(err) || strings.HasPrefix(err.Error(), "no or newer Knative Eventing API found on the backend") { + return nil + } + return fmt.Errorf("knative deployer failed to list triggers for cleanup: %v", err) + } + triggerNames := make([]string, len(existingTriggers.Items)) + for i, t := range existingTriggers.Items { + triggerNames[i] = t.Name + } + return deleteStaleTriggers(triggerNames, triggerPrefix, len(f.Deploy.Subscriptions), func(name string) error { + return eventingClient.DeleteTrigger(ctx, name) + }) +} + +// deleteStaleTriggers removes triggers whose numeric index suffix is >= desiredCount. +// The knative deployer names triggers as . If the subscription count +// decreases, triggers with higher indices become orphaned and must be cleaned up. +func deleteStaleTriggers(triggerNames []string, triggerPrefix string, desiredCount int, deleteFn func(name string) error) error { + for _, name := range triggerNames { + if !strings.HasPrefix(name, triggerPrefix) { + continue + } + idxStr := strings.TrimPrefix(name, triggerPrefix) + idx, parseErr := strconv.Atoi(idxStr) + if parseErr != nil { + continue + } + if idx >= desiredCount { + fmt.Fprintf(os.Stderr, "Deleting stale trigger: %s\n", name) + delErr := deleteFn(name) + if delErr != nil && !errors.IsNotFound(delErr) { + return fmt.Errorf("knative deployer failed to delete stale trigger %s: %v", name, delErr) + } + } + } return nil } diff --git a/pkg/knative/deployer_test.go b/pkg/knative/deployer_test.go index e024ffec6e..7966142fdb 100644 --- a/pkg/knative/deployer_test.go +++ b/pkg/knative/deployer_test.go @@ -461,3 +461,87 @@ func assertAuth(uname, pwd string, w http.ResponseWriter, r *http.Request) bool _, _ = fmt.Fprintln(w, "Unauthorised.") return false } + +// TestDeleteStaleTriggers_NoStale ensures no triggers are deleted when all are within range. +func TestDeleteStaleTriggers_NoStale(t *testing.T) { + triggers := []string{"my-func-function-trigger-0", "my-func-function-trigger-1"} + prefix := "my-func-function-trigger-" + var deleted []string + deleteFn := func(name string) error { + deleted = append(deleted, name) + return nil + } + err := deleteStaleTriggers(triggers, prefix, 2, deleteFn) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(deleted) != 0 { + t.Fatalf("expected no deletions, got %v", deleted) + } +} + +// TestDeleteStaleTriggers_DeletesStale ensures triggers beyond desired count are deleted. +func TestDeleteStaleTriggers_DeletesStale(t *testing.T) { + triggers := []string{ + "my-func-function-trigger-0", + "my-func-function-trigger-1", + "my-func-function-trigger-2", + "my-func-function-trigger-3", + } + prefix := "my-func-function-trigger-" + var deleted []string + deleteFn := func(name string) error { + deleted = append(deleted, name) + return nil + } + // desiredCount=2 means indices 2 and 3 are stale + err := deleteStaleTriggers(triggers, prefix, 2, deleteFn) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(deleted) != 2 { + t.Fatalf("expected 2 deletions, got %v", deleted) + } + if deleted[0] != "my-func-function-trigger-2" || deleted[1] != "my-func-function-trigger-3" { + t.Fatalf("unexpected deleted triggers: %v", deleted) + } +} + +// TestDeleteStaleTriggers_SkipsNonMatching ensures triggers that don't match the prefix or +// have unparseable suffixes are skipped. +func TestDeleteStaleTriggers_SkipsNonMatching(t *testing.T) { + triggers := []string{ + "other-trigger-0", // wrong prefix + "my-func-function-trigger-abc", // unparseable index + "my-func-function-trigger-2", // stale, should be deleted + } + prefix := "my-func-function-trigger-" + var deleted []string + deleteFn := func(name string) error { + deleted = append(deleted, name) + return nil + } + err := deleteStaleTriggers(triggers, prefix, 1, deleteFn) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(deleted) != 1 || deleted[0] != "my-func-function-trigger-2" { + t.Fatalf("expected only trigger-2 deleted, got %v", deleted) + } +} + +// TestDeleteStaleTriggers_DeleteError ensures errors from the delete callback are returned. +func TestDeleteStaleTriggers_DeleteError(t *testing.T) { + triggers := []string{"my-func-function-trigger-5"} + prefix := "my-func-function-trigger-" + deleteFn := func(name string) error { + return fmt.Errorf("connection refused") + } + err := deleteStaleTriggers(triggers, prefix, 1, deleteFn) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "connection refused") { + t.Fatalf("expected 'connection refused' in error, got: %v", err) + } +}