/
segment_replace_strategy.go
133 lines (113 loc) · 4.23 KB
/
segment_replace_strategy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package lsmkv
import (
"encoding/binary"
"errors"
"fmt"
"time"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv/segmentindex"
"github.com/weaviate/weaviate/entities/lsmkv"
)
// get looks up key in the segment's primary index and returns the value
// stored for it under the replace strategy.
//
// It returns an error if the segment does not use the replace strategy. When
// the bloom filter is enabled it is consulted first, and a negative answer
// short-circuits the index lookup. Bloom filter outcomes (true negative,
// false positive, true positive) are reported to s.bloomFilterMetrics.
//
// A key missing from the index yields lsmkv.NotFound. Other index errors and
// copyNode errors are returned unchanged. Otherwise the result is whatever
// replaceStratParseData returns for a private copy of the raw record.
func (s *segment) get(key []byte) ([]byte, error) {
	if s.strategy != segmentindex.StrategyReplace {
		return nil, fmt.Errorf("get only possible for strategy %q", StrategyReplace)
	}

	before := time.Now()

	if s.useBloomFilter && !s.bloomFilter.Test(key) {
		s.bloomFilterMetrics.trueNegative(before)
		return nil, lsmkv.NotFound
	}

	node, err := s.index.Get(key)
	if err != nil {
		if !errors.Is(err, lsmkv.NotFound) {
			return nil, err
		}
		// Reaching this point with the bloom filter enabled means it claimed
		// the key might be present, but the index does not contain it.
		if s.useBloomFilter {
			s.bloomFilterMetrics.falsePositive(before)
		}
		return nil, lsmkv.NotFound
	}

	// Record the true positive on return, so the measured duration also covers
	// copying and parsing the record.
	defer func() {
		if s.useBloomFilter {
			s.bloomFilterMetrics.truePositive(before)
		}
	}()

	// We need to copy the data we read from the segment exactly once in this
	// place. This means that future processing can share this memory as much as
	// it wants to, as it can now be considered immutable. If we didn't copy in
	// this place it would only be safe to hold this data while still under the
	// protection of the segmentGroup.maintenanceLock. This lock makes sure that
	// no compaction is started during an ongoing read. However, once read,
	// further processing is no longer protected by lock.
	// If a compaction completes and the old segment is removed, we would be accessing
	// invalid memory without the copy, thus leading to a SEGFAULT.
	// Similar approach was used to fix SEGFAULT in collection strategy
	// https://github.com/weaviate/weaviate/issues/1837
	contentsCopy := make([]byte, node.End-node.Start)
	if err = s.copyNode(contentsCopy, nodeOffset{node.Start, node.End}); err != nil {
		return nil, err
	}

	return s.replaceStratParseData(contentsCopy)
}
// getBySecondaryIntoMemory looks up key in the secondary index at position pos
// and returns the value of the matching replace-strategy record.
//
// buffer is an optional scratch slice. If its capacity is large enough to hold
// the raw record, it is resliced and reused. Otherwise a new slice is
// allocated. The third return value is the slice that actually holds the raw
// record bytes, so the caller can pass it back in on a later call. The
// returned value is parsed from that same slice, so the caller must not reuse
// the buffer while it still needs the value.
//
// Errors:
//   - non-replace strategy or invalid pos: a descriptive error.
//   - bloom filter miss: lsmkv.NotFound.
//   - index lookup or copyNode failure: the error is returned unchanged.
//   - otherwise: the result of replaceStratParseData (e.g. lsmkv.Deleted).
//
// The third return value is nil unless the record was copied.
func (s *segment) getBySecondaryIntoMemory(pos int, key []byte, buffer []byte) ([]byte, error, []byte) {
	if s.strategy != segmentindex.StrategyReplace {
		return nil, fmt.Errorf("get only possible for strategy %q", StrategyReplace), nil
	}

	// pos must be a valid index into s.secondaryIndices. Without this check, a
	// negative pos or pos == len(s.secondaryIndices) would panic with an
	// index out of range instead of returning an error.
	if pos < 0 || pos >= len(s.secondaryIndices) || s.secondaryIndices[pos] == nil {
		return nil, fmt.Errorf("no secondary index at pos %d", pos), nil
	}

	// NOTE(review): assumes s.secondaryBloomFilters has an entry for every
	// secondary index whenever bloom filters are enabled — confirm.
	if s.useBloomFilter && !s.secondaryBloomFilters[pos].Test(key) {
		return nil, lsmkv.NotFound, nil
	}

	node, err := s.secondaryIndices[pos].Get(key)
	if err != nil {
		return nil, err, nil
	}

	// We need to copy the data we read from the segment exactly once in this
	// place. If we didn't copy in this place it would only be safe to hold this
	// data while still under the protection of the segmentGroup.maintenanceLock.
	// This lock makes sure that no compaction is started during an ongoing read.
	// However, once read, further processing is no longer protected by lock.
	// If a compaction completes and the old segment is removed, we would be accessing
	// invalid memory without the copy, thus leading to a SEGFAULT.
	// Similar approach was used to fix SEGFAULT in collection strategy
	// https://github.com/weaviate/weaviate/issues/1837
	// When the caller's buffer is reused, the copy lives in caller-owned
	// memory, and the caller decides when it may be overwritten.
	size := node.End - node.Start
	var contentsCopy []byte
	if uint64(cap(buffer)) >= size {
		contentsCopy = buffer[:size]
	} else {
		contentsCopy = make([]byte, size)
	}
	if err = s.copyNode(contentsCopy, nodeOffset{node.Start, node.End}); err != nil {
		return nil, err, nil
	}

	currContent, err := s.replaceStratParseData(contentsCopy)
	return currContent, err, contentsCopy
}
// replaceStratParseData decodes a single replace-strategy record and returns
// its value.
//
// Record layout (byte offsets):
//
//	0      tombstone flag (0x01 means deleted)
//	1-8    value length as little-endian uint64
//	9-     value bytes; anything after the value is ignored here
//
// Returns:
//   - lsmkv.NotFound for empty input.
//   - lsmkv.Deleted when the tombstone flag is set.
//   - a descriptive error for a malformed record: one too short to hold the
//     header, or whose declared value length exceeds the available bytes.
//     Such records previously caused an out-of-range panic.
//
// On success the returned value is a subslice of in and shares its memory.
func (s *segment) replaceStratParseData(in []byte) ([]byte, error) {
	if len(in) == 0 {
		return nil, lsmkv.NotFound
	}

	// The tombstone flag is checked before the header is validated, so a
	// tombstone is reported as lsmkv.Deleted regardless of the record length.
	if in[0] == 0x01 {
		return nil, lsmkv.Deleted
	}

	const headerLen = 9 // 1 tombstone byte + 8 length bytes
	if len(in) < headerLen {
		return nil, fmt.Errorf("replace strategy: corrupt record: %d bytes is shorter than the %d-byte header",
			len(in), headerLen)
	}

	valueLength := binary.LittleEndian.Uint64(in[1:headerLen])
	// Compare against the remaining byte count rather than computing
	// headerLen+valueLength, which could wrap around for a corrupt length.
	if valueLength > uint64(len(in)-headerLen) {
		return nil, fmt.Errorf("replace strategy: corrupt record: value length %d exceeds the %d available bytes",
			valueLength, len(in)-headerLen)
	}

	return in[headerLen : headerLen+int(valueLength)], nil
}