Skip to content

Commit

Permalink
Add notifications if a new profile version is available (#1317)
Browse files Browse the repository at this point in the history
* Add notifications if a new profile version is available

* Added tests for Delete operations and added more tests around the notification handling.

* Change default to fully qualified domain

* GetAvailableVersions => ListAvailableVersion

* Small refactors around logging and repeated code segments

* Finally, the notification is woorking

* Merged main

* Simplified the calling parameter list'

* Addressed comments
  • Loading branch information
Skarlso committed Jan 31, 2022
1 parent 9c7c3e3 commit 41e6457
Show file tree
Hide file tree
Showing 9 changed files with 714 additions and 148 deletions.
20 changes: 11 additions & 9 deletions cmd/gitops-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ func main() {
}

const (
addr = "0.0.0.0:8000"
metricsBindAddress = ":9980"
healthzBindAddress = ":9981"
watcherPort = 9443
addr = "0.0.0.0:8000"
metricsBindAddress = ":9980"
healthzBindAddress = ":9981"
notificationBindAddress = "http://notification-controller./"
watcherPort = 9443
)

func NewAPIServerCommand() *cobra.Command {
Expand Down Expand Up @@ -71,11 +72,12 @@ func NewAPIServerCommand() *cobra.Command {
}

profileWatcher, err := watcher.NewWatcher(watcher.Options{
KubeClient: rawClient,
Cache: profileCache,
MetricsBindAddress: metricsBindAddress,
HealthzBindAddress: healthzBindAddress,
WatcherPort: watcherPort,
KubeClient: rawClient,
Cache: profileCache,
MetricsBindAddress: metricsBindAddress,
HealthzBindAddress: healthzBindAddress,
NotificationControllerAddress: notificationBindAddress,
WatcherPort: watcherPort,
})
if err != nil {
return fmt.Errorf("failed to create watcher: %w", err)
Expand Down
33 changes: 18 additions & 15 deletions cmd/gitops/ui/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ import (

// Options contains all the options for the `ui run` command.
type Options struct {
Port string
HelmRepoNamespace string
HelmRepoName string
ProfileCacheLocation string
WatcherMetricsBindAddress string
WatcherHealthzBindAddress string
WatcherPort int
Path string
LoggingEnabled bool
OIDC OIDCAuthenticationOptions
Port string
HelmRepoNamespace string
HelmRepoName string
ProfileCacheLocation string
WatcherMetricsBindAddress string
WatcherHealthzBindAddress string
WatcherPort int
Path string
LoggingEnabled bool
OIDC OIDCAuthenticationOptions
NotificationControllerAddress string
}

// OIDCAuthenticationOptions contains the OIDC authentication options for the
Expand Down Expand Up @@ -74,6 +75,7 @@ func NewCommand() *cobra.Command {
cmd.Flags().StringVar(&options.ProfileCacheLocation, "profile-cache-location", "/tmp/helm-cache", "the location where the cache Profile data lives")
cmd.Flags().StringVar(&options.WatcherHealthzBindAddress, "watcher-healthz-bind-address", ":9981", "bind address for the healthz service of the watcher")
cmd.Flags().StringVar(&options.WatcherMetricsBindAddress, "watcher-metrics-bind-address", ":9980", "bind address for the metrics service of the watcher")
cmd.Flags().StringVar(&options.NotificationControllerAddress, "notification-controller-address", "http://notification-controller./", "the address of the notification-controller running in the cluster")
cmd.Flags().IntVar(&options.WatcherPort, "watcher-port", 9443, "the port on which the watcher is running")

if server.AuthEnabled() {
Expand Down Expand Up @@ -151,11 +153,12 @@ func runCmd(cmd *cobra.Command, args []string) error {
}

profileWatcher, err := watcher.NewWatcher(watcher.Options{
KubeClient: rawClient,
Cache: profileCache,
MetricsBindAddress: options.WatcherMetricsBindAddress,
HealthzBindAddress: options.WatcherHealthzBindAddress,
WatcherPort: options.WatcherPort,
KubeClient: rawClient,
Cache: profileCache,
MetricsBindAddress: options.WatcherMetricsBindAddress,
HealthzBindAddress: options.WatcherHealthzBindAddress,
NotificationControllerAddress: options.NotificationControllerAddress,
WatcherPort: options.WatcherPort,
})
if err != nil {
return fmt.Errorf("failed to start the watcher: %w", err)
Expand Down
56 changes: 50 additions & 6 deletions pkg/helm/watcher/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Cache interface {
// GetProfileValues will try and find a specific values file for the given profileName and profileVersion. Returns an
// error if said version is not found.
GetProfileValues(ctx context.Context, helmRepoNamespace, helmRepoName, profileName, profileVersion string) ([]byte, error)
// ListAvailableVersionsForProfile returns all stored available versions for a profile.
ListAvailableVersionsForProfile(ctx context.Context, helmRepoNamespace, helmRepoName, profileName string) ([]string, error)
}

// Data is explicit data for a specific profile including values.
Expand Down Expand Up @@ -139,19 +141,47 @@ func (c *ProfileCache) ListProfiles(ctx context.Context, helmRepoNamespace, helm
var result []*pb.Profile

listOperation := func() error {
content, err := os.ReadFile(filepath.Join(c.cacheLocation, helmRepoNamespace, helmRepoName, profileFilename))
if err != nil {
if err := c.getProfilesFromFile(helmRepoNamespace, helmRepoName, &result); err != nil {
return fmt.Errorf("failed to read profiles data for helm repo (%s/%s): %w", helmRepoNamespace, helmRepoName, err)
}

return nil
}

if err := c.tryWithLock(ctx, listOperation); err != nil {
return nil, err
}

return result, nil
}

// ListAvailableVersionsForProfile returns all stored available versions for a profile.
func (c *ProfileCache) ListAvailableVersionsForProfile(ctx context.Context, helmRepoNamespace, helmRepoName, profileName string) ([]string, error) {
// Because the folders of versions are only stored when there are values for a version
// we, instead, look in the profiles.yaml file for available versions.
var result []string

getAllAvailableVersionsOp := func() error {
var profiles []*pb.Profile
if err := c.getProfilesFromFile(helmRepoNamespace, helmRepoName, &profiles); err != nil {
if os.IsNotExist(err) {
return nil
}

return fmt.Errorf("failed to read profiles data for helm repo: %w", err)
}

if err := yaml.Unmarshal(content, &result); err != nil {
return fmt.Errorf("failed to unmarshal profiles data: %w", err)
for _, p := range profiles {
if p.Name == profileName {
result = append(result, p.AvailableVersions...)
return nil
}
}

return nil
return fmt.Errorf("profile with name %s not found in cached profiles", profileName)
}

if err := c.tryWithLock(ctx, listOperation); err != nil {
if err := c.tryWithLock(ctx, getAllAvailableVersionsOp); err != nil {
return nil, err
}

Expand Down Expand Up @@ -183,6 +213,20 @@ func (c *ProfileCache) GetProfileValues(ctx context.Context, helmRepoNamespace,
return result, nil
}

// getProfilesFromFile returns profiles loaded from a file.
func (c *ProfileCache) getProfilesFromFile(helmRepoNamespace, helmRepoName string, profiles *[]*pb.Profile) error {
content, err := os.ReadFile(filepath.Join(c.cacheLocation, helmRepoNamespace, helmRepoName, profileFilename))
if err != nil {
return err
}

if err := yaml.Unmarshal(content, profiles); err != nil {
return err
}

return nil
}

// tryWithLock tries to run the given operation by acquiring a lock first.
func (c *ProfileCache) tryWithLock(ctx context.Context, operationFunc func() error) error {
lock := flock.New(filepath.Join(c.cacheLocation, lockFilename))
Expand Down
64 changes: 62 additions & 2 deletions pkg/helm/watcher/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ func TestCacheListProfilesNotFound(t *testing.T) {
}
assert.NoError(t, profileCache.Put(context.Background(), helmNamespace, helmName, data), "put call from cache should have worked")
_, err := profileCache.ListProfiles(context.Background(), "not-found", "none")
assert.EqualError(t, err, fmt.Sprintf("failed to read profiles data for helm repo: open %s: no such file or directory", filepath.Join(dir, "not-found", "none", profileFilename)))
assert.EqualError(t, err,
fmt.Sprintf("failed to read profiles data for helm repo (%s/%s): open %s: no such file or directory",
"not-found",
"none",
filepath.Join(dir, "not-found", "none", profileFilename)))
}

func TestCacheListProfilesInvalidDataInFile(t *testing.T) {
Expand All @@ -74,7 +78,9 @@ func TestCacheListProfilesInvalidDataInFile(t *testing.T) {
assert.NoError(t, profileCache.Put(context.Background(), helmNamespace, helmName, data), "put call from cache should have worked")
assert.NoError(t, os.WriteFile(filepath.Join(dir, helmNamespace, helmName, profileFilename), []byte("empty"), 0700))
_, err := profileCache.ListProfiles(context.Background(), helmNamespace, helmName)
assert.EqualError(t, err, "failed to unmarshal profiles data: error unmarshaling JSON: json: cannot unmarshal string into Go value of type []*profiles.Profile")
assert.EqualError(t, err,
fmt.Sprintf("failed to read profiles data for helm repo (%s/%s): "+
"error unmarshaling JSON: json: cannot unmarshal string into Go value of type []*profiles.Profile", helmNamespace, helmName))
}

func TestCacheGetProfileValues(t *testing.T) {
Expand Down Expand Up @@ -125,6 +131,54 @@ func TestDeleteExistingData(t *testing.T) {
assert.ErrorIs(t, err, os.ErrNotExist)
}

func TestListAvailableVersionsForProfile(t *testing.T) {
profileCache, _ := setupCache(t)
data := Data{
Profiles: []*pb.Profile{profile1},
Values: ValueMap{
profile1.Name: values1,
},
}
assert.NoError(t, profileCache.Put(context.Background(), helmNamespace, helmName, data), "put call from cache should have worked")
versions, err := profileCache.ListAvailableVersionsForProfile(context.Background(), helmNamespace, helmName, profile1.Name)
assert.NoError(t, err)
assert.Equal(t, profile1.AvailableVersions, versions)
}

func TestListAvailableVersionsForProfileNoCachedData(t *testing.T) {
profileCache, _ := setupCache(t)
versions, err := profileCache.ListAvailableVersionsForProfile(context.Background(), helmNamespace, helmName, profile1.Name)
assert.NoError(t, err)
assert.Nil(t, versions)
}

func TestListAvailableVersionsForProfileNameNotFound(t *testing.T) {
profileCache, _ := setupCache(t)
data := Data{
Profiles: []*pb.Profile{profile1},
Values: ValueMap{
profile1.Name: values1,
},
}
assert.NoError(t, profileCache.Put(context.Background(), helmNamespace, helmName, data), "put call from cache should have worked")
_, err := profileCache.ListAvailableVersionsForProfile(context.Background(), helmNamespace, helmName, "notfound")
assert.EqualError(t, err, "profile with name notfound not found in cached profiles")
}

func TestListAvailableVersionsForProfileInvalidYamlData(t *testing.T) {
profileCache, dir := setupCache(t)
data := Data{
Profiles: []*pb.Profile{profile1},
Values: ValueMap{
profile1.Name: values1,
},
}
assert.NoError(t, profileCache.Put(context.Background(), helmNamespace, helmName, data), "put call from cache should have worked")
assert.NoError(t, os.WriteFile(filepath.Join(dir, helmNamespace, helmName, profileFilename), []byte("empty"), 0700))
_, err := profileCache.ListAvailableVersionsForProfile(context.Background(), helmNamespace, helmName, profile1.Name)
assert.EqualError(t, err, "failed to read profiles data for helm repo: error unmarshaling JSON: json: cannot unmarshal string into Go value of type []*profiles.Profile")
}

func TestListProfilesFailedLock(t *testing.T) {
profileCache := &ProfileCache{cacheLocation: "nope"}
_, err := profileCache.ListProfiles(context.Background(), "", "")
Expand All @@ -149,6 +203,12 @@ func TestDeleteFailedLock(t *testing.T) {
assert.EqualError(t, err, "unable to read lock file cache.lock: open nope/cache.lock: no such file or directory")
}

func TestTestListAvailableVersionsForProfileFailedLock(t *testing.T) {
profileCache := &ProfileCache{cacheLocation: "nope"}
_, err := profileCache.ListAvailableVersionsForProfile(context.Background(), "", "", "")
assert.EqualError(t, err, "unable to read lock file cache.lock: open nope/cache.lock: no such file or directory")
}

func setupCache(t *testing.T) (Cache, string) {
dir, err := os.MkdirTemp("", "cache-temp-dir")
assert.NoError(t, err, "creating a temporary folder should have succeeded")
Expand Down
85 changes: 85 additions & 0 deletions pkg/helm/watcher/cache/cachefakes/fake_cache.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 41e6457

Please sign in to comment.