diff --git a/.config-schema.json b/.config-schema.json index 1df5cf9661..124ed25a61 100644 --- a/.config-schema.json +++ b/.config-schema.json @@ -30,6 +30,12 @@ "default": 4294967295, "x-env-variable": "OPENFGA_MAX_CONCURRENT_READS_FOR_LIST_OBJECTS" }, + "maxConcurrentReadsForListUsers": { + "description": "The maximum allowed number of concurrent reads in a single ListUsers query (default is MaxUint32).", + "type": "integer", + "default": 4294967295, + "x-env-variable": "OPENFGA_MAX_CONCURRENT_READS_FOR_LIST_USERS" + }, "changelogHorizonOffset": { "description": "The offset (in minutes) from the current time. Changes that occur after this offset will not be included in the response of ReadChanges.", "type": "integer", @@ -62,6 +68,20 @@ "default": 1000, "x-env-variable": "OPENFGA_LIST_OBJECTS_MAX_RESULTS" }, + "listUsersDeadline": { + "description": "The timeout deadline for serving ListUsers requests. If 0s, there is no deadline", + "type": "string", + "format": "duration", + "default": "3s", + "x-env-variable": "OPENFGA_LIST_USERS_DEADLINE" + }, + "listUsersMaxResults": { + "description": "The maximum results to return in ListUsers API response. If 0, all results can be returned", + "type": "integer", + "minimum": 0, + "default": 1000, + "x-env-variable": "OPENFGA_LIST_USERS_MAX_RESULTS" + }, "requestDurationDatastoreQueryCountBuckets": { "description": "Datastore query count buckets used to label the histogram metric for measuring request duration.", "type": "array", diff --git a/cmd/run/flags.go b/cmd/run/flags.go index 9e9840c0dc..e190efcbcb 100644 --- a/cmd/run/flags.go +++ b/cmd/run/flags.go @@ -156,6 +156,9 @@ func bindRunFlagsFunc(flags *pflag.FlagSet) func(*cobra.Command, []string) { util.MustBindPFlag("maxConcurrentReadsForListObjects", flags.Lookup("max-concurrent-reads-for-list-objects")) util.MustBindEnv("maxConcurrentReadsForListObjects", "OPENFGA_MAX_CONCURRENT_READS_FOR_LIST_OBJECTS", "OPENFGA_MAXCONCURRENTREADSFORLISTOBJECTS") + util.MustBindPFlag("maxConcurrentReadsForListUsers", flags.Lookup("max-concurrent-reads-for-list-users")) + util.MustBindEnv("maxConcurrentReadsForListUsers", "OPENFGA_MAX_CONCURRENT_READS_FOR_LIST_USERS", "OPENFGA_MAXCONCURRENTREADSFORLISTUSERS") + util.MustBindPFlag("maxConcurrentReadsForCheck", flags.Lookup("max-concurrent-reads-for-check")) util.MustBindEnv("maxConcurrentReadsForCheck", "OPENFGA_MAX_CONCURRENT_READS_FOR_CHECK", "OPENFGA_MAXCONCURRENTREADSFORCHECK") @@ -174,6 +177,12 @@ func bindRunFlagsFunc(flags *pflag.FlagSet) func(*cobra.Command, []string) { util.MustBindPFlag("listObjectsMaxResults", flags.Lookup("listObjects-max-results")) util.MustBindEnv("listObjectsMaxResults", "OPENFGA_LIST_OBJECTS_MAX_RESULTS", "OPENFGA_LISTOBJECTSMAXRESULTS") + util.MustBindPFlag("listUsersDeadline", flags.Lookup("listUsers-deadline")) + util.MustBindEnv("listUsersDeadline", "OPENFGA_LIST_USERS_DEADLINE", "OPENFGA_LISTUSERSDEADLINE") + + util.MustBindPFlag("listUsersMaxResults", flags.Lookup("listUsers-max-results")) + util.MustBindEnv("listUsersMaxResults", "OPENFGA_LIST_USERS_MAX_RESULTS", "OPENFGA_LISTUSERSMAXRESULTS") + util.MustBindPFlag("checkQueryCache.enabled", flags.Lookup("check-query-cache-enabled")) util.MustBindEnv("checkQueryCache.enabled", "OPENFGA_CHECK_QUERY_CACHE_ENABLED") diff --git a/cmd/run/run.go b/cmd/run/run.go index 5abf02f03e..a7affdeebe 100644 --- a/cmd/run/run.go +++ b/cmd/run/run.go @@ -181,6 +181,8 @@ func NewRunCommand() *cobra.Command { flags.Int("max-authorization-model-size-in-bytes", defaultConfig.MaxAuthorizationModelSizeInBytes, "the maximum size in bytes allowed for persisting an Authorization Model.") + flags.Uint32("max-concurrent-reads-for-list-users", defaultConfig.MaxConcurrentReadsForListUsers, "the maximum allowed number of concurrent datastore reads in a single ListUsers query. A high number will consume more connections from the datastore pool and will attempt to prioritize performance for the request at the expense of other queries performance.") + flags.Uint32("max-concurrent-reads-for-list-objects", defaultConfig.MaxConcurrentReadsForListObjects, "the maximum allowed number of concurrent datastore reads in a single ListObjects or StreamedListObjects query. A high number will consume more connections from the datastore pool and will attempt to prioritize performance for the request at the expense of other queries performance.") flags.Uint32("max-concurrent-reads-for-check", defaultConfig.MaxConcurrentReadsForCheck, "the maximum allowed number of concurrent datastore reads in a single Check query. A high number will consume more connections from the datastore pool and will attempt to prioritize performance for the request at the expense of other queries performance.") @@ -195,6 +197,10 @@ func NewRunCommand() *cobra.Command { flags.Uint32("listObjects-max-results", defaultConfig.ListObjectsMaxResults, "the maximum results to return in non-streaming ListObjects API responses. If 0, all results can be returned") + flags.Duration("listUsers-deadline", defaultConfig.ListUsersDeadline, "the timeout deadline for serving ListUsers requests. If 0, there is no deadline") + + flags.Uint32("listUsers-max-results", defaultConfig.ListUsersMaxResults, "the maximum results to return in ListUsers API responses. If 0, all results can be returned") + flags.Bool("check-query-cache-enabled", defaultConfig.CheckQueryCache.Enabled, "when executing Check and ListObjects requests, enables caching. This will turn Check and ListObjects responses into eventually consistent responses") flags.Uint32("check-query-cache-limit", defaultConfig.CheckQueryCache.Limit, "if caching of Check and ListObjects calls is enabled, this is the size limit of the cache") @@ -492,8 +498,11 @@ func (s *ServerContext) Run(ctx context.Context, config *serverconfig.Config) er server.WithChangelogHorizonOffset(config.ChangelogHorizonOffset), server.WithListObjectsDeadline(config.ListObjectsDeadline), server.WithListObjectsMaxResults(config.ListObjectsMaxResults), + server.WithListUsersDeadline(config.ListUsersDeadline), + server.WithListUsersMaxResults(config.ListUsersMaxResults), server.WithMaxConcurrentReadsForListObjects(config.MaxConcurrentReadsForListObjects), server.WithMaxConcurrentReadsForCheck(config.MaxConcurrentReadsForCheck), + server.WithMaxConcurrentReadsForListUsers(config.MaxConcurrentReadsForListUsers), server.WithCheckQueryCacheEnabled(config.CheckQueryCache.Enabled), server.WithCheckQueryCacheLimit(config.CheckQueryCache.Limit), server.WithCheckQueryCacheTTL(config.CheckQueryCache.TTL), diff --git a/cmd/run/run_test.go b/cmd/run/run_test.go index b7445460b4..352d1d44da 100644 --- a/cmd/run/run_test.go +++ b/cmd/run/run_test.go @@ -1028,6 +1028,10 @@ func TestDefaultConfig(t *testing.T) { require.True(t, val.Exists()) require.EqualValues(t, val.Int(), cfg.MaxConcurrentReadsForCheck) + val = res.Get("properties.maxConcurrentReadsForListUsers.default") + require.True(t, val.Exists()) + require.EqualValues(t, val.Int(), cfg.MaxConcurrentReadsForListUsers) + val = res.Get("properties.changelogHorizonOffset.default") require.True(t, val.Exists()) require.EqualValues(t, val.Int(), cfg.ChangelogHorizonOffset) @@ -1056,6 +1060,14 @@ func TestDefaultConfig(t *testing.T) { require.True(t, val.Exists()) require.EqualValues(t, val.Int(), cfg.ListObjectsMaxResults) + val = res.Get("properties.listUsersDeadline.default") + require.True(t, val.Exists()) + require.Equal(t, val.String(), cfg.ListUsersDeadline.String()) + + val = res.Get("properties.listUsersMaxResults.default") + require.True(t, val.Exists()) + require.EqualValues(t, val.Int(), cfg.ListUsersMaxResults) + val = res.Get("properties.experimentals.default") require.True(t, val.Exists()) require.Equal(t, len(val.Array()), len(cfg.Experimentals)) diff --git a/internal/server/config/config.go b/internal/server/config/config.go index 15f4599ad9..8063d56368 100644 --- a/internal/server/config/config.go +++ b/internal/server/config/config.go @@ -22,6 +22,9 @@ const ( DefaultListObjectsMaxResults = 1000 DefaultMaxConcurrentReadsForCheck = math.MaxUint32 DefaultMaxConcurrentReadsForListObjects = math.MaxUint32 + DefaultListUsersDeadline = 3 * time.Second + DefaultListUsersMaxResults = 1000 + DefaultMaxConcurrentReadsForListUsers = math.MaxUint32 DefaultWriteContextByteLimit = 32 * 1_024 // 32KB DefaultCheckQueryCacheLimit = 10000 @@ -199,6 +202,16 @@ type Config struct { // This is to protect the server from misuse of the ListObjects endpoints. ListObjectsMaxResults uint32 + // ListUsersDeadline defines the maximum amount of time to accumulate ListUsers results + // before the server will respond. This is to protect the server from misuse of the + // ListUsers endpoints. It cannot be larger than HTTPConfig.UpstreamTimeout. + ListUsersDeadline time.Duration + + // ListUsersMaxResults defines the maximum number of results to accumulate + // before the non-streaming ListUsers API will respond to the client. + // This is to protect the server from misuse of the ListUsers endpoints. + ListUsersMaxResults uint32 + // MaxTuplesPerWrite defines the maximum number of tuples per Write endpoint. MaxTuplesPerWrite int @@ -218,6 +231,10 @@ type Config struct { // Check queries MaxConcurrentReadsForCheck uint32 + // MaxConcurrentReadsForListUsers defines the maximum number of concurrent database reads + // allowed in ListUsers queries + MaxConcurrentReadsForListUsers uint32 + // ChangelogHorizonOffset is an offset in minutes from the current time. Changes that occur // after this offset will not be included in the response of ReadChanges. ChangelogHorizonOffset int @@ -257,6 +274,17 @@ func (cfg *Config) Verify() error { cfg.ListObjectsDeadline, ) } + if cfg.ListUsersDeadline > cfg.HTTP.UpstreamTimeout { + return fmt.Errorf( + "config 'http.upstreamTimeout' (%s) cannot be lower than 'listUsersDeadline' config (%s)", + cfg.HTTP.UpstreamTimeout, + cfg.ListUsersDeadline, + ) + } + + if cfg.MaxConcurrentReadsForListUsers == 0 { + return fmt.Errorf("config 'maxConcurrentReadsForListUsers' cannot be 0") + } if cfg.Log.Format != "text" && cfg.Log.Format != "json" { return fmt.Errorf("config 'log.format' must be one of ['text', 'json']") @@ -344,12 +372,15 @@ func DefaultConfig() *Config { MaxAuthorizationModelSizeInBytes: DefaultMaxAuthorizationModelSizeInBytes, MaxConcurrentReadsForCheck: DefaultMaxConcurrentReadsForCheck, MaxConcurrentReadsForListObjects: DefaultMaxConcurrentReadsForListObjects, + MaxConcurrentReadsForListUsers: DefaultMaxConcurrentReadsForListUsers, ChangelogHorizonOffset: DefaultChangelogHorizonOffset, ResolveNodeLimit: DefaultResolveNodeLimit, ResolveNodeBreadthLimit: DefaultResolveNodeBreadthLimit, Experimentals: []string{}, ListObjectsDeadline: DefaultListObjectsDeadline, ListObjectsMaxResults: DefaultListObjectsMaxResults, + ListUsersMaxResults: DefaultListUsersMaxResults, + ListUsersDeadline: DefaultListUsersDeadline, RequestDurationDatastoreQueryCountBuckets: []string{"50", "200"}, RequestDurationDispatchCountBuckets: []string{"50", "200"}, Datastore: DatastoreConfig{ diff --git a/internal/server/config/config_test.go b/internal/server/config/config_test.go index d3989b53b0..c62a9cf166 100644 --- a/internal/server/config/config_test.go +++ b/internal/server/config/config_test.go @@ -16,6 +16,23 @@ func TestVerifyConfig(t *testing.T) { err := cfg.Verify() require.EqualError(t, err, "config 'http.upstreamTimeout' (2s) cannot be lower than 'listObjectsDeadline' config (5m0s)") }) + t.Run("UpstreamTimeout_cannot_be_less_than_ListUsersDeadline", func(t *testing.T) { + cfg := DefaultConfig() + cfg.ListObjectsDeadline = 2 * time.Second + cfg.ListUsersDeadline = 5 * time.Minute + cfg.HTTP.UpstreamTimeout = 2 * time.Second + + err := cfg.Verify() + require.EqualError(t, err, "config 'http.upstreamTimeout' (2s) cannot be lower than 'listUsersDeadline' config (5m0s)") + }) + + t.Run("maxConcurrentReadsForListUsers_not_zero", func(t *testing.T) { + cfg := DefaultConfig() + cfg.MaxConcurrentReadsForListUsers = 0 + + err := cfg.Verify() + require.EqualError(t, err, "config 'maxConcurrentReadsForListUsers' cannot be 0") + }) t.Run("failing_to_set_http_cert_path_will_not_allow_server_to_start", func(t *testing.T) { cfg := DefaultConfig() diff --git a/pkg/server/commands/listusers/list_users_rpc.go b/pkg/server/commands/listusers/list_users_rpc.go index c8f8d7f25a..e0bb6fe360 100644 --- a/pkg/server/commands/listusers/list_users_rpc.go +++ b/pkg/server/commands/listusers/list_users_rpc.go @@ -6,12 +6,15 @@ import ( "fmt" "sync" "sync/atomic" + "time" openfgav1 "github.com/openfga/api/proto/openfga/v1" "github.com/sourcegraph/conc/pool" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + serverconfig "github.com/openfga/openfga/internal/server/config" + "github.com/openfga/openfga/pkg/telemetry" "github.com/openfga/openfga/pkg/logger" @@ -35,13 +38,51 @@ type listUsersQuery struct { typesystemResolver typesystem.TypesystemResolverFunc resolveNodeBreadthLimit uint32 resolveNodeLimit uint32 + maxResults uint32 + maxConcurrentReads uint32 + deadline time.Duration } type ListUsersQueryOption func(l *listUsersQuery) func WithListUsersQueryLogger(l logger.Logger) ListUsersQueryOption { - return func(rq *listUsersQuery) { - rq.logger = l + return func(d *listUsersQuery) { + d.logger = l + } +} + +// WithListUserMaxResults see server.WithListUsersMaxResults. +func WithListUsersMaxResults(max uint32) ListUsersQueryOption { + return func(d *listUsersQuery) { + d.maxResults = max + } +} + +// WithListUsersDeadline see server.WithListUsersDeadline. +func WithListUsersDeadline(t time.Duration) ListUsersQueryOption { + return func(d *listUsersQuery) { + d.deadline = t + } +} + +// WithResolveNodeLimit see server.WithResolveNodeLimit. +func WithResolveNodeLimit(limit uint32) ListUsersQueryOption { + return func(d *listUsersQuery) { + d.resolveNodeLimit = limit + } +} + +// WithResolveNodeBreadthLimit see server.WithResolveNodeBreadthLimit. +func WithResolveNodeBreadthLimit(limit uint32) ListUsersQueryOption { + return func(d *listUsersQuery) { + d.resolveNodeBreadthLimit = limit + } +} + +// WithListUsersMaxConcurrentReads see server.WithMaxConcurrentReadsForListUsers. +func WithListUsersMaxConcurrentReads(limit uint32) ListUsersQueryOption { + return func(d *listUsersQuery) { + d.maxConcurrentReads = limit } } @@ -58,8 +99,11 @@ func NewListUsersQuery(ds storage.RelationshipTupleReader, opts ...ListUsersQuer return typesys, nil }, - resolveNodeBreadthLimit: 20, - resolveNodeLimit: 25, + resolveNodeBreadthLimit: serverconfig.DefaultResolveNodeBreadthLimit, + resolveNodeLimit: serverconfig.DefaultResolveNodeLimit, + deadline: serverconfig.DefaultListObjectsDeadline, + maxResults: serverconfig.DefaultListObjectsMaxResults, + maxConcurrentReads: serverconfig.DefaultMaxConcurrentReadsForListObjects, } for _, opt := range opts { @@ -69,13 +113,6 @@ func NewListUsersQuery(ds storage.RelationshipTupleReader, opts ...ListUsersQuer return l } -// WithResolveNodeLimit see server.WithResolveNodeLimit. -func WithResolveNodeLimit(limit uint32) ListUsersQueryOption { - return func(d *listUsersQuery) { - d.resolveNodeLimit = limit - } -} - // ListUsers assumes that the typesystem is in the context. func (l *listUsersQuery) ListUsers( ctx context.Context, @@ -84,8 +121,18 @@ func (l *listUsersQuery) ListUsers( ctx, span := tracer.Start(ctx, "ListUsers") defer span.End() - l.ds = storagewrappers.NewCombinedTupleReader(l.ds, req.GetContextualTuples()) - typesys, ok := typesystem.TypesystemFromContext(ctx) + cancellableCtx, cancelContextIfMaxResultsMet := context.WithCancel(ctx) + defer cancelContextIfMaxResultsMet() + if l.deadline != 0 { + var cancelContextIfDeadlineHit context.CancelFunc + cancellableCtx, cancelContextIfDeadlineHit = context.WithTimeout(cancellableCtx, l.deadline) + defer cancelContextIfDeadlineHit() + } + l.ds = storagewrappers.NewCombinedTupleReader( + storagewrappers.NewBoundedConcurrencyTupleReader(l.ds, l.maxConcurrentReads), + req.GetContextualTuples(), + ) + typesys, ok := typesystem.TypesystemFromContext(cancellableCtx) if !ok { return nil, fmt.Errorf("typesystem missing in context") } @@ -114,6 +161,12 @@ func (l *listUsersQuery) ListUsers( go func() { for foundObject := range foundUsersCh { foundUsersUnique[tuple.UserProtoToString(foundObject)] = struct{}{} + if l.maxResults > 0 { + if uint32(len(foundUsersUnique)) >= l.maxResults { + span.SetAttributes(attribute.Bool("max_results_found", true)) + break + } + } } done <- struct{}{} @@ -122,7 +175,7 @@ func (l *listUsersQuery) ListUsers( go func() { defer close(foundUsersCh) internalRequest := fromListUsersRequest(req) - if err := l.expand(ctx, internalRequest, foundUsersCh); err != nil { + if err := l.expand(cancellableCtx, internalRequest, foundUsersCh); err != nil { expandErrCh <- err return } @@ -133,6 +186,7 @@ func (l *listUsersQuery) ListUsers( telemetry.TraceError(span, err) return nil, err case <-done: + cancelContextIfMaxResultsMet() break } foundUsers := make([]*openfgav1.User, 0, len(foundUsersUnique)) diff --git a/pkg/server/commands/listusers/list_users_rpc_test.go b/pkg/server/commands/listusers/list_users_rpc_test.go index 931ede2bd0..8ba762c9f0 100644 --- a/pkg/server/commands/listusers/list_users_rpc_test.go +++ b/pkg/server/commands/listusers/list_users_rpc_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/oklog/ulid/v2" openfgav1 "github.com/openfga/api/proto/openfga/v1" @@ -2567,3 +2568,349 @@ func TestListUsersReadFails_NoLeaks_TTU(t *testing.T) { require.ErrorContains(t, err, "simulated errors") require.Nil(t, resp) } + +func TestListUsersConfig_MaxResults(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + + ds := memory.New() + t.Cleanup(ds.Close) + + testCases := map[string]struct { + inputTuples []*openfgav1.TupleKey + inputModel string + inputRequest *openfgav1.ListUsersRequest + inputConfigMaxResults uint32 + allResults []*openfgav1.User // all the results. the server may return less + expectMinResults uint32 + }{ + `max_results_infinite`: { + inputModel: ` + model + schema 1.1 + type user + type repo + relations + define admin: [user]`, + inputTuples: []*openfgav1.TupleKey{ + tuple.NewTupleKey("repo:target", "admin", "user:1"), + tuple.NewTupleKey("repo:target", "admin", "user:2"), + }, + inputRequest: &openfgav1.ListUsersRequest{ + ContextualTuples: []*openfgav1.TupleKey{ + tuple.NewTupleKey("repo:target", "admin", "user:3"), + }, + Object: &openfgav1.Object{Type: "repo", Id: "target"}, + Relation: "admin", + UserFilters: []*openfgav1.UserTypeFilter{{Type: "user"}}, + }, + inputConfigMaxResults: 0, + allResults: []*openfgav1.User{ + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "1"}}}, + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "2"}}}, + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "3"}}}, + }, + expectMinResults: 3, + }, + `max_results_less_than_actual_results`: { + inputModel: ` + model + schema 1.1 + type user + type repo + relations + define admin: [user]`, + inputTuples: []*openfgav1.TupleKey{ + tuple.NewTupleKey("repo:target", "admin", "user:1"), + tuple.NewTupleKey("repo:target", "admin", "user:2"), + }, + inputRequest: &openfgav1.ListUsersRequest{ + ContextualTuples: []*openfgav1.TupleKey{ + tuple.NewTupleKey("repo:target", "admin", "user:3"), + }, + Object: &openfgav1.Object{Type: "repo", Id: "target"}, + Relation: "admin", + UserFilters: []*openfgav1.UserTypeFilter{{Type: "user"}}, + }, + inputConfigMaxResults: 2, + allResults: []*openfgav1.User{ + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "1"}}}, + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "2"}}}, + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "3"}}}, + }, + expectMinResults: 2, + }, + `max_results_more_than_actual_results`: { + inputModel: ` + model + schema 1.1 + type user + type repo + relations + define admin: [user]`, + inputTuples: []*openfgav1.TupleKey{ + tuple.NewTupleKey("repo:target", "admin", "user:1"), + }, + inputRequest: &openfgav1.ListUsersRequest{ + Object: &openfgav1.Object{Type: "repo", Id: "target"}, + Relation: "admin", + UserFilters: []*openfgav1.UserTypeFilter{{Type: "user"}}, + }, + inputConfigMaxResults: 2, + allResults: []*openfgav1.User{ + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "1"}}}, + }, + expectMinResults: 1, + }, + } + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + + // arrange: write model + model := testutils.MustTransformDSLToProtoWithID(test.inputModel) + + storeID := ulid.Make().String() + + err := ds.WriteAuthorizationModel(ctx, storeID, model) + require.NoError(t, err) + + // arrange: write tuples + err = ds.Write(context.Background(), storeID, nil, test.inputTuples) + require.NoError(t, err) + + typesys, err := typesystem.NewAndValidate(context.Background(), model) + require.NoError(t, err) + ctx = typesystem.ContextWithTypesystem(context.Background(), typesys) + + // assertions + test.inputRequest.StoreId = storeID + res, err := NewListUsersQuery(ds, + WithListUsersMaxResults(test.inputConfigMaxResults), + WithListUsersDeadline(10*time.Second), + ).ListUsers(ctx, test.inputRequest) + + require.NotNil(t, res) + require.NoError(t, err) + if test.inputConfigMaxResults != 0 { // don't get all results + require.LessOrEqual(t, len(res.GetUsers()), int(test.inputConfigMaxResults)) + } + require.GreaterOrEqual(t, len(res.GetUsers()), int(test.expectMinResults)) + require.Subset(t, test.allResults, res.GetUsers()) + }) + } +} + +func TestListUsersConfig_Deadline(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + + ds := memory.New() + t.Cleanup(ds.Close) + + testCases := map[string]struct { + inputTuples []*openfgav1.TupleKey + inputModel string + inputRequest *openfgav1.ListUsersRequest + inputConfigDeadline time.Duration // request can only take this time + inputReadDelay time.Duration // to be able to hit the deadline at a predictable time + allResults []*openfgav1.User // all the results. the server may return less + expectMinResults uint32 + expectError string + }{ + `deadline_very_small_returns_nothing`: { + inputModel: ` + model + schema 1.1 + type user + type repo + relations + define admin: [user]`, + inputTuples: []*openfgav1.TupleKey{ + tuple.NewTupleKey("repo:target", "admin", "user:1"), + }, + inputRequest: &openfgav1.ListUsersRequest{ + Object: &openfgav1.Object{Type: "repo", Id: "target"}, + Relation: "admin", + UserFilters: []*openfgav1.UserTypeFilter{{Type: "user"}}, + }, + inputConfigDeadline: 1 * time.Millisecond, + inputReadDelay: 50 * time.Millisecond, + allResults: []*openfgav1.User{ + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "1"}}}, + }, + expectError: "context deadline exceeded", + }, + `deadline_very_high_returns_everything`: { + inputModel: ` + model + schema 1.1 + type user + type repo + relations + define admin: [user]`, + inputTuples: []*openfgav1.TupleKey{ + tuple.NewTupleKey("repo:target", "admin", "user:1"), + }, + inputRequest: &openfgav1.ListUsersRequest{ + Object: &openfgav1.Object{Type: "repo", Id: "target"}, + Relation: "admin", + UserFilters: []*openfgav1.UserTypeFilter{{Type: "user"}}, + }, + inputConfigDeadline: 1 * time.Second, + inputReadDelay: 0 * time.Second, + allResults: []*openfgav1.User{ + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "1"}}}, + }, + expectMinResults: 1, + }, + } + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + + // arrange: write model + model := testutils.MustTransformDSLToProtoWithID(test.inputModel) + + storeID := ulid.Make().String() + + err := ds.WriteAuthorizationModel(ctx, storeID, model) + require.NoError(t, err) + + // arrange: write tuples + err = ds.Write(context.Background(), storeID, nil, test.inputTuples) + require.NoError(t, err) + + typesys, err := typesystem.NewAndValidate(context.Background(), model) + require.NoError(t, err) + ctx = typesystem.ContextWithTypesystem(context.Background(), typesys) + + // assertions + t.Run("regular_endpoint", func(t *testing.T) { + test.inputRequest.StoreId = storeID + res, err := NewListUsersQuery( + mocks.NewMockSlowDataStorage(ds, test.inputReadDelay), + WithListUsersDeadline(test.inputConfigDeadline), + ).ListUsers(ctx, test.inputRequest) + + if test.expectError != "" { + require.ErrorContains(t, err, test.expectError) + } else { + require.NoError(t, err) + require.GreaterOrEqual(t, len(res.GetUsers()), int(test.expectMinResults)) + require.Subset(t, test.allResults, res.GetUsers()) + } + }) + }) + } +} + +func TestListUsersConfig_MaxConcurrency(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) + + ds := memory.New() + t.Cleanup(ds.Close) + + testCases := map[string]struct { + inputTuples []*openfgav1.TupleKey + inputModel string + inputRequest *openfgav1.ListUsersRequest + inputConfigMaxConcurrentReads uint32 + inputReadDelay time.Duration // to be able to hit the deadline at a predictable time + allResults []*openfgav1.User // all the results. the server may return less + expectMinResults uint32 + expectMinExecutionTime time.Duration + }{ + `max_concurrent_reads_does_not_delay_response_if_only_contextual_tuples_are_in_place`: { + inputModel: ` + model + schema 1.1 + type user + type repo + relations + define admin: [user]`, + inputTuples: []*openfgav1.TupleKey{}, + inputRequest: &openfgav1.ListUsersRequest{ + Object: &openfgav1.Object{Type: "repo", Id: "target"}, + Relation: "admin", + UserFilters: []*openfgav1.UserTypeFilter{{Type: "user"}}, + ContextualTuples: []*openfgav1.TupleKey{ + tuple.NewTupleKey("repo:target", "admin", "user:1"), + }, + }, + inputConfigMaxConcurrentReads: 1, + allResults: []*openfgav1.User{ + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "1"}}}, + }, + expectMinResults: 1, + expectMinExecutionTime: 0 * time.Millisecond, + }, + `max_concurrent_reads_delays_response`: { + inputModel: ` + model + schema 1.1 + type user + type folder + relations + define admin: [user] + type repo + relations + define parent: [folder] + define admin: [user] or admin from parent`, // two parallel reads will have to be made + inputTuples: []*openfgav1.TupleKey{ + tuple.NewTupleKey("repo:target", "admin", "user:1"), + }, + inputRequest: &openfgav1.ListUsersRequest{ + Object: &openfgav1.Object{Type: "repo", Id: "target"}, + Relation: "admin", + UserFilters: []*openfgav1.UserTypeFilter{{Type: "user"}}, + }, + inputReadDelay: 1 * time.Second, + inputConfigMaxConcurrentReads: 1, + allResults: []*openfgav1.User{ + {User: &openfgav1.User_Object{Object: &openfgav1.Object{Type: "user", Id: "1"}}}, + }, + expectMinExecutionTime: 2 * time.Second, + }, + } + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + + // arrange: write model + model := testutils.MustTransformDSLToProtoWithID(test.inputModel) + + storeID := ulid.Make().String() + + err := ds.WriteAuthorizationModel(ctx, storeID, model) + require.NoError(t, err) + + // arrange: write tuples + err = ds.Write(context.Background(), storeID, nil, test.inputTuples) + require.NoError(t, err) + + typesys, err := typesystem.NewAndValidate(context.Background(), model) + require.NoError(t, err) + ctx = typesystem.ContextWithTypesystem(context.Background(), typesys) + + // assertions + t.Run("regular_endpoint", func(t *testing.T) { + test.inputRequest.StoreId = storeID + start := time.Now() + res, err := NewListUsersQuery( + mocks.NewMockSlowDataStorage(ds, test.inputReadDelay), + WithListUsersMaxConcurrentReads(test.inputConfigMaxConcurrentReads), + ).ListUsers(ctx, test.inputRequest) + + require.NoError(t, err) + require.GreaterOrEqual(t, len(res.GetUsers()), int(test.expectMinResults)) + require.Subset(t, test.allResults, res.GetUsers()) + require.GreaterOrEqual(t, time.Since(start), test.expectMinExecutionTime) + }) + }) + } +} diff --git a/pkg/server/list_users.go b/pkg/server/list_users.go index 622f46e8a7..0bd54aa610 100644 --- a/pkg/server/list_users.go +++ b/pkg/server/list_users.go @@ -51,7 +51,14 @@ func (s *Server) ListUsers( ctx = typesystem.ContextWithTypesystem(ctx, typesys) - listUsersQuery := listusers.NewListUsersQuery(s.datastore, listusers.WithResolveNodeLimit(s.resolveNodeLimit), listusers.WithListUsersQueryLogger(s.logger)) + listUsersQuery := listusers.NewListUsersQuery(s.datastore, + listusers.WithResolveNodeLimit(s.resolveNodeLimit), + listusers.WithResolveNodeBreadthLimit(s.resolveNodeBreadthLimit), + listusers.WithListUsersQueryLogger(s.logger), + listusers.WithListUsersMaxResults(s.listUsersMaxResults), + listusers.WithListUsersDeadline(s.listUsersDeadline), + listusers.WithListUsersMaxConcurrentReads(s.maxConcurrentReadsForListUsers), + ) resp, err := listUsersQuery.ListUsers(ctx, req) if err != nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index 23bf489a6b..deeebec817 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -116,8 +116,11 @@ type Server struct { changelogHorizonOffset int listObjectsDeadline time.Duration listObjectsMaxResults uint32 + listUsersDeadline time.Duration + listUsersMaxResults uint32 maxConcurrentReadsForListObjects uint32 maxConcurrentReadsForCheck uint32 + maxConcurrentReadsForListUsers uint32 maxAuthorizationModelSizeInBytes int experimentals []ExperimentalFeatureFlag serviceName string @@ -203,7 +206,7 @@ func WithChangelogHorizonOffset(offset int) OpenFGAServiceV1Option { } } -// WithListObjectsDeadline affect the ListObjects API and Streamed ListObjects API only. +// WithListObjectsDeadline affect the ListObjects API only. // It sets the maximum amount of time that the server will spend gathering results. func WithListObjectsDeadline(deadline time.Duration) OpenFGAServiceV1Option { return func(s *Server) { @@ -219,6 +222,23 @@ func WithListObjectsMaxResults(limit uint32) OpenFGAServiceV1Option { } } +// WithListUsersDeadline affect the ListUsers API only. +// It sets the maximum amount of time that the server will spend gathering results. +func WithListUsersDeadline(deadline time.Duration) OpenFGAServiceV1Option { + return func(s *Server) { + s.listUsersDeadline = deadline + } +} + +// WithListUsersMaxResults affects the ListUsers API only. +// It sets the maximum number of results that this API will return. +// If it's zero, all results will be attempted to be returned. +func WithListUsersMaxResults(limit uint32) OpenFGAServiceV1Option { + return func(s *Server) { + s.listUsersMaxResults = limit + } +} + // WithMaxConcurrentReadsForListObjects sets a limit on the number of datastore reads that can be in flight for a given ListObjects call. // This number should be set depending on the RPS expected for Check and ListObjects APIs, the number of OpenFGA replicas running, // and the number of connections the datastore allows. @@ -245,6 +265,19 @@ func WithMaxConcurrentReadsForCheck(max uint32) OpenFGAServiceV1Option { } } +// WithMaxConcurrentReadsForListUsers sets a limit on the number of datastore reads that can be in flight for a given ListUsers call. +// This number should be set depending on the RPS expected for all query APIs, the number of OpenFGA replicas running, +// and the number of connections the datastore allows. +// E.g. If Datastore.MaxOpenConns = 100 and assuming that each ListUsers call takes 1 second and no traffic to other query APIs: +// - One OpenFGA replica and expected traffic of 100 RPS => set it to 1. +// - One OpenFGA replica and expected traffic of 1 RPS => set it to 100. +// - Two OpenFGA replicas and expected traffic of 1 RPS => set it to 50. +func WithMaxConcurrentReadsForListUsers(max uint32) OpenFGAServiceV1Option { + return func(s *Server) { + s.maxConcurrentReadsForListUsers = max + } +} + func WithExperimentals(experimentals ...ExperimentalFeatureFlag) OpenFGAServiceV1Option { return func(s *Server) { s.experimentals = experimentals @@ -357,8 +390,11 @@ func NewServerWithOpts(opts ...OpenFGAServiceV1Option) (*Server, error) { resolveNodeBreadthLimit: serverconfig.DefaultResolveNodeBreadthLimit, listObjectsDeadline: serverconfig.DefaultListObjectsDeadline, listObjectsMaxResults: serverconfig.DefaultListObjectsMaxResults, + listUsersDeadline: serverconfig.DefaultListUsersDeadline, + listUsersMaxResults: serverconfig.DefaultListUsersMaxResults, maxConcurrentReadsForCheck: serverconfig.DefaultMaxConcurrentReadsForCheck, maxConcurrentReadsForListObjects: serverconfig.DefaultMaxConcurrentReadsForListObjects, + maxConcurrentReadsForListUsers: serverconfig.DefaultMaxConcurrentReadsForListUsers, maxAuthorizationModelSizeInBytes: serverconfig.DefaultMaxAuthorizationModelSizeInBytes, experimentals: make([]ExperimentalFeatureFlag, 0, 10), diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 7e451c7f1c..f8b17d66da 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -1507,6 +1507,7 @@ func TestDefaultMaxConcurrentReadSettings(t *testing.T) { cfg := serverconfig.DefaultConfig() require.EqualValues(t, math.MaxUint32, cfg.MaxConcurrentReadsForCheck) require.EqualValues(t, math.MaxUint32, cfg.MaxConcurrentReadsForListObjects) + require.EqualValues(t, math.MaxUint32, cfg.MaxConcurrentReadsForListUsers) s := MustNewServerWithOpts( WithDatastore(memory.New()), @@ -1514,6 +1515,7 @@ func TestDefaultMaxConcurrentReadSettings(t *testing.T) { t.Cleanup(s.Close) require.EqualValues(t, math.MaxUint32, s.maxConcurrentReadsForCheck) require.EqualValues(t, math.MaxUint32, s.maxConcurrentReadsForListObjects) + require.EqualValues(t, math.MaxUint32, s.maxConcurrentReadsForListUsers) } func TestDelegateCheckResolver(t *testing.T) {