Skip to content

Commit

Permalink
feat: add disk based storage interface
Browse files Browse the repository at this point in the history
We plan to split the API and the scraper into separate processes; as
such, we cannot share memory between the two. This makes the default
"memory" storage unusable, so it will be removed. In its place we can
use this new storage, which uses the local filesystem as a backing store.

The implementation is based on the s3 storage implementation, albeit a
bit simplified.
  • Loading branch information
jgresty committed Apr 24, 2024
1 parent b724515 commit 9715877
Show file tree
Hide file tree
Showing 6 changed files with 499 additions and 89 deletions.
7 changes: 7 additions & 0 deletions vervet-underground/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type StorageType string

// Supported values for StorageType, selecting which storage backend
// the service uses.
const (
// StorageTypeMemory keeps data in process memory (requires the API and
// scraper to share an address space).
StorageTypeMemory StorageType = "memory"
// StorageTypeDisk persists data on the local filesystem (see DiskConfig).
StorageTypeDisk StorageType = "disk"
// StorageTypeS3 stores data in AWS S3 (see S3Config).
StorageTypeS3 StorageType = "s3"
// StorageTypeGCS stores data in GCS (see GcsConfig).
StorageTypeGCS StorageType = "gcs"
)
Expand Down Expand Up @@ -70,6 +71,12 @@ type StorageConfig struct {
IamRoleEnabled bool
S3 S3Config
GCS GcsConfig
Disk DiskConfig
}

// DiskConfig defines configuration options for local disk storage.
type DiskConfig struct {
// Path is the local filesystem location used to back the storage
// (e.g. "/tmp/foobar" in the config tests); presumably it must be
// writable by the process — TODO confirm.
Path string
}

// S3Config defines configuration options for AWS S3 storage.
Expand Down
28 changes: 28 additions & 0 deletions vervet-underground/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,34 @@ func TestLoad(t *testing.T) {
c.Assert(*conf, qt.DeepEquals, expected)
})

// Verify that a "disk" storage stanza is parsed into StorageTypeDisk
// with its path carried through to DiskConfig.
c.Run("disk config", func(c *qt.C) {
f := createTestFile(c, []byte(`{
"host": "0.0.0.0",
"services": [{"url":"localhost","name":"localhost"}],
"storage": {
"type": "disk",
"disk": {
"path": "/tmp/foobar"
}
}
}`))

conf, err := config.Load(f.Name())
c.Assert(err, qt.IsNil)

// The loaded config should round-trip the disk settings exactly.
expected := config.ServerConfig{
Host: "0.0.0.0",
Services: []config.ServiceConfig{{URL: "localhost", Name: "localhost"}},
Storage: config.StorageConfig{
Type: config.StorageTypeDisk,
Disk: config.DiskConfig{
Path: "/tmp/foobar",
},
},
}
c.Assert(*conf, qt.DeepEquals, expected)
})

c.Run("s3 config", func(c *qt.C) {
f := createTestFile(c, []byte(`{
"host": "0.0.0.0",
Expand Down
127 changes: 38 additions & 89 deletions vervet-underground/internal/scraper/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,17 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

qt "github.com/frankban/quicktest"
"github.com/getkin/kin-openapi/openapi3"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"

"vervet-underground/config"
"vervet-underground/internal/scraper"
"vervet-underground/internal/storage/mem"
"vervet-underground/internal/testutil"
"vervet-underground/internal/storage/disk"
)

var (
Expand Down Expand Up @@ -99,7 +96,16 @@ func TestScraper(t *testing.T) {
Name: "animals", URL: animalsService.URL,
}},
}
st := mem.New()
st := disk.New("/tmp/specs")
c.Cleanup(func() {
ds, ok := st.(*disk.Storage)
if !ok {
return
}
err := ds.Cleanup()
c.Assert(err, qt.IsNil)
})

sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 }))
c.Assert(err, qt.IsNil)

Expand Down Expand Up @@ -155,7 +161,15 @@ func TestScraperWithLegacy(t *testing.T) {
},
},
}
st := mem.New()
st := disk.New("/tmp/specs")
c.Cleanup(func() {
ds, ok := st.(*disk.Storage)
if !ok {
return
}
err := ds.Cleanup()
c.Assert(err, qt.IsNil)
})
sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 }))
c.Assert(err, qt.IsNil)

Expand All @@ -180,7 +194,15 @@ func TestEmptyScrape(t *testing.T) {
cfg := &config.ServerConfig{
Services: nil,
}
st := mem.New()
st := disk.New("/tmp/specs")
c.Cleanup(func() {
ds, ok := st.(*disk.Storage)
if !ok {
return
}
err := ds.Cleanup()
c.Assert(err, qt.IsNil)
})
sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 }))
c.Assert(err, qt.IsNil)

Expand All @@ -198,7 +220,15 @@ func TestScrapeClientError(t *testing.T) {
cfg := &config.ServerConfig{
Services: []config.ServiceConfig{{Name: "nope", URL: "http://example.com/nope"}},
}
st := mem.New()
st := disk.New("/tmp/specs")
c.Cleanup(func() {
ds, ok := st.(*disk.Storage)
if !ok {
return
}
err := ds.Cleanup()
c.Assert(err, qt.IsNil)
})
sc, err := scraper.New(cfg, st,
scraper.Clock(func() time.Time { return t0 }),
scraper.HTTPClient(&http.Client{
// errorTransport is an http.RoundTripper stub that fails every request;
// the scraper tests use it to exercise client-error handling.
type errorTransport struct{}

// RoundTrip implements http.RoundTripper by returning an error without
// performing any network I/O.
func (*errorTransport) RoundTrip(*http.Request) (*http.Response, error) {
	err := fmt.Errorf("bad wolf")
	return nil, err
}

// TestScraperCollation runs a full scrape against two stub services,
// then verifies the stored version digests, the collated version specs,
// and the scrape metrics deltas.
func TestScraperCollation(t *testing.T) {
c := qt.New(t)

petfoodService, animalsService := setupHttpServers(c)
// Expected digests are pinned sha256 sums of the fixture specs served
// by the stub services above; a fixture change will break these.
tests := []struct {
name, version, digest string
}{{
"petfood", "2021-09-01", "sha256:I20cAQ3VEjDrY7O0B678yq+0pYN2h3sxQy7vmdlo4+w=",
}, {
"animals", "2021-10-16", "sha256:P1FEFvnhtxJSqXr/p6fMNKE+HYwN6iwKccBGHIVZbyg=",
}}

cfg := &config.ServerConfig{
Services: []config.ServiceConfig{{
Name: "petfood", URL: petfoodService.URL,
}, {
Name: "animals", URL: animalsService.URL,
}},
}
// NOTE(review): this test still uses the in-memory storage, which the
// commit message says is to be removed — confirm whether it should be
// migrated to the disk storage like the other tests in this file.
memSt := mem.New()
// Downcast to *mem.Storage for the storage-specific helpers used below
// (HasVersion, GetCollatedVersionSpecs, VersionIndex).
st, ok := memSt.(*mem.Storage)
c.Assert(ok, qt.IsTrue)
sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 }))
c.Assert(err, qt.IsNil)

// Snapshot metrics before the run so deltas can be asserted afterwards.
before, err := prometheus.DefaultGatherer.Gather()
c.Assert(err, qt.IsNil)

// Cancel the scrape context after a timeout so we don't hang the test
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
c.Cleanup(cancel)

// Run the scrape
err = sc.Run(ctx)
c.Assert(err, qt.IsNil)

// Version digests now known to storage
for _, test := range tests {
ok, err := st.HasVersion(ctx, test.name, test.version, test.digest)
c.Assert(err, qt.IsNil)
c.Assert(ok, qt.IsTrue)
}

collated, err := st.GetCollatedVersionSpecs()
c.Assert(err, qt.IsNil)
c.Assert(len(collated), qt.Equals, 4)

// Every collated version must load as a valid OpenAPI document with
// the expected number of paths (see collatedPaths fixture map).
vi := st.VersionIndex()
c.Assert(len(vi.Versions()), qt.Equals, 4)
for _, version := range vi.Versions() {
specData, err := st.Version(ctx, version.String())
c.Assert(err, qt.IsNil)
l := openapi3.NewLoader()
spec, err := l.LoadFromData(specData)
c.Assert(err, qt.IsNil)
c.Assert(spec, qt.IsNotNil)
c.Assert(len(spec.Paths), qt.Equals, collatedPaths[version.String()])
}

// Assert metrics
after, err := prometheus.DefaultGatherer.Gather()
c.Assert(err, qt.IsNil)

// Exactly one run, no errors, and one scrape-duration sample per
// service (labels strip the "http://" scheme from the stub URLs).
c.Assert(testutil.SampleDelta("vu_scraper_run_duration_seconds", map[string]string{}, before, after),
qt.Equals, uint64(1))
c.Assert(testutil.SampleDelta("vu_scraper_run_error_total", map[string]string{}, before, after),
qt.Equals, uint64(0))
c.Assert(testutil.SampleDelta("vu_scraper_service_scrape_duration_seconds",
map[string]string{
"service": strings.Replace(petfoodService.URL, "http://", "", 1),
},
before, after,
), qt.Equals, uint64(1))
c.Assert(testutil.SampleDelta("vu_scraper_service_scrape_duration_seconds",
map[string]string{
"service": strings.Replace(animalsService.URL, "http://", "", 1),
},
before, after,
), qt.Equals, uint64(1))
}
Loading

0 comments on commit 9715877

Please sign in to comment.