Skip to content

Commit

Permalink
datastore: Add vuln & enrich stream updates
Browse files Browse the repository at this point in the history
  • Loading branch information
jvdm committed Apr 11, 2024
1 parent 813e7cc commit 7039a7d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 11 deletions.
3 changes: 3 additions & 0 deletions datastore/enrichment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ type EnrichmentUpdater interface {
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
// queries by clients.
UpdateEnrichments(ctx context.Context, kind string, fingerprint driver.Fingerprint, enrichments []driver.EnrichmentRecord) (uuid.UUID, error)
// StreamUpdateEnrichments is equivalent to UpdateEnrichments, but it consumes
// data streamed from a channel.
StreamUpdateEnrichments(ctx context.Context, kind string, fingerprint driver.Fingerprint, enrichmentsC chan *driver.EnrichmentRecord) (uuid.UUID, error) {

Check failure on line 20 in datastore/enrichment.go

View workflow job for this annotation

GitHub Actions / Tests (1.21)

syntax error: unexpected { in interface type; possibly missing semicolon or newline or }
}

// Enrichment is an interface for querying enrichments from the store.
Expand Down
32 changes: 28 additions & 4 deletions datastore/postgres/enrichment.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,28 @@ var (
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
// queried by clients.
func (s *MatcherStore) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) {
ch := make(chan *driver.EnrichmentRecord)
go func() {
defer close(ch)
for i := range es {
select {
case <-ctx.Done():
return
case ch <- &es[i]:
}
}
}()
return s.updateEnrichments(ctx, name, fp, ch)
}

// StreamUpdateEnrichments is equivalent to UpdateEnrichments, but it consumes
// data from a channel. The enrichments are batched before being sent to
// postgres.
func (s *MatcherStore) StreamUpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, esC chan *driver.EnrichmentRecord) (uuid.UUID, error) {
return s.updateEnrichments(ctx, name, fp, esC)
}

func (s *MatcherStore) updateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, esC chan *driver.EnrichmentRecord) (uuid.UUID, error) {
const (
create = `
INSERT
Expand Down Expand Up @@ -134,17 +156,19 @@ DO

batch := microbatch.NewInsert(tx, 2000, time.Minute)
start = time.Now()
for i := range es {
hashKind, hash := hashEnrichment(&es[i])
inserted := 0
for e := range esC {
hashKind, hash := hashEnrichment(e)
err := batch.Queue(ctx, insert,
hashKind, hash, name, es[i].Tags, es[i].Enrichment,
hashKind, hash, name, e.Tags, e.Enrichment,
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue enrichment: %w", err)
}
if err := batch.Queue(ctx, assoc, hashKind, hash, name, id); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
}
inserted++
}
if err := batch.Done(ctx); err != nil {
return uuid.Nil, fmt.Errorf("failed to finish batch enrichment insert: %w", err)
Expand All @@ -160,7 +184,7 @@ DO
}
zlog.Debug(ctx).
Stringer("ref", ref).
Int("inserted", len(es)).
Int("inserted", inserted).
Msg("update_operation committed")
return ref, nil
}
Expand Down
52 changes: 45 additions & 7 deletions datastore/postgres/updatevulnerabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,30 @@ var (
)
)

// deltaUpdateOpts hold options to enable delta vulnerability updates.
type deltaUpdateOpts struct {
// Perform delta updates
deletedVulns []string
vulns []*claircore.Vulnerability
}

// UpdateVulnerabilities implements vulnstore.Updater.
//
// It creates a new UpdateOperation for this update call, inserts the
// provided vulnerabilities and computes a diff comprising the removed
// and added vulnerabilities for this UpdateOperation.
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilities")
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false)
vulnC := vulnChan(ctx, vulns)
return s.updateVulnerabilities(ctx, updater, fingerprint, vulnC, nil)
}

// StreamUpdateVulnerabilities implements vulnstore.Updater
//
// The vulns are batched before being sent to postgres.
func (s *MatcherStore) StreamUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnC chan *claircore.Vulnerability) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.StreamUpdateVulnerabilities")
return s.updateVulnerabilities(ctx, updater, fingerprint, vulnC, nil)
}

// DeltaUpdateVulnerabilities implements vulnstore.Updater.
Expand All @@ -68,10 +84,29 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string
// - Associate new vulnerabilities with new updateOperation
func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.DeltaUpdateVulnerabilities")
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true)
vulnC := vulnChan(ctx, vulns)
return s.updateVulnerabilities(ctx, updater, fingerprint, vulnC, &deltaUpdateOpts{
vulns: vulns,
deletedVulns: deletedVulns,
})
}

func vulnChan(ctx context.Context, vulns []*claircore.Vulnerability) chan *claircore.Vulnerability {
ch := make(chan *claircore.Vulnerability)
go func() {
defer close(ch)
for i := range vulns {
select {
case <-ctx.Done():
return
case ch <- vulns[i]:
}
}
}()
return ch
}

func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string, delta bool) (uuid.UUID, error) {
func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnC chan *claircore.Vulnerability, deltaOpts *deltaUpdateOpts) (uuid.UUID, error) {
const (
// Create makes a new update operation and returns the reference and ID.
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;`
Expand Down Expand Up @@ -139,6 +174,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err)
}

delta := deltaOpts != nil
updateVulnerabilitiesCounter.WithLabelValues("create", strconv.FormatBool(delta)).Add(1)
updateVulnerabilitiesDuration.WithLabelValues("create", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds())

Expand Down Expand Up @@ -181,13 +217,13 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
}

if len(oldVulns) > 0 {
for _, v := range vulns {
for _, v := range deltaOpts.vulns {
// If we have an existing vuln in the new batch
// delete it from the oldVulns map so it doesn't
// get associated with the new update_operation.
delete(oldVulns, v.Name)
}
for _, delName := range deletedVulns {
for _, delName := range deltaOpts.deletedVulns {
// If we have an existing vuln that has been signaled
// as deleted by the updater then delete it so it doesn't
// get associated with the new update_operation.
Expand All @@ -211,11 +247,13 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string

// batch insert vulnerabilities
skipCt := 0
vulnCt := 0

start = time.Now()

mBatcher := microbatch.NewInsert(tx, 2000, time.Minute)
for _, vuln := range vulns {
for vuln := range vulnC {
vulnCt++
if vuln.Package == nil || vuln.Package.Name == "" {
skipCt++
continue
Expand Down Expand Up @@ -266,7 +304,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
zlog.Debug(ctx).
Str("ref", ref.String()).
Int("skipped", skipCt).
Int("inserted", len(vulns)-skipCt).
Int("inserted", vulnCt-skipCt).
Msg("update_operation committed")
return ref, nil
}
Expand Down
3 changes: 3 additions & 0 deletions datastore/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Updater interface {
// vulnerabilities, and ensures vulnerabilities from previous updates are
// not queried by clients.
UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error)
// StreamUpdateVulnerabilities is equivalent to UpdateVulnerabilities
// but it consumes data from a channel.
StreamUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns chan *claircore.Vulnerability) (uuid.UUID, error)
// DeltaUpdateVulnerabilities creates a new UpdateOperation consisting of existing
// vulnerabilities and new vulnerabilities. It also takes an array of deleted
// vulnerability names which should no longer be available to query.
Expand Down

0 comments on commit 7039a7d

Please sign in to comment.