Skip to content

Commit

Permalink
enhance: add walimplstest
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <chyezh@outlook.com>
  • Loading branch information
chyezh committed Jun 13, 2024
1 parent 60be4f7 commit 417e931
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 26 deletions.
32 changes: 32 additions & 0 deletions internal/lognode/server/wal/walimplstest/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//go:build test
// +build test

package walimplstest

import (
"github.com/milvus-io/milvus/internal/lognode/server/wal/registry"
"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
"github.com/milvus-io/milvus/internal/util/logserviceutil/message"
)

const (
walName = "test"
)

func init() {
// register the builder to the registry.
registry.RegisterBuilder(&openerBuilder{})
message.RegisterMessageIDUnmsarshaler(walName, UnmarshalTestMessageID)
}

var _ walimpls.OpenerBuilderImpls = &openerBuilder{}

type openerBuilder struct{}

func (o *openerBuilder) Name() string {
return walName
}

func (o *openerBuilder) Build() (walimpls.OpenerImpls, error) {
return &opener{}, nil
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
//go:build test
// +build test

package message
package walimplstest

import (
"strconv"
)

var _ MessageID = testMessageID(0)

const testWALName = "test"
"github.com/milvus-io/milvus/internal/util/logserviceutil/message"
)

func init() {
RegisterMessageIDUnmsarshaler(testWALName, UnmarshalTestMessageID)
}
var _ message.MessageID = testMessageID(0)

// NewTestMessageID create a new test message id.
func NewTestMessageID(id int64) MessageID {
func NewTestMessageID(id int64) message.MessageID {
return testMessageID(id)
}

// UnmarshalTestMessageID unmarshal the message id.
func UnmarshalTestMessageID(data []byte) (MessageID, error) {
func UnmarshalTestMessageID(data []byte) (message.MessageID, error) {
id, err := unmarshalTestMessageID(data)
if err != nil {
return nil, err
Expand All @@ -43,21 +39,21 @@ type testMessageID int64

// WALName returns the name of message id related wal.
func (id testMessageID) WALName() string {
return testWALName
return walName
}

// LT less than.
func (id testMessageID) LT(other MessageID) bool {
func (id testMessageID) LT(other message.MessageID) bool {
return id < other.(testMessageID)
}

// LTE less than or equal to.
func (id testMessageID) LTE(other MessageID) bool {
func (id testMessageID) LTE(other message.MessageID) bool {
return id <= other.(testMessageID)
}

// EQ Equal to.
func (id testMessageID) EQ(other MessageID) bool {
func (id testMessageID) EQ(other message.MessageID) bool {
return id == other.(testMessageID)
}

Expand Down
64 changes: 64 additions & 0 deletions internal/lognode/server/wal/walimplstest/message_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//go:build test
// +build test

package walimplstest

import (
"context"
"sync"

"github.com/milvus-io/milvus/internal/util/logserviceutil/message"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var logs = typeutil.NewConcurrentMap[string, *messageLog]()

func getOrCreateLogs(name string) *messageLog {
l := newMessageLog()
l, _ = logs.GetOrInsert(name, l)
return l
}

func newMessageLog() *messageLog {
return &messageLog{
cond: syncutil.NewContextCond(&sync.Mutex{}),
id: 0,
logs: make([]message.ImmutableMessage, 0),
}
}

type messageLog struct {
cond *syncutil.ContextCond
id int64
logs []message.ImmutableMessage
}

func (l *messageLog) Append(_ context.Context, msg message.MutableMessage) (message.MessageID, error) {
l.cond.LockAndBroadcast()
defer l.cond.L.Unlock()
newMessageID := NewTestMessageID(l.id)
l.id++
l.logs = append(l.logs, msg.IntoImmutableMessage(newMessageID))
return newMessageID, nil
}

func (l *messageLog) ReadAt(ctx context.Context, idx int) (message.ImmutableMessage, error) {
var msg message.ImmutableMessage
l.cond.L.Lock()
for idx >= len(l.logs) {
if err := l.cond.Wait(ctx); err != nil {
return nil, err
}
}
msg = l.logs[idx]
l.cond.L.Unlock()

return msg, nil
}

func (l *messageLog) Len() int64 {
l.cond.L.Lock()
defer l.cond.L.Unlock()
return int64(len(l.logs))
}
26 changes: 26 additions & 0 deletions internal/lognode/server/wal/walimplstest/opener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//go:build test
// +build test

package walimplstest

import (
"context"

"github.com/milvus-io/milvus/internal/lognode/server/wal/helper"
"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
)

var _ walimpls.OpenerImpls = &opener{}

type opener struct{}

func (*opener) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) {
l := getOrCreateLogs(opt.Channel.GetName())
return &walImpls{
WALHelper: *helper.NewWALHelper(opt),
datas: l,
}, nil
}

func (*opener) Close() {
}
51 changes: 51 additions & 0 deletions internal/lognode/server/wal/walimplstest/scanner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//go:build test
// +build test

package walimplstest

import (
"github.com/milvus-io/milvus/internal/lognode/server/wal/helper"
"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
"github.com/milvus-io/milvus/internal/util/logserviceutil/message"
)

var _ walimpls.ScannerImpls = &scannerImpls{}

func newScannerImpls(opts walimpls.ReadOption, data *messageLog, offset int) *scannerImpls {
s := &scannerImpls{
ScannerHelper: helper.NewScannerHelper(opts.Name),
datas: data,
ch: make(chan message.ImmutableMessage, 0),
offset: offset,
}
go s.executeConsume()
return s
}

type scannerImpls struct {
*helper.ScannerHelper
datas *messageLog
ch chan message.ImmutableMessage
offset int
}

func (s *scannerImpls) executeConsume() {
defer close(s.ch)
for {
msg, err := s.datas.ReadAt(s.Context(), s.offset)
if err != nil {
s.Finish(nil)
return
}
s.ch <- msg
s.offset++
}
}

func (s *scannerImpls) Chan() <-chan message.ImmutableMessage {
return s.ch
}

func (s *scannerImpls) Close() error {
return s.ScannerHelper.Close()
}
52 changes: 52 additions & 0 deletions internal/lognode/server/wal/walimplstest/wal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//go:build test
// +build test

package walimplstest

import (
"context"

"github.com/milvus-io/milvus/internal/lognode/server/wal/helper"
"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
"github.com/milvus-io/milvus/internal/proto/logpb"
"github.com/milvus-io/milvus/internal/util/logserviceutil/message"
)

var _ walimpls.WALImpls = &walImpls{}

type walImpls struct {
helper.WALHelper
datas *messageLog
}

func (w *walImpls) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
return w.datas.Append(ctx, msg)
}

func (w *walImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls.ScannerImpls, error) {
offset := int64(0)
switch policy := opts.DeliverPolicy.Policy.(type) {
case *logpb.DeliverPolicy_All:
offset = 0
case *logpb.DeliverPolicy_Latest:
offset = w.datas.Len()
case *logpb.DeliverPolicy_StartFrom:
id, err := unmarshalTestMessageID(policy.StartFrom.Id)
if err != nil {
return nil, err
}
offset = int64(id)
case *logpb.DeliverPolicy_StartAfter:
id, err := unmarshalTestMessageID(policy.StartAfter.Id)
if err != nil {
return nil, err
}
offset = int64(id + 1)
}
return newScannerImpls(
opts, w.datas, int(offset),
), nil
}

func (w *walImpls) Close() {
}
11 changes: 11 additions & 0 deletions internal/lognode/server/wal/walimplstest/wal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package walimplstest

import (
"testing"

"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
)

func TestWALImplsTest(t *testing.T) {
walimpls.NewWALImplsTestFramework(t, 100, &openerBuilder{}).Run()
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/util/logserviceutil/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type MutableMessage interface {

// Properties returns the message properties.
Properties() Properties

// IntoImmutableMessage converts the mutable message to immutable message.
IntoImmutableMessage(msgID MessageID) ImmutableMessage
}

// ImmutableMessage is the read-only message interface.
Expand Down
Loading

0 comments on commit 417e931

Please sign in to comment.