Skip to content

Commit

Permalink
jsonblob: Support per updater iterators
Browse files Browse the repository at this point in the history
Signed-off-by: J. Victor Martins <jvdm@sdf.org>
  • Loading branch information
jvdm committed Apr 19, 2024
1 parent a5bed27 commit 0feda8e
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 49 deletions.
183 changes: 134 additions & 49 deletions libvuln/jsonblob/jsonblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,70 +45,97 @@ type Store struct {
}

// Load reads in all the records serialized in the provided [io.Reader].
func Load(ctx context.Context, r io.Reader) (*Loader, error) {
func Load(_ context.Context, r io.Reader) (*Loader, error) {
l := Loader{
dec: json.NewDecoder(r),
cur: uuid.Nil,
}
l.de.Ref = uuid.Nil
return &l, nil
}

// Loader is an iterator that returns a series of [Entry].
// Loader is an iterator that returns objects grouped by update operations from
// the [Store].
//
// Users should call [*Loader.Next] until it reports false, then check for
// errors via [*Loader.Err].
// Users should either call [*Loader.Next] or [*Loader.NextIter], and if they
// report false, check for errors via [*Loader.Err].
//
// 1. Use [*Loader.Next] to read all objects for a single update operation into
// an [Entry]. Use [*Loader.Entry] to read it.
//
// 2. Use [*Loader.NextIter] to iterate over each object for a single update
// operation. Use [*Loader.Iter] get the iterator (see [iter] for more).
type Loader struct {
err error
e *Entry

dec *json.Decoder
next *Entry
de diskEntry
cur uuid.UUID
dec *json.Decoder
de diskEntry
cur uuid.UUID
}

// Next reports whether there's an [Entry] to be processed.
func (l *Loader) Next() bool {
// iter wraps a disk entry with methods to read the next object for the same
// update operation.
//
// Users are expected to call [*iter.Next()] to read and parse the next object,
// which can be retrieved using [iter.Vulnerability] or [iter.Enrichment] based
// on the update kind. Errors are reported in the [Loader.Err].
type iter struct {
*diskEntry

// Copied when disk entry is first assigned.
Updater string
Fingerprint driver.Fingerprint
Kind driver.UpdateKind

Vulnerability *claircore.Vulnerability
Enrichment *driver.EnrichmentRecord

loader *Loader
}

// NextIter reports if there is a new set of update operations to iterate on. If
// called in the middle of an iteration, all objects for the current operation
// are discarded.
func (l *Loader) NextIter() bool {
if l.err != nil {
return false
}

for l.err = l.dec.Decode(&l.de); l.err == nil; l.err = l.dec.Decode(&l.de) {
id := l.de.Ref
// If we just hit a new Entry, promote the current one.
if id != l.cur {
l.e = l.next
l.next = &Entry{}
l.next.Updater = l.de.Updater
l.next.Fingerprint = l.de.Fingerprint
l.next.Date = l.de.Date
if l.cur == l.de.Ref {
// Read until we are not on the same update operation.
for l.readNext() {
}
switch l.de.Kind {
}
if l.err != nil {
return false
}
// Mark we are in a new update operation.
l.cur = l.de.Ref
return true
}

// Next reports whether there's an [Entry] to be processed.
func (l *Loader) Next() bool {
if !l.NextIter() {
return false
}
e := Entry{}
it := l.Iter()
for it.Next() {
e.CommonEntry = it.CommonEntry
switch it.Kind {
case driver.VulnerabilityKind:
vuln := claircore.Vulnerability{}
if err := json.Unmarshal(l.de.Vuln.buf, &vuln); err != nil {
l.err = err
return false
}
l.next.Vuln = append(l.next.Vuln, &vuln)
e.Vuln = append(e.Vuln, it.Vulnerability)
case driver.EnrichmentKind:
en := driver.EnrichmentRecord{}
if err := json.Unmarshal(l.de.Enrichment.buf, &en); err != nil {
l.err = err
return false
}
l.next.Enrichment = append(l.next.Enrichment, en)
}
// If this was an initial diskEntry, promote the ref.
if id != l.cur {
l.cur = id
// If we have an Entry ready, report that.
if l.e != nil {
return true
}
e.Enrichment = append(e.Enrichment, *it.Enrichment)
default:
l.err = fmt.Errorf("unknown entry type: %s", it.Kind)
break

Check warning on line 132 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L130-L132

Added lines #L130 - L132 were not covered by tests
}
}
l.e = l.next
if l.Err() != nil {
return false

Check warning on line 136 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L136

Added line #L136 was not covered by tests
}
l.e = &e
return true
}

Expand All @@ -117,6 +144,54 @@ func (l *Loader) Entry() *Entry {
return l.e
}

// Iter returns an iterator for read all objects for the current update operation.
func (l *Loader) Iter() *iter {
return &iter{
Updater: l.de.Updater,
Fingerprint: l.de.Fingerprint,
Kind: l.de.Kind,
loader: l,
}
}

// Next reports if there is a new object available in this iterator.
func (i *iter) Next() bool {
if i.loader.err != nil {
return false

Check warning on line 160 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L160

Added line #L160 was not covered by tests
}
if i.diskEntry != nil {
// Only need the next disk entry if not the first iteration.
if !i.loader.readNext() {
return false
}
}
i.diskEntry = &i.loader.de
switch i.Kind {
case driver.VulnerabilityKind:
vuln := claircore.Vulnerability{}
if err := json.Unmarshal(i.diskEntry.Vuln.buf, &vuln); err != nil {
i.loader.err = err
return false

Check warning on line 174 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L173-L174

Added lines #L173 - L174 were not covered by tests
}
i.Vulnerability = &vuln
case driver.EnrichmentKind:
en := driver.EnrichmentRecord{}
if err := json.Unmarshal(i.diskEntry.Enrichment.buf, &en); err != nil {
i.loader.err = err
return false

Check warning on line 181 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L180-L181

Added lines #L180 - L181 were not covered by tests
}
i.Enrichment = &en
}
return true
}

// readNext will decode the next [diskEntry] from the store, and return true
// only if there were no errors and the entry belongs to the current update operation.
func (l *Loader) readNext() bool {
l.err = l.dec.Decode(&l.de)
return l.err == nil && l.cur == l.de.Ref
}

// Err is the latest encountered error.
func (l *Loader) Err() error {
// Don't report EOF as an error.
Expand Down Expand Up @@ -441,18 +516,28 @@ func (s *Store) UpdateEnrichments(ctx context.Context, kind string, fp driver.Fi
}

// RecordUpdaterStatus is unimplemented.
func (s *Store) RecordUpdaterStatus(ctx context.Context, updaterName string, updateTime time.Time, fingerprint driver.Fingerprint, updaterError error) error {
return nil
func (s *Store) RecordUpdaterStatus(_ context.Context, _ string, _ time.Time, _ driver.Fingerprint, _ error) error {
return errors.ErrUnsupported

Check warning on line 520 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L519-L520

Added lines #L519 - L520 were not covered by tests
}

// RecordUpdaterSetStatus is unimplemented.
func (s *Store) RecordUpdaterSetStatus(ctx context.Context, updaterSet string, updateTime time.Time) error {
return nil
func (s *Store) RecordUpdaterSetStatus(_ context.Context, _ string, _ time.Time) error {
return errors.ErrUnsupported

Check warning on line 525 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L524-L525

Added lines #L524 - L525 were not covered by tests
}

// DeltaUpdateVulnerabilities is a noop
func (s *Store) DeltaUpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability, deleted []string) (uuid.UUID, error) {
return uuid.Nil, nil
func (s *Store) DeltaUpdateVulnerabilities(_ context.Context, _ string, _ driver.Fingerprint, _ []*claircore.Vulnerability, _ []string) (uuid.UUID, error) {
return uuid.Nil, errors.ErrUnsupported

Check warning on line 530 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L529-L530

Added lines #L529 - L530 were not covered by tests
}

// UpdateEnrichmentsIter is unimplemented.
func (s *Store) UpdateEnrichmentsIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.EnrichmentIter) (uuid.UUID, error) {
return uuid.Nil, errors.ErrUnsupported

Check warning on line 535 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L534-L535

Added lines #L534 - L535 were not covered by tests
}

// UpdateVulnerabilitiesIter is unimplemented.
func (s *Store) UpdateVulnerabilitiesIter(_ context.Context, _ string, _ driver.Fingerprint, _ datastore.VulnerabilityIter) (uuid.UUID, error) {
return uuid.Nil, errors.ErrUnsupported

Check warning on line 540 in libvuln/jsonblob/jsonblob.go

View check run for this annotation

Codecov / codecov/patch

libvuln/jsonblob/jsonblob.go#L539-L540

Added lines #L539 - L540 were not covered by tests
}

var bufPool sync.Pool
Expand Down
69 changes: 69 additions & 0 deletions libvuln/jsonblob/jsonblob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,72 @@ func TestEnrichments(t *testing.T) {
}
t.Logf("wrote:\n%s", buf.String())
}

func TestNextIter(t *testing.T) {
ctx := context.Background()
a, err := New()
if err != nil {
t.Fatal(err)
}

var want, got struct {
V map[string][]*claircore.Vulnerability
}

want.V = make(map[string][]*claircore.Vulnerability)
got.V = make(map[string][]*claircore.Vulnerability)

foo := test.GenUniqueVulnerabilities(10, "foo")
bar := test.GenUniqueVulnerabilities(10, "bar")

ref, err := a.UpdateVulnerabilities(ctx, "foo", "", foo)
if err != nil {
t.Error(err)
}
t.Logf("ref: %v", ref)

ref, err = a.UpdateVulnerabilities(ctx, "bar", "", bar)
if err != nil {
t.Error(err)
}
t.Logf("ref: %v", ref)

// We will break the foo vulns at the 5th element.
const fooBreak = 5
want.V["foo"] = foo[:fooBreak]
want.V["bar"] = bar

var buf bytes.Buffer
defer func() {
t.Logf("wrote:\n%s", buf.String())
}()
r, w := io.Pipe()
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error { defer w.Close(); return a.Store(w) })
eg.Go(func() error {
l, err := Load(ctx, io.TeeReader(r, &buf))
if err != nil {
return err
}
for l.NextIter() {
it := l.Iter()
for i := 0; it.Next(); i++ {
if it.Updater == "foo" && i == fooBreak {
t.Logf("breaking foo at %d:\n%s", i, it.Vulnerability.Name)
break
}
got.V[it.Updater] = append(got.V[it.Updater], it.Vulnerability)
}
}
if err := l.Err(); err != nil {
return err
}
return nil
})
if err := eg.Wait(); err != nil {
t.Error(err)
}
if !cmp.Equal(got, want) {
t.Error(cmp.Diff(got, want))
}
}

0 comments on commit 0feda8e

Please sign in to comment.