Skip to content

Commit

Permalink
enrichments: datamodel updates
Browse files Browse the repository at this point in the history
This commit introduces the changes needed to support the updated data
model in the enrichment specification.

see: https://github.com/quay/clair-enrichment-spec/

In addition to this, a small bug was fixed in the updater code
where errors were not being logged.
  • Loading branch information
ldelossa committed May 4, 2021
1 parent 5d50fc6 commit 06ef7fd
Show file tree
Hide file tree
Showing 16 changed files with 209 additions and 126 deletions.
21 changes: 8 additions & 13 deletions debian/matcher_integration_test.go
Expand Up @@ -6,15 +6,14 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/quay/zlog"

"github.com/quay/claircore"
"github.com/quay/claircore/internal/matcher"
"github.com/quay/claircore/internal/updater"
vulnstore "github.com/quay/claircore/internal/vulnstore/postgres"
"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/libvuln/updates"
"github.com/quay/claircore/test/integration"
)

Expand All @@ -29,19 +28,15 @@ func TestMatcherIntegration(t *testing.T) {
store := vulnstore.NewVulnStore(pool)

m := &Matcher{}
// seed the test vulnstore with CVE data
ch := make(chan driver.Updater)
go func() {
ch <- NewUpdater(Buster)
close(ch)
}()
exec := updater.Online{Pool: pool}

mgr, err := updates.NewManager(ctx, store, pool, nil, updates.WithEnabled(
[]string{"debian"}))

// force update
tctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
if err := exec.Run(tctx, ch); err != nil {
t.Error(err)
if err := mgr.Run(ctx); err != nil {
t.Fatal(err)
}

path := filepath.Join("testdata", "indexreport-buster-jackson-databind.json")
f, err := os.Open(path)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/updater/controller.go
Expand Up @@ -67,7 +67,7 @@ func driveUpdater(ctx context.Context, u driver.Updater, s vulnstore.Updater) er
var prevFP driver.Fingerprint
// Get previous fingerprint, if present.
// A fingerprint being missing is not an error.
opmap, err := s.GetUpdateOperations(ctx, name)
opmap, err := s.GetUpdateOperations(ctx, driver.VulnerabilityKind, name)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/vulnstore/postgres/gc_test.go
Expand Up @@ -118,7 +118,7 @@ func TestGC(t *testing.T) {
}

// confirm update operations exist
ops, err := store.GetUpdateOperations(ctx)
ops, err := store.GetUpdateOperations(ctx, driver.VulnerabilityKind)
if err != nil {
t.Fatalf("failed obtaining update ops: %v", err)
}
Expand All @@ -139,7 +139,7 @@ func TestGC(t *testing.T) {
if tt.updateOps < tt.keep {
wantKeep = tt.updateOps
}
ops, err = store.GetUpdateOperations(ctx)
ops, err = store.GetUpdateOperations(ctx, driver.VulnerabilityKind)
if err != nil {
t.Fatalf("failed obtaining update ops: %v", err)
}
Expand Down
34 changes: 27 additions & 7 deletions internal/vulnstore/postgres/getupdateoperationdiff.go
Expand Up @@ -55,7 +55,14 @@ var (
)
)

func getUpdateDiff(ctx context.Context, pool *pgxpool.Pool, prev, cur uuid.UUID) (*driver.UpdateDiff, error) {
func (s *Store) GetUpdateDiff(ctx context.Context, prev, cur uuid.UUID) (*driver.UpdateDiff, error) {
// confirmRefs will return a row only if both refs are kind = 'vulnerability'
// therefore, if a pgx.ErrNoRows is returned from this query, at least one
// of the incoming refs is not of kind = 'vulnerability'.
const confirmRefs = `
SELECT 1
WHERE ROW ('vulnerability') = ALL (SELECT kind FROM update_operation WHERE ref = $1 OR ref = $2);
`
// Query takes two update IDs and returns rows that only exist in first
// argument's set of vulnerabilities.
const query = `WITH
Expand Down Expand Up @@ -101,15 +108,28 @@ func getUpdateDiff(ctx context.Context, pool *pgxpool.Pool, prev, cur uuid.UUID)
if cur == uuid.Nil {
return nil, errors.New("nil uuid is invalid as \"current\" endpoint")
}
var diff driver.UpdateDiff
if err := populateRefs(ctx, &diff, pool, prev, cur); err != nil {
return nil, err

// confirm both refs are of type == 'vulnerability'
start := time.Now()
rows, err := s.pool.Query(ctx, confirmRefs, cur, prev)
switch err {
case nil:
rows.Close()
case pgx.ErrNoRows:
return nil, fmt.Errorf("provided ref was not of kind 'vulnerability'")
default:
return nil, fmt.Errorf("failed to confirm update op ref types: %w", err)
}
getUpdateDiffCounter.WithLabelValues("confirmrefs").Add(1)
getUpdateDiffDuration.WithLabelValues("confirmrefs").Observe(time.Since(start).Seconds())

// Retrieve added first.
start := time.Now()
var diff driver.UpdateDiff
if err := populateRefs(ctx, &diff, s.pool, prev, cur); err != nil {
return nil, err
}

rows, err := pool.Query(ctx, query, cur, prev)
rows, err = s.pool.Query(ctx, query, cur, prev)
if err != nil {
return nil, fmt.Errorf("failed to retrieve added vulnerabilities: %w", err)
}
Expand Down Expand Up @@ -140,7 +160,7 @@ func getUpdateDiff(ctx context.Context, pool *pgxpool.Pool, prev, cur uuid.UUID)
if prev == uuid.Nil {
return &diff, nil
}
rows, err = pool.Query(ctx, query, prev, cur)
rows, err = s.pool.Query(ctx, query, prev, cur)
if err != nil {
return nil, fmt.Errorf("failed to retrieve removed vulnerabilities: %w", err)
}
Expand Down
171 changes: 121 additions & 50 deletions internal/vulnstore/postgres/getupdateoperations.go
Expand Up @@ -76,21 +76,95 @@ var (
)

// GetLatestUpdateRef implements driver.Updater.
func (s *Store) GetLatestUpdateRef(ctx context.Context) (uuid.UUID, error) {
const query = `SELECT ref FROM update_operation ORDER BY id USING > LIMIT 1;`
func (s *Store) GetLatestUpdateRef(ctx context.Context, kind driver.UpdateKind) (uuid.UUID, error) {
const (
query = `SELECT ref FROM update_operation ORDER BY id USING > LIMIT 1;`
queryEnrichment = `SELECT ref FROM update_operation WHERE kind = 'enrichment' ORDER BY id USING > LIMIT 1;`
queryVulnerability = `SELECT ref FROM update_operation WHERE kind = 'vulnerability' ORDER BY id USING > LIMIT 1;`
)
ctx = baggage.ContextWithValues(ctx,
label.String("component", "internal/vulnstore/postgres/getLatestRef"))

var q string
var label string
switch kind {
case "":
q = query
label = "query"
case driver.EnrichmentKind:
q = queryEnrichment
label = "query_enrichment"
case driver.VulnerabilityKind:
q = queryVulnerability
label = "query_vulnerability"
}

var ref uuid.UUID
start := time.Now()
if err := s.pool.QueryRow(ctx, query).Scan(&ref); err != nil {
if err := s.pool.QueryRow(ctx, q).Scan(&ref); err != nil {
return uuid.Nil, err
}
getLatestUpdateRefCounter.WithLabelValues("query").Add(1)
getLatestUpdateRefDuration.WithLabelValues("query").Observe(time.Since(start).Seconds())
getLatestUpdateRefCounter.WithLabelValues(label).Add(1)
getLatestUpdateRefDuration.WithLabelValues(label).Observe(time.Since(start).Seconds())
return ref, nil
}

// GetLatestUpdateRefs returns the most recent update operation recorded for
// each updater, keyed by updater name.
//
// The kind argument restricts the result to update operations of that kind.
// An empty kind selects update operations of every kind. Any other value
// that is not driver.EnrichmentKind or driver.VulnerabilityKind is rejected
// with an error instead of being sent to the database.
//
// Each map value is a slice holding exactly one element: the update
// operation with the highest id for that updater.
func (s *Store) GetLatestUpdateRefs(ctx context.Context, kind driver.UpdateKind) (map[string][]driver.UpdateOperation, error) {
	const (
		query              = `SELECT DISTINCT ON (updater) updater, ref, fingerprint, date FROM update_operation ORDER BY updater, id USING >;`
		queryEnrichment    = `SELECT DISTINCT ON (updater) updater, ref, fingerprint, date FROM update_operation WHERE kind = 'enrichment' ORDER BY updater, id USING >;`
		queryVulnerability = `SELECT DISTINCT ON (updater) updater, ref, fingerprint, date FROM update_operation WHERE kind = 'vulnerability' ORDER BY updater, id USING >;`
	)

	// Pick the statement and the metric label that match the requested kind.
	var q, metricLabel string
	switch kind {
	case "":
		q = query
		metricLabel = "query"
	case driver.EnrichmentKind:
		q = queryEnrichment
		metricLabel = "query_enrichment"
	case driver.VulnerabilityKind:
		q = queryVulnerability
		metricLabel = "query_vulnerability"
	default:
		// Without this guard an unknown kind would execute an empty query.
		return nil, fmt.Errorf("unknown update kind %q", kind)
	}

	start := time.Now()
	rows, err := s.pool.Query(ctx, q)
	if err != nil {
		return nil, err
	}
	// The deferred Close covers every return path below.
	defer rows.Close()

	getLatestRefsCounter.WithLabelValues(metricLabel).Add(1)
	getLatestRefsDuration.WithLabelValues(metricLabel).Observe(time.Since(start).Seconds())

	ret := make(map[string][]driver.UpdateOperation)
	for rows.Next() {
		var uo driver.UpdateOperation
		if err := rows.Scan(
			&uo.Updater,
			&uo.Ref,
			&uo.Fingerprint,
			&uo.Date,
		); err != nil {
			return nil, fmt.Errorf("failed to scan update operation for updater %q: %w", uo.Updater, err)
		}
		// DISTINCT ON (updater) yields one row per updater, so each entry
		// is a single-element slice.
		ret[uo.Updater] = []driver.UpdateOperation{uo}
	}
	// Next returning false may mean an error rather than end-of-rows;
	// surface it instead of returning a silently truncated result.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate update operations: %w", err)
	}

	zlog.Debug(ctx).
		Int("count", len(ret)).
		Msg("found updaters")
	return ret, nil
}

func getLatestRefs(ctx context.Context, pool *pgxpool.Pool) (map[string][]driver.UpdateOperation, error) {
const query = `SELECT DISTINCT ON (updater) updater, ref, fingerprint, date FROM update_operation ORDER BY updater, id USING >;`
ctx = baggage.ContextWithValues(ctx,
Expand Down Expand Up @@ -131,15 +205,17 @@ func getLatestRefs(ctx context.Context, pool *pgxpool.Pool) (map[string][]driver
return ret, nil
}

func getUpdateOperations(ctx context.Context, pool *pgxpool.Pool, updater ...string) (map[string][]driver.UpdateOperation, error) {
func (s *Store) GetUpdateOperations(ctx context.Context, kind driver.UpdateKind, updater ...string) (map[string][]driver.UpdateOperation, error) {
const (
query = `SELECT ref, updater, fingerprint, date FROM update_operation WHERE updater = $1 ORDER BY id DESC;`
getUpdaters = `SELECT DISTINCT(updater) FROM update_operation;`
query = `SELECT ref, updater, fingerprint, date FROM update_operation WHERE updater = ANY($1) ORDER BY id DESC;`
queryVulnerability = `SELECT ref, updater, fingerprint, date FROM update_operation WHERE updater = ANY($1) AND kind = 'vulnerability' ORDER BY id DESC;`
queryEnrichment = `SELECT ref, updater, fingerprint, date FROM update_operation WHERE updater = ANY($1) AND kind = 'enrichment' ORDER BY id DESC;`
getUpdaters = `SELECT DISTINCT(updater) FROM update_operation;`
)
ctx = baggage.ContextWithValues(ctx,
label.String("component", "internal/vulnstore/postgres/getUpdateOperations"))

tx, err := pool.Begin(ctx)
tx, err := s.pool.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("failed to begin transaction: %w", err)
}
Expand All @@ -156,7 +232,7 @@ func getUpdateOperations(ctx context.Context, pool *pgxpool.Pool, updater ...str
switch {
case err == nil:
case errors.Is(err, pgx.ErrNoRows):
return nil, nil
return out, nil
default:
return nil, fmt.Errorf("failed to get distinct updates: %w", err)
}
Expand All @@ -180,50 +256,45 @@ func getUpdateOperations(ctx context.Context, pool *pgxpool.Pool, updater ...str
rows.Close()
}

// Take care to close the rows object on every iteration.
var rows pgx.Rows
for _, u := range updater {
var q string
var label string
switch kind {
case "":
q = query
label = "query"
case driver.EnrichmentKind:
q = queryEnrichment
label = "query_enrichment"
case driver.VulnerabilityKind:
q = queryVulnerability
label = "query_vulnerability"
}

start := time.Now()
start := time.Now()
rows, err := tx.Query(ctx, q, updater)
switch {
case err == nil:
case errors.Is(err, pgx.ErrNoRows):
return nil, nil
default:
return nil, fmt.Errorf("failed to get distinct updates: %w", err)
}
getUpdateOperationsCounter.WithLabelValues(label).Add(1)
getUpdateOperationsDuration.WithLabelValues(label).Observe(time.Since(start).Seconds())

rows, err = tx.Query(ctx, query, u)
switch {
case err == nil:
case errors.Is(err, pgx.ErrNoRows):
zlog.Warn(ctx).Str("updater", u).Msg("no update operations for this updater")
rows.Close()
continue
default:
for rows.Next() {
var uo driver.UpdateOperation
err := rows.Scan(
&uo.Ref,
&uo.Updater,
&uo.Fingerprint,
&uo.Date,
)
if err != nil {
rows.Close()
return nil, fmt.Errorf("failed to retrieve update operation for updater %v: %w", updater, err)
}
ops := []driver.UpdateOperation{}

getUpdateOperationsCounter.WithLabelValues("query").Add(1)
getUpdateOperationsDuration.WithLabelValues("query").Observe(time.Since(start).Seconds())

for rows.Next() {
ops = append(ops, driver.UpdateOperation{})
uo := &ops[len(ops)-1]
err := rows.Scan(
&uo.Ref,
&uo.Updater,
&uo.Fingerprint,
&uo.Date,
)
if err != nil {
rows.Close()
return nil, fmt.Errorf("failed to scan update operation for updater %q: %w", u, err)
}
}
rows.Close()
if err := rows.Err(); err != nil {
return nil, err
return nil, fmt.Errorf("failed to scan update operation for updater %q: %w", uo.Updater, err)
}
out[u] = ops
}
if err := tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("failed to commit transaction: %w", err)
out[uo.Updater] = append(out[uo.Updater], uo)
}
return out, nil
}
17 changes: 0 additions & 17 deletions internal/vulnstore/postgres/store.go
Expand Up @@ -37,11 +37,6 @@ func (s *Store) UpdateVulnerabilities(ctx context.Context, updater string, finge
return updateVulnerabilites(ctx, s.pool, updater, fingerprint, vulns)
}

// GetUpdateOperations implements vulnstore.Updater.
func (s *Store) GetUpdateOperations(ctx context.Context, updater ...string) (map[string][]driver.UpdateOperation, error) {
return getUpdateOperations(ctx, s.pool, updater...)
}

// DeleteUpdateOperations implements vulnstore.Updater.
func (s *Store) DeleteUpdateOperations(ctx context.Context, id ...uuid.UUID) (int64, error) {
const query = `DELETE FROM update_operation WHERE ref = ANY($1::uuid[]);`
Expand All @@ -64,18 +59,6 @@ func (s *Store) DeleteUpdateOperations(ctx context.Context, id ...uuid.UUID) (in
return tag.RowsAffected(), nil
}

// GetUpdateOperationDiff implements vulnstore.Updater.
func (s *Store) GetUpdateOperationDiff(ctx context.Context, a, b uuid.UUID) (*driver.UpdateDiff, error) {
return getUpdateDiff(ctx, s.pool, a, b)
}
func (s *Store) GetUpdateDiff(ctx context.Context, a, b uuid.UUID) (*driver.UpdateDiff, error) {
return getUpdateDiff(ctx, s.pool, a, b)
}

func (s *Store) GetLatestUpdateRefs(ctx context.Context) (map[string][]driver.UpdateOperation, error) {
return getLatestRefs(ctx, s.pool)
}

// Get implements vulnstore.Vulnerability.
func (s *Store) Get(ctx context.Context, records []*claircore.IndexRecord, opts vulnstore.GetOpts) (map[string][]*claircore.Vulnerability, error) {
vulns, err := get(ctx, s.pool, records, opts)
Expand Down
2 changes: 1 addition & 1 deletion internal/vulnstore/postgres/update_e2e_test.go
Expand Up @@ -143,7 +143,7 @@ func (e *e2e) Update(ctx context.Context) func(*testing.T) {
func (e *e2e) GetUpdateOperations(ctx context.Context) func(*testing.T) {
return func(t *testing.T) {
ctx := zlog.Test(ctx, t)
out, err := e.s.GetUpdateOperations(ctx, e.updater)
out, err := e.s.GetUpdateOperations(ctx, driver.VulnerabilityKind, e.updater)
if err != nil {
t.Fatalf("failed to get UpdateOperations: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/vulnstore/postgres/updatevulnerabilities.go
Expand Up @@ -54,7 +54,7 @@ var (
func updateVulnerabilites(ctx context.Context, pool *pgxpool.Pool, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (uuid.UUID, error) {
const (
// Create makes a new update operation and returns the reference and ID.
create = `INSERT INTO update_operation (updater, fingerprint) VALUES ($1, $2) RETURNING id, ref;`
create = `INSERT INTO update_operation (updater, fingerprint, kind) VALUES ($1, $2, 'vulnerability') RETURNING id, ref;`
// Insert attempts to create a new vulnerability. It fails silently.
insert = `
INSERT INTO vuln (
Expand Down

0 comments on commit 06ef7fd

Please sign in to comment.