Skip to content

Commit

Permalink
notifier: pass configured client into notifier
Browse files Browse the repository at this point in the history
Signed-off-by: Hank Donnay <hdonnay@redhat.com>
  • Loading branch information
hdonnay committed Oct 2, 2020
1 parent a4e0410 commit 57e1ed0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 11 deletions.
22 changes: 19 additions & 3 deletions initialize/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ const (
// DefaultUpdatePeriod is the default period used for running updaters
// within matcher processes.
DefaultUpdatePeriod = 30 * time.Minute
// NotifierIssuer is the value used for the issuer claim of any outgoing
// HTTP requests the notifier makes, if PSK auth is configured.
NotifierIssuer = `clair-notifier`
)

// Services will initialize the correct ClairCore services
Expand Down Expand Up @@ -96,12 +99,17 @@ func (i *Init) Services() error {
Msg: "notifier: failed to parse poll interval: " + err.Error(),
}
}
c, _, err := i.conf.Client(nil, notifierClaim)
if err != nil {
return err
}

n, err := notifier.New(i.GlobalCTX, notifier.Opts{
DeliveryInterval: dInterval,
ConnString: i.conf.Notifier.ConnString,
Indexer: libI,
Matcher: libV,
Client: c,
Migrations: i.conf.Notifier.Migrations,
PollInterval: pInterval,
Webhook: i.conf.Notifier.Webhook,
Expand Down Expand Up @@ -172,7 +180,7 @@ func (i *Init) Services() error {
return fmt.Errorf("failed to initialize libvuln: %v", err)
}
// matcher mode needs a remote indexer client
c, auth, err := i.conf.Client(nil, intraservice)
c, auth, err := i.conf.Client(nil, intraserviceClaim)
switch {
case err != nil:
return err
Expand All @@ -192,7 +200,7 @@ func (i *Init) Services() error {
i.Matcher = libV
case config.NotifierMode:
// notifier uses a remote indexer and matcher
c, auth, err := i.conf.Client(nil, intraservice)
c, auth, err := i.conf.Client(nil, intraserviceClaim)
switch {
case err != nil:
return err
Expand Down Expand Up @@ -229,12 +237,17 @@ func (i *Init) Services() error {
Msg: "notifier: failed to parse poll interval: " + err.Error(),
}
}
c, _, err = i.conf.Client(nil, notifierClaim)
if err != nil {
return err
}

n, err := notifier.New(i.GlobalCTX, notifier.Opts{
DeliveryInterval: dInterval,
ConnString: i.conf.Notifier.ConnString,
Indexer: remoteIndexer,
Matcher: remoteMatcher,
Client: c,
Migrations: i.conf.Notifier.Migrations,
PollInterval: pInterval,
Webhook: i.conf.Notifier.Webhook,
Expand All @@ -257,4 +270,7 @@ func (i *Init) Services() error {
return nil
}

var intraservice = jwt.Claims{Issuer: httptransport.IntraserviceIssuer}
var (
intraserviceClaim = jwt.Claims{Issuer: httptransport.IntraserviceIssuer}
notifierClaim = jwt.Claims{Issuer: NotifierIssuer}
)
19 changes: 11 additions & 8 deletions notifier/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package service
import (
"context"
"fmt"
"net/http"
"os"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/jmoiron/sqlx"
pgdl "github.com/quay/claircore/pkg/distlock/postgres"
"github.com/remind101/migrate"
"github.com/rs/zerolog"

"github.com/quay/clair/v4/indexer"
"github.com/quay/clair/v4/matcher"
"github.com/quay/clair/v4/notifier"
Expand All @@ -18,9 +23,6 @@ import (
"github.com/quay/clair/v4/notifier/postgres"
"github.com/quay/clair/v4/notifier/stomp"
"github.com/quay/clair/v4/notifier/webhook"
pgdl "github.com/quay/claircore/pkg/distlock/postgres"
"github.com/remind101/migrate"
"github.com/rs/zerolog"
)

const (
Expand Down Expand Up @@ -75,6 +77,7 @@ type Opts struct {
ConnString string
Matcher matcher.Service
Indexer indexer.Service
Client *http.Client
Webhook *webhook.Config
AMQP *namqp.Config
STOMP *stomp.Config
Expand Down Expand Up @@ -109,12 +112,12 @@ func New(ctx context.Context, opts Opts) (*service, error) {
}

// kick off the poller
log.Info().Str("interval", opts.PollInterval.String()).Msg("intializing poller")
log.Info().Str("interval", opts.PollInterval.String()).Msg("initializing poller")
poller := notifier.NewPoller(opts.PollInterval, store, opts.Matcher)
c := poller.Poll(ctx)

// kick off the processors
log.Info().Int("count", processors).Msg("intializing processors")
log.Info().Int("count", processors).Msg("initializing processors")
for i := 0; i < processors; i++ {
// processors only use try locks
distLock := pgdl.NewLock(lockPool, 0)
Expand Down Expand Up @@ -195,7 +198,7 @@ func storeInit(ctx context.Context, opts Opts) (*postgres.Store, *postgres.KeySt
}
}

log.Info().Msg("intializing notifier store")
log.Info().Msg("initializing notifier store")
store := postgres.NewStore(pool)
keystore := postgres.NewKeyStore(pool)
return store, keystore, lockPool, nil
Expand All @@ -220,7 +223,7 @@ func webhookDeliveries(ctx context.Context, opts Opts, lockPool *sqlx.DB, store
Str("component", "notifier/service/webhookInit").
Logger()
ctx = log.WithContext(ctx)
log.Info().Int("count", deliveries).Msg("intializing webhook deliverers")
log.Info().Int("count", deliveries).Msg("initializing webhook deliverers")

conf, err := opts.Webhook.Validate()
if err != nil {
Expand All @@ -230,7 +233,7 @@ func webhookDeliveries(ctx context.Context, opts Opts, lockPool *sqlx.DB, store
ds := make([]*notifier.Delivery, 0, deliveries)
for i := 0; i < deliveries; i++ {
distLock := pgdl.NewLock(lockPool, 0)
wh, err := webhook.New(conf, nil, keymanager)
wh, err := webhook.New(conf, opts.Client, keymanager)
if err != nil {
return fmt.Errorf("failed to create webhook deliverer: %v", err)
}
Expand Down

0 comments on commit 57e1ed0

Please sign in to comment.