Skip to content

Commit

Permalink
feat: add ListUsers configuration options (#1572)
Browse files Browse the repository at this point in the history
  • Loading branch information
miparnisari committed Apr 26, 2024
1 parent 568e89c commit 4954cd6
Show file tree
Hide file tree
Showing 11 changed files with 560 additions and 16 deletions.
20 changes: 20 additions & 0 deletions .config-schema.json
Expand Up @@ -30,6 +30,12 @@
"default": 4294967295,
"x-env-variable": "OPENFGA_MAX_CONCURRENT_READS_FOR_LIST_OBJECTS"
},
"maxConcurrentReadsForListUsers": {
"description": "The maximum allowed number of concurrent reads in a single ListUsers query (default is MaxUint32).",
"type": "integer",
"default": 4294967295,
"x-env-variable": "OPENFGA_MAX_CONCURRENT_READS_FOR_LIST_USERS"
},
"changelogHorizonOffset": {
"description": "The offset (in minutes) from the current time. Changes that occur after this offset will not be included in the response of ReadChanges.",
"type": "integer",
Expand Down Expand Up @@ -62,6 +68,20 @@
"default": 1000,
"x-env-variable": "OPENFGA_LIST_OBJECTS_MAX_RESULTS"
},
"listUsersDeadline": {
"description": "The timeout deadline for serving ListUsers requests. If 0s, there is no deadline",
"type": "string",
"format": "duration",
"default": "3s",
"x-env-variable": "OPENFGA_LIST_USERS_DEADLINE"
},
"listUsersMaxResults": {
"description": "The maximum results to return in ListUsers API response. If 0, all results can be returned",
"type": "integer",
"minimum": 0,
"default": 1000,
"x-env-variable": "OPENFGA_LIST_USERS_MAX_RESULTS"
},
"requestDurationDatastoreQueryCountBuckets": {
"description": "Datastore query count buckets used to label the histogram metric for measuring request duration.",
"type": "array",
Expand Down
9 changes: 9 additions & 0 deletions cmd/run/flags.go
Expand Up @@ -156,6 +156,9 @@ func bindRunFlagsFunc(flags *pflag.FlagSet) func(*cobra.Command, []string) {
util.MustBindPFlag("maxConcurrentReadsForListObjects", flags.Lookup("max-concurrent-reads-for-list-objects"))
util.MustBindEnv("maxConcurrentReadsForListObjects", "OPENFGA_MAX_CONCURRENT_READS_FOR_LIST_OBJECTS", "OPENFGA_MAXCONCURRENTREADSFORLISTOBJECTS")

util.MustBindPFlag("maxConcurrentReadsForListUsers", flags.Lookup("max-concurrent-reads-for-list-users"))
util.MustBindEnv("maxConcurrentReadsForListUsers", "OPENFGA_MAX_CONCURRENT_READS_FOR_LIST_USERS", "OPENFGA_MAXCONCURRENTREADSFORLISTUSERS")

util.MustBindPFlag("maxConcurrentReadsForCheck", flags.Lookup("max-concurrent-reads-for-check"))
util.MustBindEnv("maxConcurrentReadsForCheck", "OPENFGA_MAX_CONCURRENT_READS_FOR_CHECK", "OPENFGA_MAXCONCURRENTREADSFORCHECK")

Expand All @@ -174,6 +177,12 @@ func bindRunFlagsFunc(flags *pflag.FlagSet) func(*cobra.Command, []string) {
util.MustBindPFlag("listObjectsMaxResults", flags.Lookup("listObjects-max-results"))
util.MustBindEnv("listObjectsMaxResults", "OPENFGA_LIST_OBJECTS_MAX_RESULTS", "OPENFGA_LISTOBJECTSMAXRESULTS")

util.MustBindPFlag("listUsersDeadline", flags.Lookup("listUsers-deadline"))
util.MustBindEnv("listUsersDeadline", "OPENFGA_LIST_USERS_DEADLINE", "OPENFGA_LISTUSERSDEADLINE")

util.MustBindPFlag("listUsersMaxResults", flags.Lookup("listUsers-max-results"))
util.MustBindEnv("listUsersMaxResults", "OPENFGA_LIST_USERS_MAX_RESULTS", "OPENFGA_LISTUSERSMAXRESULTS")

util.MustBindPFlag("checkQueryCache.enabled", flags.Lookup("check-query-cache-enabled"))
util.MustBindEnv("checkQueryCache.enabled", "OPENFGA_CHECK_QUERY_CACHE_ENABLED")

Expand Down
9 changes: 9 additions & 0 deletions cmd/run/run.go
Expand Up @@ -181,6 +181,8 @@ func NewRunCommand() *cobra.Command {

flags.Int("max-authorization-model-size-in-bytes", defaultConfig.MaxAuthorizationModelSizeInBytes, "the maximum size in bytes allowed for persisting an Authorization Model.")

flags.Uint32("max-concurrent-reads-for-list-users", defaultConfig.MaxConcurrentReadsForListUsers, "the maximum allowed number of concurrent datastore reads in a single ListUsers query. A high number will consume more connections from the datastore pool and will attempt to prioritize performance for the request at the expense of other queries performance.")

flags.Uint32("max-concurrent-reads-for-list-objects", defaultConfig.MaxConcurrentReadsForListObjects, "the maximum allowed number of concurrent datastore reads in a single ListObjects or StreamedListObjects query. A high number will consume more connections from the datastore pool and will attempt to prioritize performance for the request at the expense of other queries performance.")

flags.Uint32("max-concurrent-reads-for-check", defaultConfig.MaxConcurrentReadsForCheck, "the maximum allowed number of concurrent datastore reads in a single Check query. A high number will consume more connections from the datastore pool and will attempt to prioritize performance for the request at the expense of other queries performance.")
Expand All @@ -195,6 +197,10 @@ func NewRunCommand() *cobra.Command {

flags.Uint32("listObjects-max-results", defaultConfig.ListObjectsMaxResults, "the maximum results to return in non-streaming ListObjects API responses. If 0, all results can be returned")

flags.Duration("listUsers-deadline", defaultConfig.ListUsersDeadline, "the timeout deadline for serving ListUsers requests. If 0, there is no deadline")

flags.Uint32("listUsers-max-results", defaultConfig.ListUsersMaxResults, "the maximum results to return in ListUsers API responses. If 0, all results can be returned")

flags.Bool("check-query-cache-enabled", defaultConfig.CheckQueryCache.Enabled, "when executing Check and ListObjects requests, enables caching. This will turn Check and ListObjects responses into eventually consistent responses")

flags.Uint32("check-query-cache-limit", defaultConfig.CheckQueryCache.Limit, "if caching of Check and ListObjects calls is enabled, this is the size limit of the cache")
Expand Down Expand Up @@ -492,8 +498,11 @@ func (s *ServerContext) Run(ctx context.Context, config *serverconfig.Config) er
server.WithChangelogHorizonOffset(config.ChangelogHorizonOffset),
server.WithListObjectsDeadline(config.ListObjectsDeadline),
server.WithListObjectsMaxResults(config.ListObjectsMaxResults),
server.WithListUsersDeadline(config.ListUsersDeadline),
server.WithListUsersMaxResults(config.ListUsersMaxResults),
server.WithMaxConcurrentReadsForListObjects(config.MaxConcurrentReadsForListObjects),
server.WithMaxConcurrentReadsForCheck(config.MaxConcurrentReadsForCheck),
server.WithMaxConcurrentReadsForListUsers(config.MaxConcurrentReadsForListUsers),
server.WithCheckQueryCacheEnabled(config.CheckQueryCache.Enabled),
server.WithCheckQueryCacheLimit(config.CheckQueryCache.Limit),
server.WithCheckQueryCacheTTL(config.CheckQueryCache.TTL),
Expand Down
12 changes: 12 additions & 0 deletions cmd/run/run_test.go
Expand Up @@ -1028,6 +1028,10 @@ func TestDefaultConfig(t *testing.T) {
require.True(t, val.Exists())
require.EqualValues(t, val.Int(), cfg.MaxConcurrentReadsForCheck)

val = res.Get("properties.maxConcurrentReadsForListUsers.default")
require.True(t, val.Exists())
require.EqualValues(t, val.Int(), cfg.MaxConcurrentReadsForListUsers)

val = res.Get("properties.changelogHorizonOffset.default")
require.True(t, val.Exists())
require.EqualValues(t, val.Int(), cfg.ChangelogHorizonOffset)
Expand Down Expand Up @@ -1056,6 +1060,14 @@ func TestDefaultConfig(t *testing.T) {
require.True(t, val.Exists())
require.EqualValues(t, val.Int(), cfg.ListObjectsMaxResults)

val = res.Get("properties.listUsersDeadline.default")
require.True(t, val.Exists())
require.Equal(t, val.String(), cfg.ListUsersDeadline.String())

val = res.Get("properties.listUsersMaxResults.default")
require.True(t, val.Exists())
require.EqualValues(t, val.Int(), cfg.ListUsersMaxResults)

val = res.Get("properties.experimentals.default")
require.True(t, val.Exists())
require.Equal(t, len(val.Array()), len(cfg.Experimentals))
Expand Down
31 changes: 31 additions & 0 deletions internal/server/config/config.go
Expand Up @@ -22,6 +22,9 @@ const (
DefaultListObjectsMaxResults = 1000
DefaultMaxConcurrentReadsForCheck = math.MaxUint32
DefaultMaxConcurrentReadsForListObjects = math.MaxUint32
DefaultListUsersDeadline = 3 * time.Second
DefaultListUsersMaxResults = 1000
DefaultMaxConcurrentReadsForListUsers = math.MaxUint32

DefaultWriteContextByteLimit = 32 * 1_024 // 32KB
DefaultCheckQueryCacheLimit = 10000
Expand Down Expand Up @@ -199,6 +202,16 @@ type Config struct {
// This is to protect the server from misuse of the ListObjects endpoints.
ListObjectsMaxResults uint32

// ListUsersDeadline defines the maximum amount of time to accumulate ListUsers results
// before the server will respond. This is to protect the server from misuse of the
// ListUsers endpoints. It cannot be larger than HTTPConfig.UpstreamTimeout.
ListUsersDeadline time.Duration

// ListUsersMaxResults defines the maximum number of results to accumulate
// before the non-streaming ListUsers API will respond to the client.
// This is to protect the server from misuse of the ListUsers endpoints.
ListUsersMaxResults uint32

// MaxTuplesPerWrite defines the maximum number of tuples per Write endpoint.
MaxTuplesPerWrite int

Expand All @@ -218,6 +231,10 @@ type Config struct {
// Check queries
MaxConcurrentReadsForCheck uint32

// MaxConcurrentReadsForListUsers defines the maximum number of concurrent database reads
// allowed in ListUsers queries
MaxConcurrentReadsForListUsers uint32

// ChangelogHorizonOffset is an offset in minutes from the current time. Changes that occur
// after this offset will not be included in the response of ReadChanges.
ChangelogHorizonOffset int
Expand Down Expand Up @@ -257,6 +274,17 @@ func (cfg *Config) Verify() error {
cfg.ListObjectsDeadline,
)
}
if cfg.ListUsersDeadline > cfg.HTTP.UpstreamTimeout {
return fmt.Errorf(
"config 'http.upstreamTimeout' (%s) cannot be lower than 'listUsersDeadline' config (%s)",
cfg.HTTP.UpstreamTimeout,
cfg.ListUsersDeadline,
)
}

if cfg.MaxConcurrentReadsForListUsers == 0 {
return fmt.Errorf("config 'maxConcurrentReadsForListUsers' cannot be 0")
}

if cfg.Log.Format != "text" && cfg.Log.Format != "json" {
return fmt.Errorf("config 'log.format' must be one of ['text', 'json']")
Expand Down Expand Up @@ -344,12 +372,15 @@ func DefaultConfig() *Config {
MaxAuthorizationModelSizeInBytes: DefaultMaxAuthorizationModelSizeInBytes,
MaxConcurrentReadsForCheck: DefaultMaxConcurrentReadsForCheck,
MaxConcurrentReadsForListObjects: DefaultMaxConcurrentReadsForListObjects,
MaxConcurrentReadsForListUsers: DefaultMaxConcurrentReadsForListUsers,
ChangelogHorizonOffset: DefaultChangelogHorizonOffset,
ResolveNodeLimit: DefaultResolveNodeLimit,
ResolveNodeBreadthLimit: DefaultResolveNodeBreadthLimit,
Experimentals: []string{},
ListObjectsDeadline: DefaultListObjectsDeadline,
ListObjectsMaxResults: DefaultListObjectsMaxResults,
ListUsersMaxResults: DefaultListUsersMaxResults,
ListUsersDeadline: DefaultListUsersDeadline,
RequestDurationDatastoreQueryCountBuckets: []string{"50", "200"},
RequestDurationDispatchCountBuckets: []string{"50", "200"},
Datastore: DatastoreConfig{
Expand Down
17 changes: 17 additions & 0 deletions internal/server/config/config_test.go
Expand Up @@ -16,6 +16,23 @@ func TestVerifyConfig(t *testing.T) {
err := cfg.Verify()
require.EqualError(t, err, "config 'http.upstreamTimeout' (2s) cannot be lower than 'listObjectsDeadline' config (5m0s)")
})
t.Run("UpstreamTimeout_cannot_be_less_than_ListUsersDeadline", func(t *testing.T) {
cfg := DefaultConfig()
cfg.ListObjectsDeadline = 2 * time.Second
cfg.ListUsersDeadline = 5 * time.Minute
cfg.HTTP.UpstreamTimeout = 2 * time.Second

err := cfg.Verify()
require.EqualError(t, err, "config 'http.upstreamTimeout' (2s) cannot be lower than 'listUsersDeadline' config (5m0s)")
})

t.Run("maxConcurrentReadsForListUsers_not_zero", func(t *testing.T) {
cfg := DefaultConfig()
cfg.MaxConcurrentReadsForListUsers = 0

err := cfg.Verify()
require.EqualError(t, err, "config 'maxConcurrentReadsForListUsers' cannot be 0")
})

t.Run("failing_to_set_http_cert_path_will_not_allow_server_to_start", func(t *testing.T) {
cfg := DefaultConfig()
Expand Down
82 changes: 68 additions & 14 deletions pkg/server/commands/listusers/list_users_rpc.go
Expand Up @@ -6,12 +6,15 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

openfgav1 "github.com/openfga/api/proto/openfga/v1"
"github.com/sourcegraph/conc/pool"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"

serverconfig "github.com/openfga/openfga/internal/server/config"

"github.com/openfga/openfga/pkg/telemetry"

"github.com/openfga/openfga/pkg/logger"
Expand All @@ -35,13 +38,51 @@ type listUsersQuery struct {
typesystemResolver typesystem.TypesystemResolverFunc
resolveNodeBreadthLimit uint32
resolveNodeLimit uint32
maxResults uint32
maxConcurrentReads uint32
deadline time.Duration
}

type ListUsersQueryOption func(l *listUsersQuery)

func WithListUsersQueryLogger(l logger.Logger) ListUsersQueryOption {
return func(rq *listUsersQuery) {
rq.logger = l
return func(d *listUsersQuery) {
d.logger = l
}
}

// WithListUserMaxResults see server.WithListUsersMaxResults.
func WithListUsersMaxResults(max uint32) ListUsersQueryOption {
return func(d *listUsersQuery) {
d.maxResults = max
}
}

// WithListUsersDeadline see server.WithListUsersDeadline.
func WithListUsersDeadline(t time.Duration) ListUsersQueryOption {
return func(d *listUsersQuery) {
d.deadline = t
}
}

// WithResolveNodeLimit see server.WithResolveNodeLimit.
func WithResolveNodeLimit(limit uint32) ListUsersQueryOption {
return func(d *listUsersQuery) {
d.resolveNodeLimit = limit
}
}

// WithResolveNodeBreadthLimit see server.WithResolveNodeBreadthLimit.
func WithResolveNodeBreadthLimit(limit uint32) ListUsersQueryOption {
return func(d *listUsersQuery) {
d.resolveNodeBreadthLimit = limit
}
}

// WithListUsersMaxConcurrentReads see server.WithMaxConcurrentReadsForListUsers.
func WithListUsersMaxConcurrentReads(limit uint32) ListUsersQueryOption {
return func(d *listUsersQuery) {
d.maxConcurrentReads = limit
}
}

Expand All @@ -58,8 +99,11 @@ func NewListUsersQuery(ds storage.RelationshipTupleReader, opts ...ListUsersQuer

return typesys, nil
},
resolveNodeBreadthLimit: 20,
resolveNodeLimit: 25,
resolveNodeBreadthLimit: serverconfig.DefaultResolveNodeBreadthLimit,
resolveNodeLimit: serverconfig.DefaultResolveNodeLimit,
deadline: serverconfig.DefaultListObjectsDeadline,
maxResults: serverconfig.DefaultListObjectsMaxResults,
maxConcurrentReads: serverconfig.DefaultMaxConcurrentReadsForListObjects,
}

for _, opt := range opts {
Expand All @@ -69,13 +113,6 @@ func NewListUsersQuery(ds storage.RelationshipTupleReader, opts ...ListUsersQuer
return l
}

// WithResolveNodeLimit see server.WithResolveNodeLimit.
func WithResolveNodeLimit(limit uint32) ListUsersQueryOption {
return func(d *listUsersQuery) {
d.resolveNodeLimit = limit
}
}

// ListUsers assumes that the typesystem is in the context.
func (l *listUsersQuery) ListUsers(
ctx context.Context,
Expand All @@ -84,8 +121,18 @@ func (l *listUsersQuery) ListUsers(
ctx, span := tracer.Start(ctx, "ListUsers")
defer span.End()

l.ds = storagewrappers.NewCombinedTupleReader(l.ds, req.GetContextualTuples())
typesys, ok := typesystem.TypesystemFromContext(ctx)
cancellableCtx, cancelContextIfMaxResultsMet := context.WithCancel(ctx)
defer cancelContextIfMaxResultsMet()
if l.deadline != 0 {
var cancelContextIfDeadlineHit context.CancelFunc
cancellableCtx, cancelContextIfDeadlineHit = context.WithTimeout(cancellableCtx, l.deadline)
defer cancelContextIfDeadlineHit()
}
l.ds = storagewrappers.NewCombinedTupleReader(
storagewrappers.NewBoundedConcurrencyTupleReader(l.ds, l.maxConcurrentReads),
req.GetContextualTuples(),
)
typesys, ok := typesystem.TypesystemFromContext(cancellableCtx)
if !ok {
return nil, fmt.Errorf("typesystem missing in context")
}
Expand Down Expand Up @@ -114,6 +161,12 @@ func (l *listUsersQuery) ListUsers(
go func() {
for foundObject := range foundUsersCh {
foundUsersUnique[tuple.UserProtoToString(foundObject)] = struct{}{}
if l.maxResults > 0 {
if uint32(len(foundUsersUnique)) >= l.maxResults {
span.SetAttributes(attribute.Bool("max_results_found", true))
break
}
}
}

done <- struct{}{}
Expand All @@ -122,7 +175,7 @@ func (l *listUsersQuery) ListUsers(
go func() {
defer close(foundUsersCh)
internalRequest := fromListUsersRequest(req)
if err := l.expand(ctx, internalRequest, foundUsersCh); err != nil {
if err := l.expand(cancellableCtx, internalRequest, foundUsersCh); err != nil {
expandErrCh <- err
return
}
Expand All @@ -133,6 +186,7 @@ func (l *listUsersQuery) ListUsers(
telemetry.TraceError(span, err)
return nil, err
case <-done:
cancelContextIfMaxResultsMet()
break
}
foundUsers := make([]*openfgav1.User, 0, len(foundUsersUnique))
Expand Down

0 comments on commit 4954cd6

Please sign in to comment.