From 0f6d0e4a6950c64c74f18f1c4d0dee3fbc2eaf01 Mon Sep 17 00:00:00 2001
From: Hank Donnay
Date: Mon, 30 Aug 2021 14:15:14 -0500
Subject: [PATCH] notifier: move to ctxlock

This migrates off of distlock, which is removed in claircore.

Signed-off-by: Hank Donnay
---
 notifier/delivery.go        | 34 ++++++++++++++--------------
 notifier/locker.go          | 10 +++++++++
 notifier/processor.go       | 44 ++++++++++++++++---------------------
 notifier/service/service.go | 43 ++++++++++++++++++++++++++----------
 4 files changed, 77 insertions(+), 54 deletions(-)
 create mode 100644 notifier/locker.go

diff --git a/notifier/delivery.go b/notifier/delivery.go
index c98a6caf8f..1a7f8236df 100644
--- a/notifier/delivery.go
+++ b/notifier/delivery.go
@@ -6,7 +6,6 @@ import (
 	"time"
 
 	"github.com/google/uuid"
-	"github.com/quay/claircore/pkg/distlock"
 	"github.com/quay/zlog"
 	"go.opentelemetry.io/otel/baggage"
 	"go.opentelemetry.io/otel/label"
@@ -17,24 +16,24 @@
 // Delivery handles the business logic of delivering
 // notifications.
 type Delivery struct {
-	// a Deliverer implemention to invoke.
+	// a Deliverer implementation to invoke.
 	Deliverer Deliverer
 	// the interval at which we will attempt delivery of notifications.
 	interval time.Duration
 	// a store to retrieve notifications and update their receipts
 	store Store
 	// distributed lock used for mutual exclusion
-	distLock distlock.Locker
+	locks Locker
 	// a integer id used for logging
 	id int
 }
 
-func NewDelivery(id int, d Deliverer, interval time.Duration, store Store, distLock distlock.Locker) *Delivery {
+func NewDelivery(id int, d Deliverer, interval time.Duration, store Store, l Locker) *Delivery {
 	return &Delivery{
 		Deliverer: d,
 		interval:  interval,
 		store:     store,
-		distLock:  distLock,
+		locks:     l,
 		id:        id,
 	}
 }
@@ -61,6 +60,11 @@ func (d *Delivery) deliver(ctx context.Context) error {
 		label.String("component", "notifier/Delivery.deliver"),
 	)
 
+	defer func() {
+		if err := d.locks.Close(ctx); err != nil {
+			zlog.Warn(ctx).Err(err).Msg("error closing lock source")
+		}
+	}()
 	ticker := time.NewTicker(d.interval)
 	defer ticker.Stop()
 	for {
@@ -115,21 +119,17 @@ func (d *Delivery) RunDelivery(ctx context.Context) error {
 	}
 
 	for _, nID := range toDeliver {
-		ok, err := d.distLock.TryLock(ctx, nID.String())
-		if err != nil {
-			// distlock failed, back off till next tick
-			return err
-		}
-		if !ok {
+		var err error
+		ctx, done := d.locks.TryLock(ctx, nID.String())
+		if ok := ctx.Err(); !errors.Is(ok, nil) {
 			zlog.Debug(ctx).
+				Err(ok).
 				Stringer("notification_id", nID).
-				Msg("another process is delivering this notification")
-			// another process is working on this notification
-			continue
+				Msg("unable to get lock")
+		} else {
+			err = d.do(ctx, nID)
 		}
-		// an error means we should back off until next tick
-		err = d.do(ctx, nID)
-		d.distLock.Unlock()
+		done()
 		if err != nil {
 			return err
 		}
diff --git a/notifier/locker.go b/notifier/locker.go
new file mode 100644
index 0000000000..43ad9060b8
--- /dev/null
+++ b/notifier/locker.go
@@ -0,0 +1,10 @@
+package notifier
+
+import "context"
+
+// Locker is any context-based locking API.
+type Locker interface {
+	TryLock(context.Context, string) (context.Context, context.CancelFunc)
+	Lock(context.Context, string) (context.Context, context.CancelFunc)
+	Close(context.Context) error
+}
diff --git a/notifier/processor.go b/notifier/processor.go
index 936cf63218..a1d6e2384e 100644
--- a/notifier/processor.go
+++ b/notifier/processor.go
@@ -9,7 +9,6 @@ import (
 	"github.com/google/uuid"
 	"github.com/quay/claircore"
 	"github.com/quay/claircore/libvuln/driver"
-	"github.com/quay/claircore/pkg/distlock"
 	"github.com/quay/zlog"
 	"go.opentelemetry.io/otel/baggage"
 	"go.opentelemetry.io/otel/label"
@@ -33,7 +32,7 @@ type Processor struct {
 	// makes the defaults line up better.
 
 	// distributed lock used for mutual exclusion
-	distLock distlock.Locker
+	locks Locker
 	// a handle to an indexer service
 	indexer indexer.Service
 	// a handle to a matcher service
@@ -44,13 +43,13 @@ type Processor struct {
 	id int
 }
 
-func NewProcessor(id int, distLock distlock.Locker, indexer indexer.Service, matcher matcher.Service, store Store) *Processor {
+func NewProcessor(id int, l Locker, indexer indexer.Service, matcher matcher.Service, store Store) *Processor {
 	return &Processor{
-		distLock: distLock,
-		indexer:  indexer,
-		matcher:  matcher,
-		store:    store,
-		id:       id,
+		locks:   l,
+		indexer: indexer,
+		matcher: matcher,
+		store:   store,
+		id:      id,
 	}
 }
 
@@ -72,6 +71,11 @@ func (p *Processor) process(ctx context.Context, c <-chan Event) {
 		label.Int("processor_id", p.id),
 	)
 
+	defer func() {
+		if err := p.locks.Close(ctx); err != nil {
+			zlog.Warn(ctx).Err(err).Msg("error closing locker")
+		}
+	}()
 	zlog.Debug(ctx).Msg("processing events")
 	for {
 		select {
@@ -84,28 +88,18 @@ func (p *Processor) process(ctx context.Context, c <-chan Event) {
 				label.Stringer("UOID", e.uo.Ref),
 			)
 			zlog.Debug(ctx).Msg("processing")
-			locked, err := p.distLock.TryLock(ctx, e.uo.Ref.String())
-			if err != nil {
-				zlog.Error(ctx).
-					Err(err).
-					Msg("received error trying lock. backing off till next UOID")
-				continue
-			}
-			if !locked {
-				zlog.Debug(ctx).
-					Msg("lock acquired by another processor. will not process")
-				continue
-			}
-			// function used to schedule unlock via defer
-			err = func() error {
-				defer p.distLock.Unlock()
+			if err := func() error {
+				ctx, done := p.locks.TryLock(ctx, e.uo.Ref.String())
+				defer done()
+				if err := ctx.Err(); err != nil {
+					return err
+				}
 				safe, prev := p.safe(ctx, e)
 				if !safe {
 					return nil
 				}
 				return p.create(ctx, e, prev)
-			}()
-			if err != nil {
+			}(); err != nil {
 				zlog.Error(ctx).
 					Err(err).
 					Msg("failed to create notifications")
diff --git a/notifier/service/service.go b/notifier/service/service.go
index 7e76f8973d..13691c13a5 100644
--- a/notifier/service/service.go
+++ b/notifier/service/service.go
@@ -11,7 +11,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/jackc/pgx/v4/pgxpool"
 	_ "github.com/jackc/pgx/v4/stdlib"
-	pgdl "github.com/quay/claircore/pkg/distlock/postgres"
+	"github.com/quay/claircore/pkg/ctxlock"
 	"github.com/quay/zlog"
 	"github.com/remind101/migrate"
 	"go.opentelemetry.io/otel/baggage"
@@ -107,11 +107,15 @@ func New(ctx context.Context, opts Opts) (*service, error) {
 		Int("count", processors).
 		Msg("initializing processors")
 	for i := 0; i < processors; i++ {
-		// processors only use try locks
-		distLock := pgdl.NewPool(lockPool, 0)
+		// Can't re-use a locker because the Process method unconditionally
+		// spawns background goroutines.
+		l, err := ctxlock.New(ctx, lockPool)
+		if err != nil {
+			return nil, err
+		}
 		p := notifier.NewProcessor(
 			i,
-			distLock,
+			l,
 			opts.Indexer,
 			opts.Matcher,
 			store,
@@ -205,12 +209,17 @@ func webhookDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, s
 
 	ds := make([]*notifier.Delivery, 0, deliveries)
 	for i := 0; i < deliveries; i++ {
-		distLock := pgdl.NewPool(lockPool, 0)
+		// Can't share a ctxlock because the Deliverer object unconditionally
+		// spawns background goroutines.
+		l, err := ctxlock.New(ctx, lockPool)
+		if err != nil {
+			return fmt.Errorf("failed to create locker: %w", err)
+		}
 		wh, err := webhook.New(conf, opts.Client)
 		if err != nil {
 			return fmt.Errorf("failed to create webhook deliverer: %v", err)
 		}
-		delivery := notifier.NewDelivery(i, wh, opts.DeliveryInterval, store, distLock)
+		delivery := notifier.NewDelivery(i, wh, opts.DeliveryInterval, store, l)
 		ds = append(ds, delivery)
 	}
 	for _, d := range ds {
@@ -237,20 +246,25 @@ func amqpDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, stor
 
 	ds := make([]*notifier.Delivery, 0, deliveries)
 	for i := 0; i < deliveries; i++ {
-		distLock := pgdl.NewPool(lockPool, 0)
+		// Can't share a ctxlock because the Deliverer object unconditionally
+		// spawns background goroutines.
+		l, err := ctxlock.New(ctx, lockPool)
+		if err != nil {
+			return fmt.Errorf("failed to create locker: %w", err)
+		}
 		if conf.Direct {
 			q, err := namqp.NewDirectDeliverer(conf)
 			if err != nil {
				return fmt.Errorf("failed to create AMQP deliverer: %v", err)
 			}
-			delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock)
+			delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
 			ds = append(ds, delivery)
 		} else {
 			q, err := namqp.New(conf)
 			if err != nil {
 				return fmt.Errorf("failed to create AMQP deliverer: %v", err)
 			}
-			delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock)
+			delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
 			ds = append(ds, delivery)
 		}
 	}
@@ -279,20 +293,25 @@ func stompDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, sto
 
 	ds := make([]*notifier.Delivery, 0, deliveries)
 	for i := 0; i < deliveries; i++ {
-		distLock := pgdl.NewPool(lockPool, 0)
+		// Can't share a ctxlock because the Deliverer object unconditionally
+		// spawns background goroutines.
+		l, err := ctxlock.New(ctx, lockPool)
+		if err != nil {
+			return fmt.Errorf("failed to create locker: %w", err)
+		}
 		if conf.Direct {
 			q, err := stomp.NewDirectDeliverer(conf)
 			if err != nil {
 				return fmt.Errorf("failed to create STOMP direct deliverer: %v", err)
 			}
-			delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock)
+			delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
 			ds = append(ds, delivery)
 		} else {
 			q, err := stomp.New(conf)
 			if err != nil {
 				return fmt.Errorf("failed to create STOMP deliverer: %v", err)
 			}
-			delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock)
+			delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
 			ds = append(ds, delivery)
 		}
 	}
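
A minimal sketch of the calling convention the new Locker interface expects, for
readers coming from distlock: TryLock no longer returns an acquired/not-acquired
boolean. It always returns a derived context plus a CancelFunc; failure to
acquire shows up as an error on the returned context, and the CancelFunc must be
called on every path. The withTryLock helper and its fn callback below are
hypothetical and not part of the patch; the TryLock / ctx.Err() / done()
sequence mirrors Delivery.RunDelivery and Processor.process above.

	package notifier

	import (
		"context"

		"github.com/quay/zlog"
	)

	// withTryLock is an illustrative helper (not in the patch): it attempts
	// the lock named by key, runs fn under the lock-scoped context if the
	// lock was acquired, and skips the work otherwise.
	func withTryLock(ctx context.Context, l Locker, key string, fn func(context.Context) error) error {
		lctx, done := l.TryLock(ctx, key)
		// Always call the CancelFunc, whether or not the lock was acquired.
		defer done()
		if err := lctx.Err(); err != nil {
			// Lock is held elsewhere, or the parent context is done; skip.
			zlog.Debug(ctx).Err(err).Str("key", key).Msg("unable to get lock")
			return nil
		}
		return fn(lctx)
	}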