Skip to content

Commit

Permalink
matcher: Add iterator-based DB update APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
jvdm committed Apr 18, 2024
1 parent e8f9aff commit e08f257
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 64 deletions.
9 changes: 9 additions & 0 deletions datastore/enrichment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,22 @@ import (
"github.com/quay/claircore/libvuln/driver"
)

// EnrichmentIter is a function for iterating on enrichment records.
//
// It accepts a callback function 'yield' to handle each enrichment record. If the callback
// returns an error, the iteration is halted, and the error is propagated to indicate cancellation.
type EnrichmentIter func(yield func(enricher *driver.EnrichmentRecord) error) error

// EnrichmentUpdater is an interface exporting the necessary methods
// for storing and querying Enrichments.
type EnrichmentUpdater interface {
// UpdateEnrichments creates a new EnrichmentUpdateOperation, inserts the provided
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
// queries by clients.
UpdateEnrichments(ctx context.Context, kind string, fingerprint driver.Fingerprint, enrichments []driver.EnrichmentRecord) (uuid.UUID, error)
// UpdateEnrichmentsIter performs the same operation as UpdateEnrichments, but
// accepting an iterator function.
UpdateEnrichmentsIter(ctx context.Context, kind string, fingerprint driver.Fingerprint, enIter EnrichmentIter) (uuid.UUID, error)
}

// Enrichment is an interface for querying enrichments from the store.
Expand Down
39 changes: 32 additions & 7 deletions datastore/postgres/enrichment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/quay/claircore/datastore"
"github.com/quay/zlog"

"github.com/quay/claircore/libvuln/driver"
Expand Down Expand Up @@ -60,10 +61,28 @@ var (
)
)

func (s *MatcherStore) UpdateEnrichmentsIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichmentsIter")
return s.updateEnrichments(ctx, updater, fp, it)

Check warning on line 66 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L64-L66

Added lines #L64 - L66 were not covered by tests
}

// UpdateEnrichments creates a new UpdateOperation, inserts the provided
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
// queried by clients.
func (s *MatcherStore) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) {
func (s *MatcherStore) UpdateEnrichments(ctx context.Context, updater string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichments")
enIter := func(yield func(record *driver.EnrichmentRecord) error) error {
for i := range es {
if err := yield(&es[i]); err != nil {
return err

Check warning on line 77 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L72-L77

Added lines #L72 - L77 were not covered by tests
}
}
return nil

Check warning on line 80 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L80

Added line #L80 was not covered by tests
}
return s.updateEnrichments(ctx, updater, fp, enIter)

Check warning on line 82 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L82

Added line #L82 was not covered by tests
}

func (s *MatcherStore) updateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) {

Check warning on line 85 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L85

Added line #L85 was not covered by tests
const (
create = `
INSERT
Expand Down Expand Up @@ -134,17 +153,23 @@ DO

batch := microbatch.NewInsert(tx, 2000, time.Minute)
start = time.Now()
for i := range es {
hashKind, hash := hashEnrichment(&es[i])
enCt := 0
err = it(func(en *driver.EnrichmentRecord) error {
enCt++
hashKind, hash := hashEnrichment(en)

Check warning on line 159 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L156-L159

Added lines #L156 - L159 were not covered by tests
err := batch.Queue(ctx, insert,
hashKind, hash, name, es[i].Tags, es[i].Enrichment,
hashKind, hash, name, en.Tags, en.Enrichment,

Check warning on line 161 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L161

Added line #L161 was not covered by tests
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue enrichment: %w", err)
return fmt.Errorf("failed to queue enrichment: %w", err)

Check warning on line 164 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L164

Added line #L164 was not covered by tests
}
if err := batch.Queue(ctx, assoc, hashKind, hash, name, id); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
return fmt.Errorf("failed to queue association: %w", err)

Check warning on line 167 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L167

Added line #L167 was not covered by tests
}
return nil

Check warning on line 169 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L169

Added line #L169 was not covered by tests
})
if err != nil {
return uuid.Nil, fmt.Errorf("iterating on enrichments: %w", err)

Check warning on line 172 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L171-L172

Added lines #L171 - L172 were not covered by tests
}
if err := batch.Done(ctx); err != nil {
return uuid.Nil, fmt.Errorf("failed to finish batch enrichment insert: %w", err)
Expand All @@ -160,7 +185,7 @@ DO
}
zlog.Debug(ctx).
Stringer("ref", ref).
Int("inserted", len(es)).
Int("inserted", enCt).

Check warning on line 188 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L188

Added line #L188 was not covered by tests
Msg("update_operation committed")
return ref, nil
}
Expand Down
67 changes: 53 additions & 14 deletions datastore/postgres/updatevulnerabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/quay/zlog"

"github.com/quay/claircore"
"github.com/quay/claircore/datastore"
"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/pkg/microbatch"
"github.com/quay/zlog"
)

var (
Expand Down Expand Up @@ -45,14 +45,33 @@ var (
)
)

type deltaOpts struct {
deletedVulns []string
vulns []*claircore.Vulnerability
}

// UpdateVulnerabilitiesIter implements vulnstore.Updater.
func (s *MatcherStore) UpdateVulnerabilitiesIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.VulnIter) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilitiesIter")
return s.updateVulnerabilities(ctx, updater, fp, it, nil)

Check warning on line 56 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L54-L56

Added lines #L54 - L56 were not covered by tests
}

// UpdateVulnerabilities implements vulnstore.Updater.
//
// It creates a new UpdateOperation for this update call, inserts the
// provided vulnerabilities and computes a diff comprising the removed
// and added vulnerabilities for this UpdateOperation.
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fp driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilities")
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false)
iterVulns := func(yield func(*claircore.Vulnerability) error) error {
for i := range vulns {
if err := yield(vulns[i]); err != nil {
return err

Check warning on line 69 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L69

Added line #L69 was not covered by tests
}
}
return nil
}
return s.updateVulnerabilities(ctx, updater, fp, iterVulns, nil)
}

// DeltaUpdateVulnerabilities implements vulnstore.Updater.
Expand All @@ -68,10 +87,23 @@ func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string
// - Associate new vulnerabilities with new updateOperation
func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.DeltaUpdateVulnerabilities")
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true)
deltaOpts := deltaOpts{
deletedVulns: deletedVulns,
vulns: vulns,
}
iterVulns := func(yield func(*claircore.Vulnerability) error) error {
for i := range vulns {
deltaOpts.vulns = append(deltaOpts.vulns)
if err := yield(vulns[i]); err != nil {
return err

Check warning on line 98 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L98

Added line #L98 was not covered by tests
}
}
return nil
}
return s.updateVulnerabilities(ctx, updater, fingerprint, iterVulns, &deltaOpts)
}

func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string, delta bool) (uuid.UUID, error) {
func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter func(yield func(*claircore.Vulnerability) error) error, deltaOpts *deltaOpts) (uuid.UUID, error) {
const (
// Create makes a new update operation and returns the reference and ID.
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;`
Expand Down Expand Up @@ -139,6 +171,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err)
}

delta := deltaOpts != nil
updateVulnerabilitiesCounter.WithLabelValues("create", strconv.FormatBool(delta)).Add(1)
updateVulnerabilitiesDuration.WithLabelValues("create", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds())

Expand Down Expand Up @@ -181,13 +214,13 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
}

if len(oldVulns) > 0 {
for _, v := range vulns {
for _, v := range deltaOpts.vulns {
// If we have an existing vuln in the new batch
// delete it from the oldVulns map so it doesn't
// get associated with the new update_operation.
delete(oldVulns, v.Name)
}
for _, delName := range deletedVulns {
for _, delName := range deltaOpts.deletedVulns {
// If we have an existing vuln that has been signaled
// as deleted by the updater then delete it so it doesn't
// get associated with the new update_operation.
Expand All @@ -211,14 +244,15 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string

// batch insert vulnerabilities
skipCt := 0

vulnCt := 0
start = time.Now()

mBatcher := microbatch.NewInsert(tx, 2000, time.Minute)
for _, vuln := range vulns {
err = vulnIter(func(vuln *claircore.Vulnerability) error {
vulnCt++
if vuln.Package == nil || vuln.Package.Name == "" {
skipCt++
continue
return nil

Check warning on line 255 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L255

Added line #L255 was not covered by tests
}

pkg := vuln.Package
Expand All @@ -242,12 +276,17 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper,
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err)
return fmt.Errorf("failed to queue vulnerability: %w", err)

Check warning on line 279 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L279

Added line #L279 was not covered by tests
}

if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
return fmt.Errorf("failed to queue association: %w", err)

Check warning on line 283 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L283

Added line #L283 was not covered by tests
}

return nil
})
if err != nil {
return uuid.Nil, fmt.Errorf("iterating on vulnerabilities: %w", err)

Check warning on line 289 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L289

Added line #L289 was not covered by tests
}
if err := mBatcher.Done(ctx); err != nil {
return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err)
Expand All @@ -266,7 +305,7 @@ func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string
zlog.Debug(ctx).
Str("ref", ref.String()).
Int("skipped", skipCt).
Int("inserted", len(vulns)-skipCt).
Int("inserted", vulnCt-skipCt).
Msg("update_operation committed")
return ref, nil
}
Expand Down
9 changes: 9 additions & 0 deletions datastore/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
"github.com/quay/claircore/libvuln/driver"
)

// VulnIter is a function for iterating on vulnerabilities.
//
// It accepts a callback function 'yield' to handle each vulnerability. If the callback
// returns an error, the iteration is halted, and the error is propagated to indicate cancellation.
type VulnIter func(yield func(*claircore.Vulnerability) error) error

// Updater is an interface exporting the necessary methods
// for updating a vulnerability database.
type Updater interface {
Expand All @@ -19,6 +25,9 @@ type Updater interface {
// vulnerabilities, and ensures vulnerabilities from previous updates are
// not queried by clients.
UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error)
// UpdateVulnerabilitiesIter performs the same operation as
// UpdateVulnerabilities, but accepting an iterator function.
UpdateVulnerabilitiesIter(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter VulnIter) (uuid.UUID, error)
// DeltaUpdateVulnerabilities creates a new UpdateOperation consisting of existing
// vulnerabilities and new vulnerabilities. It also takes an array of deleted
// vulnerability names which should no longer be available to query.
Expand Down

0 comments on commit e08f257

Please sign in to comment.