Skip to content

Commit

Permalink
enhance: timetick interceptor implementation
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <chyezh@outlook.com>
  • Loading branch information
chyezh committed Jun 27, 2024
1 parent d2bc4a5 commit 46bbe8c
Show file tree
Hide file tree
Showing 46 changed files with 1,899 additions and 180 deletions.
16 changes: 7 additions & 9 deletions internal/mocks/streamingnode/server/mock_wal/mock_WAL.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 0 additions & 36 deletions internal/proto/streamingpb/extends.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,6 @@
package streamingpb

import (
"google.golang.org/protobuf/types/known/emptypb"
)

const (
ServiceMethodPrefix = "/milvus.proto.log"
InitialTerm = int64(-1)
)

func NewDeliverAll() *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_All{
All: &emptypb.Empty{},
},
}
}

func NewDeliverLatest() *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_Latest{
Latest: &emptypb.Empty{},
},
}
}

func NewDeliverStartFrom(messageID *MessageID) *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_StartFrom{
StartFrom: messageID,
},
}
}

func NewDeliverStartAfter(messageID *MessageID) *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_StartAfter{
StartAfter: messageID,
},
}
}
76 changes: 76 additions & 0 deletions internal/streamingnode/server/resource/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package resource

import (
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource/timestamp"
"github.com/milvus-io/milvus/internal/types"
)

var r *resourceImpl // singleton resource instance

// optResourceInit is the option to initialize the resource.
type optResourceInit func(r *resourceImpl)

// OptETCD provides the etcd client to the resource.
func OptETCD(etcd *clientv3.Client) optResourceInit {
return func(r *resourceImpl) {
r.etcdClient = etcd
}
}

// OptRootCoordClient provides the root coordinator client to the resource.
func OptRootCoordClient(rootCoordClient types.RootCoordClient) optResourceInit {
return func(r *resourceImpl) {
r.rootCoordClient = rootCoordClient
}
}

// Init initializes the singleton of resources.
// Should be call when streaming node startup.
func Init(opts ...optResourceInit) {
r = &resourceImpl{}
for _, opt := range opts {
opt(r)
}
r.timestampAllocator = timestamp.NewAllocator(r.rootCoordClient)

assertNotNil(r.TimestampAllocator())
assertNotNil(r.ETCD())
assertNotNil(r.RootCoordClient())
}

// Resource access the underlying singleton of resources.
func Resource() *resourceImpl {
return r
}

// resourceImpl is a basic resource dependency for streamingnode server.
// All utility on it is concurrent-safe and singleton.
type resourceImpl struct {
timestampAllocator timestamp.Allocator
etcdClient *clientv3.Client
rootCoordClient types.RootCoordClient
}

// TimestampAllocator returns the timestamp allocator to allocate timestamp.
func (r *resourceImpl) TimestampAllocator() timestamp.Allocator {
return r.timestampAllocator
}

// ETCD returns the etcd client.
func (r *resourceImpl) ETCD() *clientv3.Client {
return r.etcdClient
}

// RootCoordClient returns the root coordinator client.
func (r *resourceImpl) RootCoordClient() types.RootCoordClient {
return r.rootCoordClient
}

// assertNotNil panics if the resource is nil.
func assertNotNil(v interface{}) {
if v == nil {
panic("nil resource")
}
}
31 changes: 31 additions & 0 deletions internal/streamingnode/server/resource/resource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package resource

import (
"testing"

"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/internal/mocks"
)

func TestInit(t *testing.T) {
assert.Panics(t, func() {
Init()
})
assert.Panics(t, func() {
Init(OptETCD(&clientv3.Client{}))
})
assert.Panics(t, func() {
Init(OptETCD(&clientv3.Client{}))
})
Init(OptETCD(&clientv3.Client{}), OptRootCoordClient(mocks.NewMockRootCoordClient(t)))

assert.NotNil(t, Resource().TimestampAllocator())
assert.NotNil(t, Resource().ETCD())
assert.NotNil(t, Resource().RootCoordClient())
}

func TestInitForTest(t *testing.T) {
InitForTest()
}
17 changes: 17 additions & 0 deletions internal/streamingnode/server/resource/test_utility.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//go:build test
// +build test

package resource

import "github.com/milvus-io/milvus/internal/streamingnode/server/resource/timestamp"

// InitForTest initializes the singleton of resources for test.
func InitForTest(opts ...optResourceInit) {
r = &resourceImpl{}
for _, opt := range opts {
opt(r)
}
if r.rootCoordClient != nil {
r.timestampAllocator = timestamp.NewAllocator(r.rootCoordClient)
}
}
Loading

0 comments on commit 46bbe8c

Please sign in to comment.