-
Notifications
You must be signed in to change notification settings - Fork 171
/
read_changes.go
59 lines (51 loc) · 1.96 KB
/
read_changes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package commands
import (
"context"
"errors"
"time"
"github.com/openfga/openfga/pkg/encoder"
"github.com/openfga/openfga/pkg/logger"
serverErrors "github.com/openfga/openfga/pkg/server/errors"
"github.com/openfga/openfga/pkg/storage"
openfgapb "go.buf.build/openfga/go/openfga/api/openfga/v1"
)
// ReadChangesQuery bundles the dependencies needed to serve a ReadChanges
// request. Construct it with NewReadChangesQuery.
type ReadChangesQuery struct {
// backend is the changelog store whose ReadChanges method Execute calls.
backend storage.ChangelogBackend
// logger is stored by the constructor; it is not used in the code visible in this section.
logger logger.Logger
// encoder decodes the incoming continuation token and encodes the outgoing one.
encoder encoder.Encoder
// horizonOffset is forwarded unchanged to backend.ReadChanges; the constructor
// derives it from a whole number of minutes.
horizonOffset time.Duration
}
// NewReadChangesQuery builds a ReadChangesQuery backed by the given
// `ChangelogBackend`, logger and continuation-token encoder.
//
// horizonOffset is a number of minutes; it is converted to a time.Duration
// before being stored on the query.
func NewReadChangesQuery(backend storage.ChangelogBackend, logger logger.Logger, encoder encoder.Encoder, horizonOffset int) *ReadChangesQuery {
	query := new(ReadChangesQuery)
	query.backend = backend
	query.logger = logger
	query.encoder = encoder
	query.horizonOffset = time.Minute * time.Duration(horizonOffset)
	return query
}
// Execute runs the ReadChangesQuery, returning paginated `openfga.TupleChange`(s)
// and a possibly non-empty continuation token.
//
// The request's continuation token is decoded with q.encoder; if decoding
// fails, serverErrors.InvalidContinuationToken is returned. When the backend
// reports storage.ErrNotFound, the response carries no changes and echoes the
// request's continuation token unchanged, with a nil error. Any other backend
// error, or a failure to encode the new continuation token, is returned via
// serverErrors.HandleError.
func (q *ReadChangesQuery) Execute(ctx context.Context, req *openfgapb.ReadChangesRequest) (*openfgapb.ReadChangesResponse, error) {
	decodedContToken, err := q.encoder.Decode(req.GetContinuationToken())
	if err != nil {
		return nil, serverErrors.InvalidContinuationToken
	}

	paginationOptions := storage.NewPaginationOptions(req.GetPageSize().GetValue(), string(decodedContToken))

	// Read the store ID and type through the generated getters, like every
	// other request field in this method: protobuf getters tolerate a nil
	// receiver, whereas direct field access would panic on a nil request.
	changes, contToken, err := q.backend.ReadChanges(ctx, req.GetStoreId(), req.GetType(), paginationOptions, q.horizonOffset)
	if err != nil {
		if errors.Is(err, storage.ErrNotFound) {
			// Nothing to report: hand the caller's token back as-is
			// instead of surfacing an error.
			return &openfgapb.ReadChangesResponse{
				ContinuationToken: req.GetContinuationToken(),
			}, nil
		}
		return nil, serverErrors.HandleError("", err)
	}

	encodedContToken, err := q.encoder.Encode(contToken)
	if err != nil {
		return nil, serverErrors.HandleError("", err)
	}

	return &openfgapb.ReadChangesResponse{
		Changes:           changes,
		ContinuationToken: encodedContToken,
	}, nil
}