enhance: implement wal management on streaming node #34153

Merged
1 change: 0 additions & 1 deletion internal/proto/streamingpb/extends.go
@@ -2,5 +2,4 @@ package streamingpb

const (
ServiceMethodPrefix = "/milvus.proto.log"
InitialTerm = int64(-1)
)
20 changes: 15 additions & 5 deletions internal/streamingnode/server/wal/adaptor/scanner_adaptor.go
@@ -1,6 +1,9 @@
package adaptor

import (
"github.com/milvus-io/milvus/pkg/log"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@@ -18,6 +21,7 @@
cleanup func(),
) wal.Scanner {
s := &scannerAdaptorImpl{
logger: log.With(zap.String("name", name), zap.String("channel", l.Channel().Name)),
innerWAL: l,
readOption: readOption,
sendingCh: make(chan message.ImmutableMessage, 1),
@@ -33,6 +37,7 @@
// scannerAdaptorImpl is a wrapper of ScannerImpls to extend it into a Scanner interface.
type scannerAdaptorImpl struct {
*helper.ScannerHelper
logger *log.MLogger
innerWAL walimpls.WALImpls
readOption wal.ReadOption
sendingCh chan message.ImmutableMessage
@@ -96,8 +101,9 @@
// we always need to recv message from upstream to avoid starve.
return scanner.Chan(), nil
}
- // TODO: configurable pending count.
- if s.pendingQueue.Len()+s.reorderBuffer.Len() > 1024 {
+ // TODO: configurable pending buffer count.
+ // If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading.
+ if s.pendingQueue.Len() > 16 {
return nil, s.sendingCh
}
return scanner.Chan(), s.sendingCh
@@ -107,8 +113,6 @@
if msg.MessageType() == message.MessageTypeTimeTick {
// If the time tick message incoming,
// the reorder buffer can be consumed into a pending queue with latest timetick.

- // TODO: !!! should we drop the unexpected broken timetick rule message.
s.pendingQueue.Add(s.reorderBuffer.PopUtilTimeTick(msg.TimeTick()))
return
}
@@ -117,5 +121,11 @@
return
}
// otherwise add message into reorder buffer directly.
- s.reorderBuffer.Push(msg)
+ if err := s.reorderBuffer.Push(msg); err != nil {
+ s.logger.Warn("failed to push message into reorder buffer",
+ zap.Any("msgID", msg.MessageID()),
+ zap.Uint64("timetick", msg.TimeTick()),
+ zap.String("vchannel", msg.VChannel()),
+ zap.Error(err))
+ }
}
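The backpressure rule above relies on a common Go idiom: a receive from a nil channel never fires inside a select, so returning nil for the upstream channel pauses consumption while the pending queue drains. Below is a minimal standalone sketch of that idiom; names such as pickChannels and maxPending are hypothetical, and this is not the Milvus scanner itself.

```go
package main

import "fmt"

const maxPending = 16 // assumed threshold, mirroring the hard-coded 16 above

// pickChannels mirrors the shape of the scanner's channel selection: once too
// many messages are pending, stop receiving from upstream (nil channel) and
// only keep delivering the already-buffered messages downstream.
func pickChannels(upstream <-chan string, sending chan string, pending int) (<-chan string, chan string) {
	if pending > maxPending {
		return nil, sending // a nil receive case never fires, so upstream is paused
	}
	return upstream, sending
}

func main() {
	upstream := make(chan string, 1)
	sending := make(chan string, 1)
	upstream <- "new message from WAL"

	recvCh, sendCh := pickChannels(upstream, sending, 32) // pending > maxPending
	select {
	case msg := <-recvCh: // skipped: recvCh is nil, applying backpressure
		fmt.Println("received:", msg)
	case sendCh <- "head of pending queue": // delivery to the consumer still proceeds
		fmt.Println("delivered pending message")
	}
}
```

Once the pending count drops back under the threshold, the real scanner returns scanner.Chan() again and upstream consumption resumes.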
@@ -10,12 +10,14 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

func TestScannerAdaptorReadError(t *testing.T) {
err := errors.New("read error")
l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err)
l.EXPECT().Channel().Return(types.PChannelInfo{})

s := newScannerAdaptor("scanner", l, wal.ReadOption{
DeliverPolicy: options.DeliverPolicyAll(),
14 changes: 12 additions & 2 deletions internal/streamingnode/server/wal/utility/reorder_buffer.go
@@ -1,13 +1,16 @@
package utility

import (
"github.com/cockroachdb/errors"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// ReOrderByTimeTickBuffer is a buffer that stores messages and pops them in order of time tick.
type ReOrderByTimeTickBuffer struct {
- messageHeap typeutil.Heap[message.ImmutableMessage]
+ messageHeap     typeutil.Heap[message.ImmutableMessage]
+ lastPopTimeTick uint64
}

// NewReOrderBuffer creates a new ReOrderBuffer.
@@ -18,8 +21,14 @@
}

// Push pushes a message into the buffer.
- func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) {
+ func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) error {
+ // !!! Drop the unexpected message that breaks the timetick rule.
+ // This check only takes effect after the first time tick has been popped.
+ if msg.TimeTick() < r.lastPopTimeTick {
+ return errors.Errorf("message time tick is less than last pop time tick: %d", r.lastPopTimeTick)
+ }
r.messageHeap.Push(msg)
+ return nil
}

// PopUtilTimeTick pops all messages whose time tick is less than or equal to the given time tick.
@@ -29,6 +38,7 @@
for r.messageHeap.Len() > 0 && r.messageHeap.Peek().TimeTick() <= timetick {
res = append(res, r.messageHeap.Pop())
}
r.lastPopTimeTick = timetick
return res
}

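To make the new Push/PopUtilTimeTick contract concrete, here is a self-contained sketch of the same idea with plain local types; msg, reorderBuffer, and PopUntil are illustrative stand-ins, not the Milvus API. Out-of-order messages are held in a min-heap keyed by time tick, a pop drains everything up to the given tick in order, and a later push below the last popped tick is rejected, which is the error path added above.

```go
package main

import (
	"container/heap"
	"errors"
	"fmt"
)

type msg struct{ timeTick uint64 }

// msgHeap is a min-heap ordered by time tick.
type msgHeap []msg

func (h msgHeap) Len() int           { return len(h) }
func (h msgHeap) Less(i, j int) bool { return h[i].timeTick < h[j].timeTick }
func (h msgHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *msgHeap) Push(x any)        { *h = append(*h, x.(msg)) }
func (h *msgHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

type reorderBuffer struct {
	h               msgHeap
	lastPopTimeTick uint64
}

// Push rejects messages older than the last popped time tick.
func (r *reorderBuffer) Push(m msg) error {
	if m.timeTick < r.lastPopTimeTick {
		return errors.New("message time tick is less than last pop time tick")
	}
	heap.Push(&r.h, m)
	return nil
}

// PopUntil drains all buffered messages with timeTick <= the given tick, in order.
func (r *reorderBuffer) PopUntil(timeTick uint64) []msg {
	var res []msg
	for r.h.Len() > 0 && r.h[0].timeTick <= timeTick {
		res = append(res, heap.Pop(&r.h).(msg))
	}
	r.lastPopTimeTick = timeTick
	return res
}

func main() {
	var buf reorderBuffer
	for _, tt := range []uint64{5, 3, 9} { // out-of-order arrival
		_ = buf.Push(msg{timeTick: tt})
	}
	fmt.Println(buf.PopUntil(6))            // [{3} {5}], ordered by time tick
	fmt.Println(buf.Push(msg{timeTick: 4})) // error: 4 < lastPopTimeTick (6)
}
```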
29 changes: 29 additions & 0 deletions internal/streamingnode/server/walmanager/manager.go
@@ -0,0 +1,29 @@
package walmanager

import (
"context"

"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

var _ Manager = (*managerImpl)(nil)

// Manager is the interface for managing the wal instances.
type Manager interface {
// Open opens a wal instance for the channel on this Manager.
Open(ctx context.Context, channel types.PChannelInfo) error

// GetAvailableWAL returns an available wal instance for the channel.
// It returns an error if no wal instance is found for the channel.
GetAvailableWAL(channelName string, term int64) (wal.WAL, error)

// GetAllAvailableChannels returns all available channel info.
GetAllAvailableChannels() ([]types.PChannelInfo, error)

// Remove removes the wal instance for the channel.
Remove(ctx context.Context, channel types.PChannelInfo) error

// Close closes the manager and releases all managed WALs.
Close()
}
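For context, here is a hedged usage sketch of this interface from a hypothetical caller inside the Milvus module. The channel name and the runChannel wrapper are illustrative only; OpenManager, the PChannelInfo fields, and the Manager methods are the ones introduced in this PR.

```go
package example

import (
	"context"

	"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
	"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

// runChannel shows the expected call order: Open a wal for a channel term,
// look it up with a matching term, and Remove it when the channel moves away.
func runChannel(ctx context.Context) error {
	m, err := walmanager.OpenManager()
	if err != nil {
		return err
	}
	defer m.Close()

	// Illustrative channel info; Name and Term are the fields used throughout this PR.
	ch := types.PChannelInfo{Name: "example-pchannel", Term: 1}

	if err := m.Open(ctx, ch); err != nil {
		return err
	}

	// GetAvailableWAL only succeeds while the term matches the currently opened one.
	w, err := m.GetAvailableWAL(ch.Name, ch.Term)
	if err != nil {
		return err
	}
	_ = w // append/read through the wal here

	// Remove releases the wal opened for this channel term.
	return m.Remove(ctx, ch)
}
```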
152 changes: 152 additions & 0 deletions internal/streamingnode/server/walmanager/manager_impl.go
@@ -0,0 +1,152 @@
package walmanager

import (
"context"

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// OpenManager creates a wal manager.
func OpenManager() (Manager, error) {
walName := util.MustSelectWALName()
log.Info("open wal manager", zap.String("walName", walName))
opener, err := registry.MustGetBuilder(walName).Build()
if err != nil {
return nil, err

}
return newManager(opener), nil

}

// newManager creates a wal manager.
func newManager(opener wal.Opener) Manager {
return &managerImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
wltMap: typeutil.NewConcurrentMap[string, *walLifetime](),
opener: opener,
}
}

// All management operations for a wal are serialized in order of term.
type managerImpl struct {
lifetime lifetime.Lifetime[lifetime.State]

wltMap *typeutil.ConcurrentMap[string, *walLifetime]
opener wal.Opener // wal allocator
}

// Open opens a wal instance for the channel on this Manager.
func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return status.NewOnShutdownError("wal manager is closed")
}
defer func() {
m.lifetime.Done()
if err != nil {
log.Warn("open wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
return
}
log.Info("open wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
}()

return m.getWALLifetime(channel.Name).Open(ctx, channel)
}

// Remove removes the wal instance for the channel.
func (m *managerImpl) Remove(ctx context.Context, channel types.PChannelInfo) (err error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return status.NewOnShutdownError("wal manager is closed")
}
defer func() {
m.lifetime.Done()
if err != nil {
log.Warn("remove wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
}
log.Info("remove wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
}()

return m.getWALLifetime(channel.Name).Remove(ctx, channel.Term)
}

// GetAvailableWAL returns an available wal instance for the channel.
// It returns an error if no wal instance is found for the channel.
func (m *managerImpl) GetAvailableWAL(channelName string, term int64) (wal.WAL, error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return nil, status.NewOnShutdownError("wal manager is closed")
}
defer m.lifetime.Done()

l := m.getWALLifetime(channelName).GetWAL()
if l == nil {
return nil, status.NewChannelNotExist(channelName)
}

channelTerm := l.Channel().Term
if channelTerm != term {
return nil, status.NewUnmatchedChannelTerm(channelName, term, channelTerm)
}
return l, nil
}

// GetAllAvailableChannels returns all available channel info.
func (m *managerImpl) GetAllAvailableChannels() ([]types.PChannelInfo, error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return nil, status.NewOnShutdownError("wal manager is closed")
}
defer m.lifetime.Done()

// collect all available wal info.
infos := make([]types.PChannelInfo, 0)
m.wltMap.Range(func(channel string, lt *walLifetime) bool {
if l := lt.GetWAL(); l != nil {
info := l.Channel()
infos = append(infos, info)
}
return true
})
return infos, nil
}

// Close closes the manager and releases all managed WALs.
func (m *managerImpl) Close() {
m.lifetime.SetState(lifetime.Stopped)
m.lifetime.Wait()
m.lifetime.Close()

// close all underlying walLifetime.
m.wltMap.Range(func(channel string, wlt *walLifetime) bool {
wlt.Close()
return true
})

// close all underlying wal instances via the opener in case of resource leaks.
m.opener.Close()
}

// getWALLifetime returns the wal lifetime for the channel.
func (m *managerImpl) getWALLifetime(channel string) *walLifetime {
if wlt, loaded := m.wltMap.Get(channel); loaded {
return wlt
}

// Perform a cas here.
newWLT := newWALLifetime(m.opener, channel)
wlt, loaded := m.wltMap.GetOrInsert(channel, newWLT)
// If loaded, a lifetime already exists; close the redundant one.
if loaded {
newWLT.Close()

}
return wlt
}
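getWALLifetime above uses an optimistic get-or-insert to avoid explicit locking: build a candidate walLifetime, publish it with a single atomic GetOrInsert, and close the candidate if another goroutine won the race. The following standalone sketch shows the same pattern with sync.Map.LoadOrStore standing in for typeutil.ConcurrentMap.GetOrInsert; the lifetime and manager types here are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

type lifetime struct{ channel string }

func newLifetime(channel string) *lifetime { return &lifetime{channel: channel} }
func (l *lifetime) Close()                 { fmt.Println("closing redundant lifetime for", l.channel) }

type manager struct{ m sync.Map } // channel name -> *lifetime

func (mg *manager) getLifetime(channel string) *lifetime {
	if v, ok := mg.m.Load(channel); ok {
		return v.(*lifetime)
	}
	// Optimistically create, then publish atomically; only one goroutine's
	// value is kept, and the loser closes its redundant instance.
	candidate := newLifetime(channel)
	actual, loaded := mg.m.LoadOrStore(channel, candidate)
	if loaded {
		candidate.Close()
	}
	return actual.(*lifetime)
}

func main() {
	mg := &manager{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); _ = mg.getLifetime("ch-0") }()
	}
	wg.Wait()
	fmt.Println(mg.getLifetime("ch-0") == mg.getLifetime("ch-0")) // true: a single shared instance
}
```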