From 374e69eb9c9ac972af1a92b841199759c1d39320 Mon Sep 17 00:00:00 2001 From: grokspawn Date: Fri, 29 May 2026 14:56:23 -0500 Subject: [PATCH 1/2] shift to file-based cache, leveraging existing indexer --- hack/demo/graphql-demo-server/main.go | 84 ++--- internal/catalogd/graphql/graphql.go | 357 ++++++++++++++---- internal/catalogd/graphql/validation.go | 73 ++++ internal/catalogd/server/handlers.go | 10 +- internal/catalogd/server/handlers_test.go | 14 +- internal/catalogd/service/graphql_service.go | 112 ++---- .../catalogd/service/graphql_service_test.go | 216 +++++------ internal/catalogd/storage/index.go | 19 + internal/catalogd/storage/localdir.go | 198 +++++++--- 9 files changed, 720 insertions(+), 363 deletions(-) create mode 100644 internal/catalogd/graphql/validation.go diff --git a/hack/demo/graphql-demo-server/main.go b/hack/demo/graphql-demo-server/main.go index 7356ce64dd..517feaa3eb 100644 --- a/hack/demo/graphql-demo-server/main.go +++ b/hack/demo/graphql-demo-server/main.go @@ -1,39 +1,20 @@ package main import ( + "context" + "encoding/json" "fmt" - "io/fs" "net/http" - "net/url" "os" "os/signal" "syscall" "testing/fstest" + "time" - "github.com/operator-framework/operator-controller/internal/catalogd/server" - "github.com/operator-framework/operator-controller/internal/catalogd/service" -) - -// demoCatalogStore implements server.CatalogStore backed by an in-memory FS -type demoCatalogStore struct { - catalogs map[string]fs.FS -} - -func (s *demoCatalogStore) GetCatalogData(catalog string) (*os.File, os.FileInfo, error) { - return nil, nil, fmt.Errorf("not implemented for demo") -} + "github.com/graphql-go/graphql" -func (s *demoCatalogStore) GetCatalogFS(catalog string) (fs.FS, error) { - catFS, ok := s.catalogs[catalog] - if !ok { - return nil, fs.ErrNotExist - } - return catFS, nil -} - -func (s *demoCatalogStore) GetIndex(catalog string) (server.Index, error) { - return nil, fmt.Errorf("not implemented for demo") -} + gql 
"github.com/operator-framework/operator-controller/internal/catalogd/graphql" +) func main() { addr := ":9376" @@ -41,24 +22,42 @@ func main() { addr = ":" + v } - store := &demoCatalogStore{ - catalogs: map[string]fs.FS{ - "example-catalog": buildCatalog(), - }, + catalogFS := buildCatalog() + dynamicSchema, err := gql.LoadAndSummarizeCatalogDynamic(catalogFS) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to build schema: %v\n", err) + os.Exit(1) } - - graphqlSvc := service.NewCachedGraphQLService() - rootURL, _ := url.Parse("/catalogs/") - handlers := server.NewCatalogHandlers( - store, - graphqlSvc, - rootURL, - server.MetasHandlerDisabled, - server.GraphQLQueriesEnabled, - ) + gql.PrintCatalogSummary(dynamicSchema) mux := http.NewServeMux() - mux.Handle("/catalogs/", handlers.Handler()) + mux.HandleFunc("POST /catalogs/{catalog}/api/v1/graphql", func(w http.ResponseWriter, r *http.Request) { + r.Body = http.MaxBytesReader(w, r.Body, 1<<20) + + var params struct { + Query string `json:"query"` + } + if err := json.NewDecoder(r.Body).Decode(&params); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + if params.Query == "" { + http.Error(w, "Query cannot be empty", http.StatusBadRequest) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + result := graphql.Do(graphql.Params{ + Schema: dynamicSchema.Schema, + RequestString: params.Query, + Context: ctx, + }) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(result) //nolint:errcheck + }) fmt.Fprintf(os.Stderr, "GraphQL demo server listening on http://localhost%s\n", addr) fmt.Fprintf(os.Stderr, "Endpoint: POST http://localhost%s/catalogs/example-catalog/api/v1/graphql\n", addr) @@ -78,8 +77,8 @@ func main() { fmt.Fprintln(os.Stderr, "\nShutting down.") } -func buildCatalog() fs.FS { - return fstest.MapFS{ +func buildCatalog() *fstest.MapFS { + return &fstest.MapFS{ "catalog.json":
&fstest.MapFile{ Data: []byte(catalogJSON), }, @@ -87,7 +86,6 @@ func buildCatalog() fs.FS { } // catalogJSON contains sample FBC data for demonstration purposes. -// Each line is a separate JSON object (JSONL format parsed by WalkMetasFS). const catalogJSON = `{"schema":"olm.package","name":"database-operator","defaultChannel":"stable","description":"An operator for managing database instances."} {"schema":"olm.package","name":"logging-operator","defaultChannel":"stable","description":"Logging operator for collecting and forwarding application logs."} {"schema":"olm.package","name":"messaging-operator","defaultChannel":"stable","description":"Messaging broker operator based on Apache Kafka."} diff --git a/internal/catalogd/graphql/graphql.go b/internal/catalogd/graphql/graphql.go index 3e9e436766..6e01eeb009 100644 --- a/internal/catalogd/graphql/graphql.go +++ b/internal/catalogd/graphql/graphql.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/graphql-go/graphql" + "k8s.io/klog/v2" "github.com/operator-framework/operator-registry/alpha/declcfg" ) @@ -26,6 +27,7 @@ var ( // FieldInfo represents discovered field information type FieldInfo struct { Name string + OriginalName string // The original JSON key before remapping GraphQLType graphql.Type JSONType reflect.Kind IsArray bool @@ -45,16 +47,143 @@ type CatalogSchema struct { Schemas map[string]*SchemaInfo // schema name -> info } +// serializableFieldInfo is a JSON-friendly representation of FieldInfo +type serializableFieldInfo struct { + Name string `json:"name"` + OriginalName string `json:"originalName"` + JSONType string `json:"jsonType"` + IsArray bool `json:"isArray"` + NestedFields map[string]*serializableFieldInfo `json:"nestedFields,omitempty"` +} + +// serializableSchemaInfo is a JSON-friendly representation of SchemaInfo +type serializableSchemaInfo struct { + Fields map[string]*serializableFieldInfo `json:"fields"` + TotalObjects int `json:"totalObjects"` +} + +// serializableCatalogSchema is a 
JSON-friendly representation of CatalogSchema +type serializableCatalogSchema struct { + Schemas map[string]*serializableSchemaInfo `json:"schemas"` +} + +func kindToString(k reflect.Kind) string { + switch k { + case reflect.String: + return "string" + case reflect.Bool: + return "bool" + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return "int" + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return "uint" + case reflect.Float32, reflect.Float64: + return "float64" + default: + return "string" + } +} + +func stringToKind(s string) reflect.Kind { + switch s { + case "string": + return reflect.String + case "bool": + return reflect.Bool + case "int": + return reflect.Int + case "uint": + return reflect.Uint + case "float64": + return reflect.Float64 + default: + return reflect.String + } +} + +func fieldInfoToSerializable(fi *FieldInfo) *serializableFieldInfo { + sfi := &serializableFieldInfo{ + Name: fi.Name, + OriginalName: fi.OriginalName, + JSONType: kindToString(fi.JSONType), + IsArray: fi.IsArray, + } + if len(fi.NestedFields) > 0 { + sfi.NestedFields = make(map[string]*serializableFieldInfo) + for k, v := range fi.NestedFields { + sfi.NestedFields[k] = fieldInfoToSerializable(v) + } + } + return sfi +} + +func serializableToFieldInfo(sfi *serializableFieldInfo) *FieldInfo { + k := stringToKind(sfi.JSONType) + fi := &FieldInfo{ + Name: sfi.Name, + OriginalName: sfi.OriginalName, + JSONType: k, + IsArray: sfi.IsArray, + GraphQLType: jsonTypeToGraphQL(k, sfi.IsArray, nil), + } + if len(sfi.NestedFields) > 0 { + fi.NestedFields = make(map[string]*FieldInfo) + for kk, v := range sfi.NestedFields { + fi.NestedFields[kk] = serializableToFieldInfo(v) + } + } + return fi +} + +// MarshalCatalogSchema serializes a CatalogSchema to JSON bytes +func MarshalCatalogSchema(cs *CatalogSchema) ([]byte, error) { + scs := &serializableCatalogSchema{ + Schemas: make(map[string]*serializableSchemaInfo), + } + 
for name, info := range cs.Schemas { + si := &serializableSchemaInfo{ + Fields: make(map[string]*serializableFieldInfo), + TotalObjects: info.TotalObjects, + } + for fname, finfo := range info.Fields { + si.Fields[fname] = fieldInfoToSerializable(finfo) + } + scs.Schemas[name] = si + } + return json.Marshal(scs) +} + +// UnmarshalCatalogSchema deserializes a CatalogSchema from JSON bytes +func UnmarshalCatalogSchema(data []byte) (*CatalogSchema, error) { + var scs serializableCatalogSchema + if err := json.Unmarshal(data, &scs); err != nil { + return nil, err + } + cs := &CatalogSchema{ + Schemas: make(map[string]*SchemaInfo), + } + for name, si := range scs.Schemas { + info := &SchemaInfo{ + Fields: make(map[string]*FieldInfo), + TotalObjects: si.TotalObjects, + } + for fname, sfi := range si.Fields { + info.Fields[fname] = serializableToFieldInfo(sfi) + } + cs.Schemas[name] = info + } + return cs, nil +} + +// ObjectLoader reads FBC objects for a given schema from disk with pagination. +// It is called by the root resolver at query time instead of holding all +// parsed objects in memory. +type ObjectLoader func(schemaName string, offset, limit int) ([]map[string]interface{}, error) + // DynamicSchema holds the generated GraphQL schema and metadata type DynamicSchema struct { Schema graphql.Schema CatalogSchema *CatalogSchema - ParsedObjects map[string][]map[string]interface{} // Pre-parsed JSON objects cached during schema build - // Performance optimization: ParsedObjects avoids json.Unmarshal on every GraphQL query. - // Objects are parsed once during schema build and cached for all subsequent queries. - // Memory cost: ~same as storing raw blobs (parsed maps ≈ JSON size in memory). - // For 1000 bundles @ 5KB each: ~5MB, same as raw metadata storage. - // Performance gain: Eliminates N × json.Unmarshal operations per query (where N = returned objects). 
} // remapFieldName converts field names to valid GraphQL camelCase identifiers @@ -172,7 +301,11 @@ func determineFieldType(value interface{}) (reflect.Kind, bool) { return reflect.TypeOf(firstElem).Kind(), true } -// analyzeFieldValue analyzes a field value and returns type info, sample value, and nested fields +const maxSampleElements = 10 + +// analyzeFieldValue analyzes a field value and returns type info, sample value, and nested fields. +// For slices, it examines up to maxSampleElements to detect heterogeneous element types +// and build a union of nested field structures across multiple elements. func analyzeFieldValue(value interface{}) (reflect.Kind, bool, interface{}, map[string]*FieldInfo) { if value == nil { return reflect.String, false, value, nil @@ -183,28 +316,59 @@ func analyzeFieldValue(value interface{}) (reflect.Kind, bool, interface{}, map[ return valueType.Kind(), false, value, nil } - // Handle slice types slice := reflect.ValueOf(value) if slice.Len() == 0 { return reflect.String, true, value, nil } - firstElem := slice.Index(0).Interface() - if firstElem == nil { - return reflect.String, true, value, nil + // Scan up to maxSampleElements to determine element type and nested structure + var dominantType reflect.Kind + var sampleValue interface{} + var nestedFields map[string]*FieldInfo + heterogeneous := false + limit := slice.Len() + if limit > maxSampleElements { + limit = maxSampleElements } - jsonType := reflect.TypeOf(firstElem).Kind() + for i := 0; i < limit; i++ { + elem := slice.Index(i).Interface() + if elem == nil { + continue + } + + elemKind := reflect.TypeOf(elem).Kind() - // If array element is an object, analyze its structure - var nestedFields map[string]*FieldInfo - if jsonType == reflect.Map { - if elemObj, ok := firstElem.(map[string]interface{}); ok { - nestedFields = analyzeNestedObject(elemObj) + if sampleValue == nil { + dominantType = elemKind + sampleValue = elem + } else if elemKind != dominantType { + 
heterogeneous = true + } + + // For map elements, merge nested structure from each sampled element + if elemKind == reflect.Map { + if elemObj, ok := elem.(map[string]interface{}); ok { + elemFields := analyzeNestedObject(elemObj) + if nestedFields == nil { + nestedFields = elemFields + } else { + mergeNestedFields(nestedFields, elemFields) + } + } } } - return jsonType, true, firstElem, nestedFields + if sampleValue == nil { + return reflect.String, true, value, nil + } + + // Heterogeneous primitive arrays (string mixed with int, etc.) fall back to String + if heterogeneous && nestedFields == nil { + return reflect.String, true, sampleValue, nil + } + + return dominantType, true, sampleValue, nestedFields } // analyzeNestedObject analyzes a nested object and returns its field structure @@ -225,6 +389,7 @@ func analyzeNestedObject(obj map[string]interface{}) map[string]*FieldInfo { fields[fieldName] = &FieldInfo{ Name: fieldName, + OriginalName: key, GraphQLType: jsonTypeToGraphQL(jsonType, isArray, sampleValue), JSONType: jsonType, IsArray: isArray, @@ -266,6 +431,7 @@ func analyzeJSONObject(obj map[string]interface{}, info *SchemaInfo) { if !ok { info.Fields[fieldName] = &FieldInfo{ Name: fieldName, + OriginalName: key, GraphQLType: jsonTypeToGraphQL(jsonType, isArray, sampleValue), JSONType: jsonType, IsArray: isArray, @@ -275,6 +441,12 @@ func analyzeJSONObject(obj map[string]interface{}, info *SchemaInfo) { continue } + // Different original keys mapping to the same GraphQL field name is a collision + if existing.OriginalName != key { + klog.V(2).InfoS("field name collision: different JSON keys map to same GraphQL field", + "graphqlField", fieldName, "existingKey", existing.OriginalName, "newKey", key) + } + // Update existing field existing.SampleValues = appendUnique(existing.SampleValues, sampleValue) @@ -340,7 +512,9 @@ func DiscoverSchemaFromMetas(metas []*declcfg.Meta) (*CatalogSchema, error) { // Parse the JSON blob var obj map[string]interface{} if 
err := json.Unmarshal(meta.Blob, &obj); err != nil { - continue // Skip malformed objects + klog.V(4).InfoS("skipping malformed meta blob during schema discovery", + "schema", meta.Schema, "name", meta.Name, "error", err) + continue } // Store a sample object for reference @@ -355,6 +529,46 @@ func DiscoverSchemaFromMetas(metas []*declcfg.Meta) (*CatalogSchema, error) { return catalogSchema, nil } +// DiscoverSchemaFromChannel performs streaming schema discovery, processing +// one meta at a time through a channel. Each meta's blob is parsed, analyzed, +// and then goes out of scope — avoiding accumulation of all blobs in memory. +func DiscoverSchemaFromChannel(metasChan <-chan *declcfg.Meta) (*CatalogSchema, error) { + catalogSchema := &CatalogSchema{ + Schemas: make(map[string]*SchemaInfo), + } + + for meta := range metasChan { + if meta.Schema == "" { + continue + } + + if catalogSchema.Schemas[meta.Schema] == nil { + catalogSchema.Schemas[meta.Schema] = &SchemaInfo{ + Fields: make(map[string]*FieldInfo), + TotalObjects: 0, + } + } + + info := catalogSchema.Schemas[meta.Schema] + info.TotalObjects++ + + var obj map[string]interface{} + if err := json.Unmarshal(meta.Blob, &obj); err != nil { + klog.V(4).InfoS("skipping malformed meta blob during schema discovery", + "schema", meta.Schema, "name", meta.Name, "error", err) + continue + } + + if info.SampleObject == nil { + info.SampleObject = obj + } + + analyzeJSONObject(obj, info) + } + + return catalogSchema, nil +} + // marshalComplexValue marshals maps and slices as JSON strings func marshalComplexValue(value interface{}) interface{} { if value == nil { @@ -509,21 +723,28 @@ func sanitizeTypeName(propType string) string { return result } -// BuildDynamicGraphQLSchema creates a complete GraphQL schema from discovered structure -func BuildDynamicGraphQLSchema(catalogSchema *CatalogSchema, metasBySchema map[string][]*declcfg.Meta) (*DynamicSchema, error) { - // Pre-parse all meta blobs to avoid unmarshaling on 
every query - // This has minimal memory overhead (parsed objects ≈ raw blob size) - // but eliminates expensive json.Unmarshal operations from the query path - parsedObjects := make(map[string][]map[string]interface{}) - for schemaName, metas := range metasBySchema { - parsedObjects[schemaName] = make([]map[string]interface{}, 0, len(metas)) - for _, meta := range metas { - var obj map[string]interface{} - if err := json.Unmarshal(meta.Blob, &obj); err != nil { - continue // Skip malformed objects (same as runtime behavior) - } - parsedObjects[schemaName] = append(parsedObjects[schemaName], obj) +// BuildDynamicGraphQLSchema creates a complete GraphQL schema from discovered structure. +// The loader is called at query time to read objects from disk with pagination. +func BuildDynamicGraphQLSchema(catalogSchema *CatalogSchema, loader ObjectLoader) (*DynamicSchema, error) { + // Detect type name collisions (distinct schemas that sanitize to the same GraphQL type name) + typeNameToOriginal := make(map[string]string) + for schemaName := range catalogSchema.Schemas { + sanitized := sanitizeTypeName(schemaName) + if existing, ok := typeNameToOriginal[sanitized]; ok && existing != schemaName { + return nil, fmt.Errorf("type name collision: schemas %q and %q both sanitize to GraphQL type %q", existing, schemaName, sanitized) + } + typeNameToOriginal[sanitized] = schemaName + } + + // Detect root query field name collisions + fieldNameToOriginal := make(map[string]string) + for schemaName := range catalogSchema.Schemas { + sanitized := alphanumericOnlyRE.ReplaceAllString(schemaName, "") + fieldName := strings.ToLower(sanitized) + "s" + if existing, ok := fieldNameToOriginal[fieldName]; ok && existing != schemaName { + return nil, fmt.Errorf("query field collision: schemas %q and %q both map to field %q", existing, schemaName, fieldName) } + fieldNameToOriginal[fieldName] = schemaName } // Build GraphQL object types for each discovered schema @@ -570,40 +791,28 @@ func 
BuildDynamicGraphQLSchema(catalogSchema *CatalogSchema, metasBySchema map[s }, }, Resolve: func(p graphql.ResolveParams) (interface{}, error) { - // O(1) lookup of schema name from pre-built map currentSchemaName, ok := fieldNameToSchema[p.Info.FieldName] if !ok { return nil, fmt.Errorf("unknown schema for field %s", p.Info.FieldName) } - // Get pre-parsed objects for this schema (no unmarshaling needed!) - objects, ok := parsedObjects[currentSchemaName] - if !ok { - return []interface{}{}, nil - } - - // Parse arguments limit, _ := p.Args["limit"].(int) if limit <= 0 || limit > 100 { - limit = 100 // Clamp to default/max to prevent DoS + limit = 100 } offset, _ := p.Args["offset"].(int) if offset < 0 { - offset = 0 // Negative offsets make no sense + offset = 0 } - // Apply pagination to pre-parsed objects - var results []interface{} + objects, err := loader(currentSchemaName, offset, limit) + if err != nil { + return nil, fmt.Errorf("error loading objects for schema %s: %w", currentSchemaName, err) + } + results := make([]interface{}, len(objects)) for i, obj := range objects { - if i < offset { - continue - } - if len(results) >= limit { - break - } - results = append(results, obj) + results[i] = obj } - return results, nil }, } @@ -661,15 +870,40 @@ func BuildDynamicGraphQLSchema(catalogSchema *CatalogSchema, metasBySchema map[s return &DynamicSchema{ Schema: schema, CatalogSchema: catalogSchema, - ParsedObjects: parsedObjects, }, nil } -// LoadAndSummarizeCatalogDynamic loads FBC using WalkMetasReader and builds dynamic GraphQL schema +// NewInMemoryObjectLoader creates an ObjectLoader from pre-parsed metas. +// Used by the demo server and tests where disk-backed loading is not needed. 
+func NewInMemoryObjectLoader(metasBySchema map[string][]*declcfg.Meta) ObjectLoader { + parsed := make(map[string][]map[string]interface{}) + for schema, metas := range metasBySchema { + for _, meta := range metas { + var obj map[string]interface{} + if err := json.Unmarshal(meta.Blob, &obj); err != nil { + continue + } + parsed[schema] = append(parsed[schema], obj) + } + } + return func(schemaName string, offset, limit int) ([]map[string]interface{}, error) { + objects := parsed[schemaName] + if offset >= len(objects) { + return nil, nil + } + end := offset + limit + if end > len(objects) { + end = len(objects) + } + return objects[offset:end], nil + } +} + +// LoadAndSummarizeCatalogDynamic loads FBC from a filesystem and builds a dynamic GraphQL schema. +// Uses in-memory object loading — suitable for demos and CLI tools, not production serving. func LoadAndSummarizeCatalogDynamic(catalogFS fs.FS) (*DynamicSchema, error) { var metas []*declcfg.Meta - // Collect all metas from the filesystem err := declcfg.WalkMetasFS(context.Background(), catalogFS, func(path string, meta *declcfg.Meta, err error) error { if err != nil { return err @@ -683,13 +917,11 @@ func LoadAndSummarizeCatalogDynamic(catalogFS fs.FS) (*DynamicSchema, error) { return nil, fmt.Errorf("error walking catalog metas: %w", err) } - // Discover schema from collected metas catalogSchema, err := DiscoverSchemaFromMetas(metas) if err != nil { return nil, fmt.Errorf("error discovering schema: %w", err) } - // Organize metas by schema for resolvers metasBySchema := make(map[string][]*declcfg.Meta) for _, meta := range metas { if meta.Schema != "" { @@ -697,13 +929,8 @@ func LoadAndSummarizeCatalogDynamic(catalogFS fs.FS) (*DynamicSchema, error) { } } - // Build dynamic GraphQL schema - dynamicSchema, err := BuildDynamicGraphQLSchema(catalogSchema, metasBySchema) - if err != nil { - return nil, fmt.Errorf("error building GraphQL schema: %w", err) - } - - return dynamicSchema, nil + loader := 
NewInMemoryObjectLoader(metasBySchema) + return BuildDynamicGraphQLSchema(catalogSchema, loader) } // PrintCatalogSummary prints a comprehensive summary of the discovered schema diff --git a/internal/catalogd/graphql/validation.go b/internal/catalogd/graphql/validation.go new file mode 100644 index 0000000000..4c028e194c --- /dev/null +++ b/internal/catalogd/graphql/validation.go @@ -0,0 +1,73 @@ +package graphql + +import ( + "fmt" + + "github.com/graphql-go/graphql/language/ast" + "github.com/graphql-go/graphql/language/parser" +) + +const ( + MaxQueryDepth = 10 + MaxQueryAliases = 50 + MaxQueryFields = 500 +) + +type queryComplexity struct { + aliases int + fields int +} + +// ValidateQueryComplexity parses the query AST and rejects it if it exceeds +// depth, alias, or total field count thresholds. +func ValidateQueryComplexity(query string) error { + doc, err := parser.Parse(parser.ParseParams{Source: query}) + if err != nil { + return fmt.Errorf("query parse error: %w", err) + } + + c := &queryComplexity{} + for _, def := range doc.Definitions { + if op, ok := def.(*ast.OperationDefinition); ok { + if err := c.walkSelectionSet(op.SelectionSet, 1); err != nil { + return err + } + } + } + return nil +} + +func (c *queryComplexity) walkSelectionSet(ss *ast.SelectionSet, depth int) error { + if ss == nil { + return nil + } + if depth > MaxQueryDepth { + return fmt.Errorf("query exceeds maximum depth of %d", MaxQueryDepth) + } + + for _, sel := range ss.Selections { + switch s := sel.(type) { + case *ast.Field: + c.fields++ + if c.fields > MaxQueryFields { + return fmt.Errorf("query exceeds maximum field count of %d", MaxQueryFields) + } + if s.Alias != nil && s.Alias.Value != "" { + c.aliases++ + if c.aliases > MaxQueryAliases { + return fmt.Errorf("query exceeds maximum alias count of %d", MaxQueryAliases) + } + } + if err := c.walkSelectionSet(s.SelectionSet, depth+1); err != nil { + return err + } + case *ast.InlineFragment: + if err := 
c.walkSelectionSet(s.SelectionSet, depth+1); err != nil { + return err + } + case *ast.FragmentSpread: + c.fields++ + } + } + return nil +} diff --git a/internal/catalogd/server/handlers.go b/internal/catalogd/server/handlers.go index 2650d20505..7c90dc1147 100644 --- a/internal/catalogd/server/handlers.go +++ b/internal/catalogd/server/handlers.go @@ -223,15 +223,7 @@ func (h *CatalogHandlers) handleV1GraphQL(w http.ResponseWriter, r *http.Request return } - // Get catalog filesystem - catalogFS, err := h.store.GetCatalogFS(catalog) - if err != nil { - httpError(w, err) - return - } - - // Execute GraphQL query through the service - result, err := h.graphqlSvc.ExecuteQuery(catalog, catalogFS, params.Query) + result, err := h.graphqlSvc.ExecuteQuery(r.Context(), catalog, params.Query) if err != nil { httpError(w, err) return diff --git a/internal/catalogd/server/handlers_test.go b/internal/catalogd/server/handlers_test.go index d565de277c..1635133d2c 100644 --- a/internal/catalogd/server/handlers_test.go +++ b/internal/catalogd/server/handlers_test.go @@ -44,11 +44,11 @@ type mockGraphQLService struct { executeErr error } -func (m *mockGraphQLService) GetSchema(catalog string, catalogFS fs.FS) (*gql.DynamicSchema, error) { +func (m *mockGraphQLService) GetSchema(_ context.Context, _ string) (*gql.DynamicSchema, error) { return nil, nil } -func (m *mockGraphQLService) ExecuteQuery(catalog string, catalogFS fs.FS, query string) (*graphql.Result, error) { +func (m *mockGraphQLService) ExecuteQuery(_ context.Context, _ string, _ string) (*graphql.Result, error) { return m.executeResult, m.executeErr } @@ -230,14 +230,14 @@ func TestHandleV1GraphQL_Success(t *testing.T) { } } -func TestHandleV1GraphQL_GetCatalogFSError(t *testing.T) { +func TestHandleV1GraphQL_CatalogNotFoundError(t *testing.T) { rootURL, _ := url.Parse("http://localhost/") - store := &mockCatalogStore{ - getFSErr: fs.ErrNotExist, - } + store := &mockCatalogStore{} - graphqlSvc := &mockGraphQLService{} 
+ graphqlSvc := &mockGraphQLService{ + executeErr: fs.ErrNotExist, + } handlers := NewCatalogHandlers(store, graphqlSvc, rootURL, MetasHandlerDisabled, GraphQLQueriesEnabled) diff --git a/internal/catalogd/service/graphql_service.go b/internal/catalogd/service/graphql_service.go index e142d4447b..09033953c8 100644 --- a/internal/catalogd/service/graphql_service.go +++ b/internal/catalogd/service/graphql_service.go @@ -3,46 +3,48 @@ package service import ( "context" "fmt" - "io/fs" "sync" + "time" "github.com/graphql-go/graphql" + "github.com/graphql-go/graphql/gqlerrors" "golang.org/x/sync/singleflight" - "github.com/operator-framework/operator-registry/alpha/declcfg" - gql "github.com/operator-framework/operator-controller/internal/catalogd/graphql" ) +// CatalogDataProvider provides access to catalog data for GraphQL schema building. +// Implemented by the storage layer. +type CatalogDataProvider interface { + LoadCatalogSchema(catalog string) (*gql.CatalogSchema, error) + NewObjectLoader(catalog string) (gql.ObjectLoader, error) +} + // GraphQLService handles GraphQL schema generation and query execution for catalogs type GraphQLService interface { - // GetSchema returns the GraphQL schema for a catalog, using cache if available - GetSchema(catalog string, catalogFS fs.FS) (*gql.DynamicSchema, error) - - // ExecuteQuery executes a GraphQL query against a catalog - ExecuteQuery(catalog string, catalogFS fs.FS, query string) (*graphql.Result, error) - - // InvalidateCache removes the cached schema for a catalog + GetSchema(ctx context.Context, catalog string) (*gql.DynamicSchema, error) + ExecuteQuery(ctx context.Context, catalog string, query string) (*graphql.Result, error) InvalidateCache(catalog string) } -// CachedGraphQLService implements GraphQLService with an in-memory schema cache +// CachedGraphQLService implements GraphQLService with an in-memory schema cache. 
+// The cached DynamicSchema contains only the GraphQL type system and schema metadata +// (a few KB). Object data is loaded from disk at query time via the ObjectLoader. type CachedGraphQLService struct { + provider CatalogDataProvider schemaMux sync.RWMutex schemaCache map[string]*gql.DynamicSchema - buildGroup singleflight.Group // Prevents duplicate concurrent schema builds + buildGroup singleflight.Group } -// NewCachedGraphQLService creates a new GraphQL service with caching -func NewCachedGraphQLService() *CachedGraphQLService { +func NewCachedGraphQLService(provider CatalogDataProvider) *CachedGraphQLService { return &CachedGraphQLService{ + provider: provider, schemaCache: make(map[string]*gql.DynamicSchema), } } -// GetSchema returns the GraphQL schema for a catalog, using cache if available -func (s *CachedGraphQLService) GetSchema(catalog string, catalogFS fs.FS) (*gql.DynamicSchema, error) { - // Check cache first (read lock) +func (s *CachedGraphQLService) GetSchema(ctx context.Context, catalog string) (*gql.DynamicSchema, error) { s.schemaMux.RLock() if cachedSchema, ok := s.schemaCache[catalog]; ok { s.schemaMux.RUnlock() @@ -50,9 +52,7 @@ func (s *CachedGraphQLService) GetSchema(catalog string, catalogFS fs.FS) (*gql. } s.schemaMux.RUnlock() - // Use singleflight to prevent duplicate concurrent builds for the same catalog result, err, _ := s.buildGroup.Do(catalog, func() (interface{}, error) { - // Double-check cache after acquiring singleflight lock s.schemaMux.RLock() if cachedSchema, ok := s.schemaCache[catalog]; ok { s.schemaMux.RUnlock() @@ -60,13 +60,11 @@ func (s *CachedGraphQLService) GetSchema(catalog string, catalogFS fs.FS) (*gql. 
} s.schemaMux.RUnlock() - // Schema not in cache, build it - dynamicSchema, err := buildSchemaFromFS(catalogFS) + dynamicSchema, err := s.buildSchema(catalog) if err != nil { return nil, err } - // Cache the result (write lock) s.schemaMux.Lock() s.schemaCache[catalog] = dynamicSchema s.schemaMux.Unlock() @@ -81,80 +79,50 @@ func (s *CachedGraphQLService) GetSchema(catalog string, catalogFS fs.FS) (*gql. return result.(*gql.DynamicSchema), nil } -// ExecuteQuery executes a GraphQL query against a catalog -func (s *CachedGraphQLService) ExecuteQuery(catalog string, catalogFS fs.FS, query string) (*graphql.Result, error) { - // Get or build the schema (uses cache and singleflight) - dynamicSchema, err := s.GetSchema(catalog, catalogFS) +func (s *CachedGraphQLService) ExecuteQuery(ctx context.Context, catalog string, query string) (*graphql.Result, error) { + if err := gql.ValidateQueryComplexity(query); err != nil { + return &graphql.Result{ + Errors: []gqlerrors.FormattedError{ + gqlerrors.FormatError(err), + }, + }, nil + } + + dynamicSchema, err := s.GetSchema(ctx, catalog) if err != nil { return nil, fmt.Errorf("failed to get GraphQL schema: %w", err) } - // Execute the query + queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + result := graphql.Do(graphql.Params{ Schema: dynamicSchema.Schema, RequestString: query, + Context: queryCtx, }) return result, nil } -// InvalidateCache removes the cached schema for a catalog func (s *CachedGraphQLService) InvalidateCache(catalog string) { s.schemaMux.Lock() delete(s.schemaCache, catalog) s.schemaMux.Unlock() } -// buildSchemaFromFS builds a GraphQL schema from a catalog filesystem -func buildSchemaFromFS(catalogFS fs.FS) (*gql.DynamicSchema, error) { - var metas []*declcfg.Meta - var metasMux sync.Mutex - var walkErr error - - // Collect all metas from the catalog filesystem - // WalkMetasFS walks the filesystem concurrently, so we need to protect the metas slice and error - err := 
declcfg.WalkMetasFS(context.Background(), catalogFS, func(path string, meta *declcfg.Meta, err error) error { - metasMux.Lock() - defer metasMux.Unlock() - - if err != nil { - // Set shared error so other goroutines can check - if walkErr == nil { - walkErr = err - } - return err - } - - // If an error has already occurred, skip further mutation - if walkErr != nil { - return walkErr - } - - if meta != nil { - metas = append(metas, meta) - } - return nil - }) +func (s *CachedGraphQLService) buildSchema(catalog string) (*gql.DynamicSchema, error) { + catalogSchema, err := s.provider.LoadCatalogSchema(catalog) if err != nil { - return nil, fmt.Errorf("error walking catalog metas: %w", err) + return nil, fmt.Errorf("error loading catalog schema: %w", err) } - // Discover schema from collected metas - catalogSchema, err := gql.DiscoverSchemaFromMetas(metas) + loader, err := s.provider.NewObjectLoader(catalog) if err != nil { - return nil, fmt.Errorf("error discovering schema: %w", err) - } - - // Organize metas by schema for resolvers - metasBySchema := make(map[string][]*declcfg.Meta) - for _, meta := range metas { - if meta.Schema != "" { - metasBySchema[meta.Schema] = append(metasBySchema[meta.Schema], meta) - } + return nil, fmt.Errorf("error creating object loader: %w", err) } - // Build dynamic GraphQL schema - dynamicSchema, err := gql.BuildDynamicGraphQLSchema(catalogSchema, metasBySchema) + dynamicSchema, err := gql.BuildDynamicGraphQLSchema(catalogSchema, loader) if err != nil { return nil, fmt.Errorf("error building GraphQL schema: %w", err) } diff --git a/internal/catalogd/service/graphql_service_test.go b/internal/catalogd/service/graphql_service_test.go index 81376e92fe..df9c7418b8 100644 --- a/internal/catalogd/service/graphql_service_test.go +++ b/internal/catalogd/service/graphql_service_test.go @@ -1,31 +1,62 @@ package service import ( - "io/fs" + "context" + "encoding/json" "sync" "testing" - "testing/fstest" "time" + 
"github.com/operator-framework/operator-registry/alpha/declcfg" + gql "github.com/operator-framework/operator-controller/internal/catalogd/graphql" ) -func TestCachedGraphQLService_CacheHit(t *testing.T) { - svc := NewCachedGraphQLService() - - // Create a test filesystem with valid catalog data - testFS := fstest.MapFS{ - "catalog.json": &fstest.MapFile{ - Data: []byte(`{ - "schema": "olm.package", - "name": "test-package", - "defaultChannel": "stable" - }`), +// testCatalogDataProvider implements CatalogDataProvider for testing using in-memory metas +type testCatalogDataProvider struct { + metas []*declcfg.Meta +} + +func newTestProvider(metas []*declcfg.Meta) *testCatalogDataProvider { + return &testCatalogDataProvider{metas: metas} +} + +func (p *testCatalogDataProvider) LoadCatalogSchema(_ string) (*gql.CatalogSchema, error) { + return gql.DiscoverSchemaFromMetas(p.metas) +} + +func (p *testCatalogDataProvider) NewObjectLoader(_ string) (gql.ObjectLoader, error) { + metasBySchema := make(map[string][]*declcfg.Meta) + for _, meta := range p.metas { + if meta.Schema != "" { + metasBySchema[meta.Schema] = append(metasBySchema[meta.Schema], meta) + } + } + return gql.NewInMemoryObjectLoader(metasBySchema), nil +} + +// testMetas returns a slice of Meta objects for use in tests +func testMetas() []*declcfg.Meta { + blob, _ := json.Marshal(map[string]interface{}{ + "schema": "olm.package", + "name": "test-package", + "defaultChannel": "stable", + }) + return []*declcfg.Meta{ + { + Schema: "olm.package", + Name: "test-package", + Blob: blob, }, } +} + +func TestCachedGraphQLService_CacheHit(t *testing.T) { + provider := newTestProvider(testMetas()) + svc := NewCachedGraphQLService(provider) // First call - cache miss, should build schema - schema1, err := svc.GetSchema("test-catalog", testFS) + schema1, err := svc.GetSchema(context.Background(), "test-catalog") if err != nil { t.Fatalf("First GetSchema failed: %v", err) } @@ -34,7 +65,7 @@ func 
TestCachedGraphQLService_CacheHit(t *testing.T) { } // Second call - cache hit, should return same schema without rebuilding - schema2, err := svc.GetSchema("test-catalog", testFS) + schema2, err := svc.GetSchema(context.Background(), "test-catalog") if err != nil { t.Fatalf("Second GetSchema failed: %v", err) } @@ -44,20 +75,11 @@ func TestCachedGraphQLService_CacheHit(t *testing.T) { } func TestCachedGraphQLService_InvalidateCache(t *testing.T) { - svc := NewCachedGraphQLService() - - testFS := fstest.MapFS{ - "catalog.json": &fstest.MapFile{ - Data: []byte(`{ - "schema": "olm.package", - "name": "test-package", - "defaultChannel": "stable" - }`), - }, - } + provider := newTestProvider(testMetas()) + svc := NewCachedGraphQLService(provider) // Build and cache schema - schema1, err := svc.GetSchema("test-catalog", testFS) + schema1, err := svc.GetSchema(context.Background(), "test-catalog") if err != nil { t.Fatalf("GetSchema failed: %v", err) } @@ -75,7 +97,7 @@ func TestCachedGraphQLService_InvalidateCache(t *testing.T) { } // Next call should rebuild - schema2, err := svc.GetSchema("test-catalog", testFS) + schema2, err := svc.GetSchema(context.Background(), "test-catalog") if err != nil { t.Fatalf("GetSchema after invalidation failed: %v", err) } @@ -85,17 +107,8 @@ func TestCachedGraphQLService_InvalidateCache(t *testing.T) { } func TestCachedGraphQLService_ConcurrentAccess(t *testing.T) { - svc := NewCachedGraphQLService() - - testFS := fstest.MapFS{ - "catalog.json": &fstest.MapFile{ - Data: []byte(`{ - "schema": "olm.package", - "name": "test-package", - "defaultChannel": "stable" - }`), - }, - } + provider := newTestProvider(testMetas()) + svc := NewCachedGraphQLService(provider) // Run multiple concurrent GetSchema calls const concurrency = 20 @@ -107,7 +120,7 @@ func TestCachedGraphQLService_ConcurrentAccess(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - schema, err := svc.GetSchema("test-catalog", testFS) + schema, err := 
svc.GetSchema(context.Background(), "test-catalog") if err != nil { errors <- err return @@ -145,38 +158,20 @@ func TestCachedGraphQLService_ConcurrentAccess(t *testing.T) { } func TestCachedGraphQLService_SingleflightDeduplication(t *testing.T) { - // Track schema build attempts by intercepting buildSchemaFromFS + // Track schema build attempts using a counting provider var buildCount int var buildMux sync.Mutex - // Create a custom service that counts builds - svc := &CachedGraphQLService{ - schemaCache: make(map[string]*gql.DynamicSchema), - } - - testFS := fstest.MapFS{ - "catalog.json": &fstest.MapFile{ - Data: []byte(`{ - "schema": "olm.package", - "name": "test-package", - "defaultChannel": "stable" - }`), - }, + metas := testMetas() + countingProvider := &countingCatalogDataProvider{ + metas: metas, + buildMux: &buildMux, + count: &buildCount, + delay: 50 * time.Millisecond, } - // Wrapper that counts and slows down schema builds - slowBuildWrapper := func(catalogFS fs.FS) (*gql.DynamicSchema, error) { - // Count this build attempt - buildMux.Lock() - buildCount++ - buildMux.Unlock() - - // Simulate slow build - time.Sleep(50 * time.Millisecond) - - // Call the actual build function - return buildSchemaFromFS(catalogFS) - } + // Create a service with the counting provider + svc := NewCachedGraphQLService(countingProvider) // Launch concurrent GetSchema calls that will race to build const concurrency = 10 @@ -188,37 +183,12 @@ func TestCachedGraphQLService_SingleflightDeduplication(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - - // Use singleflight manually to test deduplication - result, err, _ := svc.buildGroup.Do("test-catalog", func() (interface{}, error) { - // Check cache first - svc.schemaMux.RLock() - if cachedSchema, ok := svc.schemaCache["test-catalog"]; ok { - svc.schemaMux.RUnlock() - return cachedSchema, nil - } - svc.schemaMux.RUnlock() - - // Build schema (counted) - dynamicSchema, err := slowBuildWrapper(testFS) - if err != nil { - 
return nil, err - } - - // Cache it - svc.schemaMux.Lock() - svc.schemaCache["test-catalog"] = dynamicSchema - svc.schemaMux.Unlock() - - return dynamicSchema, nil - }) - + _, err := svc.GetSchema(context.Background(), "test-catalog") if err != nil { errorsMux.Lock() errors = append(errors, err) errorsMux.Unlock() } - _ = result }() } @@ -239,27 +209,46 @@ func TestCachedGraphQLService_SingleflightDeduplication(t *testing.T) { } } -func TestCachedGraphQLService_MultipleCatalogs(t *testing.T) { - svc := NewCachedGraphQLService() +// countingCatalogDataProvider wraps testCatalogDataProvider to count and delay LoadCatalogSchema calls +type countingCatalogDataProvider struct { + metas []*declcfg.Meta + buildMux *sync.Mutex + count *int + delay time.Duration +} - fs1 := fstest.MapFS{ - "catalog.json": &fstest.MapFile{ - Data: []byte(`{"schema": "olm.package", "name": "catalog1"}`), - }, - } - fs2 := fstest.MapFS{ - "catalog.json": &fstest.MapFile{ - Data: []byte(`{"schema": "olm.package", "name": "catalog2"}`), - }, +func (p *countingCatalogDataProvider) LoadCatalogSchema(_ string) (*gql.CatalogSchema, error) { + p.buildMux.Lock() + *p.count++ + p.buildMux.Unlock() + + // Simulate slow build + time.Sleep(p.delay) + + return gql.DiscoverSchemaFromMetas(p.metas) +} + +func (p *countingCatalogDataProvider) NewObjectLoader(_ string) (gql.ObjectLoader, error) { + metasBySchema := make(map[string][]*declcfg.Meta) + for _, meta := range p.metas { + if meta.Schema != "" { + metasBySchema[meta.Schema] = append(metasBySchema[meta.Schema], meta) + } } + return gql.NewInMemoryObjectLoader(metasBySchema), nil +} + +func TestCachedGraphQLService_MultipleCatalogs(t *testing.T) { + provider := newTestProvider(testMetas()) + svc := NewCachedGraphQLService(provider) // Build schemas for two different catalogs - schema1, err := svc.GetSchema("catalog1", fs1) + schema1, err := svc.GetSchema(context.Background(), "catalog1") if err != nil { t.Fatalf("GetSchema for catalog1 failed: %v", err) 
} - schema2, err := svc.GetSchema("catalog2", fs2) + schema2, err := svc.GetSchema(context.Background(), "catalog2") if err != nil { t.Fatalf("GetSchema for catalog2 failed: %v", err) } @@ -296,21 +285,12 @@ func TestCachedGraphQLService_MultipleCatalogs(t *testing.T) { } func TestCachedGraphQLService_ExecuteQuery(t *testing.T) { - svc := NewCachedGraphQLService() - - testFS := fstest.MapFS{ - "catalog.json": &fstest.MapFile{ - Data: []byte(`{ - "schema": "olm.package", - "name": "test-package", - "defaultChannel": "stable" - }`), - }, - } + provider := newTestProvider(testMetas()) + svc := NewCachedGraphQLService(provider) // Execute a simple introspection query query := `{ __schema { queryType { name } } }` - result, err := svc.ExecuteQuery("test-catalog", testFS, query) + result, err := svc.ExecuteQuery(context.Background(), "test-catalog", query) if err != nil { t.Fatalf("ExecuteQuery failed: %v", err) } diff --git a/internal/catalogd/storage/index.go b/internal/catalogd/storage/index.go index 510e23ff05..aadba814f7 100644 --- a/internal/catalogd/storage/index.go +++ b/internal/catalogd/storage/index.go @@ -92,6 +92,25 @@ func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section return sectionSet } +// SchemaSection represents a byte range within the catalog JSONL file +type SchemaSection struct { + Offset int64 + Length int64 +} + +// GetSchemaSections returns the byte-offset sections for a given schema name +func (i *index) GetSchemaSections(schema string) []SchemaSection { + sections, ok := i.BySchema[schema] + if !ok { + return nil + } + result := make([]SchemaSection, len(sections)) + for j, s := range sections { + result[j] = SchemaSection{Offset: s.offset, Length: s.length} + } + return result +} + func newIndex(metasChan <-chan *declcfg.Meta) *index { idx := &index{ BySchema: make(map[string][]section), diff --git a/internal/catalogd/storage/localdir.go b/internal/catalogd/storage/localdir.go index fa1319b665..11b915fb7b 100644 
--- a/internal/catalogd/storage/localdir.go +++ b/internal/catalogd/storage/localdir.go @@ -19,6 +19,7 @@ import ( "github.com/operator-framework/operator-registry/alpha/declcfg" + gql "github.com/operator-framework/operator-controller/internal/catalogd/graphql" "github.com/operator-framework/operator-controller/internal/catalogd/server" "github.com/operator-framework/operator-controller/internal/catalogd/service" ) @@ -74,38 +75,68 @@ func NewLocalDirV1(rootDir string, rootURL *url.URL, enableMetasHandler MetasHan EnableGraphQLQueries: enableGraphQLQueries, } if enableGraphQLQueries == GraphQLQueriesEnabled { - s.graphqlSvc = service.NewCachedGraphQLService() + s.graphqlSvc = service.NewCachedGraphQLService(s) } return s } func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { + catalogDir, err := s.storeAtomicSwap(ctx, catalog, fsys) + if err != nil { + return err + } + + // Pre-warm GraphQL schema cache outside the write lock. + // Concurrent queries during this window get a cache miss and trigger + // their own build via singleflight, which is safe — the data is on disk. + if s.graphqlSvc != nil { + s.graphqlSvc.InvalidateCache(catalog) + + if _, err := s.graphqlSvc.GetSchema(context.Background(), catalog); err != nil { + // Schema build failed — remove the catalog to maintain consistency. + // Re-acquire the write lock for the rollback since it touches shared filesystem state. + s.m.Lock() + removeErr := os.RemoveAll(catalogDir) + s.m.Unlock() + if removeErr != nil { + return fmt.Errorf("failed to pre-build GraphQL schema for catalog %q: %w (rollback also failed: %v)", catalog, err, removeErr) + } + return fmt.Errorf("failed to pre-build GraphQL schema for catalog %q: %w", catalog, err) + } + } + + return nil +} + +// storeAtomicSwap writes catalog data to a temp dir and atomically swaps it +// into place. Holds the write lock for the duration of the filesystem operations only. 
+func (s *LocalDirV1) storeAtomicSwap(ctx context.Context, catalog string, fsys fs.FS) (string, error) { s.m.Lock() defer s.m.Unlock() if err := os.MkdirAll(s.RootDir, 0700); err != nil { - return err + return "", err } - // Remove any orphaned temporary directories left by previously interrupted Store - // operations (e.g. after a process crash where deferred cleanup did not run). if err := s.removeOrphanedTempDirs(catalog); err != nil { - return fmt.Errorf("error removing orphaned temp directories: %w", err) + return "", fmt.Errorf("error removing orphaned temp directories: %w", err) } tmpCatalogDir, err := os.MkdirTemp(s.RootDir, fmt.Sprintf(".%s-*", catalog)) if err != nil { - return err + return "", err } defer os.RemoveAll(tmpCatalogDir) storeMetaFuncs := []storeMetasFunc{storeCatalogData} - if s.EnableMetasHandler { + if s.EnableMetasHandler || s.EnableGraphQLQueries == GraphQLQueriesEnabled { storeMetaFuncs = append(storeMetaFuncs, storeIndexData) } + if s.EnableGraphQLQueries == GraphQLQueriesEnabled { + storeMetaFuncs = append(storeMetaFuncs, discoverAndStoreSchema) + } eg, egCtx := errgroup.WithContext(ctx) - // Pre-allocate metaChans with correct capacity to avoid reallocation metaChans := make([]chan *declcfg.Meta, 0, len(storeMetaFuncs)) for range storeMetaFuncs { @@ -133,11 +164,11 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro close(ch) } if err != nil { - return fmt.Errorf("error walking FBC root: %w", err) + return "", fmt.Errorf("error walking FBC root: %w", err) } if err := eg.Wait(); err != nil { - return err + return "", err } catalogDir := s.catalogDir(catalog) @@ -146,27 +177,10 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro os.Rename(tmpCatalogDir, catalogDir), ) if err != nil { - return err + return "", err } - // Invalidate and pre-warm GraphQL schema cache if GraphQL service is enabled - if s.graphqlSvc != nil { - s.graphqlSvc.InvalidateCache(catalog) - - // 
Pre-warm the GraphQL schema cache using the newly created catalog directory - // Use the actual catalog directory filesystem, not the input fsys - catalogFS := os.DirFS(catalogDir) - if _, err := s.graphqlSvc.GetSchema(catalog, catalogFS); err != nil { - // Schema build failed - rollback by removing the catalog directory - // to maintain consistency (don't persist catalog without valid schema) - if removeErr := os.RemoveAll(catalogDir); removeErr != nil { - return fmt.Errorf("failed to pre-build GraphQL schema for catalog %q: %w (rollback also failed: %v)", catalog, err, removeErr) - } - return fmt.Errorf("failed to pre-build GraphQL schema for catalog %q: %w", catalog, err) - } - } - - return nil + return catalogDir, nil } // removeOrphanedTempDirs removes temporary staging directories that were created by a @@ -214,7 +228,6 @@ func (s *LocalDirV1) ContentExists(catalog string) bool { return false } if !catalogFileStat.Mode().IsRegular() { - // path is not valid content return false } @@ -242,6 +255,10 @@ func catalogIndexFilePath(catalogDir string) string { return filepath.Join(catalogDir, "index.json") } +func catalogSchemaFilePath(catalogDir string) string { + return filepath.Join(catalogDir, "graphql-schema.json") +} + type storeMetasFunc func(catalogDir string, metaChan <-chan *declcfg.Meta) error func storeCatalogData(catalogDir string, metas <-chan *declcfg.Meta) error { @@ -273,6 +290,107 @@ func storeIndexData(catalogDir string, metas <-chan *declcfg.Meta) error { return enc.Encode(idx) } +func discoverAndStoreSchema(catalogDir string, metas <-chan *declcfg.Meta) error { + catalogSchema, err := gql.DiscoverSchemaFromChannel(metas) + if err != nil { + return fmt.Errorf("error discovering schema: %w", err) + } + + data, err := gql.MarshalCatalogSchema(catalogSchema) + if err != nil { + return fmt.Errorf("error marshaling catalog schema: %w", err) + } + + return os.WriteFile(catalogSchemaFilePath(catalogDir), data, 0600) +} + +// LoadCatalogSchema loads the 
pre-built catalog schema metadata from disk. +// Implements service.CatalogDataProvider. +func (s *LocalDirV1) LoadCatalogSchema(catalog string) (*gql.CatalogSchema, error) { + s.m.RLock() + defer s.m.RUnlock() + + data, err := os.ReadFile(catalogSchemaFilePath(s.catalogDir(catalog))) + if err != nil { + return nil, err + } + return gql.UnmarshalCatalogSchema(data) +} + +// NewObjectLoader creates an ObjectLoader that reads objects from the catalog's +// JSONL file using index-based byte offsets. Each query reads only the requested +// page from disk instead of holding all parsed objects in memory. +// Implements service.CatalogDataProvider. +func (s *LocalDirV1) NewObjectLoader(catalog string) (gql.ObjectLoader, error) { + idx, err := s.loadIndex(catalog) + if err != nil { + return nil, fmt.Errorf("error loading index for catalog %q: %w", catalog, err) + } + + catalogPath := catalogFilePath(s.catalogDir(catalog)) + + return func(schemaName string, offset, limit int) ([]map[string]interface{}, error) { + sections := idx.GetSchemaSections(schemaName) + if sections == nil { + return nil, nil + } + + if offset >= len(sections) { + return nil, nil + } + sections = sections[offset:] + + if limit < len(sections) { + sections = sections[:limit] + } + + f, err := os.Open(catalogPath) + if err != nil { + return nil, fmt.Errorf("error opening catalog file: %w", err) + } + defer f.Close() + + results := make([]map[string]interface{}, 0, len(sections)) + for _, sec := range sections { + buf := make([]byte, sec.Length) + if _, err := f.ReadAt(buf, sec.Offset); err != nil { + return nil, fmt.Errorf("error reading section at offset %d: %w", sec.Offset, err) + } + + var obj map[string]interface{} + if err := json.Unmarshal(buf, &obj); err != nil { + continue + } + results = append(results, obj) + } + + return results, nil + }, nil +} + +// loadIndex loads the index from disk using singleflight for efficiency. 
+func (s *LocalDirV1) loadIndex(catalog string) (*index, error) { + s.m.RLock() + defer s.m.RUnlock() + + result, err, _ := s.sf.Do(catalog, func() (interface{}, error) { + indexFile, err := os.Open(catalogIndexFilePath(s.catalogDir(catalog))) + if err != nil { + return nil, err + } + defer indexFile.Close() + var idx index + if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { + return nil, err + } + return &idx, nil + }) + if err != nil { + return nil, err + } + return result.(*index), nil +} + func (s *LocalDirV1) BaseURL(catalog string) string { return s.RootURL.JoinPath(catalog).String() } @@ -327,23 +445,5 @@ func (s *LocalDirV1) GetCatalogFS(catalog string) (fs.FS, error) { // GetIndex returns the index for a catalog // Implements server.CatalogStore interface func (s *LocalDirV1) GetIndex(catalog string) (server.Index, error) { - s.m.RLock() - defer s.m.RUnlock() - - idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) { - indexFile, err := os.Open(catalogIndexFilePath(s.catalogDir(catalog))) - if err != nil { - return nil, err - } - defer indexFile.Close() - var idx index - if err := json.NewDecoder(indexFile).Decode(&idx); err != nil { - return nil, err - } - return &idx, nil - }) - if err != nil { - return nil, err - } - return idx.(*index), nil + return s.loadIndex(catalog) } From 1abb2b692881b8995f2052f1eceecd73397ccc12 Mon Sep 17 00:00:00 2001 From: grokspawn Date: Fri, 5 Jun 2026 12:05:20 -0500 Subject: [PATCH 2/2] review comments Signed-off-by: grokspawn --- internal/catalogd/graphql/graphql.go | 53 +- internal/catalogd/graphql/graphql_test.go | 684 +++++++++++++++++++ internal/catalogd/graphql/validation.go | 27 +- internal/catalogd/graphql/validation_test.go | 158 +++++ internal/catalogd/storage/index_test.go | 31 + internal/catalogd/storage/localdir.go | 2 +- internal/catalogd/storage/localdir_test.go | 107 +++ 7 files changed, 1047 insertions(+), 15 deletions(-) create mode 100644 internal/catalogd/graphql/validation_test.go 
diff --git a/internal/catalogd/graphql/graphql.go b/internal/catalogd/graphql/graphql.go index 6e01eeb009..dafc965514 100644 --- a/internal/catalogd/graphql/graphql.go +++ b/internal/catalogd/graphql/graphql.go @@ -49,11 +49,12 @@ type CatalogSchema struct { // serializableFieldInfo is a JSON-friendly representation of FieldInfo type serializableFieldInfo struct { - Name string `json:"name"` - OriginalName string `json:"originalName"` - JSONType string `json:"jsonType"` - IsArray bool `json:"isArray"` - NestedFields map[string]*serializableFieldInfo `json:"nestedFields,omitempty"` + Name string `json:"name"` + OriginalName string `json:"originalName"` + JSONType string `json:"jsonType"` + IsArray bool `json:"isArray"` + GraphQLTypeName string `json:"graphqlTypeName,omitempty"` + NestedFields map[string]*serializableFieldInfo `json:"nestedFields,omitempty"` } // serializableSchemaInfo is a JSON-friendly representation of SchemaInfo @@ -101,12 +102,38 @@ func stringToKind(s string) reflect.Kind { } } +func graphqlTypeName(t graphql.Type) string { + if list, ok := t.(*graphql.List); ok { + return graphqlTypeName(list.OfType) + } + return t.Name() +} + +func graphqlTypeFromName(name string, isArray bool) graphql.Type { + var base graphql.Type + switch name { + case "Int": + base = graphql.Int + case "Float": + base = graphql.Float + case "Boolean": + base = graphql.Boolean + default: + base = graphql.String + } + if isArray { + return graphql.NewList(base) + } + return base +} + func fieldInfoToSerializable(fi *FieldInfo) *serializableFieldInfo { sfi := &serializableFieldInfo{ - Name: fi.Name, - OriginalName: fi.OriginalName, - JSONType: kindToString(fi.JSONType), - IsArray: fi.IsArray, + Name: fi.Name, + OriginalName: fi.OriginalName, + JSONType: kindToString(fi.JSONType), + IsArray: fi.IsArray, + GraphQLTypeName: graphqlTypeName(fi.GraphQLType), } if len(fi.NestedFields) > 0 { sfi.NestedFields = make(map[string]*serializableFieldInfo) @@ -119,12 +146,18 @@ func 
fieldInfoToSerializable(fi *FieldInfo) *serializableFieldInfo { func serializableToFieldInfo(sfi *serializableFieldInfo) *FieldInfo { k := stringToKind(sfi.JSONType) + var gqlType graphql.Type + if sfi.GraphQLTypeName != "" { + gqlType = graphqlTypeFromName(sfi.GraphQLTypeName, sfi.IsArray) + } else { + gqlType = jsonTypeToGraphQL(k, sfi.IsArray, nil) + } fi := &FieldInfo{ Name: sfi.Name, OriginalName: sfi.OriginalName, JSONType: k, IsArray: sfi.IsArray, - GraphQLType: jsonTypeToGraphQL(k, sfi.IsArray, nil), + GraphQLType: gqlType, } if len(sfi.NestedFields) > 0 { fi.NestedFields = make(map[string]*FieldInfo) diff --git a/internal/catalogd/graphql/graphql_test.go b/internal/catalogd/graphql/graphql_test.go index 9597fcda2c..4143555735 100644 --- a/internal/catalogd/graphql/graphql_test.go +++ b/internal/catalogd/graphql/graphql_test.go @@ -1,7 +1,11 @@ package graphql import ( + "reflect" "testing" + "testing/fstest" + + graphqlgo "github.com/graphql-go/graphql" "github.com/operator-framework/operator-registry/alpha/declcfg" ) @@ -379,3 +383,683 @@ func TestIntegerTypeDetection(t *testing.T) { t.Error("floatArray not found") } } + +// --- Serialization round-trip tests --- + +func testCatalogMetas() []*declcfg.Meta { + return []*declcfg.Meta{ + { + Schema: "olm.package", + Package: "test-package", + Name: "test-package", + Blob: []byte(`{ + "schema": "olm.package", + "name": "test-package", + "defaultChannel": "stable", + "description": "A test package" + }`), + }, + { + Schema: "olm.bundle", + Package: "test-package", + Name: "test-package.v1.0.0", + Blob: []byte(`{ + "schema": "olm.bundle", + "name": "test-package.v1.0.0", + "package": "test-package", + "image": "registry.io/test@sha256:abc", + "properties": [ + {"type": "olm.package", "value": {"packageName": "test-package", "version": "1.0.0"}} + ], + "relatedImages": [ + {"name": "operator", "image": "registry.io/test@sha256:abc"} + ] + }`), + }, + { + Schema: "olm.channel", + Package: "test-package", + Name: 
"stable", + Blob: []byte(`{ + "schema": "olm.channel", + "name": "stable", + "package": "test-package", + "entries": [{"name": "test-package.v1.0.0"}] + }`), + }, + } +} + +func TestMarshalUnmarshalCatalogSchema_RoundTrip(t *testing.T) { + metas := testCatalogMetas() + original, err := DiscoverSchemaFromMetas(metas) + if err != nil { + t.Fatalf("DiscoverSchemaFromMetas failed: %v", err) + } + + data, err := MarshalCatalogSchema(original) + if err != nil { + t.Fatalf("MarshalCatalogSchema failed: %v", err) + } + + if len(data) == 0 { + t.Fatal("MarshalCatalogSchema returned empty data") + } + + restored, err := UnmarshalCatalogSchema(data) + if err != nil { + t.Fatalf("UnmarshalCatalogSchema failed: %v", err) + } + + // Verify schema count matches + if len(restored.Schemas) != len(original.Schemas) { + t.Fatalf("schema count mismatch: original=%d, restored=%d", + len(original.Schemas), len(restored.Schemas)) + } + + // Verify each schema's fields and metadata + for name, origInfo := range original.Schemas { + restoredInfo, ok := restored.Schemas[name] + if !ok { + t.Errorf("schema %q missing after round-trip", name) + continue + } + + if restoredInfo.TotalObjects != origInfo.TotalObjects { + t.Errorf("schema %q: TotalObjects mismatch: %d vs %d", + name, origInfo.TotalObjects, restoredInfo.TotalObjects) + } + + if len(restoredInfo.Fields) != len(origInfo.Fields) { + t.Errorf("schema %q: field count mismatch: %d vs %d", + name, len(origInfo.Fields), len(restoredInfo.Fields)) + } + + for fname, origField := range origInfo.Fields { + resField, ok := restoredInfo.Fields[fname] + if !ok { + t.Errorf("schema %q: field %q missing after round-trip", name, fname) + continue + } + if resField.Name != origField.Name { + t.Errorf("field %q: Name mismatch: %q vs %q", fname, origField.Name, resField.Name) + } + if resField.OriginalName != origField.OriginalName { + t.Errorf("field %q: OriginalName mismatch: %q vs %q", fname, origField.OriginalName, resField.OriginalName) + } + if 
resField.IsArray != origField.IsArray { + t.Errorf("field %q: IsArray mismatch: %v vs %v", fname, origField.IsArray, resField.IsArray) + } + // JSONType for reflect.Map/Slice falls back to String in serialization + // (kindToString doesn't handle Map/Slice). This is expected — the GraphQL + // type name is the authoritative source after round-trip, not JSONType. + if origField.JSONType != reflect.Map && origField.JSONType != reflect.Slice { + if resField.JSONType != origField.JSONType { + t.Errorf("field %q: JSONType mismatch: %v vs %v", fname, origField.JSONType, resField.JSONType) + } + } + } + } +} + +func TestUnmarshalCatalogSchema_InvalidJSON(t *testing.T) { + _, err := UnmarshalCatalogSchema([]byte(`{invalid`)) + if err == nil { + t.Fatal("expected error for invalid JSON") + } +} + +func TestMarshalCatalogSchema_WithNestedFields(t *testing.T) { + cs := &CatalogSchema{ + Schemas: map[string]*SchemaInfo{ + "test.schema": { + TotalObjects: 1, + Fields: map[string]*FieldInfo{ + "items": { + Name: "items", + OriginalName: "items", + GraphQLType: graphqlgo.NewList(graphqlgo.String), + JSONType: reflect.Map, + IsArray: true, + NestedFields: map[string]*FieldInfo{ + "key": { + Name: "key", + OriginalName: "key", + GraphQLType: graphqlgo.String, + JSONType: reflect.String, + }, + }, + }, + }, + }, + }, + } + + data, err := MarshalCatalogSchema(cs) + if err != nil { + t.Fatalf("MarshalCatalogSchema failed: %v", err) + } + + restored, err := UnmarshalCatalogSchema(data) + if err != nil { + t.Fatalf("UnmarshalCatalogSchema failed: %v", err) + } + + field := restored.Schemas["test.schema"].Fields["items"] + if field == nil { + t.Fatal("items field missing after round-trip") + } + if len(field.NestedFields) != 1 { + t.Fatalf("expected 1 nested field, got %d", len(field.NestedFields)) + } + if _, ok := field.NestedFields["key"]; !ok { + t.Error("nested field 'key' missing after round-trip") + } +} + +// --- kindToString / stringToKind tests --- + +func TestKindToString(t 
*testing.T) { + tests := []struct { + kind reflect.Kind + expected string + }{ + {reflect.String, "string"}, + {reflect.Bool, "bool"}, + {reflect.Int, "int"}, + {reflect.Int64, "int"}, + {reflect.Uint, "uint"}, + {reflect.Uint32, "uint"}, + {reflect.Float64, "float64"}, + {reflect.Float32, "float64"}, + {reflect.Struct, "string"}, // default + } + + for _, tt := range tests { + result := kindToString(tt.kind) + if result != tt.expected { + t.Errorf("kindToString(%v) = %q, want %q", tt.kind, result, tt.expected) + } + } +} + +func TestStringToKind(t *testing.T) { + tests := []struct { + input string + expected reflect.Kind + }{ + {"string", reflect.String}, + {"bool", reflect.Bool}, + {"int", reflect.Int}, + {"uint", reflect.Uint}, + {"float64", reflect.Float64}, + {"unknown", reflect.String}, // default + } + + for _, tt := range tests { + result := stringToKind(tt.input) + if result != tt.expected { + t.Errorf("stringToKind(%q) = %v, want %v", tt.input, result, tt.expected) + } + } +} + +// --- graphqlTypeName / graphqlTypeFromName tests --- + +func TestGraphqlTypeName(t *testing.T) { + tests := []struct { + gqlType graphqlgo.Type + expected string + }{ + {graphqlgo.String, "String"}, + {graphqlgo.Int, "Int"}, + {graphqlgo.Float, "Float"}, + {graphqlgo.Boolean, "Boolean"}, + {graphqlgo.NewList(graphqlgo.String), "String"}, + {graphqlgo.NewList(graphqlgo.Int), "Int"}, + } + + for _, tt := range tests { + result := graphqlTypeName(tt.gqlType) + if result != tt.expected { + t.Errorf("graphqlTypeName(%v) = %q, want %q", tt.gqlType, result, tt.expected) + } + } +} + +func TestGraphqlTypeFromName(t *testing.T) { + tests := []struct { + name string + isArray bool + check string + }{ + {"String", false, "String"}, + {"Int", false, "Int"}, + {"Float", false, "Float"}, + {"Boolean", false, "Boolean"}, + {"Unknown", false, "String"}, // defaults to String + {"String", true, "[String]"}, + {"Int", true, "[Int]"}, + } + + for _, tt := range tests { + result := 
graphqlTypeFromName(tt.name, tt.isArray) + if result.String() != tt.check { + t.Errorf("graphqlTypeFromName(%q, %v) = %q, want %q", tt.name, tt.isArray, result.String(), tt.check) + } + } +} + +// --- DiscoverSchemaFromChannel tests --- + +func TestDiscoverSchemaFromChannel(t *testing.T) { + metas := testCatalogMetas() + ch := make(chan *declcfg.Meta, len(metas)) + for _, m := range metas { + ch <- m + } + close(ch) + + cs, err := DiscoverSchemaFromChannel(ch) + if err != nil { + t.Fatalf("DiscoverSchemaFromChannel failed: %v", err) + } + + if len(cs.Schemas) != 3 { + t.Errorf("expected 3 schemas, got %d", len(cs.Schemas)) + } + + for _, name := range []string{"olm.package", "olm.bundle", "olm.channel"} { + info, ok := cs.Schemas[name] + if !ok { + t.Errorf("schema %q not discovered", name) + continue + } + if info.TotalObjects != 1 { + t.Errorf("schema %q: expected 1 object, got %d", name, info.TotalObjects) + } + if len(info.Fields) == 0 { + t.Errorf("schema %q: no fields discovered", name) + } + } +} + +func TestDiscoverSchemaFromChannel_SkipsEmptySchema(t *testing.T) { + ch := make(chan *declcfg.Meta, 1) + ch <- &declcfg.Meta{Schema: "", Blob: []byte(`{"name":"x"}`)} + close(ch) + + cs, err := DiscoverSchemaFromChannel(ch) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(cs.Schemas) != 0 { + t.Errorf("expected 0 schemas, got %d", len(cs.Schemas)) + } +} + +func TestDiscoverSchemaFromChannel_SkipsMalformedBlob(t *testing.T) { + ch := make(chan *declcfg.Meta, 1) + ch <- &declcfg.Meta{Schema: "test", Name: "bad", Blob: []byte(`{invalid json`)} + close(ch) + + cs, err := DiscoverSchemaFromChannel(ch) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + info := cs.Schemas["test"] + if info == nil { + t.Fatal("expected schema entry for 'test'") + } + if info.TotalObjects != 1 { + t.Errorf("expected TotalObjects=1 (counted before parse), got %d", info.TotalObjects) + } + if info.SampleObject != nil { + t.Error("expected nil 
SampleObject for malformed blob") + } +} + +// --- BuildDynamicGraphQLSchema + query execution tests --- + +func TestBuildDynamicGraphQLSchema_BasicQuery(t *testing.T) { + metas := testCatalogMetas() + catalogSchema, err := DiscoverSchemaFromMetas(metas) + if err != nil { + t.Fatalf("DiscoverSchemaFromMetas: %v", err) + } + + metasBySchema := make(map[string][]*declcfg.Meta) + for _, m := range metas { + if m.Schema != "" { + metasBySchema[m.Schema] = append(metasBySchema[m.Schema], m) + } + } + loader := NewInMemoryObjectLoader(metasBySchema) + + ds, err := BuildDynamicGraphQLSchema(catalogSchema, loader) + if err != nil { + t.Fatalf("BuildDynamicGraphQLSchema: %v", err) + } + + // Query for packages + result := graphqlgo.Do(graphqlgo.Params{ + Schema: ds.Schema, + RequestString: `{ olmpackages { name defaultChannel } }`, + }) + if len(result.Errors) > 0 { + t.Fatalf("GraphQL query errors: %v", result.Errors) + } + + data, ok := result.Data.(map[string]interface{}) + if !ok { + t.Fatal("expected map result") + } + packages, ok := data["olmpackages"].([]interface{}) + if !ok || len(packages) != 1 { + t.Fatalf("expected 1 package, got %v", data["olmpackages"]) + } +} + +func TestBuildDynamicGraphQLSchema_PaginationArgs(t *testing.T) { + metas := testCatalogMetas() + catalogSchema, err := DiscoverSchemaFromMetas(metas) + if err != nil { + t.Fatalf("DiscoverSchemaFromMetas: %v", err) + } + + metasBySchema := make(map[string][]*declcfg.Meta) + for _, m := range metas { + if m.Schema != "" { + metasBySchema[m.Schema] = append(metasBySchema[m.Schema], m) + } + } + loader := NewInMemoryObjectLoader(metasBySchema) + + ds, err := BuildDynamicGraphQLSchema(catalogSchema, loader) + if err != nil { + t.Fatalf("BuildDynamicGraphQLSchema: %v", err) + } + + // Offset past all results + result := graphqlgo.Do(graphqlgo.Params{ + Schema: ds.Schema, + RequestString: `{ olmpackages(offset: 100) { name } }`, + }) + if len(result.Errors) > 0 { + t.Fatalf("GraphQL query errors: %v", 
result.Errors) + } + + data := result.Data.(map[string]interface{}) + packages := data["olmpackages"] + if packages != nil { + if p, ok := packages.([]interface{}); ok && len(p) != 0 { + t.Errorf("expected empty result for high offset, got %d", len(p)) + } + } +} + +func TestBuildDynamicGraphQLSchema_SummaryField(t *testing.T) { + metas := testCatalogMetas() + catalogSchema, err := DiscoverSchemaFromMetas(metas) + if err != nil { + t.Fatalf("DiscoverSchemaFromMetas: %v", err) + } + + loader := NewInMemoryObjectLoader(nil) + + ds, err := BuildDynamicGraphQLSchema(catalogSchema, loader) + if err != nil { + t.Fatalf("BuildDynamicGraphQLSchema: %v", err) + } + + result := graphqlgo.Do(graphqlgo.Params{ + Schema: ds.Schema, + RequestString: `{ summary { totalSchemas schemas { name totalObjects totalFields } } }`, + }) + if len(result.Errors) > 0 { + t.Fatalf("GraphQL query errors: %v", result.Errors) + } + + data := result.Data.(map[string]interface{}) + summary, ok := data["summary"].(map[string]interface{}) + if !ok { + t.Fatal("expected summary in result") + } + totalSchemas, ok := summary["totalSchemas"] + if !ok { + t.Fatal("expected totalSchemas in summary") + } + if totalSchemas.(int) != 3 { + t.Errorf("expected 3 total schemas, got %v", totalSchemas) + } +} + +func TestBuildDynamicGraphQLSchema_IntrospectionQuery(t *testing.T) { + metas := testCatalogMetas() + catalogSchema, err := DiscoverSchemaFromMetas(metas) + if err != nil { + t.Fatalf("DiscoverSchemaFromMetas: %v", err) + } + + loader := NewInMemoryObjectLoader(nil) + + ds, err := BuildDynamicGraphQLSchema(catalogSchema, loader) + if err != nil { + t.Fatalf("BuildDynamicGraphQLSchema: %v", err) + } + + result := graphqlgo.Do(graphqlgo.Params{ + Schema: ds.Schema, + RequestString: `{ __schema { queryType { name } } }`, + }) + if len(result.Errors) > 0 { + t.Fatalf("introspection query errors: %v", result.Errors) + } +} + +// --- NewInMemoryObjectLoader tests --- + +func 
TestNewInMemoryObjectLoader_BasicPagination(t *testing.T) { + metas := map[string][]*declcfg.Meta{ + "test": { + {Schema: "test", Blob: []byte(`{"name":"a"}`)}, + {Schema: "test", Blob: []byte(`{"name":"b"}`)}, + {Schema: "test", Blob: []byte(`{"name":"c"}`)}, + }, + } + + loader := NewInMemoryObjectLoader(metas) + + // Full page + objs, err := loader("test", 0, 10) + if err != nil { + t.Fatalf("loader error: %v", err) + } + if len(objs) != 3 { + t.Errorf("expected 3 objects, got %d", len(objs)) + } + + // With offset + objs, err = loader("test", 1, 10) + if err != nil { + t.Fatalf("loader error: %v", err) + } + if len(objs) != 2 { + t.Errorf("expected 2 objects with offset=1, got %d", len(objs)) + } + + // With limit + objs, err = loader("test", 0, 2) + if err != nil { + t.Fatalf("loader error: %v", err) + } + if len(objs) != 2 { + t.Errorf("expected 2 objects with limit=2, got %d", len(objs)) + } + + // Offset past end + objs, err = loader("test", 100, 10) + if err != nil { + t.Fatalf("loader error: %v", err) + } + if objs != nil { + t.Errorf("expected nil for offset past end, got %v", objs) + } + + // Unknown schema + objs, err = loader("nonexistent", 0, 10) + if err != nil { + t.Fatalf("loader error: %v", err) + } + if objs != nil { + t.Errorf("expected nil for unknown schema, got %v", objs) + } +} + +func TestNewInMemoryObjectLoader_SkipsMalformedJSON(t *testing.T) { + metas := map[string][]*declcfg.Meta{ + "test": { + {Schema: "test", Blob: []byte(`{invalid`)}, + {Schema: "test", Blob: []byte(`{"name":"valid"}`)}, + }, + } + + loader := NewInMemoryObjectLoader(metas) + objs, err := loader("test", 0, 10) + if err != nil { + t.Fatalf("loader error: %v", err) + } + if len(objs) != 1 { + t.Errorf("expected 1 valid object (malformed skipped), got %d", len(objs)) + } +} + +// --- LoadAndSummarizeCatalogDynamic tests --- + +func TestLoadAndSummarizeCatalogDynamic(t *testing.T) { + catalogFS := &fstest.MapFS{ + "catalog.json": &fstest.MapFile{ + Data: 
[]byte(`{"schema":"olm.package","name":"test-pkg","defaultChannel":"stable"} +{"schema":"olm.channel","name":"stable","package":"test-pkg","entries":[{"name":"test-pkg.v1.0.0"}]} +`), + }, + } + + ds, err := LoadAndSummarizeCatalogDynamic(catalogFS) + if err != nil { + t.Fatalf("LoadAndSummarizeCatalogDynamic failed: %v", err) + } + + if ds.CatalogSchema == nil { + t.Fatal("CatalogSchema is nil") + } + if len(ds.CatalogSchema.Schemas) != 2 { + t.Errorf("expected 2 schemas, got %d", len(ds.CatalogSchema.Schemas)) + } + + // Verify the schema actually works for queries + result := graphqlgo.Do(graphqlgo.Params{ + Schema: ds.Schema, + RequestString: `{ olmpackages { name } }`, + }) + if len(result.Errors) > 0 { + t.Fatalf("query errors: %v", result.Errors) + } +} + +// --- marshalComplexValue tests --- + +func TestMarshalComplexValue(t *testing.T) { + // nil + if v := marshalComplexValue(nil); v != nil { + t.Errorf("expected nil for nil input, got %v", v) + } + + // string passthrough + if v := marshalComplexValue("hello"); v != "hello" { + t.Errorf("expected 'hello', got %v", v) + } + + // int passthrough + if v := marshalComplexValue(42); v != 42 { + t.Errorf("expected 42, got %v", v) + } + + // map gets marshaled to JSON string + m := map[string]interface{}{"key": "value"} + v := marshalComplexValue(m) + s, ok := v.(string) + if !ok { + t.Fatalf("expected string for map, got %T", v) + } + if s != `{"key":"value"}` { + t.Errorf("expected JSON string for map, got %q", s) + } + + // slice gets marshaled to JSON string + sl := []interface{}{"a", "b"} + v = marshalComplexValue(sl) + s, ok = v.(string) + if !ok { + t.Fatalf("expected string for slice, got %T", v) + } + if s != `["a","b"]` { + t.Errorf("expected JSON string for slice, got %q", s) + } +} + +// --- Field resolver tests (via full query execution) --- + +func TestFieldResolvers_NestedObjectsAndScalars(t *testing.T) { + metas := []*declcfg.Meta{ + { + Schema: "olm.bundle", + Name: "test.v1.0.0", + Blob: 
[]byte(`{ + "schema": "olm.bundle", + "name": "test.v1.0.0", + "package": "test-pkg", + "properties": [ + {"type": "olm.package", "value": {"packageName": "test-pkg", "version": "1.0.0"}} + ] + }`), + }, + } + + catalogSchema, err := DiscoverSchemaFromMetas(metas) + if err != nil { + t.Fatalf("DiscoverSchemaFromMetas: %v", err) + } + + metasBySchema := map[string][]*declcfg.Meta{"olm.bundle": metas} + loader := NewInMemoryObjectLoader(metasBySchema) + + ds, err := BuildDynamicGraphQLSchema(catalogSchema, loader) + if err != nil { + t.Fatalf("BuildDynamicGraphQLSchema: %v", err) + } + + result := graphqlgo.Do(graphqlgo.Params{ + Schema: ds.Schema, + RequestString: `{ olmbundles { name properties { type } } }`, + }) + if len(result.Errors) > 0 { + t.Fatalf("query errors: %v", result.Errors) + } + + data := result.Data.(map[string]interface{}) + bundles := data["olmbundles"].([]interface{}) + if len(bundles) != 1 { + t.Fatalf("expected 1 bundle, got %d", len(bundles)) + } + + bundle := bundles[0].(map[string]interface{}) + if bundle["name"] != "test.v1.0.0" { + t.Errorf("expected name 'test.v1.0.0', got %v", bundle["name"]) + } + + props, ok := bundle["properties"].([]interface{}) + if !ok || len(props) != 1 { + t.Fatalf("expected 1 property, got %v", bundle["properties"]) + } +} diff --git a/internal/catalogd/graphql/validation.go b/internal/catalogd/graphql/validation.go index 4c028e194c..733c61500f 100644 --- a/internal/catalogd/graphql/validation.go +++ b/internal/catalogd/graphql/validation.go @@ -14,8 +14,10 @@ const ( ) type queryComplexity struct { - aliases int - fields int + aliases int + fields int + fragments map[string]*ast.FragmentDefinition + visited map[string]bool } // ValidateQueryComplexity parses the query AST and rejects it if it exceeds @@ -26,7 +28,15 @@ func ValidateQueryComplexity(query string) error { return fmt.Errorf("query parse error: %w", err) } - c := &queryComplexity{} + c := &queryComplexity{ + fragments: 
make(map[string]*ast.FragmentDefinition), + visited: make(map[string]bool), + } + for _, def := range doc.Definitions { + if frag, ok := def.(*ast.FragmentDefinition); ok { + c.fragments[frag.Name.Value] = frag + } + } for _, def := range doc.Definitions { if op, ok := def.(*ast.OperationDefinition); ok { if err := c.walkSelectionSet(op.SelectionSet, 1); err != nil { @@ -66,7 +76,16 @@ func (c *queryComplexity) walkSelectionSet(ss *ast.SelectionSet, depth int) erro return err } case *ast.FragmentSpread: - c.fields++ + name := s.Name.Value + if c.visited[name] { + continue + } + c.visited[name] = true + if frag, ok := c.fragments[name]; ok { + if err := c.walkSelectionSet(frag.SelectionSet, depth+1); err != nil { + return err + } + } } } return nil diff --git a/internal/catalogd/graphql/validation_test.go b/internal/catalogd/graphql/validation_test.go new file mode 100644 index 0000000000..bd2b7e3a34 --- /dev/null +++ b/internal/catalogd/graphql/validation_test.go @@ -0,0 +1,158 @@ +package graphql + +import ( + "fmt" + "strings" + "testing" +) + +func TestValidateQueryComplexity_ValidSimpleQuery(t *testing.T) { + err := ValidateQueryComplexity(`{ olmpackages { name } }`) + if err != nil { + t.Fatalf("expected no error for simple query, got: %v", err) + } +} + +func TestValidateQueryComplexity_ParseError(t *testing.T) { + err := ValidateQueryComplexity(`{ invalid query {{{{`) + if err == nil { + t.Fatal("expected parse error for malformed query") + } + if !strings.Contains(err.Error(), "query parse error") { + t.Errorf("expected 'query parse error' in message, got: %v", err) + } +} + +func TestValidateQueryComplexity_ExceedsDepth(t *testing.T) { + // Build a query that exceeds MaxQueryDepth (10) + var b strings.Builder + b.WriteString("{ ") + for i := 0; i <= MaxQueryDepth+1; i++ { + b.WriteString(fmt.Sprintf("f%d { ", i)) + } + b.WriteString("leaf") + for i := 0; i <= MaxQueryDepth+1; i++ { + b.WriteString(" }") + } + b.WriteString(" }") + + err := 
ValidateQueryComplexity(b.String()) + if err == nil { + t.Fatal("expected depth error") + } + if !strings.Contains(err.Error(), "maximum depth") { + t.Errorf("expected 'maximum depth' in error, got: %v", err) + } +} + +func TestValidateQueryComplexity_WithinDepthLimit(t *testing.T) { + // Build a query at exactly MaxQueryDepth (should pass) + var b strings.Builder + b.WriteString("{ ") + for i := 1; i < MaxQueryDepth; i++ { + b.WriteString(fmt.Sprintf("f%d { ", i)) + } + b.WriteString("leaf") + for i := 1; i < MaxQueryDepth; i++ { + b.WriteString(" }") + } + b.WriteString(" }") + + err := ValidateQueryComplexity(b.String()) + if err != nil { + t.Fatalf("query at depth limit should pass, got: %v", err) + } +} + +func TestValidateQueryComplexity_ExceedsAliases(t *testing.T) { + var b strings.Builder + b.WriteString("{ ") + for i := 0; i <= MaxQueryAliases; i++ { + b.WriteString(fmt.Sprintf("a%d: name ", i)) + } + b.WriteString("}") + + err := ValidateQueryComplexity(b.String()) + if err == nil { + t.Fatal("expected alias count error") + } + if !strings.Contains(err.Error(), "maximum alias count") { + t.Errorf("expected 'maximum alias count' in error, got: %v", err) + } +} + +func TestValidateQueryComplexity_ExceedsFieldCount(t *testing.T) { + var b strings.Builder + b.WriteString("{ ") + for i := 0; i <= MaxQueryFields; i++ { + b.WriteString(fmt.Sprintf("f%d ", i)) + } + b.WriteString("}") + + err := ValidateQueryComplexity(b.String()) + if err == nil { + t.Fatal("expected field count error") + } + if !strings.Contains(err.Error(), "maximum field count") { + t.Errorf("expected 'maximum field count' in error, got: %v", err) + } +} + +func TestValidateQueryComplexity_WithFragments(t *testing.T) { + query := ` + fragment PkgFields on OlmPackage { + name + defaultChannel + } + { + olmpackages { + ...PkgFields + } + } + ` + err := ValidateQueryComplexity(query) + if err != nil { + t.Fatalf("expected no error for query with fragments, got: %v", err) + } +} + +func 
TestValidateQueryComplexity_WithInlineFragment(t *testing.T) { + query := ` + { + olmpackages { + ... on OlmPackage { + name + } + } + } + ` + err := ValidateQueryComplexity(query) + if err != nil { + t.Fatalf("expected no error for query with inline fragment, got: %v", err) + } +} + +func TestValidateQueryComplexity_EmptyQuery(t *testing.T) { + err := ValidateQueryComplexity(`{ __typename }`) + if err != nil { + t.Fatalf("expected no error for minimal query, got: %v", err) + } +} + +func TestValidateQueryComplexity_DuplicateFragmentSpreadSkipped(t *testing.T) { + query := ` + fragment F on OlmPackage { + name + } + { + olmpackages { + ...F + ...F + } + } + ` + err := ValidateQueryComplexity(query) + if err != nil { + t.Fatalf("expected no error with duplicate fragment spreads, got: %v", err) + } +} diff --git a/internal/catalogd/storage/index_test.go b/internal/catalogd/storage/index_test.go index 66dee0c133..32c5c9fe43 100644 --- a/internal/catalogd/storage/index_test.go +++ b/internal/catalogd/storage/index_test.go @@ -275,6 +275,37 @@ func TestIndexGet(t *testing.T) { } } +func TestGetSchemaSections(t *testing.T) { + metas := []*declcfg.Meta{ + {Schema: "olm.package", Name: "pkg1", Blob: []byte(`{"schema":"olm.package","name":"pkg1"}` + "\n")}, + {Schema: "olm.bundle", Name: "b1", Blob: []byte(`{"schema":"olm.bundle","name":"b1"}` + "\n")}, + {Schema: "olm.bundle", Name: "b2", Blob: []byte(`{"schema":"olm.bundle","name":"b2"}` + "\n")}, + } + + ch := make(chan *declcfg.Meta, len(metas)) + for _, m := range metas { + ch <- m + } + close(ch) + + idx := newIndex(ch) + + // Known schema with 2 entries + sections := idx.GetSchemaSections("olm.bundle") + require.Len(t, sections, 2, "expected 2 sections for olm.bundle") + for _, s := range sections { + require.Positive(t, s.Length, "section length should be positive") + } + + // Known schema with 1 entry + sections = idx.GetSchemaSections("olm.package") + require.Len(t, sections, 1, "expected 1 section for 
olm.package") + + // Unknown schema + sections = idx.GetSchemaSections("nonexistent") + require.Nil(t, sections, "expected nil for unknown schema") +} + // createBlob is a helper function that creates a JSON blob with a trailing newline func createBlob(t *testing.T, data map[string]interface{}) []byte { blob, err := json.Marshal(data) diff --git a/internal/catalogd/storage/localdir.go b/internal/catalogd/storage/localdir.go index 11b915fb7b..89d08d64e8 100644 --- a/internal/catalogd/storage/localdir.go +++ b/internal/catalogd/storage/localdir.go @@ -92,7 +92,7 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro if s.graphqlSvc != nil { s.graphqlSvc.InvalidateCache(catalog) - if _, err := s.graphqlSvc.GetSchema(context.Background(), catalog); err != nil { + if _, err := s.graphqlSvc.GetSchema(ctx, catalog); err != nil { // Schema build failed — remove the catalog to maintain consistency. // Re-acquire the write lock for the rollback since it touches shared filesystem state. 
s.m.Lock() diff --git a/internal/catalogd/storage/localdir_test.go b/internal/catalogd/storage/localdir_test.go index aa1286cc3a..76efc00e7f 100644 --- a/internal/catalogd/storage/localdir_test.go +++ b/internal/catalogd/storage/localdir_test.go @@ -706,3 +706,110 @@ func generateJSONLinesOrFail(t *testing.T, in []byte) string { return out.String() } + +func TestLocalDirV1_GraphQLEnabled_StoreAndQuery(t *testing.T) { + rootURL, _ := url.Parse("http://localhost/catalogs/") + s := NewLocalDirV1( + t.TempDir(), + rootURL, + MetasHandlerDisabled, + GraphQLQueriesEnabled, + ) + + if err := s.Store(context.Background(), "test-catalog", createTestFS(t)); err != nil { + t.Fatalf("Store with GraphQL enabled failed: %v", err) + } + + // Verify graphql-schema.json was written + schemaPath := catalogSchemaFilePath(s.catalogDir("test-catalog")) + if _, err := os.Stat(schemaPath); err != nil { + t.Fatalf("graphql-schema.json not found after Store: %v", err) + } + + // Test LoadCatalogSchema + cs, err := s.LoadCatalogSchema("test-catalog") + if err != nil { + t.Fatalf("LoadCatalogSchema failed: %v", err) + } + if len(cs.Schemas) == 0 { + t.Fatal("expected at least 1 schema in catalog") + } + + // Test NewObjectLoader + loader, err := s.NewObjectLoader("test-catalog") + if err != nil { + t.Fatalf("NewObjectLoader failed: %v", err) + } + + // Query for bundles + objs, err := loader("olm.bundle", 0, 100) + if err != nil { + t.Fatalf("ObjectLoader query failed: %v", err) + } + if len(objs) == 0 { + t.Error("expected at least 1 bundle object from loader") + } + + // Query for unknown schema returns nil + objs, err = loader("nonexistent.schema", 0, 10) + if err != nil { + t.Fatalf("ObjectLoader query for unknown schema failed: %v", err) + } + if objs != nil { + t.Errorf("expected nil for unknown schema, got %d objects", len(objs)) + } +} + +func TestLocalDirV1_GraphQLEnabled_InvalidateAndRebuild(t *testing.T) { + rootURL, _ := url.Parse("http://localhost/catalogs/") + s := 
NewLocalDirV1( + t.TempDir(), + rootURL, + MetasHandlerDisabled, + GraphQLQueriesEnabled, + ) + + if err := s.Store(context.Background(), "test-catalog", createTestFS(t)); err != nil { + t.Fatalf("Store failed: %v", err) + } + + // Invalidate the cache + s.graphqlSvc.InvalidateCache("test-catalog") + + // GetSchema should rebuild from disk + schema, err := s.graphqlSvc.GetSchema(context.Background(), "test-catalog") + if err != nil { + t.Fatalf("GetSchema after invalidation failed: %v", err) + } + if schema == nil { + t.Fatal("expected non-nil schema after rebuild") + } +} + +func TestLocalDirV1_LoadCatalogSchema_MissingCatalog(t *testing.T) { + s := NewLocalDirV1( + t.TempDir(), + nil, + MetasHandlerDisabled, + GraphQLQueriesEnabled, + ) + + _, err := s.LoadCatalogSchema("nonexistent") + if err == nil { + t.Fatal("expected error for missing catalog") + } +} + +func TestLocalDirV1_NewObjectLoader_MissingCatalog(t *testing.T) { + s := NewLocalDirV1( + t.TempDir(), + nil, + MetasHandlerDisabled, + GraphQLQueriesEnabled, + ) + + _, err := s.NewObjectLoader("nonexistent") + if err == nil { + t.Fatal("expected error for missing catalog index") + } +}