diff --git a/Makefile b/Makefile index 5f770e276c08..2a3372bad495 100644 --- a/Makefile +++ b/Makefile @@ -516,9 +516,6 @@ generate-mockery-log: generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg generate-mockery-log -generate-mockery-lognode: getdeps - $(INSTALL_PATH)/mockery --config $(PWD)/internal/lognode/.mockery.yaml - generate-yaml: milvus-tools @echo "Updating milvus config yaml" @$(PWD)/bin/tools/config gen-yaml && mv milvus.yaml configs/milvus.yaml diff --git a/internal/lognode/server/wal/walimplstest/builder.go b/internal/lognode/server/wal/walimplstest/builder.go new file mode 100644 index 000000000000..2a8187d4f6a7 --- /dev/null +++ b/internal/lognode/server/wal/walimplstest/builder.go @@ -0,0 +1,32 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "github.com/milvus-io/milvus/internal/lognode/server/wal/registry" + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) + +const ( + walName = "test" +) + +func init() { + // register the builder to the registry. + registry.RegisterBuilder(&openerBuilder{}) + message.RegisterMessageIDUnmsarshaler(walName, UnmarshalTestMessageID) +} + +var _ walimpls.OpenerBuilderImpls = &openerBuilder{} + +type openerBuilder struct{} + +func (o *openerBuilder) Name() string { + return walName +} + +func (o *openerBuilder) Build() (walimpls.OpenerImpls, error) { + return &opener{}, nil +} diff --git a/internal/util/logserviceutil/message/test_message_id.go b/internal/lognode/server/wal/walimplstest/message_id.go similarity index 69% rename from internal/util/logserviceutil/message/test_message_id.go rename to internal/lognode/server/wal/walimplstest/message_id.go index 55d00a3dd931..5d614c3e41b8 100644 --- a/internal/util/logserviceutil/message/test_message_id.go +++ b/internal/lognode/server/wal/walimplstest/message_id.go @@ -1,27 +1,23 @@ //go:build test // +build test -package message +package walimplstest import ( "strconv" -) - -var _ MessageID = testMessageID(0) -const testWALName = "test" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) -func init() { - RegisterMessageIDUnmsarshaler(testWALName, UnmarshalTestMessageID) -} +var _ message.MessageID = testMessageID(0) // NewTestMessageID create a new test message id. -func NewTestMessageID(id int64) MessageID { +func NewTestMessageID(id int64) message.MessageID { return testMessageID(id) } // UnmarshalTestMessageID unmarshal the message id. -func UnmarshalTestMessageID(data []byte) (MessageID, error) { +func UnmarshalTestMessageID(data []byte) (message.MessageID, error) { id, err := unmarshalTestMessageID(data) if err != nil { return nil, err @@ -43,21 +39,21 @@ type testMessageID int64 // WALName returns the name of message id related wal. func (id testMessageID) WALName() string { - return testWALName + return walName } // LT less than. -func (id testMessageID) LT(other MessageID) bool { +func (id testMessageID) LT(other message.MessageID) bool { return id < other.(testMessageID) } // LTE less than or equal to. -func (id testMessageID) LTE(other MessageID) bool { +func (id testMessageID) LTE(other message.MessageID) bool { return id <= other.(testMessageID) } // EQ Equal to. -func (id testMessageID) EQ(other MessageID) bool { +func (id testMessageID) EQ(other message.MessageID) bool { return id == other.(testMessageID) } diff --git a/internal/lognode/server/wal/walimplstest/message_log.go b/internal/lognode/server/wal/walimplstest/message_log.go new file mode 100644 index 000000000000..40235d9ef657 --- /dev/null +++ b/internal/lognode/server/wal/walimplstest/message_log.go @@ -0,0 +1,64 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "context" + "sync" + + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var logs = typeutil.NewConcurrentMap[string, *messageLog]() + +func getOrCreateLogs(name string) *messageLog { + l := newMessageLog() + l, _ = logs.GetOrInsert(name, l) + return l +} + +func newMessageLog() *messageLog { + return &messageLog{ + cond: syncutil.NewContextCond(&sync.Mutex{}), + id: 0, + logs: make([]message.ImmutableMessage, 0), + } +} + +type messageLog struct { + cond *syncutil.ContextCond + id int64 + logs []message.ImmutableMessage +} + +func (l *messageLog) Append(_ context.Context, msg message.MutableMessage) (message.MessageID, error) { + l.cond.LockAndBroadcast() + defer l.cond.L.Unlock() + newMessageID := NewTestMessageID(l.id) + l.id++ + l.logs = append(l.logs, msg.IntoImmutableMessage(newMessageID)) + return newMessageID, nil +} + +func (l *messageLog) ReadAt(ctx context.Context, idx int) (message.ImmutableMessage, error) { + var msg message.ImmutableMessage + l.cond.L.Lock() + for idx >= len(l.logs) { + if err := l.cond.Wait(ctx); err != nil { + return nil, err + } + } + msg = l.logs[idx] + l.cond.L.Unlock() + + return msg, nil +} + +func (l *messageLog) Len() int64 { + l.cond.L.Lock() + defer l.cond.L.Unlock() + return int64(len(l.logs)) +} diff --git a/internal/lognode/server/wal/walimplstest/opener.go b/internal/lognode/server/wal/walimplstest/opener.go new file mode 100644 index 000000000000..ff34a20ed34c --- /dev/null +++ b/internal/lognode/server/wal/walimplstest/opener.go @@ -0,0 +1,26 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "context" + + "github.com/milvus-io/milvus/internal/lognode/server/wal/helper" + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" +) + +var _ walimpls.OpenerImpls = &opener{} + +type opener struct{} + +func (*opener) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) { + l := getOrCreateLogs(opt.Channel.GetName()) + return &walImpls{ + WALHelper: *helper.NewWALHelper(opt), + datas: l, + }, nil +} + +func (*opener) Close() { +} diff --git a/internal/lognode/server/wal/walimplstest/scanner.go b/internal/lognode/server/wal/walimplstest/scanner.go new file mode 100644 index 000000000000..7412da694083 --- /dev/null +++ b/internal/lognode/server/wal/walimplstest/scanner.go @@ -0,0 +1,51 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "github.com/milvus-io/milvus/internal/lognode/server/wal/helper" + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) + +var _ walimpls.ScannerImpls = &scannerImpls{} + +func newScannerImpls(opts walimpls.ReadOption, data *messageLog, offset int) *scannerImpls { + s := &scannerImpls{ + ScannerHelper: helper.NewScannerHelper(opts.Name), + datas: data, + ch: make(chan message.ImmutableMessage), + offset: offset, + } + go s.executeConsume() + return s +} + +type scannerImpls struct { + *helper.ScannerHelper + datas *messageLog + ch chan message.ImmutableMessage + offset int +} + +func (s *scannerImpls) executeConsume() { + defer close(s.ch) + for { + msg, err := s.datas.ReadAt(s.Context(), s.offset) + if err != nil { + s.Finish(nil) + return + } + s.ch <- msg + s.offset++ + } +} + +func (s *scannerImpls) Chan() <-chan message.ImmutableMessage { + return s.ch +} + +func (s *scannerImpls) Close() error { + return s.ScannerHelper.Close() +} diff --git a/internal/lognode/server/wal/walimplstest/wal.go b/internal/lognode/server/wal/walimplstest/wal.go new file mode 100644 index 000000000000..ddea29aada2f --- /dev/null +++ b/internal/lognode/server/wal/walimplstest/wal.go @@ -0,0 +1,52 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "context" + + "github.com/milvus-io/milvus/internal/lognode/server/wal/helper" + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + "github.com/milvus-io/milvus/internal/proto/logpb" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) + +var _ walimpls.WALImpls = &walImpls{} + +type walImpls struct { + helper.WALHelper + datas *messageLog +} + +func (w *walImpls) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + return w.datas.Append(ctx, msg) +} + +func (w *walImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls.ScannerImpls, error) { + offset := int64(0) + switch policy := opts.DeliverPolicy.Policy.(type) { + case *logpb.DeliverPolicy_All: + offset = 0 + case *logpb.DeliverPolicy_Latest: + offset = w.datas.Len() + case *logpb.DeliverPolicy_StartFrom: + id, err := unmarshalTestMessageID(policy.StartFrom.Id) + if err != nil { + return nil, err + } + offset = int64(id) + case *logpb.DeliverPolicy_StartAfter: + id, err := unmarshalTestMessageID(policy.StartAfter.Id) + if err != nil { + return nil, err + } + offset = int64(id + 1) + } + return newScannerImpls( + opts, w.datas, int(offset), + ), nil +} + +func (w *walImpls) Close() { +} diff --git a/internal/lognode/server/wal/walimplstest/wal_test.go b/internal/lognode/server/wal/walimplstest/wal_test.go new file mode 100644 index 000000000000..47845725473a --- /dev/null +++ b/internal/lognode/server/wal/walimplstest/wal_test.go @@ -0,0 +1,11 @@ +package walimplstest + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" +) + +func TestWALImplsTest(t *testing.T) { + walimpls.NewWALImplsTestFramework(t, 100, &openerBuilder{}).Run() +} diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go b/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go index 190966c7783c..de15d436a081 100644 --- a/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go +++ b/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go @@ -61,6 +61,50 @@ func (_c *MockMutableMessage_EstimateSize_Call) RunAndReturn(run func() int) *Mo return _c } +// IntoImmutableMessage provides a mock function with given fields: msgID +func (_m *MockMutableMessage) IntoImmutableMessage(msgID message.MessageID) message.ImmutableMessage { + ret := _m.Called(msgID) + + var r0 message.ImmutableMessage + if rf, ok := ret.Get(0).(func(message.MessageID) message.ImmutableMessage); ok { + r0 = rf(msgID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.ImmutableMessage) + } + } + + return r0 +} + +// MockMutableMessage_IntoImmutableMessage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IntoImmutableMessage' +type MockMutableMessage_IntoImmutableMessage_Call struct { + *mock.Call +} + +// IntoImmutableMessage is a helper method to define mock.On call +// - msgID message.MessageID +func (_e *MockMutableMessage_Expecter) IntoImmutableMessage(msgID interface{}) *MockMutableMessage_IntoImmutableMessage_Call { + return &MockMutableMessage_IntoImmutableMessage_Call{Call: _e.mock.On("IntoImmutableMessage", msgID)} +} + +func (_c *MockMutableMessage_IntoImmutableMessage_Call) Run(run func(msgID message.MessageID)) *MockMutableMessage_IntoImmutableMessage_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(message.MessageID)) + }) + return _c +} + +func (_c *MockMutableMessage_IntoImmutableMessage_Call) Return(_a0 message.ImmutableMessage) *MockMutableMessage_IntoImmutableMessage_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_IntoImmutableMessage_Call) RunAndReturn(run func(message.MessageID) message.ImmutableMessage) *MockMutableMessage_IntoImmutableMessage_Call { + _c.Call.Return(run) + return _c +} + // MessageType provides a mock function with given fields: func (_m *MockMutableMessage) MessageType() message.MessageType { ret := _m.Called() diff --git a/internal/util/logserviceutil/message/message.go b/internal/util/logserviceutil/message/message.go index 776dd8107329..83bf8356d89d 100644 --- a/internal/util/logserviceutil/message/message.go +++ b/internal/util/logserviceutil/message/message.go @@ -33,6 +33,9 @@ type MutableMessage interface { // Properties returns the message properties. Properties() Properties + + // IntoImmutableMessage converts the mutable message to immutable message. + IntoImmutableMessage(msgID MessageID) ImmutableMessage } // ImmutableMessage is the read-only message interface. diff --git a/internal/util/logserviceutil/message/message_builder_test.go b/internal/util/logserviceutil/message/message_builder_test.go index 399ca85570ff..c937c3d5d26b 100644 --- a/internal/util/logserviceutil/message/message_builder_test.go +++ b/internal/util/logserviceutil/message/message_builder_test.go @@ -1,11 +1,13 @@ package message_test import ( + "fmt" "testing" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message" "github.com/milvus-io/milvus/internal/util/logserviceutil/message" ) @@ -30,13 +32,22 @@ func TestMessage(t *testing.T) { assert.Equal(t, uint64(123), tt) assert.Equal(t, len([]byte(v)), n) - lcMsgID := message.NewTestMessageID(456) + lcMsgID := mock_message.NewMockMessageID(t) + lcMsgID.EXPECT().Marshal().Return([]byte("lcMsgID")) mutableMessage.WithLastConfirmed(lcMsgID) v, ok = mutableMessage.Properties().Get("_lc") assert.True(t, ok) - assert.Equal(t, v, "456") + assert.Equal(t, v, "lcMsgID") - msgID := message.NewTestMessageID(123) + msgID := mock_message.NewMockMessageID(t) + msgID.EXPECT().EQ(msgID).Return(true) + msgID.EXPECT().WALName().Return("testMsgID") + message.RegisterMessageIDUnmsarshaler("testMsgID", func(data []byte) (message.MessageID, error) { + if string(data) == "lcMsgID" { + return msgID, nil + } + panic(fmt.Sprintf("unexpected data: %s", data)) + }) b = message.NewBuilder() immutableMessage := b.WithMessageID(msgID). @@ -46,7 +57,7 @@ func TestMessage(t *testing.T) { "_t": "1", "_tt": string(proto.EncodeVarint(456)), "_v": "1", - "_lc": "456", + "_lc": "lcMsgID", }). BuildImmutable() @@ -57,7 +68,7 @@ func TestMessage(t *testing.T) { assert.Equal(t, "value", v) assert.True(t, ok) assert.Equal(t, message.MessageTypeTimeTick, immutableMessage.MessageType()) - assert.Equal(t, 32, immutableMessage.EstimateSize()) + assert.Equal(t, 36, immutableMessage.EstimateSize()) assert.Equal(t, message.Version(1), immutableMessage.Version()) assert.Equal(t, uint64(456), immutableMessage.TimeTick()) assert.NotNil(t, immutableMessage.LastConfirmedMessageID()) diff --git a/internal/util/logserviceutil/message/message_id_test.go b/internal/util/logserviceutil/message/message_id_test.go index 349a16f2bdcf..5aef58bddd8c 100644 --- a/internal/util/logserviceutil/message/message_id_test.go +++ b/internal/util/logserviceutil/message/message_id_test.go @@ -1,4 +1,4 @@ -package message +package message_test import ( "bytes" @@ -6,25 +6,35 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" ) func TestRegisterMessageIDUnmarshaler(t *testing.T) { - msgID := NewTestMessageID(123) + msgID := mock_message.NewMockMessageID(t) + + message.RegisterMessageIDUnmsarshaler("test", func(b []byte) (message.MessageID, error) { + if bytes.Equal(b, []byte("123")) { + return msgID, nil + } + return nil, errors.New("invalid") + }) - id, err := UnmarshalMessageID("test", []byte("123")) - assert.True(t, id.EQ(msgID)) + id, err := message.UnmarshalMessageID("test", []byte("123")) + assert.NotNil(t, id) assert.NoError(t, err) - id, err = UnmarshalMessageID("test", []byte("1234a")) + id, err = message.UnmarshalMessageID("test", []byte("1234")) assert.Nil(t, id) assert.Error(t, err) assert.Panics(t, func() { - UnmarshalMessageID("test1", []byte("123")) + message.UnmarshalMessageID("test1", []byte("123")) }) assert.Panics(t, func() { - RegisterMessageIDUnmsarshaler("test", func(b []byte) (MessageID, error) { + message.RegisterMessageIDUnmsarshaler("test", func(b []byte) (message.MessageID, error) { if bytes.Equal(b, []byte("123")) { return msgID, nil } diff --git a/internal/util/logserviceutil/message/message_impl.go b/internal/util/logserviceutil/message/message_impl.go index f1f9de5bdaa0..e2ad94feaa4e 100644 --- a/internal/util/logserviceutil/message/message_impl.go +++ b/internal/util/logserviceutil/message/message_impl.go @@ -49,6 +49,14 @@ func (m *messageImpl) WithLastConfirmed(id MessageID) MutableMessage { return m } +// IntoImmutableMessage converts current message to immutable message. +func (m *messageImpl) IntoImmutableMessage(id MessageID) ImmutableMessage { + return &immutableMessageImpl{ + messageImpl: *m, + id: id, + } +} + type immutableMessageImpl struct { messageImpl id MessageID