From 5c60d9dccf68f495d1cb1df322ce81ba8207ecda Mon Sep 17 00:00:00 2001 From: Adrian Tam Date: Tue, 25 Apr 2023 11:19:00 -0400 Subject: [PATCH] fix: list objects to respect configurable ListObjectsDeadline (#704) --- pkg/server/commands/list_objects.go | 18 +++++ pkg/server/test/list_objects.go | 41 +++++++++- pkg/storage/mocks/slow_storage.go | 120 ++++++++++++++++++++++++++++ 3 files changed, 177 insertions(+), 2 deletions(-) create mode 100644 pkg/storage/mocks/slow_storage.go diff --git a/pkg/server/commands/list_objects.go b/pkg/server/commands/list_objects.go index 70e60a920b..08149a60c9 100644 --- a/pkg/server/commands/list_objects.go +++ b/pkg/server/commands/list_objects.go @@ -194,6 +194,16 @@ func (q *ListObjectsQuery) Execute( for { select { + + case <-timeoutCtx.Done(): + q.Logger.WarnWithContext( + ctx, "list objects timeout with list object configuration timeout", + zap.String("timeout duration", q.ListObjectsDeadline.String()), + ) + return &openfgapb.ListObjectsResponse{ + Objects: objects, + }, nil + case objectID, ok := <-resultsChan: if !ok { return &openfgapb.ListObjectsResponse{ @@ -236,6 +246,14 @@ func (q *ListObjectsQuery) ExecuteStreamed( for { select { + + case <-timeoutCtx.Done(): + q.Logger.WarnWithContext( + ctx, "list objects timeout with list object configuration timeout", + zap.String("timeout duration", q.ListObjectsDeadline.String()), + ) + return nil + case object, ok := <-resultsChan: if !ok { // Channel closed! No more results. diff --git a/pkg/server/test/list_objects.go b/pkg/server/test/list_objects.go index 6c181d1460..b2d4ce91c3 100644 --- a/pkg/server/test/list_objects.go +++ b/pkg/server/test/list_objects.go @@ -12,6 +12,7 @@ import ( "github.com/openfga/openfga/pkg/logger" "github.com/openfga/openfga/pkg/server/commands" "github.com/openfga/openfga/pkg/storage" + "github.com/openfga/openfga/pkg/storage/mocks" "github.com/openfga/openfga/pkg/tuple" "github.com/openfga/openfga/pkg/typesystem" "github.com/stretchr/testify/require" @@ -41,6 +42,8 @@ type listObjectsTestCase struct { allResults []string //all the results. the server may return less maxResults uint32 minimumResultsExpected uint32 + listObjectsDeadline time.Duration // 1 minute if not set + readTuplesDelay time.Duration // if set, purposely use a slow storage to slow down read and simulate timeout } func TestListObjectsRespectsMaxResults(t *testing.T, ds storage.OpenFGADatastore) { @@ -158,6 +161,29 @@ func TestListObjectsRespectsMaxResults(t *testing.T, ds storage.OpenFGADatastore minimumResultsExpected: 1, allResults: []string{"team:1"}, }, + { + name: "respects_max_results_when_deadline_timeout_and_returns_no_error_and_no_results", + schema: typesystem.SchemaVersion1_1, + model: ` + type user + type repo + relations + define admin: [user] as self + `, + tuples: []*openfgapb.TupleKey{ + tuple.NewTupleKey("repo:1", "admin", "user:alice"), + tuple.NewTupleKey("repo:2", "admin", "user:alice"), + }, + user: "user:alice", + objectType: "repo", + relation: "admin", + maxResults: 2, + minimumResultsExpected: 0, + // We expect empty array to be returned as list object will timeout due to readTuplesDelay > listObjectsDeadline + allResults: []string{}, + listObjectsDeadline: 1 * time.Second, + readTuplesDelay: 2 * time.Second, // We are mocking the ds to slow down the read call and simulate timeout + }, } for _, test := range testCases { @@ -181,10 +207,21 @@ func TestListObjectsRespectsMaxResults(t *testing.T, ds storage.OpenFGADatastore require.NoError(t, err) // act: run ListObjects + + listObjectsDeadline := time.Minute + if test.listObjectsDeadline > 0 { + listObjectsDeadline = test.listObjectsDeadline + } + + datastore := ds + if test.readTuplesDelay > 0 { + datastore = mocks.NewMockSlowDataStorage(ds, test.readTuplesDelay) + } + listObjectsQuery := &commands.ListObjectsQuery{ - Datastore: ds, + Datastore: datastore, Logger: logger.NewNoopLogger(), - ListObjectsDeadline: time.Minute, + ListObjectsDeadline: listObjectsDeadline, ListObjectsMaxResults: test.maxResults, ResolveNodeLimit: defaultResolveNodeLimit, } diff --git a/pkg/storage/mocks/slow_storage.go b/pkg/storage/mocks/slow_storage.go new file mode 100644 index 0000000000..92e21060d7 --- /dev/null +++ b/pkg/storage/mocks/slow_storage.go @@ -0,0 +1,120 @@ +package mocks + +import ( + "context" + "time" + + "github.com/openfga/openfga/pkg/storage" + openfgapb "go.buf.build/openfga/go/openfga/api/openfga/v1" +) + +// slowDataStorage is a proxy to the actual ds except the Reads are slow time by the readTuplesDelay +// This allows simulating list objection condition that times out +type slowDataStorage struct { + readTuplesDelay time.Duration + ds storage.OpenFGADatastore +} + +// NewMockSlowDataStorage returns a wrapper of a datastore that adds artificial delays into the reads of tuples +func NewMockSlowDataStorage(ds storage.OpenFGADatastore, readTuplesDelay time.Duration) storage.OpenFGADatastore { + return &slowDataStorage{ + readTuplesDelay: readTuplesDelay, + ds: ds, + } +} + +func (m *slowDataStorage) Close() {} + +func (m *slowDataStorage) ListObjectsByType(ctx context.Context, store string, objectType string) (storage.ObjectIterator, error) { + time.Sleep(m.readTuplesDelay) + return m.ds.ListObjectsByType(ctx, store, objectType) +} + +func (m *slowDataStorage) Read(ctx context.Context, store string, key *openfgapb.TupleKey) (storage.TupleIterator, error) { + time.Sleep(m.readTuplesDelay) + return m.ds.Read(ctx, store, key) +} + +func (m *slowDataStorage) ReadPage(ctx context.Context, store string, key *openfgapb.TupleKey, paginationOptions storage.PaginationOptions) ([]*openfgapb.Tuple, []byte, error) { + time.Sleep(m.readTuplesDelay) + return m.ds.ReadPage(ctx, store, key, paginationOptions) +} + +func (m *slowDataStorage) ReadChanges(ctx context.Context, store, objectType string, paginationOptions storage.PaginationOptions, horizonOffset time.Duration) ([]*openfgapb.TupleChange, []byte, error) { + return m.ds.ReadChanges(ctx, store, objectType, paginationOptions, horizonOffset) +} + +func (m *slowDataStorage) Write(ctx context.Context, store string, deletes storage.Deletes, writes storage.Writes) error { + return m.ds.Write(ctx, store, deletes, writes) +} + +func (m *slowDataStorage) ReadUserTuple(ctx context.Context, store string, key *openfgapb.TupleKey) (*openfgapb.Tuple, error) { + time.Sleep(m.readTuplesDelay) + return m.ds.ReadUserTuple(ctx, store, key) +} + +func (m *slowDataStorage) ReadUsersetTuples(ctx context.Context, store string, filter storage.ReadUsersetTuplesFilter) (storage.TupleIterator, error) { + time.Sleep(m.readTuplesDelay) + return m.ds.ReadUsersetTuples(ctx, store, filter) +} + +func (m *slowDataStorage) ReadStartingWithUser( + ctx context.Context, + store string, + filter storage.ReadStartingWithUserFilter, +) (storage.TupleIterator, error) { + time.Sleep(m.readTuplesDelay) + return m.ds.ReadStartingWithUser(ctx, store, filter) +} + +func (m *slowDataStorage) ReadAuthorizationModel(ctx context.Context, store string, id string) (*openfgapb.AuthorizationModel, error) { + return m.ds.ReadAuthorizationModel(ctx, store, id) +} + +func (m *slowDataStorage) ReadAuthorizationModels(ctx context.Context, store string, options storage.PaginationOptions) ([]*openfgapb.AuthorizationModel, []byte, error) { + return m.ds.ReadAuthorizationModels(ctx, store, options) +} + +func (m *slowDataStorage) FindLatestAuthorizationModelID(ctx context.Context, store string) (string, error) { + return m.ds.FindLatestAuthorizationModelID(ctx, store) +} + +func (m *slowDataStorage) WriteAuthorizationModel(ctx context.Context, store string, model *openfgapb.AuthorizationModel) error { + return m.ds.WriteAuthorizationModel(ctx, store, model) +} + +func (m *slowDataStorage) CreateStore(ctx context.Context, newStore *openfgapb.Store) (*openfgapb.Store, error) { + return m.ds.CreateStore(ctx, newStore) +} + +func (m *slowDataStorage) DeleteStore(ctx context.Context, id string) error { + return m.ds.DeleteStore(ctx, id) +} + +func (m *slowDataStorage) WriteAssertions(ctx context.Context, store, modelID string, assertions []*openfgapb.Assertion) error { + return m.ds.WriteAssertions(ctx, store, modelID, assertions) +} + +func (m *slowDataStorage) ReadAssertions(ctx context.Context, store, modelID string) ([]*openfgapb.Assertion, error) { + return m.ds.ReadAssertions(ctx, store, modelID) +} + +func (m *slowDataStorage) MaxTuplesPerWrite() int { + return m.ds.MaxTuplesPerWrite() +} + +func (m *slowDataStorage) MaxTypesPerAuthorizationModel() int { + return m.ds.MaxTypesPerAuthorizationModel() +} + +func (m *slowDataStorage) GetStore(ctx context.Context, storeID string) (*openfgapb.Store, error) { + return m.ds.GetStore(ctx, storeID) +} + +func (m *slowDataStorage) ListStores(ctx context.Context, paginationOptions storage.PaginationOptions) ([]*openfgapb.Store, []byte, error) { + return m.ds.ListStores(ctx, paginationOptions) +} + +func (m *slowDataStorage) IsReady(ctx context.Context) (bool, error) { + return m.ds.IsReady(ctx) +}