Skip to content

Commit

Permalink
enhance: timetick interceptor implementation (#34238)
Browse files Browse the repository at this point in the history
issue: #33285

- optimize the message package
- add interceptor package to achieve append operation intercepting.
- add timetick interceptor to attach timetick properties for message.
- add timetick background task to send timetick message.

Signed-off-by: chyezh <chyezh@outlook.com>
  • Loading branch information
chyezh committed Jul 2, 2024
1 parent a5be322 commit 3563136
Show file tree
Hide file tree
Showing 46 changed files with 1,899 additions and 180 deletions.
16 changes: 7 additions & 9 deletions internal/mocks/streamingnode/server/mock_wal/mock_WAL.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 0 additions & 36 deletions internal/proto/streamingpb/extends.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,6 @@
package streamingpb

import (
"google.golang.org/protobuf/types/known/emptypb"
)

const (
ServiceMethodPrefix = "/milvus.proto.log"
InitialTerm = int64(-1)
)

func NewDeliverAll() *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_All{
All: &emptypb.Empty{},
},
}
}

func NewDeliverLatest() *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_Latest{
Latest: &emptypb.Empty{},
},
}
}

func NewDeliverStartFrom(messageID *MessageID) *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_StartFrom{
StartFrom: messageID,
},
}
}

func NewDeliverStartAfter(messageID *MessageID) *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_StartAfter{
StartAfter: messageID,
},
}
}
76 changes: 76 additions & 0 deletions internal/streamingnode/server/resource/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package resource

import (
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource/timestamp"
"github.com/milvus-io/milvus/internal/types"
)

var r *resourceImpl // singleton resource instance

// optResourceInit is the option to initialize the resource.
type optResourceInit func(r *resourceImpl)

// OptETCD provides the etcd client to the resource.
func OptETCD(etcd *clientv3.Client) optResourceInit {
return func(r *resourceImpl) {
r.etcdClient = etcd
}
}

// OptRootCoordClient provides the root coordinator client to the resource.
func OptRootCoordClient(rootCoordClient types.RootCoordClient) optResourceInit {
return func(r *resourceImpl) {
r.rootCoordClient = rootCoordClient
}
}

// Init initializes the singleton of resources.
// Should be call when streaming node startup.
func Init(opts ...optResourceInit) {
r = &resourceImpl{}
for _, opt := range opts {
opt(r)
}
r.timestampAllocator = timestamp.NewAllocator(r.rootCoordClient)

assertNotNil(r.TimestampAllocator())
assertNotNil(r.ETCD())
assertNotNil(r.RootCoordClient())
}

// Resource access the underlying singleton of resources.
func Resource() *resourceImpl {
return r
}

// resourceImpl is a basic resource dependency for streamingnode server.
// All utility on it is concurrent-safe and singleton.
type resourceImpl struct {
timestampAllocator timestamp.Allocator
etcdClient *clientv3.Client
rootCoordClient types.RootCoordClient
}

// TimestampAllocator returns the timestamp allocator to allocate timestamp.
func (r *resourceImpl) TimestampAllocator() timestamp.Allocator {
return r.timestampAllocator
}

// ETCD returns the etcd client.
func (r *resourceImpl) ETCD() *clientv3.Client {
return r.etcdClient
}

// RootCoordClient returns the root coordinator client.
func (r *resourceImpl) RootCoordClient() types.RootCoordClient {
return r.rootCoordClient
}

// assertNotNil panics if the resource is nil.
func assertNotNil(v interface{}) {
if v == nil {
panic("nil resource")
}
}
31 changes: 31 additions & 0 deletions internal/streamingnode/server/resource/resource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package resource

import (
"testing"

"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/internal/mocks"
)

func TestInit(t *testing.T) {
assert.Panics(t, func() {
Init()
})
assert.Panics(t, func() {
Init(OptETCD(&clientv3.Client{}))
})
assert.Panics(t, func() {
Init(OptETCD(&clientv3.Client{}))
})
Init(OptETCD(&clientv3.Client{}), OptRootCoordClient(mocks.NewMockRootCoordClient(t)))

assert.NotNil(t, Resource().TimestampAllocator())
assert.NotNil(t, Resource().ETCD())
assert.NotNil(t, Resource().RootCoordClient())
}

func TestInitForTest(t *testing.T) {
InitForTest()
}
17 changes: 17 additions & 0 deletions internal/streamingnode/server/resource/test_utility.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//go:build test
// +build test

package resource

import "github.com/milvus-io/milvus/internal/streamingnode/server/resource/timestamp"

// InitForTest initializes the singleton of resources for test.
func InitForTest(opts ...optResourceInit) {
r = &resourceImpl{}
for _, opt := range opts {
opt(r)
}
if r.rootCoordClient != nil {
r.timestampAllocator = timestamp.NewAllocator(r.rootCoordClient)
}
}
Loading

0 comments on commit 3563136

Please sign in to comment.