Skip to content

Commit

Permalink
fix:add graceful shutdown to courier handler (#296)
Browse files Browse the repository at this point in the history
Courier would not stop with the provided Background handler.
This changes the methods of Courier so that the graceful package can be
used in the same way as the http endpoints can be used.

Closes #295
  • Loading branch information
Gibheer committed Mar 17, 2020
1 parent 537d496 commit 235d784
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
4 changes: 2 additions & 2 deletions cmd/daemon/serve.go
@@ -1,7 +1,6 @@
package daemon

import (
stdctx "context"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -165,9 +164,10 @@ func sqa(cmd *cobra.Command, d driver.Driver) *metricsx.Service {
func bgTasks(d driver.Driver, wg *sync.WaitGroup, cmd *cobra.Command, args []string) {
defer wg.Done()

if err := d.Registry().Courier().Work(stdctx.Background()); err != nil {
if err := graceful.Graceful(d.Registry().Courier().Work, d.Registry().Courier().Shutdown); err != nil {
d.Logger().WithError(err).Fatalf("Failed to run courier worker.")
}
d.Logger().Println("courier worker was shutdown gracefully")
}

func ServeAll(d driver.Driver) func(cmd *cobra.Command, args []string) {
Expand Down
26 changes: 20 additions & 6 deletions courier/courier.go
Expand Up @@ -27,6 +27,9 @@ type (
dialer *gomail.Dialer
d smtpDependencies
c configuration.Provider
// graceful shutdown handling
ctx context.Context
shutdown context.CancelFunc
}
Provider interface {
Courier() *Courier
Expand All @@ -38,9 +41,12 @@ func NewSMTP(d smtpDependencies, c configuration.Provider) *Courier {
sslSkipVerify, _ := strconv.ParseBool(uri.Query().Get("skip_ssl_verify"))
password, _ := uri.User.Password()
port, _ := strconv.ParseInt(uri.Port(), 10, 64)
ctx, cancel := context.WithCancel(context.Background())
return &Courier{
d: d,
c: c,
d: d,
c: c,
ctx: ctx,
shutdown: cancel,
dialer: &gomail.Dialer{
Host: uri.Hostname(),
Port: int(port),
Expand Down Expand Up @@ -82,20 +88,28 @@ func (m *Courier) QueueEmail(ctx context.Context, t EmailTemplate) (uuid.UUID, e
return message.ID, nil
}

func (m *Courier) Work(ctx context.Context) error {
func (m *Courier) Work() error {
errChan := make(chan error)
defer close(errChan)

go m.watchMessages(ctx, errChan)
go m.watchMessages(m.ctx, errChan)

select {
case <-ctx.Done():
return ctx.Err()
case <-m.ctx.Done():
if m.ctx.Err() == context.Canceled {
return nil
}
return m.ctx.Err()
case err := <-errChan:
return err
}
}

func (m *Courier) Shutdown(ctx context.Context) error {
m.shutdown()
return nil
}

func (m *Courier) watchMessages(ctx context.Context, errChan chan error) {
for {
if err := backoff.Retry(func() error {
Expand Down
2 changes: 1 addition & 1 deletion courier/courier_test.go
Expand Up @@ -104,7 +104,7 @@ func TestSMTP(t *testing.T) {
c := reg.Courier()

go func() {
require.NoError(t, c.Work(context.Background()))
require.NoError(t, c.Work())
}()

t.Run("case=queue messages", func(t *testing.T) {
Expand Down

0 comments on commit 235d784

Please sign in to comment.