Skip to content

Commit

Permalink
Merge pull request #301 from jamf/bug/iterate-single-key
Browse files Browse the repository at this point in the history
bug: fix iterate range with specific key
  • Loading branch information
coufalja committed Jun 11, 2024
2 parents daec9e5 + 6a9375b commit 73385da
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 3 deletions.
9 changes: 7 additions & 2 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ nav_order: 999
---

# Changelog
## v0.5.1 (unreleased)
## v0.5.2 (unreleased)

### Bugfixes
* Fix `regatta.v1.KV/IterateRange` without `range_end` returning 0 results.
---
## v0.5.1

### Improvements
* API server will now by default use fixed number of pre-created worker go routines for stream handling. The behaviour could be overridden by setting `api.stream-workers` config value.
Expand All @@ -17,7 +22,7 @@ nav_order: 999

### Bugfixes
* Fix storage compaction metrics.

---
## v0.5.0

### Highlights
Expand Down
162 changes: 162 additions & 0 deletions storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/jamf/regatta/storage/kv"
"github.com/jamf/regatta/storage/logreader"
"github.com/jamf/regatta/storage/table"
"github.com/jamf/regatta/util/iter"
"github.com/jamf/regatta/version"
lvfs "github.com/lni/vfs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -239,6 +240,167 @@ func TestEngine_Range(t *testing.T) {
}
}

func TestEngine_IterateRange(t *testing.T) {
type args struct {
ctx context.Context
req *regattapb.RangeRequest
}
tests := []struct {
name string
args args
prepare func(t *testing.T, e *Engine)
want *regattapb.RangeResponse
wantErr require.ErrorAssertionFunc
}{
{
name: "table not found",
prepare: func(t *testing.T, e *Engine) {},
args: args{
ctx: context.Background(),
req: &regattapb.RangeRequest{
Table: []byte(testTableName),
Key: []byte("key"),
},
},
wantErr: require.Error,
},
{
name: "key not found serializable request",
prepare: func(t *testing.T, e *Engine) {
createTable(t, e)
},
args: args{
ctx: context.Background(),
req: &regattapb.RangeRequest{
Table: []byte(testTableName),
Key: []byte("key"),
},
},
want: &regattapb.RangeResponse{
Header: &regattapb.ResponseHeader{
ReplicaId: 1,
ShardId: 10001,
},
},
wantErr: require.NoError,
},
{
name: "key not found linearizable request",
prepare: func(t *testing.T, e *Engine) {
createTable(t, e)
},
args: args{
ctx: context.Background(),
req: &regattapb.RangeRequest{
Table: []byte(testTableName),
Key: []byte("key"),
Linearizable: true,
},
},
want: &regattapb.RangeResponse{
Header: &regattapb.ResponseHeader{
ReplicaId: 1,
ShardId: 10001,
},
},
wantErr: require.NoError,
},
{
name: "key found serializable request",
prepare: func(t *testing.T, e *Engine) {
createTable(t, e)
_, err := e.Put(context.Background(), &regattapb.PutRequest{Table: []byte(testTableName), Key: []byte("key"), Value: []byte("value")})
require.NoError(t, err)
},
args: args{
ctx: context.Background(),
req: &regattapb.RangeRequest{
Table: []byte(testTableName),
Key: []byte("key"),
},
},
want: &regattapb.RangeResponse{
Header: &regattapb.ResponseHeader{
ReplicaId: 1,
ShardId: 10001,
},
Kvs: []*regattapb.KeyValue{
{Key: []byte("key"), Value: []byte("value")},
},
Count: 1,
},
wantErr: require.NoError,
},
{
name: "key found range request",
prepare: func(t *testing.T, e *Engine) {
createTable(t, e)
_, err := e.Put(context.Background(), &regattapb.PutRequest{Table: []byte(testTableName), Key: []byte("key"), Value: []byte("value")})
require.NoError(t, err)
},
args: args{
ctx: context.Background(),
req: &regattapb.RangeRequest{
Table: []byte(testTableName),
Key: []byte("key"),
RangeEnd: []byte("key_"),
},
},
want: &regattapb.RangeResponse{
Header: &regattapb.ResponseHeader{
ReplicaId: 1,
ShardId: 10001,
},
Kvs: []*regattapb.KeyValue{
{Key: []byte("key"), Value: []byte("value")},
},
Count: 1,
},
wantErr: require.NoError,
},
{
name: "key found linearizable request",
prepare: func(t *testing.T, e *Engine) {
createTable(t, e)
_, err := e.Put(context.Background(), &regattapb.PutRequest{Table: []byte(testTableName), Key: []byte("key"), Value: []byte("value")})
require.NoError(t, err)
},
args: args{
ctx: context.Background(),
req: &regattapb.RangeRequest{
Table: []byte(testTableName),
Key: []byte("key"),
},
},
want: &regattapb.RangeResponse{
Header: &regattapb.ResponseHeader{
ReplicaId: 1,
ShardId: 10001,
},
Kvs: []*regattapb.KeyValue{
{Key: []byte("key"), Value: []byte("value")},
},
Count: 1,
},
wantErr: require.NoError,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
ctx := cancellableTestContext(t)
e := newTestEngine(t, newTestConfig())
require.NoError(t, e.Start())
require.NoError(t, e.WaitUntilReady(ctx))
tt.prepare(t, e)
got, err := e.IterateRange(tt.args.ctx, tt.args.req)
tt.wantErr(t, err)
require.Equal(t, tt.want, iter.First(got))
})
}
}

func TestEngine_Put(t *testing.T) {
type args struct {
ctx context.Context
Expand Down
2 changes: 1 addition & 1 deletion storage/table/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (p *FSM) Lookup(l interface{}) (interface{}, error) {
return lookup(db, req)
case IteratorRequest:
db := p.pebble.Load()
return iterate(db, req.RangeOp)
return iteratorLookup(db, req.RangeOp)
case SnapshotRequest:
snapshot := p.pebble.Load().NewSnapshot()
defer snapshot.Close()
Expand Down
11 changes: 11 additions & 0 deletions storage/table/fsm/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ func singleLookup(reader pebble.Reader, req *regattapb.RequestOp_Range) (*regatt
}, nil
}

func iteratorLookup(reader pebble.Reader, req *regattapb.RequestOp_Range) (iter.Seq[*regattapb.ResponseOp_Range], error) {
if req.RangeEnd != nil {
return iterate(reader, req)
}
single, err := singleLookup(reader, req)
if err != nil {
return nil, err
}
return iter.From(single), nil
}

// IteratorRequest returns open pebble.Iterator it is an API consumer responsibility to close it.
type IteratorRequest struct {
RangeOp *regattapb.RequestOp_Range
Expand Down
3 changes: 3 additions & 0 deletions util/iter/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func From[T any](in ...T) Seq[T] {

func First[T any](seq Seq[T]) T {
var res T
if seq == nil {
return *new(T)
}
seq(func(t T) bool {
res = t
return false
Expand Down

0 comments on commit 73385da

Please sign in to comment.