Skip to content

Commit

Permalink
Update Pebble to latest version (#383)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Nov 13, 2023
1 parent 087405b commit 6f2dd0f
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 44 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ go 1.21
require (
github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e
github.com/cenkalti/backoff/v4 v4.2.0
github.com/cockroachdb/pebble v0.0.0-20230721221451-fcaeb47a50e0
github.com/cockroachdb/pebble v0.0.0-20231110205751-b224e8b90a87
github.com/dgraph-io/ristretto v0.1.1
github.com/dustin/go-humanize v1.0.1
github.com/edsrzf/mmap-go v1.1.0
Expand All @@ -41,7 +41,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.20.0
go.uber.org/automaxprocs v1.5.2
go.uber.org/multierr v1.11.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/net v0.17.0
golang.org/x/sync v0.3.0
golang.org/x/sys v0.14.0
Expand All @@ -59,10 +59,10 @@ require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.10.0 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,20 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4=
github.com/cockroachdb/errors v1.10.0 h1:lfxS8zZz1+OjtV4MtNWgboi/W5tyLEB6VQZBXN+0VUU=
github.com/cockroachdb/errors v1.10.0/go.mod h1:lknhIsEVQ9Ss/qKDBQS/UqFSvPQjOwNq2qyKAxtHRqE=
github.com/cockroachdb/errors v1.11.1 h1:xSEW75zKaKCWzR3OfxXUxgrk/NtT4G1MiOv5lWZazG8=
github.com/cockroachdb/errors v1.11.1/go.mod h1:8MUxA3Gi6b25tYlFEBGLf+D8aISL+M4MIpiWMSNRfxw=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/pebble v0.0.0-20230721221451-fcaeb47a50e0 h1:6OwRzk7AKRNumJttanSCJPVMkmXvfioVgijKfJHAceU=
github.com/cockroachdb/pebble v0.0.0-20230721221451-fcaeb47a50e0/go.mod h1:FN5O47SBEz5+kO9fG8UTR64g2WS1u5ZFCgTvxGjoSks=
github.com/cockroachdb/pebble v0.0.0-20231110205751-b224e8b90a87 h1:pkf52dVBGjsBZ2jpBAFQHyRwLFOtZDgeePxMKQX/yX4=
github.com/cockroachdb/pebble v0.0.0-20231110205751-b224e8b90a87/go.mod h1:acMRUGd/BK8AUmQNK3spUCCGzFLZU2bSST3NMXSq2Kc=
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6 h1:DJK8W/iB+s/qkTtmXSrHA49lp5O3OsR7E6z4byOLy34=
github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -374,6 +380,8 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
20 changes: 14 additions & 6 deletions server/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type DB interface {

ProcessWrite(b *proto.WriteRequest, commitOffset int64, timestamp uint64, updateOperationCallback UpdateOperationCallback) (*proto.WriteResponse, error)
Get(request *proto.GetRequest) (*proto.GetResponse, error)
List(request *proto.ListRequest) KeyIterator
List(request *proto.ListRequest) (KeyIterator, error)
ReadCommitOffset() (int64, error)

ReadNextNotifications(ctx context.Context, startOffset int64) ([]*proto.NotificationBatch, error)
Expand Down Expand Up @@ -240,11 +240,15 @@ func (it *listIterator) Close() error {
return it.KeyIterator.Close()
}

func (d *db) List(request *proto.ListRequest) KeyIterator {
func (d *db) List(request *proto.ListRequest) (KeyIterator, error) {
d.listCounter.Add(1)
return &listIterator{
KeyIterator: d.kv.KeyRangeScan(request.StartInclusive, request.EndExclusive),
timer: d.listLatencyHisto.Timer(),
if it, err := d.kv.KeyRangeScan(request.StartInclusive, request.EndExclusive); err != nil {
return nil, err
} else {
return &listIterator{
KeyIterator: it,
timer: d.listLatencyHisto.Timer(),
}, nil
}
}

Expand Down Expand Up @@ -418,7 +422,11 @@ func (d *db) applyDelete(batch WriteBatch, notifications *notifications, delReq

func (d *db) applyDeleteRange(batch WriteBatch, notifications *notifications, delReq *proto.DeleteRangeRequest, updateOperationCallback UpdateOperationCallback) (*proto.DeleteRangeResponse, error) {
if notifications != nil || updateOperationCallback != NoOpCallback {
it := batch.KeyRangeScan(delReq.StartInclusive, delReq.EndExclusive)
it, err := batch.KeyRangeScan(delReq.StartInclusive, delReq.EndExclusive)
if err != nil {
return nil, err
}

for it.Next() {
if notifications != nil {
notifications.Deleted(it.Key())
Expand Down
7 changes: 5 additions & 2 deletions server/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ func TestDBList(t *testing.T) {
assert.NoError(t, factory.Close())
}

func keyIteratorToSlice(it KeyIterator) []string {
func keyIteratorToSlice(it KeyIterator, err error) []string {
assert.NoError(nil, err)
var keys []string
for ; it.Valid(); it.Next() {
keys = append(keys, it.Key())
Expand Down Expand Up @@ -388,10 +389,12 @@ func TestDBDeleteRange(t *testing.T) {
assert.NoError(t, err)

keys := make([]string, 0)
listIt := db.List(&proto.ListRequest{
listIt, err := db.List(&proto.ListRequest{
StartInclusive: "a",
EndExclusive: "z",
})

assert.NoError(t, err)
for ; listIt.Valid(); listIt.Next() {
keys = append(keys, listIt.Key())
}
Expand Down
8 changes: 4 additions & 4 deletions server/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type WriteBatch interface {
Get(key string) ([]byte, io.Closer, error)

DeleteRange(lowerBound, upperBound string) error
KeyRangeScan(lowerBound, upperBound string) KeyIterator
KeyRangeScan(lowerBound, upperBound string) (KeyIterator, error)

// Count is the number of transactions that are currently in the batch
Count() int
Expand Down Expand Up @@ -99,10 +99,10 @@ type KV interface {

Get(key string) ([]byte, io.Closer, error)

KeyRangeScan(lowerBound, upperBound string) KeyIterator
KeyRangeScanReverse(lowerBound, upperBound string) ReverseKeyIterator
KeyRangeScan(lowerBound, upperBound string) (KeyIterator, error)
KeyRangeScanReverse(lowerBound, upperBound string) (ReverseKeyIterator, error)

RangeScan(lowerBound, upperBound string) KeyValueIterator
RangeScan(lowerBound, upperBound string) (KeyValueIterator, error)

Snapshot() (Snapshot, error)

Expand Down
40 changes: 28 additions & 12 deletions server/kv/kv_pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,31 +366,40 @@ func (p *Pebble) Get(key string) ([]byte, io.Closer, error) {
return value, closer, err
}

func (p *Pebble) KeyRangeScan(lowerBound, upperBound string) KeyIterator {
pbit := p.db.NewIter(&pebble.IterOptions{
func (p *Pebble) KeyRangeScan(lowerBound, upperBound string) (KeyIterator, error) {
pbit, err := p.db.NewIter(&pebble.IterOptions{
LowerBound: []byte(lowerBound),
UpperBound: []byte(upperBound),
})
if err != nil {
return nil, err
}
pbit.SeekGE([]byte(lowerBound))
return &PebbleIterator{p, pbit}
return &PebbleIterator{p, pbit}, nil
}

func (p *Pebble) KeyRangeScanReverse(lowerBound, upperBound string) ReverseKeyIterator {
pbit := p.db.NewIter(&pebble.IterOptions{
func (p *Pebble) KeyRangeScanReverse(lowerBound, upperBound string) (ReverseKeyIterator, error) {
pbit, err := p.db.NewIter(&pebble.IterOptions{
LowerBound: []byte(lowerBound),
UpperBound: []byte(upperBound),
})
if err != nil {
return nil, err
}
pbit.Last()
return &PebbleReverseIterator{p, pbit}
return &PebbleReverseIterator{p, pbit}, nil
}

func (p *Pebble) RangeScan(lowerBound, upperBound string) KeyValueIterator {
pbit := p.db.NewIter(&pebble.IterOptions{
func (p *Pebble) RangeScan(lowerBound, upperBound string) (KeyValueIterator, error) {
pbit, err := p.db.NewIter(&pebble.IterOptions{
LowerBound: []byte(lowerBound),
UpperBound: []byte(upperBound),
})
if err != nil {
return nil, err
}
pbit.SeekGE([]byte(lowerBound))
return &PebbleIterator{p, pbit}
return &PebbleIterator{p, pbit}, nil
}

func (p *Pebble) Snapshot() (Snapshot, error) {
Expand All @@ -416,13 +425,16 @@ func (b *PebbleBatch) DeleteRange(lowerBound, upperBound string) error {
return b.b.DeleteRange([]byte(lowerBound), []byte(upperBound), pebble.NoSync)
}

func (b *PebbleBatch) KeyRangeScan(lowerBound, upperBound string) KeyIterator {
pbit := b.b.NewIter(&pebble.IterOptions{
func (b *PebbleBatch) KeyRangeScan(lowerBound, upperBound string) (KeyIterator, error) {
pbit, err := b.b.NewIter(&pebble.IterOptions{
LowerBound: []byte(lowerBound),
UpperBound: []byte(upperBound),
})
if err != nil {
return nil, err
}
pbit.SeekGE([]byte(lowerBound))
return &PebbleIterator{b.p, pbit}
return &PebbleIterator{b.p, pbit}, nil
}

func (b *PebbleBatch) Close() error {
Expand Down Expand Up @@ -543,6 +555,10 @@ func (pl *PebbleLogger) Infof(format string, args ...interface{}) {
pl.zl.Info().Msgf(format, args...)
}

func (pl *PebbleLogger) Errorf(format string, args ...interface{}) {
pl.zl.Error().Msgf(format, args...)
}

func (pl *PebbleLogger) Fatalf(format string, args ...interface{}) {
pl.zl.Fatal().Msgf(format, args...)
}
Expand Down
30 changes: 20 additions & 10 deletions server/kv/kv_pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func TestPebbbleKeyRangeScan(t *testing.T) {
assert.NoError(t, wb.Commit())
assert.NoError(t, wb.Close())

it := kv.KeyRangeScan("/root/a", "/root/c")
it, err := kv.KeyRangeScan("/root/a", "/root/c")
assert.NoError(t, err)

assert.True(t, it.Valid())
assert.Equal(t, "/root/a", it.Key())
Expand All @@ -124,7 +125,8 @@ func TestPebbbleKeyRangeScan(t *testing.T) {
assert.NoError(t, it.Close())

// Scan with empty result
it = kv.KeyRangeScan("/xyz/a", "/xyz/c")
it, err = kv.KeyRangeScan("/xyz/a", "/xyz/c")
assert.NoError(t, err)
assert.False(t, it.Valid())
assert.NoError(t, it.Close())

Expand All @@ -146,7 +148,8 @@ func TestPebbbleKeyRangeScanReverse(t *testing.T) {
assert.NoError(t, wb.Commit())
assert.NoError(t, wb.Close())

it := kv.KeyRangeScanReverse("/root/a", "/root/c")
it, err := kv.KeyRangeScanReverse("/root/a", "/root/c")
assert.NoError(t, err)

assert.True(t, it.Valid())
assert.Equal(t, "/root/b", it.Key())
Expand All @@ -161,7 +164,8 @@ func TestPebbbleKeyRangeScanReverse(t *testing.T) {
assert.NoError(t, it.Close())

// Scan with empty result
it = kv.KeyRangeScanReverse("/xyz/a", "/xyz/c")
it, err = kv.KeyRangeScanReverse("/xyz/a", "/xyz/c")
assert.NoError(t, err)
assert.False(t, it.Valid())
assert.NoError(t, it.Close())

Expand All @@ -183,7 +187,8 @@ func TestPebbleRangeScan(t *testing.T) {
assert.NoError(t, wb.Commit())
assert.NoError(t, wb.Close())

it := kv.RangeScan("/root/a", "/root/c")
it, err := kv.RangeScan("/root/a", "/root/c")
assert.NoError(t, err)

assert.True(t, it.Valid())
assert.Equal(t, "/root/a", it.Key())
Expand All @@ -204,7 +209,8 @@ func TestPebbleRangeScan(t *testing.T) {
assert.NoError(t, it.Close())

// Scan with empty result
it = kv.RangeScan("/xyz/a", "/xyz/c")
it, err = kv.RangeScan("/xyz/a", "/xyz/c")
assert.NoError(t, err)
assert.False(t, it.Valid())
assert.NoError(t, it.Close())

Expand Down Expand Up @@ -275,7 +281,8 @@ func TestPebbleRangeScanWithSlashOrder(t *testing.T) {
assert.NoError(t, wb.Commit())
assert.NoError(t, wb.Close())

it := kv.KeyRangeScan("/a/b/a/", "/a/b/a//")
it, err := kv.KeyRangeScan("/a/b/a/", "/a/b/a//")
assert.NoError(t, err)

assert.True(t, it.Valid())
assert.Equal(t, "/a/b/a/a", it.Key())
Expand Down Expand Up @@ -399,7 +406,8 @@ func TestPebbbleRangeScanInBatch(t *testing.T) {
assert.NoError(t, wb.Put("/root/b", []byte("b")))
assert.NoError(t, wb.Put("/root/c", []byte("c")))

it := wb.KeyRangeScan("/root/a", "/root/c")
it, err := wb.KeyRangeScan("/root/a", "/root/c")
assert.NoError(t, err)
assert.True(t, it.Valid())
assert.Equal(t, "/root/a", it.Key())
assert.True(t, it.Next())
Expand All @@ -416,7 +424,8 @@ func TestPebbbleRangeScanInBatch(t *testing.T) {

assert.NoError(t, wb.Delete("/root/a"))

it = wb.KeyRangeScan("/root/a", "/root/c")
it, err = wb.KeyRangeScan("/root/a", "/root/c")
assert.NoError(t, err)
assert.True(t, it.Valid())
assert.Equal(t, "/root/b", it.Key())
assert.False(t, it.Next())
Expand All @@ -429,7 +438,8 @@ func TestPebbbleRangeScanInBatch(t *testing.T) {
assert.NoError(t, wb.Close())

// Scan with empty result
it = kv.KeyRangeScan("/xyz/a", "/xyz/c")
it, err = kv.KeyRangeScan("/xyz/a", "/xyz/c")
assert.NoError(t, err)
assert.False(t, it.Valid())
assert.NoError(t, it.Close())

Expand Down
5 changes: 4 additions & 1 deletion server/kv/notifications_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func (nt *notificationsTracker) ReadNextNotifications(ctx context.Context, start
return nil, err
}

it := nt.kv.RangeScan(notificationKey(startOffset), lastNotificationKey)
it, err := nt.kv.RangeScan(notificationKey(startOffset), lastNotificationKey)
if err != nil {
return nil, err
}
defer it.Close()

var res []*proto.NotificationBatch
Expand Down
10 changes: 8 additions & 2 deletions server/kv/notifications_trimmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ func (t *notificationsTrimmer) trimNotifications() error {
}

func (t *notificationsTrimmer) getFirstLast() (first, last int64, err error) {
it1 := t.kv.KeyRangeScan(firstNotificationKey, lastNotificationKey)
it1, err := t.kv.KeyRangeScan(firstNotificationKey, lastNotificationKey)
if err != nil {
return -1, -1, err
}
defer it1.Close()
if !it1.Valid() {
// There are no entries in DB
Expand All @@ -165,7 +168,10 @@ func (t *notificationsTrimmer) getFirstLast() (first, last int64, err error) {
return first, last, err
}

it2 := t.kv.KeyRangeScanReverse(firstNotificationKey, lastNotificationKey)
it2, err := t.kv.KeyRangeScanReverse(firstNotificationKey, lastNotificationKey)
if err != nil {
return -1, -1, err
}
defer it2.Close()
if !it2.Valid() {
// There are no entries in DB
Expand Down
9 changes: 8 additions & 1 deletion server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,14 @@ func (lc *leaderController) list(ctx context.Context, request *proto.ListRequest
lc.log.Debug().
Msg("Received list request")

it := lc.db.List(request)
it, err := lc.db.List(request)
if err != nil {
lc.log.Warn().Err(err).
Msg("Failed to process list request")
close(ch)
return
}

defer func() {
_ = it.Close()
}()
Expand Down
4 changes: 2 additions & 2 deletions server/session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func (m mockWriteBatch) DeleteRange(_, _ string) error {
return nil
}

func (m mockWriteBatch) KeyRangeScan(_, _ string) kv.KeyIterator {
return nil
func (m mockWriteBatch) KeyRangeScan(_, _ string) (kv.KeyIterator, error) {
return nil, nil
}

func (m mockWriteBatch) Commit() error {
Expand Down

0 comments on commit 6f2dd0f

Please sign in to comment.