diff --git a/vervet-underground/config/config.go b/vervet-underground/config/config.go index de004cc3..73dfeeda 100644 --- a/vervet-underground/config/config.go +++ b/vervet-underground/config/config.go @@ -14,6 +14,7 @@ type StorageType string const ( StorageTypeMemory StorageType = "memory" + StorageTypeDisk StorageType = "disk" StorageTypeS3 StorageType = "s3" StorageTypeGCS StorageType = "gcs" ) @@ -70,6 +71,12 @@ type StorageConfig struct { IamRoleEnabled bool S3 S3Config GCS GcsConfig + Disk DiskConfig +} + +// DiskConfig defines configuration options for local disk storage. +type DiskConfig struct { + Path string } // S3Config defines configuration options for AWS S3 storage. diff --git a/vervet-underground/config/config_test.go b/vervet-underground/config/config_test.go index b45e8c05..9f09e107 100644 --- a/vervet-underground/config/config_test.go +++ b/vervet-underground/config/config_test.go @@ -87,6 +87,34 @@ func TestLoad(t *testing.T) { c.Assert(*conf, qt.DeepEquals, expected) }) + c.Run("disk config", func(c *qt.C) { + f := createTestFile(c, []byte(`{ + "host": "0.0.0.0", + "services": [{"url":"localhost","name":"localhost"}], + "storage": { + "type": "disk", + "disk": { + "path": "/tmp/foobar" + } + } + }`)) + + conf, err := config.Load(f.Name()) + c.Assert(err, qt.IsNil) + + expected := config.ServerConfig{ + Host: "0.0.0.0", + Services: []config.ServiceConfig{{URL: "localhost", Name: "localhost"}}, + Storage: config.StorageConfig{ + Type: config.StorageTypeDisk, + Disk: config.DiskConfig{ + Path: "/tmp/foobar", + }, + }, + } + c.Assert(*conf, qt.DeepEquals, expected) + }) + c.Run("s3 config", func(c *qt.C) { f := createTestFile(c, []byte(`{ "host": "0.0.0.0", diff --git a/vervet-underground/internal/scraper/scraper_test.go b/vervet-underground/internal/scraper/scraper_test.go index 8aab2089..6fdcb53d 100644 --- a/vervet-underground/internal/scraper/scraper_test.go +++ b/vervet-underground/internal/scraper/scraper_test.go @@ -6,20 +6,17 @@ import ( "fmt" "net/http" "net/http/httptest" - "strings" "testing" "time" qt "github.com/frankban/quicktest" "github.com/getkin/kin-openapi/openapi3" "github.com/gorilla/mux" - "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" "vervet-underground/config" "vervet-underground/internal/scraper" - "vervet-underground/internal/storage/mem" - "vervet-underground/internal/testutil" + "vervet-underground/internal/storage/disk" ) var ( @@ -99,7 +96,16 @@ func TestScraper(t *testing.T) { Name: "animals", URL: animalsService.URL, }}, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) c.Assert(err, qt.IsNil) @@ -155,7 +161,15 @@ func TestScraperWithLegacy(t *testing.T) { }, }, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) c.Assert(err, qt.IsNil) @@ -180,7 +194,15 @@ func TestEmptyScrape(t *testing.T) { cfg := &config.ServerConfig{ Services: nil, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) c.Assert(err, qt.IsNil) @@ -198,7 +220,15 @@ func TestScrapeClientError(t *testing.T) { cfg := &config.ServerConfig{ Services: []config.ServiceConfig{{Name: "nope", URL: "http://example.com/nope"}}, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 }), scraper.HTTPClient(&http.Client{ @@ -221,84 +251,3 @@ type errorTransport struct{} func (*errorTransport) RoundTrip(*http.Request) (*http.Response, error) { return nil, fmt.Errorf("bad wolf") } - -func TestScraperCollation(t *testing.T) { - c := qt.New(t) - - petfoodService, animalsService := setupHttpServers(c) - tests := []struct { - name, version, digest string - }{{ - "petfood", "2021-09-01", "sha256:I20cAQ3VEjDrY7O0B678yq+0pYN2h3sxQy7vmdlo4+w=", - }, { - "animals", "2021-10-16", "sha256:P1FEFvnhtxJSqXr/p6fMNKE+HYwN6iwKccBGHIVZbyg=", - }} - - cfg := &config.ServerConfig{ - Services: []config.ServiceConfig{{ - Name: "petfood", URL: petfoodService.URL, - }, { - Name: "animals", URL: animalsService.URL, - }}, - } - memSt := mem.New() - st, ok := memSt.(*mem.Storage) - c.Assert(ok, qt.IsTrue) - sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) - c.Assert(err, qt.IsNil) - - before, err := prometheus.DefaultGatherer.Gather() - c.Assert(err, qt.IsNil) - - // Cancel the scrape context after a timeout so we don't hang the test - ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) - c.Cleanup(cancel) - - // Run the scrape - err = sc.Run(ctx) - c.Assert(err, qt.IsNil) - - // Version digests now known to storage - for _, test := range tests { - ok, err := st.HasVersion(ctx, test.name, test.version, test.digest) - c.Assert(err, qt.IsNil) - c.Assert(ok, qt.IsTrue) - } - - collated, err := st.GetCollatedVersionSpecs() - c.Assert(err, qt.IsNil) - c.Assert(len(collated), qt.Equals, 4) - - vi := st.VersionIndex() - c.Assert(len(vi.Versions()), qt.Equals, 4) - for _, version := range vi.Versions() { - specData, err := st.Version(ctx, version.String()) - c.Assert(err, qt.IsNil) - l := openapi3.NewLoader() - spec, err := l.LoadFromData(specData) - c.Assert(err, qt.IsNil) - c.Assert(spec, qt.IsNotNil) - c.Assert(len(spec.Paths), qt.Equals, collatedPaths[version.String()]) - } - - // Assert metrics - after, err := prometheus.DefaultGatherer.Gather() - c.Assert(err, qt.IsNil) - - c.Assert(testutil.SampleDelta("vu_scraper_run_duration_seconds", map[string]string{}, before, after), - qt.Equals, uint64(1)) - c.Assert(testutil.SampleDelta("vu_scraper_run_error_total", map[string]string{}, before, after), - qt.Equals, uint64(0)) - c.Assert(testutil.SampleDelta("vu_scraper_service_scrape_duration_seconds", - map[string]string{ - "service": strings.Replace(petfoodService.URL, "http://", "", 1), - }, - before, after, - ), qt.Equals, uint64(1)) - c.Assert(testutil.SampleDelta("vu_scraper_service_scrape_duration_seconds", - map[string]string{ - "service": strings.Replace(animalsService.URL, "http://", "", 1), - }, - before, after, - ), qt.Equals, uint64(1)) -} diff --git a/vervet-underground/internal/storage/disk/disk.go b/vervet-underground/internal/storage/disk/disk.go new file mode 100644 index 00000000..20646aea --- /dev/null +++ b/vervet-underground/internal/storage/disk/disk.go @@ -0,0 +1,291 @@ +// Package disk provides an implementation of the storage used in Vervet +// Underground that uses a local filesystem. It's not intended for production +// use, but as a functionally complete reference implementation that can be +// used to validate the other parts of the VU system. +package disk + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "os" + "path" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/getkin/kin-openapi/openapi3" + "github.com/snyk/vervet/v6" + + "vervet-underground/internal/storage" +) + +type Storage struct { + path string + mu sync.RWMutex + collatedVersions vervet.VersionIndex + newCollator func() (*storage.Collator, error) +} + +// Option defines a Storage constructor option. +type Option func(*Storage) + +type objectMeta struct { + blob []byte + lastMod time.Time +} + +func New(path string, options ...Option) storage.Storage { + s := &Storage{ + path: path, + newCollator: func() (*storage.Collator, error) { return storage.NewCollator() }, + } + for _, option := range options { + option(s) + } + return s +} + +func (s *Storage) Cleanup() error { + if s.path == "" { + return fmt.Errorf("not cleaning up invalid path") + } + return os.RemoveAll(s.path) +} + +// NewCollator configures the Storage instance to use the given constructor +// function for creating collator instances. +func NewCollator(newCollator func() (*storage.Collator, error)) Option { + return func(s *Storage) { + s.newCollator = newCollator + } +} + +// NotifyVersions implements scraper.Storage. +func (s *Storage) NotifyVersions(ctx context.Context, name string, versions []string, scrapeTime time.Time) error { + for _, version := range versions { + // TODO: Add method to fetch contents here + // TODO: implement notify versions; update sunset when versions are removed + err := s.NotifyVersion(ctx, name, version, []byte{}, scrapeTime) + if err != nil { + return err + } + } + return nil +} + +// CollateVersions aggregates versions and revisions from all the services, and +// produces unified versions and merged specs for all APIs. +func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string]bool) error { + // create an aggregate to process collated data from storage data + aggregate, err := s.newCollator() + if err != nil { + return err + } + serviceRevisions, err := s.ListObjects(ctx, storage.ServiceVersionsFolder) + if err != nil { + return err + } + + // all specs are stored as: "service-versions/{service_name}/{version}/{digest}.json" + for _, revKey := range serviceRevisions { + service, version, digest, err := parseServiceVersionRevisionKey(revKey) + if err != nil { + return err + } + if _, ok := serviceFilter[service]; !ok { + continue + } + rev, err := s.GetObjectWithMetadata(revKey) + if err != nil { + return err + } + + // Assuming version is valid in path uploads + parsedVersion, err := vervet.ParseVersion(version) + if err != nil { + return err + } + + revision := storage.ContentRevision{ + Service: service, + Version: parsedVersion, + Timestamp: rev.lastMod, + Digest: storage.Digest(digest), + Blob: rev.blob, + } + aggregate.Add(service, revision) + } + versions, specs, err := aggregate.Collate() + if err != nil { + return err + } + + s.mu.Lock() + s.collatedVersions = vervet.NewVersionIndex(versions) + s.mu.Unlock() + + return s.putCollatedSpecs(specs) +} + +// HasVersion implements scraper.Storage. +func (s *Storage) HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) { + key := s.getServiceVersionRevisionKey(name, version, digest) + path := path.Join(s.path, key) + _, err := os.Stat(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + return false, err + } + return true, nil +} + +// NotifyVersion implements scraper.Storage. +func (s *Storage) NotifyVersion(ctx context.Context, name string, version string, contents []byte, scrapeTime time.Time) error { + digest := storage.NewDigest(contents) + key := s.getServiceVersionRevisionKey(name, version, string(digest)) + parsedVersion, err := vervet.ParseVersion(version) + if err != nil { + return err + } + + currentRevision := storage.ContentRevision{ + Service: name, + Timestamp: scrapeTime, + Digest: digest, + Blob: contents, + Version: parsedVersion, + } + + _, err = s.GetObject(key) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // Since the digest doesn't exist, add the whole key path + return s.PutObject(key, currentRevision.Blob) + } + return err + } + // digest already exists, nothing to do + return nil +} + +// VersionIndex implements scraper.Storage. +func (s *Storage) VersionIndex() vervet.VersionIndex { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.collatedVersions +} + +// Version implements scraper.Storage. +func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { + parsedVersion, err := vervet.ParseVersion(version) + if err != nil { + return nil, err + } + + blob, err := s.GetCollatedVersionSpec(version) + if err != nil { + s.mu.RLock() + resolved, err := s.collatedVersions.Resolve(parsedVersion) + s.mu.RUnlock() + if err != nil { + return nil, err + } + return s.GetCollatedVersionSpec(resolved.String()) + } + return blob, nil +} + +func (s *Storage) getServiceVersionRevisionKey(name string, version string, digest string) string { + // digest could contain slashes + b64 := base64.StdEncoding.EncodeToString([]byte(digest)) + return path.Join(storage.ServiceVersionsFolder, name, version, b64) + ".json" +} + +func (s *Storage) GetObject(key string) ([]byte, error) { + path := path.Join(s.path, key) + return os.ReadFile(path) +} + +func (s *Storage) PutObject(key string, body []byte) error { + path := path.Join(s.path, key) + err := os.MkdirAll(filepath.Dir(path), os.ModePerm) + if err != nil { + return err + } + return os.WriteFile(path, body, 0600) +} + +// GetCollatedVersionSpec retrieves a single collated vervet.Version +// and returns the JSON blob. +func (s *Storage) GetCollatedVersionSpec(version string) ([]byte, error) { + path := path.Join(storage.CollatedVersionsFolder, version, "spec.json") + return s.GetObject(path) +} + +// ListObjects gets all objects under a given directory. +func (s *Storage) ListObjects(ctx context.Context, key string) ([]string, error) { + path := path.Join(s.path, key) + objects := make([]string, 0) + err := filepath.Walk(path, func(obj string, info os.FileInfo, err error) error { + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err + } + if !info.IsDir() { + objects = append(objects, obj) + } + return nil + }) + return objects, err +} + +func parseServiceVersionRevisionKey(key string) (string, string, string, error) { + digestB64 := filepath.Base(key) + digestB64 = strings.TrimSuffix(digestB64, ".json") + digest, err := base64.StdEncoding.DecodeString(digestB64) + if err != nil { + return "", "", "", err + } + rest := filepath.Dir(key) + version := filepath.Base(rest) + rest = filepath.Dir(rest) + service := filepath.Base(rest) + + return service, version, string(digest), nil +} + +func (s *Storage) GetObjectWithMetadata(key string) (*objectMeta, error) { + info, err := os.Stat(key) + if err != nil { + return nil, err + } + lastMod := info.ModTime() + body, err := os.ReadFile(key) + return &objectMeta{ + lastMod: lastMod, + blob: body, + }, err +} + +// putCollatedSpecs stores the given collated OpenAPI document objects. +func (s *Storage) putCollatedSpecs(objects map[vervet.Version]openapi3.T) error { + for key, file := range objects { + jsonBlob, err := file.MarshalJSON() + if err != nil { + return fmt.Errorf("failed to marshal json for collation upload: %w", err) + } + err = s.PutObject(storage.CollatedVersionsFolder+key.String()+"/spec.json", jsonBlob) + if err != nil { + return err + } + } + return nil +} diff --git a/vervet-underground/internal/storage/disk/disk_test.go b/vervet-underground/internal/storage/disk/disk_test.go new file mode 100644 index 00000000..b3ea41b0 --- /dev/null +++ b/vervet-underground/internal/storage/disk/disk_test.go @@ -0,0 +1,132 @@ +package disk + +import ( + "context" + "fmt" + "testing" + "time" + + qt "github.com/frankban/quicktest" + + "vervet-underground/internal/storage" +) + +var t0 = time.Date(2021, time.December, 3, 20, 49, 51, 0, time.UTC) + +func TestNotifyVersions(t *testing.T) { + c := qt.New(t) + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + ctx := context.Background() + err := s.NotifyVersions(ctx, "petfood", []string{"2021-09-01", "2021-09-16"}, t0) + c.Assert(err, qt.IsNil) + // TODO: verify side-effects when there are some... +} + +func TestHasVersion(t *testing.T) { + c := qt.New(t) + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + ctx := context.Background() + const cricketsDigest = "sha256:mWpHX0/hIZS9mVd8eobfHWm6OkUsKZLiqd6ShRnNzA4=" + const geckosDigest = "sha256:c5JD7m0g4DVhoaX4z8HFcTP8S/yUOEsjgP8ECkuEHqM=" + for _, digest := range []string{cricketsDigest, geckosDigest} { + ok, err := s.HasVersion(ctx, "petfood", "2021-09-16", digest) + c.Assert(err, qt.IsNil) + c.Assert(ok, qt.IsFalse) + } + err := s.NotifyVersion(ctx, "petfood", "2021-09-16", []byte("crickets"), t0) + c.Assert(err, qt.IsNil) + err = s.NotifyVersion(ctx, "animals", "2021-09-16", []byte("geckos"), t0) + c.Assert(err, qt.IsNil) + + tests := []struct { + service, version, digest string + shouldHave bool + }{ + {"petfood", "2021-09-16", cricketsDigest, true}, + {"animals", "2021-09-16", geckosDigest, true}, + {"petfood", "2021-09-16", geckosDigest, false}, + {"animals", "2021-09-16", cricketsDigest, false}, + {"petfood", "2021-10-16", cricketsDigest, false}, + {"animals", "2021-09-17", geckosDigest, false}, + } + for i, t := range tests { + c.Logf("test#%d: %v", i, t) + ok, err := s.HasVersion(ctx, t.service, t.version, t.digest) + c.Assert(err, qt.IsNil) + c.Assert(ok, qt.Equals, t.shouldHave) + } +} + +const spec = `{"components":{},"info":{"title":"ServiceA API","version":"0.0.0"},` + + `"openapi":"3.0.0","paths":{"/test":{"get":{"operation":"getTest",` + + `"responses":{"204":{"description":"An empty response"}},"summary":"Test endpoint"}}}}` + +const emptySpec = `{"components":{},"info":{"title":"","version":""},"openapi":"","paths":null}` + +func TestCollateVersions(t *testing.T) { + c := qt.New(t) + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + + ctx := context.Background() + err := s.NotifyVersion(ctx, "petfood", "2021-09-16", []byte(emptySpec), t0) + c.Assert(err, qt.IsNil) + + serviceFilter := map[string]bool{"petfood": true} + err = s.CollateVersions(ctx, serviceFilter) + c.Assert(err, qt.IsNil) + before, err := s.Version(ctx, "2021-09-16") + c.Assert(err, qt.IsNil) + c.Assert(string(before), qt.Equals, emptySpec) + + content, err := s.Version(ctx, "2021-01-01") + c.Assert(err.Error(), qt.Equals, fmt.Errorf("no matching version").Error()) + c.Assert(content, qt.IsNil) + + err = s.NotifyVersion(ctx, "petfood", "2021-09-16", []byte(spec), t0.Add(time.Second)) + c.Assert(err, qt.IsNil) + err = s.CollateVersions(ctx, serviceFilter) + c.Assert(err, qt.IsNil) + + after, err := s.Version(ctx, "2021-09-16") + c.Assert(err, qt.IsNil) + c.Assert(string(after), qt.Equals, spec) +} + +func TestDiskStorageCollateVersion(t *testing.T) { + c := qt.New(t) + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + + storage.AssertCollateVersion(c, s) +} diff --git a/vervet-underground/server.go b/vervet-underground/server.go index 7b61d43d..fcfa2432 100644 --- a/vervet-underground/server.go +++ b/vervet-underground/server.go @@ -17,6 +17,7 @@ import ( "vervet-underground/internal/handler" "vervet-underground/internal/scraper" "vervet-underground/internal/storage" + "vervet-underground/internal/storage/disk" "vervet-underground/internal/storage/gcs" "vervet-underground/internal/storage/mem" "vervet-underground/internal/storage/s3" @@ -177,6 +178,8 @@ func initializeStorage(ctx context.Context, cfg *config.ServerConfig, overlayCon switch cfg.Storage.Type { case config.StorageTypeMemory: return mem.New(mem.NewCollator(newCollator)), nil + case config.StorageTypeDisk: + return disk.New(cfg.Storage.Disk.Path, disk.NewCollator(newCollator)), nil case config.StorageTypeS3: return s3.New(ctx, &s3.Config{ AwsRegion: cfg.Storage.S3.Region,