Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: timetick interceptor implementation #34238

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions internal/mocks/streamingnode/server/mock_wal/mock_WAL.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 0 additions & 36 deletions internal/proto/streamingpb/extends.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,6 @@
package streamingpb

import (
"google.golang.org/protobuf/types/known/emptypb"
)

const (
ServiceMethodPrefix = "/milvus.proto.log"
InitialTerm = int64(-1)
)

func NewDeliverAll() *DeliverPolicy {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant code here

return &DeliverPolicy{
Policy: &DeliverPolicy_All{
All: &emptypb.Empty{},
},
}
}

func NewDeliverLatest() *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_Latest{
Latest: &emptypb.Empty{},
},
}
}

func NewDeliverStartFrom(messageID *MessageID) *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_StartFrom{
StartFrom: messageID,
},
}
}

func NewDeliverStartAfter(messageID *MessageID) *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_StartAfter{
StartAfter: messageID,
},
}
}
76 changes: 76 additions & 0 deletions internal/streamingnode/server/resource/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package resource
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resource package is used to access the streamingnode global singleton.


import (
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource/timestamp"
"github.com/milvus-io/milvus/internal/types"
)

var r *resourceImpl // singleton resource instance

// optResourceInit is the option to initialize the resource.
type optResourceInit func(r *resourceImpl)

// OptETCD provides the etcd client to the resource.
func OptETCD(etcd *clientv3.Client) optResourceInit {
return func(r *resourceImpl) {
r.etcdClient = etcd
}
}

// OptRootCoordClient provides the root coordinator client to the resource.
func OptRootCoordClient(rootCoordClient types.RootCoordClient) optResourceInit {
return func(r *resourceImpl) {
r.rootCoordClient = rootCoordClient
}
}

// Init initializes the singleton of resources.
// Should be call when streaming node startup.
func Init(opts ...optResourceInit) {
r = &resourceImpl{}
for _, opt := range opts {
opt(r)
}
r.timestampAllocator = timestamp.NewAllocator(r.rootCoordClient)

assertNotNil(r.TimestampAllocator())
assertNotNil(r.ETCD())
assertNotNil(r.RootCoordClient())
}

// Resource access the underlying singleton of resources.
func Resource() *resourceImpl {
return r
}

// resourceImpl is a basic resource dependency for streamingnode server.
// All utility on it is concurrent-safe and singleton.
type resourceImpl struct {
timestampAllocator timestamp.Allocator
etcdClient *clientv3.Client
rootCoordClient types.RootCoordClient
}

// TimestampAllocator returns the timestamp allocator to allocate timestamp.
func (r *resourceImpl) TimestampAllocator() timestamp.Allocator {
return r.timestampAllocator
}

// ETCD returns the etcd client.
func (r *resourceImpl) ETCD() *clientv3.Client {
return r.etcdClient
}

// RootCoordClient returns the root coordinator client.
func (r *resourceImpl) RootCoordClient() types.RootCoordClient {
return r.rootCoordClient
}

// assertNotNil panics if the resource is nil.
func assertNotNil(v interface{}) {
if v == nil {
panic("nil resource")
}
}
31 changes: 31 additions & 0 deletions internal/streamingnode/server/resource/resource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package resource

import (
"testing"

"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/internal/mocks"
)

func TestInit(t *testing.T) {
assert.Panics(t, func() {
Init()
})
assert.Panics(t, func() {
Init(OptETCD(&clientv3.Client{}))
})
assert.Panics(t, func() {
Init(OptETCD(&clientv3.Client{}))
})
Init(OptETCD(&clientv3.Client{}), OptRootCoordClient(mocks.NewMockRootCoordClient(t)))

assert.NotNil(t, Resource().TimestampAllocator())
assert.NotNil(t, Resource().ETCD())
assert.NotNil(t, Resource().RootCoordClient())
}

func TestInitForTest(t *testing.T) {
InitForTest()
}
17 changes: 17 additions & 0 deletions internal/streamingnode/server/resource/test_utility.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//go:build test
// +build test

package resource

import "github.com/milvus-io/milvus/internal/streamingnode/server/resource/timestamp"

// InitForTest initializes the singleton of resources for test.
func InitForTest(opts ...optResourceInit) {
r = &resourceImpl{}
for _, opt := range opts {
opt(r)
}
if r.rootCoordClient != nil {
r.timestampAllocator = timestamp.NewAllocator(r.rootCoordClient)
}
}
Loading
Loading