From 116e886420dea55034daff78ced8593f8e2fe0cb Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Fri, 6 Dec 2024 09:20:39 -0500 Subject: [PATCH 1/4] add a new feature-gated api/v1/query endpoint Signed-off-by: Joe Lanford --- catalogd/cmd/catalogd/main.go | 4 +- catalogd/internal/features/features.go | 8 +- catalogd/internal/storage/index.go | 136 +++++++++++ catalogd/internal/storage/localdir.go | 256 +++++++++++++++++---- catalogd/internal/storage/localdir_test.go | 70 +++--- 5 files changed, 387 insertions(+), 87 deletions(-) create mode 100644 catalogd/internal/storage/index.go diff --git a/catalogd/cmd/catalogd/main.go b/catalogd/cmd/catalogd/main.go index 77698444c..062b44b78 100644 --- a/catalogd/cmd/catalogd/main.go +++ b/catalogd/cmd/catalogd/main.go @@ -294,9 +294,9 @@ func main() { os.Exit(1) } - localStorage = storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL} + localStorage = &storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL} - // Config for the the catalogd web server + // Config for the catalogd web server catalogServerConfig := serverutil.CatalogServerConfig{ ExternalAddr: externalAddr, CatalogAddr: catalogServerAddr, diff --git a/catalogd/internal/features/features.go b/catalogd/internal/features/features.go index 8f67b1689..1ab490854 100644 --- a/catalogd/internal/features/features.go +++ b/catalogd/internal/features/features.go @@ -5,7 +5,13 @@ import ( "k8s.io/component-base/featuregate" ) -var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{} +const ( + APIV1QueryHandler = featuregate.Feature("APIV1QueryHandler") +) + +var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ + APIV1QueryHandler: {Default: false, PreRelease: featuregate.Alpha}, +} var CatalogdFeatureGate featuregate.MutableFeatureGate = featuregate.NewFeatureGate() diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go new file mode 100644 index 000000000..b80b2646a --- /dev/null +++ b/catalogd/internal/storage/index.go @@ -0,0 +1,136 @@ +package storage + +import ( + "cmp" + "encoding/json" + "errors" + "fmt" + "io" + "slices" + + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/operator-framework/operator-registry/alpha/declcfg" +) + +type index struct { + BySchema map[string][]section `json:"by_schema"` + ByPackage map[string][]section `json:"by_package"` + ByName map[string][]section `json:"by_name"` +} + +type section struct { + offset int64 + length int64 +} + +func (s *section) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`[%d,%d]`, s.offset, s.length)), nil +} + +func (s *section) UnmarshalJSON(b []byte) error { + vals := [2]int64{} + if err := json.Unmarshal(b, &vals); err != nil { + return err + } + s.offset = vals[0] + s.length = vals[1] + return nil +} + +func (i index) Size() int64 { + size := 0 + for k, v := range i.BySchema { + size += len(k) + len(v)*16 + } + for k, v := range i.ByPackage { + size += len(k) + len(v)*16 + } + for k, v := range i.ByName { + size += len(k) + len(v)*16 + } + return int64(size) +} + +func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.Reader, bool) { + sectionSet := i.getSectionSet(schema, packageName, name) + + sections := sectionSet.UnsortedList() + slices.SortFunc(sections, func(a, b section) int { + return cmp.Compare(a.offset, b.offset) + }) + + srs := make([]io.Reader, 0, len(sections)) + for _, s := range sections { + sr := io.NewSectionReader(r, s.offset, s.length) + srs = append(srs, sr) + } + return io.MultiReader(srs...), true +} + +func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section] { + if schema == "" { + if packageName == "" { + if name == "" { + sectionSet := sets.New[section]() + for _, s := range i.BySchema { + sectionSet.Insert(s...) + } + return sectionSet + } else { + return sets.New[section](i.ByName[name]...) + } + } else { + sectionSet := sets.New[section](i.ByPackage[packageName]...) + if name == "" { + return sectionSet + } else { + return sectionSet.Intersection(sets.New[section](i.ByName[name]...)) + } + } + } else { + sectionSet := sets.New[section](i.BySchema[schema]...) + if packageName == "" { + if name == "" { + return sectionSet + } else { + return sectionSet.Intersection(sets.New[section](i.ByName[name]...)) + } + } else { + sectionSet = sectionSet.Intersection(sets.New[section](i.ByPackage[packageName]...)) + if name == "" { + return sectionSet + } else { + return sectionSet.Intersection(sets.New[section](i.ByName[name]...)) + } + } + } +} + +func newIndex(r io.Reader) (*index, error) { + idx := &index{ + BySchema: make(map[string][]section), + ByPackage: make(map[string][]section), + ByName: make(map[string][]section), + } + var meta declcfg.Meta + dec := json.NewDecoder(r) + for { + i1 := dec.InputOffset() + if err := dec.Decode(&meta); err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, err + } + i2 := dec.InputOffset() + start := i1 + length := i2 - i1 + + s := section{offset: start, length: length} + idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s) + idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s) + idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s) + } + return idx, nil +} diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index dd06729ea..7a0b67e99 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -2,16 +2,24 @@ package storage import ( "context" + "encoding/json" + "errors" "fmt" + "io" "io/fs" "net/http" "net/url" "os" "path/filepath" + "strings" + "sync" "github.com/klauspost/compress/gzhttp" + "golang.org/x/sync/errgroup" "github.com/operator-framework/operator-registry/alpha/declcfg" + + "github.com/operator-framework/catalogd/internal/features" ) // LocalDirV1 is a storage Instance. When Storing a new FBC contained in @@ -22,93 +30,241 @@ import ( type LocalDirV1 struct { RootDir string RootURL *url.URL + + m sync.RWMutex } -const ( - v1ApiPath = "api/v1" - v1ApiData = "all" -) +func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { + s.m.Lock() + defer s.m.Unlock() + + if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + return s.storeCatalogFileAndIndex(ctx, catalog, fsys) + } + return s.storeCatalogFile(ctx, catalog, fsys) +} -func (s LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { - fbcDir := filepath.Join(s.RootDir, catalog, v1ApiPath) - if err := os.MkdirAll(fbcDir, 0700); err != nil { +func (s *LocalDirV1) storeCatalogFile(ctx context.Context, catalog string, fsys fs.FS) error { + if err := os.MkdirAll(s.RootDir, 0700); err != nil { return err } - tempFile, err := os.CreateTemp(s.RootDir, fmt.Sprint(catalog)) + tmpCatalogFile, err := os.CreateTemp(s.RootDir, fmt.Sprintf(".%s-*.jsonl", catalog)) if err != nil { return err } - defer os.Remove(tempFile.Name()) + defer os.Remove(tmpCatalogFile.Name()) + if err := declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error { if err != nil { return err } - _, err = tempFile.Write(meta.Blob) + _, err = tmpCatalogFile.Write(meta.Blob) return err }); err != nil { return fmt.Errorf("error walking FBC root: %w", err) } - fbcFile := filepath.Join(fbcDir, v1ApiData) - return os.Rename(tempFile.Name(), fbcFile) + + fbcFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) + return os.Rename(tmpCatalogFile.Name(), fbcFile) } -func (s LocalDirV1) Delete(catalog string) error { - return os.RemoveAll(filepath.Join(s.RootDir, catalog)) +func (s *LocalDirV1) storeCatalogFileAndIndex(ctx context.Context, catalog string, fsys fs.FS) error { + if err := os.MkdirAll(s.RootDir, 0700); err != nil { + return err + } + tmpCatalogFile, err := os.CreateTemp(s.RootDir, fmt.Sprintf(".%s-*.jsonl", catalog)) + if err != nil { + return err + } + defer os.Remove(tmpCatalogFile.Name()) + + tmpIndexFile, err := os.CreateTemp(s.RootDir, filepath.Base(fmt.Sprintf("%s.index.json", strings.TrimSuffix(tmpCatalogFile.Name(), ".jsonl")))) + if err != nil { + return err + } + defer os.Remove(tmpIndexFile.Name()) + + pr, pw := io.Pipe() + mw := io.MultiWriter(tmpCatalogFile, pw) + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + if err := declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { + if err != nil { + return err + } + _, err = mw.Write(meta.Blob) + if err != nil { + return pw.CloseWithError(err) + } + return nil + }, declcfg.WithConcurrency(1)); err != nil { + return fmt.Errorf("error walking FBC root: %w", err) + } + return pw.CloseWithError(tmpCatalogFile.Close()) + }) + eg.Go(func() error { + idx, err := newIndex(pr) + if err != nil { + return pr.CloseWithError(err) + } + if err := pr.Close(); err != nil { + return err + } + enc := json.NewEncoder(tmpIndexFile) + if err := enc.Encode(idx); err != nil { + return err + } + if err := tmpIndexFile.Close(); err != nil { + return err + } + return nil + }) + if err := eg.Wait(); err != nil { + return err + } + + fbcFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) + fbcIndexFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)) + return errors.Join( + os.Rename(tmpCatalogFile.Name(), fbcFile), + os.Rename(tmpIndexFile.Name(), fbcIndexFile), + ) } -func (s LocalDirV1) BaseURL(catalog string) string { +func (s *LocalDirV1) Delete(catalog string) error { + s.m.Lock() + defer s.m.Unlock() + + var errs []error + errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)))) + + if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)))) + } + return errors.Join(errs...) +} + +func (s *LocalDirV1) BaseURL(catalog string) string { return s.RootURL.JoinPath(catalog).String() } -func (s LocalDirV1) StorageServerHandler() http.Handler { +func (s *LocalDirV1) StorageServerHandler() http.Handler { mux := http.NewServeMux() - fsHandler := http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)})) - spHandler := http.StripPrefix(s.RootURL.Path, fsHandler) - gzHandler := gzhttp.GzipHandler(spHandler) + + v1AllPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path + mux.Handle(v1AllPath, s.v1AllHandler()) + + if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + v1QueryPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path + mux.Handle(v1QueryPath, s.v1QueryHandler()) + } + return mux +} + +func (s *LocalDirV1) v1AllHandler() http.Handler { + catalogHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() + + catalog := r.PathValue("catalog") + http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) + }) + gzHandler := gzhttp.GzipHandler(catalogHandler) typeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/jsonl") gzHandler.ServeHTTP(w, r) }) - mux.Handle(s.RootURL.Path, typeHandler) - return mux + return typeHandler } -func (s LocalDirV1) ContentExists(catalog string) bool { - file, err := os.Stat(filepath.Join(s.RootDir, catalog, v1ApiPath, v1ApiData)) +func (s *LocalDirV1) v1QueryHandler() http.Handler { + catalogHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() + + catalog := r.PathValue("catalog") + schema := r.URL.Query().Get("schema") + pkg := r.URL.Query().Get("package") + name := r.URL.Query().Get("name") + + catalogFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) + catalogFileStat, err := os.Stat(catalogFilePath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + http.Error(w, "Catalog not found", http.StatusNotFound) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + modTime := catalogFileStat.ModTime().Format(http.TimeFormat) + if r.Header.Get("If-Modified-Since") == modTime { + w.WriteHeader(http.StatusNotModified) + return + } + + catalogFile, err := os.Open(catalogFilePath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + http.Error(w, "Catalog not found", http.StatusNotFound) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer catalogFile.Close() + + indexFile, err := os.Open(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + http.Error(w, "No catalog contents found matching query", http.StatusNotFound) + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer indexFile.Close() + + var idx index + if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + queryReader, ok := idx.Get(catalogFile, schema, pkg, name) + if !ok { + http.Error(w, fmt.Sprintf("No index found for schema=%q, package=%q, name=%q", schema, pkg, name), http.StatusInternalServerError) + return + } + w.Header().Add("Content-Type", "application/jsonl") + w.Header().Set("Last-Modified", modTime) + _, _ = io.Copy(w, queryReader) + }) + gzHandler := gzhttp.GzipHandler(catalogHandler) + return gzHandler +} + +func (s *LocalDirV1) ContentExists(catalog string) bool { + s.m.RLock() + defer s.m.RUnlock() + + catalogFileStat, err := os.Stat(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) if err != nil { return false } - if !file.Mode().IsRegular() { + if !catalogFileStat.Mode().IsRegular() { // path is not valid content return false } - return true -} -// filesOnlyFilesystem is a file system that can open only regular -// files from the underlying filesystem. All other file types result -// in os.ErrNotExists -type filesOnlyFilesystem struct { - FS fs.FS -} - -// Open opens a named file from the underlying filesystem. If the file -// is not a regular file, it return os.ErrNotExists. Callers are resposible -// for closing the file returned. -func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) { - file, err := f.FS.Open(name) - if err != nil { - return nil, err - } - stat, err := file.Stat() - if err != nil { - _ = file.Close() - return nil, err - } - if !stat.Mode().IsRegular() { - _ = file.Close() - return nil, os.ErrNotExist + if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + indexFileStat, err := os.Stat(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))) + if err != nil { + return false + } + if !indexFileStat.Mode().IsRegular() { + return false + } } - return file, nil + return true } diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go index c975c8fc9..3d7f7f8a0 100644 --- a/catalogd/internal/storage/localdir_test.go +++ b/catalogd/internal/storage/localdir_test.go @@ -69,14 +69,13 @@ var _ = Describe("LocalDir Storage Test", func() { Expect(err).To(Not(HaveOccurred())) }) It("should store the content in the RootDir correctly", func() { - fbcDir := filepath.Join(rootDir, catalog, v1ApiPath) - fbcFile := filepath.Join(fbcDir, v1ApiData) + fbcFile := filepath.Join(rootDir, fmt.Sprintf("%s.jsonl", catalog)) _, err := os.Stat(fbcFile) Expect(err).To(Not(HaveOccurred())) gotConfig, err := declcfg.LoadFS(ctx, unpackResultFS) Expect(err).To(Not(HaveOccurred())) - storedConfig, err := declcfg.LoadFile(os.DirFS(fbcDir), v1ApiData) + storedConfig, err := declcfg.LoadFile(os.DirFS(filepath.Dir(fbcFile)), filepath.Base(fbcFile)) Expect(err).To(Not(HaveOccurred())) diff := cmp.Diff(gotConfig, storedConfig) Expect(diff).To(Equal("")) @@ -93,10 +92,15 @@ var _ = Describe("LocalDir Storage Test", func() { Expect(err).To(Not(HaveOccurred())) }) It("should delete the FBC from the cache directory", func() { - fbcFile := filepath.Join(rootDir, catalog) + fbcFile := filepath.Join(rootDir, fmt.Sprintf("%s.jsonl", catalog)) _, err := os.Stat(fbcFile) Expect(err).To(HaveOccurred()) Expect(os.IsNotExist(err)).To(BeTrue()) + + indexFile := filepath.Join(rootDir, fmt.Sprintf("%s.index.json", catalog)) + _, err = os.Stat(indexFile) + Expect(err).To(HaveOccurred()) + Expect(os.IsNotExist(err)).To(BeTrue()) }) It("should report content does not exist", func() { Expect(store.ContentExists(catalog)).To(BeFalse()) @@ -111,9 +115,7 @@ var _ = Describe("LocalDir Server Handler tests", func() { store LocalDirV1 ) BeforeEach(func() { - d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache") - Expect(err).ToNot(HaveOccurred()) - Expect(os.MkdirAll(filepath.Join(d, "test-catalog", v1ApiPath), 0700)).To(Succeed()) + d := GinkgoT().TempDir() store = LocalDirV1{RootDir: d, RootURL: &url.URL{Path: urlPrefix}} testServer = httptest.NewServer(store.StorageServerHandler()) @@ -127,33 +129,32 @@ var _ = Describe("LocalDir Server Handler tests", func() { It("gets 404 for the path /catalogs/test-catalog/", func() { expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/")) }) - It("gets 404 for the path /test-catalog/foo.txt", func() { - // This ensures that even if the file exists, the URL must contain the /catalogs/ prefix - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), []byte("bar"), 0600)).To(Succeed()) - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/test-catalog/foo.txt")) + It("gets 404 for the path /catalogs/test-catalog/api", func() { + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api")) }) - It("gets 404 for the path /catalogs/test-catalog/non-existent.txt", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/non-existent.txt")) + It("gets 404 for the path /catalogs/test-catalog/api/v1", func() { + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1")) }) - It("gets 200 for the path /catalogs/foo.txt", func() { - expectedContent := []byte("bar") - Expect(os.WriteFile(filepath.Join(store.RootDir, "foo.txt"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/foo.txt"), expectedContent) + It("gets 404 for the path /catalogs/test-catalog.jsonl", func() { + // This is actually how the file is stored, but we don't serve + // the filesystem, we serve an API. Hence, expect 404 not found + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), []byte("foobar"), 0600)).To(Succeed()) + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog.jsonl")) }) - It("gets 200 for the path /catalogs/test-catalog/foo.txt", func() { - expectedContent := []byte("bar") - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/foo.txt"), expectedContent) + It("gets 200 for the path /catalogs/test-catalog/api/v1/all", func() { + expectedContent := []byte(`{"foo":"bar"}`) + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) + expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false) }) It("ignores accept-encoding for the path /catalogs/test-catalog/api/v1/all with size < 1400 bytes", func() { - expectedContent := []byte("bar") - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", v1ApiPath, v1ApiData), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent) + expectedContent := []byte(`{"foo":"bar"}`) + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) + expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false) }) It("provides gzipped content for the path /catalogs/test-catalog/api/v1/all with size > 1400 bytes", func() { expectedContent := []byte(testCompressableJSON) - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", v1ApiPath, v1ApiData), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent) + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) + expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, true) }) It("provides json-lines format for the served JSON catalog", func() { catalog := "test-catalog" @@ -165,9 +166,9 @@ var _ = Describe("LocalDir Server Handler tests", func() { expectedContent, err := generateJSONLines([]byte(testCompressableJSON)) Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, v1ApiPath, v1ApiData) + path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all") Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent)) + expectFound(path, []byte(expectedContent), true) }) It("provides json-lines format for the served YAML catalog", func() { catalog := "test-catalog" @@ -181,9 +182,9 @@ var _ = Describe("LocalDir Server Handler tests", func() { expectedContent, err := generateJSONLines(yamlData) Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, v1ApiPath, v1ApiData) + path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all") Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent)) + expectFound(path, []byte(expectedContent), true) }) AfterEach(func() { testServer.Close() @@ -197,7 +198,7 @@ func expectNotFound(url string) { Expect(resp.Body.Close()).To(Succeed()) } -func expectFound(url string, expectedContent []byte) { +func expectFound(url string, expectedContent []byte, expectCompression bool) { req, err := http.NewRequest(http.MethodGet, url, nil) Expect(err).To(Not(HaveOccurred())) req.Header.Set("Accept-Encoding", "gzip") @@ -206,15 +207,16 @@ func expectFound(url string, expectedContent []byte) { Expect(resp.StatusCode).To(Equal(http.StatusOK)) var actualContent []byte - switch resp.Header.Get("Content-Encoding") { - case "gzip": + if expectCompression { + Expect(resp.Header.Get("Content-Encoding")).To(Equal("gzip")) Expect(len(expectedContent)).To(BeNumerically(">", 1400), fmt.Sprintf("gzipped content should only be provided for content larger than 1400 bytes, but our expected content is only %d bytes", len(expectedContent))) gz, err := gzip.NewReader(resp.Body) Expect(err).To(Not(HaveOccurred())) actualContent, err = io.ReadAll(gz) Expect(err).To(Not(HaveOccurred())) - default: + } else { + Expect(resp.Header.Get("Content-Encoding")).To(BeEmpty()) actualContent, err = io.ReadAll(resp.Body) Expect(len(expectedContent)).To(BeNumerically("<", 1400), fmt.Sprintf("plaintext content should only be provided for content smaller than 1400 bytes, but we received plaintext for %d bytes\n expectedContent:\n%s\n", len(expectedContent), expectedContent)) From 10cbc91148c2d3bd6f0644ee6ff5d4c19926a60b Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Fri, 6 Dec 2024 16:36:04 -0500 Subject: [PATCH 2/4] add request logging and singleflight for shared index access for concurrent requests Signed-off-by: Joe Lanford --- catalogd/internal/serverutil/serverutil.go | 10 +++- catalogd/internal/storage/localdir.go | 70 +++++++++++++++++----- 2 files changed, 62 insertions(+), 18 deletions(-) diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index 614da2b8b..35cef0023 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ b/catalogd/internal/serverutil/serverutil.go @@ -1,6 +1,7 @@ package serverutil import ( + "context" "crypto/tls" "fmt" "net" @@ -9,7 +10,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/certwatcher" - "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/log" catalogdmetrics "github.com/operator-framework/operator-controller/catalogd/internal/metrics" "github.com/operator-framework/operator-controller/catalogd/internal/storage" @@ -44,8 +45,11 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil Name: "catalogs", OnlyServeWhenLeader: true, Server: &http.Server{ - Addr: cfg.CatalogAddr, - Handler: catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()), + Addr: cfg.CatalogAddr, + Handler: catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()), + BaseContext: func(_ net.Listener) context.Context { + return log.IntoContext(context.Background(), mgr.GetLogger().WithName("http.catalogs")) + }, ReadTimeout: 5 * time.Second, // TODO: Revert this to 10 seconds if/when the API // evolves to have significantly smaller responses diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 7a0b67e99..20bbb8614 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -13,9 +13,12 @@ import ( "path/filepath" "strings" "sync" + "time" + "github.com/go-logr/logr" "github.com/klauspost/compress/gzhttp" "golang.org/x/sync/errgroup" + "golang.org/x/sync/singleflight" "github.com/operator-framework/operator-registry/alpha/declcfg" @@ -31,7 +34,8 @@ type LocalDirV1 struct { RootDir string RootURL *url.URL - m sync.RWMutex + m sync.RWMutex + sf singleflight.Group } func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { @@ -175,7 +179,8 @@ func (s *LocalDirV1) v1AllHandler() http.Handler { w.Header().Add("Content-Type", "application/jsonl") gzHandler.ServeHTTP(w, r) }) - return typeHandler + + return newLoggingMiddleware(typeHandler) } func (s *LocalDirV1) v1QueryHandler() http.Handler { @@ -215,22 +220,11 @@ func (s *LocalDirV1) v1QueryHandler() http.Handler { } defer catalogFile.Close() - indexFile, err := os.Open(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))) + idx, err := s.getIndex(catalog) if err != nil { - if errors.Is(err, fs.ErrNotExist) { - http.Error(w, "No catalog contents found matching query", http.StatusNotFound) - } - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer indexFile.Close() - - var idx index - if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - queryReader, ok := idx.Get(catalogFile, schema, pkg, name) if !ok { http.Error(w, fmt.Sprintf("No index found for schema=%q, package=%q, name=%q", schema, pkg, name), http.StatusInternalServerError) @@ -241,7 +235,7 @@ func (s *LocalDirV1) v1QueryHandler() http.Handler { _, _ = io.Copy(w, queryReader) }) gzHandler := gzhttp.GzipHandler(catalogHandler) - return gzHandler + return newLoggingMiddleware(gzHandler) } func (s *LocalDirV1) ContentExists(catalog string) bool { @@ -268,3 +262,49 @@ func (s *LocalDirV1) ContentExists(catalog string) bool { } return true } + +func (s *LocalDirV1) getIndex(catalog string) (*index, error) { + idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) { + indexFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)) + fmt.Printf("opening index file %s\n", indexFilePath) + indexFile, err := os.Open(indexFilePath) + if err != nil { + return nil, err + } + defer indexFile.Close() + var idx index + if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { + return nil, err + } + return &idx, nil + }) + return idx.(*index), err +} + +func newLoggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logger := logr.FromContextOrDiscard(r.Context()) + + start := time.Now() + lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK} + next.ServeHTTP(lrw, r) + + logger.WithValues( + "method", r.Method, + "url", r.URL.String(), + "status", lrw.statusCode, + "duration", time.Since(start), + "remoteAddr", r.RemoteAddr, + ).Info("HTTP request processed") + }) +} + +type loggingResponseWriter struct { + http.ResponseWriter + statusCode int +} + +func (w *loggingResponseWriter) WriteHeader(code int) { + w.statusCode = code + w.ResponseWriter.WriteHeader(code) +} From ffaa05dcabde300222d39f22a0b3548d0223e49a Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Fri, 17 Jan 2025 16:28:42 -0500 Subject: [PATCH 3/4] a few improvements and optimizations Signed-off-by: Joe Lanford --- catalogd/cmd/catalogd/main.go | 6 ++- catalogd/internal/storage/index.go | 33 +++++++-------- catalogd/internal/storage/localdir.go | 48 ++++++++++++---------- catalogd/internal/storage/localdir_test.go | 2 +- catalogd/internal/storage/storage.go | 3 +- 5 files changed, 50 insertions(+), 42 deletions(-) diff --git a/catalogd/cmd/catalogd/main.go b/catalogd/cmd/catalogd/main.go index 062b44b78..8a73d8bd3 100644 --- a/catalogd/cmd/catalogd/main.go +++ b/catalogd/cmd/catalogd/main.go @@ -294,7 +294,11 @@ func main() { os.Exit(1) } - localStorage = &storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL} + localStorage = &storage.LocalDirV1{ + RootDir: storeDir, + RootURL: baseStorageURL, + EnableQueryHandler: features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler), + } // Config for the catalogd web server catalogServerConfig := serverutil.CatalogServerConfig{ diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go index b80b2646a..02f14d11b 100644 --- a/catalogd/internal/storage/index.go +++ b/catalogd/internal/storage/index.go @@ -3,7 +3,6 @@ package storage import ( "cmp" "encoding/json" - "errors" "fmt" "io" "slices" @@ -107,30 +106,28 @@ func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section } } -func newIndex(r io.Reader) (*index, error) { +func newIndex(metasChan <-chan *declcfg.Meta) (*index, error) { idx := &index{ BySchema: make(map[string][]section), ByPackage: make(map[string][]section), ByName: make(map[string][]section), } - var meta declcfg.Meta - dec := json.NewDecoder(r) - for { - i1 := dec.InputOffset() - if err := dec.Decode(&meta); err != nil { - if errors.Is(err, io.EOF) { - break - } - return nil, err - } - i2 := dec.InputOffset() - start := i1 - length := i2 - i1 + offset := int64(0) + for meta := range metasChan { + start := offset + length := int64(len(meta.Blob)) + offset += length s := section{offset: start, length: length} - idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s) - idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s) - idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s) + if meta.Schema != "" { + idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s) + } + if meta.Package != "" { + idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s) + } + if meta.Name != "" { + idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s) + } } return idx, nil } diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 20bbb8614..60e09fadb 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -31,8 +31,9 @@ import ( // done so that clients accessing the content stored in RootDir/catalogName have // atomic view of the content for a catalog. type LocalDirV1 struct { - RootDir string - RootURL *url.URL + RootDir string + RootURL *url.URL + EnableQueryHandler bool m sync.RWMutex sf singleflight.Group @@ -42,7 +43,7 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro s.m.Lock() defer s.m.Unlock() - if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + if s.EnableQueryHandler { return s.storeCatalogFileAndIndex(ctx, catalog, fsys) } return s.storeCatalogFile(ctx, catalog, fsys) @@ -88,30 +89,33 @@ func (s *LocalDirV1) storeCatalogFileAndIndex(ctx context.Context, catalog strin } defer os.Remove(tmpIndexFile.Name()) - pr, pw := io.Pipe() - mw := io.MultiWriter(tmpCatalogFile, pw) + metasChan := make(chan *declcfg.Meta) eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { + defer close(metasChan) if err := declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { if err != nil { return err } - _, err = mw.Write(meta.Blob) + _, err = tmpCatalogFile.Write(meta.Blob) if err != nil { - return pw.CloseWithError(err) + return err } + select { + case <-egCtx.Done(): + return egCtx.Err() + case metasChan <- meta: + } + return nil }, declcfg.WithConcurrency(1)); err != nil { return fmt.Errorf("error walking FBC root: %w", err) } - return pw.CloseWithError(tmpCatalogFile.Close()) + return tmpCatalogFile.Close() }) eg.Go(func() error { - idx, err := newIndex(pr) + idx, err := newIndex(metasChan) if err != nil { - return pr.CloseWithError(err) - } - if err := pr.Close(); err != nil { return err } enc := json.NewEncoder(tmpIndexFile) @@ -142,7 +146,7 @@ func (s *LocalDirV1) Delete(catalog string) error { var errs []error errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)))) - if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + if s.EnableQueryHandler { errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)))) } return errors.Join(errs...) @@ -158,7 +162,7 @@ func (s *LocalDirV1) StorageServerHandler() http.Handler { v1AllPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path mux.Handle(v1AllPath, s.v1AllHandler()) - if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { + if s.EnableQueryHandler { v1QueryPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path mux.Handle(v1QueryPath, s.v1QueryHandler()) } @@ -171,16 +175,11 @@ func (s *LocalDirV1) v1AllHandler() http.Handler { defer s.m.RUnlock() catalog := r.PathValue("catalog") + w.Header().Add("Content-Type", "application/jsonl") http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) }) gzHandler := gzhttp.GzipHandler(catalogHandler) - - typeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Content-Type", "application/jsonl") - gzHandler.ServeHTTP(w, r) - }) - - return newLoggingMiddleware(typeHandler) + return newLoggingMiddleware(gzHandler) } func (s *LocalDirV1) v1QueryHandler() http.Handler { @@ -193,6 +192,13 @@ func (s *LocalDirV1) v1QueryHandler() http.Handler { pkg := r.URL.Query().Get("package") name := r.URL.Query().Get("name") + // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) + if schema == "" && pkg == "" && name == "" { + w.Header().Add("Content-Type", "application/jsonl") + http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) + return + } + catalogFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) catalogFileStat, err := os.Stat(catalogFilePath) if err != nil { diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go index 3d7f7f8a0..0e84d3adc 100644 --- a/catalogd/internal/storage/localdir_test.go +++ b/catalogd/internal/storage/localdir_test.go @@ -56,7 +56,7 @@ var _ = Describe("LocalDir Storage Test", func() { rootDir = d baseURL = &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix} - store = LocalDirV1{RootDir: rootDir, RootURL: baseURL} + store = &LocalDirV1{RootDir: rootDir, RootURL: baseURL} unpackResultFS = &fstest.MapFS{ "bundle.yaml": &fstest.MapFile{Data: []byte(testBundle), Mode: os.ModePerm}, "package.yaml": &fstest.MapFile{Data: []byte(testPackage), Mode: os.ModePerm}, diff --git a/catalogd/internal/storage/storage.go b/catalogd/internal/storage/storage.go index 458ff040b..af78a669f 100644 --- a/catalogd/internal/storage/storage.go +++ b/catalogd/internal/storage/storage.go @@ -13,7 +13,8 @@ import ( type Instance interface { Store(ctx context.Context, catalog string, fsys fs.FS) error Delete(catalog string) error + ContentExists(catalog string) bool + BaseURL(catalog string) string StorageServerHandler() http.Handler - ContentExists(catalog string) bool } From c145d4fe7c18eddd3fbd6fc8cd60bfefe1807e9d Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Sun, 19 Jan 2025 10:00:36 -0500 Subject: [PATCH 4/4] another round of refactoring improvement Signed-off-by: Joe Lanford --- catalogd/internal/serverutil/serverutil.go | 36 +- catalogd/internal/storage/index.go | 6 +- catalogd/internal/storage/localdir.go | 370 +++++++++---------- catalogd/internal/storage/multireadseeker.go | 118 ++++++ 4 files changed, 323 insertions(+), 207 deletions(-) create mode 100644 catalogd/internal/storage/multireadseeker.go diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index 35cef0023..6d3daf1fd 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ b/catalogd/internal/serverutil/serverutil.go @@ -8,6 +8,8 @@ import ( "net/http" "time" + "github.com/go-logr/logr" + "github.com/klauspost/compress/gzhttp" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/log" @@ -40,13 +42,17 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil } shutdownTimeout := 30 * time.Second + handler := cfg.LocalStorage.StorageServerHandler() + handler = gzhttp.GzipHandler(handler) + handler = catalogdmetrics.AddMetricsToHandler(handler) + handler = newLoggingMiddleware(handler) catalogServer := manager.Server{ Name: "catalogs", OnlyServeWhenLeader: true, Server: &http.Server{ Addr: cfg.CatalogAddr, - Handler: catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()), + Handler: handler, BaseContext: func(_ net.Listener) context.Context { return log.IntoContext(context.Background(), mgr.GetLogger().WithName("http.catalogs")) }, @@ -66,3 +72,31 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil return nil } + +func newLoggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logger := logr.FromContextOrDiscard(r.Context()) + + start := time.Now() + lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK} + next.ServeHTTP(lrw, r) + + logger.WithValues( + "method", r.Method, + "url", r.URL.String(), + "status", lrw.statusCode, + "duration", time.Since(start), + "remoteAddr", r.RemoteAddr, + ).Info("HTTP request processed") + }) +} + +type loggingResponseWriter struct { + http.ResponseWriter + statusCode int +} + +func (w *loggingResponseWriter) WriteHeader(code int) { + w.statusCode = code + w.ResponseWriter.WriteHeader(code) +} diff --git a/catalogd/internal/storage/index.go b/catalogd/internal/storage/index.go index 02f14d11b..20fe5f2ed 100644 --- a/catalogd/internal/storage/index.go +++ b/catalogd/internal/storage/index.go @@ -51,7 +51,7 @@ func (i index) Size() int64 { return int64(size) } -func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.Reader, bool) { +func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.ReadSeeker, bool) { sectionSet := i.getSectionSet(schema, packageName, name) sections := sectionSet.UnsortedList() @@ -59,12 +59,12 @@ func (i index) Get(r io.ReaderAt, schema, packageName, name string) (io.Reader, return cmp.Compare(a.offset, b.offset) }) - srs := make([]io.Reader, 0, len(sections)) + srs := make([]io.ReadSeeker, 0, len(sections)) for _, s := range sections { sr := io.NewSectionReader(r, s.offset, s.length) srs = append(srs, sr) } - return io.MultiReader(srs...), true + return newMultiReadSeeker(srs...), true } func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section] { diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index 60e09fadb..db642e8f5 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -11,18 +11,13 @@ import ( "net/url" "os" "path/filepath" - "strings" "sync" "time" - "github.com/go-logr/logr" - "github.com/klauspost/compress/gzhttp" "golang.org/x/sync/errgroup" "golang.org/x/sync/singleflight" "github.com/operator-framework/operator-registry/alpha/declcfg" - - "github.com/operator-framework/catalogd/internal/features" ) // LocalDirV1 is a storage Instance. When Storing a new FBC contained in @@ -43,99 +38,56 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro s.m.Lock() defer s.m.Unlock() - if s.EnableQueryHandler { - return s.storeCatalogFileAndIndex(ctx, catalog, fsys) - } - return s.storeCatalogFile(ctx, catalog, fsys) -} - -func (s *LocalDirV1) storeCatalogFile(ctx context.Context, catalog string, fsys fs.FS) error { if err := os.MkdirAll(s.RootDir, 0700); err != nil { return err } - tmpCatalogFile, err := os.CreateTemp(s.RootDir, fmt.Sprintf(".%s-*.jsonl", catalog)) + tmpCatalogDir, err := os.MkdirTemp(s.RootDir, fmt.Sprintf(".%s-*", catalog)) if err != nil { return err } - defer os.Remove(tmpCatalogFile.Name()) + defer os.RemoveAll(tmpCatalogDir) - if err := declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - _, err = tmpCatalogFile.Write(meta.Blob) - return err - }); err != nil { - return fmt.Errorf("error walking FBC root: %w", err) - } - - fbcFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) - return os.Rename(tmpCatalogFile.Name(), fbcFile) -} - -func (s *LocalDirV1) storeCatalogFileAndIndex(ctx context.Context, catalog string, fsys fs.FS) error { - if err := os.MkdirAll(s.RootDir, 0700); err != nil { - return err - } - tmpCatalogFile, err := os.CreateTemp(s.RootDir, fmt.Sprintf(".%s-*.jsonl", catalog)) - if err != nil { - return err + storeMetaFuncs := []storeMetasFunc{storeCatalogData} + if s.EnableQueryHandler { + storeMetaFuncs = append(storeMetaFuncs, storeIndexData) } - defer os.Remove(tmpCatalogFile.Name()) - tmpIndexFile, err := os.CreateTemp(s.RootDir, filepath.Base(fmt.Sprintf("%s.index.json", strings.TrimSuffix(tmpCatalogFile.Name(), ".jsonl")))) - if err != nil { - return err + var ( + eg, egCtx = errgroup.WithContext(ctx) + metaChans []chan *declcfg.Meta + ) + for i, f := range storeMetaFuncs { + metaChans = append(metaChans, make(chan *declcfg.Meta, 1)) + eg.Go(func() error { return f(tmpCatalogDir, metaChans[i]) }) } - defer os.Remove(tmpIndexFile.Name()) - - metasChan := make(chan *declcfg.Meta) - eg, egCtx := errgroup.WithContext(ctx) - eg.Go(func() error { - defer close(metasChan) - if err := declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - _, err = tmpCatalogFile.Write(meta.Blob) - if err != nil { - return err - } + err = declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { + if err != nil { + return err + } + for _, ch := range metaChans { select { + case ch <- meta: case <-egCtx.Done(): return egCtx.Err() - case metasChan <- meta: } - - return nil - }, declcfg.WithConcurrency(1)); err != nil { - return fmt.Errorf("error walking FBC root: %w", err) - } - return tmpCatalogFile.Close() - }) - eg.Go(func() error { - idx, err := newIndex(metasChan) - if err != nil { - return err - } - enc := json.NewEncoder(tmpIndexFile) - if err := enc.Encode(idx); err != nil { - return err - } - if err := tmpIndexFile.Close(); err != nil { - return err } return nil - }) + }, declcfg.WithConcurrency(1)) + for _, ch := range metaChans { + close(ch) + } + if err != nil { + return fmt.Errorf("error walking FBC root: %w", err) + } + if err := eg.Wait(); err != nil { return err } - fbcFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) - fbcIndexFile := filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)) + catalogDir := s.catalogDir(catalog) return errors.Join( - os.Rename(tmpCatalogFile.Name(), fbcFile), - os.Rename(tmpIndexFile.Name(), fbcIndexFile), + os.RemoveAll(catalogDir), + os.Rename(tmpCatalogDir, catalogDir), ) } @@ -143,13 +95,78 @@ func (s *LocalDirV1) Delete(catalog string) error { s.m.Lock() defer s.m.Unlock() - var errs []error - errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)))) + return os.RemoveAll(s.catalogDir(catalog)) +} + +func (s *LocalDirV1) ContentExists(catalog string) bool { + s.m.RLock() + defer s.m.RUnlock() + + catalogFileStat, err := os.Stat(catalogFilePath(s.catalogDir(catalog))) + if err != nil { + return false + } + if !catalogFileStat.Mode().IsRegular() { + // path is not valid content + return false + } if s.EnableQueryHandler { - errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)))) + indexFileStat, err := os.Stat(catalogIndexFilePath(s.catalogDir(catalog))) + if err != nil { + return false + } + if !indexFileStat.Mode().IsRegular() { + return false + } } - return errors.Join(errs...) + return true +} + +func (s *LocalDirV1) catalogDir(catalog string) string { + return filepath.Join(s.RootDir, catalog) +} + +func catalogFilePath(catalogDir string) string { + return filepath.Join(catalogDir, "catalog.jsonl") +} + +func catalogIndexFilePath(catalogDir string) string { + return filepath.Join(catalogDir, "index.json") +} + +type storeMetasFunc func(catalogDir string, metaChan <-chan *declcfg.Meta) error + +func storeCatalogData(catalogDir string, metas <-chan *declcfg.Meta) error { + f, err := os.Create(catalogFilePath(catalogDir)) + if err != nil { + return err + } + defer f.Close() + + for m := range metas { + if _, err := f.Write(m.Blob); err != nil { + return err + } + } + return nil +} + +func storeIndexData(catalogDir string, metas <-chan *declcfg.Meta) error { + idx, err := newIndex(metas) + if err != nil { + return err + } + + f, err := os.Create(catalogIndexFilePath(catalogDir)) + if err != nil { + return err + } + defer f.Close() + + enc := json.NewEncoder(f) + enc.SetEscapeHTML(false) + return enc.Encode(idx) } func (s *LocalDirV1) BaseURL(catalog string) string { @@ -159,121 +176,93 @@ func (s *LocalDirV1) BaseURL(catalog string) string { func (s *LocalDirV1) StorageServerHandler() http.Handler { mux := http.NewServeMux() - v1AllPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path - mux.Handle(v1AllPath, s.v1AllHandler()) - + mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path, s.handleV1All) if s.EnableQueryHandler { - v1QueryPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path - mux.Handle(v1QueryPath, s.v1QueryHandler()) + mux.HandleFunc(s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path, s.handleV1Query) } return mux } -func (s *LocalDirV1) v1AllHandler() http.Handler { - catalogHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - s.m.RLock() - defer s.m.RUnlock() +func (s *LocalDirV1) handleV1All(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() - catalog := r.PathValue("catalog") - w.Header().Add("Content-Type", "application/jsonl") - http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) - }) - gzHandler := gzhttp.GzipHandler(catalogHandler) - return newLoggingMiddleware(gzHandler) + catalog := r.PathValue("catalog") + catalogFile, catalogStat, err := s.catalogData(catalog) + if err != nil { + httpError(w, err) + return + } + serveJsonLines(w, r, catalogStat.ModTime(), catalogFile) } -func (s *LocalDirV1) v1QueryHandler() http.Handler { - catalogHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - s.m.RLock() - defer s.m.RUnlock() - - catalog := r.PathValue("catalog") - schema := r.URL.Query().Get("schema") - pkg := r.URL.Query().Get("package") - name := r.URL.Query().Get("name") - - // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) - if schema == "" && pkg == "" && name == "" { - w.Header().Add("Content-Type", "application/jsonl") - http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) - return - } +func (s *LocalDirV1) handleV1Query(w http.ResponseWriter, r *http.Request) { + s.m.RLock() + defer s.m.RUnlock() - catalogFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)) - catalogFileStat, err := os.Stat(catalogFilePath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - http.Error(w, "Catalog not found", http.StatusNotFound) - return - } - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - modTime := catalogFileStat.ModTime().Format(http.TimeFormat) - if r.Header.Get("If-Modified-Since") == modTime { - w.WriteHeader(http.StatusNotModified) - return - } + catalog := r.PathValue("catalog") + catalogFile, catalogStat, err := s.catalogData(catalog) + if err != nil { + httpError(w, err) + return + } + defer catalogFile.Close() - catalogFile, err := os.Open(catalogFilePath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - http.Error(w, "Catalog not found", http.StatusNotFound) - return - } - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer catalogFile.Close() + schema := r.URL.Query().Get("schema") + pkg := r.URL.Query().Get("package") + name := r.URL.Query().Get("name") - idx, err := s.getIndex(catalog) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - queryReader, ok := idx.Get(catalogFile, schema, pkg, name) - if !ok { - http.Error(w, fmt.Sprintf("No index found for schema=%q, package=%q, name=%q", schema, pkg, name), http.StatusInternalServerError) - return - } - w.Header().Add("Content-Type", "application/jsonl") - w.Header().Set("Last-Modified", modTime) - _, _ = io.Copy(w, queryReader) - }) - gzHandler := gzhttp.GzipHandler(catalogHandler) - return newLoggingMiddleware(gzHandler) + if schema == "" && pkg == "" && name == "" { + // If no parameters are provided, return the entire catalog (this is the same as /api/v1/all) + serveJsonLines(w, r, catalogStat.ModTime(), catalogFile) + return + } + idx, err := s.getIndex(catalog) + if err != nil { + httpError(w, err) + return + } + indexReader, ok := idx.Get(catalogFile, schema, pkg, name) + if !ok { + httpError(w, fs.ErrNotExist) + return + } + serveJsonLines(w, r, catalogStat.ModTime(), indexReader) } -func (s *LocalDirV1) ContentExists(catalog string) bool { - s.m.RLock() - defer s.m.RUnlock() - - catalogFileStat, err := os.Stat(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))) +func (s *LocalDirV1) catalogData(catalog string) (*os.File, os.FileInfo, error) { + catalogFile, err := os.Open(catalogFilePath(s.catalogDir(catalog))) if err != nil { - return false + return nil, nil, err } - if !catalogFileStat.Mode().IsRegular() { - // path is not valid content - return false + catalogFileStat, err := catalogFile.Stat() + if err != nil { + return nil, nil, err } + return catalogFile, catalogFileStat, nil +} - if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) { - indexFileStat, err := os.Stat(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))) - if err != nil { - return false - } - if !indexFileStat.Mode().IsRegular() { - return false - } +func httpError(w http.ResponseWriter, err error) { + var code int + switch { + case errors.Is(err, fs.ErrNotExist): + code = http.StatusNotFound + case errors.Is(err, fs.ErrPermission): + code = http.StatusForbidden + default: + code = http.StatusInternalServerError } - return true + http.Error(w, fmt.Sprintf("%d %s", code, http.StatusText(code)), code) +} + +func serveJsonLines(w http.ResponseWriter, r *http.Request, modTime time.Time, rs io.ReadSeeker) { + w.Header().Add("Content-Type", "application/jsonl") + http.ServeContent(w, r, "", modTime, rs) } func (s *LocalDirV1) getIndex(catalog string) (*index, error) { idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) { - indexFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)) - fmt.Printf("opening index file %s\n", indexFilePath) - indexFile, err := os.Open(indexFilePath) + indexFile, err := os.Open(catalogIndexFilePath(s.catalogDir(catalog))) if err != nil { return nil, err } @@ -284,33 +273,8 @@ func (s *LocalDirV1) getIndex(catalog string) (*index, error) { } return &idx, nil }) - return idx.(*index), err -} - -func newLoggingMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - logger := logr.FromContextOrDiscard(r.Context()) - - start := time.Now() - lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK} - next.ServeHTTP(lrw, r) - - logger.WithValues( - "method", r.Method, - "url", r.URL.String(), - "status", lrw.statusCode, - "duration", time.Since(start), - "remoteAddr", r.RemoteAddr, - ).Info("HTTP request processed") - }) -} - -type loggingResponseWriter struct { - http.ResponseWriter - statusCode int -} - -func (w *loggingResponseWriter) WriteHeader(code int) { - w.statusCode = code - w.ResponseWriter.WriteHeader(code) + if err != nil { + return nil, err + } + return idx.(*index), nil } diff --git a/catalogd/internal/storage/multireadseeker.go b/catalogd/internal/storage/multireadseeker.go new file mode 100644 index 000000000..f1e3e1cd3 --- /dev/null +++ b/catalogd/internal/storage/multireadseeker.go @@ -0,0 +1,118 @@ +package storage + +import ( + "io" + "os" +) + +type doubleReadSeeker struct { + rs1, rs2 io.ReadSeeker + rs1len, rs2len int64 + second bool + pos int64 +} + +func (r *doubleReadSeeker) Seek(offset int64, whence int) (int64, error) { + var err error + switch whence { + case os.SEEK_SET: + if offset < r.rs1len { + r.second = false + r.pos, err = r.rs1.Seek(offset, os.SEEK_SET) + return r.pos, err + } else { + r.second = true + r.pos, err = r.rs2.Seek(offset-r.rs1len, os.SEEK_SET) + r.pos += r.rs1len + return r.pos, err + } + case os.SEEK_END: // negative offset + return r.Seek(r.rs1len+r.rs2len+offset-1, os.SEEK_SET) + default: // os.SEEK_CUR + return r.Seek(r.pos+offset, os.SEEK_SET) + } +} + +func (r *doubleReadSeeker) Read(p []byte) (n int, err error) { + switch { + case r.pos >= r.rs1len: // read only from the second reader + n, err := r.rs2.Read(p) + r.pos += int64(n) + return n, err + case r.pos+int64(len(p)) <= r.rs1len: // read only from the first reader + n, err := r.rs1.Read(p) + r.pos += int64(n) + return n, err + default: // read on the border - end of first reader and start of second reader + n1, err := r.rs1.Read(p) + r.pos += int64(n1) + if r.pos != r.rs1len || (err != nil && err != io.EOF) { + // Read() might not read all, return + // If error (but not EOF), return + return n1, err + } + _, err = r.rs2.Seek(0, os.SEEK_SET) + if err != nil { + return n1, err + } + r.second = true + n2, err := r.rs2.Read(p[n1:]) + r.pos += int64(n2) + return n1 + n2, err + } +} + +func multiReadSeeker(rs []io.ReadSeeker, leftmost bool) (io.ReadSeeker, int64, error) { + if len(rs) == 1 { + r := rs[0] + l, err := r.Seek(0, io.SeekEnd) + if err != nil { + return nil, 0, err + } + if leftmost { + _, err = r.Seek(0, io.SeekStart) + } + return r, l, err + } else { + rs1, l1, err := multiReadSeeker(rs[:len(rs)/2], leftmost) + if err != nil { + return nil, 0, err + } + rs2, l2, err := multiReadSeeker(rs[len(rs)/2:], false) + if err != nil { + return nil, 0, err + } + return &doubleReadSeeker{rs1, rs2, l1, l2, false, 0}, l1 + l2, nil + } +} + +type emptyReadSeeker struct{} + +func (r *emptyReadSeeker) Read(p []byte) (n int, err error) { + return 0, io.EOF +} + +func (r *emptyReadSeeker) Seek(offset int64, whence int) (int64, error) { + return 0, io.EOF +} + +// newMultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided +// input readseekers. After calling this method the initial position is set to the +// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances +// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker. +// Seek can be used over the sum of lengths of all readseekers. +// +// When a newMultiReadSeeker is used, no Read and Seek operations should be made on +// its ReadSeeker components and the length of the readseekers should not change. +// Also, users should make no assumption on the state of individual readseekers +// while the newMultiReadSeeker is used. +func newMultiReadSeeker(rs ...io.ReadSeeker) io.ReadSeeker { + if len(rs) == 0 { + return &emptyReadSeeker{} + } + r, _, err := multiReadSeeker(rs, true) + if err != nil { + return &emptyReadSeeker{} + } + return r +}