From 60be4f704b1497956b68660209ea003acbd36175 Mon Sep 17 00:00:00 2001 From: chyezh Date: Tue, 11 Jun 2024 11:10:29 +0800 Subject: [PATCH] enhance: wal interface definition Signed-off-by: chyezh --- Makefile | 3 + go.mod | 3 +- go.sum | 2 + internal/lognode/server/wal/RAEDME.md | 66 +++++ .../lognode/server/wal/adaptor/builder.go | 32 +++ internal/lognode/server/wal/builder.go | 31 +++ .../server/wal/helper/scanner_helper.go | 58 ++++ .../server/wal/helper/scanner_helper_test.go | 58 ++++ .../lognode/server/wal/helper/wal_helper.go | 33 +++ .../server/wal/helper/wal_helper_test.go | 24 ++ .../lognode/server/wal/registry/registry.go | 33 +++ .../lognode/server/wal/registry/wal_test.go | 48 ++++ internal/lognode/server/wal/scanner.go | 29 ++ internal/lognode/server/wal/wal.go | 28 ++ .../lognode/server/wal/walimpls/builder.go | 10 + .../server/wal/walimpls/interceptor.go | 64 +++++ .../lognode/server/wal/walimpls/opener.go | 21 ++ .../lognode/server/wal/walimpls/scanner.go | 31 +++ .../server/wal/walimpls/test_framework.go | 259 +++++++++++++++++ internal/lognode/server/wal/walimpls/wal.go | 23 ++ internal/logservice/.mockery.yaml | 17 +- .../lognode/server/mock_wal/mock_Opener.go | 124 +++++++++ .../server/mock_wal/mock_OpenerBuilder.go | 129 +++++++++ .../lognode/server/mock_wal/mock_Scanner.go | 203 ++++++++++++++ .../mocks/lognode/server/mock_wal/mock_WAL.go | 261 ++++++++++++++++++ .../wal/mock_walimpls/mock_Interceptor.go | 125 +++++++++ .../mock_walimpls/mock_InterceptorBuilder.go | 79 ++++++ .../mock_InterceptorWithReady.go | 168 +++++++++++ .../mock_walimpls/mock_OpenerBuilderImpls.go | 129 +++++++++ .../wal/mock_walimpls/mock_OpenerImpls.go | 124 +++++++++ .../wal/mock_walimpls/mock_ScannerImpls.go | 244 ++++++++++++++++ .../server/wal/mock_walimpls/mock_WALImpls.go | 226 +++++++++++++++ .../mock_message/mock_MutableMessage.go | 44 +++ internal/proto/log.proto | 27 ++ internal/proto/logpb/extends.go | 42 +++ .../message/message_builder_test.go | 21 +- .../util/logserviceutil/message/message_id.go | 10 +- .../logserviceutil/message/message_id_test.go | 24 +- .../logserviceutil/message/test_message_id.go | 67 +++++ .../util/logserviceutil/options/deliver.go | 45 +++ 40 files changed, 2928 insertions(+), 37 deletions(-) create mode 100644 internal/lognode/server/wal/RAEDME.md create mode 100644 internal/lognode/server/wal/adaptor/builder.go create mode 100644 internal/lognode/server/wal/builder.go create mode 100644 internal/lognode/server/wal/helper/scanner_helper.go create mode 100644 internal/lognode/server/wal/helper/scanner_helper_test.go create mode 100644 internal/lognode/server/wal/helper/wal_helper.go create mode 100644 internal/lognode/server/wal/helper/wal_helper_test.go create mode 100644 internal/lognode/server/wal/registry/registry.go create mode 100644 internal/lognode/server/wal/registry/wal_test.go create mode 100644 internal/lognode/server/wal/scanner.go create mode 100644 internal/lognode/server/wal/wal.go create mode 100644 internal/lognode/server/wal/walimpls/builder.go create mode 100644 internal/lognode/server/wal/walimpls/interceptor.go create mode 100644 internal/lognode/server/wal/walimpls/opener.go create mode 100644 internal/lognode/server/wal/walimpls/scanner.go create mode 100644 internal/lognode/server/wal/walimpls/test_framework.go create mode 100644 internal/lognode/server/wal/walimpls/wal.go create mode 100644 internal/mocks/lognode/server/mock_wal/mock_Opener.go create mode 100644 internal/mocks/lognode/server/mock_wal/mock_OpenerBuilder.go create mode 100644 internal/mocks/lognode/server/mock_wal/mock_Scanner.go create mode 100644 internal/mocks/lognode/server/mock_wal/mock_WAL.go create mode 100644 internal/mocks/lognode/server/wal/mock_walimpls/mock_Interceptor.go create mode 100644 internal/mocks/lognode/server/wal/mock_walimpls/mock_InterceptorBuilder.go create mode 100644 internal/mocks/lognode/server/wal/mock_walimpls/mock_InterceptorWithReady.go create mode 100644 internal/mocks/lognode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go create mode 100644 internal/mocks/lognode/server/wal/mock_walimpls/mock_OpenerImpls.go create mode 100644 internal/mocks/lognode/server/wal/mock_walimpls/mock_ScannerImpls.go create mode 100644 internal/mocks/lognode/server/wal/mock_walimpls/mock_WALImpls.go create mode 100644 internal/proto/logpb/extends.go create mode 100644 internal/util/logserviceutil/message/test_message_id.go create mode 100644 internal/util/logserviceutil/options/deliver.go diff --git a/Makefile b/Makefile index 2a3372bad495..5f770e276c08 100644 --- a/Makefile +++ b/Makefile @@ -516,6 +516,9 @@ generate-mockery-log: generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg generate-mockery-log +generate-mockery-lognode: getdeps + $(INSTALL_PATH)/mockery --config $(PWD)/internal/lognode/.mockery.yaml + generate-yaml: milvus-tools @echo "Updating milvus config yaml" @$(PWD)/bin/tools/config gen-yaml && mv milvus.yaml configs/milvus.yaml diff --git a/go.mod b/go.mod index 653c0798af39..166f21a01293 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,9 @@ require ( github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/pkg/errors v0.9.1 + github.com/remeh/sizedwaitgroup v1.0.0 github.com/zeebo/xxh3 v1.0.2 + google.golang.org/protobuf v1.33.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -238,7 +240,6 @@ require ( google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 70b031afdde7..f52d95b12cdc 100644 --- a/go.sum +++ b/go.sum @@ -762,6 +762,8 @@ github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/quasilyte/go-ruleguard/dsl v0.3.22 h1:wd8zkOhSNr+I+8Qeciml08ivDt1pSXe60+5DqOpCjPE= github.com/quasilyte/go-ruleguard/dsl v0.3.22/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU= +github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E= +github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ= diff --git a/internal/lognode/server/wal/RAEDME.md b/internal/lognode/server/wal/RAEDME.md new file mode 100644 index 000000000000..e013d9e30c23 --- /dev/null +++ b/internal/lognode/server/wal/RAEDME.md @@ -0,0 +1,66 @@ +# WAL + +`wal` package is the basic defination of wal interface of milvus lognode. + +## Project arrangement + +- `/`: only define exposed interfaces. +- `/walimpls/`: define the underlying message system interfaces need to be implemented. +- `/registry/`: A static lifetime registry to regsiter new implementation for inverting dependency. +- `/adaptor/`: adaptors to implement `wal` interface from `walimpls` interface +- `/helper/`: A utility used to help developer to implement `walimpls` conveniently. +- `/utility/`: A utility code for common logic or data structure. + +## Lifetime Of Interfaces + +- `OpenerBuilder` has a static lifetime in a programs: +- `Opener` keep same lifetime with underlying resources (such as mq client). +- `WAL` keep same lifetime with underlying writer of wal, and it's lifetime is always included in related `Opener`. +- `Scanner` keep same lifetime with underlying reader of wal, and it's lifetime is always included in related `WAL`. + +## Add New Implemetation Of WAL + +developper who want to add a new implementation of `wal` should implements the `walimpls` package interfaces. following interfaces is required: + +- `walimpls.OpenerBuilderImpls` +- `walimpls.OpenerImpls` +- `walimpls.ScannerImpls` +- `walimpls.WALImpls` + +`OpenerBuilderImpls` create `OpenerImpls`; `OpenerImpls` creates `WALImpls`; `WALImpls` create `ScannerImpls`. +Then register the implmentation of `walimpls.OpenerBuilderImpls` into `registry` package. + +``` +var _ OpenerBuilderImpls = b{}; +registry.RegisterBuilder(b{}) +``` + +All things have been done. + +## Use WAL + +``` +name := "your builder name" +var yourCh *logpb.PChannelInfo + +opener, err := registry.MustGetBuilder(name).Build() +if err != nil { + panic(err) +} +ctx := context.Background() +logger, err := opener.Open(ctx, wal.OpenOption{ + Channel: yourCh +}) +if err != nil { + panic(err) +} +``` +## Adaptor + +package `adaptor` is used to adapt `walimpls` and `wal` together. +common wal function should be implement by it. Such as: + +- lifetime management +- interceptor implementation +- scanner wrapped up +- write ahead cache implementation diff --git a/internal/lognode/server/wal/adaptor/builder.go b/internal/lognode/server/wal/adaptor/builder.go new file mode 100644 index 000000000000..2569057c6086 --- /dev/null +++ b/internal/lognode/server/wal/adaptor/builder.go @@ -0,0 +1,32 @@ +package adaptor + +import ( + "github.com/milvus-io/milvus/internal/lognode/server/wal" + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" +) + +var _ wal.OpenerBuilder = (*builderAdaptorImpl)(nil) + +func AdaptImplsToBuilder(builder walimpls.OpenerBuilderImpls) wal.OpenerBuilder { + return builderAdaptorImpl{ + builder: builder, + } +} + +type builderAdaptorImpl struct { + builder walimpls.OpenerBuilderImpls +} + +func (b builderAdaptorImpl) Name() string { + return b.builder.Name() +} + +func (b builderAdaptorImpl) Build() (wal.Opener, error) { + _, err := b.builder.Build() + if err != nil { + return nil, err + } + return nil, nil + // TODO: wait for implementation. + // return adaptImplsToOpener(o), nil +} diff --git a/internal/lognode/server/wal/builder.go b/internal/lognode/server/wal/builder.go new file mode 100644 index 000000000000..614a566d3739 --- /dev/null +++ b/internal/lognode/server/wal/builder.go @@ -0,0 +1,31 @@ +package wal + +import ( + "context" + + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + "github.com/milvus-io/milvus/internal/proto/logpb" +) + +// OpenerBuilder is the interface for build wal opener. +type OpenerBuilder interface { + // Name of the wal builder, should be a lowercase string. + Name() string + + Build() (Opener, error) +} + +// OpenOption is the option for allocating wal instance. +type OpenOption struct { + Channel *logpb.PChannelInfo + InterceptorBuilders []walimpls.InterceptorBuilder // Interceptor builders to build when open. +} + +// Opener is the interface for build wal instance. +type Opener interface { + // Open open a wal instance. + Open(ctx context.Context, opt *OpenOption) (WAL, error) + + // Close closes the opener resources. + Close() +} diff --git a/internal/lognode/server/wal/helper/scanner_helper.go b/internal/lognode/server/wal/helper/scanner_helper.go new file mode 100644 index 000000000000..3d6ef34408e1 --- /dev/null +++ b/internal/lognode/server/wal/helper/scanner_helper.go @@ -0,0 +1,58 @@ +package helper + +import "context" + +// NewScannerHelper creates a new ScannerHelper. +func NewScannerHelper(scannerName string) *ScannerHelper { + ctx, cancel := context.WithCancel(context.Background()) + return &ScannerHelper{ + scannerName: scannerName, + ctx: ctx, + cancel: cancel, + finishCh: make(chan struct{}), + err: nil, + } +} + +// ScannerHelper is a helper for scanner implementation. +type ScannerHelper struct { + scannerName string + ctx context.Context + cancel context.CancelFunc + finishCh chan struct{} + err error +} + +// Context returns the context of the scanner, which will cancel when the scanner helper is closed. +func (s *ScannerHelper) Context() context.Context { + return s.ctx +} + +// Name returns the name of the scanner. +func (s *ScannerHelper) Name() string { + return s.scannerName +} + +// Error returns the error of the scanner. +func (s *ScannerHelper) Error() error { + <-s.finishCh + return s.err +} + +// Done returns a channel that will be closed when the scanner is finished. +func (s *ScannerHelper) Done() <-chan struct{} { + return s.finishCh +} + +// Close closes the scanner, block until the Finish is called. +func (s *ScannerHelper) Close() error { + s.cancel() + <-s.finishCh + return s.err +} + +// Finish finishes the scanner with an error. +func (s *ScannerHelper) Finish(err error) { + s.err = err + close(s.finishCh) +} diff --git a/internal/lognode/server/wal/helper/scanner_helper_test.go b/internal/lognode/server/wal/helper/scanner_helper_test.go new file mode 100644 index 000000000000..e304e32cae75 --- /dev/null +++ b/internal/lognode/server/wal/helper/scanner_helper_test.go @@ -0,0 +1,58 @@ +package helper + +import ( + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" +) + +func TestScanner(t *testing.T) { + h := NewScannerHelper("test") + assert.NotNil(t, h.Context()) + assert.Equal(t, h.Name(), "test") + assert.NotNil(t, h.Context()) + + select { + case <-h.Done(): + t.Errorf("should not done") + return + case <-h.Context().Done(): + t.Error("should not cancel") + return + default: + } + + finishErr := errors.New("test") + + ch := make(chan struct{}) + go func() { + defer close(ch) + done := false + cancel := false + cancelCh := h.Context().Done() + doneCh := h.Done() + for i := 0; ; i += 1 { + select { + case <-doneCh: + done = true + doneCh = nil + case <-cancelCh: + cancel = true + cancelCh = nil + h.Finish(finishErr) + } + if cancel && done { + return + } + if i == 0 { + assert.True(t, cancel && !done) + } else if i == 1 { + assert.True(t, cancel && done) + } + } + }() + h.Close() + assert.ErrorIs(t, h.Error(), finishErr) + <-ch +} diff --git a/internal/lognode/server/wal/helper/wal_helper.go b/internal/lognode/server/wal/helper/wal_helper.go new file mode 100644 index 000000000000..ec7668dfe412 --- /dev/null +++ b/internal/lognode/server/wal/helper/wal_helper.go @@ -0,0 +1,33 @@ +package helper + +import ( + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + "github.com/milvus-io/milvus/internal/proto/logpb" + "github.com/milvus-io/milvus/pkg/log" +) + +// NewWALHelper creates a new WALHelper. +func NewWALHelper(opt *walimpls.OpenOption) *WALHelper { + return &WALHelper{ + logger: log.With(zap.Any("channel", opt.Channel)), + channel: opt.Channel, + } +} + +// WALHelper is a helper for WAL implementation. +type WALHelper struct { + logger *log.MLogger + channel *logpb.PChannelInfo +} + +// Channel returns the channel of the WAL. +func (w *WALHelper) Channel() *logpb.PChannelInfo { + return w.channel +} + +// Log returns the logger of the WAL. +func (w *WALHelper) Log() *log.MLogger { + return w.logger +} diff --git a/internal/lognode/server/wal/helper/wal_helper_test.go b/internal/lognode/server/wal/helper/wal_helper_test.go new file mode 100644 index 000000000000..8afd089ace1a --- /dev/null +++ b/internal/lognode/server/wal/helper/wal_helper_test.go @@ -0,0 +1,24 @@ +package helper + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + "github.com/milvus-io/milvus/internal/proto/logpb" +) + +func TestWALHelper(t *testing.T) { + h := NewWALHelper(&walimpls.OpenOption{ + Channel: &logpb.PChannelInfo{ + Name: "test", + Term: 1, + ServerID: 1, + VChannelInfos: []*logpb.VChannelInfo{}, + }, + }) + assert.NotNil(t, h.Channel()) + assert.Equal(t, h.Channel().Name, "test") + assert.NotNil(t, h.Log()) +} diff --git a/internal/lognode/server/wal/registry/registry.go b/internal/lognode/server/wal/registry/registry.go new file mode 100644 index 000000000000..e36c49338e8b --- /dev/null +++ b/internal/lognode/server/wal/registry/registry.go @@ -0,0 +1,33 @@ +package registry + +import ( + "github.com/milvus-io/milvus/internal/lognode/server/wal" + "github.com/milvus-io/milvus/internal/lognode/server/wal/adaptor" + "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// builders is a map of registered wal builders. +var builders typeutil.ConcurrentMap[string, wal.OpenerBuilder] + +// Register registers the wal builder. +// +// NOTE: this function must only be called during initialization time (i.e. in +// an init() function), name of builder is lowercase. If multiple Builder are +// registered with the same name, panic will occur. +func RegisterBuilder(b walimpls.OpenerBuilderImpls) { + bb := adaptor.AdaptImplsToBuilder(b) + _, loaded := builders.GetOrInsert(bb.Name(), bb) + if loaded { + panic("wal builder already registered: " + b.Name()) + } +} + +// MustGetBuilder returns the wal builder by name. +func MustGetBuilder(name string) wal.OpenerBuilder { + b, ok := builders.Get(name) + if !ok { + panic("wal builder not found: " + name) + } + return b +} diff --git a/internal/lognode/server/wal/registry/wal_test.go b/internal/lognode/server/wal/registry/wal_test.go new file mode 100644 index 000000000000..6b2fa6af3d51 --- /dev/null +++ b/internal/lognode/server/wal/registry/wal_test.go @@ -0,0 +1,48 @@ +package registry + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/mocks/lognode/server/wal/mock_walimpls" +) + +func TestRegister(t *testing.T) { + name := "mock" + b := mock_walimpls.NewMockOpenerBuilderImpls(t) + b.EXPECT().Name().Return(name) + + RegisterBuilder(b) + b2 := MustGetBuilder(name) + assert.Equal(t, b.Name(), b2.Name()) + + // Panic if register twice. + assert.Panics(t, func() { + RegisterBuilder(b) + }) + + // Panic if get not exist builder. + assert.Panics(t, func() { + MustGetBuilder("not exist") + }) + + // Test concurrent. + wg := sync.WaitGroup{} + count := 10 + wg.Add(count) + for i := 0; i < count; i++ { + go func(i int) { + defer wg.Done() + name := fmt.Sprintf("mock_%d", i) + b := mock_walimpls.NewMockOpenerBuilderImpls(t) + b.EXPECT().Name().Return(name) + RegisterBuilder(b) + b2 := MustGetBuilder(name) + assert.Equal(t, b.Name(), b2.Name()) + }(i) + } + wg.Wait() +} diff --git a/internal/lognode/server/wal/scanner.go b/internal/lognode/server/wal/scanner.go new file mode 100644 index 000000000000..eab5d1200fa3 --- /dev/null +++ b/internal/lognode/server/wal/scanner.go @@ -0,0 +1,29 @@ +package wal + +import ( + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + "github.com/milvus-io/milvus/internal/util/logserviceutil/options" +) + +// ReadOption is the option for reading records from the wal. +type ReadOption struct { + DeliverPolicy options.DeliverPolicy + DeliverOrder options.DeliverOrder +} + +// Scanner is the interface for reading records from the wal. +type Scanner interface { + // Chan returns the channel of message. + Chan() <-chan message.ImmutableMessage + + // Error returns the error of scanner failed. + // Will block until scanner is closed or Chan is dry out. + Error() error + + // Done returns a channel which will be closed when scanner is finished or closed. + Done() <-chan struct{} + + // Close the scanner, release the underlying resources. + // Return the error same with `Error` + Close() error +} diff --git a/internal/lognode/server/wal/wal.go b/internal/lognode/server/wal/wal.go new file mode 100644 index 000000000000..3d54dca6559b --- /dev/null +++ b/internal/lognode/server/wal/wal.go @@ -0,0 +1,28 @@ +package wal + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/logpb" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) + +// WAL is the WAL framework interface. +// !!! Don't implement it directly, implement walimpls.WAL instead. +type WAL interface { + // Channel returns the channel assignment info of the wal. + // Should be read-only. + Channel() *logpb.PChannelInfo + + // Append writes a record to the log. + Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) + + // Append a record to the log asynchronously. + AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) + + // Read returns a scanner for reading records from the wal. + Read(ctx context.Context, deliverPolicy ReadOption) (Scanner, error) + + // Close closes the wal instance. + Close() +} diff --git a/internal/lognode/server/wal/walimpls/builder.go b/internal/lognode/server/wal/walimpls/builder.go new file mode 100644 index 000000000000..4a41a7491436 --- /dev/null +++ b/internal/lognode/server/wal/walimpls/builder.go @@ -0,0 +1,10 @@ +package walimpls + +// OpenerBuilderImpls is the interface for building wal opener impls. +type OpenerBuilderImpls interface { + // Name of the wal builder, should be a lowercase string. + Name() string + + // Build build a opener impls instance. + Build() (OpenerImpls, error) +} diff --git a/internal/lognode/server/wal/walimpls/interceptor.go b/internal/lognode/server/wal/walimpls/interceptor.go new file mode 100644 index 000000000000..fddd425fa1a8 --- /dev/null +++ b/internal/lognode/server/wal/walimpls/interceptor.go @@ -0,0 +1,64 @@ +package walimpls + +import ( + "context" + + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) + +type ( + // Append is the common function to append a msg to the wal. + Append = func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) + // Read is the common function to read a msg from the wal. + Read = func(ctx context.Context, opt ReadOption) (ScannerImpls, error) +) + +// InterceptorBuilder is the interface to build a interceptor. +// 1. InterceptorBuilder is concurrent safe. +// 2. InterceptorBuilder can used to build a interceptor with cross-wal shared resources. +type InterceptorBuilder interface { + // Build build a interceptor with wal that interceptor will work on. + // the wal object will be sent to the interceptor builder when the wal is constructed with all interceptors. + Build(wal <-chan WALImpls) BasicInterceptor +} + +type BasicInterceptor interface { + // Close the interceptor release the resources. + Close() +} + +type Interceptor interface { + AppendInterceptor + + BasicInterceptor +} + +// AppendInterceptor is the interceptor for Append functions. +// All wal extra operations should be done by these function, such as +// 1. time tick setup. +// 2. unique primary key filter and build. +// 3. index builder. +// 4. cache sync up. +// AppendInterceptor should be lazy initialized and fast execution. +type AppendInterceptor interface { + // Execute the append operation with interceptor. + DoAppend(ctx context.Context, msg message.MutableMessage, append Append) (message.MessageID, error) +} + +type InterceptorReady interface { + // Ready check if interceptor is ready. + // Close of Interceptor would not notify the ready (closed interceptor is not ready). + // So always apply timeout when waiting for ready. + // Some append interceptor may be stateful, such as index builder and unique primary key filter, + // so it need to implement the recovery logic from crash by itself before notifying ready. + // Append operation will block until ready or canceled. + // Consumer do not blocked by it. + Ready() <-chan struct{} +} + +// Some interceptor may need to wait for some resource to be ready or recovery process. +type InterceptorWithReady interface { + Interceptor + + InterceptorReady +} diff --git a/internal/lognode/server/wal/walimpls/opener.go b/internal/lognode/server/wal/walimpls/opener.go new file mode 100644 index 000000000000..60e50625d9af --- /dev/null +++ b/internal/lognode/server/wal/walimpls/opener.go @@ -0,0 +1,21 @@ +package walimpls + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/logpb" +) + +// OpenOption is the option for allocating wal impls instance. +type OpenOption struct { + Channel *logpb.PChannelInfo // Channel to open. +} + +// OpenerImpls is the interface for build WALImpls instance. +type OpenerImpls interface { + // Open open a WALImpls instance. + Open(ctx context.Context, opt *OpenOption) (WALImpls, error) + + // Close release the resources. + Close() +} diff --git a/internal/lognode/server/wal/walimpls/scanner.go b/internal/lognode/server/wal/walimpls/scanner.go new file mode 100644 index 000000000000..c0a67d899ff1 --- /dev/null +++ b/internal/lognode/server/wal/walimpls/scanner.go @@ -0,0 +1,31 @@ +package walimpls + +import ( + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + "github.com/milvus-io/milvus/internal/util/logserviceutil/options" +) + +type ReadOption struct { + Name string + DeliverPolicy options.DeliverPolicy +} + +// ScannerImpls is the interface for reading records from the wal. +type ScannerImpls interface { + // Name returns the name of scanner. + Name() string + + // Chan returns the channel of message. + Chan() <-chan message.ImmutableMessage + + // Error returns the error of scanner failed. + // Will block until scanner is closed or Chan is dry out. + Error() error + + // Done returns a channel which will be closed when scanner is finished or closed. + Done() <-chan struct{} + + // Close the scanner, release the underlying resources. + // Return the error same with `Error` + Close() error +} diff --git a/internal/lognode/server/wal/walimpls/test_framework.go b/internal/lognode/server/wal/walimpls/test_framework.go new file mode 100644 index 000000000000..83d439da0e94 --- /dev/null +++ b/internal/lognode/server/wal/walimpls/test_framework.go @@ -0,0 +1,259 @@ +//go:build test +// +build test + +package walimpls + +import ( + "context" + "fmt" + "math/rand" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/remeh/sizedwaitgroup" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/logpb" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + "github.com/milvus-io/milvus/internal/util/logserviceutil/options" +) + +var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randString(l int) string { + builder := strings.Builder{} + for i := 0; i < l; i++ { + builder.WriteRune(letters[rand.Intn(len(letters))]) + } + return builder.String() +} + +type walImplsTestFramework struct { + b OpenerBuilderImpls + t *testing.T + messageCount int +} + +func NewWALImplsTestFramework(t *testing.T, messageCount int, b OpenerBuilderImpls) *walImplsTestFramework { + return &walImplsTestFramework{ + b: b, + t: t, + messageCount: messageCount, + } +} + +// Run runs the test framework. +// if test failed, a error will be returned. +func (f walImplsTestFramework) Run() { + // create opener. + o, err := f.b.Build() + assert.NoError(f.t, err) + assert.NotNil(f.t, o) + defer o.Close() + + // construct pChannel + name := "test_" + randString(4) + pChannel := &logpb.PChannelInfo{ + Name: name, + Term: 1, + ServerID: 1, + VChannelInfos: []*logpb.VChannelInfo{}, + } + ctx := context.Background() + + // create a wal. + w, err := o.Open(ctx, &OpenOption{ + Channel: pChannel, + }) + assert.NoError(f.t, err) + assert.NotNil(f.t, w) + defer w.Close() + + f.testReadAndWrite(ctx, w) +} + +func (f walImplsTestFramework) testReadAndWrite(ctx context.Context, w WALImpls) { + // Test read and write. + wg := sync.WaitGroup{} + wg.Add(3) + + var written []message.ImmutableMessage + var read1, read2 []message.ImmutableMessage + go func() { + defer wg.Done() + var err error + written, err = f.testAppend(ctx, w) + assert.NoError(f.t, err) + }() + go func() { + defer wg.Done() + var err error + read1, err = f.testRead(ctx, w, "scanner1") + assert.NoError(f.t, err) + }() + go func() { + defer wg.Done() + var err error + read2, err = f.testRead(ctx, w, "scanner2") + assert.NoError(f.t, err) + }() + + wg.Wait() + + f.assertSortedMessageList(read1) + f.assertSortedMessageList(read2) + sort.Sort(sortByMessageID(written)) + f.assertEqualMessageList(written, read1) + f.assertEqualMessageList(written, read2) + + // Test different scan policy, StartFrom. + readFromIdx := len(read1) / 2 + readFromMsgID := read1[readFromIdx].MessageID() + s, err := w.Read(ctx, ReadOption{ + Name: "scanner_deliver_start_from", + DeliverPolicy: options.DeliverPolicyStartFrom(readFromMsgID), + }) + assert.NoError(f.t, err) + for i := readFromIdx; i < len(read1); i++ { + msg, ok := <-s.Chan() + assert.NotNil(f.t, msg) + assert.True(f.t, ok) + assert.True(f.t, msg.MessageID().EQ(read1[i].MessageID())) + } + s.Close() + + // Test different scan policy, StartAfter. + s, err = w.Read(ctx, ReadOption{ + Name: "scanner_deliver_start_after", + DeliverPolicy: options.DeliverPolicyStartAfter(readFromMsgID), + }) + assert.NoError(f.t, err) + for i := readFromIdx + 1; i < len(read1); i++ { + msg, ok := <-s.Chan() + assert.NotNil(f.t, msg) + assert.True(f.t, ok) + assert.True(f.t, msg.MessageID().EQ(read1[i].MessageID())) + } + s.Close() + + // Test different scan policy, Latest. + s, err = w.Read(ctx, ReadOption{ + Name: "scanner_deliver_latest", + DeliverPolicy: options.DeliverPolicyLatest(), + }) + assert.NoError(f.t, err) + timeoutCh := time.After(1 * time.Second) + select { + case <-s.Chan(): + f.t.Errorf("should be blocked") + case <-timeoutCh: + } + s.Close() +} + +func (f walImplsTestFramework) assertSortedMessageList(msgs []message.ImmutableMessage) { + for i := 1; i < len(msgs); i++ { + assert.True(f.t, msgs[i-1].MessageID().LT(msgs[i].MessageID())) + } +} + +func (f walImplsTestFramework) assertEqualMessageList(msgs1 []message.ImmutableMessage, msgs2 []message.ImmutableMessage) { + assert.Equal(f.t, f.messageCount, len(msgs1)) + assert.Equal(f.t, f.messageCount, len(msgs2)) + for i := 0; i < len(msgs1); i++ { + assert.True(f.t, msgs1[i].MessageID().EQ(msgs2[i].MessageID())) + // assert.True(f.t, bytes.Equal(msgs1[i].Payload(), msgs2[i].Payload())) + id1, ok1 := msgs1[i].Properties().Get("id") + id2, ok2 := msgs2[i].Properties().Get("id") + assert.True(f.t, ok1) + assert.True(f.t, ok2) + assert.Equal(f.t, id1, id2) + id1, ok1 = msgs1[i].Properties().Get("const") + id2, ok2 = msgs2[i].Properties().Get("const") + assert.True(f.t, ok1) + assert.True(f.t, ok2) + assert.Equal(f.t, id1, id2) + } +} + +func (f walImplsTestFramework) testAppend(ctx context.Context, w WALImpls) ([]message.ImmutableMessage, error) { + ids := make([]message.ImmutableMessage, f.messageCount) + swg := sizedwaitgroup.New(5) + for i := 0; i < f.messageCount; i++ { + swg.Add() + go func(i int) { + defer swg.Done() + // ...rocksmq has a dirty implement of properties, + // without commonpb.MsgHeader, it can not work. + header := commonpb.MsgHeader{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Insert, + MsgID: int64(i), + }, + } + payload, err := proto.Marshal(&header) + if err != nil { + panic(err) + } + properties := map[string]string{ + "id": fmt.Sprintf("%d", i), + "const": "t", + } + typ := message.MessageTypeUnknown + msg := message.NewBuilder(). + WithPayload(payload). + WithProperties(properties). + WithMessageType(typ). + BuildMutable() + id, err := w.Append(ctx, msg) + assert.NoError(f.t, err) + assert.NotNil(f.t, id) + ids[i] = message.NewBuilder(). + WithPayload(payload). + WithProperties(properties). + WithMessageID(id). + WithMessageType(typ). + BuildImmutable() + }(i) + } + swg.Wait() + return ids, nil +} + +func (f walImplsTestFramework) testRead(ctx context.Context, w WALImpls, name string) ([]message.ImmutableMessage, error) { + s, err := w.Read(ctx, ReadOption{ + Name: name, + DeliverPolicy: options.DeliverPolicyAll(), + }) + assert.NoError(f.t, err) + assert.Equal(f.t, name, s.Name()) + defer s.Close() + + msgs := make([]message.ImmutableMessage, 0, f.messageCount) + for i := 0; i < f.messageCount; i++ { + msg, ok := <-s.Chan() + assert.NotNil(f.t, msg) + assert.True(f.t, ok) + msgs = append(msgs, msg) + } + return msgs, nil +} + +type sortByMessageID []message.ImmutableMessage + +func (a sortByMessageID) Len() int { + return len(a) +} + +func (a sortByMessageID) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func (a sortByMessageID) Less(i, j int) bool { + return a[i].MessageID().LT(a[j].MessageID()) +} diff --git a/internal/lognode/server/wal/walimpls/wal.go b/internal/lognode/server/wal/walimpls/wal.go new file mode 100644 index 000000000000..bdb4220e02e6 --- /dev/null +++ b/internal/lognode/server/wal/walimpls/wal.go @@ -0,0 +1,23 @@ +package walimpls + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/logpb" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) + +type WALImpls interface { + // Channel returns the channel assignment info of the wal. + // Should be read-only. + Channel() *logpb.PChannelInfo + + // Append writes a record to the log. + Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) + + // Read returns a scanner for reading records from the wal. + Read(ctx context.Context, opts ReadOption) (ScannerImpls, error) + + // Close closes the wal instance. + Close() +} diff --git a/internal/logservice/.mockery.yaml b/internal/logservice/.mockery.yaml index 22c293d1556a..9a5c825a2692 100644 --- a/internal/logservice/.mockery.yaml +++ b/internal/logservice/.mockery.yaml @@ -10,4 +10,19 @@ packages: MessageID: ImmutableMessage: MutableMessage: - RProperties: \ No newline at end of file + RProperties: + github.com/milvus-io/milvus/internal/lognode/server/wal: + interfaces: + OpenerBuilder: + Opener: + Scanner: + WAL: + github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls: + interfaces: + OpenerBuilderImpls: + OpenerImpls: + ScannerImpls: + WALImpls: + Interceptor: + InterceptorWithReady: + InterceptorBuilder: \ No newline at end of file diff --git a/internal/mocks/lognode/server/mock_wal/mock_Opener.go b/internal/mocks/lognode/server/mock_wal/mock_Opener.go new file mode 100644 index 000000000000..320caa705e1c --- /dev/null +++ b/internal/mocks/lognode/server/mock_wal/mock_Opener.go @@ -0,0 +1,124 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_wal + +import ( + context "context" + + wal "github.com/milvus-io/milvus/internal/lognode/server/wal" + mock "github.com/stretchr/testify/mock" +) + +// MockOpener is an autogenerated mock type for the Opener type +type MockOpener struct { + mock.Mock +} + +type MockOpener_Expecter struct { + mock *mock.Mock +} + +func (_m *MockOpener) EXPECT() *MockOpener_Expecter { + return &MockOpener_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *MockOpener) Close() { + _m.Called() +} + +// MockOpener_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockOpener_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockOpener_Expecter) Close() *MockOpener_Close_Call { + return &MockOpener_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockOpener_Close_Call) Run(run func()) *MockOpener_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpener_Close_Call) Return() *MockOpener_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockOpener_Close_Call) RunAndReturn(run func()) *MockOpener_Close_Call { + _c.Call.Return(run) + return _c +} + +// Open provides a mock function with given fields: ctx, opt +func (_m *MockOpener) Open(ctx context.Context, opt *wal.OpenOption) (wal.WAL, error) { + ret := _m.Called(ctx, opt) + + var r0 wal.WAL + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *wal.OpenOption) (wal.WAL, error)); ok { + return rf(ctx, opt) + } + if rf, ok := ret.Get(0).(func(context.Context, *wal.OpenOption) wal.WAL); ok { + r0 = rf(ctx, opt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(wal.WAL) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *wal.OpenOption) error); ok { + r1 = rf(ctx, opt) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockOpener_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' +type MockOpener_Open_Call struct { + *mock.Call +} + +// Open is a helper method to define mock.On call +// - ctx context.Context +// - opt *wal.OpenOption +func (_e *MockOpener_Expecter) Open(ctx interface{}, opt interface{}) *MockOpener_Open_Call { + return &MockOpener_Open_Call{Call: _e.mock.On("Open", ctx, opt)} +} + +func (_c *MockOpener_Open_Call) Run(run func(ctx context.Context, opt *wal.OpenOption)) *MockOpener_Open_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*wal.OpenOption)) + }) + return _c +} + +func (_c *MockOpener_Open_Call) Return(_a0 wal.WAL, _a1 error) *MockOpener_Open_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockOpener_Open_Call) RunAndReturn(run func(context.Context, *wal.OpenOption) (wal.WAL, error)) *MockOpener_Open_Call { + _c.Call.Return(run) + return _c +} + +// NewMockOpener creates a new instance of MockOpener. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockOpener(t interface { + mock.TestingT + Cleanup(func()) +}) *MockOpener { + mock := &MockOpener{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/mock_wal/mock_OpenerBuilder.go b/internal/mocks/lognode/server/mock_wal/mock_OpenerBuilder.go new file mode 100644 index 000000000000..f55df30618c5 --- /dev/null +++ b/internal/mocks/lognode/server/mock_wal/mock_OpenerBuilder.go @@ -0,0 +1,129 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_wal + +import ( + wal "github.com/milvus-io/milvus/internal/lognode/server/wal" + mock "github.com/stretchr/testify/mock" +) + +// MockOpenerBuilder is an autogenerated mock type for the OpenerBuilder type +type MockOpenerBuilder struct { + mock.Mock +} + +type MockOpenerBuilder_Expecter struct { + mock *mock.Mock +} + +func (_m *MockOpenerBuilder) EXPECT() *MockOpenerBuilder_Expecter { + return &MockOpenerBuilder_Expecter{mock: &_m.Mock} +} + +// Build provides a mock function with given fields: +func (_m *MockOpenerBuilder) Build() (wal.Opener, error) { + ret := _m.Called() + + var r0 wal.Opener + var r1 error + if rf, ok := ret.Get(0).(func() (wal.Opener, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() wal.Opener); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(wal.Opener) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockOpenerBuilder_Build_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Build' +type MockOpenerBuilder_Build_Call struct { + *mock.Call +} + +// Build is a helper method to define mock.On call +func (_e *MockOpenerBuilder_Expecter) Build() *MockOpenerBuilder_Build_Call { + return &MockOpenerBuilder_Build_Call{Call: _e.mock.On("Build")} +} + +func (_c *MockOpenerBuilder_Build_Call) Run(run func()) *MockOpenerBuilder_Build_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerBuilder_Build_Call) Return(_a0 wal.Opener, _a1 error) *MockOpenerBuilder_Build_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockOpenerBuilder_Build_Call) RunAndReturn(run func() (wal.Opener, error)) *MockOpenerBuilder_Build_Call { + _c.Call.Return(run) + return _c +} + +// Name provides a mock function with given fields: +func (_m *MockOpenerBuilder) Name() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockOpenerBuilder_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name' +type MockOpenerBuilder_Name_Call struct { + *mock.Call +} + +// Name is a helper method to define mock.On call +func (_e *MockOpenerBuilder_Expecter) Name() *MockOpenerBuilder_Name_Call { + return &MockOpenerBuilder_Name_Call{Call: _e.mock.On("Name")} +} + +func (_c *MockOpenerBuilder_Name_Call) Run(run func()) *MockOpenerBuilder_Name_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerBuilder_Name_Call) Return(_a0 string) *MockOpenerBuilder_Name_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockOpenerBuilder_Name_Call) RunAndReturn(run func() string) *MockOpenerBuilder_Name_Call { + _c.Call.Return(run) + return _c +} + +// NewMockOpenerBuilder creates a new instance of MockOpenerBuilder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockOpenerBuilder(t interface { + mock.TestingT + Cleanup(func()) +}) *MockOpenerBuilder { + mock := &MockOpenerBuilder{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/mock_wal/mock_Scanner.go b/internal/mocks/lognode/server/mock_wal/mock_Scanner.go new file mode 100644 index 000000000000..886e937ac7d5 --- /dev/null +++ b/internal/mocks/lognode/server/mock_wal/mock_Scanner.go @@ -0,0 +1,203 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_wal + +import ( + message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockScanner is an autogenerated mock type for the Scanner type +type MockScanner struct { + mock.Mock +} + +type MockScanner_Expecter struct { + mock *mock.Mock +} + +func (_m *MockScanner) EXPECT() *MockScanner_Expecter { + return &MockScanner_Expecter{mock: &_m.Mock} +} + +// Chan provides a mock function with given fields: +func (_m *MockScanner) Chan() <-chan message.ImmutableMessage { + ret := _m.Called() + + var r0 <-chan message.ImmutableMessage + if rf, ok := ret.Get(0).(func() <-chan message.ImmutableMessage); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan message.ImmutableMessage) + } + } + + return r0 +} + +// MockScanner_Chan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chan' +type MockScanner_Chan_Call struct { + *mock.Call +} + +// Chan is a helper method to define mock.On call +func (_e *MockScanner_Expecter) Chan() *MockScanner_Chan_Call { + return &MockScanner_Chan_Call{Call: _e.mock.On("Chan")} +} + +func (_c *MockScanner_Chan_Call) Run(run func()) *MockScanner_Chan_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScanner_Chan_Call) Return(_a0 <-chan message.ImmutableMessage) *MockScanner_Chan_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScanner_Chan_Call) RunAndReturn(run func() <-chan message.ImmutableMessage) *MockScanner_Chan_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockScanner) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockScanner_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockScanner_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockScanner_Expecter) Close() *MockScanner_Close_Call { + return &MockScanner_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockScanner_Close_Call) Run(run func()) *MockScanner_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScanner_Close_Call) Return(_a0 error) *MockScanner_Close_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScanner_Close_Call) RunAndReturn(run func() error) *MockScanner_Close_Call { + _c.Call.Return(run) + return _c +} + +// Done provides a mock function with given fields: +func (_m *MockScanner) Done() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// MockScanner_Done_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Done' +type MockScanner_Done_Call struct { + *mock.Call +} + +// Done is a helper method to define mock.On call +func (_e *MockScanner_Expecter) Done() *MockScanner_Done_Call { + return &MockScanner_Done_Call{Call: _e.mock.On("Done")} +} + +func (_c *MockScanner_Done_Call) Run(run func()) *MockScanner_Done_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScanner_Done_Call) Return(_a0 <-chan struct{}) *MockScanner_Done_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScanner_Done_Call) RunAndReturn(run func() <-chan struct{}) *MockScanner_Done_Call { + _c.Call.Return(run) + return _c +} + +// Error provides a mock function with given fields: +func (_m *MockScanner) Error() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockScanner_Error_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Error' +type MockScanner_Error_Call struct { + *mock.Call +} + +// Error is a helper method to define mock.On call +func (_e *MockScanner_Expecter) Error() *MockScanner_Error_Call { + return &MockScanner_Error_Call{Call: _e.mock.On("Error")} +} + +func (_c *MockScanner_Error_Call) Run(run func()) *MockScanner_Error_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScanner_Error_Call) Return(_a0 error) *MockScanner_Error_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScanner_Error_Call) RunAndReturn(run func() error) *MockScanner_Error_Call { + _c.Call.Return(run) + return _c +} + +// NewMockScanner creates a new instance of MockScanner. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockScanner(t interface { + mock.TestingT + Cleanup(func()) +}) *MockScanner { + mock := &MockScanner{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/mock_wal/mock_WAL.go b/internal/mocks/lognode/server/mock_wal/mock_WAL.go new file mode 100644 index 000000000000..58d625476bdf --- /dev/null +++ b/internal/mocks/lognode/server/mock_wal/mock_WAL.go @@ -0,0 +1,261 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_wal + +import ( + context "context" + + logpb "github.com/milvus-io/milvus/internal/proto/logpb" + message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + + mock "github.com/stretchr/testify/mock" + + wal "github.com/milvus-io/milvus/internal/lognode/server/wal" +) + +// MockWAL is an autogenerated mock type for the WAL type +type MockWAL struct { + mock.Mock +} + +type MockWAL_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWAL) EXPECT() *MockWAL_Expecter { + return &MockWAL_Expecter{mock: &_m.Mock} +} + +// Append provides a mock function with given fields: ctx, msg +func (_m *MockWAL) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + ret := _m.Called(ctx, msg) + + var r0 message.MessageID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (message.MessageID, error)); ok { + return rf(ctx, msg) + } + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) message.MessageID); ok { + r0 = rf(ctx, msg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage) error); ok { + r1 = rf(ctx, msg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWAL_Append_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Append' +type MockWAL_Append_Call struct { + *mock.Call +} + +// Append is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +func (_e *MockWAL_Expecter) Append(ctx interface{}, msg interface{}) *MockWAL_Append_Call { + return &MockWAL_Append_Call{Call: _e.mock.On("Append", ctx, msg)} +} + +func (_c *MockWAL_Append_Call) Run(run func(ctx context.Context, msg message.MutableMessage)) *MockWAL_Append_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage)) + }) + return _c +} + +func (_c *MockWAL_Append_Call) Return(_a0 message.MessageID, _a1 error) *MockWAL_Append_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWAL_Append_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (message.MessageID, error)) *MockWAL_Append_Call { + _c.Call.Return(run) + return _c +} + +// AppendAsync provides a mock function with given fields: ctx, msg, cb +func (_m *MockWAL) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) { + _m.Called(ctx, msg, cb) +} + +// MockWAL_AppendAsync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AppendAsync' +type MockWAL_AppendAsync_Call struct { + *mock.Call +} + +// AppendAsync is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +// - cb func(message.MessageID , error) +func (_e *MockWAL_Expecter) AppendAsync(ctx interface{}, msg interface{}, cb interface{}) *MockWAL_AppendAsync_Call { + return &MockWAL_AppendAsync_Call{Call: _e.mock.On("AppendAsync", ctx, msg, cb)} +} + +func (_c *MockWAL_AppendAsync_Call) Run(run func(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error))) *MockWAL_AppendAsync_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(message.MessageID, error))) + }) + return _c +} + +func (_c *MockWAL_AppendAsync_Call) Return() *MockWAL_AppendAsync_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWAL_AppendAsync_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(message.MessageID, error))) *MockWAL_AppendAsync_Call { + _c.Call.Return(run) + return _c +} + +// Channel provides a mock function with given fields: +func (_m *MockWAL) Channel() *logpb.PChannelInfo { + ret := _m.Called() + + var r0 *logpb.PChannelInfo + if rf, ok := ret.Get(0).(func() *logpb.PChannelInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*logpb.PChannelInfo) + } + } + + return r0 +} + +// MockWAL_Channel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Channel' +type MockWAL_Channel_Call struct { + *mock.Call +} + +// Channel is a helper method to define mock.On call +func (_e *MockWAL_Expecter) Channel() *MockWAL_Channel_Call { + return &MockWAL_Channel_Call{Call: _e.mock.On("Channel")} +} + +func (_c *MockWAL_Channel_Call) Run(run func()) *MockWAL_Channel_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWAL_Channel_Call) Return(_a0 *logpb.PChannelInfo) *MockWAL_Channel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWAL_Channel_Call) RunAndReturn(run func() *logpb.PChannelInfo) *MockWAL_Channel_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockWAL) Close() { + _m.Called() +} + +// MockWAL_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockWAL_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockWAL_Expecter) Close() *MockWAL_Close_Call { + return &MockWAL_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockWAL_Close_Call) Run(run func()) *MockWAL_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWAL_Close_Call) Return() *MockWAL_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWAL_Close_Call) RunAndReturn(run func()) *MockWAL_Close_Call { + _c.Call.Return(run) + return _c +} + +// Read provides a mock function with given fields: ctx, deliverPolicy +func (_m *MockWAL) Read(ctx context.Context, deliverPolicy wal.ReadOption) (wal.Scanner, error) { + ret := _m.Called(ctx, deliverPolicy) + + var r0 wal.Scanner + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, wal.ReadOption) (wal.Scanner, error)); ok { + return rf(ctx, deliverPolicy) + } + if rf, ok := ret.Get(0).(func(context.Context, wal.ReadOption) wal.Scanner); ok { + r0 = rf(ctx, deliverPolicy) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(wal.Scanner) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, wal.ReadOption) error); ok { + r1 = rf(ctx, deliverPolicy) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWAL_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read' +type MockWAL_Read_Call struct { + *mock.Call +} + +// Read is a helper method to define mock.On call +// - ctx context.Context +// - deliverPolicy wal.ReadOption +func (_e *MockWAL_Expecter) Read(ctx interface{}, deliverPolicy interface{}) *MockWAL_Read_Call { + return &MockWAL_Read_Call{Call: _e.mock.On("Read", ctx, deliverPolicy)} +} + +func (_c *MockWAL_Read_Call) Run(run func(ctx context.Context, deliverPolicy wal.ReadOption)) *MockWAL_Read_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(wal.ReadOption)) + }) + return _c +} + +func (_c *MockWAL_Read_Call) Return(_a0 wal.Scanner, _a1 error) *MockWAL_Read_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWAL_Read_Call) RunAndReturn(run func(context.Context, wal.ReadOption) (wal.Scanner, error)) *MockWAL_Read_Call { + _c.Call.Return(run) + return _c +} + +// NewMockWAL creates a new instance of MockWAL. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockWAL(t interface { + mock.TestingT + Cleanup(func()) +}) *MockWAL { + mock := &MockWAL{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/wal/mock_walimpls/mock_Interceptor.go b/internal/mocks/lognode/server/wal/mock_walimpls/mock_Interceptor.go new file mode 100644 index 000000000000..f80286cc898f --- /dev/null +++ b/internal/mocks/lognode/server/wal/mock_walimpls/mock_Interceptor.go @@ -0,0 +1,125 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + context "context" + + message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockInterceptor is an autogenerated mock type for the Interceptor type +type MockInterceptor struct { + mock.Mock +} + +type MockInterceptor_Expecter struct { + mock *mock.Mock +} + +func (_m *MockInterceptor) EXPECT() *MockInterceptor_Expecter { + return &MockInterceptor_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *MockInterceptor) Close() { + _m.Called() +} + +// MockInterceptor_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockInterceptor_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockInterceptor_Expecter) Close() *MockInterceptor_Close_Call { + return &MockInterceptor_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockInterceptor_Close_Call) Run(run func()) *MockInterceptor_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockInterceptor_Close_Call) Return() *MockInterceptor_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockInterceptor_Close_Call) RunAndReturn(run func()) *MockInterceptor_Close_Call { + _c.Call.Return(run) + return _c +} + +// DoAppend provides a mock function with given fields: ctx, msg, append +func (_m *MockInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) { + ret := _m.Called(ctx, msg, append) + + var r0 message.MessageID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)); ok { + return rf(ctx, msg, append) + } + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) message.MessageID); ok { + r0 = rf(ctx, msg, append) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) error); ok { + r1 = rf(ctx, msg, append) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockInterceptor_DoAppend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DoAppend' +type MockInterceptor_DoAppend_Call struct { + *mock.Call +} + +// DoAppend is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +// - append func(context.Context , message.MutableMessage)(message.MessageID , error) +func (_e *MockInterceptor_Expecter) DoAppend(ctx interface{}, msg interface{}, append interface{}) *MockInterceptor_DoAppend_Call { + return &MockInterceptor_DoAppend_Call{Call: _e.mock.On("DoAppend", ctx, msg, append)} +} + +func (_c *MockInterceptor_DoAppend_Call) Run(run func(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error))) *MockInterceptor_DoAppend_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(context.Context, message.MutableMessage) (message.MessageID, error))) + }) + return _c +} + +func (_c *MockInterceptor_DoAppend_Call) Return(_a0 message.MessageID, _a1 error) *MockInterceptor_DoAppend_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockInterceptor_DoAppend_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)) *MockInterceptor_DoAppend_Call { + _c.Call.Return(run) + return _c +} + +// NewMockInterceptor creates a new instance of MockInterceptor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockInterceptor(t interface { + mock.TestingT + Cleanup(func()) +}) *MockInterceptor { + mock := &MockInterceptor{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/wal/mock_walimpls/mock_InterceptorBuilder.go b/internal/mocks/lognode/server/wal/mock_walimpls/mock_InterceptorBuilder.go new file mode 100644 index 000000000000..1b8c14c2c8a1 --- /dev/null +++ b/internal/mocks/lognode/server/wal/mock_walimpls/mock_InterceptorBuilder.go @@ -0,0 +1,79 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + walimpls "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + mock "github.com/stretchr/testify/mock" +) + +// MockInterceptorBuilder is an autogenerated mock type for the InterceptorBuilder type +type MockInterceptorBuilder struct { + mock.Mock +} + +type MockInterceptorBuilder_Expecter struct { + mock *mock.Mock +} + +func (_m *MockInterceptorBuilder) EXPECT() *MockInterceptorBuilder_Expecter { + return &MockInterceptorBuilder_Expecter{mock: &_m.Mock} +} + +// Build provides a mock function with given fields: wal +func (_m *MockInterceptorBuilder) Build(wal <-chan walimpls.WALImpls) walimpls.BasicInterceptor { + ret := _m.Called(wal) + + var r0 walimpls.BasicInterceptor + if rf, ok := ret.Get(0).(func(<-chan walimpls.WALImpls) walimpls.BasicInterceptor); ok { + r0 = rf(wal) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(walimpls.BasicInterceptor) + } + } + + return r0 +} + +// MockInterceptorBuilder_Build_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Build' +type MockInterceptorBuilder_Build_Call struct { + *mock.Call +} + +// Build is a helper method to define mock.On call +// - wal <-chan walimpls.WALImpls +func (_e *MockInterceptorBuilder_Expecter) Build(wal interface{}) *MockInterceptorBuilder_Build_Call { + return &MockInterceptorBuilder_Build_Call{Call: _e.mock.On("Build", wal)} +} + +func (_c *MockInterceptorBuilder_Build_Call) Run(run func(wal <-chan walimpls.WALImpls)) *MockInterceptorBuilder_Build_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(<-chan walimpls.WALImpls)) + }) + return _c +} + +func (_c *MockInterceptorBuilder_Build_Call) Return(_a0 walimpls.BasicInterceptor) *MockInterceptorBuilder_Build_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockInterceptorBuilder_Build_Call) RunAndReturn(run func(<-chan walimpls.WALImpls) walimpls.BasicInterceptor) *MockInterceptorBuilder_Build_Call { + _c.Call.Return(run) + return _c +} + +// NewMockInterceptorBuilder creates a new instance of MockInterceptorBuilder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockInterceptorBuilder(t interface { + mock.TestingT + Cleanup(func()) +}) *MockInterceptorBuilder { + mock := &MockInterceptorBuilder{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/wal/mock_walimpls/mock_InterceptorWithReady.go b/internal/mocks/lognode/server/wal/mock_walimpls/mock_InterceptorWithReady.go new file mode 100644 index 000000000000..915a67e0e897 --- /dev/null +++ b/internal/mocks/lognode/server/wal/mock_walimpls/mock_InterceptorWithReady.go @@ -0,0 +1,168 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + context "context" + + message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockInterceptorWithReady is an autogenerated mock type for the InterceptorWithReady type +type MockInterceptorWithReady struct { + mock.Mock +} + +type MockInterceptorWithReady_Expecter struct { + mock *mock.Mock +} + +func (_m *MockInterceptorWithReady) EXPECT() *MockInterceptorWithReady_Expecter { + return &MockInterceptorWithReady_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *MockInterceptorWithReady) Close() { + _m.Called() +} + +// MockInterceptorWithReady_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockInterceptorWithReady_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockInterceptorWithReady_Expecter) Close() *MockInterceptorWithReady_Close_Call { + return &MockInterceptorWithReady_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockInterceptorWithReady_Close_Call) Run(run func()) *MockInterceptorWithReady_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockInterceptorWithReady_Close_Call) Return() *MockInterceptorWithReady_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockInterceptorWithReady_Close_Call) RunAndReturn(run func()) *MockInterceptorWithReady_Close_Call { + _c.Call.Return(run) + return _c +} + +// DoAppend provides a mock function with given fields: ctx, msg, append +func (_m *MockInterceptorWithReady) DoAppend(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) { + ret := _m.Called(ctx, msg, append) + + var r0 message.MessageID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)); ok { + return rf(ctx, msg, append) + } + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) message.MessageID); ok { + r0 = rf(ctx, msg, append) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) error); ok { + r1 = rf(ctx, msg, append) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockInterceptorWithReady_DoAppend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DoAppend' +type MockInterceptorWithReady_DoAppend_Call struct { + *mock.Call +} + +// DoAppend is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +// - append func(context.Context , message.MutableMessage)(message.MessageID , error) +func (_e *MockInterceptorWithReady_Expecter) DoAppend(ctx interface{}, msg interface{}, append interface{}) *MockInterceptorWithReady_DoAppend_Call { + return &MockInterceptorWithReady_DoAppend_Call{Call: _e.mock.On("DoAppend", ctx, msg, append)} +} + +func (_c *MockInterceptorWithReady_DoAppend_Call) Run(run func(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error))) *MockInterceptorWithReady_DoAppend_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(context.Context, message.MutableMessage) (message.MessageID, error))) + }) + return _c +} + +func (_c *MockInterceptorWithReady_DoAppend_Call) Return(_a0 message.MessageID, _a1 error) *MockInterceptorWithReady_DoAppend_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockInterceptorWithReady_DoAppend_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)) *MockInterceptorWithReady_DoAppend_Call { + _c.Call.Return(run) + return _c +} + +// Ready provides a mock function with given fields: +func (_m *MockInterceptorWithReady) Ready() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// MockInterceptorWithReady_Ready_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ready' +type MockInterceptorWithReady_Ready_Call struct { + *mock.Call +} + +// Ready is a helper method to define mock.On call +func (_e *MockInterceptorWithReady_Expecter) Ready() *MockInterceptorWithReady_Ready_Call { + return &MockInterceptorWithReady_Ready_Call{Call: _e.mock.On("Ready")} +} + +func (_c *MockInterceptorWithReady_Ready_Call) Run(run func()) *MockInterceptorWithReady_Ready_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockInterceptorWithReady_Ready_Call) Return(_a0 <-chan struct{}) *MockInterceptorWithReady_Ready_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockInterceptorWithReady_Ready_Call) RunAndReturn(run func() <-chan struct{}) *MockInterceptorWithReady_Ready_Call { + _c.Call.Return(run) + return _c +} + +// NewMockInterceptorWithReady creates a new instance of MockInterceptorWithReady. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockInterceptorWithReady(t interface { + mock.TestingT + Cleanup(func()) +}) *MockInterceptorWithReady { + mock := &MockInterceptorWithReady{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go b/internal/mocks/lognode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go new file mode 100644 index 000000000000..04921fbbc280 --- /dev/null +++ b/internal/mocks/lognode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go @@ -0,0 +1,129 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + walimpls "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + mock "github.com/stretchr/testify/mock" +) + +// MockOpenerBuilderImpls is an autogenerated mock type for the OpenerBuilderImpls type +type MockOpenerBuilderImpls struct { + mock.Mock +} + +type MockOpenerBuilderImpls_Expecter struct { + mock *mock.Mock +} + +func (_m *MockOpenerBuilderImpls) EXPECT() *MockOpenerBuilderImpls_Expecter { + return &MockOpenerBuilderImpls_Expecter{mock: &_m.Mock} +} + +// Build provides a mock function with given fields: +func (_m *MockOpenerBuilderImpls) Build() (walimpls.OpenerImpls, error) { + ret := _m.Called() + + var r0 walimpls.OpenerImpls + var r1 error + if rf, ok := ret.Get(0).(func() (walimpls.OpenerImpls, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() walimpls.OpenerImpls); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(walimpls.OpenerImpls) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockOpenerBuilderImpls_Build_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Build' +type MockOpenerBuilderImpls_Build_Call struct { + *mock.Call +} + +// Build is a helper method to define mock.On call +func (_e *MockOpenerBuilderImpls_Expecter) Build() *MockOpenerBuilderImpls_Build_Call { + return &MockOpenerBuilderImpls_Build_Call{Call: _e.mock.On("Build")} +} + +func (_c *MockOpenerBuilderImpls_Build_Call) Run(run func()) *MockOpenerBuilderImpls_Build_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerBuilderImpls_Build_Call) Return(_a0 walimpls.OpenerImpls, _a1 error) *MockOpenerBuilderImpls_Build_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockOpenerBuilderImpls_Build_Call) RunAndReturn(run func() (walimpls.OpenerImpls, error)) *MockOpenerBuilderImpls_Build_Call { + _c.Call.Return(run) + return _c +} + +// Name provides a mock function with given fields: +func (_m *MockOpenerBuilderImpls) Name() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockOpenerBuilderImpls_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name' +type MockOpenerBuilderImpls_Name_Call struct { + *mock.Call +} + +// Name is a helper method to define mock.On call +func (_e *MockOpenerBuilderImpls_Expecter) Name() *MockOpenerBuilderImpls_Name_Call { + return &MockOpenerBuilderImpls_Name_Call{Call: _e.mock.On("Name")} +} + +func (_c *MockOpenerBuilderImpls_Name_Call) Run(run func()) *MockOpenerBuilderImpls_Name_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerBuilderImpls_Name_Call) Return(_a0 string) *MockOpenerBuilderImpls_Name_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockOpenerBuilderImpls_Name_Call) RunAndReturn(run func() string) *MockOpenerBuilderImpls_Name_Call { + _c.Call.Return(run) + return _c +} + +// NewMockOpenerBuilderImpls creates a new instance of MockOpenerBuilderImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockOpenerBuilderImpls(t interface { + mock.TestingT + Cleanup(func()) +}) *MockOpenerBuilderImpls { + mock := &MockOpenerBuilderImpls{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/wal/mock_walimpls/mock_OpenerImpls.go b/internal/mocks/lognode/server/wal/mock_walimpls/mock_OpenerImpls.go new file mode 100644 index 000000000000..662197e920b1 --- /dev/null +++ b/internal/mocks/lognode/server/wal/mock_walimpls/mock_OpenerImpls.go @@ -0,0 +1,124 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + context "context" + + walimpls "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" + mock "github.com/stretchr/testify/mock" +) + +// MockOpenerImpls is an autogenerated mock type for the OpenerImpls type +type MockOpenerImpls struct { + mock.Mock +} + +type MockOpenerImpls_Expecter struct { + mock *mock.Mock +} + +func (_m *MockOpenerImpls) EXPECT() *MockOpenerImpls_Expecter { + return &MockOpenerImpls_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *MockOpenerImpls) Close() { + _m.Called() +} + +// MockOpenerImpls_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockOpenerImpls_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockOpenerImpls_Expecter) Close() *MockOpenerImpls_Close_Call { + return &MockOpenerImpls_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockOpenerImpls_Close_Call) Run(run func()) *MockOpenerImpls_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockOpenerImpls_Close_Call) Return() *MockOpenerImpls_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockOpenerImpls_Close_Call) RunAndReturn(run func()) *MockOpenerImpls_Close_Call { + _c.Call.Return(run) + return _c +} + +// Open provides a mock function with given fields: ctx, opt +func (_m *MockOpenerImpls) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) { + ret := _m.Called(ctx, opt) + + var r0 walimpls.WALImpls + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *walimpls.OpenOption) (walimpls.WALImpls, error)); ok { + return rf(ctx, opt) + } + if rf, ok := ret.Get(0).(func(context.Context, *walimpls.OpenOption) walimpls.WALImpls); ok { + r0 = rf(ctx, opt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(walimpls.WALImpls) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *walimpls.OpenOption) error); ok { + r1 = rf(ctx, opt) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockOpenerImpls_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' +type MockOpenerImpls_Open_Call struct { + *mock.Call +} + +// Open is a helper method to define mock.On call +// - ctx context.Context +// - opt *walimpls.OpenOption +func (_e *MockOpenerImpls_Expecter) Open(ctx interface{}, opt interface{}) *MockOpenerImpls_Open_Call { + return &MockOpenerImpls_Open_Call{Call: _e.mock.On("Open", ctx, opt)} +} + +func (_c *MockOpenerImpls_Open_Call) Run(run func(ctx context.Context, opt *walimpls.OpenOption)) *MockOpenerImpls_Open_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*walimpls.OpenOption)) + }) + return _c +} + +func (_c *MockOpenerImpls_Open_Call) Return(_a0 walimpls.WALImpls, _a1 error) *MockOpenerImpls_Open_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockOpenerImpls_Open_Call) RunAndReturn(run func(context.Context, *walimpls.OpenOption) (walimpls.WALImpls, error)) *MockOpenerImpls_Open_Call { + _c.Call.Return(run) + return _c +} + +// NewMockOpenerImpls creates a new instance of MockOpenerImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockOpenerImpls(t interface { + mock.TestingT + Cleanup(func()) +}) *MockOpenerImpls { + mock := &MockOpenerImpls{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/wal/mock_walimpls/mock_ScannerImpls.go b/internal/mocks/lognode/server/wal/mock_walimpls/mock_ScannerImpls.go new file mode 100644 index 000000000000..190c7c3b7c1a --- /dev/null +++ b/internal/mocks/lognode/server/wal/mock_walimpls/mock_ScannerImpls.go @@ -0,0 +1,244 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + mock "github.com/stretchr/testify/mock" +) + +// MockScannerImpls is an autogenerated mock type for the ScannerImpls type +type MockScannerImpls struct { + mock.Mock +} + +type MockScannerImpls_Expecter struct { + mock *mock.Mock +} + +func (_m *MockScannerImpls) EXPECT() *MockScannerImpls_Expecter { + return &MockScannerImpls_Expecter{mock: &_m.Mock} +} + +// Chan provides a mock function with given fields: +func (_m *MockScannerImpls) Chan() <-chan message.ImmutableMessage { + ret := _m.Called() + + var r0 <-chan message.ImmutableMessage + if rf, ok := ret.Get(0).(func() <-chan message.ImmutableMessage); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan message.ImmutableMessage) + } + } + + return r0 +} + +// MockScannerImpls_Chan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chan' +type MockScannerImpls_Chan_Call struct { + *mock.Call +} + +// Chan is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Chan() *MockScannerImpls_Chan_Call { + return &MockScannerImpls_Chan_Call{Call: _e.mock.On("Chan")} +} + +func (_c *MockScannerImpls_Chan_Call) Run(run func()) *MockScannerImpls_Chan_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Chan_Call) Return(_a0 <-chan message.ImmutableMessage) *MockScannerImpls_Chan_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Chan_Call) RunAndReturn(run func() <-chan message.ImmutableMessage) *MockScannerImpls_Chan_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockScannerImpls) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockScannerImpls_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockScannerImpls_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Close() *MockScannerImpls_Close_Call { + return &MockScannerImpls_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockScannerImpls_Close_Call) Run(run func()) *MockScannerImpls_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Close_Call) Return(_a0 error) *MockScannerImpls_Close_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Close_Call) RunAndReturn(run func() error) *MockScannerImpls_Close_Call { + _c.Call.Return(run) + return _c +} + +// Done provides a mock function with given fields: +func (_m *MockScannerImpls) Done() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// MockScannerImpls_Done_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Done' +type MockScannerImpls_Done_Call struct { + *mock.Call +} + +// Done is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Done() *MockScannerImpls_Done_Call { + return &MockScannerImpls_Done_Call{Call: _e.mock.On("Done")} +} + +func (_c *MockScannerImpls_Done_Call) Run(run func()) *MockScannerImpls_Done_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Done_Call) Return(_a0 <-chan struct{}) *MockScannerImpls_Done_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Done_Call) RunAndReturn(run func() <-chan struct{}) *MockScannerImpls_Done_Call { + _c.Call.Return(run) + return _c +} + +// Error provides a mock function with given fields: +func (_m *MockScannerImpls) Error() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockScannerImpls_Error_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Error' +type MockScannerImpls_Error_Call struct { + *mock.Call +} + +// Error is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Error() *MockScannerImpls_Error_Call { + return &MockScannerImpls_Error_Call{Call: _e.mock.On("Error")} +} + +func (_c *MockScannerImpls_Error_Call) Run(run func()) *MockScannerImpls_Error_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Error_Call) Return(_a0 error) *MockScannerImpls_Error_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Error_Call) RunAndReturn(run func() error) *MockScannerImpls_Error_Call { + _c.Call.Return(run) + return _c +} + +// Name provides a mock function with given fields: +func (_m *MockScannerImpls) Name() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockScannerImpls_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name' +type MockScannerImpls_Name_Call struct { + *mock.Call +} + +// Name is a helper method to define mock.On call +func (_e *MockScannerImpls_Expecter) Name() *MockScannerImpls_Name_Call { + return &MockScannerImpls_Name_Call{Call: _e.mock.On("Name")} +} + +func (_c *MockScannerImpls_Name_Call) Run(run func()) *MockScannerImpls_Name_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockScannerImpls_Name_Call) Return(_a0 string) *MockScannerImpls_Name_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScannerImpls_Name_Call) RunAndReturn(run func() string) *MockScannerImpls_Name_Call { + _c.Call.Return(run) + return _c +} + +// NewMockScannerImpls creates a new instance of MockScannerImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockScannerImpls(t interface { + mock.TestingT + Cleanup(func()) +}) *MockScannerImpls { + mock := &MockScannerImpls{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/lognode/server/wal/mock_walimpls/mock_WALImpls.go b/internal/mocks/lognode/server/wal/mock_walimpls/mock_WALImpls.go new file mode 100644 index 000000000000..c90b66dee258 --- /dev/null +++ b/internal/mocks/lognode/server/wal/mock_walimpls/mock_WALImpls.go @@ -0,0 +1,226 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_walimpls + +import ( + context "context" + + logpb "github.com/milvus-io/milvus/internal/proto/logpb" + message "github.com/milvus-io/milvus/internal/util/logserviceutil/message" + + mock "github.com/stretchr/testify/mock" + + walimpls "github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls" +) + +// MockWALImpls is an autogenerated mock type for the WALImpls type +type MockWALImpls struct { + mock.Mock +} + +type MockWALImpls_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWALImpls) EXPECT() *MockWALImpls_Expecter { + return &MockWALImpls_Expecter{mock: &_m.Mock} +} + +// Append provides a mock function with given fields: ctx, msg +func (_m *MockWALImpls) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + ret := _m.Called(ctx, msg) + + var r0 message.MessageID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (message.MessageID, error)); ok { + return rf(ctx, msg) + } + if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) message.MessageID); ok { + r0 = rf(ctx, msg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MessageID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage) error); ok { + r1 = rf(ctx, msg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWALImpls_Append_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Append' +type MockWALImpls_Append_Call struct { + *mock.Call +} + +// Append is a helper method to define mock.On call +// - ctx context.Context +// - msg message.MutableMessage +func (_e *MockWALImpls_Expecter) Append(ctx interface{}, msg interface{}) *MockWALImpls_Append_Call { + return &MockWALImpls_Append_Call{Call: _e.mock.On("Append", ctx, msg)} +} + +func (_c *MockWALImpls_Append_Call) Run(run func(ctx context.Context, msg message.MutableMessage)) *MockWALImpls_Append_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(message.MutableMessage)) + }) + return _c +} + +func (_c *MockWALImpls_Append_Call) Return(_a0 message.MessageID, _a1 error) *MockWALImpls_Append_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWALImpls_Append_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (message.MessageID, error)) *MockWALImpls_Append_Call { + _c.Call.Return(run) + return _c +} + +// Channel provides a mock function with given fields: +func (_m *MockWALImpls) Channel() *logpb.PChannelInfo { + ret := _m.Called() + + var r0 *logpb.PChannelInfo + if rf, ok := ret.Get(0).(func() *logpb.PChannelInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*logpb.PChannelInfo) + } + } + + return r0 +} + +// MockWALImpls_Channel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Channel' +type MockWALImpls_Channel_Call struct { + *mock.Call +} + +// Channel is a helper method to define mock.On call +func (_e *MockWALImpls_Expecter) Channel() *MockWALImpls_Channel_Call { + return &MockWALImpls_Channel_Call{Call: _e.mock.On("Channel")} +} + +func (_c *MockWALImpls_Channel_Call) Run(run func()) *MockWALImpls_Channel_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWALImpls_Channel_Call) Return(_a0 *logpb.PChannelInfo) *MockWALImpls_Channel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWALImpls_Channel_Call) RunAndReturn(run func() *logpb.PChannelInfo) *MockWALImpls_Channel_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockWALImpls) Close() { + _m.Called() +} + +// MockWALImpls_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockWALImpls_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockWALImpls_Expecter) Close() *MockWALImpls_Close_Call { + return &MockWALImpls_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockWALImpls_Close_Call) Run(run func()) *MockWALImpls_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWALImpls_Close_Call) Return() *MockWALImpls_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWALImpls_Close_Call) RunAndReturn(run func()) *MockWALImpls_Close_Call { + _c.Call.Return(run) + return _c +} + +// Read provides a mock function with given fields: ctx, opts +func (_m *MockWALImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls.ScannerImpls, error) { + ret := _m.Called(ctx, opts) + + var r0 walimpls.ScannerImpls + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, walimpls.ReadOption) (walimpls.ScannerImpls, error)); ok { + return rf(ctx, opts) + } + if rf, ok := ret.Get(0).(func(context.Context, walimpls.ReadOption) walimpls.ScannerImpls); ok { + r0 = rf(ctx, opts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(walimpls.ScannerImpls) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, walimpls.ReadOption) error); ok { + r1 = rf(ctx, opts) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWALImpls_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read' +type MockWALImpls_Read_Call struct { + *mock.Call +} + +// Read is a helper method to define mock.On call +// - ctx context.Context +// - opts walimpls.ReadOption +func (_e *MockWALImpls_Expecter) Read(ctx interface{}, opts interface{}) *MockWALImpls_Read_Call { + return &MockWALImpls_Read_Call{Call: _e.mock.On("Read", ctx, opts)} +} + +func (_c *MockWALImpls_Read_Call) Run(run func(ctx context.Context, opts walimpls.ReadOption)) *MockWALImpls_Read_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(walimpls.ReadOption)) + }) + return _c +} + +func (_c *MockWALImpls_Read_Call) Return(_a0 walimpls.ScannerImpls, _a1 error) *MockWALImpls_Read_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWALImpls_Read_Call) RunAndReturn(run func(context.Context, walimpls.ReadOption) (walimpls.ScannerImpls, error)) *MockWALImpls_Read_Call { + _c.Call.Return(run) + return _c +} + +// NewMockWALImpls creates a new instance of MockWALImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockWALImpls(t interface { + mock.TestingT + Cleanup(func()) +}) *MockWALImpls { + mock := &MockWALImpls{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go b/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go index ab62e2a87d3c..190966c7783c 100644 --- a/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go +++ b/internal/mocks/util/logserviceutil/mock_message/mock_MutableMessage.go @@ -188,6 +188,50 @@ func (_c *MockMutableMessage_Properties_Call) RunAndReturn(run func() message.Pr return _c } +// WithLastConfirmed provides a mock function with given fields: id +func (_m *MockMutableMessage) WithLastConfirmed(id message.MessageID) message.MutableMessage { + ret := _m.Called(id) + + var r0 message.MutableMessage + if rf, ok := ret.Get(0).(func(message.MessageID) message.MutableMessage); ok { + r0 = rf(id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MutableMessage) + } + } + + return r0 +} + +// MockMutableMessage_WithLastConfirmed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WithLastConfirmed' +type MockMutableMessage_WithLastConfirmed_Call struct { + *mock.Call +} + +// WithLastConfirmed is a helper method to define mock.On call +// - id message.MessageID +func (_e *MockMutableMessage_Expecter) WithLastConfirmed(id interface{}) *MockMutableMessage_WithLastConfirmed_Call { + return &MockMutableMessage_WithLastConfirmed_Call{Call: _e.mock.On("WithLastConfirmed", id)} +} + +func (_c *MockMutableMessage_WithLastConfirmed_Call) Run(run func(id message.MessageID)) *MockMutableMessage_WithLastConfirmed_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(message.MessageID)) + }) + return _c +} + +func (_c *MockMutableMessage_WithLastConfirmed_Call) Return(_a0 message.MutableMessage) *MockMutableMessage_WithLastConfirmed_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_WithLastConfirmed_Call) RunAndReturn(run func(message.MessageID) message.MutableMessage) *MockMutableMessage_WithLastConfirmed_Call { + _c.Call.Return(run) + return _c +} + // WithTimeTick provides a mock function with given fields: tt func (_m *MockMutableMessage) WithTimeTick(tt uint64) message.MutableMessage { ret := _m.Called(tt) diff --git a/internal/proto/log.proto b/internal/proto/log.proto index fb9da3a4c741..acaf6faa2eba 100644 --- a/internal/proto/log.proto +++ b/internal/proto/log.proto @@ -11,8 +11,35 @@ import "google/protobuf/empty.proto"; // Common // +// MessageID is the unique identifier of a message. +message MessageID { + bytes id = 1; +} + // Message is the basic unit of communication between publisher and consumer. message Message { bytes payload = 1; // message body map properties = 2; // message properties } + +// PChannelInfo is the information of a pchannel info. +message PChannelInfo { + string name = 1; // channel name + int64 term = 2; // A monotonic increasing term, every time the channel is recovered or moved to another lognode, the term will increase by meta server. + int64 serverID = 3; // The log node id address of the channel. + repeated VChannelInfo vChannelInfos = 4; // PChannel related vchannels. +} + +// VChannelInfo is the information of a vchannel info. +message VChannelInfo { + string name = 1; +} + +message DeliverPolicy { + oneof policy { + google.protobuf.Empty all = 1; // deliver all messages. + google.protobuf.Empty latest = 2; // deliver the latest message. + MessageID startFrom = 3; // deliver message from this message id. [startFrom, ...] + MessageID startAfter = 4; // deliver message after this message id. (startAfter, ...] + } +} diff --git a/internal/proto/logpb/extends.go b/internal/proto/logpb/extends.go new file mode 100644 index 000000000000..0bc80f2b823c --- /dev/null +++ b/internal/proto/logpb/extends.go @@ -0,0 +1,42 @@ +package logpb + +import ( + "google.golang.org/protobuf/types/known/emptypb" +) + +const ( + ServiceMethodPrefix = "/milvus.proto.log" + InitialTerm = int64(-1) +) + +func NewDeliverAll() *DeliverPolicy { + return &DeliverPolicy{ + Policy: &DeliverPolicy_All{ + All: &emptypb.Empty{}, + }, + } +} + +func NewDeliverLatest() *DeliverPolicy { + return &DeliverPolicy{ + Policy: &DeliverPolicy_Latest{ + Latest: &emptypb.Empty{}, + }, + } +} + +func NewDeliverStartFrom(messageID *MessageID) *DeliverPolicy { + return &DeliverPolicy{ + Policy: &DeliverPolicy_StartFrom{ + StartFrom: messageID, + }, + } +} + +func NewDeliverStartAfter(messageID *MessageID) *DeliverPolicy { + return &DeliverPolicy{ + Policy: &DeliverPolicy_StartAfter{ + StartAfter: messageID, + }, + } +} diff --git a/internal/util/logserviceutil/message/message_builder_test.go b/internal/util/logserviceutil/message/message_builder_test.go index c937c3d5d26b..399ca85570ff 100644 --- a/internal/util/logserviceutil/message/message_builder_test.go +++ b/internal/util/logserviceutil/message/message_builder_test.go @@ -1,13 +1,11 @@ package message_test import ( - "fmt" "testing" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message" "github.com/milvus-io/milvus/internal/util/logserviceutil/message" ) @@ -32,22 +30,13 @@ func TestMessage(t *testing.T) { assert.Equal(t, uint64(123), tt) assert.Equal(t, len([]byte(v)), n) - lcMsgID := mock_message.NewMockMessageID(t) - lcMsgID.EXPECT().Marshal().Return([]byte("lcMsgID")) + lcMsgID := message.NewTestMessageID(456) mutableMessage.WithLastConfirmed(lcMsgID) v, ok = mutableMessage.Properties().Get("_lc") assert.True(t, ok) - assert.Equal(t, v, "lcMsgID") + assert.Equal(t, v, "456") - msgID := mock_message.NewMockMessageID(t) - msgID.EXPECT().EQ(msgID).Return(true) - msgID.EXPECT().WALName().Return("testMsgID") - message.RegisterMessageIDUnmsarshaler("testMsgID", func(data []byte) (message.MessageID, error) { - if string(data) == "lcMsgID" { - return msgID, nil - } - panic(fmt.Sprintf("unexpected data: %s", data)) - }) + msgID := message.NewTestMessageID(123) b = message.NewBuilder() immutableMessage := b.WithMessageID(msgID). @@ -57,7 +46,7 @@ func TestMessage(t *testing.T) { "_t": "1", "_tt": string(proto.EncodeVarint(456)), "_v": "1", - "_lc": "lcMsgID", + "_lc": "456", }). BuildImmutable() @@ -68,7 +57,7 @@ func TestMessage(t *testing.T) { assert.Equal(t, "value", v) assert.True(t, ok) assert.Equal(t, message.MessageTypeTimeTick, immutableMessage.MessageType()) - assert.Equal(t, 36, immutableMessage.EstimateSize()) + assert.Equal(t, 32, immutableMessage.EstimateSize()) assert.Equal(t, message.Version(1), immutableMessage.Version()) assert.Equal(t, uint64(456), immutableMessage.TimeTick()) assert.NotNil(t, immutableMessage.LastConfirmedMessageID()) diff --git a/internal/util/logserviceutil/message/message_id.go b/internal/util/logserviceutil/message/message_id.go index 910338ea88cb..d68ab616a1e8 100644 --- a/internal/util/logserviceutil/message/message_id.go +++ b/internal/util/logserviceutil/message/message_id.go @@ -1,11 +1,17 @@ package message import ( + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) -// messageIDUnmarshaler is the map for message id unmarshaler. -var messageIDUnmarshaler typeutil.ConcurrentMap[string, MessageIDUnmarshaler] +var ( + // messageIDUnmarshaler is the map for message id unmarshaler. + messageIDUnmarshaler typeutil.ConcurrentMap[string, MessageIDUnmarshaler] + + ErrInvalidMessageID = errors.New("invalid message id") +) // RegisterMessageIDUnmsarshaler register the message id unmarshaler. func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler) { diff --git a/internal/util/logserviceutil/message/message_id_test.go b/internal/util/logserviceutil/message/message_id_test.go index 5aef58bddd8c..349a16f2bdcf 100644 --- a/internal/util/logserviceutil/message/message_id_test.go +++ b/internal/util/logserviceutil/message/message_id_test.go @@ -1,4 +1,4 @@ -package message_test +package message import ( "bytes" @@ -6,35 +6,25 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message" - "github.com/milvus-io/milvus/internal/util/logserviceutil/message" ) func TestRegisterMessageIDUnmarshaler(t *testing.T) { - msgID := mock_message.NewMockMessageID(t) - - message.RegisterMessageIDUnmsarshaler("test", func(b []byte) (message.MessageID, error) { - if bytes.Equal(b, []byte("123")) { - return msgID, nil - } - return nil, errors.New("invalid") - }) + msgID := NewTestMessageID(123) - id, err := message.UnmarshalMessageID("test", []byte("123")) - assert.NotNil(t, id) + id, err := UnmarshalMessageID("test", []byte("123")) + assert.True(t, id.EQ(msgID)) assert.NoError(t, err) - id, err = message.UnmarshalMessageID("test", []byte("1234")) + id, err = UnmarshalMessageID("test", []byte("1234a")) assert.Nil(t, id) assert.Error(t, err) assert.Panics(t, func() { - message.UnmarshalMessageID("test1", []byte("123")) + UnmarshalMessageID("test1", []byte("123")) }) assert.Panics(t, func() { - message.RegisterMessageIDUnmsarshaler("test", func(b []byte) (message.MessageID, error) { + RegisterMessageIDUnmsarshaler("test", func(b []byte) (MessageID, error) { if bytes.Equal(b, []byte("123")) { return msgID, nil } diff --git a/internal/util/logserviceutil/message/test_message_id.go b/internal/util/logserviceutil/message/test_message_id.go new file mode 100644 index 000000000000..55d00a3dd931 --- /dev/null +++ b/internal/util/logserviceutil/message/test_message_id.go @@ -0,0 +1,67 @@ +//go:build test +// +build test + +package message + +import ( + "strconv" +) + +var _ MessageID = testMessageID(0) + +const testWALName = "test" + +func init() { + RegisterMessageIDUnmsarshaler(testWALName, UnmarshalTestMessageID) +} + +// NewTestMessageID create a new test message id. +func NewTestMessageID(id int64) MessageID { + return testMessageID(id) +} + +// UnmarshalTestMessageID unmarshal the message id. +func UnmarshalTestMessageID(data []byte) (MessageID, error) { + id, err := unmarshalTestMessageID(data) + if err != nil { + return nil, err + } + return id, nil +} + +// unmashalTestMessageID unmarshal the message id. +func unmarshalTestMessageID(data []byte) (testMessageID, error) { + id, err := strconv.ParseInt(string(data), 10, 64) + if err != nil { + return 0, err + } + return testMessageID(id), nil +} + +// testMessageID is the message id for rmq. +type testMessageID int64 + +// WALName returns the name of message id related wal. +func (id testMessageID) WALName() string { + return testWALName +} + +// LT less than. +func (id testMessageID) LT(other MessageID) bool { + return id < other.(testMessageID) +} + +// LTE less than or equal to. +func (id testMessageID) LTE(other MessageID) bool { + return id <= other.(testMessageID) +} + +// EQ Equal to. +func (id testMessageID) EQ(other MessageID) bool { + return id == other.(testMessageID) +} + +// Marshal marshal the message id. +func (id testMessageID) Marshal() []byte { + return []byte(strconv.FormatInt(int64(id), 10)) +} diff --git a/internal/util/logserviceutil/options/deliver.go b/internal/util/logserviceutil/options/deliver.go new file mode 100644 index 000000000000..1bce65940ab3 --- /dev/null +++ b/internal/util/logserviceutil/options/deliver.go @@ -0,0 +1,45 @@ +package options + +import ( + "github.com/milvus-io/milvus/internal/proto/logpb" + "github.com/milvus-io/milvus/internal/util/logserviceutil/message" +) + +const ( + deliverOrderTimetick DeliverOrder = 1 +) + +// DeliverOrder is the order of delivering messages. +type ( + DeliverOrder int + DeliverPolicy *logpb.DeliverPolicy +) + +// DeliverPolicyAll delivers all messages. +func DeliverPolicyAll() DeliverPolicy { + return logpb.NewDeliverAll() +} + +// DeliverLatest delivers the latest message. +func DeliverPolicyLatest() DeliverPolicy { + return logpb.NewDeliverLatest() +} + +// DeliverEarliest delivers the earliest message. +func DeliverPolicyStartFrom(messageID message.MessageID) DeliverPolicy { + return logpb.NewDeliverStartFrom(&logpb.MessageID{ + Id: messageID.Marshal(), + }) +} + +// DeliverPolicyStartAfter delivers the message after the specified message. +func DeliverPolicyStartAfter(messageID message.MessageID) DeliverPolicy { + return logpb.NewDeliverStartAfter(&logpb.MessageID{ + Id: messageID.Marshal(), + }) +} + +// DeliverOrderTimeTick delivers messages by time tick. +func DeliverOrderTimeTick() DeliverOrder { + return deliverOrderTimetick +}