Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datastore: Add vuln & enrich stream updates #1315

Merged
merged 2 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions datastore/enrichment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@ import (
"github.com/quay/claircore/libvuln/driver"
)

// EnrichmentIter is an [Iter] of enrichment records.
type EnrichmentIter Iter[*driver.EnrichmentRecord]

// EnrichmentUpdater is an interface exporting the necessary methods
// for storing and querying Enrichments.
type EnrichmentUpdater interface {
// UpdateEnrichments creates a new EnrichmentUpdateOperation, inserts the provided
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
// queries by clients.
UpdateEnrichments(ctx context.Context, kind string, fingerprint driver.Fingerprint, enrichments []driver.EnrichmentRecord) (uuid.UUID, error)
// UpdateEnrichmentsIter performs the same operation as UpdateEnrichments, but
// accepting an iterator function.
UpdateEnrichmentsIter(ctx context.Context, kind string, fingerprint driver.Fingerprint, enIter EnrichmentIter) (uuid.UUID, error)
}

// Enrichment is an interface for querying enrichments from the store.
Expand Down
6 changes: 6 additions & 0 deletions datastore/matcher_store.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package datastore

// Iter is an iterator function that accepts a callback 'yield' to handle each
// iterator item. The consumer can signal the iterator to break or retry by
// returning an error. The iterator itself returns an error if the iteration
// cannot continue or was interrupted unexpectedly.
type Iter[T any] func(yield func(T, error) bool)

// MatcherStore aggregates all interface types
type MatcherStore interface {
Updater
Expand Down
46 changes: 38 additions & 8 deletions datastore/postgres/enrichment.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/quay/zlog"

"github.com/quay/claircore/datastore"
"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/pkg/microbatch"
)
Expand Down Expand Up @@ -60,10 +61,27 @@
)
)

func (s *MatcherStore) UpdateEnrichmentsIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichmentsIter")
return s.updateEnrichments(ctx, updater, fp, it)

Check warning on line 66 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L64-L66

Added lines #L64 - L66 were not covered by tests
}

// UpdateEnrichments creates a new UpdateOperation, inserts the provided
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
// queried by clients.
func (s *MatcherStore) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) {
func (s *MatcherStore) UpdateEnrichments(ctx context.Context, updater string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateEnrichments")
enIter := func(yield func(record *driver.EnrichmentRecord, err error) bool) {
for i := range es {
if !yield(&es[i], nil) {
break

Check warning on line 77 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L72-L77

Added lines #L72 - L77 were not covered by tests
}
}
}
return s.updateEnrichments(ctx, updater, fp, enIter)

Check warning on line 81 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L81

Added line #L81 was not covered by tests
}

func (s *MatcherStore) updateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, it datastore.EnrichmentIter) (uuid.UUID, error) {

Check warning on line 84 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L84

Added line #L84 was not covered by tests
const (
create = `
INSERT
Expand Down Expand Up @@ -134,17 +152,29 @@

batch := microbatch.NewInsert(tx, 2000, time.Minute)
start = time.Now()
for i := range es {
hashKind, hash := hashEnrichment(&es[i])
err := batch.Queue(ctx, insert,
hashKind, hash, name, es[i].Tags, es[i].Enrichment,
enCt := 0
it(func(en *driver.EnrichmentRecord, iterErr error) bool {
if iterErr != nil {
err = iterErr
return false

Check warning on line 159 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L155-L159

Added lines #L155 - L159 were not covered by tests
}
enCt++
hashKind, hash := hashEnrichment(en)
err = batch.Queue(ctx, insert,
hashKind, hash, name, en.Tags, en.Enrichment,

Check warning on line 164 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L161-L164

Added lines #L161 - L164 were not covered by tests
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue enrichment: %w", err)
err = fmt.Errorf("failed to queue enrichment: %w", err)
return false

Check warning on line 168 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L167-L168

Added lines #L167 - L168 were not covered by tests
}
if err := batch.Queue(ctx, assoc, hashKind, hash, name, id); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
err = fmt.Errorf("failed to queue association: %w", err)
return false

Check warning on line 172 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L171-L172

Added lines #L171 - L172 were not covered by tests
}
return true

Check warning on line 174 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L174

Added line #L174 was not covered by tests
})
if err != nil {
return uuid.Nil, fmt.Errorf("iterating on enrichments: %w", err)

Check warning on line 177 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L176-L177

Added lines #L176 - L177 were not covered by tests
}
if err := batch.Done(ctx); err != nil {
return uuid.Nil, fmt.Errorf("failed to finish batch enrichment insert: %w", err)
Expand All @@ -160,7 +190,7 @@
}
zlog.Debug(ctx).
Stringer("ref", ref).
Int("inserted", len(es)).
Int("inserted", enCt).

Check warning on line 193 in datastore/postgres/enrichment.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/enrichment.go#L193

Added line #L193 was not covered by tests
Msg("update_operation committed")
return ref, nil
}
Expand Down
77 changes: 61 additions & 16 deletions datastore/postgres/updatevulnerabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"github.com/quay/zlog"

"github.com/quay/claircore"
"github.com/quay/claircore/datastore"
"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/pkg/microbatch"
)
Expand Down Expand Up @@ -45,14 +46,27 @@
)
)

// UpdateVulnerabilitiesIter implements vulnstore.Updater.
func (s *MatcherStore) UpdateVulnerabilitiesIter(ctx context.Context, updater string, fp driver.Fingerprint, it datastore.VulnerabilityIter) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilitiesIter")
return s.updateVulnerabilities(ctx, updater, fp, it, nil)

Check warning on line 52 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L50-L52

Added lines #L50 - L52 were not covered by tests
}

// UpdateVulnerabilities implements vulnstore.Updater.
//
// It creates a new UpdateOperation for this update call, inserts the
// provided vulnerabilities and computes a diff comprising the removed
// and added vulnerabilities for this UpdateOperation.
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
func (s *MatcherStore) UpdateVulnerabilities(ctx context.Context, updater string, fp driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.UpdateVulnerabilities")
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, nil, false)
iterVulns := func(yield func(*claircore.Vulnerability, error) bool) {
for i := range vulns {
if !yield(vulns[i], nil) {
break

Check warning on line 65 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L65

Added line #L65 was not covered by tests
}
}
}
return s.updateVulnerabilities(ctx, updater, fp, iterVulns, nil)
}

// DeltaUpdateVulnerabilities implements vulnstore.Updater.
Expand All @@ -68,10 +82,24 @@
// - Associate new vulnerabilities with new updateOperation
func (s *MatcherStore) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string) (uuid.UUID, error) {
ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/MatcherStore.DeltaUpdateVulnerabilities")
return s.updateVulnerabilities(ctx, updater, fingerprint, vulns, deletedVulns, true)
iterVulns := func(yield func(*claircore.Vulnerability, error) bool) {
for i := range vulns {
if !yield(vulns[i], nil) {
break

Check warning on line 88 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L88

Added line #L88 was not covered by tests
}
}
}
delVulns := func(yield func(string, error) bool) {
for _, s := range deletedVulns {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other implementations above used the index i instead. Curious why the change here?

if !yield(s, nil) {
break

Check warning on line 95 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L95

Added line #L95 was not covered by tests
}
}
}
return s.updateVulnerabilities(ctx, updater, fingerprint, iterVulns, delVulns)
}

func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deletedVulns []string, delta bool) (uuid.UUID, error) {
func (s *MatcherStore) updateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter datastore.VulnerabilityIter, delIter datastore.Iter[string]) (uuid.UUID, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delIter is a bit confusing to me because it makes me think it's short for delta instead of delete. Can we make it more explicit and call it deleteIter?

const (
// Create makes a new update operation and returns the reference and ID.
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;`
Expand Down Expand Up @@ -139,6 +167,7 @@
return uuid.Nil, fmt.Errorf("failed to create update_operation: %w", err)
}

delta := delIter != nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used?

updateVulnerabilitiesCounter.WithLabelValues("create", strconv.FormatBool(delta)).Add(1)
updateVulnerabilitiesDuration.WithLabelValues("create", strconv.FormatBool(delta)).Observe(time.Since(start).Seconds())

Expand Down Expand Up @@ -181,18 +210,20 @@
}

if len(oldVulns) > 0 {
for _, v := range vulns {
vulnIter(func(v *claircore.Vulnerability, _ error) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this and delIter make assumptions about the passed in err that it's safe to ignore?

// If we have an existing vuln in the new batch
// delete it from the oldVulns map so it doesn't
// get associated with the new update_operation.
delete(oldVulns, v.Name)
}
for _, delName := range deletedVulns {
return true
})
delIter(func(delName string, _ error) bool {
// If we have an existing vuln that has been signaled
// as deleted by the updater then delete it so it doesn't
// get associated with the new update_operation.
delete(oldVulns, delName)
}
return true
})
}
start = time.Now()
// Associate already existing vulnerabilities with new update_operation.
Expand All @@ -211,14 +242,20 @@

// batch insert vulnerabilities
skipCt := 0

vulnCt := 0
start = time.Now()

mBatcher := microbatch.NewInsert(tx, 2000, time.Minute)
for _, vuln := range vulns {

vulnIter(func(vuln *claircore.Vulnerability, iterErr error) bool {
if iterErr != nil {
err = iterErr
return false

Check warning on line 253 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L252-L253

Added lines #L252 - L253 were not covered by tests
}
vulnCt++
if vuln.Package == nil || vuln.Package.Name == "" {
skipCt++
continue
return true

Check warning on line 258 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L258

Added line #L258 was not covered by tests
}

pkg := vuln.Package
Expand All @@ -233,7 +270,7 @@
hashKind, hash := md5Vuln(vuln)
vKind, vrLower, vrUpper := rangefmt(vuln.Range)

err := mBatcher.Queue(ctx, insert,
err = mBatcher.Queue(ctx, insert,
hashKind, hash,
vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity,
pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind,
Expand All @@ -242,12 +279,20 @@
vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper,
)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to queue vulnerability: %w", err)
err = fmt.Errorf("failed to queue vulnerability: %w", err)
return false

Check warning on line 283 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L282-L283

Added lines #L282 - L283 were not covered by tests
}

if err := mBatcher.Queue(ctx, assoc, hashKind, hash, uoID); err != nil {
return uuid.Nil, fmt.Errorf("failed to queue association: %w", err)
err = mBatcher.Queue(ctx, assoc, hashKind, hash, uoID)
if err != nil {
err = fmt.Errorf("failed to queue association: %w", err)
return false

Check warning on line 289 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L288-L289

Added lines #L288 - L289 were not covered by tests
}

return true
})
if err != nil {
return uuid.Nil, fmt.Errorf("iterating on vulnerabilities: %w", err)

Check warning on line 295 in datastore/postgres/updatevulnerabilities.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/updatevulnerabilities.go#L295

Added line #L295 was not covered by tests
}
if err := mBatcher.Done(ctx); err != nil {
return uuid.Nil, fmt.Errorf("failed to finish batch vulnerability insert: %w", err)
Expand All @@ -266,7 +311,7 @@
zlog.Debug(ctx).
Str("ref", ref.String()).
Int("skipped", skipCt).
Int("inserted", len(vulns)-skipCt).
Int("inserted", vulnCt-skipCt).
Msg("update_operation committed")
return ref, nil
}
Expand Down
6 changes: 6 additions & 0 deletions datastore/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/quay/claircore/libvuln/driver"
)

// VulnerabilityIter is an [Iter] of vulnerabilities.
type VulnerabilityIter Iter[*claircore.Vulnerability]

// Updater is an interface exporting the necessary methods
// for updating a vulnerability database.
type Updater interface {
Expand All @@ -19,6 +22,9 @@ type Updater interface {
// vulnerabilities, and ensures vulnerabilities from previous updates are
// not queried by clients.
UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error)
// UpdateVulnerabilitiesIter performs the same operation as
// UpdateVulnerabilities, but accepting an iterator function.
UpdateVulnerabilitiesIter(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter VulnerabilityIter) (uuid.UUID, error)
Comment on lines +25 to +27
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// UpdateVulnerabilitiesIter performs the same operation as
// UpdateVulnerabilities, but accepting an iterator function.
UpdateVulnerabilitiesIter(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter VulnerabilityIter) (uuid.UUID, error)
// UpdateVulnerabilitiesIter performs the same operation as
// UpdateVulnerabilities, but accepting an iterator function.
//
// The passed iterator must be able to iterate over the sequence multiple times.
UpdateVulnerabilitiesIter(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulnIter VulnerabilityIter) (uuid.UUID, error)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// The passed iterator must be able to iterate over the sequence multiple times.

That's not true for UpdateVulnerabilitiesIter. It would be for a potential DeltaUpdateVulnerabilitiesIter, but this patch does not aim to support that.

// DeltaUpdateVulnerabilities creates a new UpdateOperation consisting of existing
// vulnerabilities and new vulnerabilities. It also takes an array of deleted
// vulnerability names which should no longer be available to query.
Expand Down
10 changes: 10 additions & 0 deletions libvuln/jsonblob/jsonblob.go
jvdm marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,16 @@
return uuid.Nil, nil
}

// UpdateEnrichmentsIter is unimplemented.
func (s *Store) UpdateEnrichmentsIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.EnrichmentIter) (uuid.UUID, error) {
return uuid.Nil, errors.ErrUnsupported

Check warning on line 460 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L459-L460

Added lines #L459 - L460 were not covered by tests
}

// UpdateVulnerabilitiesIter is unimplemented.
func (s *Store) UpdateVulnerabilitiesIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.VulnerabilityIter) (uuid.UUID, error) {
return uuid.Nil, errors.ErrUnsupported

Check warning on line 465 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L464-L465

Added lines #L464 - L465 were not covered by tests
}

var bufPool sync.Pool

func getBuf() []byte {
Expand Down