notifier: use external concurrency in Delivery, Poller, Processor
This commit changes Delivery, Poller, and Processor to require the
caller to supply the goroutines that drive the methods. Giving the
caller control over these goroutines makes it possible to use shared
resources (e.g. a database pool or lock pool) and to ensure they're
cleaned up correctly.

Signed-off-by: Hank Donnay <hdonnay@redhat.com>
hdonnay committed Feb 22, 2022
1 parent c8024bb commit 4ba6aca
Showing 5 changed files with 30 additions and 78 deletions.
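
To illustrate the calling convention this commit moves to, here is a minimal sketch (not code from this repository): Poll- and Process-style methods block and return an error, and the caller supplies the goroutines that drive them, so shared resources such as a database or lock pool can be created once and torn down after every worker has returned. The worker types, the int event payload, and the errgroup wiring are assumptions made for the example; the service wiring in this commit uses plain go statements.

// A sketch of caller-driven concurrency, assuming an errgroup; the types
// below are placeholders, not the notifier package's API.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// poller writes events to a channel the caller handed it and closes the
// channel when it stops, mirroring the "caller owns the goroutine,
// method owns the channel" contract.
type poller struct{ interval time.Duration }

func (p *poller) Poll(ctx context.Context, c chan<- int) error {
	defer close(c)
	t := time.NewTicker(p.interval)
	defer t.Stop()
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			select {
			case c <- i:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// process blocks until the context is canceled or the channel is closed.
func process(ctx context.Context, c <-chan int) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e, ok := <-c:
			if !ok {
				return nil
			}
			fmt.Println("processing event", e)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Shared resources (database pool, lock pool, ...) would be created once
	// here and cleaned up only after every worker goroutine has returned.
	ch := make(chan int, 1)
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error { return (&poller{interval: 250 * time.Millisecond}).Poll(ctx, ch) })
	g.Go(func() error { return process(ctx, ch) })

	if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("notifier stopped:", err)
	}
}

Compared with the old pattern, where Deliver, Poll, and Process spawned their own goroutines, the caller can now decide when the loops stop and can wait for them before closing pooled resources.
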
27 changes: 4 additions & 23 deletions notifier/delivery.go
@@ -3,7 +3,6 @@ package notifier
import (
"context"
"errors"
"strconv"
"time"

"github.com/google/uuid"
@@ -17,51 +16,34 @@ import (
type Delivery struct {
// a Deliverer implementation to invoke.
Deliverer Deliverer
// the interval at which we will attempt delivery of notifications.
interval time.Duration
// a store to retrieve notifications and update their receipts
store Store
// distributed lock used for mutual exclusion
locks Locker
// a integer id used for logging
id int
// the interval at which we will attempt delivery of notifications.
interval time.Duration
}

func NewDelivery(id int, d Deliverer, interval time.Duration, store Store, l Locker) *Delivery {
func NewDelivery(store Store, l Locker, d Deliverer, interval time.Duration) *Delivery {
return &Delivery{
Deliverer: d,
interval: interval,
store: store,
locks: l,
id: id,
}
}

// Deliver begins delivering notifications.
//
// Canceling the ctx will end delivery.
func (d *Delivery) Deliver(ctx context.Context) {
func (d *Delivery) Deliver(ctx context.Context) error {
ctx = zlog.ContextWithValues(ctx,
"deliverer", d.Deliverer.Name(),
"component", "notifier/Delivery.Deliver",
"id", strconv.Itoa(d.id),
)
zlog.Info(ctx).
Msg("delivering notifications")
go d.deliver(ctx)
}

// deliver is intended to be ran as a go routine.
//
// implements a blocking event loop via a time.Ticker
func (d *Delivery) deliver(ctx context.Context) error {
ctx = zlog.ContextWithValues(ctx, "component", "notifier/Delivery.deliver")

defer func() {
if err := d.locks.Close(ctx); err != nil {
zlog.Warn(ctx).Err(err).Msg("error closing lock source")
}
}()
ticker := time.NewTicker(d.interval)
defer ticker.Stop()
for {
@@ -85,7 +67,6 @@ func (d *Delivery) deliver(ctx context.Context) error {
func (d *Delivery) RunDelivery(ctx context.Context) error {
ctx = zlog.ContextWithValues(ctx,
"deliverer", d.Deliverer.Name(),
"id", strconv.Itoa(d.id),
"component", "notifier/Delivery.RunDelivery",
)

26 changes: 8 additions & 18 deletions notifier/poller.go
@@ -31,7 +31,7 @@ type Poller struct {
interval time.Duration
}

func NewPoller(interval time.Duration, store Store, differ matcher.Differ) *Poller {
func NewPoller(store Store, differ matcher.Differ, interval time.Duration) *Poller {
return &Poller{
interval: interval,
store: store,
@@ -46,29 +46,19 @@ type Event struct {
uo driver.UpdateOperation
}

// Poll is a non blocking call which begins
// polling the Matcher for UpdateOperations.
//
// Returned channel can be listened to for events.
// Poll begins polling the Matcher for UpdateOperations and producing Events on
// the supplied channel. This method takes ownership of the channel and is
// responsible for closing it.
//
// Cancel ctx to stop the poller.
func (p *Poller) Poll(ctx context.Context) <-chan Event {
c := make(chan Event, MaxChanSize)
go p.poll(ctx, c)
return c
}

// poll is intended to be ran as a go routine.
//
// implements a blocking event loop via a time.Ticker
func (p *Poller) poll(ctx context.Context, c chan<- Event) {
func (p *Poller) Poll(ctx context.Context, c chan<- Event) error {
ctx = zlog.ContextWithValues(ctx, "component", "notifier/Poller.poll")

defer close(c)
if err := ctx.Err(); err != nil {
zlog.Info(ctx).
Msg("context canceled before polling began")
return
return err
}

// loop on interval tick
@@ -79,7 +69,7 @@ func (p *Poller) poll(ctx context.Context, c chan<- Event) {
case <-ctx.Done():
zlog.Info(ctx).
Msg("context canceled. polling ended")
return
return ctx.Err()
case <-t.C:
zlog.Debug(ctx).
Msg("poll interval tick")
@@ -111,7 +101,7 @@ func (p *Poller) onTick(ctx context.Context, c chan<- Event) {
latest := uo[0]
ctx = zlog.ContextWithValues(ctx, "UOID", latest.Ref.String())
// confirm notifications were never created for this UOID.
var errNoReceipt clairerror.ErrNoReceipt
var errNoReceipt *clairerror.ErrNoReceipt
_, err := p.store.ReceiptByUOID(ctx, latest.Ref)
if errors.As(err, &errNoReceipt) {
e := Event{
29 changes: 5 additions & 24 deletions notifier/processor.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"

"github.com/google/uuid"
@@ -32,8 +31,6 @@ type Processor struct {
matcher matcher.Service
// a store instance to persist notifications
store Store
// a integer id used for logging
id int

// NoSummary controls whether per-manifest vulnerability summarization
// should happen.
@@ -42,44 +39,28 @@
NoSummary bool
}

func NewProcessor(id int, l Locker, indexer indexer.Service, matcher matcher.Service, store Store) *Processor {
func NewProcessor(store Store, l Locker, indexer indexer.Service, matcher matcher.Service) *Processor {
return &Processor{
locks: l,
indexer: indexer,
matcher: matcher,
store: store,
id: id,
}
}

// Process receives new UOs as events, creates and persists notifications, and
// updates the notifier system with the "latest" seen UOID.
//
// Canceling the ctx will end the processing.
func (p *Processor) Process(ctx context.Context, c <-chan Event) {
go p.process(ctx, c)
}

// process is intended to be ran as a go routine.
//
// implements the blocking event loop of a processor.
func (p *Processor) process(ctx context.Context, c <-chan Event) {
ctx = zlog.ContextWithValues(ctx,
"component", "notifier/Processor.process",
"processor_id", strconv.Itoa(p.id),
)
func (p *Processor) Process(ctx context.Context, c <-chan Event) error {
ctx = zlog.ContextWithValues(ctx, "component", "notifier/Processor.process")

defer func() {
if err := p.locks.Close(ctx); err != nil {
zlog.Warn(ctx).Err(err).Msg("error closing locker")
}
}()
zlog.Debug(ctx).Msg("processing events")
for {
select {
case <-ctx.Done():
zlog.Info(ctx).Msg("context canceled: ending event processing")
return
return ctx.Err()
case e := <-c:
ctx := zlog.ContextWithValues(ctx,
"updater", e.updater,
@@ -292,7 +273,7 @@ func (p *Processor) safe(ctx context.Context, e Event) (bool, uuid.UUID) {
ctx = zlog.ContextWithValues(ctx, "component", "notifier/Processor.safe")

// confirm we are not making duplicate notifications
var errNoReceipt clairerror.ErrNoReceipt
var errNoReceipt *clairerror.ErrNoReceipt
_, err := p.store.ReceiptByUOID(ctx, e.uo.Ref)
switch {
case errors.As(err, &errNoReceipt):
6 changes: 3 additions & 3 deletions notifier/processor_safe_test.go
@@ -52,7 +52,7 @@ func testSafe(t *testing.T) {
ctx := zlog.Test(context.Background(), t)
sm := &MockStore{
ReceiptByUOID_: func(ctx context.Context, id uuid.UUID) (Receipt, error) {
return Receipt{}, clairerror.ErrNoReceipt{}
return Receipt{}, &clairerror.ErrNoReceipt{}
},
}
mm := &matcher.Mock{
@@ -108,7 +108,7 @@ func testUnsafeMatcherErr(t *testing.T) {
ctx := zlog.Test(context.Background(), t)
sm := &MockStore{
ReceiptByUOID_: func(ctx context.Context, id uuid.UUID) (Receipt, error) {
return Receipt{}, clairerror.ErrNoReceipt{}
return Receipt{}, &clairerror.ErrNoReceipt{}
},
}
mm := &matcher.Mock{
@@ -138,7 +138,7 @@ func testUnsafeStaleUOID(t *testing.T) {
ctx := zlog.Test(context.Background(), t)
sm := &MockStore{
ReceiptByUOID_: func(ctx context.Context, id uuid.UUID) (Receipt, error) {
return Receipt{}, clairerror.ErrNoReceipt{}
return Receipt{}, &clairerror.ErrNoReceipt{}
},
}
mm := &matcher.Mock{
20 changes: 10 additions & 10 deletions notifier/service/service.go
@@ -96,8 +96,9 @@ func New(ctx context.Context, opts Opts) (*service, error) {
zlog.Info(ctx).
Stringer("interval", opts.PollInterval).
Msg("initializing poller")
poller := notifier.NewPoller(opts.PollInterval, store, opts.Matcher)
c := poller.Poll(ctx)
poller := notifier.NewPoller(store, opts.Matcher, opts.PollInterval)
ch := make(chan notifier.Event, notifier.MaxChanSize)
go poller.Poll(ctx, ch)

// kick off the processors
zlog.Info(ctx).
@@ -111,14 +112,13 @@
return nil, err
}
p := notifier.NewProcessor(
i,
store,
l,
opts.Indexer,
opts.Matcher,
store,
)
p.NoSummary = opts.DisableSummary
p.Process(ctx, c)
go p.Process(ctx, ch)
}

// kick off configured deliverer type
@@ -205,7 +205,7 @@ func webhookDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, s
if err != nil {
return fmt.Errorf("failed to create webhook deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, wh, opts.DeliveryInterval, store, l)
delivery := notifier.NewDelivery(store, l, wh, opts.DeliveryInterval)
ds = append(ds, delivery)
}
for _, d := range ds {
@@ -237,14 +237,14 @@ func amqpDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, stor
if err != nil {
return fmt.Errorf("failed to create AMQP deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
delivery := notifier.NewDelivery(store, l, q, opts.DeliveryInterval)
ds = append(ds, delivery)
} else {
q, err := namqp.New(conf)
if err != nil {
return fmt.Errorf("failed to create AMQP deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
delivery := notifier.NewDelivery(store, l, q, opts.DeliveryInterval)
ds = append(ds, delivery)
}
}
@@ -278,14 +278,14 @@ func stompDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, sto
if err != nil {
return fmt.Errorf("failed to create STOMP direct deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
delivery := notifier.NewDelivery(store, l, q, opts.DeliveryInterval)
ds = append(ds, delivery)
} else {
q, err := stomp.New(conf)
if err != nil {
return fmt.Errorf("failed to create STOMP deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
delivery := notifier.NewDelivery(store, l, q, opts.DeliveryInterval)
ds = append(ds, delivery)
}
}
