enhance: implement wal management on streaming node (#34153)
issue: #33285

- add lifetime control for wal.
- implement distributed-safe wal manager on streaming node.

---------

Signed-off-by: chyezh <chyezh@outlook.com>
chyezh committed Jul 5, 2024
1 parent 6774724 commit ba04981
Showing 17 changed files with 1,278 additions and 9 deletions.
1 change: 0 additions & 1 deletion internal/proto/streamingpb/extends.go
@@ -2,5 +2,4 @@ package streamingpb

const (
ServiceMethodPrefix = "/milvus.proto.log"
InitialTerm = int64(-1)
)
20 changes: 15 additions & 5 deletions internal/streamingnode/server/wal/adaptor/scanner_adaptor.go
@@ -1,6 +1,9 @@
package adaptor

import (
"github.com/milvus-io/milvus/pkg/log"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@@ -18,6 +21,7 @@ func newScannerAdaptor(
cleanup func(),
) wal.Scanner {
s := &scannerAdaptorImpl{
logger: log.With(zap.String("name", name), zap.String("channel", l.Channel().Name)),
innerWAL: l,
readOption: readOption,
sendingCh: make(chan message.ImmutableMessage, 1),
@@ -33,6 +37,7 @@ func newScannerAdaptor(
// scannerAdaptorImpl is a wrapper of ScannerImpls to extend it into a Scanner interface.
type scannerAdaptorImpl struct {
*helper.ScannerHelper
logger *log.MLogger
innerWAL walimpls.WALImpls
readOption wal.ReadOption
sendingCh chan message.ImmutableMessage
@@ -96,8 +101,9 @@ func (s *scannerAdaptorImpl) getEventCh(scanner walimpls.ScannerImpls) (<-chan m
// we always need to recv messages from upstream to avoid starvation.
return scanner.Chan(), nil
}
// TODO: configurable pending count.
if s.pendingQueue.Len()+s.reorderBuffer.Len() > 1024 {
// TODO: configurable pending buffer count.
// If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading.
if s.pendingQueue.Len() > 16 {
return nil, s.sendingCh
}
return scanner.Chan(), s.sendingCh
@@ -107,8 +113,6 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
if msg.MessageType() == message.MessageTypeTimeTick {
// If a time tick message comes in,
// the reorder buffer can be drained into the pending queue up to the latest time tick.

// TODO: !!! should we drop the unexpected broken timetick rule message.
s.pendingQueue.Add(s.reorderBuffer.PopUtilTimeTick(msg.TimeTick()))
return
}
@@ -117,5 +121,11 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
return
}
// otherwise add message into reorder buffer directly.
s.reorderBuffer.Push(msg)
if err := s.reorderBuffer.Push(msg); err != nil {
s.logger.Warn("failed to push message into reorder buffer",
zap.Any("msgID", msg.MessageID()),
zap.Uint64("timetick", msg.TimeTick()),
zap.String("vchannel", msg.VChannel()),
zap.Error(err))
}
}
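The getEventCh change above leans on a standard Go idiom: a receive from a nil channel never proceeds, so returning nil for the upstream channel pauses consumption while the select loop keeps draining sendingCh. Below is a minimal, self-contained sketch of that backpressure trick; the names pendingFull, upstream, and recvCh are illustrative and not taken from the Milvus code.

package main

import "fmt"

func main() {
	upstream := make(chan string, 1)
	upstream <- "msg"

	// Pretend the pending queue has exceeded its limit.
	pendingFull := true

	// A nil channel disables its select case: receives from nil channels block forever.
	var recvCh <-chan string
	if !pendingFull {
		recvCh = upstream
	}

	select {
	case m := <-recvCh:
		fmt.Println("received from upstream:", m) // skipped while pendingFull is true
	default:
		fmt.Println("backpressure: upstream consumption paused")
	}
}
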
internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go
@@ -10,12 +10,14 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

func TestScannerAdaptorReadError(t *testing.T) {
err := errors.New("read error")
l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err)
l.EXPECT().Channel().Return(types.PChannelInfo{})

s := newScannerAdaptor("scanner", l, wal.ReadOption{
DeliverPolicy: options.DeliverPolicyAll(),
14 changes: 12 additions & 2 deletions internal/streamingnode/server/wal/utility/reorder_buffer.go
@@ -1,13 +1,16 @@
package utility

import (
"github.com/cockroachdb/errors"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// ReOrderByTimeTickBuffer is a buffer that stores messages and pops them in order of time tick.
type ReOrderByTimeTickBuffer struct {
messageHeap typeutil.Heap[message.ImmutableMessage]
messageHeap typeutil.Heap[message.ImmutableMessage]
lastPopTimeTick uint64
}

// NewReOrderBuffer creates a new ReOrderBuffer.
@@ -18,8 +21,14 @@ func NewReOrderBuffer() *ReOrderByTimeTickBuffer {
}

// Push pushes a message into the buffer.
func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) {
func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) error {
// !!! Drop messages that break the time tick ordering rule.
// This check only takes effect after the first time tick has been popped.
if msg.TimeTick() < r.lastPopTimeTick {
return errors.Errorf("message time tick is less than last pop time tick: %d", r.lastPopTimeTick)
}
r.messageHeap.Push(msg)
return nil
}

// PopUtilTimeTick pops all messages whose time tick is less than or equal to the given time tick.
@@ -29,6 +38,7 @@ func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.Imm
for r.messageHeap.Len() > 0 && r.messageHeap.Peek().TimeTick() <= timetick {
res = append(res, r.messageHeap.Pop())
}
r.lastPopTimeTick = timetick
return res
}
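With the new lastPopTimeTick field, a message whose time tick is older than the newest tick already handed to the pending queue is rejected instead of being reordered behind it. Here is a toy version of the same buffer over bare uint64 time ticks, using the standard library's container/heap in place of Milvus' typeutil.Heap; the types and names are illustrative only.

package main

import (
	"container/heap"
	"errors"
	"fmt"
)

// tickHeap is a min-heap of time ticks.
type tickHeap []uint64

func (h tickHeap) Len() int            { return len(h) }
func (h tickHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h tickHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *tickHeap) Push(x interface{}) { *h = append(*h, x.(uint64)) }
func (h *tickHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// reorderBuffer mirrors the shape of ReOrderByTimeTickBuffer.
type reorderBuffer struct {
	h               tickHeap
	lastPopTimeTick uint64
}

// push rejects ticks older than the last popped time tick.
func (r *reorderBuffer) push(tick uint64) error {
	if tick < r.lastPopTimeTick {
		return errors.New("time tick is less than last pop time tick")
	}
	heap.Push(&r.h, tick)
	return nil
}

// popUntil pops all ticks less than or equal to the given time tick, in order.
func (r *reorderBuffer) popUntil(tick uint64) []uint64 {
	var res []uint64
	for r.h.Len() > 0 && r.h[0] <= tick {
		res = append(res, heap.Pop(&r.h).(uint64))
	}
	r.lastPopTimeTick = tick
	return res
}

func main() {
	b := &reorderBuffer{}
	_ = b.push(5)
	_ = b.push(3)
	fmt.Println(b.popUntil(4)) // [3]
	fmt.Println(b.push(2))     // rejected: 2 < lastPopTimeTick (4)
}
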

29 changes: 29 additions & 0 deletions internal/streamingnode/server/walmanager/manager.go
@@ -0,0 +1,29 @@
package walmanager

import (
"context"

"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

var _ Manager = (*managerImpl)(nil)

// Manager is the interface for managing the wal instances.
type Manager interface {
// Open opens a wal instance for the channel on this Manager.
Open(ctx context.Context, channel types.PChannelInfo) error

// GetAvailableWAL returns an available wal instance for the channel.
// Returns an error if the wal instance is not found.
GetAvailableWAL(channelName string, term int64) (wal.WAL, error)

// GetAllAvailableChannels returns all available channel info.
GetAllAvailableChannels() ([]types.PChannelInfo, error)

// Remove removes the wal instance for the channel.
Remove(ctx context.Context, channel types.PChannelInfo) error

// Close closes this manager and releases all managed WALs.
Close()
}
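To make the contract concrete, here is a sketch of how a caller on the streaming node might drive this interface. The channel name and terms are made up, and error handling is reduced to panics for brevity.

package walmanager_test

import (
	"context"

	"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
	"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

func useManager() {
	m, err := walmanager.OpenManager()
	if err != nil {
		panic(err)
	}
	defer m.Close()

	ctx := context.Background()
	// Illustrative channel name and term.
	channel := types.PChannelInfo{Name: "pchannel-1", Term: 2}

	// Open the wal for this channel at term 2.
	if err := m.Open(ctx, channel); err != nil {
		panic(err)
	}

	// Callers must present the matching term to get the wal back,
	// which fences out stale clients still holding an older term.
	w, err := m.GetAvailableWAL(channel.Name, channel.Term)
	if err != nil {
		panic(err)
	}
	_ = w

	// Removing with the same term tears the wal down; a later Open
	// with a higher term supersedes it.
	_ = m.Remove(ctx, channel)
}
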
152 changes: 152 additions & 0 deletions internal/streamingnode/server/walmanager/manager_impl.go
@@ -0,0 +1,152 @@
package walmanager

import (
"context"

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// OpenManager creates a wal manager.
func OpenManager() (Manager, error) {
walName := util.MustSelectWALName()
log.Info("open wal manager", zap.String("walName", walName))
opener, err := registry.MustGetBuilder(walName).Build()
if err != nil {
return nil, err
}
return newManager(opener), nil
}

// newManager creates a wal manager.
func newManager(opener wal.Opener) Manager {
return &managerImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
wltMap: typeutil.NewConcurrentMap[string, *walLifetime](),
opener: opener,
}
}

// All management operations for a wal are serialized in order of term.
type managerImpl struct {
lifetime lifetime.Lifetime[lifetime.State]

wltMap *typeutil.ConcurrentMap[string, *walLifetime]
opener wal.Opener // wal allocator
}

// Open opens a wal instance for the channel on this Manager.
func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return status.NewOnShutdownError("wal manager is closed")
}
defer func() {
m.lifetime.Done()
if err != nil {
log.Warn("open wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
return
}
log.Info("open wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
}()

return m.getWALLifetime(channel.Name).Open(ctx, channel)
}

// Remove removes the wal instance for the channel.
func (m *managerImpl) Remove(ctx context.Context, channel types.PChannelInfo) (err error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return status.NewOnShutdownError("wal manager is closed")
}
defer func() {
m.lifetime.Done()
if err != nil {
log.Warn("remove wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
}
log.Info("remove wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
}()

return m.getWALLifetime(channel.Name).Remove(ctx, channel.Term)
}

// GetAvailableWAL returns an available wal instance for the channel.
// Returns an error if the wal instance is not found.
func (m *managerImpl) GetAvailableWAL(channelName string, term int64) (wal.WAL, error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return nil, status.NewOnShutdownError("wal manager is closed")
}
defer m.lifetime.Done()

l := m.getWALLifetime(channelName).GetWAL()
if l == nil {
return nil, status.NewChannelNotExist(channelName)
}

channelTerm := l.Channel().Term
if channelTerm != term {
return nil, status.NewUnmatchedChannelTerm(channelName, term, channelTerm)
}
return l, nil
}

// GetAllAvailableChannels returns all available channel info.
func (m *managerImpl) GetAllAvailableChannels() ([]types.PChannelInfo, error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return nil, status.NewOnShutdownError("wal manager is closed")
}
defer m.lifetime.Done()

// collect all available wal info.
infos := make([]types.PChannelInfo, 0)
m.wltMap.Range(func(channel string, lt *walLifetime) bool {
if l := lt.GetWAL(); l != nil {
info := l.Channel()
infos = append(infos, info)
}
return true
})
return infos, nil
}

// Close closes this manager and releases all managed WALs.
func (m *managerImpl) Close() {
m.lifetime.SetState(lifetime.Stopped)
m.lifetime.Wait()
m.lifetime.Close()

// close all underlying walLifetime.
m.wltMap.Range(func(channel string, wlt *walLifetime) bool {
wlt.Close()
return true
})

// close all underlying wal instances via the opener in case of resource leaks.
m.opener.Close()
}

// getWALLifetime returns the wal lifetime for the channel.
func (m *managerImpl) getWALLifetime(channel string) *walLifetime {
if wlt, loaded := m.wltMap.Get(channel); loaded {
return wlt
}

// Perform a CAS-style get-or-insert here.
newWLT := newWALLifetime(m.opener, channel)
wlt, loaded := m.wltMap.GetOrInsert(channel, newWLT)
// if loaded, a lifetime already exists; close the redundant one.
if loaded {
newWLT.Close()
}
return wlt
}
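The get-or-insert in getWALLifetime is a small lock-free pattern worth calling out: optimistically build a new walLifetime, race to insert it, and close the redundant copy if another goroutine won. The sketch below shows the same shape with the standard library's sync.Map; resource and getOrCreate are illustrative stand-ins, not Milvus code.

package main

import (
	"fmt"
	"sync"
)

// resource stands in for walLifetime: something that must be closed
// when it loses the insert race.
type resource struct{ channel string }

func (r *resource) Close() { fmt.Println("closing redundant resource for", r.channel) }

var resources sync.Map // map[string]*resource

// getOrCreate mirrors getWALLifetime: fast-path lookup, then an
// optimistic create followed by LoadOrStore; the loser closes its copy.
func getOrCreate(channel string) *resource {
	if v, ok := resources.Load(channel); ok {
		return v.(*resource)
	}
	fresh := &resource{channel: channel}
	actual, loaded := resources.LoadOrStore(channel, fresh)
	if loaded {
		// Another goroutine won the race; discard the redundant instance.
		fresh.Close()
	}
	return actual.(*resource)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = getOrCreate("pchannel-1")
		}()
	}
	wg.Wait()
}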