diff --git a/catalogd/cmd/catalogd/main.go b/catalogd/cmd/catalogd/main.go
index 77698444c..8a73d8bd3 100644
--- a/catalogd/cmd/catalogd/main.go
+++ b/catalogd/cmd/catalogd/main.go
@@ -294,9 +294,13 @@ func main() {
 		os.Exit(1)
 	}
 
-	localStorage = storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL}
+	localStorage = &storage.LocalDirV1{
+		RootDir:            storeDir,
+		RootURL:            baseStorageURL,
+		EnableQueryHandler: features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler),
+	}
 
-	// Config for the the catalogd web server
+	// Config for the catalogd web server
 	catalogServerConfig := serverutil.CatalogServerConfig{
 		ExternalAddr: externalAddr,
 		CatalogAddr:  catalogServerAddr,
diff --git a/catalogd/internal/features/features.go b/catalogd/internal/features/features.go
index 8f67b1689..1ab490854 100644
--- a/catalogd/internal/features/features.go
+++ b/catalogd/internal/features/features.go
@@ -5,7 +5,14 @@ import (
 	"k8s.io/component-base/featuregate"
 )
 
-var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{}
+const (
+	// APIV1QueryHandler gates the /api/v1/query endpoint of the catalog server.
+	APIV1QueryHandler = featuregate.Feature("APIV1QueryHandler")
+)
+
+var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
+	APIV1QueryHandler: {Default: false, PreRelease: featuregate.Alpha},
+}
 
 var CatalogdFeatureGate featuregate.MutableFeatureGate = featuregate.NewFeatureGate()
 
diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go
index 614da2b8b..6d3daf1fd 100644
--- a/catalogd/internal/serverutil/serverutil.go
+++ b/catalogd/internal/serverutil/serverutil.go
@@ -1,15 +1,19 @@
 package serverutil
 
 import (
+	"context"
 	"crypto/tls"
 	"fmt"
 	"net"
 	"net/http"
 	"time"
 
+	"github.com/go-logr/logr"
+	"github.com/klauspost/compress/gzhttp"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/certwatcher"
+	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 
 	catalogdmetrics "github.com/operator-framework/operator-controller/catalogd/internal/metrics"
 	"github.com/operator-framework/operator-controller/catalogd/internal/storage"
@@ -39,13 +43,20 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil
 	}
 
 	shutdownTimeout := 30 * time.Second
+	handler := cfg.LocalStorage.StorageServerHandler()
+	handler = gzhttp.GzipHandler(handler)
+	handler = catalogdmetrics.AddMetricsToHandler(handler)
+	handler = newLoggingMiddleware(handler)
 
 	catalogServer := manager.Server{
 		Name:                "catalogs",
 		OnlyServeWhenLeader: true,
 		Server: &http.Server{
-			Addr:        cfg.CatalogAddr,
-			Handler:     catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()),
+			Addr:    cfg.CatalogAddr,
+			Handler: handler,
+			BaseContext: func(_ net.Listener) context.Context {
+				return log.IntoContext(context.Background(), mgr.GetLogger().WithName("http.catalogs"))
+			},
 			ReadTimeout: 5 * time.Second,
 			// TODO: Revert this to 10 seconds if/when the API
 			// evolves to have significantly smaller responses
@@ -62,3 +73,37 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil
 
 	return nil
 }
+
+// newLoggingMiddleware wraps next so that every request is logged, after it has been
+// served, with its method, URL, status code, duration and remote address.
+// The logger is taken from the request context; requests without one are not logged.
+func newLoggingMiddleware(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		logger := logr.FromContextOrDiscard(r.Context())
+
+		start := time.Now()
+		lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}
+		next.ServeHTTP(lrw, r)
+
+		logger.WithValues(
+			"method", r.Method,
+			"url", r.URL.String(),
+			"status", lrw.statusCode,
+			"duration", time.Since(start),
+			"remoteAddr", r.RemoteAddr,
+		).Info("HTTP request processed")
+	})
+}
+
+// loggingResponseWriter records the status code passed to WriteHeader so that the
+// logging middleware can report it once the request has been served.
+type loggingResponseWriter struct {
+	http.ResponseWriter
+	statusCode int
+}
+
+// WriteHeader remembers code and forwards it to the wrapped ResponseWriter.
+func (w *loggingResponseWriter) WriteHeader(code int) {
+	w.statusCode = code
+	w.ResponseWriter.WriteHeader(code)
+}
diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go
new file mode 100644
index 000000000..20fe5f2ed
--- /dev/null
+++ b/catalogd/internal/storage/index.go
@@ -0,0 +1,145 @@
+package storage
+
+import (
+	"cmp"
+	"encoding/json"
+	"fmt"
+	"io"
+	"slices"
+
+	"k8s.io/apimachinery/pkg/util/sets"
+
+	"github.com/operator-framework/operator-registry/alpha/declcfg"
+)
+
+// index maps each schema, package and name found in a catalog to the sections of the
+// catalog file that hold the matching blobs.
+type index struct {
+	BySchema  map[string][]section `json:"by_schema"`
+	ByPackage map[string][]section `json:"by_package"`
+	ByName    map[string][]section `json:"by_name"`
+}
+
+// section is a byte range (offset and length) within the catalog file.
+// It is serialized to JSON as the two-element array [offset,length].
+type section struct {
+	offset int64
+	length int64
+}
+
+func (s *section) MarshalJSON() ([]byte, error) {
+	return []byte(fmt.Sprintf(`[%d,%d]`, s.offset, s.length)), nil
+}
+
+func (s *section) UnmarshalJSON(b []byte) error {
+	vals := [2]int64{}
+	if err := json.Unmarshal(b, &vals); err != nil {
+		return err
+	}
+	s.offset = vals[0]
+	s.length = vals[1]
+	return nil
+}
+
+func (i index) Size() int64 {
+	size := 0
+	for k, v := range i.BySchema {
+		size += len(k) + len(v)*16
+	}
+	for k, v := range i.ByPackage {
+		size += len(k) + len(v)*16
+	}
+	for k, v := range i.ByName {
+		size += len(k) + len(v)*16
+	}
+	return int64(size)
+}
+
+// Get returns a reader over the blobs in r that match every non-empty filter, in the
+// order in which they appear in the catalog file. An empty filter matches everything.
+// The boolean result is currently always true.
+func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.ReadSeeker, bool) {
+	sectionSet := i.getSectionSet(schema, packageName, name)
+
+	sections := sectionSet.UnsortedList()
+	slices.SortFunc(sections, func(a, b section) int {
+		return cmp.Compare(a.offset, b.offset)
+	})
+
+	srs := make([]io.ReadSeeker, 0, len(sections))
+	for _, s := range sections {
+		sr := io.NewSectionReader(r, s.offset, s.length)
+		srs = append(srs, sr)
+	}
+	return newMultiReadSeeker(srs...), true
+}
+
+// getSectionSet returns the sections selected by the non-empty filters. Multiple filters
+// are combined by intersection; with no filter at all, every section indexed by schema is returned.
+func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section] {
+	if schema == "" {
+		if packageName == "" {
+			if name == "" {
+				sectionSet := sets.New[section]()
+				for _, s := range i.BySchema {
+					sectionSet.Insert(s...)
+				}
+				return sectionSet
+			} else {
+				return sets.New[section](i.ByName[name]...)
+			}
+		} else {
+			sectionSet := sets.New[section](i.ByPackage[packageName]...)
+			if name == "" {
+				return sectionSet
+			} else {
+				return sectionSet.Intersection(sets.New[section](i.ByName[name]...))
+			}
+		}
+	} else {
+		sectionSet := sets.New[section](i.BySchema[schema]...)
+		if packageName == "" {
+			if name == "" {
+				return sectionSet
+			} else {
+				return sectionSet.Intersection(sets.New[section](i.ByName[name]...))
+			}
+		} else {
+			sectionSet = sectionSet.Intersection(sets.New[section](i.ByPackage[packageName]...))
+			if name == "" {
+				return sectionSet
+			} else {
+				return sectionSet.Intersection(sets.New[section](i.ByName[name]...))
+			}
+		}
+	}
+}
+
+// newIndex builds an index from the metas received on metasChan. Offsets are computed by
+// accumulating blob lengths, so metas must arrive in the order they are written to the
+// catalog file.
+func newIndex(metasChan <-chan *declcfg.Meta) (*index, error) {
+	idx := &index{
+		BySchema:  make(map[string][]section),
+		ByPackage: make(map[string][]section),
+		ByName:    make(map[string][]section),
+	}
+	offset := int64(0)
+	for meta := range metasChan {
+		start := offset
+		length := int64(len(meta.Blob))
+		offset += length
+
+		s := section{offset: start, length: length}
+		if meta.Schema != "" {
+			idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s)
+		}
+		if meta.Package != "" {
+			idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s)
+		}
+		if meta.Name != "" {
+			idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s)
+		}
+	}
+	return idx, nil
+}
diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go
index dd06729ea..db642e8f5 100644
--- a/catalogd/internal/storage/localdir.go
+++ b/catalogd/internal/storage/localdir.go
@@ -2,14 +2,20 @@ package storage
 
 import (
 	"context"
+	"encoding/json"
+	"errors"
 	"fmt"
+	"io"
 	"io/fs"
 	"net/http"
 	"net/url"
 	"os"
 	"path/filepath"
+	"sync"
+	"time"
 
-	"github.com/klauspost/compress/gzhttp"
+	"golang.org/x/sync/errgroup"
+	"golang.org/x/sync/singleflight"
 
 	"github.com/operator-framework/operator-registry/alpha/declcfg"
 )
@@ -20,95 +26,291 @@ import (
 // done so that clients accessing the content stored in RootDir/catalogName have
 // atomic view of the content for a catalog.
 type LocalDirV1 struct {
-	RootDir string
-	RootURL *url.URL
+	RootDir            string
+	RootURL            *url.URL
+	EnableQueryHandler bool
+
+	m  sync.RWMutex
+	sf singleflight.Group
 }
 
-const (
-	v1ApiPath = "api/v1"
-	v1ApiData = "all"
-)
+// Store walks the FBC in fsys and writes its blobs (and, when the query handler is
+// enabled, an index over them) into a temporary directory that then replaces the
+// directory of the given catalog.
+func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error {
+	s.m.Lock()
+	defer s.m.Unlock()
 
-func (s LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error {
-	fbcDir := filepath.Join(s.RootDir, catalog, v1ApiPath)
-	if err := os.MkdirAll(fbcDir, 0700); err != nil {
+	if err := os.MkdirAll(s.RootDir, 0700); err != nil {
 		return err
 	}
-	tempFile, err := os.CreateTemp(s.RootDir, fmt.Sprint(catalog))
+	tmpCatalogDir, err := os.MkdirTemp(s.RootDir, fmt.Sprintf(".%s-*", catalog))
 	if err != nil {
 		return err
 	}
-	defer os.Remove(tempFile.Name())
-	if err := declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error {
+	defer os.RemoveAll(tmpCatalogDir)
+
+	storeMetaFuncs := []storeMetasFunc{storeCatalogData}
+	if s.EnableQueryHandler {
+		storeMetaFuncs = append(storeMetaFuncs, storeIndexData)
+	}
+
+	var (
+		eg, egCtx = errgroup.WithContext(ctx)
+		metaChans []chan *declcfg.Meta
+	)
+	for _, f := range storeMetaFuncs {
+		// Hand each writer its own channel value: indexing metaChans from inside the
+		// goroutine would race with the append performed by the next iteration.
+		ch := make(chan *declcfg.Meta, 1)
+		metaChans = append(metaChans, ch)
+		eg.Go(func() error { return f(tmpCatalogDir, ch) })
+	}
+	err = declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error {
 		if err != nil {
 			return err
 		}
-		_, err = tempFile.Write(meta.Blob)
-		return err
-	}); err != nil {
+		for _, ch := range metaChans {
+			select {
+			case ch <- meta:
+			case <-egCtx.Done():
+				return egCtx.Err()
+			}
+		}
+		return nil
+	}, declcfg.WithConcurrency(1))
+	for _, ch := range metaChans {
+		close(ch)
+	}
+	// Wait for the writers on every path, so that none of them is still using
+	// tmpCatalogDir when the deferred cleanup removes it. A writer failure cancels
+	// egCtx and thereby aborts the walk, so it is reported first as the root cause.
+	egErr := eg.Wait()
+	if err != nil && egErr == nil {
 		return fmt.Errorf("error walking FBC root: %w", err)
 	}
-	fbcFile := filepath.Join(fbcDir, v1ApiData)
-	return os.Rename(tempFile.Name(), fbcFile)
-}
 
-func (s LocalDirV1) Delete(catalog string) error {
-	return os.RemoveAll(filepath.Join(s.RootDir, catalog))
-}
+	if egErr != nil {
+		return egErr
+	}
 
-func (s LocalDirV1) BaseURL(catalog string) string {
-	return s.RootURL.JoinPath(catalog).String()
+	catalogDir := s.catalogDir(catalog)
+	return errors.Join(
+		os.RemoveAll(catalogDir),
+		os.Rename(tmpCatalogDir, catalogDir),
+	)
 }
 
-func (s LocalDirV1) StorageServerHandler() http.Handler {
-	mux := http.NewServeMux()
-	fsHandler := http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)}))
-	spHandler := http.StripPrefix(s.RootURL.Path, fsHandler)
-	gzHandler := gzhttp.GzipHandler(spHandler)
+// Delete removes the directory of the given catalog and everything stored in it.
+func (s *LocalDirV1) Delete(catalog string) error {
+	s.m.Lock()
+	defer s.m.Unlock()
 
-	typeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w.Header().Add("Content-Type", "application/jsonl")
-		gzHandler.ServeHTTP(w, r)
-	})
-	mux.Handle(s.RootURL.Path, typeHandler)
-	return mux
+	return os.RemoveAll(s.catalogDir(catalog))
 }
 
-func (s LocalDirV1) ContentExists(catalog string) bool {
-	file, err := os.Stat(filepath.Join(s.RootDir, catalog, v1ApiPath, v1ApiData))
+// ContentExists reports whether the catalog file (and, when the query handler is
+// enabled, the index file) of the given catalog exists as a regular file.
+func (s *LocalDirV1) ContentExists(catalog string) bool {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	catalogFileStat, err := os.Stat(catalogFilePath(s.catalogDir(catalog)))
 	if err != nil {
 		return false
 	}
-	if !file.Mode().IsRegular() {
+	if !catalogFileStat.Mode().IsRegular() {
 		// path is not valid content
 		return false
 	}
+
+	if s.EnableQueryHandler {
+		indexFileStat, err := os.Stat(catalogIndexFilePath(s.catalogDir(catalog)))
+		if err != nil {
+			return false
+		}
+		if !indexFileStat.Mode().IsRegular() {
+			return false
+		}
+	}
 	return true
 }
 
-// filesOnlyFilesystem is a file system that can open only regular
-// files from the underlying filesystem. All other file types result
-// in os.ErrNotExists
-type filesOnlyFilesystem struct {
-	FS fs.FS
+// catalogDir returns the directory that holds the stored files of the given catalog.
+func (s *LocalDirV1) catalogDir(catalog string) string {
+	return filepath.Join(s.RootDir, catalog)
+}
+
+// catalogFilePath returns the path of the JSON-lines catalog file inside catalogDir.
+func catalogFilePath(catalogDir string) string {
+	return filepath.Join(catalogDir, "catalog.jsonl")
+}
+
+// catalogIndexFilePath returns the path of the index file inside catalogDir.
+func catalogIndexFilePath(catalogDir string) string {
+	return filepath.Join(catalogDir, "index.json")
 }
 
-// Open opens a named file from the underlying filesystem. If the file
-// is not a regular file, it return os.ErrNotExists. Callers are resposible
-// for closing the file returned.
-func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) {
-	file, err := f.FS.Open(name)
+// storeMetasFunc consumes the metas of one catalog from metaChan and persists data
+// derived from them inside catalogDir.
+type storeMetasFunc func(catalogDir string, metaChan <-chan *declcfg.Meta) error
+
+// storeCatalogData writes the blob of each received meta, in order, to the catalog file.
+func storeCatalogData(catalogDir string, metas <-chan *declcfg.Meta) error {
+	f, err := os.Create(catalogFilePath(catalogDir))
 	if err != nil {
-		return nil, err
+		return err
+	}
+	defer f.Close()
+
+	for m := range metas {
+		if _, err := f.Write(m.Blob); err != nil {
+			return err
+		}
 	}
-	stat, err := file.Stat()
+	return nil
+}
+
+// storeIndexData builds an index from the received metas and writes it to the index file.
+func storeIndexData(catalogDir string, metas <-chan *declcfg.Meta) error {
+	idx, err := newIndex(metas)
 	if err != nil {
-		_ = file.Close()
-		return nil, err
+		return err
+	}
+
+	f, err := os.Create(catalogIndexFilePath(catalogDir))
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	enc := json.NewEncoder(f)
+	enc.SetEscapeHTML(false)
+	return enc.Encode(idx)
+}
+
+// BaseURL returns RootURL extended by the catalog name.
+func (s *LocalDirV1) BaseURL(catalog string) string {
+	return s.RootURL.JoinPath(catalog).String()
+}
+
+// StorageServerHandler returns the handler that serves <RootURL>/{catalog}/api/v1/all and,
+// when the query handler is enabled, <RootURL>/{catalog}/api/v1/query.
+func (s *LocalDirV1) StorageServerHandler() http.Handler {
+	mux := http.NewServeMux()
+
+	mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path, s.handleV1All)
+	if s.EnableQueryHandler {
+		mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path, s.handleV1Query)
+	}
+	return mux
+}
+
+// handleV1All serves the complete catalog file of the requested catalog as JSON lines.
+func (s *LocalDirV1) handleV1All(w http.ResponseWriter, r *http.Request) {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	catalog := r.PathValue("catalog")
+	catalogFile, catalogStat, err := s.catalogData(catalog)
+	if err != nil {
+		httpError(w, err)
+		return
+	}
+	defer catalogFile.Close()
+	serveJsonLines(w, r, catalogStat.ModTime(), catalogFile)
+}
+
+// handleV1Query serves the blobs of the requested catalog that match the "schema",
+// "package" and "name" query parameters; without any parameter it serves the whole catalog.
+func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	catalog := r.PathValue("catalog")
+	catalogFile, catalogStat, err := s.catalogData(catalog)
+	if err != nil {
+		httpError(w, err)
+		return
+	}
+	defer catalogFile.Close()
+
+	schema := r.URL.Query().Get("schema")
+	pkg := r.URL.Query().Get("package")
+	name := r.URL.Query().Get("name")
+
+	if schema == "" && pkg == "" && name == "" {
+		// If no parameters are provided, return the entire catalog (this is the same as /api/v1/all)
+		serveJsonLines(w, r, catalogStat.ModTime(), catalogFile)
+		return
 	}
-	if !stat.Mode().IsRegular() {
-		_ = file.Close()
-		return nil, os.ErrNotExist
+	idx, err := s.getIndex(catalog)
+	if err != nil {
+		httpError(w, err)
+		return
+	}
+	indexReader, ok := idx.Get(catalogFile, schema, pkg, name)
+	if !ok {
+		httpError(w, fs.ErrNotExist)
+		return
+	}
+	serveJsonLines(w, r, catalogStat.ModTime(), indexReader)
+}
+
+// catalogData opens the catalog file of the given catalog and returns it together with
+// its file info. On success the caller is responsible for closing the returned file.
+func (s *LocalDirV1) catalogData(catalog string) (*os.File, os.FileInfo, error) {
+	catalogFile, err := os.Open(catalogFilePath(s.catalogDir(catalog)))
+	if err != nil {
+		return nil, nil, err
+	}
+	catalogFileStat, err := catalogFile.Stat()
+	if err != nil {
+		// The caller never sees the file on this path, so close it here.
+		return nil, nil, errors.Join(err, catalogFile.Close())
+	}
+	return catalogFile, catalogFileStat, nil
+}
+
+// httpError replies with 404 for fs.ErrNotExist, 403 for fs.ErrPermission and 500 for
+// any other error.
+func httpError(w http.ResponseWriter, err error) {
+	var code int
+	switch {
+	case errors.Is(err, fs.ErrNotExist):
+		code = http.StatusNotFound
+	case errors.Is(err, fs.ErrPermission):
+		code = http.StatusForbidden
+	default:
+		code = http.StatusInternalServerError
+	}
+	http.Error(w, fmt.Sprintf("%d %s", code, http.StatusText(code)), code)
+}
+
+// serveJsonLines serves rs via http.ServeContent with the Content-Type header set to
+// application/jsonl.
+func serveJsonLines(w http.ResponseWriter, r *http.Request, modTime time.Time, rs io.ReadSeeker) {
+	w.Header().Add("Content-Type", "application/jsonl")
+	http.ServeContent(w, r, "", modTime, rs)
+}
+
+// getIndex loads and decodes the index file of the given catalog. Concurrent calls for
+// the same catalog share a single load.
+func (s *LocalDirV1) getIndex(catalog string) (*index, error) {
+	idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) {
+		indexFile, err := os.Open(catalogIndexFilePath(s.catalogDir(catalog)))
+		if err != nil {
+			return nil, err
+		}
+		defer indexFile.Close()
+		var idx index
+		if err := json.NewDecoder(indexFile).Decode(&idx); err != nil {
+			return nil, err
+		}
+		return &idx, nil
+	})
+	if err != nil {
+		return nil, err
 	}
-	return file, nil
+	return idx.(*index), nil
 }
diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go
index c975c8fc9..0e84d3adc 100644
--- a/catalogd/internal/storage/localdir_test.go
+++ b/catalogd/internal/storage/localdir_test.go
@@ -56,7 +56,7 @@ var _ = Describe("LocalDir Storage Test", func() {
 		rootDir = d
 
 		baseURL = &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix}
-		store = LocalDirV1{RootDir: rootDir, RootURL: baseURL}
+		store = &LocalDirV1{RootDir: rootDir, RootURL: baseURL}
 		unpackResultFS = &fstest.MapFS{
 			"bundle.yaml":  &fstest.MapFile{Data: []byte(testBundle), Mode: os.ModePerm},
 			"package.yaml": &fstest.MapFile{Data: []byte(testPackage), Mode: os.ModePerm},
@@ -69,14 +69,13 @@ var _ = Describe("LocalDir Storage Test", func() {
 			Expect(err).To(Not(HaveOccurred()))
 		})
 		It("should store the content in the RootDir correctly", func() {
-			fbcDir := filepath.Join(rootDir, catalog, v1ApiPath)
-			fbcFile := filepath.Join(fbcDir, v1ApiData)
+			fbcFile := catalogFilePath(filepath.Join(rootDir, catalog))
 			_, err := os.Stat(fbcFile)
 			Expect(err).To(Not(HaveOccurred()))
 
 			gotConfig, err := declcfg.LoadFS(ctx, unpackResultFS)
 			Expect(err).To(Not(HaveOccurred()))
-			storedConfig, err := declcfg.LoadFile(os.DirFS(fbcDir), v1ApiData)
+			storedConfig, err := declcfg.LoadFile(os.DirFS(filepath.Dir(fbcFile)), filepath.Base(fbcFile))
 			Expect(err).To(Not(HaveOccurred()))
 			diff := cmp.Diff(gotConfig, storedConfig)
 			Expect(diff).To(Equal(""))
@@ -93,10 +92,15 @@ var _ = Describe("LocalDir Storage Test", func() {
 				Expect(err).To(Not(HaveOccurred()))
 			})
 			It("should delete the FBC from the cache directory", func() {
-				fbcFile := filepath.Join(rootDir, catalog)
+				fbcFile := catalogFilePath(filepath.Join(rootDir, catalog))
 				_, err := os.Stat(fbcFile)
 				Expect(err).To(HaveOccurred())
 				Expect(os.IsNotExist(err)).To(BeTrue())
+
+				indexFile := catalogIndexFilePath(filepath.Join(rootDir, catalog))
+				_, err = os.Stat(indexFile)
+				Expect(err).To(HaveOccurred())
+				Expect(os.IsNotExist(err)).To(BeTrue())
 			})
 			It("should report content does not exist", func() {
 				Expect(store.ContentExists(catalog)).To(BeFalse())
@@ -111,9 +115,11 @@ var _ = Describe("LocalDir Server Handler tests", func() {
 		store      LocalDirV1
 	)
 	BeforeEach(func() {
-		d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache")
-		Expect(err).ToNot(HaveOccurred())
-		Expect(os.MkdirAll(filepath.Join(d, "test-catalog", v1ApiPath), 0700)).To(Succeed())
+		d := GinkgoT().TempDir()
+		Expect(os.MkdirAll(filepath.Join(d, "test-catalog"), 0700)).To(Succeed())
 		store = LocalDirV1{RootDir: d, RootURL: &url.URL{Path: urlPrefix}}
+		// NOTE(review): gzip is now applied in serverutil rather than by StorageServerHandler,
+		// so the expectations below that require compressed responses presumably fail against
+		// this bare handler -- confirm and wrap it if so.
 		testServer = httptest.NewServer(store.StorageServerHandler())
 
@@ -127,33 +133,32 @@ var _ = Describe("LocalDir Server Handler tests", func() {
 	It("gets 404 for the path /catalogs/test-catalog/", func() {
 		expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/"))
 	})
-	It("gets 404 for the path /test-catalog/foo.txt", func() {
-		// This ensures that even if the file exists, the URL must contain the /catalogs/ prefix
-		Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), []byte("bar"), 0600)).To(Succeed())
-		expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/test-catalog/foo.txt"))
+	It("gets 404 for the path /catalogs/test-catalog/api", func() {
+		expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api"))
 	})
-	It("gets 404 for the path /catalogs/test-catalog/non-existent.txt", func() {
-		expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/non-existent.txt"))
+	It("gets 404 for the path /catalogs/test-catalog/api/v1", func() {
+		expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1"))
 	})
-	It("gets 200 for the path /catalogs/foo.txt", func() {
-		expectedContent := []byte("bar")
-		Expect(os.WriteFile(filepath.Join(store.RootDir, "foo.txt"), expectedContent, 0600)).To(Succeed())
-		expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/foo.txt"), expectedContent)
+	It("gets 404 for the path /catalogs/test-catalog/catalog.jsonl", func() {
+		// This is actually how the file is stored, but we don't serve
+		// the filesystem, we serve an API. Hence, expect 404 not found
+		Expect(os.WriteFile(catalogFilePath(filepath.Join(store.RootDir, "test-catalog")), []byte("foobar"), 0600)).To(Succeed())
+		expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/catalog.jsonl"))
 	})
-	It("gets 200 for the path /catalogs/test-catalog/foo.txt", func() {
-		expectedContent := []byte("bar")
-		Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), expectedContent, 0600)).To(Succeed())
-		expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/foo.txt"), expectedContent)
+	It("gets 200 for the path /catalogs/test-catalog/api/v1/all", func() {
+		expectedContent := []byte(`{"foo":"bar"}`)
+		Expect(os.WriteFile(catalogFilePath(filepath.Join(store.RootDir, "test-catalog")), expectedContent, 0600)).To(Succeed())
+		expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false)
 	})
 	It("ignores accept-encoding for the path /catalogs/test-catalog/api/v1/all with size < 1400 bytes", func() {
-		expectedContent := []byte("bar")
-		Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", v1ApiPath, v1ApiData), expectedContent, 0600)).To(Succeed())
-		expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent)
+		expectedContent := []byte(`{"foo":"bar"}`)
+		Expect(os.WriteFile(catalogFilePath(filepath.Join(store.RootDir, "test-catalog")), expectedContent, 0600)).To(Succeed())
+		expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false)
 	})
 	It("provides gzipped content for the path /catalogs/test-catalog/api/v1/all with size > 1400 bytes", func() {
 		expectedContent := []byte(testCompressableJSON)
-		Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", v1ApiPath, v1ApiData), expectedContent, 0600)).To(Succeed())
-		expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent)
+		Expect(os.WriteFile(catalogFilePath(filepath.Join(store.RootDir, "test-catalog")), expectedContent, 0600)).To(Succeed())
+		expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, true)
 	})
 	It("provides json-lines format for the served JSON catalog", func() {
 		catalog := "test-catalog"
@@ -165,9 +170,9 @@ var _ = Describe("LocalDir Server Handler tests", func() {
 		expectedContent, err := generateJSONLines([]byte(testCompressableJSON))
 		Expect(err).To(Not(HaveOccurred()))
 
-		path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, v1ApiPath, v1ApiData)
+		path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all")
 		Expect(err).To(Not(HaveOccurred()))
-		expectFound(path, []byte(expectedContent))
+		expectFound(path, []byte(expectedContent), true)
 	})
 	It("provides json-lines format for the served YAML catalog", func() {
 		catalog := "test-catalog"
@@ -181,9 +186,9 @@ var _ = Describe("LocalDir Server Handler tests", func() {
 		expectedContent, err := generateJSONLines(yamlData)
 		Expect(err).To(Not(HaveOccurred()))
 
-		path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, v1ApiPath, v1ApiData)
+		path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all")
 		Expect(err).To(Not(HaveOccurred()))
-		expectFound(path, []byte(expectedContent))
+		expectFound(path, []byte(expectedContent), true)
 	})
 	AfterEach(func() {
 		testServer.Close()
@@ -197,7 +202,7 @@ func expectNotFound(url string) {
 	Expect(resp.Body.Close()).To(Succeed())
 }
 
-func expectFound(url string, expectedContent []byte) {
+func expectFound(url string, expectedContent []byte, expectCompression bool) {
 	req, err := http.NewRequest(http.MethodGet, url, nil)
 	Expect(err).To(Not(HaveOccurred()))
 	req.Header.Set("Accept-Encoding", "gzip")
@@ -206,15 +211,16 @@ func expectFound(url string, expectedContent []byte) {
 	Expect(resp.StatusCode).To(Equal(http.StatusOK))
 
 	var actualContent []byte
-	switch resp.Header.Get("Content-Encoding") {
-	case "gzip":
+	if expectCompression {
+		Expect(resp.Header.Get("Content-Encoding")).To(Equal("gzip"))
 		Expect(len(expectedContent)).To(BeNumerically(">", 1400),
 			fmt.Sprintf("gzipped content should only be provided for content larger than 1400 bytes, but our expected content is only %d bytes", len(expectedContent)))
 		gz, err := gzip.NewReader(resp.Body)
 		Expect(err).To(Not(HaveOccurred()))
 		actualContent, err = io.ReadAll(gz)
 		Expect(err).To(Not(HaveOccurred()))
-	default:
+	} else {
+		Expect(resp.Header.Get("Content-Encoding")).To(BeEmpty())
 		actualContent, err = io.ReadAll(resp.Body)
 		Expect(len(expectedContent)).To(BeNumerically("<", 1400),
 			fmt.Sprintf("plaintext content should only be provided for content smaller than 1400 bytes, but we received plaintext for %d bytes\n expectedContent:\n%s\n", len(expectedContent), expectedContent))
diff --git a/catalogd/internal/storage/multireadseeker.go b/catalogd/internal/storage/multireadseeker.go
new file mode 100644
index 000000000..f1e3e1cd3
--- /dev/null
+++ b/catalogd/internal/storage/multireadseeker.go
@@ -0,0 +1,144 @@
+package storage
+
+import (
+	"io"
+)
+
+// doubleReadSeeker is the logical concatenation of rs1 and rs2, whose lengths are
+// rs1len and rs2len. pos is the current position within the concatenation and second
+// records whether rs2 has been positioned to match pos.
+type doubleReadSeeker struct {
+	rs1, rs2       io.ReadSeeker
+	rs1len, rs2len int64
+	second         bool
+	pos            int64
+}
+
+// Seek implements io.Seeker over the concatenation by delegating to whichever of the
+// two readers contains the target position.
+func (r *doubleReadSeeker) Seek(offset int64, whence int) (int64, error) {
+	var err error
+	switch whence {
+	case io.SeekStart:
+		if offset < r.rs1len {
+			r.second = false
+			r.pos, err = r.rs1.Seek(offset, io.SeekStart)
+			return r.pos, err
+		}
+		r.second = true
+		r.pos, err = r.rs2.Seek(offset-r.rs1len, io.SeekStart)
+		r.pos += r.rs1len
+		return r.pos, err
+	case io.SeekEnd:
+		// The offset is relative to the total length, so an offset of 0 must yield that
+		// length; http.ServeContent relies on this to determine the content size.
+		return r.Seek(r.rs1len+r.rs2len+offset, io.SeekStart)
+	default: // io.SeekCurrent
+		return r.Seek(r.pos+offset, io.SeekStart)
+	}
+}
+
+// Read implements io.Reader. A read that starts in rs1 and reaches its end continues
+// in rs2 within the same call.
+func (r *doubleReadSeeker) Read(p []byte) (n int, err error) {
+	switch {
+	case r.pos >= r.rs1len: // read only from the second reader
+		if !r.second {
+			// The boundary was reached by reading rs1 exactly to its end, so rs2 has
+			// not been positioned yet.
+			if _, err := r.rs2.Seek(0, io.SeekStart); err != nil {
+				return 0, err
+			}
+			r.second = true
+		}
+		n, err := r.rs2.Read(p)
+		r.pos += int64(n)
+		return n, err
+	case r.pos+int64(len(p)) <= r.rs1len: // read only from the first reader
+		n, err := r.rs1.Read(p)
+		r.pos += int64(n)
+		if err == io.EOF && r.pos >= r.rs1len {
+			// rs1 is exhausted, but the concatenation continues with rs2.
+			err = nil
+		}
+		return n, err
+	default: // read on the border - end of first reader and start of second reader
+		n1, err := r.rs1.Read(p)
+		r.pos += int64(n1)
+		if r.pos != r.rs1len || (err != nil && err != io.EOF) {
+			// Read() might not read all, return
+			// If error (but not EOF), return
+			return n1, err
+		}
+		_, err = r.rs2.Seek(0, io.SeekStart)
+		if err != nil {
+			return n1, err
+		}
+		r.second = true
+		n2, err := r.rs2.Read(p[n1:])
+		r.pos += int64(n2)
+		return n1 + n2, err
+	}
+}
+
+// multiReadSeeker recursively combines rs into a balanced tree of doubleReadSeekers and
+// returns the combined reader together with its total length. Every leaf is measured by
+// seeking to its end; only the leftmost leaf is rewound to its start afterwards.
+func multiReadSeeker(rs []io.ReadSeeker, leftmost bool) (io.ReadSeeker, int64, error) {
+	if len(rs) == 1 {
+		r := rs[0]
+		l, err := r.Seek(0, io.SeekEnd)
+		if err != nil {
+			return nil, 0, err
+		}
+		if leftmost {
+			_, err = r.Seek(0, io.SeekStart)
+		}
+		return r, l, err
+	} else {
+		rs1, l1, err := multiReadSeeker(rs[:len(rs)/2], leftmost)
+		if err != nil {
+			return nil, 0, err
+		}
+		rs2, l2, err := multiReadSeeker(rs[len(rs)/2:], false)
+		if err != nil {
+			return nil, 0, err
+		}
+		return &doubleReadSeeker{rs1, rs2, l1, l2, false, 0}, l1 + l2, nil
+	}
+}
+
+// emptyReadSeeker is a ReadSeeker without any content.
+type emptyReadSeeker struct{}
+
+// Read always reports io.EOF.
+func (r *emptyReadSeeker) Read(p []byte) (n int, err error) {
+	return 0, io.EOF
+}
+
+// Seek always succeeds at position 0, so that callers such as http.ServeContent, which
+// seek to determine the size, see an empty reader rather than a failing one.
+func (r *emptyReadSeeker) Seek(offset int64, whence int) (int64, error) {
+	return 0, nil
+}
+
+// newMultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided
+// input readseekers. After calling this method the initial position is set to the
+// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances
+// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker.
+// Seek can be used over the sum of lengths of all readseekers.
+//
+// When a newMultiReadSeeker is used, no Read and Seek operations should be made on
+// its ReadSeeker components and the length of the readseekers should not change.
+// Also, users should make no assumption on the state of individual readseekers
+// while the newMultiReadSeeker is used.
+func newMultiReadSeeker(rs ...io.ReadSeeker) io.ReadSeeker {
+	if len(rs) == 0 {
+		return &emptyReadSeeker{}
+	}
+	r, _, err := multiReadSeeker(rs, true)
+	if err != nil {
+		return &emptyReadSeeker{}
+	}
+	return r
+}
diff --git a/catalogd/internal/storage/storage.go b/catalogd/internal/storage/storage.go
index 458ff040b..af78a669f 100644
--- a/catalogd/internal/storage/storage.go
+++ b/catalogd/internal/storage/storage.go
@@ -13,7 +13,8 @@ import (
 type Instance interface {
 	Store(ctx context.Context, catalog string, fsys fs.FS) error
 	Delete(catalog string) error
+	ContentExists(catalog string) bool
+
 	BaseURL(catalog string) string
 	StorageServerHandler() http.Handler
-	ContentExists(catalog string) bool
 }