From 2a7bcd7f0cded60cf6f08894735297a35d3081f6 Mon Sep 17 00:00:00 2001 From: chyezh Date: Tue, 25 Jun 2024 16:12:34 +0800 Subject: [PATCH 1/3] enhance: implement wal management Signed-off-by: chyezh --- internal/proto/streamingpb/extends.go | 1 - .../server/walmanager/manager.go | 29 +++ .../server/walmanager/manager_impl.go | 152 +++++++++++ .../server/walmanager/manager_impl_test.go | 113 +++++++++ .../server/walmanager/wal_lifetime.go | 161 ++++++++++++ .../server/walmanager/wal_lifetime_test.go | 106 ++++++++ .../server/walmanager/wal_state.go | 238 ++++++++++++++++++ .../server/walmanager/wal_state_pair.go | 71 ++++++ .../server/walmanager/wal_state_pair_test.go | 83 ++++++ .../server/walmanager/wal_state_test.go | 183 ++++++++++++++ .../util/streamingutil/util/wal_selector.go | 75 ++++++ .../streamingutil/util/wal_selector_test.go | 33 +++ pkg/streaming/util/types/pchannel_info.go | 4 + pkg/streaming/walimpls/impls/rmq/builder.go | 2 +- 14 files changed, 1249 insertions(+), 2 deletions(-) create mode 100644 internal/streamingnode/server/walmanager/manager.go create mode 100644 internal/streamingnode/server/walmanager/manager_impl.go create mode 100644 internal/streamingnode/server/walmanager/manager_impl_test.go create mode 100644 internal/streamingnode/server/walmanager/wal_lifetime.go create mode 100644 internal/streamingnode/server/walmanager/wal_lifetime_test.go create mode 100644 internal/streamingnode/server/walmanager/wal_state.go create mode 100644 internal/streamingnode/server/walmanager/wal_state_pair.go create mode 100644 internal/streamingnode/server/walmanager/wal_state_pair_test.go create mode 100644 internal/streamingnode/server/walmanager/wal_state_test.go create mode 100644 internal/util/streamingutil/util/wal_selector.go create mode 100644 internal/util/streamingutil/util/wal_selector_test.go diff --git a/internal/proto/streamingpb/extends.go b/internal/proto/streamingpb/extends.go index fce24d319b3c..5d0f3fd85d58 100644 --- 
a/internal/proto/streamingpb/extends.go +++ b/internal/proto/streamingpb/extends.go @@ -2,5 +2,4 @@ package streamingpb const ( ServiceMethodPrefix = "/milvus.proto.log" - InitialTerm = int64(-1) ) diff --git a/internal/streamingnode/server/walmanager/manager.go b/internal/streamingnode/server/walmanager/manager.go new file mode 100644 index 000000000000..c7a5d01b8081 --- /dev/null +++ b/internal/streamingnode/server/walmanager/manager.go @@ -0,0 +1,29 @@ +package walmanager + +import ( + "context" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/pkg/streaming/util/types" +) + +var _ Manager = (*managerImpl)(nil) + +// Manager is the interface for managing the wal instances. +type Manager interface { + // Open opens a wal instance for the channel on this Manager. + Open(ctx context.Context, channel types.PChannelInfo) error + + // GetAvailableWAL returns an available wal instance for the channel at the given term. + // Return nil if the wal instance is not found. + GetAvailableWAL(channelName string, term int64) (wal.WAL, error) + + // GetAllAvailableChannels returns the channel info of all available wal instances. + GetAllAvailableChannels() ([]types.PChannelInfo, error) + + // Remove removes the wal instance for the channel. + Remove(ctx context.Context, channel string, term int64) error + + // Close closes the manager and releases all managed WAL instances. 
+ Close() +} diff --git a/internal/streamingnode/server/walmanager/manager_impl.go b/internal/streamingnode/server/walmanager/manager_impl.go new file mode 100644 index 000000000000..1ca77fa4a3ad --- /dev/null +++ b/internal/streamingnode/server/walmanager/manager_impl.go @@ -0,0 +1,152 @@ +package walmanager + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/internal/util/streamingutil/util" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// OpenManager create a wal manager. +func OpenManager() (Manager, error) { + walName := util.MustSelectWALName() + log.Info("open wal manager", zap.String("walName", walName)) + opener, err := registry.MustGetBuilder(walName).Build() + if err != nil { + return nil, err + } + return newManager(opener), nil +} + +// newManager create a wal manager. +func newManager(opener wal.Opener) Manager { + return &managerImpl{ + lifetime: lifetime.NewLifetime(lifetime.Working), + wltMap: typeutil.NewConcurrentMap[string, *walLifetime](), + opener: opener, + } +} + +// All management operation for a wal will be serialized with order of term. +type managerImpl struct { + lifetime lifetime.Lifetime[lifetime.State] + + wltMap *typeutil.ConcurrentMap[string, *walLifetime] + opener wal.Opener // wal allocator +} + +// Open opens a wal instance for the channel on this Manager. +func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err error) { + // reject operation if manager is closing. 
+ if m.lifetime.Add(lifetime.IsWorking) != nil { + return status.NewOnShutdownError("wal manager is closed") + } + defer func() { + m.lifetime.Done() + if err != nil { + log.Warn("open wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) + return + } + log.Info("open wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) + }() + + return m.getWALLifetime(channel.Name).Open(ctx, channel) +} + +// Remove removes the wal instance for the channel. +// The outcome is logged exactly once: a warning on failure, otherwise a success record. +func (m *managerImpl) Remove(ctx context.Context, channel string, term int64) (err error) { + // reject operation if manager is closing. + if m.lifetime.Add(lifetime.IsWorking) != nil { + return status.NewOnShutdownError("wal manager is closed") + } + defer func() { + m.lifetime.Done() + if err != nil { + log.Warn("remove wal failed", zap.Error(err), zap.String("channel", channel), zap.Int64("term", term)) + // Stop here so a failed removal is not also reported as a success. + return + } + log.Info("remove wal success", zap.String("channel", channel), zap.Int64("term", term)) + }() + + return m.getWALLifetime(channel).Remove(ctx, term) +} + +// GetAvailableWAL returns an available wal instance for the channel. +// Return nil if the wal instance is not found. +func (m *managerImpl) GetAvailableWAL(channelName string, term int64) (wal.WAL, error) { + // reject operation if manager is closing. + if m.lifetime.Add(lifetime.IsWorking) != nil { + return nil, status.NewOnShutdownError("wal manager is closed") + } + defer m.lifetime.Done() + + l := m.getWALLifetime(channelName).GetWAL() + if l == nil { + return nil, status.NewChannelNotExist(channelName) + } + + channelTerm := l.Channel().Term + if channelTerm != term { + return nil, status.NewUnmatchedChannelTerm(channelName, term, channelTerm) + } + return l, nil +} + +// GetAllAvailableChannels returns all available channel info. +func (m *managerImpl) GetAllAvailableChannels() ([]types.PChannelInfo, error) { + // reject operation if manager is closing. 
+ if m.lifetime.Add(lifetime.IsWorking) != nil { + return nil, status.NewOnShutdownError("wal manager is closed") + } + defer m.lifetime.Done() + + // collect all available wal info. + infos := make([]types.PChannelInfo, 0) + m.wltMap.Range(func(channel string, lt *walLifetime) bool { + if l := lt.GetWAL(); l != nil { + info := l.Channel() + infos = append(infos, info) + } + return true + }) + return infos, nil +} + +// Close these manager and release all managed WAL. +func (m *managerImpl) Close() { + m.lifetime.SetState(lifetime.Stopped) + m.lifetime.Wait() + m.lifetime.Close() + + // close all underlying walLifetime. + m.wltMap.Range(func(channel string, wlt *walLifetime) bool { + wlt.Close() + return true + }) + + // close all underlying wal instance by allocator if there's resource leak. + m.opener.Close() +} + +// getWALLifetime returns the wal lifetime for the channel. +func (m *managerImpl) getWALLifetime(channel string) *walLifetime { + if wlt, loaded := m.wltMap.Get(channel); loaded { + return wlt + } + + // Perform a cas here. + newWLT := newWALLifetime(m.opener, channel) + wlt, loaded := m.wltMap.GetOrInsert(channel, newWLT) + // if loaded, lifetime is exist, close the redundant lifetime. 
+ if loaded { + newWLT.Close() + } + return wlt +} diff --git a/internal/streamingnode/server/walmanager/manager_impl_test.go b/internal/streamingnode/server/walmanager/manager_impl_test.go new file mode 100644 index 000000000000..9885eac188fc --- /dev/null +++ b/internal/streamingnode/server/walmanager/manager_impl_test.go @@ -0,0 +1,113 @@ +package walmanager + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestMain(m *testing.M) { + paramtable.Init() + m.Run() +} + +func TestManager(t *testing.T) { + opener := mock_wal.NewMockOpener(t) + opener.EXPECT().Open(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, oo *wal.OpenOption) (wal.WAL, error) { + l := mock_wal.NewMockWAL(t) + l.EXPECT().Channel().Return(oo.Channel) + l.EXPECT().Close().Return() + return l, nil + }) + opener.EXPECT().Close().Return() + + m := newManager(opener) + channelName := "ch1" + + l, err := m.GetAvailableWAL(channelName, 1) + assertErrorChannelNotExist(t, err) + assert.Nil(t, l) + + h, err := m.GetAllAvailableChannels() + assert.NoError(t, err) + assert.Len(t, h, 0) + + err = m.Remove(context.Background(), channelName, 1) + assert.NoError(t, err) + + l, err = m.GetAvailableWAL(channelName, 1) + assertErrorChannelNotExist(t, err) + assert.Nil(t, l) + + err = m.Open(context.Background(), types.PChannelInfo{ + Name: channelName, + Term: 1, + }) + assertErrorTermExpired(t, err) + + err = m.Open(context.Background(), types.PChannelInfo{ + Name: channelName, + Term: 2, + }) + assert.NoError(t, err) + + err = 
m.Remove(context.Background(), channelName, 1) + assertErrorTermExpired(t, err) + + l, err = m.GetAvailableWAL(channelName, 1) + assertErrorTermExpired(t, err) + assert.Nil(t, l) + + l, err = m.GetAvailableWAL(channelName, 2) + assert.NoError(t, err) + assert.NotNil(t, l) + + h, err = m.GetAllAvailableChannels() + assert.NoError(t, err) + assert.Len(t, h, 1) + + err = m.Open(context.Background(), types.PChannelInfo{ + Name: "term2", + Term: 3, + }) + assert.NoError(t, err) + + h, err = m.GetAllAvailableChannels() + assert.NoError(t, err) + assert.Len(t, h, 2) + + m.Close() + + h, err = m.GetAllAvailableChannels() + assertShutdownError(t, err) + assert.Len(t, h, 0) + + err = m.Open(context.Background(), types.PChannelInfo{ + Name: "term2", + Term: 4, + }) + assertShutdownError(t, err) + + err = m.Remove(context.Background(), channelName, 2) + assertShutdownError(t, err) + + l, err = m.GetAvailableWAL(channelName, 2) + assertShutdownError(t, err) + assert.Nil(t, l) +} + +// assertShutdownError asserts that err is a streaming error carrying the ON_SHUTDOWN code. +func assertShutdownError(t *testing.T, err error) { + assert.Error(t, err) + e := status.AsStreamingError(err) + // assert.Equal takes (expected, actual); keep that order so failure output is labeled correctly. + assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, e.Code) +} diff --git a/internal/streamingnode/server/walmanager/wal_lifetime.go b/internal/streamingnode/server/walmanager/wal_lifetime.go new file mode 100644 index 000000000000..d00ddc37d717 --- /dev/null +++ b/internal/streamingnode/server/walmanager/wal_lifetime.go @@ -0,0 +1,161 @@ +package walmanager + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/types" +) + +// newWALLifetime create a WALLifetime with opener. 
+func newWALLifetime(opener wal.Opener, channel string) *walLifetime { + ctx, cancel := context.WithCancel(context.Background()) + l := &walLifetime{ + ctx: ctx, + cancel: cancel, + finish: make(chan struct{}), + opener: opener, + statePair: newWALStatePair(), + logger: log.With(zap.String("channel", channel)), + } + go l.backgroundTask() + return l +} + +// walLifetime manages the lifetime of a single wal. +// It promises that the wal keeps a consistent state in a distributed environment. +// All management operations on the wal are ordered by the following rules: +// (term, available) describes the state of the wal. +// term is always increasing, and available always comes before unavailable in the same term, such as: +// (-1, false) -> (0, true) -> (1, true) -> (2, true) -> (3, false) -> (7, true) -> ... +type walLifetime struct { + ctx context.Context + cancel context.CancelFunc + + finish chan struct{} + opener wal.Opener + statePair *walStatePair + logger *log.MLogger +} + +// GetWAL returns an available wal instance for the channel. +// Return nil if the wal is not available now. +func (w *walLifetime) GetWAL() wal.WAL { + return w.statePair.GetWAL() +} + +// Open opens a wal instance for the channel on this Manager. +func (w *walLifetime) Open(ctx context.Context, channel types.PChannelInfo) error { + // Set expected WAL state to available at given term. + expected := newAvailableExpectedState(ctx, channel) + if !w.statePair.SetExpectedState(expected) { + return status.NewUnmatchedChannelTerm("expired term, cannot change expected state for open") + } + + // Wait until the WAL state is ready or term expired or error occurs. + return w.statePair.WaitCurrentStateReachExpected(ctx, expected) +} + +// Remove removes the wal instance for the channel on this Manager. +func (w *walLifetime) Remove(ctx context.Context, term int64) error { + // Set expected WAL state to unavailable at given term. 
+ expected := newUnavailableExpectedState(term) + if !w.statePair.SetExpectedState(expected) { + return status.NewUnmatchedChannelTerm("expired term, cannot change expected state for remove") + } + + // Wait until the WAL state is ready or term expired or error occurs. + return w.statePair.WaitCurrentStateReachExpected(ctx, expected) +} + +// Close closes the wal lifetime. +// It stops the background task, waits for it to exit, then closes the current wal if one is open. +func (w *walLifetime) Close() { + // Close all background task. + w.cancel() + <-w.finish + + // No background task is running now, close current wal if needed. + currentState := w.statePair.GetCurrentState() + // Derive from w.logger so these records keep the channel field like every other record of this lifetime. + logger := w.logger.With(zap.String("current", toStateString(currentState))) + if oldWAL := currentState.GetWAL(); oldWAL != nil { + oldWAL.Close() + logger.Info("close current term wal done at wal life time close") + } + logger.Info("wal lifetime closed") +} + +// backgroundTask is the background task for wal manager. +// wal open/close operation is executed in background task with single goroutine. +func (w *walLifetime) backgroundTask() { + defer func() { + w.logger.Info("wal lifetime background task exit") + close(w.finish) + }() + + // wait for expectedState change. + expectedState := initialExpectedWALState + for { + // single wal open/close operation should be serialized. + if err := w.statePair.WaitExpectedStateChanged(w.ctx, expectedState); err != nil { + // context canceled. break the background task. + return + } + expectedState = w.statePair.GetExpectedState() + w.logger.Info("expected state changed, do a life cycle", zap.String("expected", toStateString(expectedState))) + w.doLifetimeChanged(expectedState) + } +} + +// doLifetimeChanged executes the wal open/close operation once. +func (w *walLifetime) doLifetimeChanged(expectedState expectedWALState) { + currentState := w.statePair.GetCurrentState() + logger := w.logger.With(zap.String("expected", toStateString(expectedState)), zap.String("current", toStateString(currentState))) + + // Filter the expired expectedState. 
+ if !isStateBefore(currentState, expectedState) { + // Happen at: the unavailable expected state at current term, but current wal open operation is failed. + logger.Info("current state is not before expected state, do nothing") + return + } + + // !!! Even if the expected state is canceled (context.Context.Err()), following operation must be executed. + // Otherwise a dead lock may be caused by unexpected rpc sequence. + // because new Current state after these operation must be same or greater than expected state. + + // term must be increasing or available -> unavailable, close current term wal is always applied. + term := currentState.Term() + if oldWAL := currentState.GetWAL(); oldWAL != nil { + oldWAL.Close() + logger.Info("close current term wal done") + // Push term to current state unavailable and open a new wal. + // -> (currentTerm,false) + w.statePair.SetCurrentState(newUnavailableCurrentState(term, nil)) + } + + // If expected state is unavailable, change term to expected state and return. + if !expectedState.Available() { + // -> (expectedTerm,false) + w.statePair.SetCurrentState(newUnavailableCurrentState(expectedState.Term(), nil)) + return + } + + // If expected state is available, open a new wal. + // TODO: merge the expectedState and expected state context together. + l, err := w.opener.Open(expectedState.Context(), &wal.OpenOption{ + Channel: expectedState.GetPChannelInfo(), + }) + if err != nil { + logger.Warn("open new wal fail", zap.Error(err)) + // Open new wal at expected term failed, push expected term to current state unavailable. 
+ // -> (expectedTerm,false) + w.statePair.SetCurrentState(newUnavailableCurrentState(expectedState.Term(), err)) + return + } + logger.Info("open new wal done") + // -> (expectedTerm,true) + w.statePair.SetCurrentState(newAvailableCurrentState(l)) +} diff --git a/internal/streamingnode/server/walmanager/wal_lifetime_test.go b/internal/streamingnode/server/walmanager/wal_lifetime_test.go new file mode 100644 index 000000000000..11feed078973 --- /dev/null +++ b/internal/streamingnode/server/walmanager/wal_lifetime_test.go @@ -0,0 +1,106 @@ +package walmanager + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/pkg/streaming/util/types" +) + +func TestWALLifetime(t *testing.T) { + channel := "test" + opener := mock_wal.NewMockOpener(t) + opener.EXPECT().Open(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, oo *wal.OpenOption) (wal.WAL, error) { + l := mock_wal.NewMockWAL(t) + l.EXPECT().Channel().Return(oo.Channel) + l.EXPECT().Close().Return() + return l, nil + }) + + wlt := newWALLifetime(opener, channel) + assert.Nil(t, wlt.GetWAL()) + + // Test open. + err := wlt.Open(context.Background(), types.PChannelInfo{ + Name: channel, + Term: 2, + }) + assert.NoError(t, err) + assert.NotNil(t, wlt.GetWAL()) + assert.Equal(t, channel, wlt.GetWAL().Channel().Name) + assert.Equal(t, int64(2), wlt.GetWAL().Channel().Term) + + // Test expired term remove. + err = wlt.Remove(context.Background(), 1) + assertErrorTermExpired(t, err) + assert.NotNil(t, wlt.GetWAL()) + assert.Equal(t, channel, wlt.GetWAL().Channel().Name) + assert.Equal(t, int64(2), wlt.GetWAL().Channel().Term) + + // Test remove. + err = wlt.Remove(context.Background(), 2) + assert.NoError(t, err) + assert.Nil(t, wlt.GetWAL()) + + // Test expired term open. 
+ err = wlt.Open(context.Background(), types.PChannelInfo{ + Name: channel, + Term: 1, + }) + assertErrorTermExpired(t, err) + assert.Nil(t, wlt.GetWAL()) + + // Test open after remove. + err = wlt.Open(context.Background(), types.PChannelInfo{ + Name: channel, + Term: 5, + }) + assert.NoError(t, err) + assert.NotNil(t, wlt.GetWAL()) + assert.Equal(t, channel, wlt.GetWAL().Channel().Name) + assert.Equal(t, int64(5), wlt.GetWAL().Channel().Term) + + // Test overwrite open. + err = wlt.Open(context.Background(), types.PChannelInfo{ + Name: channel, + Term: 10, + }) + assert.NoError(t, err) + assert.NotNil(t, wlt.GetWAL()) + assert.Equal(t, channel, wlt.GetWAL().Channel().Name) + assert.Equal(t, int64(10), wlt.GetWAL().Channel().Term) + + // Test context canceled. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err = wlt.Open(ctx, types.PChannelInfo{ + Name: channel, + Term: 11, + }) + assert.ErrorIs(t, err, context.Canceled) + + err = wlt.Remove(ctx, 11) + assert.ErrorIs(t, err, context.Canceled) + + err = wlt.Open(context.Background(), types.PChannelInfo{ + Name: channel, + Term: 11, + }) + assertErrorTermExpired(t, err) + + // Test open at a newer term after the canceled requests; the error must be checked, not dropped. + err = wlt.Open(context.Background(), types.PChannelInfo{ + Name: channel, + Term: 12, + }) + assert.NoError(t, err) + assert.NotNil(t, wlt.GetWAL()) + assert.Equal(t, channel, wlt.GetWAL().Channel().Name) + assert.Equal(t, int64(12), wlt.GetWAL().Channel().Term) + + wlt.Close() +} diff --git a/internal/streamingnode/server/walmanager/wal_state.go b/internal/streamingnode/server/walmanager/wal_state.go new file mode 100644 index 000000000000..5ff2ede0aaa2 --- /dev/null +++ b/internal/streamingnode/server/walmanager/wal_state.go @@ -0,0 +1,238 @@ +package walmanager + +import ( + "context" + "fmt" + "sync" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/syncutil" +) + +var ( + _ currentWALState = (*availableCurrentWALState)(nil) + _ 
currentWALState = (*unavailableCurrentWALState)(nil) + _ expectedWALState = (*availableExpectedWALState)(nil) + _ expectedWALState = (*unavailableExpectedWALState)(nil) + + initialExpectedWALState expectedWALState = &unavailableExpectedWALState{ + term: types.InitialTerm, + } + initialCurrentWALState currentWALState = &unavailableCurrentWALState{ + term: types.InitialTerm, + err: nil, + } +) + +// newAvailableCurrentState creates a new available current state. +func newAvailableCurrentState(l wal.WAL) currentWALState { + return availableCurrentWALState{ + l: l, + } +} + +// newUnavailableCurrentState creates a new unavailable current state. +func newUnavailableCurrentState(term int64, err error) currentWALState { + return unavailableCurrentWALState{ + term: term, + err: err, + } +} + +// newAvailableExpectedState creates a new available expected state. +func newAvailableExpectedState(ctx context.Context, channel types.PChannelInfo) expectedWALState { + return availableExpectedWALState{ + ctx: ctx, + channel: channel, + } +} + +// newUnavailableExpectedState creates a new unavailable expected state. +func newUnavailableExpectedState(term int64) expectedWALState { + return unavailableExpectedWALState{ + term: term, + } +} + +// walState describe the state of a wal. +type walState interface { + // Term returns the term of the wal. + Term() int64 + + // Available returns whether the wal is available. + Available() bool +} + +// currentWALState is the current (exactly status) state of a wal. +type currentWALState interface { + walState + + // GetWAL returns the current wal. + // Return empty if the wal is not available now. + GetWAL() wal.WAL + + // GetLastError returns the last error of wal management. + GetLastError() error +} + +// expectedWALState is the expected state (which is sent from log coord) of a wal. +type expectedWALState interface { + walState + + // GetPChannelInfo returns the expected pchannel info of the wal. 
+ // Return nil if the expected wal state is unavailable. + GetPChannelInfo() types.PChannelInfo + + // Context returns the context of the expected wal state. + Context() context.Context +} + +// availableCurrentWALState is a available wal state of current wal. +type availableCurrentWALState struct { + l wal.WAL +} + +func (s availableCurrentWALState) Term() int64 { + return s.l.Channel().Term +} + +func (s availableCurrentWALState) Available() bool { + return true +} + +func (s availableCurrentWALState) GetWAL() wal.WAL { + return s.l +} + +func (s availableCurrentWALState) GetLastError() error { + return nil +} + +// unavailableCurrentWALState is a unavailable state of current wal. +type unavailableCurrentWALState struct { + term int64 + err error +} + +func (s unavailableCurrentWALState) Term() int64 { + return s.term +} + +func (s unavailableCurrentWALState) Available() bool { + return false +} + +func (s unavailableCurrentWALState) GetWAL() wal.WAL { + return nil +} + +func (s unavailableCurrentWALState) GetLastError() error { + return s.err +} + +type availableExpectedWALState struct { + ctx context.Context + channel types.PChannelInfo +} + +func (s availableExpectedWALState) Term() int64 { + return s.channel.Term +} + +func (s availableExpectedWALState) Available() bool { + return true +} + +func (s availableExpectedWALState) Context() context.Context { + return s.ctx +} + +func (s availableExpectedWALState) GetPChannelInfo() types.PChannelInfo { + return s.channel +} + +type unavailableExpectedWALState struct { + term int64 +} + +func (s unavailableExpectedWALState) Term() int64 { + return s.term +} + +func (s unavailableExpectedWALState) Available() bool { + return false +} + +func (s unavailableExpectedWALState) GetPChannelInfo() types.PChannelInfo { + return types.PChannelInfo{} +} + +func (s unavailableExpectedWALState) Context() context.Context { + return context.Background() +} + +// newWALStateWithCond creates new walStateWithCond. 
+func newWALStateWithCond[T walState](state T) walStateWithCond[T] { + return walStateWithCond[T]{ + state: state, + cond: syncutil.NewContextCond(&sync.Mutex{}), + } +} + +// walStateWithCond pairs a walState with a condition variable that is used to wait for state changes. +type walStateWithCond[T walState] struct { + state T + cond *syncutil.ContextCond +} + +// GetState returns the state of the wal, read under the lock. +func (w *walStateWithCond[T]) GetState() T { + w.cond.L.Lock() + defer w.cond.L.Unlock() + + // Copy the state, all state should be value type but not pointer type. + return w.state +} + +// SetStateAndNotify replaces the state with s only when the current state is before s (see isStateBefore). +// Return false if the state is not changed. +func (w *walStateWithCond[T]) SetStateAndNotify(s T) bool { + w.cond.LockAndBroadcast() + defer w.cond.L.Unlock() + if isStateBefore(w.state, s) { + // Only update state when current state is before new state. + w.state = s + return true + } + return false +} + +// WatchChanged blocks until the term or the availability of the state differs from s, or until Wait fails (e.g. ctx is done). NOTE(review): the error path returns without unlocking; presumably ContextCond.Wait does not hold the lock when it returns an error, confirm against syncutil. +func (w *walStateWithCond[T]) WatchChanged(ctx context.Context, s walState) error { + w.cond.L.Lock() + for w.state.Term() == s.Term() && w.state.Available() == s.Available() { + if err := w.cond.Wait(ctx); err != nil { + return err + } + } + w.cond.L.Unlock() + return nil +} + +// isStateBefore returns whether s1 is before s2. +func isStateBefore(s1, s2 walState) bool { + // s1 is before s2 if the term of s1 is less than the term of s2, + // or s1 is available and s2 is not available in the same term, + // because wal should always be available before unavailable in same term. + // (1, true) -> (1, false) is allowed. + // (1, true) -> (2, false) is allowed. + // (1, false) -> (2, true) is allowed. + // (1, false) -> (1, true) is not allowed. + return s1.Term() < s2.Term() || (s1.Term() == s2.Term() && s1.Available() && !s2.Available()) +} + +// toStateString returns the string representation of wal state. 
+func toStateString(s walState) string { + return fmt.Sprintf("(%d,%t)", s.Term(), s.Available()) +} diff --git a/internal/streamingnode/server/walmanager/wal_state_pair.go b/internal/streamingnode/server/walmanager/wal_state_pair.go new file mode 100644 index 000000000000..04db475d92cf --- /dev/null +++ b/internal/streamingnode/server/walmanager/wal_state_pair.go @@ -0,0 +1,71 @@ +package walmanager + +import ( + "context" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" +) + +// newWALStatePair creates a new walStatePair with both states set to their initial values. +func newWALStatePair() *walStatePair { + return &walStatePair{ + currentState: newWALStateWithCond(initialCurrentWALState), // current state of wal, should eventually be the same as or greater than (e.g. on open wal failure) the expected state. + expectedState: newWALStateWithCond(initialExpectedWALState), // final expected state of wal. + } +} + +// walStatePair is a wal with its state pair. +// A state pair consists of the current state and the expected state. +type walStatePair struct { + currentState walStateWithCond[currentWALState] + expectedState walStateWithCond[expectedWALState] +} + +// WaitCurrentStateReachExpected waits until the current state is no longer before the expected state. +// It returns the watch error if waiting fails, an unmatched-term error if the current term has passed the expected term, +// and otherwise the last error recorded on the current state. +func (w *walStatePair) WaitCurrentStateReachExpected(ctx context.Context, expected expectedWALState) error { + current := w.currentState.GetState() + for isStateBefore(current, expected) { + if err := w.currentState.WatchChanged(ctx, current); err != nil { + // context canceled. + return err + } + current = w.currentState.GetState() + } + // The requested term is an expired term, return term error. + if current.Term() > expected.Term() { + return status.NewUnmatchedChannelTerm("request term is expired, expected: %d, actual: %d", expected.Term(), current.Term()) + } + // Check if the wal is as expected. + return current.GetLastError() +} + +// GetExpectedState returns the expected state of the wal. 
+func (w *walStatePair) GetExpectedState() expectedWALState { + return w.expectedState.GetState() +} + +// GetCurrentState returns the current state of the wal. +func (w *walStatePair) GetCurrentState() currentWALState { + return w.currentState.GetState() +} + +// WaitExpectedStateChanged waits until the expected state is changed. +func (w *walStatePair) WaitExpectedStateChanged(ctx context.Context, oldExpected walState) error { + return w.expectedState.WatchChanged(ctx, oldExpected) +} + +// SetExpectedState sets the expected state of the wal. +func (w *walStatePair) SetExpectedState(s expectedWALState) bool { + return w.expectedState.SetStateAndNotify(s) +} + +// SetCurrentState sets the current state of the wal. +func (w *walStatePair) SetCurrentState(s currentWALState) bool { + return w.currentState.SetStateAndNotify(s) +} + +// GetWAL returns the current wal. +func (w *walStatePair) GetWAL() wal.WAL { + return w.currentState.GetState().GetWAL() +} diff --git a/internal/streamingnode/server/walmanager/wal_state_pair_test.go b/internal/streamingnode/server/walmanager/wal_state_pair_test.go new file mode 100644 index 000000000000..d23290456d7f --- /dev/null +++ b/internal/streamingnode/server/walmanager/wal_state_pair_test.go @@ -0,0 +1,83 @@ +package walmanager + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/pkg/streaming/util/types" +) + +func TestStatePair(t *testing.T) { + statePair := newWALStatePair() + currentState := statePair.GetCurrentState() + expectedState := statePair.GetExpectedState() + assert.Equal(t, initialCurrentWALState, currentState) + assert.Equal(t, initialExpectedWALState, expectedState) + assert.Nil(t, statePair.GetWAL()) + + 
statePair.SetExpectedState(newAvailableExpectedState(context.Background(), types.PChannelInfo{ + Term: 1, + })) + assert.Equal(t, "(1,true)", toStateString(statePair.GetExpectedState())) + + statePair.SetExpectedState(newUnavailableExpectedState(1)) + assert.Equal(t, "(1,false)", toStateString(statePair.GetExpectedState())) + + l := mock_wal.NewMockWAL(t) + l.EXPECT().Channel().Return(types.PChannelInfo{ + Term: 1, + }).Maybe() + statePair.SetCurrentState(newAvailableCurrentState(l)) + assert.Equal(t, "(1,true)", toStateString(statePair.GetCurrentState())) + + statePair.SetCurrentState(newUnavailableCurrentState(1, nil)) + assert.Equal(t, "(1,false)", toStateString(statePair.GetCurrentState())) + + assert.NoError(t, statePair.WaitExpectedStateChanged(context.Background(), newAvailableExpectedState(context.Background(), types.PChannelInfo{ + Term: 1, + }))) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + assert.ErrorIs(t, statePair.WaitExpectedStateChanged(ctx, newUnavailableExpectedState(1)), context.DeadlineExceeded) + + assert.NoError(t, statePair.WaitCurrentStateReachExpected(context.Background(), newUnavailableExpectedState(1))) + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + assert.ErrorIs(t, statePair.WaitCurrentStateReachExpected(ctx, newUnavailableExpectedState(2)), context.DeadlineExceeded) + + ch := make(chan struct{}) + go func() { + defer close(ch) + + err := statePair.WaitCurrentStateReachExpected(context.Background(), newUnavailableExpectedState(3)) + assertErrorTermExpired(t, err) + }() + + statePair.SetCurrentState(newUnavailableCurrentState(2, nil)) + time.Sleep(100 * time.Millisecond) + statePair.SetCurrentState(newUnavailableCurrentState(4, nil)) + + select { + case <-ch: + case <-time.After(1 * time.Second): + t.Error("WaitCurrentStateReachExpected should not block") + } +} + +func assertErrorTermExpired(t *testing.T, err error) { + assert.Error(t, err) + 
logErr := status.AsStreamingError(err) + assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM, logErr.Code) +} + +func assertErrorChannelNotExist(t *testing.T, err error) { + assert.Error(t, err) + logErr := status.AsStreamingError(err) + assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_CHANNEL_NOT_EXIST, logErr.Code) +} diff --git a/internal/streamingnode/server/walmanager/wal_state_test.go b/internal/streamingnode/server/walmanager/wal_state_test.go new file mode 100644 index 000000000000..e02b9adb4ba5 --- /dev/null +++ b/internal/streamingnode/server/walmanager/wal_state_test.go @@ -0,0 +1,183 @@ +package walmanager + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal" + "github.com/milvus-io/milvus/pkg/streaming/util/types" +) + +func TestInitialWALState(t *testing.T) { + currentState := initialCurrentWALState + + assert.Equal(t, types.InitialTerm, currentState.Term()) + assert.False(t, currentState.Available()) + assert.Nil(t, currentState.GetWAL()) + assert.NoError(t, currentState.GetLastError()) + + assert.Equal(t, toStateString(currentState), "(-1,false)") + + expectedState := initialExpectedWALState + assert.Equal(t, types.InitialTerm, expectedState.Term()) + assert.False(t, expectedState.Available()) + assert.Zero(t, expectedState.GetPChannelInfo()) + assert.Equal(t, context.Background(), expectedState.Context()) + assert.Equal(t, toStateString(expectedState), "(-1,false)") +} + +func TestAvailableCurrentWALState(t *testing.T) { + l := mock_wal.NewMockWAL(t) + l.EXPECT().Channel().Return(types.PChannelInfo{ + Term: 1, + }) + + state := newAvailableCurrentState(l) + assert.Equal(t, int64(1), state.Term()) + assert.True(t, state.Available()) + assert.Equal(t, l, state.GetWAL()) + assert.Nil(t, state.GetLastError()) + + assert.Equal(t, toStateString(state), "(1,true)") +} 
+ +func TestUnavailableCurrentWALState(t *testing.T) { + err := errors.New("test") + state := newUnavailableCurrentState(1, err) + + assert.Equal(t, int64(1), state.Term()) + assert.False(t, state.Available()) + assert.Nil(t, state.GetWAL()) + assert.ErrorIs(t, state.GetLastError(), err) + + assert.Equal(t, toStateString(state), "(1,false)") +} + +func TestAvailableExpectedWALState(t *testing.T) { + channel := types.PChannelInfo{} + state := newAvailableExpectedState(context.Background(), channel) + + assert.Equal(t, int64(0), state.Term()) + assert.True(t, state.Available()) + assert.Equal(t, context.Background(), state.Context()) + assert.Equal(t, channel, state.GetPChannelInfo()) + + assert.Equal(t, toStateString(state), "(0,true)") +} + +func TestUnavailableExpectedWALState(t *testing.T) { + state := newUnavailableExpectedState(1) + + assert.Equal(t, int64(1), state.Term()) + assert.False(t, state.Available()) + assert.Zero(t, state.GetPChannelInfo()) + assert.Equal(t, context.Background(), state.Context()) + + assert.Equal(t, toStateString(state), "(1,false)") +} + +func TestIsStateBefore(t *testing.T) { + // initial state comparison. 
+ assert.False(t, isStateBefore(initialCurrentWALState, initialExpectedWALState)) + assert.False(t, isStateBefore(initialExpectedWALState, initialCurrentWALState)) + + l := mock_wal.NewMockWAL(t) + l.EXPECT().Channel().Return(types.PChannelInfo{ + Term: 1, + }) + + cases := []walState{ + newAvailableCurrentState(l), + newUnavailableCurrentState(1, nil), + newAvailableExpectedState(context.Background(), types.PChannelInfo{ + Term: 3, + }), + newUnavailableExpectedState(5), + } + for _, s := range cases { + assert.True(t, isStateBefore(initialCurrentWALState, s)) + assert.True(t, isStateBefore(initialExpectedWALState, s)) + assert.False(t, isStateBefore(s, initialCurrentWALState)) + assert.False(t, isStateBefore(s, initialExpectedWALState)) + } + for i, s1 := range cases { + for _, s2 := range cases[:i] { + assert.True(t, isStateBefore(s2, s1)) + assert.False(t, isStateBefore(s1, s2)) + } + } +} + +func TestStateWithCond(t *testing.T) { + stateCond := newWALStateWithCond(initialCurrentWALState) + assert.Equal(t, initialCurrentWALState, stateCond.GetState()) + + // test notification. 
+ wg := sync.WaitGroup{} + targetState := newUnavailableCurrentState(10, nil) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + oldState := stateCond.GetState() + for { + if !isStateBefore(oldState, targetState) { + break + } + + err := stateCond.WatchChanged(context.Background(), oldState) + assert.NoError(t, err) + newState := stateCond.GetState() + assert.True(t, isStateBefore(oldState, newState)) + oldState = newState + } + }() + wg.Add(1) + go func() { + defer wg.Done() + + oldState := stateCond.GetState() + for i := int64(0); i < 10; i++ { + var newState currentWALState + if i%2 == 0 { + l := mock_wal.NewMockWAL(t) + l.EXPECT().Channel().Return(types.PChannelInfo{ + Term: i % 2, + }).Maybe() + newState = newAvailableCurrentState(l) + } else { + newState = newUnavailableCurrentState(i%3, nil) + } + stateCond.SetStateAndNotify(newState) + + // the updated state should never be before the old state. + stateNow := stateCond.GetState() + assert.False(t, isStateBefore(stateNow, oldState)) + oldState = stateNow + } + stateCond.SetStateAndNotify(targetState) + }() + } + + ch := make(chan struct{}) + go func() { + wg.Wait() + close(ch) + }() + select { + case <-time.After(time.Second * 3): + t.Errorf("test should never block") + case <-ch: + } + + // test cancel. 
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + err := stateCond.WatchChanged(ctx, targetState) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} diff --git a/internal/util/streamingutil/util/wal_selector.go b/internal/util/streamingutil/util/wal_selector.go new file mode 100644 index 000000000000..cbb24db45748 --- /dev/null +++ b/internal/util/streamingutil/util/wal_selector.go @@ -0,0 +1,75 @@ +package util + +import ( + "github.com/cockroachdb/errors" + "go.uber.org/atomic" + + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +const ( + walTypeDefault = "default" + walTypeNatsmq = "natsmq" + walTypeRocksmq = "rocksmq" + walTypeKafka = "kafka" + walTypePulsar = "pulsar" +) + +type walEnable struct { + Rocksmq bool + Natsmq bool + Pulsar bool + Kafka bool +} + +var isStandAlone = atomic.NewBool(false) + +// EnableStandAlone enables or disables standalone mode. +func EnableStandAlone(standalone bool) { + isStandAlone.Store(standalone) +} + +// MustSelectWALName selects the wal name from the current configuration and panics if no valid wal is available. +func MustSelectWALName() string { + standalone := isStandAlone.Load() + params := paramtable.Get() + return mustSelectWALName(standalone, params.MQCfg.Type.GetValue(), walEnable{ + params.RocksmqEnable(), + params.NatsmqEnable(), + params.PulsarEnable(), + params.KafkaEnable(), + }) +} + +// mustSelectWALName returns a validated explicit mq type as is; for the default type it prefers rocksmq (standalone only), then pulsar, then kafka, and panics otherwise. +func mustSelectWALName(standalone bool, mqType string, enable walEnable) string { + if mqType != walTypeDefault { + if err := validateWALName(standalone, mqType); err != nil { + panic(err) + } + return mqType + } + if standalone { + if enable.Rocksmq { + return walTypeRocksmq + } + } + if enable.Pulsar { + return walTypePulsar + } + if enable.Kafka { + return walTypeKafka + } + panic(errors.Errorf("no available wal config found, %s, enable: %+v", mqType, enable)) +} + +// validateWALName validates the mq type against the deployment mode. +func validateWALName(standalone bool, mqType string) error { + // we may register more mq type by plugin. 
+ // so we should not check all mq type here. + // only check standalone type. + if !standalone && (mqType == walTypeRocksmq || mqType == walTypeNatsmq) { + return errors.Newf("mq %s is only valid in standalone mode", mqType) + } + return nil +} diff --git a/internal/util/streamingutil/util/wal_selector_test.go b/internal/util/streamingutil/util/wal_selector_test.go new file mode 100644 index 000000000000..6343eaf1b371 --- /dev/null +++ b/internal/util/streamingutil/util/wal_selector_test.go @@ -0,0 +1,33 @@ +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidateWALType(t *testing.T) { + assert.Error(t, validateWALName(false, walTypeNatsmq)) + assert.Error(t, validateWALName(false, walTypeRocksmq)) +} + +func TestSelectWALType(t *testing.T) { + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{true, true, true, true}), walTypeRocksmq) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, true}), walTypeKafka) + assert.Panics(t, func() { mustSelectWALName(true, walTypeDefault, walEnable{false, false, false, false}) }) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{true, true, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, true, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, true}), walTypeKafka) + assert.Panics(t, func() { mustSelectWALName(false, walTypeDefault, walEnable{false, false, false, false}) }) + assert.Equal(t, mustSelectWALName(true, walTypeRocksmq, walEnable{true, true, true, true}), 
walTypeRocksmq) + assert.Equal(t, mustSelectWALName(true, walTypeNatsmq, walEnable{true, true, true, true}), walTypeNatsmq) + assert.Equal(t, mustSelectWALName(true, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(true, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka) + assert.Panics(t, func() { mustSelectWALName(false, walTypeRocksmq, walEnable{true, true, true, true}) }) + assert.Panics(t, func() { mustSelectWALName(false, walTypeNatsmq, walEnable{true, true, true, true}) }) + assert.Equal(t, mustSelectWALName(false, walTypePulsar, walEnable{true, true, true, true}), walTypePulsar) + assert.Equal(t, mustSelectWALName(false, walTypeKafka, walEnable{true, true, true, true}), walTypeKafka) +} diff --git a/pkg/streaming/util/types/pchannel_info.go b/pkg/streaming/util/types/pchannel_info.go index 952a69581cb8..66656450c6f9 100644 --- a/pkg/streaming/util/types/pchannel_info.go +++ b/pkg/streaming/util/types/pchannel_info.go @@ -1,5 +1,9 @@ package types +const ( + InitialTerm int64 = -1 +) + // PChannelInfo is the struct for pchannel info. type PChannelInfo struct { Name string // name of pchannel. 
diff --git a/pkg/streaming/walimpls/impls/rmq/builder.go b/pkg/streaming/walimpls/impls/rmq/builder.go index 2fd66a04ced3..33d1bbe2245c 100644 --- a/pkg/streaming/walimpls/impls/rmq/builder.go +++ b/pkg/streaming/walimpls/impls/rmq/builder.go @@ -9,7 +9,7 @@ import ( ) const ( - walName = "rmq" + walName = "rocksmq" ) func init() { From 89e22f3012e4714c70f5251a3300784f57acf225 Mon Sep 17 00:00:00 2001 From: chyezh Date: Thu, 4 Jul 2024 16:00:41 +0800 Subject: [PATCH 2/3] fix: scanner refine and bug fix Signed-off-by: chyezh --- .../server/wal/adaptor/scanner_adaptor.go | 20 ++++++++++++++----- .../server/wal/utility/reorder_buffer.go | 14 +++++++++++-- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index c15996527b7a..90a718a48532 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -1,6 +1,9 @@ package adaptor import ( + "github.com/milvus-io/milvus/pkg/log" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/pkg/streaming/util/message" @@ -18,6 +21,7 @@ func newScannerAdaptor( cleanup func(), ) wal.Scanner { s := &scannerAdaptorImpl{ + logger: log.With(zap.String("name", name), zap.String("channel", l.Channel().Name)), innerWAL: l, readOption: readOption, sendingCh: make(chan message.ImmutableMessage, 1), @@ -33,6 +37,7 @@ func newScannerAdaptor( // scannerAdaptorImpl is a wrapper of ScannerImpls to extend it into a Scanner interface. 
type scannerAdaptorImpl struct { *helper.ScannerHelper + logger *log.MLogger innerWAL walimpls.WALImpls readOption wal.ReadOption sendingCh chan message.ImmutableMessage @@ -96,8 +101,9 @@ func (s *scannerAdaptorImpl) getEventCh(scanner walimpls.ScannerImpls) (<-chan m // we always need to recv message from upstream to avoid starve. return scanner.Chan(), nil } - // TODO: configurable pending count. - if s.pendingQueue.Len()+s.reorderBuffer.Len() > 1024 { + // TODO: configurable pending buffer count. + // If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading. + if s.pendingQueue.Len() > 16 { return nil, s.sendingCh } return scanner.Chan(), s.sendingCh @@ -107,8 +113,6 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { if msg.MessageType() == message.MessageTypeTimeTick { // If the time tick message incoming, // the reorder buffer can be consumed into a pending queue with latest timetick. - - // TODO: !!! should we drop the unexpected broken timetick rule message. s.pendingQueue.Add(s.reorderBuffer.PopUtilTimeTick(msg.TimeTick())) return } @@ -117,5 +121,11 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { return } // otherwise add message into reorder buffer directly. 
- s.reorderBuffer.Push(msg) + if err := s.reorderBuffer.Push(msg); err != nil { + s.logger.Warn("failed to push message into reorder buffer", + zap.Any("msgID", msg.MessageID()), + zap.Uint64("timetick", msg.TimeTick()), + zap.String("vchannel", msg.VChannel()), + zap.Error(err)) + } } diff --git a/internal/streamingnode/server/wal/utility/reorder_buffer.go b/internal/streamingnode/server/wal/utility/reorder_buffer.go index e45a335c9cd1..0862855840ac 100644 --- a/internal/streamingnode/server/wal/utility/reorder_buffer.go +++ b/internal/streamingnode/server/wal/utility/reorder_buffer.go @@ -1,13 +1,16 @@ package utility import ( + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/util/typeutil" ) // ReOrderByTimeTickBuffer is a buffer that stores messages and pops them in order of time tick. type ReOrderByTimeTickBuffer struct { - messageHeap typeutil.Heap[message.ImmutableMessage] + messageHeap typeutil.Heap[message.ImmutableMessage] + lastPopTimeTick uint64 } // NewReOrderBuffer creates a new ReOrderBuffer. @@ -18,8 +21,14 @@ func NewReOrderBuffer() *ReOrderByTimeTickBuffer { } // Push pushes a message into the buffer. -func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) { +func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) error { + // !!! Reject any message that breaks the timetick ordering rule. + // The check only takes effect after the first timetick has been popped (lastPopTimeTick starts at zero). + if msg.TimeTick() < r.lastPopTimeTick { + return errors.Errorf("message time tick is less than last pop time tick: %d", r.lastPopTimeTick) + } r.messageHeap.Push(msg) + return nil } // PopUtilTimeTick pops all messages whose time tick is less than or equal to the given time tick. 
@@ -29,6 +38,7 @@ func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.Imm for r.messageHeap.Len() > 0 && r.messageHeap.Peek().TimeTick() <= timetick { res = append(res, r.messageHeap.Pop()) } + r.lastPopTimeTick = timetick return res } From fcfdc6b61e8f47c50c9a1663a498c3e751aea0aa Mon Sep 17 00:00:00 2001 From: chyezh Date: Thu, 4 Jul 2024 17:21:05 +0800 Subject: [PATCH 3/3] fix: use types.Pchannel at remove operation Signed-off-by: chyezh --- .../server/wal/adaptor/scanner_adaptor_test.go | 2 ++ internal/streamingnode/server/walmanager/manager.go | 2 +- internal/streamingnode/server/walmanager/manager_impl.go | 8 ++++---- .../streamingnode/server/walmanager/manager_impl_test.go | 6 +++--- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go index 4b24fbbaf70c..319f8a2345d8 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go @@ -10,12 +10,14 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" "github.com/milvus-io/milvus/pkg/streaming/util/options" + "github.com/milvus-io/milvus/pkg/streaming/util/types" ) func TestScannerAdaptorReadError(t *testing.T) { err := errors.New("read error") l := mock_walimpls.NewMockWALImpls(t) l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err) + l.EXPECT().Channel().Return(types.PChannelInfo{}) s := newScannerAdaptor("scanner", l, wal.ReadOption{ DeliverPolicy: options.DeliverPolicyAll(), diff --git a/internal/streamingnode/server/walmanager/manager.go b/internal/streamingnode/server/walmanager/manager.go index c7a5d01b8081..33892fc3add4 100644 --- a/internal/streamingnode/server/walmanager/manager.go +++ b/internal/streamingnode/server/walmanager/manager.go @@ -22,7 +22,7 
@@ type Manager interface { GetAllAvailableChannels() ([]types.PChannelInfo, error) // Remove removes the wal instance for the channel. - Remove(ctx context.Context, channel string, term int64) error + Remove(ctx context.Context, channel types.PChannelInfo) error // Close these manager and release all managed WAL. Close() diff --git a/internal/streamingnode/server/walmanager/manager_impl.go b/internal/streamingnode/server/walmanager/manager_impl.go index 1ca77fa4a3ad..d6aee61f2086 100644 --- a/internal/streamingnode/server/walmanager/manager_impl.go +++ b/internal/streamingnode/server/walmanager/manager_impl.go @@ -62,7 +62,7 @@ func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err } // Remove removes the wal instance for the channel. -func (m *managerImpl) Remove(ctx context.Context, channel string, term int64) (err error) { +func (m *managerImpl) Remove(ctx context.Context, channel types.PChannelInfo) (err error) { // reject operation if manager is closing. if m.lifetime.Add(lifetime.IsWorking) != nil { return status.NewOnShutdownError("wal manager is closed") @@ -70,12 +70,12 @@ func (m *managerImpl) Remove(ctx context.Context, channel string, term int64) (e defer func() { m.lifetime.Done() if err != nil { - log.Warn("remove wal failed", zap.Error(err), zap.String("channel", channel), zap.Int64("term", term)) + log.Warn("remove wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) } - log.Info("remove wal success", zap.String("channel", channel), zap.Int64("term", term)) + log.Info("remove wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term)) }() - return m.getWALLifetime(channel).Remove(ctx, term) + return m.getWALLifetime(channel.Name).Remove(ctx, channel.Term) } // GetAvailableWAL returns a available wal instance for the channel. 
diff --git a/internal/streamingnode/server/walmanager/manager_impl_test.go b/internal/streamingnode/server/walmanager/manager_impl_test.go index 9885eac188fc..93c90c6b282b 100644 --- a/internal/streamingnode/server/walmanager/manager_impl_test.go +++ b/internal/streamingnode/server/walmanager/manager_impl_test.go @@ -42,7 +42,7 @@ func TestManager(t *testing.T) { assert.NoError(t, err) assert.Len(t, h, 0) - err = m.Remove(context.Background(), channelName, 1) + err = m.Remove(context.Background(), types.PChannelInfo{Name: channelName, Term: 1}) assert.NoError(t, err) l, err = m.GetAvailableWAL(channelName, 1) @@ -61,7 +61,7 @@ func TestManager(t *testing.T) { }) assert.NoError(t, err) - err = m.Remove(context.Background(), channelName, 1) + err = m.Remove(context.Background(), types.PChannelInfo{Name: channelName, Term: 1}) assertErrorTermExpired(t, err) l, err = m.GetAvailableWAL(channelName, 1) @@ -98,7 +98,7 @@ func TestManager(t *testing.T) { }) assertShutdownError(t, err) - err = m.Remove(context.Background(), channelName, 2) + err = m.Remove(context.Background(), types.PChannelInfo{Name: channelName, Term: 2}) assertShutdownError(t, err) l, err = m.GetAvailableWAL(channelName, 2)