diff --git a/pkg/chains/signing.go b/pkg/chains/signing.go index 453bad982..d33aaa672 100644 --- a/pkg/chains/signing.go +++ b/pkg/chains/signing.go @@ -184,7 +184,17 @@ func (o *ObjectSigner) Sign(ctx context.Context, tektonObj objects.TektonObject) // Now store those! for _, backend := range sets.List[string](signableType.StorageBackend(cfg)) { - b := o.Backends[backend] + logger.Infof("signable storage backends: %v", signableType.StorageBackend(cfg)) + logger.Infof("o.Backends(): %v", o.Backends) + + b, ok := o.Backends[backend] + if !ok { + backendErr := fmt.Errorf("could not find backend '%s' in configured backends (%v) while trying sign: %s/%s", backend, o.getBackendsList(), tektonObj.GetKindName(), tektonObj.GetName()) + logger.Error(backendErr) + merr = multierror.Append(merr, backendErr) + continue + } + storageOpts := config.StorageOpts{ ShortKey: signableType.ShortKey(obj), FullKey: signableType.FullKey(obj), @@ -235,6 +245,14 @@ func (o *ObjectSigner) Sign(ctx context.Context, tektonObj objects.TektonObject) return nil } +func (o *ObjectSigner) getBackendsList() []string { + backendList := []string{} + for name := range o.Backends { + backendList = append(backendList, name) + } + return backendList +} + func measureMetrics(ctx context.Context, metrictype string, mtr MetricsRecorder) { if mtr != nil { mtr.RecordCountMetrics(ctx, metrictype) diff --git a/pkg/chains/storage/docdb/docdb.go b/pkg/chains/storage/docdb/docdb.go index d9b82c979..148c50108 100644 --- a/pkg/chains/storage/docdb/docdb.go +++ b/pkg/chains/storage/docdb/docdb.go @@ -17,19 +17,31 @@ import ( "context" "encoding/base64" "encoding/json" + "fmt" + "net/url" + "os" + "path" + "slices" + "strings" + "github.com/fsnotify/fsnotify" "github.com/tektoncd/chains/pkg/chains/objects" "github.com/tektoncd/chains/pkg/config" "gocloud.dev/docstore" _ "gocloud.dev/docstore/awsdynamodb" _ "gocloud.dev/docstore/gcpfirestore" + "gocloud.dev/docstore/mongodocstore" _ "gocloud.dev/docstore/mongodocstore" + "knative.dev/pkg/logging" ) const ( StorageTypeDocDB = "docdb" ) +// ErrNothingToWatch is an error that's returned when the backend doesn't have anything to "watch" +var ErrNothingToWatch = fmt.Errorf("backend has nothing to watch") + // Backend is a storage backend that stores signed payloads in the TaskRun metadata as an annotation. // It is stored as base64 encoded JSON. type Backend struct { @@ -47,8 +59,21 @@ type SignedDocument struct { // NewStorageBackend returns a new Tekton StorageBackend that stores signatures on a TaskRun func NewStorageBackend(ctx context.Context, cfg config.Config) (*Backend, error) { - url := cfg.Storage.DocDB.URL - coll, err := docstore.OpenCollection(ctx, url) + docdbURL := cfg.Storage.DocDB.URL + + u, err := url.Parse(docdbURL) + if err != nil { + return nil, err + } + + if u.Scheme == mongodocstore.Scheme { + // MONGO_SERVER_URL can be passed in as an environment variable or via config fields + if err := populateMongoServerURL(ctx, cfg); err != nil { + return nil, err + } + } + + coll, err := docstore.OpenCollection(ctx, docdbURL) if err != nil { return nil, err } @@ -58,6 +83,100 @@ func NewStorageBackend(ctx context.Context, cfg config.Config) (*Backend, error) }, nil } +// WatchBackend returns a channel that receives a new Backend each time it needs to be updated +func WatchBackend(ctx context.Context, cfg config.Config, watcherStop chan bool) (chan *Backend, error) { + logger := logging.FromContext(ctx) + docDBURL := cfg.Storage.DocDB.URL + + u, err := url.Parse(docDBURL) + if err != nil { + return nil, err + } + + // Set up the watcher only for mongo backends + if u.Scheme != "mongo" { + return nil, ErrNothingToWatch + } + + // Set up watcher only when `storage.docdb.mongo-server-url-dir` is set + if cfg.Storage.DocDB.MongoServerURLDir == "" { + return nil, ErrNothingToWatch + } + + logger.Infof("setting up fsnotify watcher for directory: %s", cfg.Storage.DocDB.MongoServerURLDir) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + pathsToWatch := []string{ + // mongo-server-url-dir/MONGO_SERVER_URL is where the MONGO_SERVER_URL environment + // variable is expected to be mounted, either manually or via a Kubernetes secret, etc. + path.Join(cfg.Storage.DocDB.MongoServerURLDir, "MONGO_SERVER_URL"), + // When a Kubernetes secret is mounted on a path, the `data` in that secret is mounted + // under path/..data that is then `symlink`ed to the key of the data. In this instance, + // the mounted path is going to look like: + // file 1 - ..2024_05_03_11_23_23.1253599725 + // file 2 - ..data -> ..2024_05_03_11_23_23.1253599725 + // file 3 - MONGO_SERVER_URL -> ..data/MONGO_SERVER_URL + // So each time the secret is updated, the file `MONGO_SERVER_URL` is not updated, + // instead the underlying symlink at `..data` is updated and that's what we want to + // capture via the fsnotify event watcher + path.Join(cfg.Storage.DocDB.MongoServerURLDir, "..data"), + } + + backendChan := make(chan *Backend) + // Start listening for events. + go func() { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + logger.Infof("received event: %s, path: %s", event.Op.String(), event.Name) + // Only respond to create or write events in the directory + if event.Has(fsnotify.Create) || event.Has(fsnotify.Write) { + // event.Name captures the path that is updated + if slices.Contains(pathsToWatch, event.Name) { + logger.Infof("directory %s has been updated, reconfiguring backend...", cfg.Storage.DocDB.MongoServerURLDir) + + // Now that MONGO_SERVER_URL has been updated, we should update docdb backend again + newDocDBBackend, err := NewStorageBackend(ctx, cfg) + if err != nil { + logger.Error(err) + backendChan <- nil + } else { + // Storing the backend in the signer so everyone has access to the up-to-date backend + backendChan <- newDocDBBackend + } + } + } + + // TODO: add condition for REMOVE event + + case err, ok := <-watcher.Errors: + if !ok { + return + } + logger.Error(err) + + case <-watcherStop: + logger.Info("stopping fsnotify context...") + return + } + } + }() + + // Add a path. + err = watcher.Add(cfg.Storage.DocDB.MongoServerURLDir) + if err != nil { + return nil, err + } + return backendChan, nil +} + // StorePayload implements the Payloader interface. func (b *Backend) StorePayload(ctx context.Context, _ objects.TektonObject, rawPayload []byte, signature string, opts config.StorageOpts) error { var obj interface{} @@ -125,3 +244,70 @@ func (b *Backend) retrieveDocuments(ctx context.Context, opts config.StorageOpts } return []SignedDocument{d}, nil } + +func populateMongoServerURL(ctx context.Context, cfg config.Config) error { + // First preference is given to the key `storage.docdb.mongo-server-url-dir`. + // If that doesn't exist, then we move on to `storage.docdb.mongo-server-url`. + // If that doesn't exist, then we check if `MONGO_SERVER_URL` env var is set. + logger := logging.FromContext(ctx) + mongoEnv := "MONGO_SERVER_URL" + + if cfg.Storage.DocDB.MongoServerURLDir != "" { + logger.Infof("setting %s from storage.docdb.mongo-server-url-dir: %s", mongoEnv, cfg.Storage.DocDB.MongoServerURLDir) + if err := setMongoServerURLFromDir(ctx, cfg.Storage.DocDB.MongoServerURLDir); err != nil { + return err + } + } else if cfg.Storage.DocDB.MongoServerURL != "" { + logger.Infof("setting %s from storage.docdb.mongo-server-url", mongoEnv) + if err := os.Setenv(mongoEnv, cfg.Storage.DocDB.MongoServerURL); err != nil { + return err + } + } + + if _, envExists := os.LookupEnv(mongoEnv); !envExists { + return fmt.Errorf("mongo docstore configured but %s environment variable not set, "+ + "supply one of storage.docdb.mongo-server-url-dir, storage.docdb.mongo-server-url or set %s", mongoEnv, mongoEnv) + } + + return nil +} + +func setMongoServerURLFromDir(ctx context.Context, dir string) error { + logger := logging.FromContext(ctx) + + mongoEnv := "MONGO_SERVER_URL" + + stat, err := os.Stat(dir) + if err != nil { + if os.IsNotExist(err) { + // If directory does not exist, then create it. This is needed for + // the fsnotify watcher. + // fsnotify does not receive events if the path that it's watching + // is created later. + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + } else { + return err + } + } + // If the path exists but is not a directory, then throw an error + if !stat.IsDir() { + return fmt.Errorf("path specified at storage.docdb.mongo-server-url-dir: %s is not a directory", dir) + } + + filePath := path.Join(dir, mongoEnv) + fileData, err := os.ReadFile(filePath) + if err != nil { + return err + } + // A trailing newline is fairly common in mounted files, let's remove it. + fileDataNormalized := strings.TrimSuffix(string(fileData), "\n") + + logger.Infof("setting %s from path: %s", mongoEnv, filePath) + if err = os.Setenv(mongoEnv, fileDataNormalized); err != nil { + return err + } + + return nil +} diff --git a/pkg/chains/storage/storage.go b/pkg/chains/storage/storage.go index be3561c96..fe4e49b00 100644 --- a/pkg/chains/storage/storage.go +++ b/pkg/chains/storage/storage.go @@ -15,6 +15,7 @@ package storage import ( "context" + "errors" "github.com/tektoncd/chains/pkg/chains/objects" "github.com/tektoncd/chains/pkg/chains/storage/docdb" @@ -27,6 +28,7 @@ import ( "github.com/tektoncd/pipeline/pkg/client/clientset/versioned" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" + "knative.dev/pkg/logging" ) // Backend is an interface to store a chains Payload @@ -42,6 +44,8 @@ type Backend interface { // InitializeBackends creates and initializes every configured storage backend. func InitializeBackends(ctx context.Context, ps versioned.Interface, kc kubernetes.Interface, cfg config.Config) (map[string]Backend, error) { + logger := logging.FromContext(ctx) + // Add an entry here for every configured backend configuredBackends := []string{} if cfg.Artifacts.TaskRuns.Enabled() { @@ -53,6 +57,7 @@ func InitializeBackends(ctx context.Context, ps versioned.Interface, kc kubernet if cfg.Artifacts.PipelineRuns.Enabled() { configuredBackends = append(configuredBackends, sets.List[string](cfg.Artifacts.PipelineRuns.StorageBackend)...) } + logger.Infof("configured backends from config: %v", configuredBackends) // Now only initialize and return the configured ones. backends := map[string]Backend{} @@ -90,5 +95,49 @@ func InitializeBackends(ctx context.Context, ps versioned.Interface, kc kubernet } } + + backendsList := []string{} + for name := range backends { + backendsList = append(backendsList, name) + } + logger.Infof("successfully initialized backends: %v", backendsList) return backends, nil } + +// WatchBackends watches backends for any update and keeps them up to date. +func WatchBackends(ctx context.Context, watcherStop chan bool, backends map[string]Backend, cfg config.Config) error { + logger := logging.FromContext(ctx) + for backend := range backends { + switch backend { + case docdb.StorageTypeDocDB: + backendChan, err := docdb.WatchBackend(ctx, cfg, watcherStop) + if err != nil { + if errors.Is(err, docdb.ErrNothingToWatch) { + logger.Info(err) + continue + } + return err + } + go func() { + for { + select { + case newBackend := <-backendChan: + if newBackend == nil { + logger.Errorf("removing backend %s from backends", docdb.StorageTypeDocDB) + delete(backends, docdb.StorageTypeDocDB) + continue + } + logger.Infof("adding to backends: %s", docdb.StorageTypeDocDB) + backends[docdb.StorageTypeDocDB] = newBackend + case <-watcherStop: + logger.Info("stop watching backends...") + return + } + } + }() + default: + logger.Debugf("no backends to watch...") + } + } + return nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 1d3cb3fbc..a5eef510d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -122,7 +122,9 @@ type TektonStorageConfig struct { } type DocDBStorageConfig struct { - URL string + URL string + MongoServerURL string + MongoServerURLDir string } type GrafeasConfig struct { @@ -165,13 +167,16 @@ const ( ociStorageKey = "artifacts.oci.storage" ociSignerKey = "artifacts.oci.signer" - gcsBucketKey = "storage.gcs.bucket" - ociRepositoryKey = "storage.oci.repository" - ociRepositoryInsecureKey = "storage.oci.repository.insecure" - docDBUrlKey = "storage.docdb.url" - grafeasProjectIDKey = "storage.grafeas.projectid" - grafeasNoteIDKey = "storage.grafeas.noteid" - grafeasNoteHint = "storage.grafeas.notehint" + gcsBucketKey = "storage.gcs.bucket" + ociRepositoryKey = "storage.oci.repository" + ociRepositoryInsecureKey = "storage.oci.repository.insecure" + docDBUrlKey = "storage.docdb.url" + docDBMongoServerURLKey = "storage.docdb.mongo-server-url" + docDBMongoServerURLDirKey = "storage.docdb.mongo-server-url-dir" + + grafeasProjectIDKey = "storage.grafeas.projectid" + grafeasNoteIDKey = "storage.grafeas.noteid" + grafeasNoteHint = "storage.grafeas.notehint" // PubSub - General pubsubProvider = "storage.pubsub.provider" @@ -293,6 +298,8 @@ func NewConfigFromMap(data map[string]string) (*Config, error) { asString(ociRepositoryKey, &cfg.Storage.OCI.Repository), asBool(ociRepositoryInsecureKey, &cfg.Storage.OCI.Insecure), asString(docDBUrlKey, &cfg.Storage.DocDB.URL), + asString(docDBMongoServerURLKey, &cfg.Storage.DocDB.MongoServerURL), + asString(docDBMongoServerURLDirKey, &cfg.Storage.DocDB.MongoServerURLDir), asString(grafeasProjectIDKey, &cfg.Storage.Grafeas.ProjectID), asString(grafeasNoteIDKey, &cfg.Storage.Grafeas.NoteID), asString(grafeasNoteHint, &cfg.Storage.Grafeas.NoteHint), diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index 21beabbc4..e382d2f26 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -53,8 +53,18 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl Pipelineclientset: pipelineClient, TaskRunLister: taskRunInformer.Lister(), } + impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { + watcherStop := make(chan bool) + cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) { + select { + case watcherStop <- true: + logger.Info("sent close event to fsnotify...") + default: + logger.Info("could not send close event to fsnotify...") + } + // get updated config cfg := *value.(*config.Config) @@ -64,6 +74,10 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl logger.Error(err) } psSigner.Backends = backends + + if err := storage.WatchBackends(ctx, watcherStop, psSigner.Backends, cfg); err != nil { + logger.Error(err) + } }) // setup watches for the config names provided by client diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 8695dcfdb..1991f334d 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -49,7 +49,16 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl Pipelineclientset: pipelineClient, } impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { + watcherStop := make(chan bool) + cfgStore := config.NewConfigStore(logger, func(name string, value interface{}) { + select { + case watcherStop <- true: + logger.Info("sent close event to fsnotify") + default: + logger.Info("could not send close event to fsnotify") + } + // get updated config cfg := *value.(*config.Config) @@ -59,6 +68,10 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl logger.Error(err) } tsSigner.Backends = backends + + if err := storage.WatchBackends(ctx, watcherStop, tsSigner.Backends, cfg); err != nil { + logger.Error(err) + } }) // setup watches for the config names provided by client