-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
delta_buffer.go
67 lines (54 loc) · 1.42 KB
/
delta_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package writebuffer
import (
"math"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// DeltaBuffer accumulates delete records (primary key + timestamp pairs) in a
// storage.DeleteData until they are handed out via Yield.
type DeltaBuffer struct {
// BufferBase is embedded; NewDeltaBuffer initialises its row/size limits and
// timestamp bounds. NOTE(review): IsEmpty and UpdateStatistics used by the
// methods of this type are presumably promoted from BufferBase — confirm.
BufferBase
// buffer holds the delete records appended by Buffer and returned by Yield.
buffer *storage.DeleteData
}
// NewDeltaBuffer returns a DeltaBuffer backed by a fresh, empty
// storage.DeleteData.
//
// The embedded BufferBase is set up with noLimit as its row limit and the
// DataNodeCfg.FlushDeleteBufferBytes parameter as its size limit. The
// timestamp bounds start out inverted (TimestampFrom = math.MaxUint64,
// TimestampTo = 0).
func NewDeltaBuffer() *DeltaBuffer {
	base := BufferBase{
		rowLimit:      noLimit,
		sizeLimit:     paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64(),
		TimestampFrom: math.MaxUint64,
		TimestampTo:   0,
	}

	deltaBuf := new(DeltaBuffer)
	deltaBuf.BufferBase = base
	deltaBuf.buffer = new(storage.DeleteData)
	return deltaBuf
}
// getTimestampRange scans tss once and returns a TimeRange whose timestampMin
// and timestampMax are the smallest and largest values found.
//
// When tss is empty the sentinel bounds are returned unchanged
// (timestampMin = math.MaxUint64, timestampMax = 0), i.e. an inverted range.
func (db *DeltaBuffer) getTimestampRange(tss []typeutil.Timestamp) TimeRange {
	tr := TimeRange{
		timestampMin: math.MaxUint64,
		timestampMax: 0,
	}

	for _, ts := range tss {
		if ts < tr.timestampMin {
			tr.timestampMin = ts
		}
		if ts > tr.timestampMax {
			tr.timestampMax = ts
		}
	}
	return tr
}
// Yield returns the underlying storage.DeleteData, or nil when IsEmpty
// reports that the buffer holds nothing.
//
// The returned pointer is the buffer's own DeleteData, not a copy.
func (db *DeltaBuffer) Yield() *storage.DeleteData {
	if hasData := !db.IsEmpty(); hasData {
		return db.buffer
	}
	return nil
}
// Buffer appends every primary key in pks, paired with the timestamp at the
// same index in tss, to the underlying delete data. It then reports the
// change through UpdateStatistics: number of rows added, size growth, the
// min/max timestamp range of tss, and the given start/end positions.
//
// tss is indexed by position in pks, so it must contain at least len(pks)
// entries; a shorter slice panics with an index-out-of-range error.
//
// The return value is the growth of the buffer as measured by
// storage.DeleteData.Size before and after the appends.
func (db *DeltaBuffer) Buffer(pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) (bufSize int64) {
	sizeBefore := db.buffer.Size()
	for idx, pk := range pks {
		db.buffer.Append(pk, tss[idx])
	}
	bufSize = db.buffer.Size() - sizeBefore

	added := int64(len(pks))
	tsRange := db.getTimestampRange(tss)
	db.UpdateStatistics(added, bufSize, tsRange, startPos, endPos)
	return bufSize
}