diff --git a/Makefile b/Makefile index 24a6e716540b..f7e0e763fbff 100644 --- a/Makefile +++ b/Makefile @@ -517,8 +517,8 @@ generate-mockery-chunk-manager: getdeps generate-mockery-pkg: $(MAKE) -C pkg generate-mockery -generate-mockery-log: - $(INSTALL_PATH)/mockery --config $(PWD)/internal/logservice/.mockery.yaml +generate-mockery-streaming: + $(INSTALL_PATH)/mockery --config $(PWD)/internal/streamingservice/.mockery.yaml generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg generate-mockery-log diff --git a/go.mod b/go.mod index 653c0798af39..166f21a01293 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,9 @@ require ( github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/pkg/errors v0.9.1 + github.com/remeh/sizedwaitgroup v1.0.0 github.com/zeebo/xxh3 v1.0.2 + google.golang.org/protobuf v1.33.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -238,7 +240,6 @@ require ( google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 70b031afdde7..f52d95b12cdc 100644 --- a/go.sum +++ b/go.sum @@ -762,6 +762,8 @@ github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/quasilyte/go-ruleguard/dsl v0.3.22 h1:wd8zkOhSNr+I+8Qeciml08ivDt1pSXe60+5DqOpCjPE= github.com/quasilyte/go-ruleguard/dsl v0.3.22/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU= +github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E= +github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ= diff --git a/internal/logservice/.mockery.yaml b/internal/logservice/.mockery.yaml deleted file mode 100644 index 22c293d1556a..000000000000 --- a/internal/logservice/.mockery.yaml +++ /dev/null @@ -1,13 +0,0 @@ -quiet: False -with-expecter: True -filename: "mock_{{.InterfaceName}}.go" -dir: "internal/mocks/{{trimPrefix .PackagePath \"github.com/milvus-io/milvus/internal\" | dir }}/mock_{{.PackageName}}" -mockname: "Mock{{.InterfaceName}}" -outpkg: "mock_{{.PackageName}}" -packages: - github.com/milvus-io/milvus/internal/util/logserviceutil/message: - interfaces: - MessageID: - ImmutableMessage: - MutableMessage: - RProperties: \ No newline at end of file diff --git a/internal/mocks/streamingnode/server/mock_wal/mock_Opener.go b/internal/mocks/streamingnode/server/mock_wal/mock_Opener.go new file mode 100644 index 000000000000..868a981dcd58 --- /dev/null +++ b/internal/mocks/streamingnode/server/mock_wal/mock_Opener.go @@ -0,0 +1,124 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_wal + +import ( + context "context" + + wal "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + mock "github.com/stretchr/testify/mock" +) + +// MockOpener is an autogenerated mock type for the Opener type +type MockOpener struct { + mock.Mock +} + +type MockOpener_Expecter struct { + mock *mock.Mock +} + +func (_m *MockOpener) EXPECT() *MockOpener_Expecter { + return &MockOpener_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *MockOpener) Close() { + _m.Called() +} + +// MockOpener_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockOpener_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockOpener_Expecter) Close() *MockOpener_Close_Call { + return &MockOpener_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockOpener_Close_Call) Run(run func()) *MockOpener_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpener_Close_Call) Return() *MockOpener_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockOpener_Close_Call) RunAndReturn(run func()) *MockOpener_Close_Call { + _c.Call.Return(run) + return _c +} + +// Open provides a mock function with given fields: ctx, opt +func (_m *MockOpener) Open(ctx context.Context, opt *wal.OpenOption) (wal.WAL, error) { + ret := _m.Called(ctx, opt) + + var r0 wal.WAL + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *wal.OpenOption) (wal.WAL, error)); ok { + return rf(ctx, opt) + } + if rf, ok := ret.Get(0).(func(context.Context, *wal.OpenOption) wal.WAL); ok { + r0 = rf(ctx, opt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(wal.WAL) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *wal.OpenOption) error); ok { + r1 = rf(ctx, opt) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockOpener_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' +type MockOpener_Open_Call struct { + *mock.Call +} + +// Open is a helper method to define mock.On call +// - ctx context.Context +// - opt *wal.OpenOption +func (_e *MockOpener_Expecter) Open(ctx interface{}, opt interface{}) *MockOpener_Open_Call { + return &MockOpener_Open_Call{Call: _e.mock.On("Open", ctx, opt)} +} + +func (_c *MockOpener_Open_Call) Run(run func(ctx context.Context, opt *wal.OpenOption)) *MockOpener_Open_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*wal.OpenOption)) + }) + return _c +} + +func (_c *MockOpener_Open_Call) Return(_a0 wal.WAL, _a1 error) *MockOpener_Open_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockOpener_Open_Call) RunAndReturn(run func(context.Context, *wal.OpenOption) (wal.WAL, error)) *MockOpener_Open_Call { + _c.Call.Return(run) + return _c +} + +// NewMockOpener creates a new instance of MockOpener. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockOpener(t interface { + mock.TestingT + Cleanup(func()) +}) *MockOpener { + mock := &MockOpener{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/mock_wal/mock_OpenerBuilder.go b/internal/mocks/streamingnode/server/mock_wal/mock_OpenerBuilder.go new file mode 100644 index 000000000000..2f2b3d0c76f2 --- /dev/null +++ b/internal/mocks/streamingnode/server/mock_wal/mock_OpenerBuilder.go @@ -0,0 +1,129 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_wal + +import ( + wal "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + mock "github.com/stretchr/testify/mock" +) + +// MockOpenerBuilder is an autogenerated mock type for the OpenerBuilder type +type MockOpenerBuilder struct { + mock.Mock +} + +type MockOpenerBuilder_Expecter struct { + mock *mock.Mock +} + +func (_m *MockOpenerBuilder) EXPECT() *MockOpenerBuilder_Expecter { + return &MockOpenerBuilder_Expecter{mock: &_m.Mock} +} + +// Build provides a mock function with given fields: +func (_m *MockOpenerBuilder) Build() (wal.Opener, error) { + ret := _m.Called() + + var r0 wal.Opener + var r1 error + if rf, ok := ret.Get(0).(func() (wal.Opener, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() wal.Opener); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(wal.Opener) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockOpenerBuilder_Build_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Build' +type MockOpenerBuilder_Build_Call struct { + *mock.Call +} + +// Build is a helper method to define mock.On call +func (_e *MockOpenerBuilder_Expecter) Build() *MockOpenerBuilder_Build_Call { + return &MockOpenerBuilder_Build_Call{Call: _e.mock.On("Build")} +} + +func (_c *MockOpenerBuilder_Build_Call) Run(run func()) *MockOpenerBuilder_Build_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerBuilder_Build_Call) Return(_a0 wal.Opener, _a1 error) *MockOpenerBuilder_Build_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockOpenerBuilder_Build_Call) RunAndReturn(run func() (wal.Opener, error)) *MockOpenerBuilder_Build_Call { + _c.Call.Return(run) + return _c +} + +// Name provides a mock function with given fields: +func (_m *MockOpenerBuilder) Name() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockOpenerBuilder_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name' +type MockOpenerBuilder_Name_Call struct { + *mock.Call +} + +// Name is a helper method to define mock.On call +func (_e *MockOpenerBuilder_Expecter) Name() *MockOpenerBuilder_Name_Call { + return &MockOpenerBuilder_Name_Call{Call: _e.mock.On("Name")} +} + +func (_c *MockOpenerBuilder_Name_Call) Run(run func()) *MockOpenerBuilder_Name_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerBuilder_Name_Call) Return(_a0 string) *MockOpenerBuilder_Name_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockOpenerBuilder_Name_Call) RunAndReturn(run func() string) *MockOpenerBuilder_Name_Call { + _c.Call.Return(run) + return _c +} + +// NewMockOpenerBuilder creates a new instance of MockOpenerBuilder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockOpenerBuilder(t interface { + mock.TestingT + Cleanup(func()) +}) *MockOpenerBuilder { + mock := &MockOpenerBuilder{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/mock_wal/mock_Scanner.go b/internal/mocks/streamingnode/server/mock_wal/mock_Scanner.go new file mode 100644 index 000000000000..1ea03c53cc29 --- /dev/null +++ b/internal/mocks/streamingnode/server/mock_wal/mock_Scanner.go @@ -0,0 +1,203 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_wal + +import ( + message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockScanner is an autogenerated mock type for the Scanner type +type MockScanner struct { + mock.Mock +} + +type MockScanner_Expecter struct { + mock *mock.Mock +} + +func (_m *MockScanner) EXPECT() *MockScanner_Expecter { + return &MockScanner_Expecter{mock: &_m.Mock} +} + +// Chan provides a mock function with given fields: +func (_m *MockScanner) Chan() <-chan message.ImmutableMessage { + ret := _m.Called() + + var r0 <-chan message.ImmutableMessage + if rf, ok := ret.Get(0).(func() <-chan message.ImmutableMessage); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan message.ImmutableMessage) + } + } + + return r0 +} + +// MockScanner_Chan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chan' +type MockScanner_Chan_Call struct { + *mock.Call +} + +// Chan is a helper method to define mock.On call +func (_e *MockScanner_Expecter) Chan() *MockScanner_Chan_Call { + return &MockScanner_Chan_Call{Call: _e.mock.On("Chan")} +} + +func (_c *MockScanner_Chan_Call) Run(run func()) *MockScanner_Chan_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScanner_Chan_Call) Return(_a0 <-chan message.ImmutableMessage) *MockScanner_Chan_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScanner_Chan_Call) RunAndReturn(run func() <-chan message.ImmutableMessage) *MockScanner_Chan_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockScanner) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockScanner_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockScanner_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockScanner_Expecter) Close() *MockScanner_Close_Call { + return &MockScanner_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockScanner_Close_Call) Run(run func()) *MockScanner_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScanner_Close_Call) Return(_a0 error) *MockScanner_Close_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScanner_Close_Call) RunAndReturn(run func() error) *MockScanner_Close_Call { + _c.Call.Return(run) + return _c +} + +// Done provides a mock function with given fields: +func (_m *MockScanner) Done() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// MockScanner_Done_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Done' +type MockScanner_Done_Call struct { + *mock.Call +} + +// Done is a helper method to define mock.On call +func (_e *MockScanner_Expecter) Done() *MockScanner_Done_Call { + return &MockScanner_Done_Call{Call: _e.mock.On("Done")} +} + +func (_c *MockScanner_Done_Call) Run(run func()) *MockScanner_Done_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScanner_Done_Call) Return(_a0 <-chan struct{}) *MockScanner_Done_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScanner_Done_Call) RunAndReturn(run func() <-chan struct{}) *MockScanner_Done_Call { + _c.Call.Return(run) + return _c +} + +// Error provides a mock function with given fields: +func (_m *MockScanner) Error() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockScanner_Error_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Error' +type MockScanner_Error_Call struct { + *mock.Call +} + +// Error is a helper method to define mock.On call +func (_e *MockScanner_Expecter) Error() *MockScanner_Error_Call { + return &MockScanner_Error_Call{Call: _e.mock.On("Error")} +} + +func (_c *MockScanner_Error_Call) Run(run func()) *MockScanner_Error_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScanner_Error_Call) Return(_a0 error) *MockScanner_Error_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScanner_Error_Call) RunAndReturn(run func() error) *MockScanner_Error_Call { + _c.Call.Return(run) + return _c +} + +// NewMockScanner creates a new instance of MockScanner. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockScanner(t interface { + mock.TestingT + Cleanup(func()) +}) *MockScanner { + mock := &MockScanner{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go b/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go new file mode 100644 index 000000000000..b2f2f49517e0 --- /dev/null +++ b/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go @@ -0,0 +1,261 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_wal + +import ( + context "context" + + message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + mock "github.com/stretchr/testify/mock" + + streamingpb "github.com/milvus-io/milvus/internal/proto/streamingpb" + + wal "github.com/milvus-io/milvus/internal/streamingnode/server/wal" +) + +// MockWAL is an autogenerated mock type for the WAL type +type MockWAL struct { + mock.Mock +} + +type MockWAL_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWAL) EXPECT() *MockWAL_Expecter { + return &MockWAL_Expecter{mock: &_m.Mock} +} + +// Append provides a mock function with given fields: ctx, msg +func (_m *MockWAL) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + ret := _m.Called(ctx, msg) + + var r0 message.MessageID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (message.MessageID, error)); ok { + return rf(ctx, msg) + } + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) message.MessageID); ok { + r0 = rf(ctx, msg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage) error); ok { + r1 = rf(ctx, msg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWAL_Append_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Append' +type MockWAL_Append_Call struct { + *mock.Call +} + +// Append is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +func (_e *MockWAL_Expecter) Append(ctx interface{}, msg interface{}) *MockWAL_Append_Call { + return &MockWAL_Append_Call{Call: _e.mock.On("Append", ctx, msg)} +} + +func (_c *MockWAL_Append_Call) Run(run func(ctx context.Context, msg message.MutableMessage)) *MockWAL_Append_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage)) + }) + return _c +} + +func (_c *MockWAL_Append_Call) Return(_a0 message.MessageID, _a1 error) *MockWAL_Append_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWAL_Append_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (message.MessageID, error)) *MockWAL_Append_Call { + _c.Call.Return(run) + return _c +} + +// AppendAsync provides a mock function with given fields: ctx, msg, cb +func (_m *MockWAL) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) { + _m.Called(ctx, msg, cb) +} + +// MockWAL_AppendAsync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AppendAsync' +type MockWAL_AppendAsync_Call struct { + *mock.Call +} + +// AppendAsync is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +// - cb func(message.MessageID , error) +func (_e *MockWAL_Expecter) AppendAsync(ctx interface{}, msg interface{}, cb interface{}) *MockWAL_AppendAsync_Call { + return &MockWAL_AppendAsync_Call{Call: _e.mock.On("AppendAsync", ctx, msg, cb)} +} + +func (_c *MockWAL_AppendAsync_Call) Run(run func(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error))) *MockWAL_AppendAsync_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(message.MessageID, error))) + }) + return _c +} + +func (_c *MockWAL_AppendAsync_Call) Return() *MockWAL_AppendAsync_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWAL_AppendAsync_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(message.MessageID, error))) *MockWAL_AppendAsync_Call { + _c.Call.Return(run) + return _c +} + +// Channel provides a mock function with given fields: +func (_m *MockWAL) Channel() *streamingpb.PChannelInfo { + ret := _m.Called() + + var r0 *streamingpb.PChannelInfo + if rf, ok := ret.Get(0).(func() *streamingpb.PChannelInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*streamingpb.PChannelInfo) + } + } + + return r0 +} + +// MockWAL_Channel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Channel' +type MockWAL_Channel_Call struct { + *mock.Call +} + +// Channel is a helper method to define mock.On call +func (_e *MockWAL_Expecter) Channel() *MockWAL_Channel_Call { + return &MockWAL_Channel_Call{Call: _e.mock.On("Channel")} +} + +func (_c *MockWAL_Channel_Call) Run(run func()) *MockWAL_Channel_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWAL_Channel_Call) Return(_a0 *streamingpb.PChannelInfo) *MockWAL_Channel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWAL_Channel_Call) RunAndReturn(run func() *streamingpb.PChannelInfo) *MockWAL_Channel_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockWAL) Close() { + _m.Called() +} + +// MockWAL_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockWAL_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockWAL_Expecter) Close() *MockWAL_Close_Call { + return &MockWAL_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockWAL_Close_Call) Run(run func()) *MockWAL_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWAL_Close_Call) Return() *MockWAL_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWAL_Close_Call) RunAndReturn(run func()) *MockWAL_Close_Call { + _c.Call.Return(run) + return _c +} + +// Read provides a mock function with given fields: ctx, deliverPolicy +func (_m *MockWAL) Read(ctx context.Context, deliverPolicy wal.ReadOption) (wal.Scanner, error) { + ret := _m.Called(ctx, deliverPolicy) + + var r0 wal.Scanner + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, wal.ReadOption) (wal.Scanner, error)); ok { + return rf(ctx, deliverPolicy) + } + if rf, ok := ret.Get(0).(func(context.Context, wal.ReadOption) wal.Scanner); ok { + r0 = rf(ctx, deliverPolicy) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(wal.Scanner) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, wal.ReadOption) error); ok { + r1 = rf(ctx, deliverPolicy) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWAL_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read' +type MockWAL_Read_Call struct { + *mock.Call +} + +// Read is a helper method to define mock.On call +// - ctx context.Context +// - deliverPolicy wal.ReadOption +func (_e *MockWAL_Expecter) Read(ctx interface{}, deliverPolicy interface{}) *MockWAL_Read_Call { + return &MockWAL_Read_Call{Call: _e.mock.On("Read", ctx, deliverPolicy)} +} + +func (_c *MockWAL_Read_Call) Run(run func(ctx context.Context, deliverPolicy wal.ReadOption)) *MockWAL_Read_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(wal.ReadOption)) + }) + return _c +} + +func (_c *MockWAL_Read_Call) Return(_a0 wal.Scanner, _a1 error) *MockWAL_Read_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWAL_Read_Call) RunAndReturn(run func(context.Context, wal.ReadOption) (wal.Scanner, error)) *MockWAL_Read_Call { + _c.Call.Return(run) + return _c +} + +// NewMockWAL creates a new instance of MockWAL. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockWAL(t interface { + mock.TestingT + Cleanup(func()) +}) *MockWAL { + mock := &MockWAL{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_Interceptor.go b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_Interceptor.go new file mode 100644 index 000000000000..35eb067f5cd8 --- /dev/null +++ b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_Interceptor.go @@ -0,0 +1,125 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + context "context" + + message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockInterceptor is an autogenerated mock type for the Interceptor type +type MockInterceptor struct { + mock.Mock +} + +type MockInterceptor_Expecter struct { + mock *mock.Mock +} + +func (_m *MockInterceptor) EXPECT() *MockInterceptor_Expecter { + return &MockInterceptor_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *MockInterceptor) Close() { + _m.Called() +} + +// MockInterceptor_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockInterceptor_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockInterceptor_Expecter) Close() *MockInterceptor_Close_Call { + return &MockInterceptor_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockInterceptor_Close_Call) Run(run func()) *MockInterceptor_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockInterceptor_Close_Call) Return() *MockInterceptor_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockInterceptor_Close_Call) RunAndReturn(run func()) *MockInterceptor_Close_Call { + _c.Call.Return(run) + return _c +} + +// DoAppend provides a mock function with given fields: ctx, msg, append +func (_m *MockInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) { + ret := _m.Called(ctx, msg, append) + + var r0 message.MessageID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)); ok { + return rf(ctx, msg, append) + } + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) message.MessageID); ok { + r0 = rf(ctx, msg, append) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) error); ok { + r1 = rf(ctx, msg, append) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockInterceptor_DoAppend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DoAppend' +type MockInterceptor_DoAppend_Call struct { + *mock.Call +} + +// DoAppend is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +// - append func(context.Context , message.MutableMessage)(message.MessageID , error) +func (_e *MockInterceptor_Expecter) DoAppend(ctx interface{}, msg interface{}, append interface{}) *MockInterceptor_DoAppend_Call { + return &MockInterceptor_DoAppend_Call{Call: _e.mock.On("DoAppend", ctx, msg, append)} +} + +func (_c *MockInterceptor_DoAppend_Call) Run(run func(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error))) *MockInterceptor_DoAppend_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(context.Context, message.MutableMessage) (message.MessageID, error))) + }) + return _c +} + +func (_c *MockInterceptor_DoAppend_Call) Return(_a0 message.MessageID, _a1 error) *MockInterceptor_DoAppend_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockInterceptor_DoAppend_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)) *MockInterceptor_DoAppend_Call { + _c.Call.Return(run) + return _c +} + +// NewMockInterceptor creates a new instance of MockInterceptor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockInterceptor(t interface { + mock.TestingT + Cleanup(func()) +}) *MockInterceptor { + mock := &MockInterceptor{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorBuilder.go b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorBuilder.go new file mode 100644 index 000000000000..9a25052470fc --- /dev/null +++ b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorBuilder.go @@ -0,0 +1,79 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + mock "github.com/stretchr/testify/mock" +) + +// MockInterceptorBuilder is an autogenerated mock type for the InterceptorBuilder type +type MockInterceptorBuilder struct { + mock.Mock +} + +type MockInterceptorBuilder_Expecter struct { + mock *mock.Mock +} + +func (_m *MockInterceptorBuilder) EXPECT() *MockInterceptorBuilder_Expecter { + return &MockInterceptorBuilder_Expecter{mock: &_m.Mock} +} + +// Build provides a mock function with given fields: wal +func (_m *MockInterceptorBuilder) Build(wal <-chan walimpls.WALImpls) walimpls.BasicInterceptor { + ret := _m.Called(wal) + + var r0 walimpls.BasicInterceptor + if rf, ok := ret.Get(0).(func(<-chan walimpls.WALImpls) walimpls.BasicInterceptor); ok { + r0 = rf(wal) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(walimpls.BasicInterceptor) + } + } + + return r0 +} + +// MockInterceptorBuilder_Build_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Build' +type MockInterceptorBuilder_Build_Call struct { + *mock.Call +} + +// Build is a helper method to define mock.On call +// - wal <-chan walimpls.WALImpls +func (_e *MockInterceptorBuilder_Expecter) Build(wal interface{}) *MockInterceptorBuilder_Build_Call { + return &MockInterceptorBuilder_Build_Call{Call: _e.mock.On("Build", wal)} +} + +func (_c *MockInterceptorBuilder_Build_Call) Run(run func(wal <-chan walimpls.WALImpls)) *MockInterceptorBuilder_Build_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(<-chan walimpls.WALImpls)) + }) + return _c +} + +func (_c *MockInterceptorBuilder_Build_Call) Return(_a0 walimpls.BasicInterceptor) *MockInterceptorBuilder_Build_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockInterceptorBuilder_Build_Call) RunAndReturn(run func(<-chan walimpls.WALImpls) walimpls.BasicInterceptor) *MockInterceptorBuilder_Build_Call { + _c.Call.Return(run) + return _c +} + +// NewMockInterceptorBuilder creates a new instance of MockInterceptorBuilder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockInterceptorBuilder(t interface { + mock.TestingT + Cleanup(func()) +}) *MockInterceptorBuilder { + mock := &MockInterceptorBuilder{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorWithReady.go b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorWithReady.go new file mode 100644 index 000000000000..5744967b56e4 --- /dev/null +++ b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorWithReady.go @@ -0,0 +1,168 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + context "context" + + message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockInterceptorWithReady is an autogenerated mock type for the InterceptorWithReady type +type MockInterceptorWithReady struct { + mock.Mock +} + +type MockInterceptorWithReady_Expecter struct { + mock *mock.Mock +} + +func (_m *MockInterceptorWithReady) EXPECT() *MockInterceptorWithReady_Expecter { + return &MockInterceptorWithReady_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *MockInterceptorWithReady) Close() { + _m.Called() +} + +// MockInterceptorWithReady_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockInterceptorWithReady_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockInterceptorWithReady_Expecter) Close() *MockInterceptorWithReady_Close_Call { + return &MockInterceptorWithReady_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockInterceptorWithReady_Close_Call) Run(run func()) *MockInterceptorWithReady_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockInterceptorWithReady_Close_Call) Return() *MockInterceptorWithReady_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockInterceptorWithReady_Close_Call) RunAndReturn(run func()) *MockInterceptorWithReady_Close_Call { + _c.Call.Return(run) + return _c +} + +// DoAppend provides a mock function with given fields: ctx, msg, append +func (_m *MockInterceptorWithReady) DoAppend(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) { + ret := _m.Called(ctx, msg, append) + + var r0 message.MessageID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)); ok { + return rf(ctx, msg, append) + } + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) message.MessageID); ok { + r0 = rf(ctx, msg, append) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) error); ok { + r1 = rf(ctx, msg, append) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockInterceptorWithReady_DoAppend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DoAppend' +type MockInterceptorWithReady_DoAppend_Call struct { + *mock.Call +} + +// DoAppend is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +// - append func(context.Context , message.MutableMessage)(message.MessageID , error) +func (_e *MockInterceptorWithReady_Expecter) DoAppend(ctx interface{}, msg interface{}, append interface{}) *MockInterceptorWithReady_DoAppend_Call { + return &MockInterceptorWithReady_DoAppend_Call{Call: _e.mock.On("DoAppend", ctx, msg, append)} +} + +func (_c *MockInterceptorWithReady_DoAppend_Call) Run(run func(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error))) *MockInterceptorWithReady_DoAppend_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(context.Context, message.MutableMessage) (message.MessageID, error))) + }) + return _c +} + +func (_c *MockInterceptorWithReady_DoAppend_Call) Return(_a0 message.MessageID, _a1 error) *MockInterceptorWithReady_DoAppend_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockInterceptorWithReady_DoAppend_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)) *MockInterceptorWithReady_DoAppend_Call { + _c.Call.Return(run) + return _c +} + +// Ready provides a mock function with given fields: +func (_m *MockInterceptorWithReady) Ready() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// MockInterceptorWithReady_Ready_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ready' +type MockInterceptorWithReady_Ready_Call struct { + *mock.Call +} + +// Ready is a helper method to define mock.On call +func (_e *MockInterceptorWithReady_Expecter) Ready() *MockInterceptorWithReady_Ready_Call { + return &MockInterceptorWithReady_Ready_Call{Call: _e.mock.On("Ready")} +} + +func (_c *MockInterceptorWithReady_Ready_Call) Run(run func()) *MockInterceptorWithReady_Ready_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockInterceptorWithReady_Ready_Call) Return(_a0 <-chan struct{}) *MockInterceptorWithReady_Ready_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockInterceptorWithReady_Ready_Call) RunAndReturn(run func() <-chan struct{}) *MockInterceptorWithReady_Ready_Call { + _c.Call.Return(run) + return _c +} + +// NewMockInterceptorWithReady creates a new instance of MockInterceptorWithReady. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockInterceptorWithReady(t interface { + mock.TestingT + Cleanup(func()) +}) *MockInterceptorWithReady { + mock := &MockInterceptorWithReady{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go new file mode 100644 index 000000000000..34fa8c45e39e --- /dev/null +++ b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go @@ -0,0 +1,129 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + mock "github.com/stretchr/testify/mock" +) + +// MockOpenerBuilderImpls is an autogenerated mock type for the OpenerBuilderImpls type +type MockOpenerBuilderImpls struct { + mock.Mock +} + +type MockOpenerBuilderImpls_Expecter struct { + mock *mock.Mock +} + +func (_m *MockOpenerBuilderImpls) EXPECT() *MockOpenerBuilderImpls_Expecter { + return &MockOpenerBuilderImpls_Expecter{mock: &_m.Mock} +} + +// Build provides a mock function with given fields: +func (_m *MockOpenerBuilderImpls) Build() (walimpls.OpenerImpls, error) { + ret := _m.Called() + + var r0 walimpls.OpenerImpls + var r1 error + if rf, ok := ret.Get(0).(func() (walimpls.OpenerImpls, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() walimpls.OpenerImpls); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(walimpls.OpenerImpls) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockOpenerBuilderImpls_Build_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Build' +type MockOpenerBuilderImpls_Build_Call struct { + *mock.Call +} + +// Build is a helper method to define mock.On call +func (_e *MockOpenerBuilderImpls_Expecter) Build() *MockOpenerBuilderImpls_Build_Call { + return &MockOpenerBuilderImpls_Build_Call{Call: _e.mock.On("Build")} +} + +func (_c *MockOpenerBuilderImpls_Build_Call) Run(run func()) *MockOpenerBuilderImpls_Build_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerBuilderImpls_Build_Call) Return(_a0 walimpls.OpenerImpls, _a1 error) *MockOpenerBuilderImpls_Build_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockOpenerBuilderImpls_Build_Call) RunAndReturn(run func() (walimpls.OpenerImpls, error)) *MockOpenerBuilderImpls_Build_Call { + _c.Call.Return(run) + return _c +} + +// Name provides a mock function with given fields: +func (_m *MockOpenerBuilderImpls) Name() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockOpenerBuilderImpls_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name' +type MockOpenerBuilderImpls_Name_Call struct { + *mock.Call +} + +// Name is a helper method to define mock.On call +func (_e *MockOpenerBuilderImpls_Expecter) Name() *MockOpenerBuilderImpls_Name_Call { + return &MockOpenerBuilderImpls_Name_Call{Call: _e.mock.On("Name")} +} + +func (_c *MockOpenerBuilderImpls_Name_Call) Run(run func()) *MockOpenerBuilderImpls_Name_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerBuilderImpls_Name_Call) Return(_a0 string) *MockOpenerBuilderImpls_Name_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockOpenerBuilderImpls_Name_Call) RunAndReturn(run func() string) *MockOpenerBuilderImpls_Name_Call { + _c.Call.Return(run) + return _c +} + +// NewMockOpenerBuilderImpls creates a new instance of MockOpenerBuilderImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockOpenerBuilderImpls(t interface { + mock.TestingT + Cleanup(func()) +}) *MockOpenerBuilderImpls { + mock := &MockOpenerBuilderImpls{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerImpls.go b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerImpls.go new file mode 100644 index 000000000000..b4f8c62190a4 --- /dev/null +++ b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerImpls.go @@ -0,0 +1,124 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + context "context" + + walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + mock "github.com/stretchr/testify/mock" +) + +// MockOpenerImpls is an autogenerated mock type for the OpenerImpls type +type MockOpenerImpls struct { + mock.Mock +} + +type MockOpenerImpls_Expecter struct { + mock *mock.Mock +} + +func (_m *MockOpenerImpls) EXPECT() *MockOpenerImpls_Expecter { + return &MockOpenerImpls_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *MockOpenerImpls) Close() { + _m.Called() +} + +// MockOpenerImpls_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockOpenerImpls_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockOpenerImpls_Expecter) Close() *MockOpenerImpls_Close_Call { + return &MockOpenerImpls_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockOpenerImpls_Close_Call) Run(run func()) *MockOpenerImpls_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerImpls_Close_Call) Return() *MockOpenerImpls_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockOpenerImpls_Close_Call) RunAndReturn(run func()) *MockOpenerImpls_Close_Call { + _c.Call.Return(run) + return _c +} + +// Open provides a mock function with given fields: ctx, opt +func (_m *MockOpenerImpls) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) { + ret := _m.Called(ctx, opt) + + var r0 walimpls.WALImpls + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *walimpls.OpenOption) (walimpls.WALImpls, error)); ok { + return rf(ctx, opt) + } + if rf, ok := ret.Get(0).(func(context.Context, *walimpls.OpenOption) walimpls.WALImpls); ok { + r0 = rf(ctx, opt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(walimpls.WALImpls) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *walimpls.OpenOption) error); ok { + r1 = rf(ctx, opt) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockOpenerImpls_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' +type MockOpenerImpls_Open_Call struct { + *mock.Call +} + +// Open is a helper method to define mock.On call +// - ctx context.Context +// - opt *walimpls.OpenOption +func (_e *MockOpenerImpls_Expecter) Open(ctx interface{}, opt interface{}) *MockOpenerImpls_Open_Call { + return &MockOpenerImpls_Open_Call{Call: _e.mock.On("Open", ctx, opt)} +} + +func (_c *MockOpenerImpls_Open_Call) Run(run func(ctx context.Context, opt *walimpls.OpenOption)) *MockOpenerImpls_Open_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*walimpls.OpenOption)) + }) + return _c +} + +func (_c *MockOpenerImpls_Open_Call) Return(_a0 walimpls.WALImpls, _a1 error) *MockOpenerImpls_Open_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockOpenerImpls_Open_Call) RunAndReturn(run func(context.Context, *walimpls.OpenOption) (walimpls.WALImpls, error)) *MockOpenerImpls_Open_Call { + _c.Call.Return(run) + return _c +} + +// NewMockOpenerImpls creates a new instance of MockOpenerImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockOpenerImpls(t interface { + mock.TestingT + Cleanup(func()) +}) *MockOpenerImpls { + mock := &MockOpenerImpls{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_ScannerImpls.go b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_ScannerImpls.go new file mode 100644 index 000000000000..06613efa2e91 --- /dev/null +++ b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_ScannerImpls.go @@ -0,0 +1,244 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockScannerImpls is an autogenerated mock type for the ScannerImpls type +type MockScannerImpls struct { + mock.Mock +} + +type MockScannerImpls_Expecter struct { + mock *mock.Mock +} + +func (_m *MockScannerImpls) EXPECT() *MockScannerImpls_Expecter { + return &MockScannerImpls_Expecter{mock: &_m.Mock} +} + +// Chan provides a mock function with given fields: +func (_m *MockScannerImpls) Chan() <-chan message.ImmutableMessage { + ret := _m.Called() + + var r0 <-chan message.ImmutableMessage + if rf, ok := ret.Get(0).(func() <-chan message.ImmutableMessage); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan message.ImmutableMessage) + } + } + + return r0 +} + +// MockScannerImpls_Chan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chan' +type MockScannerImpls_Chan_Call struct { + *mock.Call +} + +// Chan is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Chan() *MockScannerImpls_Chan_Call { + return &MockScannerImpls_Chan_Call{Call: _e.mock.On("Chan")} +} + +func (_c *MockScannerImpls_Chan_Call) Run(run func()) *MockScannerImpls_Chan_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Chan_Call) Return(_a0 <-chan message.ImmutableMessage) *MockScannerImpls_Chan_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Chan_Call) RunAndReturn(run func() <-chan message.ImmutableMessage) *MockScannerImpls_Chan_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockScannerImpls) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockScannerImpls_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockScannerImpls_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Close() *MockScannerImpls_Close_Call { + return &MockScannerImpls_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockScannerImpls_Close_Call) Run(run func()) *MockScannerImpls_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Close_Call) Return(_a0 error) *MockScannerImpls_Close_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Close_Call) RunAndReturn(run func() error) *MockScannerImpls_Close_Call { + _c.Call.Return(run) + return _c +} + +// Done provides a mock function with given fields: +func (_m *MockScannerImpls) Done() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// MockScannerImpls_Done_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Done' +type MockScannerImpls_Done_Call struct { + *mock.Call +} + +// Done is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Done() *MockScannerImpls_Done_Call { + return &MockScannerImpls_Done_Call{Call: _e.mock.On("Done")} +} + +func (_c *MockScannerImpls_Done_Call) Run(run func()) *MockScannerImpls_Done_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Done_Call) Return(_a0 <-chan struct{}) *MockScannerImpls_Done_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Done_Call) RunAndReturn(run func() <-chan struct{}) *MockScannerImpls_Done_Call { + _c.Call.Return(run) + return _c +} + +// Error provides a mock function with given fields: +func (_m *MockScannerImpls) Error() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockScannerImpls_Error_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Error' +type MockScannerImpls_Error_Call struct { + *mock.Call +} + +// Error is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Error() *MockScannerImpls_Error_Call { + return &MockScannerImpls_Error_Call{Call: _e.mock.On("Error")} +} + +func (_c *MockScannerImpls_Error_Call) Run(run func()) *MockScannerImpls_Error_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Error_Call) Return(_a0 error) *MockScannerImpls_Error_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Error_Call) RunAndReturn(run func() error) *MockScannerImpls_Error_Call { + _c.Call.Return(run) + return _c +} + +// Name provides a mock function with given fields: +func (_m *MockScannerImpls) Name() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockScannerImpls_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name' +type MockScannerImpls_Name_Call struct { + *mock.Call +} + +// Name is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Name() *MockScannerImpls_Name_Call { + return &MockScannerImpls_Name_Call{Call: _e.mock.On("Name")} +} + +func (_c *MockScannerImpls_Name_Call) Run(run func()) *MockScannerImpls_Name_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Name_Call) Return(_a0 string) *MockScannerImpls_Name_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Name_Call) RunAndReturn(run func() string) *MockScannerImpls_Name_Call { + _c.Call.Return(run) + return _c +} + +// NewMockScannerImpls creates a new instance of MockScannerImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockScannerImpls(t interface { + mock.TestingT + Cleanup(func()) +}) *MockScannerImpls { + mock := &MockScannerImpls{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_WALImpls.go b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_WALImpls.go new file mode 100644 index 000000000000..54eb4853e0fd --- /dev/null +++ b/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_WALImpls.go @@ -0,0 +1,226 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + context "context" + + message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + mock "github.com/stretchr/testify/mock" + + streamingpb "github.com/milvus-io/milvus/internal/proto/streamingpb" + + walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" +) + +// MockWALImpls is an autogenerated mock type for the WALImpls type +type MockWALImpls struct { + mock.Mock +} + +type MockWALImpls_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWALImpls) EXPECT() *MockWALImpls_Expecter { + return &MockWALImpls_Expecter{mock: &_m.Mock} +} + +// Append provides a mock function with given fields: ctx, msg +func (_m *MockWALImpls) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + ret := _m.Called(ctx, msg) + + var r0 message.MessageID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (message.MessageID, error)); ok { + return rf(ctx, msg) + } + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) message.MessageID); ok { + r0 = rf(ctx, msg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage) error); ok { + r1 = rf(ctx, msg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWALImpls_Append_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Append' +type MockWALImpls_Append_Call struct { + *mock.Call +} + +// Append is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +func (_e *MockWALImpls_Expecter) Append(ctx interface{}, msg interface{}) *MockWALImpls_Append_Call { + return &MockWALImpls_Append_Call{Call: _e.mock.On("Append", ctx, msg)} +} + +func (_c *MockWALImpls_Append_Call) Run(run func(ctx context.Context, msg message.MutableMessage)) *MockWALImpls_Append_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage)) + }) + return _c +} + +func (_c *MockWALImpls_Append_Call) Return(_a0 message.MessageID, _a1 error) *MockWALImpls_Append_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWALImpls_Append_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (message.MessageID, error)) *MockWALImpls_Append_Call { + _c.Call.Return(run) + return _c +} + +// Channel provides a mock function with given fields: +func (_m *MockWALImpls) Channel() *streamingpb.PChannelInfo { + ret := _m.Called() + + var r0 *streamingpb.PChannelInfo + if rf, ok := ret.Get(0).(func() *streamingpb.PChannelInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*streamingpb.PChannelInfo) + } + } + + return r0 +} + +// MockWALImpls_Channel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Channel' +type MockWALImpls_Channel_Call struct { + *mock.Call +} + +// Channel is a helper method to define mock.On call +func (_e *MockWALImpls_Expecter) Channel() *MockWALImpls_Channel_Call { + return &MockWALImpls_Channel_Call{Call: _e.mock.On("Channel")} +} + +func (_c *MockWALImpls_Channel_Call) Run(run func()) *MockWALImpls_Channel_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWALImpls_Channel_Call) Return(_a0 *streamingpb.PChannelInfo) *MockWALImpls_Channel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWALImpls_Channel_Call) RunAndReturn(run func() *streamingpb.PChannelInfo) *MockWALImpls_Channel_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockWALImpls) Close() { + _m.Called() +} + +// MockWALImpls_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockWALImpls_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockWALImpls_Expecter) Close() *MockWALImpls_Close_Call { + return &MockWALImpls_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockWALImpls_Close_Call) Run(run func()) *MockWALImpls_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWALImpls_Close_Call) Return() *MockWALImpls_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWALImpls_Close_Call) RunAndReturn(run func()) *MockWALImpls_Close_Call { + _c.Call.Return(run) + return _c +} + +// Read provides a mock function with given fields: ctx, opts +func (_m *MockWALImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls.ScannerImpls, error) { + ret := _m.Called(ctx, opts) + + var r0 walimpls.ScannerImpls + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, walimpls.ReadOption) (walimpls.ScannerImpls, error)); ok { + return rf(ctx, opts) + } + if rf, ok := ret.Get(0).(func(context.Context, walimpls.ReadOption) walimpls.ScannerImpls); ok { + r0 = rf(ctx, opts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(walimpls.ScannerImpls) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, walimpls.ReadOption) error); ok { + r1 = rf(ctx, opts) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWALImpls_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read' +type MockWALImpls_Read_Call struct { + *mock.Call +} + +// Read is a helper method to define mock.On call +// - ctx context.Context +// - opts walimpls.ReadOption +func (_e *MockWALImpls_Expecter) Read(ctx interface{}, opts interface{}) *MockWALImpls_Read_Call { + return &MockWALImpls_Read_Call{Call: _e.mock.On("Read", ctx, opts)} +} + +func (_c *MockWALImpls_Read_Call) Run(run func(ctx context.Context, opts walimpls.ReadOption)) *MockWALImpls_Read_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(walimpls.ReadOption)) + }) + return _c +} + +func (_c *MockWALImpls_Read_Call) Return(_a0 walimpls.ScannerImpls, _a1 error) *MockWALImpls_Read_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWALImpls_Read_Call) RunAndReturn(run func(context.Context, walimpls.ReadOption) (walimpls.ScannerImpls, error)) *MockWALImpls_Read_Call { + _c.Call.Return(run) + return _c +} + +// NewMockWALImpls creates a new instance of MockWALImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockWALImpls(t interface { + mock.TestingT + Cleanup(func()) +}) *MockWALImpls { + mock := &MockWALImpls{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_ImmutableMessage.go b/internal/mocks/util/streamingutil/mock_message/mock_ImmutableMessage.go similarity index 99% rename from internal/mocks/util/logserviceutil/mock_message/mock_ImmutableMessage.go rename to internal/mocks/util/streamingutil/mock_message/mock_ImmutableMessage.go index cf42d0037e94..df2dae9940cd 100644 --- a/internal/mocks/util/logserviceutil/mock_message/mock_ImmutableMessage.go +++ b/internal/mocks/util/streamingutil/mock_message/mock_ImmutableMessage.go @@ -3,7 +3,7 @@ package mock_message import ( - message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + message "github.com/milvus-io/milvus/internal/util/streamingutil/message" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_MessageID.go b/internal/mocks/util/streamingutil/mock_message/mock_MessageID.go similarity index 98% rename from internal/mocks/util/logserviceutil/mock_message/mock_MessageID.go rename to internal/mocks/util/streamingutil/mock_message/mock_MessageID.go index d48ae6cf6819..f160d324a530 100644 --- a/internal/mocks/util/logserviceutil/mock_message/mock_MessageID.go +++ b/internal/mocks/util/streamingutil/mock_message/mock_MessageID.go @@ -3,7 +3,7 @@ package mock_message import ( - message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + message "github.com/milvus-io/milvus/internal/util/streamingutil/message" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go b/internal/mocks/util/streamingutil/mock_message/mock_MutableMessage.go similarity index 68% rename from internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go rename to internal/mocks/util/streamingutil/mock_message/mock_MutableMessage.go index ab62e2a87d3c..5a718756e253 100644 --- a/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go +++ b/internal/mocks/util/streamingutil/mock_message/mock_MutableMessage.go @@ -3,7 +3,7 @@ package mock_message import ( - message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + message "github.com/milvus-io/milvus/internal/util/streamingutil/message" mock "github.com/stretchr/testify/mock" ) @@ -61,6 +61,50 @@ func (_c *MockMutableMessage_EstimateSize_Call) RunAndReturn(run func() int) *Mo return _c } +// IntoImmutableMessage provides a mock function with given fields: msgID +func (_m *MockMutableMessage) IntoImmutableMessage(msgID message.MessageID) message.ImmutableMessage { + ret := _m.Called(msgID) + + var r0 message.ImmutableMessage + if rf, ok := ret.Get(0).(func(message.MessageID) message.ImmutableMessage); ok { + r0 = rf(msgID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.ImmutableMessage) + } + } + + return r0 +} + +// MockMutableMessage_IntoImmutableMessage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IntoImmutableMessage' +type MockMutableMessage_IntoImmutableMessage_Call struct { + *mock.Call +} + +// IntoImmutableMessage is a helper method to define mock.On call +// - msgID message.MessageID +func (_e *MockMutableMessage_Expecter) IntoImmutableMessage(msgID interface{}) *MockMutableMessage_IntoImmutableMessage_Call { + return &MockMutableMessage_IntoImmutableMessage_Call{Call: _e.mock.On("IntoImmutableMessage", msgID)} +} + +func (_c *MockMutableMessage_IntoImmutableMessage_Call) Run(run func(msgID message.MessageID)) *MockMutableMessage_IntoImmutableMessage_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(message.MessageID)) + }) + return _c +} + +func (_c *MockMutableMessage_IntoImmutableMessage_Call) Return(_a0 message.ImmutableMessage) *MockMutableMessage_IntoImmutableMessage_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_IntoImmutableMessage_Call) RunAndReturn(run func(message.MessageID) message.ImmutableMessage) *MockMutableMessage_IntoImmutableMessage_Call { + _c.Call.Return(run) + return _c +} + // MessageType provides a mock function with given fields: func (_m *MockMutableMessage) MessageType() message.MessageType { ret := _m.Called() @@ -188,6 +232,50 @@ func (_c *MockMutableMessage_Properties_Call) RunAndReturn(run func() message.Pr return _c } +// WithLastConfirmed provides a mock function with given fields: id +func (_m *MockMutableMessage) WithLastConfirmed(id message.MessageID) message.MutableMessage { + ret := _m.Called(id) + + var r0 message.MutableMessage + if rf, ok := ret.Get(0).(func(message.MessageID) message.MutableMessage); ok { + r0 = rf(id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MutableMessage) + } + } + + return r0 +} + +// MockMutableMessage_WithLastConfirmed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WithLastConfirmed' +type MockMutableMessage_WithLastConfirmed_Call struct { + *mock.Call +} + +// WithLastConfirmed is a helper method to define mock.On call +// - id message.MessageID +func (_e *MockMutableMessage_Expecter) WithLastConfirmed(id interface{}) *MockMutableMessage_WithLastConfirmed_Call { + return &MockMutableMessage_WithLastConfirmed_Call{Call: _e.mock.On("WithLastConfirmed", id)} +} + +func (_c *MockMutableMessage_WithLastConfirmed_Call) Run(run func(id message.MessageID)) *MockMutableMessage_WithLastConfirmed_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(message.MessageID)) + }) + return _c +} + +func (_c *MockMutableMessage_WithLastConfirmed_Call) Return(_a0 message.MutableMessage) *MockMutableMessage_WithLastConfirmed_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_WithLastConfirmed_Call) RunAndReturn(run func(message.MessageID) message.MutableMessage) *MockMutableMessage_WithLastConfirmed_Call { + _c.Call.Return(run) + return _c +} + // WithTimeTick provides a mock function with given fields: tt func (_m *MockMutableMessage) WithTimeTick(tt uint64) message.MutableMessage { ret := _m.Called(tt) diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_RProperties.go b/internal/mocks/util/streamingutil/mock_message/mock_RProperties.go similarity index 100% rename from internal/mocks/util/logserviceutil/mock_message/mock_RProperties.go rename to internal/mocks/util/streamingutil/mock_message/mock_RProperties.go diff --git a/internal/proto/log.proto b/internal/proto/log.proto deleted file mode 100644 index fb9da3a4c741..000000000000 --- a/internal/proto/log.proto +++ /dev/null @@ -1,18 +0,0 @@ -syntax = "proto3"; - -package milvus.proto.log; - -option go_package = "github.com/milvus-io/milvus/internal/proto/logpb"; - -import "milvus.proto"; -import "google/protobuf/empty.proto"; - -// -// Common -// - -// Message is the basic unit of communication between publisher and consumer. -message Message { - bytes payload = 1; // message body - map properties = 2; // message properties -} diff --git a/internal/proto/streaming.proto b/internal/proto/streaming.proto new file mode 100644 index 000000000000..8be9a14796c9 --- /dev/null +++ b/internal/proto/streaming.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package milvus.proto.log; + +option go_package = "github.com/milvus-io/milvus/internal/proto/streamingpb"; + +import "milvus.proto"; +import "google/protobuf/empty.proto"; + +// +// Common +// + +// MessageID is the unique identifier of a message. +message MessageID { + bytes id = 1; +} + +// Message is the basic unit of communication between publisher and consumer. +message Message { + bytes payload = 1; // message body + map properties = 2; // message properties +} + +// PChannelInfo is the information of a pchannel info. +message PChannelInfo { + string name = 1; // channel name + int64 term = 2; // A monotonic increasing term, every time the channel is recovered or moved to another streamingnode, the term will increase by meta server. + int64 serverID = 3; // The log node id address of the channel. + repeated VChannelInfo vChannelInfos = 4; // PChannel related vchannels. +} + +// VChannelInfo is the information of a vchannel info. +message VChannelInfo { + string name = 1; +} + +message DeliverPolicy { + oneof policy { + google.protobuf.Empty all = 1; // deliver all messages. + google.protobuf.Empty latest = 2; // deliver the latest message. + MessageID startFrom = 3; // deliver message from this message id. [startFrom, ...] + MessageID startAfter = 4; // deliver message after this message id. (startAfter, ...] + } +} diff --git a/internal/proto/streamingpb/extends.go b/internal/proto/streamingpb/extends.go new file mode 100644 index 000000000000..123ec63e1654 --- /dev/null +++ b/internal/proto/streamingpb/extends.go @@ -0,0 +1,42 @@ +package streamingpb + +import ( + "google.golang.org/protobuf/types/known/emptypb" +) + +const ( + ServiceMethodPrefix = "/milvus.proto.log" + InitialTerm = int64(-1) +) + +func NewDeliverAll() *DeliverPolicy { + return &DeliverPolicy{ + Policy: &DeliverPolicy_All{ + All: &emptypb.Empty{}, + }, + } +} + +func NewDeliverLatest() *DeliverPolicy { + return &DeliverPolicy{ + Policy: &DeliverPolicy_Latest{ + Latest: &emptypb.Empty{}, + }, + } +} + +func NewDeliverStartFrom(messageID *MessageID) *DeliverPolicy { + return &DeliverPolicy{ + Policy: &DeliverPolicy_StartFrom{ + StartFrom: messageID, + }, + } +} + +func NewDeliverStartAfter(messageID *MessageID) *DeliverPolicy { + return &DeliverPolicy{ + Policy: &DeliverPolicy_StartAfter{ + StartAfter: messageID, + }, + } +} diff --git a/internal/streamingnode/server/wal/RAEDME.md b/internal/streamingnode/server/wal/RAEDME.md new file mode 100644 index 000000000000..53a94dc0f846 --- /dev/null +++ b/internal/streamingnode/server/wal/RAEDME.md @@ -0,0 +1,66 @@ +# WAL + +`wal` package is the basic defination of wal interface of milvus streamingnode. + +## Project arrangement + +- `/`: only define exposed interfaces. +- `/walimpls/`: define the underlying message system interfaces need to be implemented. +- `/registry/`: A static lifetime registry to regsiter new implementation for inverting dependency. +- `/adaptor/`: adaptors to implement `wal` interface from `walimpls` interface +- `/helper/`: A utility used to help developer to implement `walimpls` conveniently. +- `/utility/`: A utility code for common logic or data structure. + +## Lifetime Of Interfaces + +- `OpenerBuilder` has a static lifetime in a programs: +- `Opener` keep same lifetime with underlying resources (such as mq client). +- `WAL` keep same lifetime with underlying writer of wal, and it's lifetime is always included in related `Opener`. +- `Scanner` keep same lifetime with underlying reader of wal, and it's lifetime is always included in related `WAL`. + +## Add New Implemetation Of WAL + +developper who want to add a new implementation of `wal` should implements the `walimpls` package interfaces. following interfaces is required: + +- `walimpls.OpenerBuilderImpls` +- `walimpls.OpenerImpls` +- `walimpls.ScannerImpls` +- `walimpls.WALImpls` + +`OpenerBuilderImpls` create `OpenerImpls`; `OpenerImpls` creates `WALImpls`; `WALImpls` create `ScannerImpls`. +Then register the implmentation of `walimpls.OpenerBuilderImpls` into `registry` package. + +``` +var _ OpenerBuilderImpls = b{}; +registry.RegisterBuilder(b{}) +``` + +All things have been done. + +## Use WAL + +``` +name := "your builder name" +var yourCh *streamingpb.PChannelInfo + +opener, err := registry.MustGetBuilder(name).Build() +if err != nil { + panic(err) +} +ctx := context.Background() +logger, err := opener.Open(ctx, wal.OpenOption{ + Channel: yourCh +}) +if err != nil { + panic(err) +} +``` +## Adaptor + +package `adaptor` is used to adapt `walimpls` and `wal` together. +common wal function should be implement by it. Such as: + +- lifetime management +- interceptor implementation +- scanner wrapped up +- write ahead cache implementation diff --git a/internal/streamingnode/server/wal/adaptor/builder.go b/internal/streamingnode/server/wal/adaptor/builder.go new file mode 100644 index 000000000000..2dbfa0bbb169 --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/builder.go @@ -0,0 +1,32 @@ +package adaptor + +import ( + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" +) + +var _ wal.OpenerBuilder = (*builderAdaptorImpl)(nil) + +func AdaptImplsToBuilder(builder walimpls.OpenerBuilderImpls) wal.OpenerBuilder { + return builderAdaptorImpl{ + builder: builder, + } +} + +type builderAdaptorImpl struct { + builder walimpls.OpenerBuilderImpls +} + +func (b builderAdaptorImpl) Name() string { + return b.builder.Name() +} + +func (b builderAdaptorImpl) Build() (wal.Opener, error) { + _, err := b.builder.Build() + if err != nil { + return nil, err + } + return nil, nil + // TODO: wait for implementation. + // return adaptImplsToOpener(o), nil +} diff --git a/internal/streamingnode/server/wal/builder.go b/internal/streamingnode/server/wal/builder.go new file mode 100644 index 000000000000..afb20f2238c1 --- /dev/null +++ b/internal/streamingnode/server/wal/builder.go @@ -0,0 +1,31 @@ +package wal + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" +) + +// OpenerBuilder is the interface for build wal opener. +type OpenerBuilder interface { + // Name of the wal builder, should be a lowercase string. + Name() string + + Build() (Opener, error) +} + +// OpenOption is the option for allocating wal instance. +type OpenOption struct { + Channel *streamingpb.PChannelInfo + InterceptorBuilders []walimpls.InterceptorBuilder // Interceptor builders to build when open. +} + +// Opener is the interface for build wal instance. +type Opener interface { + // Open open a wal instance. + Open(ctx context.Context, opt *OpenOption) (WAL, error) + + // Close closes the opener resources. + Close() +} diff --git a/internal/streamingnode/server/wal/helper/scanner_helper.go b/internal/streamingnode/server/wal/helper/scanner_helper.go new file mode 100644 index 000000000000..3d6ef34408e1 --- /dev/null +++ b/internal/streamingnode/server/wal/helper/scanner_helper.go @@ -0,0 +1,58 @@ +package helper + +import "context" + +// NewScannerHelper creates a new ScannerHelper. +func NewScannerHelper(scannerName string) *ScannerHelper { + ctx, cancel := context.WithCancel(context.Background()) + return &ScannerHelper{ + scannerName: scannerName, + ctx: ctx, + cancel: cancel, + finishCh: make(chan struct{}), + err: nil, + } +} + +// ScannerHelper is a helper for scanner implementation. +type ScannerHelper struct { + scannerName string + ctx context.Context + cancel context.CancelFunc + finishCh chan struct{} + err error +} + +// Context returns the context of the scanner, which will cancel when the scanner helper is closed. +func (s *ScannerHelper) Context() context.Context { + return s.ctx +} + +// Name returns the name of the scanner. +func (s *ScannerHelper) Name() string { + return s.scannerName +} + +// Error returns the error of the scanner. +func (s *ScannerHelper) Error() error { + <-s.finishCh + return s.err +} + +// Done returns a channel that will be closed when the scanner is finished. +func (s *ScannerHelper) Done() <-chan struct{} { + return s.finishCh +} + +// Close closes the scanner, block until the Finish is called. +func (s *ScannerHelper) Close() error { + s.cancel() + <-s.finishCh + return s.err +} + +// Finish finishes the scanner with an error. +func (s *ScannerHelper) Finish(err error) { + s.err = err + close(s.finishCh) +} diff --git a/internal/streamingnode/server/wal/helper/scanner_helper_test.go b/internal/streamingnode/server/wal/helper/scanner_helper_test.go new file mode 100644 index 000000000000..e304e32cae75 --- /dev/null +++ b/internal/streamingnode/server/wal/helper/scanner_helper_test.go @@ -0,0 +1,58 @@ +package helper + +import ( + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" +) + +func TestScanner(t *testing.T) { + h := NewScannerHelper("test") + assert.NotNil(t, h.Context()) + assert.Equal(t, h.Name(), "test") + assert.NotNil(t, h.Context()) + + select { + case <-h.Done(): + t.Errorf("should not done") + return + case <-h.Context().Done(): + t.Error("should not cancel") + return + default: + } + + finishErr := errors.New("test") + + ch := make(chan struct{}) + go func() { + defer close(ch) + done := false + cancel := false + cancelCh := h.Context().Done() + doneCh := h.Done() + for i := 0; ; i += 1 { + select { + case <-doneCh: + done = true + doneCh = nil + case <-cancelCh: + cancel = true + cancelCh = nil + h.Finish(finishErr) + } + if cancel && done { + return + } + if i == 0 { + assert.True(t, cancel && !done) + } else if i == 1 { + assert.True(t, cancel && done) + } + } + }() + h.Close() + assert.ErrorIs(t, h.Error(), finishErr) + <-ch +} diff --git a/internal/streamingnode/server/wal/helper/wal_helper.go b/internal/streamingnode/server/wal/helper/wal_helper.go new file mode 100644 index 000000000000..2700da56ce0a --- /dev/null +++ b/internal/streamingnode/server/wal/helper/wal_helper.go @@ -0,0 +1,33 @@ +package helper + +import ( + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + "github.com/milvus-io/milvus/pkg/log" +) + +// NewWALHelper creates a new WALHelper. +func NewWALHelper(opt *walimpls.OpenOption) *WALHelper { + return &WALHelper{ + logger: log.With(zap.Any("channel", opt.Channel)), + channel: opt.Channel, + } +} + +// WALHelper is a helper for WAL implementation. +type WALHelper struct { + logger *log.MLogger + channel *streamingpb.PChannelInfo +} + +// Channel returns the channel of the WAL. +func (w *WALHelper) Channel() *streamingpb.PChannelInfo { + return w.channel +} + +// Log returns the logger of the WAL. +func (w *WALHelper) Log() *log.MLogger { + return w.logger +} diff --git a/internal/streamingnode/server/wal/helper/wal_helper_test.go b/internal/streamingnode/server/wal/helper/wal_helper_test.go new file mode 100644 index 000000000000..497bfbafef2f --- /dev/null +++ b/internal/streamingnode/server/wal/helper/wal_helper_test.go @@ -0,0 +1,24 @@ +package helper + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" +) + +func TestWALHelper(t *testing.T) { + h := NewWALHelper(&walimpls.OpenOption{ + Channel: &streamingpb.PChannelInfo{ + Name: "test", + Term: 1, + ServerID: 1, + VChannelInfos: []*streamingpb.VChannelInfo{}, + }, + }) + assert.NotNil(t, h.Channel()) + assert.Equal(t, h.Channel().Name, "test") + assert.NotNil(t, h.Log()) +} diff --git a/internal/streamingnode/server/wal/registry/registry.go b/internal/streamingnode/server/wal/registry/registry.go new file mode 100644 index 000000000000..5594e09c4abb --- /dev/null +++ b/internal/streamingnode/server/wal/registry/registry.go @@ -0,0 +1,33 @@ +package registry + +import ( + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/adaptor" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// builders is a map of registered wal builders. +var builders typeutil.ConcurrentMap[string, wal.OpenerBuilder] + +// Register registers the wal builder. +// +// NOTE: this function must only be called during initialization time (i.e. in +// an init() function), name of builder is lowercase. If multiple Builder are +// registered with the same name, panic will occur. +func RegisterBuilder(b walimpls.OpenerBuilderImpls) { + bb := adaptor.AdaptImplsToBuilder(b) + _, loaded := builders.GetOrInsert(bb.Name(), bb) + if loaded { + panic("wal builder already registered: " + b.Name()) + } +} + +// MustGetBuilder returns the wal builder by name. +func MustGetBuilder(name string) wal.OpenerBuilder { + b, ok := builders.Get(name) + if !ok { + panic("wal builder not found: " + name) + } + return b +} diff --git a/internal/streamingnode/server/wal/registry/wal_test.go b/internal/streamingnode/server/wal/registry/wal_test.go new file mode 100644 index 000000000000..5a3ec9ce4a56 --- /dev/null +++ b/internal/streamingnode/server/wal/registry/wal_test.go @@ -0,0 +1,48 @@ +package registry + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_walimpls" +) + +func TestRegister(t *testing.T) { + name := "mock" + b := mock_walimpls.NewMockOpenerBuilderImpls(t) + b.EXPECT().Name().Return(name) + + RegisterBuilder(b) + b2 := MustGetBuilder(name) + assert.Equal(t, b.Name(), b2.Name()) + + // Panic if register twice. + assert.Panics(t, func() { + RegisterBuilder(b) + }) + + // Panic if get not exist builder. + assert.Panics(t, func() { + MustGetBuilder("not exist") + }) + + // Test concurrent. + wg := sync.WaitGroup{} + count := 10 + wg.Add(count) + for i := 0; i < count; i++ { + go func(i int) { + defer wg.Done() + name := fmt.Sprintf("mock_%d", i) + b := mock_walimpls.NewMockOpenerBuilderImpls(t) + b.EXPECT().Name().Return(name) + RegisterBuilder(b) + b2 := MustGetBuilder(name) + assert.Equal(t, b.Name(), b2.Name()) + }(i) + } + wg.Wait() +} diff --git a/internal/streamingnode/server/wal/scanner.go b/internal/streamingnode/server/wal/scanner.go new file mode 100644 index 000000000000..761b3e93b120 --- /dev/null +++ b/internal/streamingnode/server/wal/scanner.go @@ -0,0 +1,29 @@ +package wal + +import ( + "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/internal/util/streamingutil/options" +) + +// ReadOption is the option for reading records from the wal. +type ReadOption struct { + DeliverPolicy options.DeliverPolicy + DeliverOrder options.DeliverOrder +} + +// Scanner is the interface for reading records from the wal. +type Scanner interface { + // Chan returns the channel of message. + Chan() <-chan message.ImmutableMessage + + // Error returns the error of scanner failed. + // Will block until scanner is closed or Chan is dry out. + Error() error + + // Done returns a channel which will be closed when scanner is finished or closed. + Done() <-chan struct{} + + // Close the scanner, release the underlying resources. + // Return the error same with `Error` + Close() error +} diff --git a/internal/streamingnode/server/wal/wal.go b/internal/streamingnode/server/wal/wal.go new file mode 100644 index 000000000000..904976b41a75 --- /dev/null +++ b/internal/streamingnode/server/wal/wal.go @@ -0,0 +1,28 @@ +package wal + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/util/streamingutil/message" +) + +// WAL is the WAL framework interface. +// !!! Don't implement it directly, implement walimpls.WAL instead. +type WAL interface { + // Channel returns the channel assignment info of the wal. + // Should be read-only. + Channel() *streamingpb.PChannelInfo + + // Append writes a record to the log. + Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) + + // Append a record to the log asynchronously. + AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) + + // Read returns a scanner for reading records from the wal. + Read(ctx context.Context, deliverPolicy ReadOption) (Scanner, error) + + // Close closes the wal instance. + Close() +} diff --git a/internal/streamingnode/server/wal/walimpls/builder.go b/internal/streamingnode/server/wal/walimpls/builder.go new file mode 100644 index 000000000000..4a41a7491436 --- /dev/null +++ b/internal/streamingnode/server/wal/walimpls/builder.go @@ -0,0 +1,10 @@ +package walimpls + +// OpenerBuilderImpls is the interface for building wal opener impls. +type OpenerBuilderImpls interface { + // Name of the wal builder, should be a lowercase string. + Name() string + + // Build build a opener impls instance. + Build() (OpenerImpls, error) +} diff --git a/internal/streamingnode/server/wal/walimpls/interceptor.go b/internal/streamingnode/server/wal/walimpls/interceptor.go new file mode 100644 index 000000000000..8e7d2e0ff4cf --- /dev/null +++ b/internal/streamingnode/server/wal/walimpls/interceptor.go @@ -0,0 +1,64 @@ +package walimpls + +import ( + "context" + + "github.com/milvus-io/milvus/internal/util/streamingutil/message" +) + +type ( + // Append is the common function to append a msg to the wal. + Append = func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) + // Read is the common function to read a msg from the wal. + Read = func(ctx context.Context, opt ReadOption) (ScannerImpls, error) +) + +// InterceptorBuilder is the interface to build a interceptor. +// 1. InterceptorBuilder is concurrent safe. +// 2. InterceptorBuilder can used to build a interceptor with cross-wal shared resources. +type InterceptorBuilder interface { + // Build build a interceptor with wal that interceptor will work on. + // the wal object will be sent to the interceptor builder when the wal is constructed with all interceptors. + Build(wal <-chan WALImpls) BasicInterceptor +} + +type BasicInterceptor interface { + // Close the interceptor release the resources. + Close() +} + +type Interceptor interface { + AppendInterceptor + + BasicInterceptor +} + +// AppendInterceptor is the interceptor for Append functions. +// All wal extra operations should be done by these function, such as +// 1. time tick setup. +// 2. unique primary key filter and build. +// 3. index builder. +// 4. cache sync up. +// AppendInterceptor should be lazy initialized and fast execution. +type AppendInterceptor interface { + // Execute the append operation with interceptor. + DoAppend(ctx context.Context, msg message.MutableMessage, append Append) (message.MessageID, error) +} + +type InterceptorReady interface { + // Ready check if interceptor is ready. + // Close of Interceptor would not notify the ready (closed interceptor is not ready). + // So always apply timeout when waiting for ready. + // Some append interceptor may be stateful, such as index builder and unique primary key filter, + // so it need to implement the recovery logic from crash by itself before notifying ready. + // Append operation will block until ready or canceled. + // Consumer do not blocked by it. + Ready() <-chan struct{} +} + +// Some interceptor may need to wait for some resource to be ready or recovery process. +type InterceptorWithReady interface { + Interceptor + + InterceptorReady +} diff --git a/internal/streamingnode/server/wal/walimpls/opener.go b/internal/streamingnode/server/wal/walimpls/opener.go new file mode 100644 index 000000000000..835fd886d844 --- /dev/null +++ b/internal/streamingnode/server/wal/walimpls/opener.go @@ -0,0 +1,21 @@ +package walimpls + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" +) + +// OpenOption is the option for allocating wal impls instance. +type OpenOption struct { + Channel *streamingpb.PChannelInfo // Channel to open. +} + +// OpenerImpls is the interface for build WALImpls instance. +type OpenerImpls interface { + // Open open a WALImpls instance. + Open(ctx context.Context, opt *OpenOption) (WALImpls, error) + + // Close release the resources. + Close() +} diff --git a/internal/streamingnode/server/wal/walimpls/scanner.go b/internal/streamingnode/server/wal/walimpls/scanner.go new file mode 100644 index 000000000000..7152eda1cfd8 --- /dev/null +++ b/internal/streamingnode/server/wal/walimpls/scanner.go @@ -0,0 +1,31 @@ +package walimpls + +import ( + "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/internal/util/streamingutil/options" +) + +type ReadOption struct { + Name string + DeliverPolicy options.DeliverPolicy +} + +// ScannerImpls is the interface for reading records from the wal. +type ScannerImpls interface { + // Name returns the name of scanner. + Name() string + + // Chan returns the channel of message. + Chan() <-chan message.ImmutableMessage + + // Error returns the error of scanner failed. + // Will block until scanner is closed or Chan is dry out. + Error() error + + // Done returns a channel which will be closed when scanner is finished or closed. + Done() <-chan struct{} + + // Close the scanner, release the underlying resources. + // Return the error same with `Error` + Close() error +} diff --git a/internal/streamingnode/server/wal/walimpls/test_framework.go b/internal/streamingnode/server/wal/walimpls/test_framework.go new file mode 100644 index 000000000000..bcb8782a569b --- /dev/null +++ b/internal/streamingnode/server/wal/walimpls/test_framework.go @@ -0,0 +1,259 @@ +//go:build test +// +build test + +package walimpls + +import ( + "context" + "fmt" + "math/rand" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/remeh/sizedwaitgroup" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/internal/util/streamingutil/options" +) + +var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randString(l int) string { + builder := strings.Builder{} + for i := 0; i < l; i++ { + builder.WriteRune(letters[rand.Intn(len(letters))]) + } + return builder.String() +} + +type walImplsTestFramework struct { + b OpenerBuilderImpls + t *testing.T + messageCount int +} + +func NewWALImplsTestFramework(t *testing.T, messageCount int, b OpenerBuilderImpls) *walImplsTestFramework { + return &walImplsTestFramework{ + b: b, + t: t, + messageCount: messageCount, + } +} + +// Run runs the test framework. +// if test failed, a error will be returned. +func (f walImplsTestFramework) Run() { + // create opener. + o, err := f.b.Build() + assert.NoError(f.t, err) + assert.NotNil(f.t, o) + defer o.Close() + + // construct pChannel + name := "test_" + randString(4) + pChannel := &streamingpb.PChannelInfo{ + Name: name, + Term: 1, + ServerID: 1, + VChannelInfos: []*streamingpb.VChannelInfo{}, + } + ctx := context.Background() + + // create a wal. + w, err := o.Open(ctx, &OpenOption{ + Channel: pChannel, + }) + assert.NoError(f.t, err) + assert.NotNil(f.t, w) + defer w.Close() + + f.testReadAndWrite(ctx, w) +} + +func (f walImplsTestFramework) testReadAndWrite(ctx context.Context, w WALImpls) { + // Test read and write. + wg := sync.WaitGroup{} + wg.Add(3) + + var written []message.ImmutableMessage + var read1, read2 []message.ImmutableMessage + go func() { + defer wg.Done() + var err error + written, err = f.testAppend(ctx, w) + assert.NoError(f.t, err) + }() + go func() { + defer wg.Done() + var err error + read1, err = f.testRead(ctx, w, "scanner1") + assert.NoError(f.t, err) + }() + go func() { + defer wg.Done() + var err error + read2, err = f.testRead(ctx, w, "scanner2") + assert.NoError(f.t, err) + }() + + wg.Wait() + + f.assertSortedMessageList(read1) + f.assertSortedMessageList(read2) + sort.Sort(sortByMessageID(written)) + f.assertEqualMessageList(written, read1) + f.assertEqualMessageList(written, read2) + + // Test different scan policy, StartFrom. + readFromIdx := len(read1) / 2 + readFromMsgID := read1[readFromIdx].MessageID() + s, err := w.Read(ctx, ReadOption{ + Name: "scanner_deliver_start_from", + DeliverPolicy: options.DeliverPolicyStartFrom(readFromMsgID), + }) + assert.NoError(f.t, err) + for i := readFromIdx; i < len(read1); i++ { + msg, ok := <-s.Chan() + assert.NotNil(f.t, msg) + assert.True(f.t, ok) + assert.True(f.t, msg.MessageID().EQ(read1[i].MessageID())) + } + s.Close() + + // Test different scan policy, StartAfter. + s, err = w.Read(ctx, ReadOption{ + Name: "scanner_deliver_start_after", + DeliverPolicy: options.DeliverPolicyStartAfter(readFromMsgID), + }) + assert.NoError(f.t, err) + for i := readFromIdx + 1; i < len(read1); i++ { + msg, ok := <-s.Chan() + assert.NotNil(f.t, msg) + assert.True(f.t, ok) + assert.True(f.t, msg.MessageID().EQ(read1[i].MessageID())) + } + s.Close() + + // Test different scan policy, Latest. + s, err = w.Read(ctx, ReadOption{ + Name: "scanner_deliver_latest", + DeliverPolicy: options.DeliverPolicyLatest(), + }) + assert.NoError(f.t, err) + timeoutCh := time.After(1 * time.Second) + select { + case <-s.Chan(): + f.t.Errorf("should be blocked") + case <-timeoutCh: + } + s.Close() +} + +func (f walImplsTestFramework) assertSortedMessageList(msgs []message.ImmutableMessage) { + for i := 1; i < len(msgs); i++ { + assert.True(f.t, msgs[i-1].MessageID().LT(msgs[i].MessageID())) + } +} + +func (f walImplsTestFramework) assertEqualMessageList(msgs1 []message.ImmutableMessage, msgs2 []message.ImmutableMessage) { + assert.Equal(f.t, f.messageCount, len(msgs1)) + assert.Equal(f.t, f.messageCount, len(msgs2)) + for i := 0; i < len(msgs1); i++ { + assert.True(f.t, msgs1[i].MessageID().EQ(msgs2[i].MessageID())) + // assert.True(f.t, bytes.Equal(msgs1[i].Payload(), msgs2[i].Payload())) + id1, ok1 := msgs1[i].Properties().Get("id") + id2, ok2 := msgs2[i].Properties().Get("id") + assert.True(f.t, ok1) + assert.True(f.t, ok2) + assert.Equal(f.t, id1, id2) + id1, ok1 = msgs1[i].Properties().Get("const") + id2, ok2 = msgs2[i].Properties().Get("const") + assert.True(f.t, ok1) + assert.True(f.t, ok2) + assert.Equal(f.t, id1, id2) + } +} + +func (f walImplsTestFramework) testAppend(ctx context.Context, w WALImpls) ([]message.ImmutableMessage, error) { + ids := make([]message.ImmutableMessage, f.messageCount) + swg := sizedwaitgroup.New(5) + for i := 0; i < f.messageCount; i++ { + swg.Add() + go func(i int) { + defer swg.Done() + // ...rocksmq has a dirty implement of properties, + // without commonpb.MsgHeader, it can not work. + header := commonpb.MsgHeader{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Insert, + MsgID: int64(i), + }, + } + payload, err := proto.Marshal(&header) + if err != nil { + panic(err) + } + properties := map[string]string{ + "id": fmt.Sprintf("%d", i), + "const": "t", + } + typ := message.MessageTypeUnknown + msg := message.NewBuilder(). + WithPayload(payload). + WithProperties(properties). + WithMessageType(typ). + BuildMutable() + id, err := w.Append(ctx, msg) + assert.NoError(f.t, err) + assert.NotNil(f.t, id) + ids[i] = message.NewBuilder(). + WithPayload(payload). + WithProperties(properties). + WithMessageID(id). + WithMessageType(typ). + BuildImmutable() + }(i) + } + swg.Wait() + return ids, nil +} + +func (f walImplsTestFramework) testRead(ctx context.Context, w WALImpls, name string) ([]message.ImmutableMessage, error) { + s, err := w.Read(ctx, ReadOption{ + Name: name, + DeliverPolicy: options.DeliverPolicyAll(), + }) + assert.NoError(f.t, err) + assert.Equal(f.t, name, s.Name()) + defer s.Close() + + msgs := make([]message.ImmutableMessage, 0, f.messageCount) + for i := 0; i < f.messageCount; i++ { + msg, ok := <-s.Chan() + assert.NotNil(f.t, msg) + assert.True(f.t, ok) + msgs = append(msgs, msg) + } + return msgs, nil +} + +type sortByMessageID []message.ImmutableMessage + +func (a sortByMessageID) Len() int { + return len(a) +} + +func (a sortByMessageID) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func (a sortByMessageID) Less(i, j int) bool { + return a[i].MessageID().LT(a[j].MessageID()) +} diff --git a/internal/streamingnode/server/wal/walimpls/wal.go b/internal/streamingnode/server/wal/walimpls/wal.go new file mode 100644 index 000000000000..40e8e4752314 --- /dev/null +++ b/internal/streamingnode/server/wal/walimpls/wal.go @@ -0,0 +1,23 @@ +package walimpls + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/util/streamingutil/message" +) + +type WALImpls interface { + // Channel returns the channel assignment info of the wal. + // Should be read-only. + Channel() *streamingpb.PChannelInfo + + // Append writes a record to the log. + Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) + + // Read returns a scanner for reading records from the wal. + Read(ctx context.Context, opts ReadOption) (ScannerImpls, error) + + // Close closes the wal instance. + Close() +} diff --git a/internal/streamingnode/server/wal/walimplstest/builder.go b/internal/streamingnode/server/wal/walimplstest/builder.go new file mode 100644 index 000000000000..0442d7f38fd6 --- /dev/null +++ b/internal/streamingnode/server/wal/walimplstest/builder.go @@ -0,0 +1,32 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + "github.com/milvus-io/milvus/internal/util/streamingutil/message" +) + +const ( + walName = "test" +) + +func init() { + // register the builder to the registry. + registry.RegisterBuilder(&openerBuilder{}) + message.RegisterMessageIDUnmsarshaler(walName, UnmarshalTestMessageID) +} + +var _ walimpls.OpenerBuilderImpls = &openerBuilder{} + +type openerBuilder struct{} + +func (o *openerBuilder) Name() string { + return walName +} + +func (o *openerBuilder) Build() (walimpls.OpenerImpls, error) { + return &opener{}, nil +} diff --git a/internal/streamingnode/server/wal/walimplstest/message_id.go b/internal/streamingnode/server/wal/walimplstest/message_id.go new file mode 100644 index 000000000000..82a086a2f0ac --- /dev/null +++ b/internal/streamingnode/server/wal/walimplstest/message_id.go @@ -0,0 +1,63 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "strconv" + + "github.com/milvus-io/milvus/internal/util/streamingutil/message" +) + +var _ message.MessageID = testMessageID(0) + +// NewTestMessageID create a new test message id. +func NewTestMessageID(id int64) message.MessageID { + return testMessageID(id) +} + +// UnmarshalTestMessageID unmarshal the message id. +func UnmarshalTestMessageID(data []byte) (message.MessageID, error) { + id, err := unmarshalTestMessageID(data) + if err != nil { + return nil, err + } + return id, nil +} + +// unmashalTestMessageID unmarshal the message id. +func unmarshalTestMessageID(data []byte) (testMessageID, error) { + id, err := strconv.ParseInt(string(data), 10, 64) + if err != nil { + return 0, err + } + return testMessageID(id), nil +} + +// testMessageID is the message id for rmq. +type testMessageID int64 + +// WALName returns the name of message id related wal. +func (id testMessageID) WALName() string { + return walName +} + +// LT less than. +func (id testMessageID) LT(other message.MessageID) bool { + return id < other.(testMessageID) +} + +// LTE less than or equal to. +func (id testMessageID) LTE(other message.MessageID) bool { + return id <= other.(testMessageID) +} + +// EQ Equal to. +func (id testMessageID) EQ(other message.MessageID) bool { + return id == other.(testMessageID) +} + +// Marshal marshal the message id. +func (id testMessageID) Marshal() []byte { + return []byte(strconv.FormatInt(int64(id), 10)) +} diff --git a/internal/streamingnode/server/wal/walimplstest/message_log.go b/internal/streamingnode/server/wal/walimplstest/message_log.go new file mode 100644 index 000000000000..6556d9a5d542 --- /dev/null +++ b/internal/streamingnode/server/wal/walimplstest/message_log.go @@ -0,0 +1,64 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "context" + "sync" + + "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var logs = typeutil.NewConcurrentMap[string, *messageLog]() + +func getOrCreateLogs(name string) *messageLog { + l := newMessageLog() + l, _ = logs.GetOrInsert(name, l) + return l +} + +func newMessageLog() *messageLog { + return &messageLog{ + cond: syncutil.NewContextCond(&sync.Mutex{}), + id: 0, + logs: make([]message.ImmutableMessage, 0), + } +} + +type messageLog struct { + cond *syncutil.ContextCond + id int64 + logs []message.ImmutableMessage +} + +func (l *messageLog) Append(_ context.Context, msg message.MutableMessage) (message.MessageID, error) { + l.cond.LockAndBroadcast() + defer l.cond.L.Unlock() + newMessageID := NewTestMessageID(l.id) + l.id++ + l.logs = append(l.logs, msg.IntoImmutableMessage(newMessageID)) + return newMessageID, nil +} + +func (l *messageLog) ReadAt(ctx context.Context, idx int) (message.ImmutableMessage, error) { + var msg message.ImmutableMessage + l.cond.L.Lock() + for idx >= len(l.logs) { + if err := l.cond.Wait(ctx); err != nil { + return nil, err + } + } + msg = l.logs[idx] + l.cond.L.Unlock() + + return msg, nil +} + +func (l *messageLog) Len() int64 { + l.cond.L.Lock() + defer l.cond.L.Unlock() + return int64(len(l.logs)) +} diff --git a/internal/streamingnode/server/wal/walimplstest/opener.go b/internal/streamingnode/server/wal/walimplstest/opener.go new file mode 100644 index 000000000000..69f9f0bf8f79 --- /dev/null +++ b/internal/streamingnode/server/wal/walimplstest/opener.go @@ -0,0 +1,26 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "context" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/helper" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" +) + +var _ walimpls.OpenerImpls = &opener{} + +type opener struct{} + +func (*opener) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) { + l := getOrCreateLogs(opt.Channel.GetName()) + return &walImpls{ + WALHelper: *helper.NewWALHelper(opt), + datas: l, + }, nil +} + +func (*opener) Close() { +} diff --git a/internal/streamingnode/server/wal/walimplstest/scanner.go b/internal/streamingnode/server/wal/walimplstest/scanner.go new file mode 100644 index 000000000000..75be64619284 --- /dev/null +++ b/internal/streamingnode/server/wal/walimplstest/scanner.go @@ -0,0 +1,51 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/helper" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + "github.com/milvus-io/milvus/internal/util/streamingutil/message" +) + +var _ walimpls.ScannerImpls = &scannerImpls{} + +func newScannerImpls(opts walimpls.ReadOption, data *messageLog, offset int) *scannerImpls { + s := &scannerImpls{ + ScannerHelper: helper.NewScannerHelper(opts.Name), + datas: data, + ch: make(chan message.ImmutableMessage), + offset: offset, + } + go s.executeConsume() + return s +} + +type scannerImpls struct { + *helper.ScannerHelper + datas *messageLog + ch chan message.ImmutableMessage + offset int +} + +func (s *scannerImpls) executeConsume() { + defer close(s.ch) + for { + msg, err := s.datas.ReadAt(s.Context(), s.offset) + if err != nil { + s.Finish(nil) + return + } + s.ch <- msg + s.offset++ + } +} + +func (s *scannerImpls) Chan() <-chan message.ImmutableMessage { + return s.ch +} + +func (s *scannerImpls) Close() error { + return s.ScannerHelper.Close() +} diff --git a/internal/streamingnode/server/wal/walimplstest/wal.go b/internal/streamingnode/server/wal/walimplstest/wal.go new file mode 100644 index 000000000000..47b1a887af00 --- /dev/null +++ b/internal/streamingnode/server/wal/walimplstest/wal.go @@ -0,0 +1,52 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/helper" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + "github.com/milvus-io/milvus/internal/util/streamingutil/message" +) + +var _ walimpls.WALImpls = &walImpls{} + +type walImpls struct { + helper.WALHelper + datas *messageLog +} + +func (w *walImpls) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + return w.datas.Append(ctx, msg) +} + +func (w *walImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls.ScannerImpls, error) { + offset := int64(0) + switch policy := opts.DeliverPolicy.Policy.(type) { + case *streamingpb.DeliverPolicy_All: + offset = 0 + case *streamingpb.DeliverPolicy_Latest: + offset = w.datas.Len() + case *streamingpb.DeliverPolicy_StartFrom: + id, err := unmarshalTestMessageID(policy.StartFrom.Id) + if err != nil { + return nil, err + } + offset = int64(id) + case *streamingpb.DeliverPolicy_StartAfter: + id, err := unmarshalTestMessageID(policy.StartAfter.Id) + if err != nil { + return nil, err + } + offset = int64(id + 1) + } + return newScannerImpls( + opts, w.datas, int(offset), + ), nil +} + +func (w *walImpls) Close() { +} diff --git a/internal/streamingnode/server/wal/walimplstest/wal_test.go b/internal/streamingnode/server/wal/walimplstest/wal_test.go new file mode 100644 index 000000000000..57a18ac4c4f0 --- /dev/null +++ b/internal/streamingnode/server/wal/walimplstest/wal_test.go @@ -0,0 +1,11 @@ +package walimplstest + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" +) + +func TestWALImplsTest(t *testing.T) { + walimpls.NewWALImplsTestFramework(t, 100, &openerBuilder{}).Run() +} diff --git a/internal/streamingservice/.mockery.yaml b/internal/streamingservice/.mockery.yaml new file mode 100644 index 000000000000..9b4b311a9927 --- /dev/null +++ b/internal/streamingservice/.mockery.yaml @@ -0,0 +1,28 @@ +quiet: False +with-expecter: True +filename: "mock_{{.InterfaceName}}.go" +dir: "internal/mocks/{{trimPrefix .PackagePath \"github.com/milvus-io/milvus/internal\" | dir }}/mock_{{.PackageName}}" +mockname: "Mock{{.InterfaceName}}" +outpkg: "mock_{{.PackageName}}" +packages: + github.com/milvus-io/milvus/internal/util/streamingutil/message: + interfaces: + MessageID: + ImmutableMessage: + MutableMessage: + RProperties: + github.com/milvus-io/milvus/internal/streamingnode/server/wal: + interfaces: + OpenerBuilder: + Opener: + Scanner: + WAL: + github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls: + interfaces: + OpenerBuilderImpls: + OpenerImpls: + ScannerImpls: + WALImpls: + Interceptor: + InterceptorWithReady: + InterceptorBuilder: \ No newline at end of file diff --git a/internal/util/logserviceutil/message/builder.go b/internal/util/streamingutil/message/builder.go similarity index 100% rename from internal/util/logserviceutil/message/builder.go rename to internal/util/streamingutil/message/builder.go diff --git a/internal/util/logserviceutil/message/message.go b/internal/util/streamingutil/message/message.go similarity index 90% rename from internal/util/logserviceutil/message/message.go rename to internal/util/streamingutil/message/message.go index 776dd8107329..2234b0693c24 100644 --- a/internal/util/logserviceutil/message/message.go +++ b/internal/util/streamingutil/message/message.go @@ -33,6 +33,9 @@ type MutableMessage interface { // Properties returns the message properties. Properties() Properties + + // IntoImmutableMessage converts the mutable message to immutable message. + IntoImmutableMessage(msgID MessageID) ImmutableMessage } // ImmutableMessage is the read-only message interface. @@ -63,7 +66,7 @@ type ImmutableMessage interface { Properties() RProperties // Version returns the message format version. - // 0: old version before lognode. - // from 1: new version after lognode. + // 0: old version before streamingnode. + // from 1: new version after streamingnode. Version() Version } diff --git a/internal/util/logserviceutil/message/message_builder_test.go b/internal/util/streamingutil/message/message_builder_test.go similarity index 95% rename from internal/util/logserviceutil/message/message_builder_test.go rename to internal/util/streamingutil/message/message_builder_test.go index c937c3d5d26b..b5911d9ae10e 100644 --- a/internal/util/logserviceutil/message/message_builder_test.go +++ b/internal/util/streamingutil/message/message_builder_test.go @@ -7,8 +7,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message" - "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + "github.com/milvus-io/milvus/internal/mocks/util/streamingutil/mock_message" + "github.com/milvus-io/milvus/internal/util/streamingutil/message" ) func TestMessage(t *testing.T) { diff --git a/internal/util/logserviceutil/message/message_handler.go b/internal/util/streamingutil/message/message_handler.go similarity index 100% rename from internal/util/logserviceutil/message/message_handler.go rename to internal/util/streamingutil/message/message_handler.go diff --git a/internal/util/logserviceutil/message/message_handler_test.go b/internal/util/streamingutil/message/message_handler_test.go similarity index 100% rename from internal/util/logserviceutil/message/message_handler_test.go rename to internal/util/streamingutil/message/message_handler_test.go diff --git a/internal/util/logserviceutil/message/message_id.go b/internal/util/streamingutil/message/message_id.go similarity index 82% rename from internal/util/logserviceutil/message/message_id.go rename to internal/util/streamingutil/message/message_id.go index 910338ea88cb..d68ab616a1e8 100644 --- a/internal/util/logserviceutil/message/message_id.go +++ b/internal/util/streamingutil/message/message_id.go @@ -1,11 +1,17 @@ package message import ( + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) -// messageIDUnmarshaler is the map for message id unmarshaler. -var messageIDUnmarshaler typeutil.ConcurrentMap[string, MessageIDUnmarshaler] +var ( + // messageIDUnmarshaler is the map for message id unmarshaler. + messageIDUnmarshaler typeutil.ConcurrentMap[string, MessageIDUnmarshaler] + + ErrInvalidMessageID = errors.New("invalid message id") +) // RegisterMessageIDUnmsarshaler register the message id unmarshaler. func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler) { diff --git a/internal/util/logserviceutil/message/message_id_test.go b/internal/util/streamingutil/message/message_id_test.go similarity index 86% rename from internal/util/logserviceutil/message/message_id_test.go rename to internal/util/streamingutil/message/message_id_test.go index 5aef58bddd8c..4a4bcf806a0c 100644 --- a/internal/util/logserviceutil/message/message_id_test.go +++ b/internal/util/streamingutil/message/message_id_test.go @@ -7,8 +7,8 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message" - "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + "github.com/milvus-io/milvus/internal/mocks/util/streamingutil/mock_message" + "github.com/milvus-io/milvus/internal/util/streamingutil/message" ) func TestRegisterMessageIDUnmarshaler(t *testing.T) { diff --git a/internal/util/logserviceutil/message/message_impl.go b/internal/util/streamingutil/message/message_impl.go similarity index 92% rename from internal/util/logserviceutil/message/message_impl.go rename to internal/util/streamingutil/message/message_impl.go index f1f9de5bdaa0..e2ad94feaa4e 100644 --- a/internal/util/logserviceutil/message/message_impl.go +++ b/internal/util/streamingutil/message/message_impl.go @@ -49,6 +49,14 @@ func (m *messageImpl) WithLastConfirmed(id MessageID) MutableMessage { return m } +// IntoImmutableMessage converts current message to immutable message. +func (m *messageImpl) IntoImmutableMessage(id MessageID) ImmutableMessage { + return &immutableMessageImpl{ + messageImpl: *m, + id: id, + } +} + type immutableMessageImpl struct { messageImpl id MessageID diff --git a/internal/util/logserviceutil/message/message_test.go b/internal/util/streamingutil/message/message_test.go similarity index 100% rename from internal/util/logserviceutil/message/message_test.go rename to internal/util/streamingutil/message/message_test.go diff --git a/internal/util/logserviceutil/message/message_type.go b/internal/util/streamingutil/message/message_type.go similarity index 100% rename from internal/util/logserviceutil/message/message_type.go rename to internal/util/streamingutil/message/message_type.go diff --git a/internal/util/logserviceutil/message/properties.go b/internal/util/streamingutil/message/properties.go similarity index 100% rename from internal/util/logserviceutil/message/properties.go rename to internal/util/streamingutil/message/properties.go diff --git a/internal/util/logserviceutil/message/version.go b/internal/util/streamingutil/message/version.go similarity index 87% rename from internal/util/logserviceutil/message/version.go rename to internal/util/streamingutil/message/version.go index 1e99e51f33de..3c2ae7fde580 100644 --- a/internal/util/logserviceutil/message/version.go +++ b/internal/util/streamingutil/message/version.go @@ -3,7 +3,7 @@ package message import "strconv" var ( - VersionOld Version = 0 // old version before lognode. + VersionOld Version = 0 // old version before streamingnode. VersionV1 Version = 1 ) diff --git a/internal/util/streamingutil/options/deliver.go b/internal/util/streamingutil/options/deliver.go new file mode 100644 index 000000000000..46e59524b2ab --- /dev/null +++ b/internal/util/streamingutil/options/deliver.go @@ -0,0 +1,45 @@ +package options + +import ( + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/util/streamingutil/message" +) + +const ( + deliverOrderTimetick DeliverOrder = 1 +) + +// DeliverOrder is the order of delivering messages. +type ( + DeliverOrder int + DeliverPolicy *streamingpb.DeliverPolicy +) + +// DeliverPolicyAll delivers all messages. +func DeliverPolicyAll() DeliverPolicy { + return streamingpb.NewDeliverAll() +} + +// DeliverLatest delivers the latest message. +func DeliverPolicyLatest() DeliverPolicy { + return streamingpb.NewDeliverLatest() +} + +// DeliverEarliest delivers the earliest message. +func DeliverPolicyStartFrom(messageID message.MessageID) DeliverPolicy { + return streamingpb.NewDeliverStartFrom(&streamingpb.MessageID{ + Id: messageID.Marshal(), + }) +} + +// DeliverPolicyStartAfter delivers the message after the specified message. +func DeliverPolicyStartAfter(messageID message.MessageID) DeliverPolicy { + return streamingpb.NewDeliverStartAfter(&streamingpb.MessageID{ + Id: messageID.Marshal(), + }) +} + +// DeliverOrderTimeTick delivers messages by time tick. +func DeliverOrderTimeTick() DeliverOrder { + return deliverOrderTimetick +} diff --git a/scripts/generate_proto.sh b/scripts/generate_proto.sh index 03c4e9f687b0..47c31e92d721 100755 --- a/scripts/generate_proto.sh +++ b/scripts/generate_proto.sh @@ -57,7 +57,7 @@ mkdir -p indexpb mkdir -p datapb mkdir -p querypb mkdir -p planpb -mkdir -p logpb +mkdir -p streamingpb mkdir -p $ROOT_DIR/cmd/tools/migration/legacy/legacypb @@ -75,7 +75,7 @@ ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./querypb query_coord. ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./planpb plan.proto|| { echo 'generate plan.proto failed'; exit 1; } ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./segcorepb segcore.proto|| { echo 'generate segcore.proto failed'; exit 1; } ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./clusteringpb clustering.proto|| { echo 'generate clustering.proto failed'; exit 1; } -${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./logpb log.proto|| { echo 'generate logpb.proto failed'; exit 1; } +${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./streamingpb streaming.proto|| { echo 'generate streamingpb.proto failed'; exit 1; } ${protoc_opt} --proto_path=$ROOT_DIR/cmd/tools/migration/legacy/ \ --go_out=plugins=grpc,paths=source_relative:../../cmd/tools/migration/legacy/legacypb legacy.proto || { echo 'generate legacy.proto failed'; exit 1; }