From 11b46764e5455c5d555704c055f8a2e7b0b8c31f Mon Sep 17 00:00:00 2001 From: Hank Donnay Date: Thu, 28 May 2020 16:50:23 -0400 Subject: [PATCH 1/3] indexer: add Configurable interface Signed-off-by: Hank Donnay --- internal/indexer/versionedscanner.go | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/internal/indexer/versionedscanner.go b/internal/indexer/versionedscanner.go index bf1fe0cd8..123c2cdab 100644 --- a/internal/indexer/versionedscanner.go +++ b/internal/indexer/versionedscanner.go @@ -1,11 +1,17 @@ package indexer +import ( + "context" + "net/http" +) + const ( Package = "package" ) -// VersionedScanner can be imbeded into specific scanner types. This allows for methods and functions -// which only need to compare names and versions of scanners not to require each scanner type as an argument +// VersionedScanner can be embedded into specific scanner types. This allows for +// methods and functions which only need to compare names and versions of +// scanners not to require each scanner type as an argument. type VersionedScanner interface { // unique name of the distribution scanner. Name() string @@ -15,6 +21,24 @@ type VersionedScanner interface { Kind() string } +// ConfigDeserializer can be thought of as an Unmarshal function with the byte +// slice provided. +// +// This will typically be something like (*json.Decoder).Decode. +type ConfigDeserializer func(interface{}) error + +// RPCScanner is an interface scanners can implement to receive configuration +// and denote that they expect to be able to talk to the network at run time. +type RPCScanner interface { + Configure(context.Context, ConfigDeserializer, *http.Client) error +} + +// ConfigurableScanner is an interface scanners can implement to receive +// configuration. +type ConfigurableScanner interface { + Configure(context.Context, ConfigDeserializer) error +} + // VersionedScanners implements a list with construction methods // not concurrency safe type VersionedScanners []VersionedScanner From ac1035164fb4b8727698e0835750835a8edf44dc Mon Sep 17 00:00:00 2001 From: Hank Donnay Date: Tue, 26 May 2020 12:40:27 -0400 Subject: [PATCH 2/3] libindex: use new Configurable interfaces This exploded a bit into also refactoring the layerscanner, which is the primary place where scanners are actually invoked. Signed-off-by: Hank Donnay --- internal/indexer/ecosystem.go | 54 ++- internal/indexer/layerscanner/layerscanner.go | 386 +++++++++--------- .../indexer/layerscanner/layerscanner_test.go | 5 +- internal/indexer/opts.go | 23 +- libindex/controllerfactory.go | 24 +- libindex/libindex.go | 17 +- libindex/opts.go | 15 +- 7 files changed, 301 insertions(+), 223 deletions(-) diff --git a/internal/indexer/ecosystem.go b/internal/indexer/ecosystem.go index 817c0e7e0..56a6a41c3 100644 --- a/internal/indexer/ecosystem.go +++ b/internal/indexer/ecosystem.go @@ -1,6 +1,10 @@ package indexer -import "context" +import ( + "context" + + "github.com/rs/zerolog" +) // Ecosystems group together scanners and a Coalescer which are commonly used together. // @@ -17,7 +21,11 @@ type Ecosystem struct { } // EcosystemsToScanners extracts and dedupes multiple ecosystems and returns their discrete scanners -func EcosystemsToScanners(ctx context.Context, ecosystems []*Ecosystem) ([]PackageScanner, []DistributionScanner, []RepositoryScanner, error) { +func EcosystemsToScanners(ctx context.Context, ecosystems []*Ecosystem, disallowRemote bool) ([]PackageScanner, []DistributionScanner, []RepositoryScanner, error) { + log := zerolog.Ctx(ctx).With(). + Str("component", "internal/indexer/EcosystemsToScanners"). + Logger() + ctx = log.WithContext(ctx) ps := []PackageScanner{} ds := []DistributionScanner{} rs := []RepositoryScanner{} @@ -29,10 +37,18 @@ func EcosystemsToScanners(ctx context.Context, ecosystems []*Ecosystem) ([]Packa return nil, nil, nil, err } for _, s := range pscanners { - if _, ok := seen[s.Name()]; !ok { - ps = append(ps, s) - seen[s.Name()] = struct{}{} + n := s.Name() + if _, ok := seen[n]; ok { + continue + } + seen[n] = struct{}{} + if _, ok := s.(RPCScanner); ok && disallowRemote { + log.Info(). + Str("scanner", n). + Msg("disallowed by configuration") + continue } + ps = append(ps, s) } dscanners, err := ecosystem.DistributionScanners(ctx) @@ -40,10 +56,18 @@ func EcosystemsToScanners(ctx context.Context, ecosystems []*Ecosystem) ([]Packa return nil, nil, nil, err } for _, s := range dscanners { - if _, ok := seen[s.Name()]; !ok { - ds = append(ds, s) - seen[s.Name()] = struct{}{} + n := s.Name() + if _, ok := seen[n]; ok { + continue } + seen[n] = struct{}{} + if _, ok := s.(RPCScanner); ok && disallowRemote { + log.Info(). + Str("scanner", n). + Msg("disallowed by configuration") + continue + } + ds = append(ds, s) } rscanners, err := ecosystem.RepositoryScanners(ctx) @@ -51,10 +75,18 @@ func EcosystemsToScanners(ctx context.Context, ecosystems []*Ecosystem) ([]Packa return nil, nil, nil, err } for _, s := range rscanners { - if _, ok := seen[s.Name()]; !ok { - rs = append(rs, s) - seen[s.Name()] = struct{}{} + n := s.Name() + if _, ok := seen[n]; ok { + continue + } + seen[n] = struct{}{} + if _, ok := s.(RPCScanner); ok && disallowRemote { + log.Info(). + Str("scanner", n). + Msg("disallowed by configuration") + continue } + rs = append(rs, s) } } return ps, ds, rs, nil diff --git a/internal/indexer/layerscanner/layerscanner.go b/internal/indexer/layerscanner/layerscanner.go index ad5a7552b..fd04ea59b 100644 --- a/internal/indexer/layerscanner/layerscanner.go +++ b/internal/indexer/layerscanner/layerscanner.go @@ -3,76 +3,152 @@ package layerscanner import ( "context" "fmt" - "math" + "runtime" + "github.com/rs/zerolog" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" "github.com/quay/claircore" "github.com/quay/claircore/internal/indexer" - "github.com/rs/zerolog" ) -// layerScanner implements the indexer.LayerScanner interface. +// LayerScanner implements the indexer.LayerScanner interface. type layerScanner struct { - // common depedencies - *indexer.Opts - // concurrency level. maximum number of concurrent layer scans - cLevel int - // a channel to implement concurrency control - cc chan struct{} + store indexer.Store + + // Maximum allowed in-flight scanners per Scan call + inflight int64 + + // Pre-constructed and configured scanners. + ps []indexer.PackageScanner + ds []indexer.DistributionScanner + rs []indexer.RepositoryScanner } -// New is a constructor for a defaultLayerScanner -func New(cLevel int, opts *indexer.Opts) indexer.LayerScanner { - return &layerScanner{ - Opts: opts, - cLevel: cLevel, +// New is the constructor for a LayerScanner. +// +// The provided Context is only used for the duration of the call. +func New(ctx context.Context, concurrent int, opts *indexer.Opts) (indexer.LayerScanner, error) { + log := zerolog.Ctx(ctx).With(). + Str("component", "internal/indexer/layerscannner/New"). + Logger() + + switch { + case concurrent < 1: + log.Warn(). + Int("value", concurrent). + Msg("rectifying nonsense 'concurrent' argument") + fallthrough + case concurrent == 0: + concurrent = runtime.NumCPU() } -} -// addToken will block until a spot in the conccurency channel is available -// or the ctx is canceled. -func (ls *layerScanner) addToken(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - case ls.cc <- struct{}{}: - return nil + ps, ds, rs, err := indexer.EcosystemsToScanners(ctx, opts.Ecosystems, opts.Airgap) + if err != nil { + fmt.Errorf("failed to extract scanners from ecosystems: %v", err) } + // Configure and filter the scanners + var i int + i = 0 + for _, s := range ps { + if !configAndFilter(ctx, &log, opts, s) { + ps[i] = s + i++ + } + } + ps = ps[:i] + i = 0 + for _, s := range rs { + if !configAndFilter(ctx, &log, opts, s) { + rs[i] = s + i++ + } + } + rs = rs[:i] + i = 0 + for _, s := range ds { + if !configAndFilter(ctx, &log, opts, s) { + ds[i] = s + i++ + } + } + ds = ds[:i] + + return &layerScanner{ + store: opts.Store, + inflight: int64(concurrent), + ps: ps, + ds: ds, + rs: rs, + }, nil } -// discardToken is only called after addToken. Removes a token -// from the concurrency channel allowing another task to kick off. -func (ls *layerScanner) discardToken() { - select { - case <-ls.cc: +// ConfigAndFilter configures the provided scanner and reports if it should be +// filtered out of the slice or not. +func configAndFilter(ctx context.Context, log *zerolog.Logger, opts *indexer.Opts, s indexer.VersionedScanner) bool { + n := s.Name() + var cfgMap map[string]func(interface{}) error + switch k := s.Kind(); k { + case "package": + cfgMap = opts.ScannerConfig.Package + case "repository": + cfgMap = opts.ScannerConfig.Repo + case "distribution": + cfgMap = opts.ScannerConfig.Dist default: + log.Warn(). + Str("kind", k). + Str("scanner", n). + Msg("unknown scanner kind") + return true } + + if f, ok := cfgMap[n]; ok { + cs, csOK := s.(indexer.ConfigurableScanner) + rs, rsOK := s.(indexer.RPCScanner) + switch { + case !csOK && !rsOK: + log.Warn(). + Str("scanner", n). + Msg("configuration present for an unconfigurable scanner, skipping") + case csOK && rsOK: + fallthrough + case !csOK && rsOK: + if err := rs.Configure(ctx, f, opts.Client); err != nil { + log.Error(). + Str("scanner", n). + Err(err). + Msg("configuration failed") + return true + } + case csOK && !rsOK: + if err := cs.Configure(ctx, f); err != nil { + log.Error(). + Str("scanner", n). + Err(err). + Msg("configuration failed") + return true + } + } + } + return false } -// Scan performs a concurrency controlled scan of each layer by each type of configured scanner, indexing -// the results on successful completion. +// Scan performs a concurrency controlled scan of each layer by each configured +// scanner, indexing the results on successful completion. // -// Scan will launch all pending layer scans in a Go routine. -// Scan will ensure only 'cLevel' routines are actively scanning layers. +// Scan will launch all layer scan goroutines immediately and then only allow +// the configured limit to proceed. // -// If the provided ctx is canceled all routines are canceled and an error will be returned. -// If one or more layer scans fail Scan will report the first received error and all pending and inflight scans will be canceled. +// The provided Context controls cancellation for all scanners. The first error +// reported halts all work and is returned from Scan. func (ls *layerScanner) Scan(ctx context.Context, manifest claircore.Digest, layers []*claircore.Layer) error { - // compute concurrency level - x := float64(len(layers)) - y := float64(ls.cLevel) - if y == 0 { - y++ - } - ccMin := int(math.Min(x, y)) - - ls.cc = make(chan struct{}, ccMin) - - ps, ds, rs, err := indexer.EcosystemsToScanners(ctx, ls.Opts.Ecosystems) - if err != nil { - fmt.Errorf("failed to extract scanners from ecosystems: %v", err) - } + log := zerolog.Ctx(ctx).With(). + Str("component", "internal/indexer/layerscannner/layerScanner.Scan"). + Str("manifest", manifest.String()). + Logger() + ctx = log.WithContext(ctx) layersToScan := make([]*claircore.Layer, 0, len(layers)) dedupe := map[string]struct{}{} @@ -83,100 +159,48 @@ func (ls *layerScanner) Scan(ctx context.Context, manifest claircore.Digest, lay } } - g, gctx := errgroup.WithContext(ctx) - for _, layer := range layersToScan { - ll := layer - - for _, s := range ps { - ss := s - g.Go(func() error { - return ls.scanPackages(gctx, ll, ss) - }) + sem := semaphore.NewWeighted(ls.inflight) + g, ctx := errgroup.WithContext(ctx) + // Launch is a closure to capture the loop variables and then call the + // scanLayer method. + launch := func(l *claircore.Layer, s indexer.VersionedScanner) func() error { + return func() error { + if err := sem.Acquire(ctx, 1); err != nil { + return err + } + defer sem.Release(1) + return ls.scanLayer(ctx, l, s) } - - for _, s := range ds { - ss := s - g.Go(func() error { - return ls.scanDists(gctx, ll, ss) - }) + } + for _, l := range layersToScan { + for _, s := range ls.ps { + g.Go(launch(l, s)) } - - for _, s := range rs { - ss := s - g.Go(func() error { - return ls.scanRepos(gctx, ll, ss) - }) + for _, s := range ls.ds { + g.Go(launch(l, s)) } - } - - if err := g.Wait(); err != nil { - return err - } - - return nil -} - -func (ls *layerScanner) scanPackages(ctx context.Context, layer *claircore.Layer, s indexer.PackageScanner) error { - log := zerolog.Ctx(ctx).With(). - Str("component", "internal/indexer/layerscannner/layerScanner.scanPackages"). - Str("scanner", s.Name()). - Str("layer", layer.Hash.String()). - Logger() - - log.Debug().Msg("starting package scan") - if err := ls.addToken(ctx); err != nil { - return err - } - defer ls.discardToken() - - ok, err := ls.Store.LayerScanned(ctx, layer.Hash, s) - if err != nil { - return err - } - if ok { - log.Debug().Msg("layer already scanned") - return nil - } - - v, err := s.Scan(ctx, layer) - if err != nil { - return fmt.Errorf("scanner: %v error: %v", s.Name(), err) - } - err = ls.Store.SetLayerScanned(ctx, layer.Hash, s) - if err != nil { - return fmt.Errorf("could not set layer scanned: %v", layer) - } - - if v == nil { - log.Debug().Msg("scan returned a nil") - return nil - } - - if len(v) > 0 { - log.Debug().Int("count", len(v)).Msg("scan returned packages") - err = ls.Store.IndexPackages(ctx, v, layer, s) - if err != nil { - return fmt.Errorf("scanner: %v error: %v", s.Name(), err) + for _, s := range ls.rs { + g.Go(launch(l, s)) } } - return nil + return g.Wait() } -func (ls *layerScanner) scanDists(ctx context.Context, layer *claircore.Layer, s indexer.DistributionScanner) error { +// ScanLayer (along with the result type) handles an individual (scanner, layer) +// pair. +func (ls *layerScanner) scanLayer(ctx context.Context, l *claircore.Layer, s indexer.VersionedScanner) error { log := zerolog.Ctx(ctx).With(). - Str("component", "internal/indexer/layerscannner/layerScanner.scanDists"). + Str("component", "internal/indexer/layerscannner/layerScanner.scan"). Str("scanner", s.Name()). - Str("layer", layer.Hash.String()). + Str("kind", s.Kind()). + Str("layer", l.Hash.String()). Logger() + ctx = log.WithContext(ctx) + log.Debug().Msg("scan start") + defer log.Debug().Msg("scan done") - log.Debug().Msg("starting dist scan") - if err := ls.addToken(ctx); err != nil { - return err - } - defer ls.discardToken() - - ok, err := ls.Store.LayerScanned(ctx, layer.Hash, s) + ok, err := ls.store.LayerScanned(ctx, l.Hash, s) if err != nil { return err } @@ -185,74 +209,58 @@ func (ls *layerScanner) scanDists(ctx context.Context, layer *claircore.Layer, s return nil } - v, err := s.Scan(ctx, layer) - if err != nil { - return fmt.Errorf("scanner: %v error: %v", s.Name(), err) - } - err = ls.Store.SetLayerScanned(ctx, layer.Hash, s) - if err != nil { - return fmt.Errorf("could not set layer scanned: %+v %+v", layer, s) - } - - if v == nil { - log.Debug().Msg("scan returned a nil") - return nil + var result result + if err := result.Do(ctx, s, l); err != nil { + return err } - if len(v) > 0 { - log.Debug().Int("count", len(v)).Msg("scan returned dists") - err = ls.Store.IndexDistributions(ctx, v, layer, s) - if err != nil { - return fmt.Errorf("scanner: %v error: %v", s.Name(), err) - } + if err = ls.store.SetLayerScanned(ctx, l.Hash, s); err != nil { + return fmt.Errorf("could not set layer scanned: %v", l) } - return nil + return result.Store(ctx, ls.store, s, l) } -func (ls *layerScanner) scanRepos(ctx context.Context, layer *claircore.Layer, s indexer.RepositoryScanner) error { - log := zerolog.Ctx(ctx).With(). - Str("component", "internal/indexer/layerscannner/layerScanner.scanRepos"). - Str("scanner", s.Name()). - Str("layer", layer.Hash.String()). - Logger() - - log.Debug().Msg("starting repo scan") - if err := ls.addToken(ctx); err != nil { - return err - } - defer ls.discardToken() - - ok, err := ls.Store.LayerScanned(ctx, layer.Hash, s) - if err != nil { - return err - } - if ok { - log.Debug().Msg("layer already scanned") - return nil - } - - v, err := s.Scan(ctx, layer) - if err != nil { - return fmt.Errorf("scanner: %v error: %v", s.Name(), err) - } - err = ls.Store.SetLayerScanned(ctx, layer.Hash, s) - if err != nil { - return fmt.Errorf("could not set layer scanned: %v", layer) - } +// Result is a type that handles the kind-specific bits of the scan process. +type result struct { + pkgs []*claircore.Package + dists []*claircore.Distribution + repos []*claircore.Repository +} - if v == nil { - log.Debug().Msg("scan returned a nil") - return nil +// Do asserts the Scanner back to having a Scan method, and then calls it. +// +// The success value is captured and the error value is returned by Do. +func (r *result) Do(ctx context.Context, s indexer.VersionedScanner, l *claircore.Layer) error { + var err error + switch s := s.(type) { + case indexer.PackageScanner: + r.pkgs, err = s.Scan(ctx, l) + case indexer.DistributionScanner: + r.dists, err = s.Scan(ctx, l) + case indexer.RepositoryScanner: + r.repos, err = s.Scan(ctx, l) + default: + panic(fmt.Sprintf("programmer error: unknown type %T used as scanner", s)) } + return err +} - if len(v) > 0 { - log.Debug().Int("count", len(v)).Msg("scan returned repos") - err = ls.Store.IndexRepositories(ctx, v, layer, s) - if err != nil { - return fmt.Errorf("scanner: %v error: %v", s.Name(), err) - } +// Store calls the properly typed store method on whatever value was captured in +// the result. +func (r *result) Store(ctx context.Context, store indexer.Store, s indexer.VersionedScanner, l *claircore.Layer) error { + log := zerolog.Ctx(ctx).With().Logger() + switch { + case r.pkgs != nil: + log.Debug().Int("count", len(r.pkgs)).Msg("scan returned packages") + return store.IndexPackages(ctx, r.pkgs, l, s) + case r.dists != nil: + log.Debug().Int("count", len(r.dists)).Msg("scan returned dists") + return store.IndexDistributions(ctx, r.dists, l, s) + case r.repos != nil: + log.Debug().Int("count", len(r.repos)).Msg("scan returned repos") + return store.IndexRepositories(ctx, r.repos, l, s) } - + log.Debug().Msg("scan returned a nil") return nil } diff --git a/internal/indexer/layerscanner/layerscanner_test.go b/internal/indexer/layerscanner/layerscanner_test.go index f50e8f74a..7f05514e1 100644 --- a/internal/indexer/layerscanner/layerscanner_test.go +++ b/internal/indexer/layerscanner/layerscanner_test.go @@ -82,7 +82,10 @@ func Test_Scan_NoErrors(t *testing.T) { Ecosystems: []*indexer.Ecosystem{ecosystem}, } - layerscanner := New(1, sOpts) + layerscanner, err := New(ctx, 1, sOpts) + if err != nil { + t.Fatal(err) + } ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() diff --git a/internal/indexer/opts.go b/internal/indexer/opts.go index 64cc347cc..6b415749c 100644 --- a/internal/indexer/opts.go +++ b/internal/indexer/opts.go @@ -1,13 +1,22 @@ package indexer -import "github.com/quay/claircore/pkg/distlock" +import ( + "net/http" + + "github.com/quay/claircore/pkg/distlock" +) // Opts are options to instantiate a indexer type Opts struct { - Store Store - ScanLock distlock.Locker - LayerScanner LayerScanner - Fetcher Fetcher - Ecosystems []*Ecosystem - Vscnrs VersionedScanners + Store Store + ScanLock distlock.Locker + LayerScanner LayerScanner + Fetcher Fetcher + Ecosystems []*Ecosystem + Vscnrs VersionedScanners + Airgap bool + Client *http.Client + ScannerConfig struct { + Package, Dist, Repo map[string]func(interface{}) error + } } diff --git a/libindex/controllerfactory.go b/libindex/controllerfactory.go index 3dddd7c1a..c1b19fcd7 100644 --- a/libindex/controllerfactory.go +++ b/libindex/controllerfactory.go @@ -1,6 +1,8 @@ package libindex import ( + "context" + "github.com/quay/claircore/internal/indexer" "github.com/quay/claircore/internal/indexer/controller" "github.com/quay/claircore/internal/indexer/fetcher" @@ -9,22 +11,28 @@ import ( ) // ControllerFactory is a factory method to return a Controller during libindex runtime. -type ControllerFactory func(lib *Libindex, opts *Opts) (*controller.Controller, error) +type ControllerFactory func(_ context.Context, lib *Libindex, opts *Opts) (*controller.Controller, error) // controllerFactory is the default ControllerFactory -func controllerFactory(lib *Libindex, opts *Opts) (*controller.Controller, error) { +func controllerFactory(ctx context.Context, lib *Libindex, opts *Opts) (*controller.Controller, error) { sc := dlpg.NewLock(lib.db, opts.ScanLockRetry) ft := fetcher.New(lib.client, opts.LayerFetchOpt) // convert libindex.Opts to indexer.Opts sOpts := &indexer.Opts{ - Store: lib.store, - ScanLock: sc, - Fetcher: ft, - Ecosystems: opts.Ecosystems, - Vscnrs: lib.vscnrs, + Store: lib.store, + ScanLock: sc, + Fetcher: ft, + Ecosystems: opts.Ecosystems, + Vscnrs: lib.vscnrs, + Client: lib.client, + ScannerConfig: opts.ScannerConfig, + } + var err error + sOpts.LayerScanner, err = layerscanner.New(ctx, opts.LayerScanConcurrency, sOpts) + if err != nil { + return nil, err } - sOpts.LayerScanner = layerscanner.New(opts.LayerScanConcurrency, sOpts) s := controller.New(sOpts) return s, nil diff --git a/libindex/libindex.go b/libindex/libindex.go index f4ff6e7a4..4b6fcf69e 100644 --- a/libindex/libindex.go +++ b/libindex/libindex.go @@ -30,7 +30,7 @@ type Libindex struct { db *sqlx.DB // a Store which will be shared between scanner instances store indexer.Store - // a sharable http client + // a shareable http client client *http.Client // an opaque and unique string representing the configured // state of the indexer. see setState for more information. @@ -43,7 +43,7 @@ func New(ctx context.Context, opts *Opts) (*Libindex, error) { Str("component", "libindex/New"). Logger() ctx = log.WithContext(ctx) - err := opts.Parse() + err := opts.Parse(ctx) if err != nil { return nil, fmt.Errorf("failed to parse opts: %v", err) } @@ -62,8 +62,12 @@ func New(ctx context.Context, opts *Opts) (*Libindex, error) { } // register any new scanners. - pscnrs, dscnrs, rscnrs, err := indexer.EcosystemsToScanners(ctx, opts.Ecosystems) + pscnrs, dscnrs, rscnrs, err := indexer.EcosystemsToScanners(ctx, opts.Ecosystems, opts.Airgap) + if err != nil { + return nil, err + } vscnrs := indexer.MergeVS(pscnrs, dscnrs, rscnrs) + err = l.store.RegisterScanners(ctx, vscnrs) if err != nil { return nil, fmt.Errorf("failed to register configured scanners: %v", err) @@ -98,7 +102,7 @@ func (l *Libindex) Index(ctx context.Context, manifest *claircore.Manifest) (*cl ctx = log.WithContext(ctx) log.Info().Msg("index request start") defer log.Info().Msg("index request done") - c, err := l.ControllerFactory(l, l.Opts) + c, err := l.ControllerFactory(ctx, l, l.Opts) if err != nil { return nil, fmt.Errorf("scanner factory failed to construct a scanner: %v", err) } @@ -119,7 +123,7 @@ func (l *Libindex) State(ctx context.Context) (string, error) { // configuration state. // // Indexers running different scanner versions will produce different state strings. -// Thus this state value can be used as a que for clients to re-index their manifests +// Thus this state value can be used as a cue for clients to re-index their manifests // and obtain a new IndexReport. func (l *Libindex) setState(ctx context.Context, vscnrs indexer.VersionedScanners) error { h := md5.New() @@ -128,6 +132,9 @@ func (l *Libindex) setState(ctx context.Context, vscnrs indexer.VersionedScanner for _, s := range vscnrs { n := s.Name() m[n] = []byte(n + s.Version() + s.Kind() + "\n") + // TODO(hank) Should this take into account configuration? E.g. If a + // scanner implements the configurable interface, should we expect that + // we can serialize the scanner's concrete type? ns = append(ns, n) } if _, err := io.WriteString(h, versionMagic); err != nil { diff --git a/libindex/opts.go b/libindex/opts.go index 2e2681151..5cf98fb98 100644 --- a/libindex/opts.go +++ b/libindex/opts.go @@ -40,12 +40,23 @@ type Opts struct { ControllerFactory ControllerFactory // a list of ecosystems to use which define which package databases and coalescing methods we use Ecosystems []*indexer.Ecosystem + // Airgap should be set to disallow any scanners that mark themselves as + // making network calls. + Airgap bool + // ScannerConfig holds functions that can be passed into configurable + // scanners. They're broken out by kind, and only used if a scanner + // implements the appropriate interface. + // + // Providing a function for a scanner that's not expecting it is not a fatal + // error. + ScannerConfig struct { + Package, Dist, Repo map[string]func(interface{}) error + } // a convenience method for holding a list of versioned scanners vscnrs indexer.VersionedScanners } -func (o *Opts) Parse() error { - ctx := context.TODO() +func (o *Opts) Parse(ctx context.Context) error { // required if o.ConnString == "" { return fmt.Errorf("ConnString not provided") From 79bad1ee064f662ada65a4110516050770f1a1c6 Mon Sep 17 00:00:00 2001 From: Hank Donnay Date: Tue, 26 May 2020 12:47:51 -0400 Subject: [PATCH 3/3] aws: ensure Close call gets to underlying File Signed-off-by: Hank Donnay --- aws/client.go | 18 +++++++++++++++++- aws/updater.go | 8 -------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/aws/client.go b/aws/client.go index d7bcb9ee0..a762268a6 100644 --- a/aws/client.go +++ b/aws/client.go @@ -1,6 +1,7 @@ package aws import ( + "compress/gzip" "context" "encoding/xml" "fmt" @@ -150,15 +151,30 @@ func (c *Client) Updates(ctx context.Context) (io.ReadCloser, error) { tf.Close() return nil, err } + gz, err := gzip.NewReader(tf) + if err != nil { + return nil, fmt.Errorf("failed to create gzip reader: %v", err) + } log.Debug().Msg("success") - return tf, nil + return &gzippedFile{ + Reader: gz, + Closer: tf, + }, nil } log.Error().Msg("exhausted all mirrors") return nil, fmt.Errorf("all update_info mirrors failed to return a response") } +// gzippedFile implements io.ReadCloser by proxying calls to different +// underlying implementations. This is used to make sure the file that backs the +// downloaded security database has Close called on it. +type gzippedFile struct { + io.Reader + io.Closer +} + func (c *Client) getMirrors(ctx context.Context, release Release) error { var ( req *http.Request diff --git a/aws/updater.go b/aws/updater.go index 3cc519c72..79164e817 100644 --- a/aws/updater.go +++ b/aws/updater.go @@ -1,12 +1,10 @@ package aws import ( - "compress/gzip" "context" "encoding/xml" "fmt" "io" - "io/ioutil" "strings" "time" @@ -59,12 +57,6 @@ func (u *Updater) Fetch(ctx context.Context, fingerprint driver.Fingerprint) (io return nil, "", fmt.Errorf("failed to retrieve update info: %v", err) } - gzip, err := gzip.NewReader(rc) - if err != nil { - return nil, "", fmt.Errorf("failed to create gzip reader: %v", err) - } - rc = ioutil.NopCloser(gzip) - return rc, driver.Fingerprint(updatesRepoMD.Checksum.Sum), nil }