Skip to content

Commit

Permalink
Allow supplying MONGO_SERVER_URL via chains-config
Browse files Browse the repository at this point in the history
Currently, when using the Mongo docstore for docdb storage backend, the
only way to supply MONGO_SERVER_URL environment variable (which contains
the credentials to connect to MongoDB) is by adding an environment
variable to the Chains controller pod. It's a farily common practice to
update the MONGO_SERVER_URL at regular intervals when the credentials
are rotated.

To facilitate this, this commit adds 2 fields to Chains' configuration:
1. storage.docdb.mongo-server-url
2. storage.docdb.mongo-server-url-dir

`storage.docdb.mongo-server-url` simply allows supplying the value of
MONGO_SERVER_URL as a field. When this field is updated, the chains
controller pod does not restart, unlike when the MONGO_SERVER_URL
environment variable is updated.

`storage.docdb.mongo-server-url-dir` allows reading MONGO_SERVER_URL
from a file in the specified directory. This allows mounting the value
of MONGO_SERER_URL from a secret or other mechanisms. When the value of
MONGO_SERVER_URL is updated in the path, the new value is automatically
picked up and applied.
  • Loading branch information
concaf committed May 6, 2024
1 parent e46a092 commit 43c7e4a
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 11 deletions.
20 changes: 19 additions & 1 deletion pkg/chains/signing.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,17 @@ func (o *ObjectSigner) Sign(ctx context.Context, tektonObj objects.TektonObject)

// Now store those!
for _, backend := range sets.List[string](signableType.StorageBackend(cfg)) {
b := o.Backends[backend]
logger.Infof("signable storage backends: %v", signableType.StorageBackend(cfg))
logger.Infof("o.Backends(): %v", o.Backends)

b, ok := o.Backends[backend]
if !ok {
backendErr := fmt.Errorf("could not find backend '%s' in configured backends (%v) while trying sign: %s/%s", backend, o.getBackendsList(), tektonObj.GetKindName(), tektonObj.GetName())
logger.Error(backendErr)
merr = multierror.Append(merr, backendErr)
continue
}

storageOpts := config.StorageOpts{
ShortKey: signableType.ShortKey(obj),
FullKey: signableType.FullKey(obj),
Expand Down Expand Up @@ -235,6 +245,14 @@ func (o *ObjectSigner) Sign(ctx context.Context, tektonObj objects.TektonObject)
return nil
}

func (o *ObjectSigner) getBackendsList() []string {
backendList := []string{}
for name := range o.Backends {
backendList = append(backendList, name)
}
return backendList
}

func measureMetrics(ctx context.Context, metrictype string, mtr MetricsRecorder) {
if mtr != nil {
mtr.RecordCountMetrics(ctx, metrictype)
Expand Down
195 changes: 193 additions & 2 deletions pkg/chains/storage/docdb/docdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,30 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"os"
"path"
"slices"
"strings"

"github.com/fsnotify/fsnotify"

Check failure on line 27 in pkg/chains/storage/docdb/docdb.go

View workflow job for this annotation

GitHub Actions / lint

import 'github.com/fsnotify/fsnotify' is not allowed from list 'Main' (depguard)
"github.com/tektoncd/chains/pkg/chains/objects"
"github.com/tektoncd/chains/pkg/config"
"gocloud.dev/docstore"
_ "gocloud.dev/docstore/awsdynamodb"
_ "gocloud.dev/docstore/gcpfirestore"
"gocloud.dev/docstore/mongodocstore"
_ "gocloud.dev/docstore/mongodocstore"
"knative.dev/pkg/logging"

Check failure on line 35 in pkg/chains/storage/docdb/docdb.go

View workflow job for this annotation

GitHub Actions / lint

import 'knative.dev/pkg/logging' is not allowed from list 'Main' (depguard)
)

const (
StorageTypeDocDB = "docdb"
)

var ErrNothingToWatch = fmt.Errorf("backend has nothing to watch")

Check warning on line 42 in pkg/chains/storage/docdb/docdb.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported var ErrNothingToWatch should have comment or be unexported (revive)

// Backend is a storage backend that stores signed payloads in the TaskRun metadata as an annotation.
// It is stored as base64 encoded JSON.
type Backend struct {
Expand All @@ -47,8 +58,21 @@ type SignedDocument struct {

// NewStorageBackend returns a new Tekton StorageBackend that stores signatures on a TaskRun
func NewStorageBackend(ctx context.Context, cfg config.Config) (*Backend, error) {
url := cfg.Storage.DocDB.URL
coll, err := docstore.OpenCollection(ctx, url)
docdbURL := cfg.Storage.DocDB.URL

u, err := url.Parse(docdbURL)
if err != nil {
return nil, err
}

if u.Scheme == mongodocstore.Scheme {
// MONGO_SERVER_URL can be passed in as an environment variable or via config fields
if err := populateMongoServerURL(ctx, cfg); err != nil {
return nil, err
}
}

coll, err := docstore.OpenCollection(ctx, docdbURL)
if err != nil {
return nil, err
}
Expand All @@ -58,6 +82,106 @@ func NewStorageBackend(ctx context.Context, cfg config.Config) (*Backend, error)
}, nil
}

// WatchBackend returns a channel that receives a new Backend each time it needs to be updated
func WatchBackend(ctx context.Context, cfg config.Config, watcherStop chan bool) (chan *Backend, error) {
logger := logging.FromContext(ctx)
docDBURL := cfg.Storage.DocDB.URL

u, err := url.Parse(docDBURL)
if err != nil {
return nil, err
}

// Set up the watcher only for mongo backends
if u.Scheme != "mongo" {
return nil, ErrNothingToWatch
}

// Set up watcher only when `storage.docdb.mongo-server-url-dir` is set
if cfg.Storage.DocDB.MongoServerURLDir == "" {
return nil, ErrNothingToWatch
}

logger.Infof("setting up fsnotify watcher for directory: %s", cfg.Storage.DocDB.MongoServerURLDir)

watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}

pathsToWatch := []string{
// mongo-server-url-dir/MONGO_SERVER_URL is where the MONGO_SERVER_URL environment
// variable is expected to be mounted, either manually or via a Kubernetes secret, etc.
path.Join(cfg.Storage.DocDB.MongoServerURLDir, "MONGO_SERVER_URL"),
// When a Kubernetes secret is mounted on a path, the `data` in that secret is mounted
// under path/..data that is then `symlink`ed to the key of the data. In this instance,
// the mounted path is going to look like:
// file 1 - ..2024_05_03_11_23_23.1253599725
// file 2 - ..data -> ..2024_05_03_11_23_23.1253599725
// file 3 - MONGO_SERVER_URL -> ..data/MONGO_SERVER_URL
// So each time the secret is updated, the file `MONGO_SERVER_URL` is not updated,
// instead the underlying symlink at `..data` is updated and that's what we want to
// capture via the fsnotify event watcher
path.Join(cfg.Storage.DocDB.MongoServerURLDir, "..data"),
}

backendChan := make(chan *Backend)
// Start listening for events.
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
logger.Infof("received event: %s, path: %s", event.Op.String(), event.Name)
// Only respond to create or write events in the directory
if event.Has(fsnotify.Create) || event.Has(fsnotify.Write) {
// event.Name captures the path that is updated
if slices.Contains(pathsToWatch, event.Name) {
logger.Infof("directory %s has been updated, reconfiguring backend...", cfg.Storage.DocDB.MongoServerURLDir)

// Set the new MONGO_SERVER_URL
if err := setMongoServerURLFromDir(ctx, cfg.Storage.DocDB.MongoServerURLDir); err != nil {
logger.Error(err)
return
}

// Now that MONGO_SERVER_URL has been updated, we should update docdb backend again
newDocDBBackend, err := NewStorageBackend(ctx, cfg)
if err != nil {
logger.Error(err)
backendChan <- nil
} else {
// Storing the backend in the signer so everyone has access to the up-to-date backend
backendChan <- newDocDBBackend
}
}
}

// TODO: add condition for REMOVE event

case err, ok := <-watcher.Errors:
if !ok {
return
}
logger.Error(err)

case <-watcherStop:
logger.Info("stopping fsnotify context...")
return
}
}
}()

// Add a path.
err = watcher.Add(cfg.Storage.DocDB.MongoServerURLDir)
if err != nil {
return nil, err
}
return backendChan, nil
}

// StorePayload implements the Payloader interface.
func (b *Backend) StorePayload(ctx context.Context, _ objects.TektonObject, rawPayload []byte, signature string, opts config.StorageOpts) error {
var obj interface{}
Expand Down Expand Up @@ -125,3 +249,70 @@ func (b *Backend) retrieveDocuments(ctx context.Context, opts config.StorageOpts
}
return []SignedDocument{d}, nil
}

func populateMongoServerURL(ctx context.Context, cfg config.Config) error {
// First preference is given to the key `storage.docdb.mongo-server-url-dir`.
// If that doesn't exist, then we move on to `storage.docdb.mongo-server-url`.
// If that doesn't exist, then we check if `MONGO_SERVER_URL` env var is set.
logger := logging.FromContext(ctx)
mongoEnv := "MONGO_SERVER_URL"

if cfg.Storage.DocDB.MongoServerURLDir != "" {
logger.Infof("setting %s from storage.docdb.mongo-server-url-dir: %s", mongoEnv, cfg.Storage.DocDB.MongoServerURLDir)
if err := setMongoServerURLFromDir(ctx, cfg.Storage.DocDB.MongoServerURLDir); err != nil {
return err
}
} else if cfg.Storage.DocDB.MongoServerURL != "" {
logger.Infof("setting %s from storage.docdb.mongo-server-url", mongoEnv)
if err := os.Setenv(mongoEnv, cfg.Storage.DocDB.MongoServerURL); err != nil {
return err
}
}

if _, envExists := os.LookupEnv(mongoEnv); !envExists {
return fmt.Errorf("mongo docstore configured but %s environment variable not set, "+
"supply one of storage.docdb.mongo-server-url-dir, storage.docdb.mongo-server-url or set %s", mongoEnv, mongoEnv)
}

return nil
}

func setMongoServerURLFromDir(ctx context.Context, dir string) error {
logger := logging.FromContext(ctx)

mongoEnv := "MONGO_SERVER_URL"

stat, err := os.Stat(dir)
if err != nil {
if os.IsNotExist(err) {
// If directory does not exist, then create it. This is needed for
// the fsnotify watcher.
// fsnotify does not receive events if the path that it's watching
// is created later.
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
} else {
return err
}
}
// If the path exists but is not a directory, then throw an error
if !stat.IsDir() {
return fmt.Errorf("path specified at storage.docdb.mongo-server-url-dir: %s is not a directory", dir)
}

filePath := path.Join(dir, mongoEnv)
fileData, err := os.ReadFile(filePath)
if err != nil {
return err
}
// A trailing newline is fairly common in mounted files, let's remove it.
fileDataNormalized := strings.TrimSuffix(string(fileData), "\n")

logger.Infof("setting %s from path: %s", mongoEnv, filePath)
if err = os.Setenv(mongoEnv, fileDataNormalized); err != nil {
return err
}

return nil
}
49 changes: 49 additions & 0 deletions pkg/chains/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package storage

import (
"context"
"errors"

"github.com/tektoncd/chains/pkg/chains/objects"
"github.com/tektoncd/chains/pkg/chains/storage/docdb"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/logging"

Check failure on line 31 in pkg/chains/storage/storage.go

View workflow job for this annotation

GitHub Actions / lint

import 'knative.dev/pkg/logging' is not allowed from list 'Main' (depguard)
)

// Backend is an interface to store a chains Payload
Expand All @@ -42,6 +44,8 @@ type Backend interface {

// InitializeBackends creates and initializes every configured storage backend.
func InitializeBackends(ctx context.Context, ps versioned.Interface, kc kubernetes.Interface, cfg config.Config) (map[string]Backend, error) {
logger := logging.FromContext(ctx)

// Add an entry here for every configured backend
configuredBackends := []string{}
if cfg.Artifacts.TaskRuns.Enabled() {
Expand All @@ -53,6 +57,7 @@ func InitializeBackends(ctx context.Context, ps versioned.Interface, kc kubernet
if cfg.Artifacts.PipelineRuns.Enabled() {
configuredBackends = append(configuredBackends, sets.List[string](cfg.Artifacts.PipelineRuns.StorageBackend)...)
}
logger.Infof("configured backends from config: %v", configuredBackends)

// Now only initialize and return the configured ones.
backends := map[string]Backend{}
Expand Down Expand Up @@ -90,5 +95,49 @@ func InitializeBackends(ctx context.Context, ps versioned.Interface, kc kubernet
}

}

backendsList := []string{}
for name := range backends {
backendsList = append(backendsList, name)
}
logger.Infof("successfully initialized backends: %v", backendsList)
return backends, nil
}

// WatchBackends watches backends for any update and keeps them up to date.
func WatchBackends(ctx context.Context, watcherStop chan bool, backends map[string]Backend, cfg config.Config) error {
logger := logging.FromContext(ctx)
for backend := range backends {
switch backend {
case docdb.StorageTypeDocDB:
backendChan, err := docdb.WatchBackend(ctx, cfg, watcherStop)
if err != nil {
if errors.Is(err, docdb.ErrNothingToWatch) {
logger.Info(err)
continue
}
return err
}
go func() {
for {
select {
case newBackend := <-backendChan:
if newBackend == nil {
logger.Errorf("removing backend %s from backends", docdb.StorageTypeDocDB)
delete(backends, docdb.StorageTypeDocDB)
continue
}
logger.Infof("adding to backends: %s", docdb.StorageTypeDocDB)
backends[docdb.StorageTypeDocDB] = newBackend
case <-watcherStop:
logger.Info("stop watching backends...")
return
}
}
}()
default:
logger.Debugf("no backends to watch...")
}
}
return nil
}
23 changes: 15 additions & 8 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ type TektonStorageConfig struct {
}

type DocDBStorageConfig struct {
URL string
URL string
MongoServerURL string
MongoServerURLDir string
}

type GrafeasConfig struct {
Expand Down Expand Up @@ -165,13 +167,16 @@ const (
ociStorageKey = "artifacts.oci.storage"
ociSignerKey = "artifacts.oci.signer"

gcsBucketKey = "storage.gcs.bucket"
ociRepositoryKey = "storage.oci.repository"
ociRepositoryInsecureKey = "storage.oci.repository.insecure"
docDBUrlKey = "storage.docdb.url"
grafeasProjectIDKey = "storage.grafeas.projectid"
grafeasNoteIDKey = "storage.grafeas.noteid"
grafeasNoteHint = "storage.grafeas.notehint"
gcsBucketKey = "storage.gcs.bucket"
ociRepositoryKey = "storage.oci.repository"
ociRepositoryInsecureKey = "storage.oci.repository.insecure"
docDBUrlKey = "storage.docdb.url"
docDBMongoServerURLKey = "storage.docdb.mongo-server-url"
docDBMongoServerURLDirKey = "storage.docdb.mongo-server-url-dir"

grafeasProjectIDKey = "storage.grafeas.projectid"
grafeasNoteIDKey = "storage.grafeas.noteid"
grafeasNoteHint = "storage.grafeas.notehint"

// PubSub - General
pubsubProvider = "storage.pubsub.provider"
Expand Down Expand Up @@ -293,6 +298,8 @@ func NewConfigFromMap(data map[string]string) (*Config, error) {
asString(ociRepositoryKey, &cfg.Storage.OCI.Repository),
asBool(ociRepositoryInsecureKey, &cfg.Storage.OCI.Insecure),
asString(docDBUrlKey, &cfg.Storage.DocDB.URL),
asString(docDBMongoServerURLKey, &cfg.Storage.DocDB.MongoServerURL),
asString(docDBMongoServerURLDirKey, &cfg.Storage.DocDB.MongoServerURLDir),
asString(grafeasProjectIDKey, &cfg.Storage.Grafeas.ProjectID),
asString(grafeasNoteIDKey, &cfg.Storage.Grafeas.NoteID),
asString(grafeasNoteHint, &cfg.Storage.Grafeas.NoteHint),
Expand Down

0 comments on commit 43c7e4a

Please sign in to comment.