Skip to content

Commit

Permalink
Gradual rollout unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
p-strusiewiczsurmacki-mobica committed May 8, 2024
1 parent af16ed7 commit 59e3de7
Show file tree
Hide file tree
Showing 14 changed files with 1,438 additions and 84 deletions.
13 changes: 9 additions & 4 deletions cmd/configurator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,17 @@ func (e *onLeaderElectionEvent) Start(ctx context.Context) error {

watchNodesErr := make(chan error)
watchConfigsErr := make(chan error)
go e.cm.WatchDeletedNodes(ctx, watchNodesErr)
go e.cm.WatchConfigs(ctx, watchConfigsErr)
leCtx, cancel := context.WithCancel(ctx)
defer cancel()
go e.cm.WatchDeletedNodes(leCtx, watchNodesErr)
go e.cm.WatchConfigs(leCtx, watchConfigsErr)

select {
case <-ctx.Done():
return fmt.Errorf("onLeaderElection context error: %w", ctx.Err())
case <-leCtx.Done():
if err := leCtx.Err(); err != nil {
return fmt.Errorf("onLeaderElection context error: %w", leCtx.Err())
}
return nil
case err := <-watchNodesErr:
return fmt.Errorf("node watcher error: %w", err)
case err := <-watchConfigsErr:
Expand Down
101 changes: 54 additions & 47 deletions pkg/config_manager/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (

type ConfigManager struct {
client client.Client
configsMap *configmap.ConfigMap
configsMap configmap.Interface
cr reconciler.ConfigReconcilerInterface
nr reconciler.NodeReconcilerInterface
changes chan bool
Expand Down Expand Up @@ -57,15 +57,18 @@ func New(c client.Client, cr reconciler.ConfigReconcilerInterface, nr reconciler
}
}

// WatchDeletedNodes waits for cm.deletedNodes channel.
func (cm *ConfigManager) WatchDeletedNodes(ctx context.Context, errCh chan error) {
cm.logger.Info("starting watching for deleted nodes...")
for {
select {
case <-ctx.Done():
if !errors.Is(ctx.Err(), context.Canceled) {
errCh <- fmt.Errorf("error watching deleted nodes: %w", ctx.Err())
errCh <- fmt.Errorf("error watching configs: %w", ctx.Err())
} else {
errCh <- nil
}
errCh <- nil
return
case nodes := <-cm.deletedNodes:
cm.logger.Info("nodes deleted", "nodes", nodes)
for _, n := range nodes {
Expand All @@ -84,7 +87,7 @@ func (cm *ConfigManager) WatchDeletedNodes(ctx context.Context, errCh chan error
config.SetActive(false)
cancel := config.GetCancelFunc()
if cancel != nil {
cancel()
(*cancel)()
}
}
default:
Expand All @@ -93,24 +96,28 @@ func (cm *ConfigManager) WatchDeletedNodes(ctx context.Context, errCh chan error
}
}

// WatchConfigs waits for cm.changes channel.
func (cm *ConfigManager) WatchConfigs(ctx context.Context, errCh chan error) {
cm.logger.Info("starting watching for changes...")
for {
select {
case <-ctx.Done():
if !errors.Is(ctx.Err(), context.Canceled) {
errCh <- fmt.Errorf("error watching configs: %w", ctx.Err())
} else {
errCh <- nil
}
errCh <- nil
return
case <-cm.changes:
cm.logger.Info("got notification about changes")
err := cm.UpdateConfigs()
err := cm.updateConfigs()
if err != nil {
errCh <- fmt.Errorf("error updating configs: %w", err)
return
}
err = cm.DeployConfigs(ctx)
err = cm.deployConfigs(ctx)
if err != nil {
if err := cm.RestoreBackup(ctx); err != nil {
if err := cm.restoreBackup(ctx); err != nil {
cm.logger.Error(err, "error restoring backup")
}
}
Expand All @@ -120,7 +127,35 @@ func (cm *ConfigManager) WatchConfigs(ctx context.Context, errCh chan error) {
}
}

func (cm *ConfigManager) UpdateConfigs() error {
// DirtyStartup will load all previously deployed NodeConfigs into current leader.
//
// It fetches the process object via cm.getProcess. If that lookup fails with a
// NotFound error, DirtyStartup returns nil without doing anything else; any
// other lookup error is wrapped and returned. Otherwise it logs the state
// recorded in the process object, loads configs via cm.loadConfigs and, if the
// recorded state equals nodeconfig.StatusProvisioning, restores backups via
// cm.restoreBackup. Errors from either step are wrapped and returned.
func (cm *ConfigManager) DirtyStartup(ctx context.Context) error {
process, err := cm.getProcess(ctx)
if err != nil {
// process object does not exist - there was no operator running on this cluster before
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error getting process object: %w", err)
}

cm.logger.Info("previous leader left cluster in state", "state", process.Spec.State)
cm.logger.Info("using data left by previous leader...")

// get all known backup data and load it into config manager memory
if err := cm.loadConfigs(ctx); err != nil {
return fmt.Errorf("error loading configs: %w", err)
}

// previous leader left cluster in provisioning state - restore known backups
if process.Spec.State == nodeconfig.StatusProvisioning {
if err := cm.restoreBackup(ctx); err != nil {
return fmt.Errorf("error restoring backup: %w", err)
}
}
return nil
}

func (cm *ConfigManager) updateConfigs() error {
cm.logger.Info("updating configs...")
currentNodes := cm.nr.GetNodes()
for name := range currentNodes {
Expand All @@ -144,7 +179,7 @@ func (cm *ConfigManager) UpdateConfigs() error {
return nil
}

func (cm *ConfigManager) Deploy(ctx context.Context, configs []*nodeconfig.Config) error {
func (cm *ConfigManager) deploy(ctx context.Context, configs []nodeconfig.ConfigInterface) error {
for _, cfg := range configs {
cfg.SetDeployed(false)
}
Expand All @@ -164,7 +199,7 @@ func (cm *ConfigManager) Deploy(ctx context.Context, configs []*nodeconfig.Confi
errCh := make(chan error, len(configs))
for _, cfg := range configs {
wg.Add(1)
go func(config *nodeconfig.Config) {
go func(config nodeconfig.ConfigInterface) {
defer wg.Done()

if err := cm.sem.Acquire(ctx, 1); err != nil {
Expand Down Expand Up @@ -225,11 +260,11 @@ func (cm *ConfigManager) checkErrors(errCh chan error) error {
}

// nolint: contextcheck
func (cm *ConfigManager) deployConfig(ctx context.Context, cfg *nodeconfig.Config) error {
func (cm *ConfigManager) deployConfig(ctx context.Context, cfg nodeconfig.ConfigInterface) error {
if cfg.GetActive() {
cfgContext, cfgCancel := context.WithTimeout(ctx, cm.timeout)
cfgContext = context.WithValue(cfgContext, nodeconfig.ParentCtx, ctx)
cfg.SetCancelFunc(cfgCancel)
cfg.SetCancelFunc(&cfgCancel)

cm.logger.Info("processing config", "name", cfg.GetName())
if err := cfg.Deploy(cfgContext, cm.client, cm.logger, cm.timeout); err != nil {
Expand All @@ -247,7 +282,7 @@ func (cm *ConfigManager) deployConfig(ctx context.Context, cfg *nodeconfig.Confi
return nil
}

func (cm *ConfigManager) validateConfigs(configs []*nodeconfig.Config) error {
func (cm *ConfigManager) validateConfigs(configs []nodeconfig.ConfigInterface) error {
cm.logger.Info("validating configs...")
for _, cfg := range configs {
if !cfg.GetActive() {
Expand Down Expand Up @@ -284,27 +319,27 @@ func (cm *ConfigManager) setProcessStatus(ctx context.Context, status string) er
return nil
}

func (cm *ConfigManager) DeployConfigs(ctx context.Context) error {
func (cm *ConfigManager) deployConfigs(ctx context.Context) error {
cm.logger.Info("deploying configs ...")
toDeploy, err := cm.configsMap.GetSlice()
if err != nil {
return fmt.Errorf("error converting config map to slice: %w", err)
}

if err := cm.Deploy(ctx, toDeploy); err != nil {
if err := cm.deploy(ctx, toDeploy); err != nil {
return fmt.Errorf("error deploying configs: %w", err)
}

return nil
}

func (cm *ConfigManager) RestoreBackup(ctx context.Context) error {
func (cm *ConfigManager) restoreBackup(ctx context.Context) error {
cm.logger.Info("restoring backup...")
slice, err := cm.configsMap.GetSlice()
if err != nil {
return fmt.Errorf("error converting config map to slice: %w", err)
}
toDeploy := []*nodeconfig.Config{}
toDeploy := []nodeconfig.ConfigInterface{}
for _, cfg := range slice {
if cfg.GetDeployed() {
if backupAvailable := cfg.SetBackupAsNext(); backupAvailable {
Expand All @@ -313,7 +348,7 @@ func (cm *ConfigManager) RestoreBackup(ctx context.Context) error {
}
}

if err := cm.Deploy(ctx, toDeploy); err != nil {
if err := cm.deploy(ctx, toDeploy); err != nil {
return fmt.Errorf("error deploying configs: %w", err)
}

Expand All @@ -333,34 +368,6 @@ func (cm *ConfigManager) getProcess(ctx context.Context) (*v1alpha1.NodeConfigPr
return process, nil
}

// DirtyStartup will load all previously deployed NodeConfigs into current leader.
//
// It fetches the process object via cm.getProcess. If that lookup fails with a
// NotFound error, DirtyStartup returns nil without doing anything else; any
// other lookup error is wrapped and returned. Otherwise it logs the state
// recorded in the process object, loads configs via cm.loadConfigs and, if the
// recorded state equals nodeconfig.StatusProvisioning, restores backups via
// cm.RestoreBackup. Errors from either step are wrapped and returned.
func (cm *ConfigManager) DirtyStartup(ctx context.Context) error {
process, err := cm.getProcess(ctx)
if err != nil {
// process object does not exist - there was no operator running on this cluster before
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error getting process object: %w", err)
}

cm.logger.Info("previous leader left cluster in state", "state", process.Spec.State)
cm.logger.Info("using data left by previous leader...")

// get all known backup data and load it into config manager memory
if err := cm.loadConfigs(ctx); err != nil {
return fmt.Errorf("error loading configs: %w", err)
}

// previous leader left cluster in provisioning state - restore known backups
if process.Spec.State == nodeconfig.StatusProvisioning {
if err := cm.RestoreBackup(ctx); err != nil {
return fmt.Errorf("error restoring backup: %w", err)
}
}
return nil
}

func (cm *ConfigManager) loadConfigs(ctx context.Context) error {
// get all known backup data and load it into config manager memory
nodes, err := reconciler.ListNodes(ctx, cm.client)
Expand Down

0 comments on commit 59e3de7

Please sign in to comment.