Skip to content

Commit

Permalink
fix: list objects to respect configurable ListObjectsDeadline (#704)
Browse files Browse the repository at this point in the history
  • Loading branch information
adriantam committed Apr 25, 2023
1 parent f3c97f1 commit 5c60d9d
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 2 deletions.
18 changes: 18 additions & 0 deletions pkg/server/commands/list_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ func (q *ListObjectsQuery) Execute(

for {
select {

case <-timeoutCtx.Done():
q.Logger.WarnWithContext(
ctx, "list objects timeout with list object configuration timeout",
zap.String("timeout duration", q.ListObjectsDeadline.String()),
)
return &openfgapb.ListObjectsResponse{
Objects: objects,
}, nil

case objectID, ok := <-resultsChan:
if !ok {
return &openfgapb.ListObjectsResponse{
Expand Down Expand Up @@ -236,6 +246,14 @@ func (q *ListObjectsQuery) ExecuteStreamed(

for {
select {

case <-timeoutCtx.Done():
q.Logger.WarnWithContext(
ctx, "list objects timeout with list object configuration timeout",
zap.String("timeout duration", q.ListObjectsDeadline.String()),
)
return nil

case object, ok := <-resultsChan:
if !ok {
// Channel closed! No more results.
Expand Down
41 changes: 39 additions & 2 deletions pkg/server/test/list_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/openfga/openfga/pkg/logger"
"github.com/openfga/openfga/pkg/server/commands"
"github.com/openfga/openfga/pkg/storage"
"github.com/openfga/openfga/pkg/storage/mocks"
"github.com/openfga/openfga/pkg/tuple"
"github.com/openfga/openfga/pkg/typesystem"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -41,6 +42,8 @@ type listObjectsTestCase struct {
allResults []string //all the results. the server may return less
maxResults uint32
minimumResultsExpected uint32
listObjectsDeadline time.Duration // 1 minute if not set
readTuplesDelay time.Duration // if set, purposely use a slow storage to slow down read and simulate timeout
}

func TestListObjectsRespectsMaxResults(t *testing.T, ds storage.OpenFGADatastore) {
Expand Down Expand Up @@ -158,6 +161,29 @@ func TestListObjectsRespectsMaxResults(t *testing.T, ds storage.OpenFGADatastore
minimumResultsExpected: 1,
allResults: []string{"team:1"},
},
{
name: "respects_max_results_when_deadline_timeout_and_returns_no_error_and_no_results",
schema: typesystem.SchemaVersion1_1,
model: `
type user
type repo
relations
define admin: [user] as self
`,
tuples: []*openfgapb.TupleKey{
tuple.NewTupleKey("repo:1", "admin", "user:alice"),
tuple.NewTupleKey("repo:2", "admin", "user:alice"),
},
user: "user:alice",
objectType: "repo",
relation: "admin",
maxResults: 2,
minimumResultsExpected: 0,
// We expect empty array to be returned as list object will timeout due to readTuplesDelay > listObjectsDeadline
allResults: []string{},
listObjectsDeadline: 1 * time.Second,
readTuplesDelay: 2 * time.Second, // We are mocking the ds to slow down the read call and simulate timeout
},
}

for _, test := range testCases {
Expand All @@ -181,10 +207,21 @@ func TestListObjectsRespectsMaxResults(t *testing.T, ds storage.OpenFGADatastore
require.NoError(t, err)

// act: run ListObjects

listObjectsDeadline := time.Minute
if test.listObjectsDeadline > 0 {
listObjectsDeadline = test.listObjectsDeadline
}

datastore := ds
if test.readTuplesDelay > 0 {
datastore = mocks.NewMockSlowDataStorage(ds, test.readTuplesDelay)
}

listObjectsQuery := &commands.ListObjectsQuery{
Datastore: ds,
Datastore: datastore,
Logger: logger.NewNoopLogger(),
ListObjectsDeadline: time.Minute,
ListObjectsDeadline: listObjectsDeadline,
ListObjectsMaxResults: test.maxResults,
ResolveNodeLimit: defaultResolveNodeLimit,
}
Expand Down
120 changes: 120 additions & 0 deletions pkg/storage/mocks/slow_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package mocks

import (
"context"
"time"

"github.com/openfga/openfga/pkg/storage"
openfgapb "go.buf.build/openfga/go/openfga/api/openfga/v1"
)

// slowDataStorage is a proxy to the actual ds except the Reads are slow time by the readTuplesDelay
// This allows simulating list objection condition that times out
type slowDataStorage struct {
readTuplesDelay time.Duration
ds storage.OpenFGADatastore
}

// NewMockSlowDataStorage returns a wrapper of a datastore that adds artificial delays into the reads of tuples
func NewMockSlowDataStorage(ds storage.OpenFGADatastore, readTuplesDelay time.Duration) storage.OpenFGADatastore {
return &slowDataStorage{
readTuplesDelay: readTuplesDelay,
ds: ds,
}
}

func (m *slowDataStorage) Close() {}

func (m *slowDataStorage) ListObjectsByType(ctx context.Context, store string, objectType string) (storage.ObjectIterator, error) {
time.Sleep(m.readTuplesDelay)
return m.ds.ListObjectsByType(ctx, store, objectType)
}

func (m *slowDataStorage) Read(ctx context.Context, store string, key *openfgapb.TupleKey) (storage.TupleIterator, error) {
time.Sleep(m.readTuplesDelay)
return m.ds.Read(ctx, store, key)
}

func (m *slowDataStorage) ReadPage(ctx context.Context, store string, key *openfgapb.TupleKey, paginationOptions storage.PaginationOptions) ([]*openfgapb.Tuple, []byte, error) {
time.Sleep(m.readTuplesDelay)
return m.ds.ReadPage(ctx, store, key, paginationOptions)
}

func (m *slowDataStorage) ReadChanges(ctx context.Context, store, objectType string, paginationOptions storage.PaginationOptions, horizonOffset time.Duration) ([]*openfgapb.TupleChange, []byte, error) {
return m.ds.ReadChanges(ctx, store, objectType, paginationOptions, horizonOffset)
}

func (m *slowDataStorage) Write(ctx context.Context, store string, deletes storage.Deletes, writes storage.Writes) error {
return m.ds.Write(ctx, store, deletes, writes)
}

func (m *slowDataStorage) ReadUserTuple(ctx context.Context, store string, key *openfgapb.TupleKey) (*openfgapb.Tuple, error) {
time.Sleep(m.readTuplesDelay)
return m.ds.ReadUserTuple(ctx, store, key)
}

func (m *slowDataStorage) ReadUsersetTuples(ctx context.Context, store string, filter storage.ReadUsersetTuplesFilter) (storage.TupleIterator, error) {
time.Sleep(m.readTuplesDelay)
return m.ds.ReadUsersetTuples(ctx, store, filter)
}

func (m *slowDataStorage) ReadStartingWithUser(
ctx context.Context,
store string,
filter storage.ReadStartingWithUserFilter,
) (storage.TupleIterator, error) {
time.Sleep(m.readTuplesDelay)
return m.ds.ReadStartingWithUser(ctx, store, filter)
}

func (m *slowDataStorage) ReadAuthorizationModel(ctx context.Context, store string, id string) (*openfgapb.AuthorizationModel, error) {
return m.ds.ReadAuthorizationModel(ctx, store, id)
}

func (m *slowDataStorage) ReadAuthorizationModels(ctx context.Context, store string, options storage.PaginationOptions) ([]*openfgapb.AuthorizationModel, []byte, error) {
return m.ds.ReadAuthorizationModels(ctx, store, options)
}

func (m *slowDataStorage) FindLatestAuthorizationModelID(ctx context.Context, store string) (string, error) {
return m.ds.FindLatestAuthorizationModelID(ctx, store)
}

func (m *slowDataStorage) WriteAuthorizationModel(ctx context.Context, store string, model *openfgapb.AuthorizationModel) error {
return m.ds.WriteAuthorizationModel(ctx, store, model)
}

func (m *slowDataStorage) CreateStore(ctx context.Context, newStore *openfgapb.Store) (*openfgapb.Store, error) {
return m.ds.CreateStore(ctx, newStore)
}

func (m *slowDataStorage) DeleteStore(ctx context.Context, id string) error {
return m.ds.DeleteStore(ctx, id)
}

func (m *slowDataStorage) WriteAssertions(ctx context.Context, store, modelID string, assertions []*openfgapb.Assertion) error {
return m.ds.WriteAssertions(ctx, store, modelID, assertions)
}

func (m *slowDataStorage) ReadAssertions(ctx context.Context, store, modelID string) ([]*openfgapb.Assertion, error) {
return m.ds.ReadAssertions(ctx, store, modelID)
}

func (m *slowDataStorage) MaxTuplesPerWrite() int {
return m.ds.MaxTuplesPerWrite()
}

func (m *slowDataStorage) MaxTypesPerAuthorizationModel() int {
return m.ds.MaxTypesPerAuthorizationModel()
}

func (m *slowDataStorage) GetStore(ctx context.Context, storeID string) (*openfgapb.Store, error) {
return m.ds.GetStore(ctx, storeID)
}

func (m *slowDataStorage) ListStores(ctx context.Context, paginationOptions storage.PaginationOptions) ([]*openfgapb.Store, []byte, error) {
return m.ds.ListStores(ctx, paginationOptions)
}

func (m *slowDataStorage) IsReady(ctx context.Context) (bool, error) {
return m.ds.IsReady(ctx)
}

0 comments on commit 5c60d9d

Please sign in to comment.