Skip to content
This repository has been archived by the owner on Dec 26, 2023. It is now read-only.

Commit

Permalink
fix: restart spooler when broker terminates subscription (#600)
Browse files Browse the repository at this point in the history
Fixes #577 (again). The original fix, #593, left out the agent spooler
system, which is responsible for relaying run events to the agent. As a
result, when the spooler's subscription was terminated, the spooler did not restart itself.

This PR changes that: the spooler will now restart itself whenever its
subscription is terminated.
  • Loading branch information
leg100 committed Sep 13, 2023
1 parent cc057e8 commit ce41580
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 60 deletions.
6 changes: 3 additions & 3 deletions internal/agent/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func newSpooler(app client, logger logr.Logger, cfg Config) *spoolerDaemon {
}
}

// start starts the spooler
// start the spooler
func (s *spoolerDaemon) start(ctx context.Context) error {
op := func() error {
return s.reinitialize(ctx)
}
policy := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
return backoff.RetryNotify(op, policy, func(err error, next time.Duration) {
s.Error(fmt.Errorf("%w: reconnecting in %s", err, next), "stream update")
s.Error(err, "restarting spooler", "backoff", next)
})
}

Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *spoolerDaemon) reinitialize(ctx context.Context) error {
return err
}
}
return nil
return pubsub.ErrSubscriptionTerminated
}

func (s *spoolerDaemon) handleEvent(ev pubsub.Event) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/agent/spooler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestSpooler(t *testing.T) {
assert.Equal(t, cancelation{Run: run4, Forceful: false}, <-spooler.getCancelation())
assert.Equal(t, cancelation{Run: run5, Forceful: true}, <-spooler.getCancelation())
cancel()
assert.NoError(t, <-errch)
assert.Equal(t, pubsub.ErrSubscriptionTerminated, <-errch)
}

func TestSpooler_handleEvent(t *testing.T) {
Expand Down
52 changes: 23 additions & 29 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,24 +371,21 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {

subsystems := []*Subsystem{
{
Name: "broker",
BackoffRestart: true,
Logger: d.Logger,
System: d.Broker,
Name: "broker",
Logger: d.Logger,
System: d.Broker,
},
{
Name: "proxy",
BackoffRestart: true,
Logger: d.Logger,
System: d.LogsService,
Name: "proxy",
Logger: d.Logger,
System: d.LogsService,
},
{
Name: "reporter",
BackoffRestart: true,
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(run.ReporterLockID),
Name: "reporter",
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(run.ReporterLockID),
System: &run.Reporter{
Logger: d.Logger.WithValues("component", "reporter"),
VCSProviderService: d.VCSProviderService,
Expand All @@ -399,9 +396,8 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
},
},
{
Name: "webhook purger",
BackoffRestart: true,
Logger: d.Logger,
Name: "webhook purger",
Logger: d.Logger,
System: &repo.Purger{
Logger: d.Logger.WithValues("component", "purger"),
Subscriber: d.Broker,
Expand All @@ -410,12 +406,11 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
},
},
{
Name: "notifier",
BackoffRestart: true,
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(notifications.LockID),
Name: "notifier",
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(notifications.LockID),
System: notifications.NewNotifier(notifications.NotifierOptions{
Logger: d.Logger,
Subscriber: d.Broker,
Expand All @@ -427,12 +422,11 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
}
if !d.DisableScheduler {
subsystems = append(subsystems, &Subsystem{
Name: "scheduler",
BackoffRestart: true,
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(scheduler.LockID),
Name: "scheduler",
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(scheduler.LockID),
System: scheduler.NewScheduler(scheduler.Options{
Logger: d.Logger,
WorkspaceService: d.WorkspaceService,
Expand Down
31 changes: 13 additions & 18 deletions internal/daemon/subsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ type (
Name string
// System is the underlying system to be invoked and supervised.
System Startable
// Backoff and restart subsystem in the event of an error
BackoffRestart bool
// Exclusive: permit only one instance of this subsystem on an OTF
// cluster
Exclusive bool
Expand Down Expand Up @@ -64,25 +62,22 @@ func (s *Subsystem) Start(ctx context.Context, g *errgroup.Group) error {
} else {
err = s.System.Start(ctx)
}
if err != nil {
return err
if ctx.Err() != nil {
// don't return an error if subsystem was terminated via a
// canceled context.
s.V(1).Info("gracefully shutdown subsystem", "name", s.Name)
return nil
}
s.V(1).Info("gracefully shutdown subsystem", "name", s.Name)
return nil
return err
}
if s.BackoffRestart {
// Backoff and retry whenever operation returns an error. If context is
// cancelled then it'll stop retrying and return the context error.
policy := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
g.Go(func() error {
return backoff.RetryNotify(op, policy, func(err error, next time.Duration) {
// re-open semaphore
s.Error(err, "restarting "+s.Name)
})
// Backoff and retry whenever operation returns an error. If context is
// cancelled then it'll stop retrying and return the context error.
policy := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
g.Go(func() error {
return backoff.RetryNotify(op, policy, func(err error, next time.Duration) {
s.Error(err, "restarting subsystem", "name", s.Name, "backoff", next)
})
} else {
g.Go(op)
}
})
s.V(1).Info("started subsystem", "name", s.Name)
return nil
}
15 changes: 6 additions & 9 deletions internal/daemon/subsystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,18 @@ func TestSubsystem(t *testing.T) {

tests := []struct {
name string
backoff bool
exclusive bool
}{
{"default", false, false},
{"backoff", true, false},
{"backoff and wait and lock", true, true},
{"backoff", false},
{"backoff and wait and lock", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sub := &Subsystem{
Name: tt.name,
System: &fakeStartable{},
Logger: logr.Discard(),
BackoffRestart: tt.backoff,
Exclusive: tt.exclusive,
Name: tt.name,
System: &fakeStartable{},
Logger: logr.Discard(),
Exclusive: tt.exclusive,
}
if tt.exclusive {
sub.DB = &fakeWaitAndLock{}
Expand Down

0 comments on commit ce41580

Please sign in to comment.