Skip to content

Commit

Permalink
Add a seal policy which restrict the lifetime of a segment (#7172)
Browse files Browse the repository at this point in the history
issue: #7164
Signed-off-by: sunby <bingyi.sun@zilliz.com>
  • Loading branch information
sunby committed Aug 20, 2021
1 parent 9b7c782 commit 6e34f4c
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 23 deletions.
6 changes: 0 additions & 6 deletions internal/datacoord/policy.go
Expand Up @@ -21,12 +21,6 @@ import (
"go.uber.org/zap"
)

type clusterDeltaChange struct {
newNodes []string
offlines []string
restarts []string
}

// data node register func, simple func wrapping policy
type dataNodeRegisterPolicy func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus)

Expand Down
18 changes: 9 additions & 9 deletions internal/datacoord/segment_allocation_policy.go
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)

Expand Down Expand Up @@ -81,9 +82,6 @@ type sealPolicy func(maxCount, writtenCount, allocatedCount int64) bool
// segmentSealPolicy seal policy applies to segment
type segmentSealPolicy func(segment *SegmentInfo, ts Timestamp) bool

// channelSealPolicy seal policy applies to channel
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo

// getSegmentCapacityPolicy get segmentSealPolicy with segment size factor policy
func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
return func(segment *SegmentInfo, ts Timestamp) bool {
Expand All @@ -96,12 +94,18 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
}

// getLastExpiresLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
func getLastExpiresLifetimePolicy(lifetime uint64) segmentSealPolicy {
func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
return func(segment *SegmentInfo, ts Timestamp) bool {
return (ts - segment.GetLastExpireTime()) > lifetime
pts, _ := tsoutil.ParseTS(ts)
epts, _ := tsoutil.ParseTS(segment.GetLastExpireTime())
d := pts.Sub(epts)
return d >= lifetime
}
}

// channelSealPolicy seal policy applies to channel
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo

// getChannelCapacityPolicy get channelSealPolicy with channel segment capacity policy
func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
return func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo {
Expand All @@ -121,10 +125,6 @@ func sortSegmentsByLastExpires(segs []*SegmentInfo) {
})
}

func sealPolicyV1(maxCount, writtenCount, allocatedCount int64) bool {
return float64(writtenCount) >= Params.SegmentSealProportion*float64(maxCount)
}

type flushPolicy func(segment *SegmentInfo, t Timestamp) bool

const flushInterval = 2 * time.Second
Expand Down
35 changes: 35 additions & 0 deletions internal/datacoord/segment_allocation_policy_test.go
@@ -0,0 +1,35 @@
package datacoord

import (
"testing"
"time"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/stretchr/testify/assert"
)

func TestSealSegmentPolicy(t *testing.T) {
t.Run("test seal segment by lifetime", func(t *testing.T) {
lifetime := 2 * time.Second
now := time.Now()
curTS := now.UnixNano() / int64(time.Millisecond)
nosealTs := (now.Add(lifetime / 2)).UnixNano() / int64(time.Millisecond)
sealTs := (now.Add(lifetime)).UnixNano() / int64(time.Millisecond)

p := sealByLifetimePolicy(lifetime)

segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
LastExpireTime: tsoutil.ComposeTS(curTS, 0),
},
}

shouldSeal := p(segment, tsoutil.ComposeTS(nosealTs, 0))
assert.False(t, shouldSeal)

shouldSeal = p(segment, tsoutil.ComposeTS(sealTs, 0))
assert.True(t, shouldSeal)
})
}
17 changes: 9 additions & 8 deletions internal/datacoord/segment_manager.go
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
)

const segmentMaxLifetime = 24 * time.Hour

// Manager manage segment related operations.
type Manager interface {
// AllocSegment allocate rows and record the allocation.
Expand Down Expand Up @@ -133,12 +135,11 @@ func defaultAlocatePolicy() AllocatePolicy {
return AllocatePolicyV1
}

func defaultSealPolicy() sealPolicy {
return sealPolicyV1
}

func defaultSegmentSealPolicy() segmentSealPolicy {
return getSegmentCapacityPolicy(Params.SegmentSealProportion)
func defaultSegmentSealPolicy() []segmentSealPolicy {
return []segmentSealPolicy{
sealByLifetimePolicy(segmentMaxLifetime),
getSegmentCapacityPolicy(Params.SegmentSealProportion),
}
}

func defaultFlushPolicy() flushPolicy {
Expand All @@ -154,8 +155,8 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *Se
segments: make([]UniqueID, 0),
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAlocatePolicy(),
segmentSealPolicies: []segmentSealPolicy{defaultSegmentSealPolicy()}, // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
flushPolicy: defaultFlushPolicy(),
allocPool: sync.Pool{
New: func() interface{} {
Expand Down

0 comments on commit 6e34f4c

Please sign in to comment.