Skip to content

Commit

Permalink
gh-1832 remove redundant encoding/decoding in map cursor and list
Browse files Browse the repository at this point in the history
This also highlighted that the name mapDecoder no longer makes sense, as
it acts more like a map merger now.
  • Loading branch information
etiennedi committed Feb 25, 2022
1 parent 38feb9e commit f1e460b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 130 deletions.
51 changes: 18 additions & 33 deletions adapters/repos/db/lsmkv/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,26 @@ func (b *Bucket) MapList(key []byte, cfgs ...MapListOption) ([]MapPair, error) {
cfg(&c)
}

segments := [][]MapPair{}
// before := time.Now()
segments, err := b.disk.getCollectionBySegments(key)
disk, err := b.disk.getCollectionBySegments(key)
if err != nil {
if err != nil && err != NotFound {
return nil, err
}
}

for i := range disk {
segmentDecoded := make([]MapPair, len(disk[i]))
for j, v := range disk[i] {
if err := segmentDecoded[j].FromBytes(v.value, false); err != nil {
return nil, err
}
segmentDecoded[j].Tombstone = v.tombstone
}
segments = append(segments, segmentDecoded)
}

// fmt.Printf("--map-list: get all disk segments took %s\n", time.Since(before))

// before = time.Now()
Expand All @@ -277,18 +290,7 @@ func (b *Bucket) MapList(key []byte, cfgs ...MapListOption) ([]MapPair, error) {
}
}

// TODO: encoding here makes no sense, it would be way better to stay
// decoded and change the mapMerger to only use decoded values
vEncoded := make([]value, len(v))
for i, pair := range v {
enc, err := pair.Bytes()
if err != nil {
return nil, err
}

vEncoded[i] = value{value: enc, tombstone: pair.Tombstone}
}
segments = append(segments, vEncoded)
segments = append(segments, v)
}

// before = time.Now()
Expand All @@ -298,30 +300,13 @@ func (b *Bucket) MapList(key []byte, cfgs ...MapListOption) ([]MapPair, error) {
return nil, err
}
}
// TODO: encoding here makes no sense, it would be way better to stay
// decoded and change the mapMerger to only use decoded values
vEncoded := make([]value, len(v))
for i, pair := range v {
enc, err := pair.Bytes()
if err != nil {
return nil, err
}

vEncoded[i] = value{value: enc, tombstone: pair.Tombstone}
}
segments = append(segments, vEncoded)
segments = append(segments, v)
// fmt.Printf("--map-list: get all active segments took %s\n", time.Since(before))

// before = time.Now()
for i := range segments {
sort.Slice(segments[i], func(a, b int) bool {
pairA := MapPair{}
pairB := MapPair{}

pairA.FromBytes(segments[i][a].value, true)
pairB.FromBytes(segments[i][a].value, true)

return bytes.Compare(pairA.Key, pairB.Key) == -1
return bytes.Compare(segments[i][a].Key, segments[i][b].Key) == -1
})
}
// fmt.Printf("--map-list: sort all segments took %s\n", time.Since(before))
Expand All @@ -331,7 +316,7 @@ func (b *Bucket) MapList(key []byte, cfgs ...MapListOption) ([]MapPair, error) {
// fmt.Printf("--map-list: run decoder took %s\n", time.Since(before))
// }()

return newSortedMapDecoder().do(segments)
return newSortedMapMerger().do(segments)
}

func (b *Bucket) MapSet(rowKey []byte, kv MapPair) error {
Expand Down
20 changes: 3 additions & 17 deletions adapters/repos/db/lsmkv/cursor_bucket_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ func (c *CursorMap) mergeDuplicatesInCurrentStateAndAdvance(ids []int) ([]byte,
// appending := time.Duration(0)
// advancing := time.Duration(0)

// TODO: remove unnecessary re-encoding
var perSegmentResults [][]value
var perSegmentResults [][]MapPair

for _, id := range ids {
candidates := c.state[id].value
Expand All @@ -225,23 +224,10 @@ func (c *CursorMap) mergeDuplicatesInCurrentStateAndAdvance(ids []int) ([]byte,
return bytes.Compare(candidates[a].Key, candidates[b].Key) < 0
})

encoded := make([]value, len(candidates))
for i, pv := range candidates {
enc, err := pv.Bytes()
if err != nil {
panic(errors.Wrap(err, "unexpected error encoding map values"))
}

encoded[i] = value{
value: enc,
tombstone: pv.Tombstone,
}
}

fmt.Printf("TODO: temp sorting and re-encoding because disk state is not currently sorted took %s\n",
time.Since(before))

perSegmentResults = append(perSegmentResults, encoded)
perSegmentResults = append(perSegmentResults, candidates)

// before = time.Now()
c.advanceInner(id)
Expand All @@ -251,7 +237,7 @@ func (c *CursorMap) mergeDuplicatesInCurrentStateAndAdvance(ids []int) ([]byte,
// fmt.Printf("--- extract values [advancing] took %s\n", advancing)

if !c.keyOnly {
merged, err := newSortedMapDecoder().do(perSegmentResults)
merged, err := newSortedMapMerger().do(perSegmentResults)
if err != nil {
panic(errors.Wrap(err, "unexpected error decoding map values"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,17 @@ import (
"github.com/pkg/errors"
)

type sortedMapDecoder struct {
type sortedMapMerger struct {
input [][]MapPair
output []MapPair
offsets []int
}

func newSortedMapDecoder() *sortedMapDecoder {
return &sortedMapDecoder{}
func newSortedMapMerger() *sortedMapMerger {
return &sortedMapMerger{}
}

func (s *sortedMapDecoder) do(segments [][]value) ([]MapPair, error) {
if len(segments) == 1 {
return s.parseSingleSegment(segments[0])
}

return s.mergeSegments(segments)
}

func (s *sortedMapDecoder) parseSingleSegment(seg []value) ([]MapPair, error) {
out := make([]MapPair, len(seg))

i := 0
for _, segVal := range seg {
if segVal.tombstone {
// in a single segment a tombstone doesn't have a meaning as there is no
// previous segment that it could remove something from. However, we
// still don't want to to serve the tombstoned key to the user as an
// actual value, so we still need to skip it
continue
}

if err := out[i].FromBytes(segVal.value, false); err != nil {
return nil, err
}
i++
}

return out[:i], nil
}

func (s *sortedMapDecoder) mergeSegments(segments [][]value) ([]MapPair, error) {
func (s *sortedMapMerger) do(segments [][]MapPair) ([]MapPair, error) {
if err := s.init(segments); err != nil {
return nil, errors.Wrap(err, "init sorted map decoder")
}
Expand All @@ -70,21 +40,8 @@ func (s *sortedMapDecoder) mergeSegments(segments [][]value) ([]MapPair, error)
return s.output[:i], nil
}

func (s *sortedMapDecoder) init(segments [][]value) error {
// first parse all the inputs, i.e. split them from pure byte slices into
// map-key byte slices and map-value byte slices, this will make it much
// simpler to determine which segment to work on and we will make sure that
// every map pair was parsed exactly once
s.input = make([][]MapPair, len(segments))
for segID, seg := range segments {
s.input[segID] = make([]MapPair, len(seg))
for valID, val := range seg {
if err := s.input[segID][valID].FromBytes(val.value, false); err != nil {
return err
}
s.input[segID][valID].Tombstone = val.tombstone
}
}
func (s *sortedMapMerger) init(segments [][]MapPair) error {
s.input = segments

// all offset pointers initialized at 0 which is where we want to start
s.offsets = make([]int, len(segments))
Expand All @@ -103,7 +60,7 @@ func (s *sortedMapDecoder) init(segments [][]value) error {
return nil
}

func (s *sortedMapDecoder) findSegmentWithLowestKey() (MapPair, bool) {
func (s *sortedMapMerger) findSegmentWithLowestKey() (MapPair, bool) {
bestSeg := -1
bestKey := []byte(nil)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (

func TestSortedDecoderRemoveTombstones(t *testing.T) {
t.Run("single entry, no tombstones", func(t *testing.T) {
m := newSortedMapDecoder()
input1 := mustEncode([]MapPair{
m := newSortedMapMerger()
input1 := []MapPair{
{
Key: []byte("hello"),
Value: []byte("world"),
},
})
}

input := [][]value{input1}
input := [][]MapPair{input1}

actual, err := m.do(input)
require.Nil(t, err)
Expand All @@ -32,8 +32,8 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
})

t.Run("single entry, single tombstone for unrelated key", func(t *testing.T) {
m := newSortedMapDecoder()
input1 := mustEncode([]MapPair{
m := newSortedMapMerger()
input1 := []MapPair{
{
Key: []byte("hello"),
Value: []byte("world"),
Expand All @@ -42,9 +42,9 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
Key: []byte("unrelated"),
Tombstone: true,
},
})
}

input := [][]value{input1}
input := [][]MapPair{input1}

actual, err := m.do(input)
require.Nil(t, err)
Expand All @@ -59,20 +59,20 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
})

t.Run("single entry with tombstone over two segments", func(t *testing.T) {
m := newSortedMapDecoder()
input := [][]value{
mustEncode([]MapPair{
m := newSortedMapMerger()
input := [][]MapPair{
[]MapPair{
{
Key: []byte("hello"),
Value: []byte("world"),
},
}),
mustEncode([]MapPair{
},
[]MapPair{
{
Key: []byte("hello"),
Tombstone: true,
},
}),
},
}

actual, err := m.do(input)
Expand All @@ -83,9 +83,9 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
})

t.Run("multiple segments including updates", func(t *testing.T) {
m := newSortedMapDecoder()
input := [][]value{
mustEncode([]MapPair{
m := newSortedMapMerger()
input := [][]MapPair{
[]MapPair{
{
Key: []byte("a"),
Value: []byte("a1"),
Expand All @@ -98,8 +98,8 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
Key: []byte("e"),
Value: []byte("e1"),
},
}),
mustEncode([]MapPair{
},
[]MapPair{
{
Key: []byte("a"),
Value: []byte("a2"),
Expand All @@ -112,13 +112,13 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
Key: []byte("c"),
Value: []byte("c2"),
},
}),
mustEncode([]MapPair{
},
[]MapPair{
{
Key: []byte("b"),
Value: []byte("b3"),
},
}),
},
}

actual, err := m.do(input)
Expand Down Expand Up @@ -146,9 +146,9 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
})

t.Run("multiple segments including deletes and re-adds", func(t *testing.T) {
m := newSortedMapDecoder()
input := [][]value{
mustEncode([]MapPair{
m := newSortedMapMerger()
input := [][]MapPair{
[]MapPair{
{
Key: []byte("a"),
Value: []byte("a1"),
Expand All @@ -161,8 +161,8 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
Key: []byte("e"),
Value: []byte("e1"),
},
}),
mustEncode([]MapPair{
},
[]MapPair{
{
Key: []byte("a"),
Value: []byte("a2"),
Expand All @@ -175,8 +175,8 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
Key: []byte("c"),
Value: []byte("c2"),
},
}),
mustEncode([]MapPair{
},
[]MapPair{
{
Key: []byte("b"),
Value: []byte("b3"),
Expand All @@ -185,7 +185,7 @@ func TestSortedDecoderRemoveTombstones(t *testing.T) {
Key: []byte("e"),
Tombstone: true,
},
}),
},
}

actual, err := m.do(input)
Expand Down

0 comments on commit f1e460b

Please sign in to comment.