Skip to content

Commit

Permalink
notifier: move to ctxlock
Browse files Browse the repository at this point in the history
This migrates off of distlock, which is removed in claircore.

Signed-off-by: Hank Donnay <hdonnay@redhat.com>
  • Loading branch information
hdonnay committed Sep 2, 2021
1 parent b53aeb7 commit 0f6d0e4
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 54 deletions.
34 changes: 17 additions & 17 deletions notifier/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/google/uuid"
"github.com/quay/claircore/pkg/distlock"
"github.com/quay/zlog"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/label"
Expand All @@ -17,24 +16,24 @@ import (
// Delivery handles the business logic of delivering
// notifications.
type Delivery struct {
// a Deliverer implemention to invoke.
// a Deliverer implementation to invoke.
Deliverer Deliverer
// the interval at which we will attempt delivery of notifications.
interval time.Duration
// a store to retrieve notifications and update their receipts
store Store
// distributed lock used for mutual exclusion
distLock distlock.Locker
locks Locker
// a integer id used for logging
id int
}

func NewDelivery(id int, d Deliverer, interval time.Duration, store Store, distLock distlock.Locker) *Delivery {
func NewDelivery(id int, d Deliverer, interval time.Duration, store Store, l Locker) *Delivery {
return &Delivery{
Deliverer: d,
interval: interval,
store: store,
distLock: distLock,
locks: l,
id: id,
}
}
Expand All @@ -61,6 +60,11 @@ func (d *Delivery) deliver(ctx context.Context) error {
label.String("component", "notifier/Delivery.deliver"),
)

defer func() {
if err := d.locks.Close(ctx); err != nil {
zlog.Warn(ctx).Err(err).Msg("error closing lock source")
}
}()
ticker := time.NewTicker(d.interval)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -115,21 +119,17 @@ func (d *Delivery) RunDelivery(ctx context.Context) error {
}

for _, nID := range toDeliver {
ok, err := d.distLock.TryLock(ctx, nID.String())
if err != nil {
// distlock failed, back off till next tick
return err
}
if !ok {
var err error
ctx, done := d.locks.TryLock(ctx, nID.String())
if ok := ctx.Err(); !errors.Is(ok, nil) {
zlog.Debug(ctx).
Err(ok).
Stringer("notification_id", nID).
Msg("another process is delivering this notification")
// another process is working on this notification
continue
Msg("unable to get lock")
} else {
err = d.do(ctx, nID)
}
// an error means we should back off until next tick
err = d.do(ctx, nID)
d.distLock.Unlock()
done()
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions notifier/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package notifier

import "context"

// Locker is any context-based locking API.
type Locker interface {
TryLock(context.Context, string) (context.Context, context.CancelFunc)
Lock(context.Context, string) (context.Context, context.CancelFunc)
Close(context.Context) error
}
44 changes: 19 additions & 25 deletions notifier/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/google/uuid"
"github.com/quay/claircore"
"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/pkg/distlock"
"github.com/quay/zlog"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/label"
Expand All @@ -33,7 +32,7 @@ type Processor struct {
// makes the defaults line up better.

// distributed lock used for mutual exclusion
distLock distlock.Locker
locks Locker
// a handle to an indexer service
indexer indexer.Service
// a handle to a matcher service
Expand All @@ -44,13 +43,13 @@ type Processor struct {
id int
}

func NewProcessor(id int, distLock distlock.Locker, indexer indexer.Service, matcher matcher.Service, store Store) *Processor {
func NewProcessor(id int, l Locker, indexer indexer.Service, matcher matcher.Service, store Store) *Processor {
return &Processor{
distLock: distLock,
indexer: indexer,
matcher: matcher,
store: store,
id: id,
locks: l,
indexer: indexer,
matcher: matcher,
store: store,
id: id,
}
}

Expand All @@ -72,6 +71,11 @@ func (p *Processor) process(ctx context.Context, c <-chan Event) {
label.Int("processor_id", p.id),
)

defer func() {
if err := p.locks.Close(ctx); err != nil {
zlog.Warn(ctx).Err(err).Msg("error closing locker")
}
}()
zlog.Debug(ctx).Msg("processing events")
for {
select {
Expand All @@ -84,28 +88,18 @@ func (p *Processor) process(ctx context.Context, c <-chan Event) {
label.Stringer("UOID", e.uo.Ref),
)
zlog.Debug(ctx).Msg("processing")
locked, err := p.distLock.TryLock(ctx, e.uo.Ref.String())
if err != nil {
zlog.Error(ctx).
Err(err).
Msg("received error trying lock. backing off till next UOID")
continue
}
if !locked {
zlog.Debug(ctx).
Msg("lock acquired by another processor. will not process")
continue
}
// function used to schedule unlock via defer
err = func() error {
defer p.distLock.Unlock()
if err := func() error {
ctx, done := p.locks.TryLock(ctx, e.uo.Ref.String())
defer done()
if err := ctx.Err(); err != nil {
return err
}
safe, prev := p.safe(ctx, e)
if !safe {
return nil
}
return p.create(ctx, e, prev)
}()
if err != nil {
}(); err != nil {
zlog.Error(ctx).
Err(err).
Msg("failed to create notifications")
Expand Down
43 changes: 31 additions & 12 deletions notifier/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v4/pgxpool"
_ "github.com/jackc/pgx/v4/stdlib"
pgdl "github.com/quay/claircore/pkg/distlock/postgres"
"github.com/quay/claircore/pkg/ctxlock"
"github.com/quay/zlog"
"github.com/remind101/migrate"
"go.opentelemetry.io/otel/baggage"
Expand Down Expand Up @@ -107,11 +107,15 @@ func New(ctx context.Context, opts Opts) (*service, error) {
Int("count", processors).
Msg("initializing processors")
for i := 0; i < processors; i++ {
// processors only use try locks
distLock := pgdl.NewPool(lockPool, 0)
// Can't re-use a locker because the Process method unconditionally
// spawns background goroutines.
l, err := ctxlock.New(ctx, lockPool)
if err != nil {
return nil, err
}
p := notifier.NewProcessor(
i,
distLock,
l,
opts.Indexer,
opts.Matcher,
store,
Expand Down Expand Up @@ -205,12 +209,17 @@ func webhookDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, s

ds := make([]*notifier.Delivery, 0, deliveries)
for i := 0; i < deliveries; i++ {
distLock := pgdl.NewPool(lockPool, 0)
// Can't share a ctxlock because the Deliverer object unconditionally
// spawns background goroutines.
l, err := ctxlock.New(ctx, lockPool)
if err != nil {
return fmt.Errorf("failed to create locker: %w", err)
}
wh, err := webhook.New(conf, opts.Client)
if err != nil {
return fmt.Errorf("failed to create webhook deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, wh, opts.DeliveryInterval, store, distLock)
delivery := notifier.NewDelivery(i, wh, opts.DeliveryInterval, store, l)
ds = append(ds, delivery)
}
for _, d := range ds {
Expand All @@ -237,20 +246,25 @@ func amqpDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, stor

ds := make([]*notifier.Delivery, 0, deliveries)
for i := 0; i < deliveries; i++ {
distLock := pgdl.NewPool(lockPool, 0)
// Can't share a ctxlock because the Deliverer object unconditionally
// spawns background goroutines.
l, err := ctxlock.New(ctx, lockPool)
if err != nil {
return fmt.Errorf("failed to create locker: %w", err)
}
if conf.Direct {
q, err := namqp.NewDirectDeliverer(conf)
if err != nil {
return fmt.Errorf("failed to create AMQP deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock)
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
ds = append(ds, delivery)
} else {
q, err := namqp.New(conf)
if err != nil {
return fmt.Errorf("failed to create AMQP deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock)
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
ds = append(ds, delivery)
}
}
Expand Down Expand Up @@ -279,20 +293,25 @@ func stompDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, sto

ds := make([]*notifier.Delivery, 0, deliveries)
for i := 0; i < deliveries; i++ {
distLock := pgdl.NewPool(lockPool, 0)
// Can't share a ctxlock because the Deliverer object unconditionally
// spawns background goroutines.
l, err := ctxlock.New(ctx, lockPool)
if err != nil {
return fmt.Errorf("failed to create locker: %w", err)
}
if conf.Direct {
q, err := stomp.NewDirectDeliverer(conf)
if err != nil {
return fmt.Errorf("failed to create STOMP direct deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock)
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
ds = append(ds, delivery)
} else {
q, err := stomp.New(conf)
if err != nil {
return fmt.Errorf("failed to create STOMP deliverer: %v", err)
}
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock)
delivery := notifier.NewDelivery(i, q, opts.DeliveryInterval, store, l)
ds = append(ds, delivery)
}
}
Expand Down

0 comments on commit 0f6d0e4

Please sign in to comment.