From 4ba6aca06c26d298b6e287ffb595ee3f2af42b68 Mon Sep 17 00:00:00 2001 From: Hank Donnay Date: Thu, 3 Feb 2022 10:58:31 -0600 Subject: [PATCH] notifier: use external concurrency in Delivery, Poller, Processor This commit changes Delivery, Poller, and Processor to require the caller to supply goroutines to drive the methods. Giving the caller control over the goroutine makes it possible to use shared resources (e.g. database pool, lock pool) and ensure they're cleaned up correctly. Signed-off-by: Hank Donnay --- notifier/delivery.go | 27 ++++----------------------- notifier/poller.go | 26 ++++++++------------------ notifier/processor.go | 29 +++++------------------------ notifier/processor_safe_test.go | 6 +++--- notifier/service/service.go | 20 ++++++++++---------- 5 files changed, 30 insertions(+), 78 deletions(-) diff --git a/notifier/delivery.go b/notifier/delivery.go index 7db253000e..e53e1b578e 100644 --- a/notifier/delivery.go +++ b/notifier/delivery.go @@ -3,7 +3,6 @@ package notifier import ( "context" "errors" - "strconv" "time" "github.com/google/uuid" @@ -17,51 +16,34 @@ import ( type Delivery struct { // a Deliverer implementation to invoke. Deliverer Deliverer - // the interval at which we will attempt delivery of notifications. - interval time.Duration // a store to retrieve notifications and update their receipts store Store // distributed lock used for mutual exclusion locks Locker - // a integer id used for logging - id int + // the interval at which we will attempt delivery of notifications. + interval time.Duration } -func NewDelivery(id int, d Deliverer, interval time.Duration, store Store, l Locker) *Delivery { +func NewDelivery(store Store, l Locker, d Deliverer, interval time.Duration) *Delivery { return &Delivery{ Deliverer: d, interval: interval, store: store, locks: l, - id: id, } } // Deliver begins delivering notifications. // // Canceling the ctx will end delivery. -func (d *Delivery) Deliver(ctx context.Context) { +func (d *Delivery) Deliver(ctx context.Context) error { ctx = zlog.ContextWithValues(ctx, "deliverer", d.Deliverer.Name(), "component", "notifier/Delivery.Deliver", - "id", strconv.Itoa(d.id), ) zlog.Info(ctx). Msg("delivering notifications") - go d.deliver(ctx) -} - -// deliver is intended to be ran as a go routine. -// -// implements a blocking event loop via a time.Ticker -func (d *Delivery) deliver(ctx context.Context) error { - ctx = zlog.ContextWithValues(ctx, "component", "notifier/Delivery.deliver") - defer func() { - if err := d.locks.Close(ctx); err != nil { - zlog.Warn(ctx).Err(err).Msg("error closing lock source") - } - }() ticker := time.NewTicker(d.interval) defer ticker.Stop() for { @@ -85,7 +67,6 @@ func (d *Delivery) deliver(ctx context.Context) error { func (d *Delivery) RunDelivery(ctx context.Context) error { ctx = zlog.ContextWithValues(ctx, "deliverer", d.Deliverer.Name(), - "id", strconv.Itoa(d.id), "component", "notifier/Delivery.RunDelivery", ) diff --git a/notifier/poller.go b/notifier/poller.go index a679181a15..dd147c2642 100644 --- a/notifier/poller.go +++ b/notifier/poller.go @@ -31,7 +31,7 @@ type Poller struct { interval time.Duration } -func NewPoller(interval time.Duration, store Store, differ matcher.Differ) *Poller { +func NewPoller(store Store, differ matcher.Differ, interval time.Duration) *Poller { return &Poller{ interval: interval, store: store, @@ -46,29 +46,19 @@ type Event struct { uo driver.UpdateOperation } -// Poll is a non blocking call which begins -// polling the Matcher for UpdateOperations. -// -// Returned channel can be listened to for events. +// Poll begins polling the Matcher for UpdateOperations and producing Events on +// the supplied channel. This method takes ownership of the channel and is +// responsible for closing it. // // Cancel ctx to stop the poller. -func (p *Poller) Poll(ctx context.Context) <-chan Event { - c := make(chan Event, MaxChanSize) - go p.poll(ctx, c) - return c -} - -// poll is intended to be ran as a go routine. -// -// implements a blocking event loop via a time.Ticker -func (p *Poller) poll(ctx context.Context, c chan<- Event) { +func (p *Poller) Poll(ctx context.Context, c chan<- Event) error { ctx = zlog.ContextWithValues(ctx, "component", "notifier/Poller.poll") defer close(c) if err := ctx.Err(); err != nil { zlog.Info(ctx). Msg("context canceled before polling began") - return + return err } // loop on interval tick @@ -79,7 +69,7 @@ func (p *Poller) poll(ctx context.Context, c chan<- Event) { case <-ctx.Done(): zlog.Info(ctx). Msg("context canceled. polling ended") - return + return ctx.Err() case <-t.C: zlog.Debug(ctx). Msg("poll interval tick") @@ -111,7 +101,7 @@ func (p *Poller) onTick(ctx context.Context, c chan<- Event) { latest := uo[0] ctx = zlog.ContextWithValues(ctx, "UOID", latest.Ref.String()) // confirm notifications were never created for this UOID. - var errNoReceipt clairerror.ErrNoReceipt + var errNoReceipt *clairerror.ErrNoReceipt _, err := p.store.ReceiptByUOID(ctx, latest.Ref) if errors.As(err, &errNoReceipt) { e := Event{ diff --git a/notifier/processor.go b/notifier/processor.go index 2aa545015b..c6305ab2ae 100644 --- a/notifier/processor.go +++ b/notifier/processor.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strconv" "sync" "github.com/google/uuid" @@ -32,8 +31,6 @@ type Processor struct { matcher matcher.Service // a store instance to persist notifications store Store - // a integer id used for logging - id int // NoSummary controls whether per-manifest vulnerability summarization // should happen. @@ -42,13 +39,12 @@ type Processor struct { NoSummary bool } -func NewProcessor(id int, l Locker, indexer indexer.Service, matcher matcher.Service, store Store) *Processor { +func NewProcessor(store Store, l Locker, indexer indexer.Service, matcher matcher.Service) *Processor { return &Processor{ locks: l, indexer: indexer, matcher: matcher, store: store, - id: id, } } @@ -56,30 +52,15 @@ func NewProcessor(id int, l Locker, indexer indexer.Service, matcher matcher.Ser // updates the notifier system with the "latest" seen UOID. // // Canceling the ctx will end the processing. -func (p *Processor) Process(ctx context.Context, c <-chan Event) { - go p.process(ctx, c) -} - -// process is intended to be ran as a go routine. -// -// implements the blocking event loop of a processor. -func (p *Processor) process(ctx context.Context, c <-chan Event) { - ctx = zlog.ContextWithValues(ctx, - "component", "notifier/Processor.process", - "processor_id", strconv.Itoa(p.id), - ) +func (p *Processor) Process(ctx context.Context, c <-chan Event) error { + ctx = zlog.ContextWithValues(ctx, "component", "notifier/Processor.process") - defer func() { - if err := p.locks.Close(ctx); err != nil { - zlog.Warn(ctx).Err(err).Msg("error closing locker") - } - }() zlog.Debug(ctx).Msg("processing events") for { select { case <-ctx.Done(): zlog.Info(ctx).Msg("context canceled: ending event processing") - return + return ctx.Err() case e := <-c: ctx := zlog.ContextWithValues(ctx, "updater", e.updater, @@ -292,7 +273,7 @@ func (p *Processor) safe(ctx context.Context, e Event) (bool, uuid.UUID) { ctx = zlog.ContextWithValues(ctx, "component", "notifier/Processor.safe") // confirm we are not making duplicate notifications - var errNoReceipt clairerror.ErrNoReceipt + var errNoReceipt *clairerror.ErrNoReceipt _, err := p.store.ReceiptByUOID(ctx, e.uo.Ref) switch { case errors.As(err, &errNoReceipt): diff --git a/notifier/processor_safe_test.go b/notifier/processor_safe_test.go index 7f4d54870c..081959bb33 100644 --- a/notifier/processor_safe_test.go +++ b/notifier/processor_safe_test.go @@ -52,7 +52,7 @@ func testSafe(t *testing.T) { ctx := zlog.Test(context.Background(), t) sm := &MockStore{ ReceiptByUOID_: func(ctx context.Context, id uuid.UUID) (Receipt, error) { - return Receipt{}, clairerror.ErrNoReceipt{} + return Receipt{}, &clairerror.ErrNoReceipt{} }, } mm := &matcher.Mock{ @@ -108,7 +108,7 @@ func testUnsafeMatcherErr(t *testing.T) { ctx := zlog.Test(context.Background(), t) sm := &MockStore{ ReceiptByUOID_: func(ctx context.Context, id uuid.UUID) (Receipt, error) { - return Receipt{}, clairerror.ErrNoReceipt{} + return Receipt{}, &clairerror.ErrNoReceipt{} }, } mm := &matcher.Mock{ @@ -138,7 +138,7 @@ func testUnsafeStaleUOID(t *testing.T) { ctx := zlog.Test(context.Background(), t) sm := &MockStore{ ReceiptByUOID_: func(ctx context.Context, id uuid.UUID) (Receipt, error) { - return Receipt{}, clairerror.ErrNoReceipt{} + return Receipt{}, &clairerror.ErrNoReceipt{} }, } mm := &matcher.Mock{ diff --git a/notifier/service/service.go b/notifier/service/service.go index fc11298add..2b52538e5d 100644 --- a/notifier/service/service.go +++ b/notifier/service/service.go @@ -96,8 +96,9 @@ func New(ctx context.Context, opts Opts) (*service, error) { zlog.Info(ctx). Stringer("interval", opts.PollInterval). Msg("initializing poller") - poller := notifier.NewPoller(opts.PollInterval, store, opts.Matcher) - c := poller.Poll(ctx) + poller := notifier.NewPoller(store, opts.Matcher, opts.PollInterval) + ch := make(chan notifier.Event, notifier.MaxChanSize) + go poller.Poll(ctx, ch) // kick off the processors zlog.Info(ctx). @@ -111,14 +112,13 @@ func New(ctx context.Context, opts Opts) (*service, error) { return nil, err } p := notifier.NewProcessor( - i, + store, l, opts.Indexer, opts.Matcher, - store, ) p.NoSummary = opts.DisableSummary - p.Process(ctx, c) + go p.Process(ctx, ch) } // kick off configured deliverer type @@ -205,7 +205,7 @@ func webhookDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, s if err != nil { return fmt.Errorf("failed to create webhook deliverer: %v", err) } - delivery := notifier.NewDelivery(i, wh, opts.DeliveryInterval, store, l) + delivery := notifier.NewDelivery(store, l, wh, opts.DeliveryInterval) ds = append(ds, delivery) } for _, d := range ds { @@ -237,14 +237,14 @@ func amqpDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, stor if err != nil { return fmt.Errorf("failed to create AMQP deliverer: %v", err) } - delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l) + delivery := notifier.NewDelivery(store, l, q, opts.DeliveryInterval) ds = append(ds, delivery) } else { q, err := namqp.New(conf) if err != nil { return fmt.Errorf("failed to create AMQP deliverer: %v", err) } - delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l) + delivery := notifier.NewDelivery(store, l, q, opts.DeliveryInterval) ds = append(ds, delivery) } } @@ -278,14 +278,14 @@ func stompDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, sto if err != nil { return fmt.Errorf("failed to create STOMP direct deliverer: %v", err) } - delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l) + delivery := notifier.NewDelivery(store, l, q, opts.DeliveryInterval) ds = append(ds, delivery) } else { q, err := stomp.New(conf) if err != nil { return fmt.Errorf("failed to create STOMP deliverer: %v", err) } - delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l) + delivery := notifier.NewDelivery(store, l, q, opts.DeliveryInterval) ds = append(ds, delivery) } }