diff --git a/docs/changelog.md b/docs/changelog.md index 5f07f590..0da7f673 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,7 +6,17 @@ nav_order: 999 # Changelog -## v0.4.0 (unreleased) +## v0.4.1 (unreleased) + +### Improvements +* Server now reports transient errors for requests that could be potentially retried as `codes.Unavailable`. + +### Bugfixes +* Fixed the default timeout of `KV/IterateRange` operation. + + +--- +## v0.4.0 ### Breaking changes * Remove maintenance server port, the API is now available on standard API port. It could still though be secured by a separate API token. diff --git a/regattaserver/cluster.go b/regattaserver/cluster.go index 29a39620..a19d1d44 100644 --- a/regattaserver/cluster.go +++ b/regattaserver/cluster.go @@ -7,6 +7,7 @@ import ( "encoding/json" "github.com/jamf/regatta/regattapb" + serrors "github.com/jamf/regatta/storage/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/structpb" @@ -21,7 +22,10 @@ type ClusterServer struct { func (c *ClusterServer) MemberList(ctx context.Context, req *regattapb.MemberListRequest) (*regattapb.MemberListResponse, error) { res, err := c.Cluster.MemberList(ctx, req) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + if serrors.IsSafeToRetry(err) { + return nil, status.Error(codes.Unavailable, err.Error()) + } + return nil, status.Error(codes.FailedPrecondition, err.Error()) } return res, nil } @@ -41,7 +45,10 @@ func (c *ClusterServer) Status(ctx context.Context, req *regattapb.StatusRequest res.Config = &st } if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + if serrors.IsSafeToRetry(err) { + return nil, status.Error(codes.Unavailable, err.Error()) + } + return nil, status.Error(codes.FailedPrecondition, err.Error()) } return res, nil } diff --git a/regattaserver/kv.go b/regattaserver/kv.go index 2886f151..6bf6f4ce 100644 --- a/regattaserver/kv.go +++ b/regattaserver/kv.go @@ -50,7 +50,10 @@ func (s *KVServer) Range(ctx context.Context, req *regattapb.RangeRequest) (*reg if errors.Is(err, serrors.ErrTableNotFound) { return nil, status.Error(codes.NotFound, "table not found") } - return nil, status.Error(codes.Internal, err.Error()) + if serrors.IsSafeToRetry(err) { + return nil, status.Error(codes.Unavailable, err.Error()) + } + return nil, status.Error(codes.FailedPrecondition, err.Error()) } return val, nil } @@ -85,7 +88,10 @@ func (s *KVServer) IterateRange(req *regattapb.RangeRequest, srv regattapb.KV_It if errors.Is(err, serrors.ErrTableNotFound) { return status.Error(codes.NotFound, "table not found") } - return status.Error(codes.Internal, err.Error()) + if serrors.IsSafeToRetry(err) { + return status.Error(codes.Unavailable, err.Error()) + } + return status.Error(codes.FailedPrecondition, err.Error()) } pull, stop := iter.Pull(r) defer stop() @@ -120,7 +126,10 @@ func (s *KVServer) Put(ctx context.Context, req *regattapb.PutRequest) (*regatta if errors.Is(err, serrors.ErrTableNotFound) { return nil, status.Error(codes.NotFound, "table not found") } - return nil, status.Error(codes.Internal, err.Error()) + if serrors.IsSafeToRetry(err) { + return nil, status.Error(codes.Unavailable, err.Error()) + } + return nil, status.Error(codes.FailedPrecondition, err.Error()) } return r, nil } @@ -140,7 +149,10 @@ func (s *KVServer) DeleteRange(ctx context.Context, req *regattapb.DeleteRangeRe if errors.Is(err, serrors.ErrTableNotFound) { return nil, status.Error(codes.NotFound, "table not found") } - return nil, status.Error(codes.Internal, err.Error()) + if serrors.IsSafeToRetry(err) { + return nil, status.Error(codes.Unavailable, err.Error()) + } + return nil, status.Error(codes.FailedPrecondition, err.Error()) } return r, nil } @@ -159,7 +171,10 @@ func (s *KVServer) Txn(ctx context.Context, req *regattapb.TxnRequest) (*regatta if errors.Is(err, serrors.ErrTableNotFound) { return nil, status.Error(codes.NotFound, "table not found") } - return nil, status.Error(codes.Internal, err.Error()) + if serrors.IsSafeToRetry(err) { + return nil, status.Error(codes.Unavailable, err.Error()) + } + return nil, status.Error(codes.FailedPrecondition, err.Error()) } return r, nil } diff --git a/regattaserver/kv_test.go b/regattaserver/kv_test.go index c04abd04..31ee218a 100644 --- a/regattaserver/kv_test.go +++ b/regattaserver/kv_test.go @@ -10,6 +10,7 @@ import ( "github.com/jamf/regatta/regattapb" "github.com/jamf/regatta/storage/errors" "github.com/jamf/regatta/util/iter" + "github.com/lni/dragonboat/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -100,7 +101,17 @@ func TestKVServer_RangeError(t *testing.T) { Table: table1Name, Key: []byte("foo"), }) - r.EqualError(err, status.Errorf(codes.Internal, "unknown").Error()) + r.EqualError(err, status.Errorf(codes.FailedPrecondition, "unknown").Error()) + + t.Log("Get retry-safe error") + storage = &mockKVService{} + storage.On("Range", mock.Anything, mock.Anything).Return((*regattapb.RangeResponse)(nil), dragonboat.ErrSystemBusy) + kv.Storage = storage + _, err = kv.Range(context.Background(), ®attapb.RangeRequest{ + Table: table1Name, + Key: []byte("foo"), + }) + r.EqualError(err, status.Errorf(codes.Unavailable, dragonboat.ErrSystemBusy.Error()).Error()) } func TestKVServer_RangeUnimplemented(t *testing.T) { @@ -200,7 +211,7 @@ func TestKVServer_IterateRangeError(t *testing.T) { Table: table1Name, Key: []byte("foo"), }, srv) - r.EqualError(err, status.Errorf(codes.Internal, "unknown").Error()) + r.EqualError(err, status.Errorf(codes.FailedPrecondition, "unknown").Error()) t.Log("Get unknown send error") storage = &mockKVService{} @@ -213,6 +224,16 @@ func TestKVServer_IterateRangeError(t *testing.T) { Key: []byte("foo"), }, srv) r.EqualError(err, status.Errorf(codes.Internal, "uknown send error").Error()) + + t.Log("Get retry-safe error") + storage = &mockKVService{} + storage.On("IterateRange", mock.Anything, mock.Anything).Return((iter.Seq[*regattapb.RangeResponse])(nil), dragonboat.ErrSystemBusy) + kv.Storage = storage + err = kv.IterateRange(®attapb.RangeRequest{ + Table: table1Name, + Key: []byte("foo"), + }, srv) + r.EqualError(err, status.Errorf(codes.Unavailable, dragonboat.ErrSystemBusy.Error()).Error()) } func TestKVServer_IterateRangeUnimplemented(t *testing.T) { @@ -337,7 +358,17 @@ func TestKVServer_PutError(t *testing.T) { Table: table1Name, Key: []byte("foo"), }) - r.EqualError(err, status.Errorf(codes.Internal, "unknown").Error()) + r.EqualError(err, status.Errorf(codes.FailedPrecondition, "unknown").Error()) + + t.Log("Put retry-safe error") + storage = &mockKVService{} + storage.On("Put", mock.Anything, mock.Anything).Return((*regattapb.PutResponse)(nil), dragonboat.ErrSystemBusy) + kv.Storage = storage + _, err = kv.Put(context.Background(), ®attapb.PutRequest{ + Table: table1Name, + Key: []byte("foo"), + }) + r.EqualError(err, status.Errorf(codes.Unavailable, dragonboat.ErrSystemBusy.Error()).Error()) } func TestKVServer_Put(t *testing.T) { @@ -398,7 +429,17 @@ func TestKVServer_DeleteRangeError(t *testing.T) { Table: table1Name, Key: []byte("foo"), }) - r.EqualError(err, status.Errorf(codes.Internal, "unknown").Error()) + r.EqualError(err, status.Errorf(codes.FailedPrecondition, "unknown").Error()) + + t.Log("Delete retry-safe error") + storage = &mockKVService{} + storage.On("Delete", mock.Anything, mock.Anything).Return((*regattapb.DeleteRangeResponse)(nil), dragonboat.ErrSystemBusy) + kv.Storage = storage + _, err = kv.DeleteRange(context.Background(), ®attapb.DeleteRangeRequest{ + Table: table1Name, + Key: []byte("foo"), + }) + r.EqualError(err, status.Errorf(codes.Unavailable, dragonboat.ErrSystemBusy.Error()).Error()) } func TestKVServer_DeleteRange(t *testing.T) { @@ -448,7 +489,16 @@ func TestKVServer_TxnError(t *testing.T) { _, err = kv.Txn(context.Background(), ®attapb.TxnRequest{ Table: table1Name, }) - r.EqualError(err, status.Errorf(codes.Internal, "unknown").Error()) + r.EqualError(err, status.Errorf(codes.FailedPrecondition, "unknown").Error()) + + t.Log("Txn retry-safe error") + storage = &mockKVService{} + storage.On("Txn", mock.Anything, mock.Anything).Return((*regattapb.TxnResponse)(nil), dragonboat.ErrSystemBusy) + kv.Storage = storage + _, err = kv.Txn(context.Background(), ®attapb.TxnRequest{ + Table: table1Name, + }) + r.EqualError(err, status.Errorf(codes.Unavailable, dragonboat.ErrSystemBusy.Error()).Error()) } func TestKVServer_Txn(t *testing.T) { diff --git a/storage/engine.go b/storage/engine.go index 27ddec71..3a0279de 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -111,7 +111,7 @@ func (e *Engine) IterateRange(ctx context.Context, req *regattapb.RangeRequest) if err != nil { return nil, err } - it, err := t.Iterator(ctx, req) + it, err := withDefaultTimeout(ctx, req, t.Iterator) if err != nil { return nil, err } diff --git a/storage/errors/errors.go b/storage/errors/errors.go index 6720f22d..23131a5f 100644 --- a/storage/errors/errors.go +++ b/storage/errors/errors.go @@ -4,11 +4,17 @@ package errors import ( "errors" + + "github.com/lni/dragonboat/v4" ) +// IsSafeToRetry returns true for transient errors +// for operations that client could attempt to retry using the same arguments. +func IsSafeToRetry(err error) bool { + return dragonboat.IsTempError(err) +} + var ( - // ErrKeyNotFound returned when the key is not found. - ErrKeyNotFound = errors.New("key not found") // ErrTableNotFound returned when the table is not found. ErrTableNotFound = errors.New("table not found") // ErrEmptyKey returned when the key is not provided.