Skip to content

Commit

Permalink
fix: Switch to using plain watches + channels instead of informers
Browse files Browse the repository at this point in the history
  • Loading branch information
codablock committed Jun 9, 2023
1 parent 4b6bfb2 commit 71a5aa6
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 233 deletions.
7 changes: 6 additions & 1 deletion cmd/kluctl/commands/cmd_controller_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path/filepath"
"sigs.k8s.io/cli-utils/pkg/flowcontrol"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
crtlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
Expand Down Expand Up @@ -130,7 +131,11 @@ func (cmd *controllerRunCmd) Run(ctx context.Context) error {
}

if cmd.WriteCommandResult {
resultStore, err := results.NewResultStoreSecrets(ctx, mgr.GetClient(), mgr.GetCache(), cmd.CommandResultNamespace, cmd.KeepCommandResultsCount)
wc, ok := mgr.GetClient().(client.WithWatch)
if !ok {
return fmt.Errorf("client does not implement WithWatch")
}
resultStore, err := results.NewResultStoreSecrets(ctx, wc, cmd.CommandResultNamespace, cmd.KeepCommandResultsCount)
if err != nil {
return err
}
Expand Down
11 changes: 2 additions & 9 deletions cmd/kluctl/commands/cmd_webui.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/kluctl/kluctl/v2/pkg/webui"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -89,18 +88,12 @@ func (cmd *webuiCmd) createResultStores(ctx context.Context) ([]results.ResultSt
return nil, nil, err
}

client, err := client.New(config, client.Options{})
client, err := client.NewWithWatch(config, client.Options{})
if err != nil {
return nil, nil, err
}

cache, err := cache.New(config, cache.Options{})
if err != nil {
return nil, nil, err
}
go cache.Start(ctx)

store, err := results.NewResultStoreSecrets(ctx, client, cache, "", 0)
store, err := results.NewResultStoreSecrets(ctx, client, "", 0)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kluctl/commands/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func withProjectTargetCommandContext(ctx context.Context, args projectTargetComm
return err
}

resultStore, err = results.NewResultStoreSecrets(ctx, client, nil, args.commandResultFlags.CommandResultNamespace, args.commandResultFlags.KeepCommandResultsCount)
resultStore, err = results.NewResultStoreSecrets(ctx, client, args.commandResultFlags.CommandResultNamespace, args.commandResultFlags.KeepCommandResultsCount)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/k8s/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ClientFactory interface {
CloseIdleConnections()

Mapper() meta.RESTMapper
Client(wh rest.WarningHandler) (client.Client, error)
Client(wh rest.WarningHandler) (client.WithWatch, error)

DiscoveryClient() (discovery.DiscoveryInterface, error)
CoreV1Client(wh rest.WarningHandler) (corev1.CoreV1Interface, error)
Expand All @@ -53,11 +53,11 @@ func (r *realClientFactory) Mapper() meta.RESTMapper {
return r.mapper
}

func (r *realClientFactory) Client(wh rest.WarningHandler) (client.Client, error) {
func (r *realClientFactory) Client(wh rest.WarningHandler) (client.WithWatch, error) {
config := rest.CopyConfig(r.config)
config.WarningHandler = wh

return client.New(config, client.Options{
return client.NewWithWatch(config, client.Options{
Mapper: r.mapper,
WarningHandler: client.WarningHandlerOptions{
SuppressWarnings: true,
Expand Down
2 changes: 1 addition & 1 deletion pkg/k8s/fake_client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (f *fakeClientFactory) Mapper() meta.RESTMapper {
return f.mapper
}

func (f *fakeClientFactory) Client(wh rest.WarningHandler) (client.Client, error) {
func (f *fakeClientFactory) Client(wh rest.WarningHandler) (client.WithWatch, error) {
return fake2.NewClientBuilder().
WithScheme(f.scheme).
WithRESTMapper(f.mapper).
Expand Down
2 changes: 1 addition & 1 deletion pkg/k8s/k8s_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (k *K8sCluster) ResetMapper() {
}
}

func (k *K8sCluster) ToClient() (client.Client, error) {
func (k *K8sCluster) ToClient() (client.WithWatch, error) {
return k.clientFactory.Client(nil)
}

Expand Down
160 changes: 59 additions & 101 deletions pkg/results/result-store-secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/watch"
"path"
"regexp"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sort"
"strings"
Expand All @@ -24,56 +23,23 @@ import (

type ResultStoreSecrets struct {
ctx context.Context
client client.Client
cache cache.Cache
reader client.Reader
client client.WithWatch

writeNamespace string
keepResultsCount int

mutex sync.Mutex
informer cache.Informer
mutex sync.Mutex
}

func NewResultStoreSecrets(ctx context.Context, client client.Client, cache cache.Cache, writeNamespace string, keepResultsCount int) (*ResultStoreSecrets, error) {
func NewResultStoreSecrets(ctx context.Context, client client.WithWatch, writeNamespace string, keepResultsCount int) (*ResultStoreSecrets, error) {
s := &ResultStoreSecrets{
ctx: ctx,
client: client,
cache: cache,
writeNamespace: writeNamespace,
keepResultsCount: keepResultsCount,
}
if cache != nil {
s.reader = cache
} else {
s.reader = client
}
return s, nil
}

func (s *ResultStoreSecrets) ensureInformer() error {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.informer != nil {
return nil
}
if s.cache == nil {
return nil
}

var partialSecret metav1.PartialObjectMetadata
partialSecret.SetGroupVersionKind(schema.GroupVersionKind{Version: "v1", Kind: "Secret"})

informer, err := s.cache.GetInformer(s.ctx, &partialSecret)
if err != nil {
return err
}

s.informer = informer
s.cache.WaitForCacheSync(s.ctx)

return nil
return s, nil
}

var invalidChars = regexp.MustCompile(`[^a-zA-Z0-9-]`)
Expand Down Expand Up @@ -106,7 +72,7 @@ func (s *ResultStoreSecrets) ensureWriteNamespace() error {
return fmt.Errorf("missing writeNamespace")
}
var ns corev1.Namespace
err := s.reader.Get(s.ctx, client.ObjectKey{Name: s.writeNamespace}, &ns)
err := s.client.Get(s.ctx, client.ObjectKey{Name: s.writeNamespace}, &ns)
if err != nil && errors.IsNotFound(err) {
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -125,12 +91,7 @@ func (s *ResultStoreSecrets) ensureWriteNamespace() error {
}

func (s *ResultStoreSecrets) WriteCommandResult(cr *result.CommandResult) error {
err := s.ensureInformer()
if err != nil {
return err
}

err = s.ensureWriteNamespace()
err := s.ensureWriteNamespace()
if err != nil {
return err
}
Expand Down Expand Up @@ -231,22 +192,17 @@ func (s *ResultStoreSecrets) cleanupResults(project result.ProjectKey, target re
}

func (s *ResultStoreSecrets) ListCommandResultSummaries(options ListCommandResultSummariesOptions) ([]result.CommandResultSummary, error) {
err := s.ensureInformer()
if err != nil {
return nil, err
}

var l metav1.PartialObjectMetadataList
l.SetGroupVersionKind(schema.GroupVersionKind{Version: "v1", Kind: "SecretList"})
err = s.reader.List(s.ctx, &l, client.HasLabels{"kluctl.io/result-id"})
err := s.client.List(s.ctx, &l, client.HasLabels{"kluctl.io/result-id"})
if err != nil {
return nil, err
}

ret := make([]result.CommandResultSummary, 0, len(l.Items))

for _, x := range l.Items {
summary, err := s.parseSummary(&x)
summary, err := s.parseSummary(x.GetAnnotations())
if err != nil {
continue
}
Expand All @@ -264,11 +220,11 @@ func (s *ResultStoreSecrets) ListCommandResultSummaries(options ListCommandResul
return ret, nil
}

func (s *ResultStoreSecrets) parseSummary(obj client.Object) (*result.CommandResultSummary, error) {
if len(obj.GetAnnotations()) == 0 {
func (s *ResultStoreSecrets) parseSummary(a map[string]string) (*result.CommandResultSummary, error) {
if len(a) == 0 {
return nil, nil
}
summaryJson := obj.GetAnnotations()["kluctl.io/result-summary"]
summaryJson := a["kluctl.io/result-summary"]
if summaryJson == "" {
return nil, nil
}
Expand All @@ -295,67 +251,69 @@ func (s *ResultStoreSecrets) filterSummary(summary *result.CommandResultSummary,
return true
}

func (s *ResultStoreSecrets) WatchCommandResultSummaries(options ListCommandResultSummariesOptions, update func(summary *result.CommandResultSummary), delete func(id string)) (func(), error) {
err := s.ensureInformer()
func (s *ResultStoreSecrets) WatchCommandResultSummaries(options ListCommandResultSummariesOptions) ([]*result.CommandResultSummary, <-chan WatchCommandResultSummaryEvent, context.CancelFunc, error) {
var l metav1.PartialObjectMetadataList
l.SetGroupVersionKind(schema.GroupVersionKind{Version: "v1", Kind: "SecretList"})
w, err := s.client.Watch(s.ctx, &l, client.HasLabels{"kluctl.io/result-id"})
if err != nil {
return nil, err
return nil, nil, nil, err
}

handler := func(obj any, deleted bool) {
obj2, ok := obj.(client.Object)
if !ok {
return
}
summary, err := s.parseSummary(obj2)
var initialListRet []*result.CommandResultSummary
for _, x := range l.Items {
summary, err := s.parseSummary(x.GetAnnotations())
if err != nil {
return
}
if summary == nil {
return
continue
}
if !s.filterSummary(summary, options.ProjectFilter) {
return
}
if deleted {
delete(summary.Id)
} else {
update(summary)
continue
}
initialListRet = append(initialListRet, summary)
}

r, err := s.informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handler(obj, false)
},
UpdateFunc: func(oldObj, newObj interface{}) {
handler(newObj, false)
},
DeleteFunc: func(obj interface{}) {
handler(obj, true)
},
})
if err != nil {
return nil, err
}
ch := make(chan WatchCommandResultSummaryEvent)

stopCh := make(chan struct{})
toolscache.WaitForCacheSync(stopCh, r.HasSynced)
go func() {
for x := range w.ResultChan() {
if x.Object == nil {
continue
}
x2, ok := x.Object.(client.Object)
if !ok {
continue
}
summary, err := s.parseSummary(x2.GetAnnotations())
if err != nil {
continue
}
if !s.filterSummary(summary, options.ProjectFilter) {
continue
}
switch x.Type {
case watch.Deleted:
ch <- WatchCommandResultSummaryEvent{
Delete: true,
Summary: summary,
}
case watch.Added, watch.Modified:
ch <- WatchCommandResultSummaryEvent{
Summary: summary,
}
}
}
close(ch)
}()

cancel := func() {
_ = s.informer.RemoveEventHandler(r)
w.Stop()
}
return cancel, nil
return nil, ch, cancel, nil
}

func (s *ResultStoreSecrets) getCommandResultSecret(id string) (*metav1.PartialObjectMetadata, error) {
err := s.ensureInformer()
if err != nil {
return nil, err
}

var l metav1.PartialObjectMetadataList
l.SetGroupVersionKind(schema.GroupVersionKind{Version: "v1", Kind: "SecretList"})
err = s.reader.List(s.ctx, &l, client.MatchingLabels{"kluctl.io/result-id": id})
err := s.client.List(s.ctx, &l, client.MatchingLabels{"kluctl.io/result-id": id})
if err != nil {
return nil, err
}
Expand All @@ -378,7 +336,7 @@ func (s *ResultStoreSecrets) GetCommandResultSummary(id string) (*result.Command
if err != nil {
return nil, err
}
return s.parseSummary(secret)
return s.parseSummary(secret.GetAnnotations())
}

func (s *ResultStoreSecrets) GetCommandResult(options GetCommandResultOptions) (*result.CommandResult, error) {
Expand All @@ -391,7 +349,7 @@ func (s *ResultStoreSecrets) GetCommandResult(options GetCommandResultOptions) (
}

var l corev1.SecretList
err = s.reader.List(s.ctx, &l, client.MatchingLabels{
err = s.client.List(s.ctx, &l, client.MatchingLabels{
"kluctl.io/result-id": options.Id,
})
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/results/result-store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package results

import (
"context"
"github.com/kluctl/kluctl/v2/pkg/types/result"
)

Expand All @@ -21,11 +22,16 @@ type GetCommandResultOptions struct {
Reduced bool `json:"reduced,omitempty"`
}

type WatchCommandResultSummaryEvent struct {
Summary *result.CommandResultSummary `json:"summary"`
Delete bool `json:"delete"`
}

type ResultStore interface {
WriteCommandResult(cr *result.CommandResult) error

ListCommandResultSummaries(options ListCommandResultSummariesOptions) ([]result.CommandResultSummary, error)
WatchCommandResultSummaries(options ListCommandResultSummariesOptions, update func(summary *result.CommandResultSummary), delete func(id string)) (func(), error)
WatchCommandResultSummaries(options ListCommandResultSummariesOptions) ([]*result.CommandResultSummary, <-chan WatchCommandResultSummaryEvent, context.CancelFunc, error)
HasCommandResult(id string) (bool, error)
GetCommandResultSummary(id string) (*result.CommandResultSummary, error)
GetCommandResult(options GetCommandResultOptions) (*result.CommandResult, error)
Expand Down

0 comments on commit 71a5aa6

Please sign in to comment.