/
sync_policy.go
134 lines (112 loc) · 4.12 KB
/
sync_policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package writebuffer
import (
"container/heap"
"math/rand"
"time"
"github.com/samber/lo"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// SyncPolicy decides which segments should be synced.
//
// Implementations in this file include policies for full buffers, stale
// buffers, compacted or sealed segments, a flush timestamp, and the oldest
// buffers.
type SyncPolicy interface {
	// SelectSegments inspects the given segment buffers at timestamp ts and
	// returns the IDs of the segments this policy selects.
	SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64

	// Reason returns a short label describing why this policy selects
	// segments (for example "buffer full").
	Reason() string
}
// SelectSegmentFunc is the function form of a sync policy: given the current
// segment buffers and a timestamp, it returns the IDs of the selected segments.
type SelectSegmentFunc func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64
// SelectSegmentFnPolicy adapts a SelectSegmentFunc, paired with a reason
// label, into a value that provides the SyncPolicy methods.
type SelectSegmentFnPolicy struct {
	// fn performs the actual segment selection.
	fn SelectSegmentFunc
	// reason is the label reported by Reason.
	reason string
}
// SelectSegments delegates to the wrapped selection function and returns its
// result unchanged.
func (f SelectSegmentFnPolicy) SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
	selected := f.fn(buffers, ts)
	return selected
}
// Reason returns the label that was supplied when the policy was created.
func (f SelectSegmentFnPolicy) Reason() string {
	return f.reason
}
// wrapSelectSegmentFuncPolicy bundles a selection function and its reason
// label into a SelectSegmentFnPolicy.
func wrapSelectSegmentFuncPolicy(fn SelectSegmentFunc, reason string) SelectSegmentFnPolicy {
	var policy SelectSegmentFnPolicy
	policy.fn = fn
	policy.reason = reason
	return policy
}
// GetFullBufferPolicy returns a SyncPolicy that selects every segment whose
// buffer reports IsFull. The timestamp argument is ignored. The policy reason
// is "buffer full".
func GetFullBufferPolicy() SyncPolicy {
	// pickFull maps a buffer to its segment ID and keeps it only when full.
	pickFull := func(buf *segmentBuffer, _ int) (int64, bool) {
		return buf.segmentID, buf.IsFull()
	}
	selectFull := func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
		return lo.FilterMap(buffers, pickFull)
	}
	return wrapSelectSegmentFuncPolicy(selectFull, "buffer full")
}
// GetCompactedSegmentsPolicy returns a SyncPolicy that asks meta which of the
// buffered segments match the metacache.WithCompacted filter and selects
// those. The timestamp argument is ignored. The policy reason is
// "segment compacted".
func GetCompactedSegmentsPolicy(meta metacache.MetaCache) SyncPolicy {
	selectCompacted := func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
		// Collect the IDs of all buffered segments, one per buffer, in order.
		bufferedIDs := make([]int64, len(buffers))
		for i := range buffers {
			bufferedIDs[i] = buffers[i].segmentID
		}
		return meta.GetSegmentIDsBy(
			metacache.WithSegmentIDs(bufferedIDs...),
			metacache.WithCompacted(),
		)
	}
	return wrapSelectSegmentFuncPolicy(selectCompacted, "segment compacted")
}
// GetSyncStaleBufferPolicy returns a SyncPolicy that selects buffers whose
// oldest data is older than staleDuration.
//
// A buffer's age is the physical time of ts minus the physical time of the
// buffer's minimum timestamp. Each buffer is compared against staleDuration
// plus a fresh random jitter in [0, 10%) of staleDuration, drawn per buffer
// on every evaluation. The policy reason is "buffer stale".
func GetSyncStaleBufferPolicy(staleDuration time.Duration) SyncPolicy {
	selectStale := func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
		now := tsoutil.PhysicalTime(ts)
		isStale := func(buf *segmentBuffer, _ int) (int64, bool) {
			oldest := tsoutil.PhysicalTime(buf.MinTimestamp())
			// One random draw per buffer, so thresholds differ between buffers.
			jitter := time.Duration(rand.Float64() * 0.1 * float64(staleDuration))
			threshold := staleDuration + jitter
			age := now.Sub(oldest)
			return buf.segmentID, age > threshold
		}
		return lo.FilterMap(buffers, isStale)
	}
	return wrapSelectSegmentFuncPolicy(selectStale, "buffer stale")
}
// GetSealedSegmentsPolicy returns a SyncPolicy that selects all segments meta
// reports as Sealed, regardless of the buffers and timestamp passed in.
//
// Evaluating the policy has a side effect: after collecting the IDs it asks
// meta to update those segments, filtered to the ones still in the Sealed
// state, to the Flushing state. The policy reason is "segment flushing".
func GetSealedSegmentsPolicy(meta metacache.MetaCache) SyncPolicy {
	selectSealed := func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 {
		sealedIDs := meta.GetSegmentIDsBy(metacache.WithSegmentState(commonpb.SegmentState_Sealed))

		// Mark the collected segments as Flushing; the extra state filter
		// restricts the update to segments that are still Sealed.
		meta.UpdateSegments(
			metacache.UpdateState(commonpb.SegmentState_Flushing),
			metacache.WithSegmentIDs(sealedIDs...),
			metacache.WithSegmentState(commonpb.SegmentState_Sealed),
		)

		return sealedIDs
	}
	return wrapSelectSegmentFuncPolicy(selectSealed, "segment flushing")
}
// GetFlushTsPolicy returns a SyncPolicy driven by a shared flush timestamp.
//
// On every evaluation the policy loads flushTimestamp. It selects nothing
// (returns nil) while the loaded value equals nonFlushTS or while ts is still
// below it. Otherwise it selects each buffer whose segment can be found in
// meta and whose minimum timestamp is smaller than the flush timestamp.
// The policy reason is "flush ts".
func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) SyncPolicy {
	selectBeforeFlushTs := func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
		target := flushTimestamp.Load()
		// Guard: the nonFlushTS sentinel is set (presumably meaning no flush
		// is pending), or ts has not reached the flush timestamp yet.
		if target == nonFlushTS || ts < target {
			return nil
		}

		// Keep only buffers of segments known to meta that hold data older
		// than the flush timestamp.
		return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
			id := buf.segmentID
			if _, found := meta.GetSegmentByID(id); !found {
				return id, false
			}
			return id, buf.MinTimestamp() < target
		})
	}
	return wrapSelectSegmentFuncPolicy(selectBeforeFlushTs, "flush ts")
}
// GetOldestBufferPolicy returns a SyncPolicy that selects at most num
// segments: those whose buffers have the smallest minimum timestamps. The
// timestamp argument is not used. The policy reason is "oldest buffers".
//
// The returned IDs follow the heap's internal slice order, not sorted order.
func GetOldestBufferPolicy(num int) SyncPolicy {
	selectOldest := func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
		oldest := &SegStartPosHeap{}
		heap.Init(oldest)
		for i := range buffers {
			heap.Push(oldest, buffers[i])
			if oldest.Len() <= num {
				continue
			}
			// Over capacity: the heap root is the buffer with the largest
			// minimum timestamp, so popping drops the newest candidate.
			heap.Pop(oldest)
		}
		return lo.Map(*oldest, func(buf *segmentBuffer, _ int) int64 {
			return buf.segmentID
		})
	}
	return wrapSelectSegmentFuncPolicy(selectOldest, "oldest buffers")
}
// SegStartPosHeap implements heap.Interface as a max-heap of segment buffers
// ordered by MinTimestamp: the root is the buffer with the largest minimum
// timestamp. GetOldestBufferPolicy uses it to evict the newest buffer whenever
// the heap grows beyond the requested size.
type SegStartPosHeap []*segmentBuffer

// Len returns the number of buffers in the heap.
func (h SegStartPosHeap) Len() int { return len(h) }

// Less reports whether element i has a larger minimum timestamp than element
// j; the inverted comparison is what makes this a max-heap.
func (h SegStartPosHeap) Less(i, j int) bool {
	return h[i].MinTimestamp() > h[j].MinTimestamp()
}

// Swap exchanges the elements at indexes i and j.
func (h SegStartPosHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push appends x, which must be a *segmentBuffer, to the underlying slice.
// It is meant to be called through heap.Push, which restores heap order.
func (h *SegStartPosHeap) Push(x any) {
	*h = append(*h, x.(*segmentBuffer))
}

// Pop removes and returns the last element of the underlying slice.
// It is meant to be called through heap.Pop, which first moves the root there.
func (h *SegStartPosHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	// Clear the vacated slot so the backing array does not keep the popped
	// buffer reachable.
	old[n-1] = nil
	*h = old[:n-1]
	return x
}