Skip to content

Commit

Permalink
enhance: implement wal managerment
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <chyezh@outlook.com>
  • Loading branch information
chyezh committed Jun 27, 2024
1 parent 5932f22 commit a629dd4
Show file tree
Hide file tree
Showing 14 changed files with 1,249 additions and 2 deletions.
1 change: 0 additions & 1 deletion internal/proto/streamingpb/extends.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ package streamingpb

const (
ServiceMethodPrefix = "/milvus.proto.log"
InitialTerm = int64(-1)
)
29 changes: 29 additions & 0 deletions internal/streamingnode/server/walmanager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package walmanager

import (
"context"

"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

var _ Manager = (*managerImpl)(nil)

// Manager is the interface for managing the wal instances.
type Manager interface {
// Open opens a wal instance for the channel on this Manager.
Open(ctx context.Context, channel types.PChannelInfo) error

// GetAvailableWAL returns a available wal instance for the channel.
// Return nil if the wal instance is not found.
GetAvailableWAL(channelName string, term int64) (wal.WAL, error)

// GetAllAvailableWALInfo returns all available channel info.
GetAllAvailableChannels() ([]types.PChannelInfo, error)

// Remove removes the wal instance for the channel.
Remove(ctx context.Context, channel string, term int64) error

// Close these manager and release all managed WAL.
Close()
}
152 changes: 152 additions & 0 deletions internal/streamingnode/server/walmanager/manager_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package walmanager

import (
"context"

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// OpenManager create a wal manager.
func OpenManager() (Manager, error) {
walName := util.MustSelectWALName()
log.Info("open wal manager", zap.String("walName", walName))
opener, err := registry.MustGetBuilder(walName).Build()
if err != nil {
return nil, err
}
return newManager(opener), nil
}

// newManager create a wal manager.
func newManager(opener wal.Opener) Manager {
return &managerImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
wltMap: typeutil.NewConcurrentMap[string, *walLifetime](),
opener: opener,
}
}

// All management operation for a wal will be serialized with order of term.
type managerImpl struct {
lifetime lifetime.Lifetime[lifetime.State]

wltMap *typeutil.ConcurrentMap[string, *walLifetime]
opener wal.Opener // wal allocator
}

// Open opens a wal instance for the channel on this Manager.
func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return status.NewOnShutdownError("wal manager is closed")
}
defer func() {
m.lifetime.Done()
if err != nil {
log.Warn("open wal failed", zap.Error(err), zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
return
}
log.Info("open wal success", zap.String("channel", channel.Name), zap.Int64("term", channel.Term))
}()

return m.getWALLifetime(channel.Name).Open(ctx, channel)
}

// Remove removes the wal instance for the channel.
func (m *managerImpl) Remove(ctx context.Context, channel string, term int64) (err error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return status.NewOnShutdownError("wal manager is closed")
}
defer func() {
m.lifetime.Done()
if err != nil {
log.Warn("remove wal failed", zap.Error(err), zap.String("channel", channel), zap.Int64("term", term))
}
log.Info("remove wal success", zap.String("channel", channel), zap.Int64("term", term))
}()

return m.getWALLifetime(channel).Remove(ctx, term)
}

// GetAvailableWAL returns a available wal instance for the channel.
// Return nil if the wal instance is not found.
func (m *managerImpl) GetAvailableWAL(channelName string, term int64) (wal.WAL, error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return nil, status.NewOnShutdownError("wal manager is closed")
}
defer m.lifetime.Done()

l := m.getWALLifetime(channelName).GetWAL()
if l == nil {
return nil, status.NewChannelNotExist(channelName)
}

channelTerm := l.Channel().Term
if channelTerm != term {
return nil, status.NewUnmatchedChannelTerm(channelName, term, channelTerm)
}
return l, nil
}

// GetAllAvailableChannels returns all available channel info.
func (m *managerImpl) GetAllAvailableChannels() ([]types.PChannelInfo, error) {
// reject operation if manager is closing.
if m.lifetime.Add(lifetime.IsWorking) != nil {
return nil, status.NewOnShutdownError("wal manager is closed")
}
defer m.lifetime.Done()

// collect all available wal info.
infos := make([]types.PChannelInfo, 0)
m.wltMap.Range(func(channel string, lt *walLifetime) bool {
if l := lt.GetWAL(); l != nil {
info := l.Channel()
infos = append(infos, info)
}
return true
})
return infos, nil
}

// Close these manager and release all managed WAL.
func (m *managerImpl) Close() {
m.lifetime.SetState(lifetime.Stopped)
m.lifetime.Wait()
m.lifetime.Close()

// close all underlying walLifetime.
m.wltMap.Range(func(channel string, wlt *walLifetime) bool {
wlt.Close()
return true
})

// close all underlying wal instance by allocator if there's resource leak.
m.opener.Close()
}

// getWALLifetime returns the wal lifetime for the channel.
func (m *managerImpl) getWALLifetime(channel string) *walLifetime {
if wlt, loaded := m.wltMap.Get(channel); loaded {
return wlt
}

// Perform a cas here.
newWLT := newWALLifetime(m.opener, channel)
wlt, loaded := m.wltMap.GetOrInsert(channel, newWLT)
// if loaded, lifetime is exist, close the redundant lifetime.
if loaded {
newWLT.Close()
}
return wlt
}
113 changes: 113 additions & 0 deletions internal/streamingnode/server/walmanager/manager_impl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package walmanager

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func TestMain(m *testing.M) {
paramtable.Init()
m.Run()
}

func TestManager(t *testing.T) {
opener := mock_wal.NewMockOpener(t)
opener.EXPECT().Open(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, oo *wal.OpenOption) (wal.WAL, error) {
l := mock_wal.NewMockWAL(t)
l.EXPECT().Channel().Return(oo.Channel)
l.EXPECT().Close().Return()
return l, nil
})
opener.EXPECT().Close().Return()

m := newManager(opener)
channelName := "ch1"

l, err := m.GetAvailableWAL(channelName, 1)
assertErrorChannelNotExist(t, err)
assert.Nil(t, l)

h, err := m.GetAllAvailableChannels()
assert.NoError(t, err)
assert.Len(t, h, 0)

err = m.Remove(context.Background(), channelName, 1)
assert.NoError(t, err)

l, err = m.GetAvailableWAL(channelName, 1)
assertErrorChannelNotExist(t, err)
assert.Nil(t, l)

err = m.Open(context.Background(), types.PChannelInfo{
Name: channelName,
Term: 1,
})
assertErrorTermExpired(t, err)

err = m.Open(context.Background(), types.PChannelInfo{
Name: channelName,
Term: 2,
})
assert.NoError(t, err)

err = m.Remove(context.Background(), channelName, 1)
assertErrorTermExpired(t, err)

l, err = m.GetAvailableWAL(channelName, 1)
assertErrorTermExpired(t, err)
assert.Nil(t, l)

l, err = m.GetAvailableWAL(channelName, 2)
assert.NoError(t, err)
assert.NotNil(t, l)

h, err = m.GetAllAvailableChannels()
assert.NoError(t, err)
assert.Len(t, h, 1)

err = m.Open(context.Background(), types.PChannelInfo{
Name: "term2",
Term: 3,
})
assert.NoError(t, err)

h, err = m.GetAllAvailableChannels()
assert.NoError(t, err)
assert.Len(t, h, 2)

m.Close()

h, err = m.GetAllAvailableChannels()
assertShutdownError(t, err)
assert.Len(t, h, 0)

err = m.Open(context.Background(), types.PChannelInfo{
Name: "term2",
Term: 4,
})
assertShutdownError(t, err)

err = m.Remove(context.Background(), channelName, 2)
assertShutdownError(t, err)

l, err = m.GetAvailableWAL(channelName, 2)
assertShutdownError(t, err)
assert.Nil(t, l)
}

func assertShutdownError(t *testing.T, err error) {
assert.Error(t, err)
e := status.AsStreamingError(err)
assert.Equal(t, e.Code, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN)
}
Loading

0 comments on commit a629dd4

Please sign in to comment.