Skip to content

Commit

Permalink
enhance: wal interface definition
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <chyezh@outlook.com>
  • Loading branch information
chyezh committed Jun 11, 2024
1 parent 2b7ee19 commit b839530
Show file tree
Hide file tree
Showing 40 changed files with 2,928 additions and 37 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,9 @@ generate-mockery-log:

generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg generate-mockery-log

generate-mockery-lognode: getdeps
$(INSTALL_PATH)/mockery --config $(PWD)/internal/lognode/.mockery.yaml

generate-yaml: milvus-tools
@echo "Updating milvus config yaml"
@$(PWD)/bin/tools/config gen-yaml && mv milvus.yaml configs/milvus.yaml
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/pkg/errors v0.9.1
github.com/remeh/sizedwaitgroup v1.0.0
github.com/zeebo/xxh3 v1.0.2
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -237,7 +239,6 @@ require (
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,8 @@ github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/quasilyte/go-ruleguard/dsl v0.3.22 h1:wd8zkOhSNr+I+8Qeciml08ivDt1pSXe60+5DqOpCjPE=
github.com/quasilyte/go-ruleguard/dsl v0.3.22/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU=
github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E=
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ=
Expand Down
66 changes: 66 additions & 0 deletions internal/lognode/server/wal/RAEDME.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# WAL

`wal` package is the basic defination of wal interface of milvus lognode.

## Project arrangement

- `/`: only define exposed interfaces.
- `/walimpls/`: define the underlying message system interfaces need to be implemented.
- `/registry/`: A static lifetime registry to regsiter new implementation for inverting dependency.
- `/adaptor/`: adaptors to implement `wal` interface from `walimpls` interface
- `/helper/`: A utility used to help developer to implement `walimpls` conveniently.
- `/utility/`: A utility code for common logic or data structure.

## Lifetime Of Interfaces

- `OpenerBuilder` has a static lifetime in a programs:
- `Opener` keep same lifetime with underlying resources (such as mq client).
- `WAL` keep same lifetime with underlying writer of wal, and it's lifetime is always included in related `Opener`.
- `Scanner` keep same lifetime with underlying reader of wal, and it's lifetime is always included in related `WAL`.

## Add New Implemetation Of WAL

developper who want to add a new implementation of `wal` should implements the `walimpls` package interfaces. following interfaces is required:

- `walimpls.OpenerBuilderImpls`
- `walimpls.OpenerImpls`
- `walimpls.ScannerImpls`
- `walimpls.WALImpls`

`OpenerBuilderImpls` create `OpenerImpls`; `OpenerImpls` creates `WALImpls`; `WALImpls` create `ScannerImpls`.
Then register the implmentation of `walimpls.OpenerBuilderImpls` into `registry` package.

```
var _ OpenerBuilderImpls = b{};
registry.RegisterBuilder(b{})
```

All things have been done.

## Use WAL

```
name := "your builder name"
var yourCh *logpb.PChannelInfo
opener, err := registry.MustGetBuilder(name).Build()
if err != nil {
panic(err)
}
ctx := context.Background()
logger, err := opener.Open(ctx, wal.OpenOption{
Channel: yourCh
})
if err != nil {
panic(err)
}
```
## Adaptor

package `adaptor` is used to adapt `walimpls` and `wal` together.
common wal function should be implement by it. Such as:

- lifetime management
- interceptor implementation
- scanner wrapped up
- write ahead cache implementation
32 changes: 32 additions & 0 deletions internal/lognode/server/wal/adaptor/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package adaptor

import (
"github.com/milvus-io/milvus/internal/lognode/server/wal"
"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
)

var _ wal.OpenerBuilder = (*builderAdaptorImpl)(nil)

func AdaptImplsToBuilder(builder walimpls.OpenerBuilderImpls) wal.OpenerBuilder {
return builderAdaptorImpl{
builder: builder,
}
}

type builderAdaptorImpl struct {
builder walimpls.OpenerBuilderImpls
}

func (b builderAdaptorImpl) Name() string {
return b.builder.Name()
}

func (b builderAdaptorImpl) Build() (wal.Opener, error) {
_, err := b.builder.Build()
if err != nil {
return nil, err
}
return nil, nil
// TODO: wait for implementation.
// return adaptImplsToOpener(o), nil
}
31 changes: 31 additions & 0 deletions internal/lognode/server/wal/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package wal

import (
"context"

"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
"github.com/milvus-io/milvus/internal/proto/logpb"
)

// OpenerBuilder is the interface for build wal opener.
type OpenerBuilder interface {
// Name of the wal builder, should be a lowercase string.
Name() string

Build() (Opener, error)
}

// OpenOption is the option for allocating wal instance.
type OpenOption struct {
Channel *logpb.PChannelInfo
InterceptorBuilders []walimpls.InterceptorBuilder // Interceptor builders to build when open.
}

// Opener is the interface for build wal instance.
type Opener interface {
// Open open a wal instance.
Open(ctx context.Context, opt *OpenOption) (WAL, error)

// Close closes the opener resources.
Close()
}
58 changes: 58 additions & 0 deletions internal/lognode/server/wal/helper/scanner_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package helper

import "context"

// NewScannerHelper creates a new ScannerHelper.
func NewScannerHelper(scannerName string) *ScannerHelper {
ctx, cancel := context.WithCancel(context.Background())
return &ScannerHelper{
scannerName: scannerName,
ctx: ctx,
cancel: cancel,
finishCh: make(chan struct{}),
err: nil,
}
}

// ScannerHelper is a helper for scanner implementation.
type ScannerHelper struct {
scannerName string
ctx context.Context
cancel context.CancelFunc
finishCh chan struct{}
err error
}

// Context returns the context of the scanner, which will cancel when the scanner helper is closed.
func (s *ScannerHelper) Context() context.Context {
return s.ctx
}

// Name returns the name of the scanner.
func (s *ScannerHelper) Name() string {
return s.scannerName
}

// Error returns the error of the scanner.
func (s *ScannerHelper) Error() error {
<-s.finishCh
return s.err
}

// Done returns a channel that will be closed when the scanner is finished.
func (s *ScannerHelper) Done() <-chan struct{} {
return s.finishCh
}

// Close closes the scanner, block until the Finish is called.
func (s *ScannerHelper) Close() error {
s.cancel()
<-s.finishCh
return s.err
}

// Finish finishes the scanner with an error.
func (s *ScannerHelper) Finish(err error) {
s.err = err
close(s.finishCh)
}
58 changes: 58 additions & 0 deletions internal/lognode/server/wal/helper/scanner_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package helper

import (
"testing"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
)

func TestScanner(t *testing.T) {
h := NewScannerHelper("test")
assert.NotNil(t, h.Context())
assert.Equal(t, h.Name(), "test")
assert.NotNil(t, h.Context())

select {
case <-h.Done():
t.Errorf("should not done")
return
case <-h.Context().Done():
t.Error("should not cancel")
return
default:
}

finishErr := errors.New("test")

ch := make(chan struct{})
go func() {
defer close(ch)
done := false
cancel := false
cancelCh := h.Context().Done()
doneCh := h.Done()
for i := 0; ; i += 1 {
select {
case <-doneCh:
done = true
doneCh = nil
case <-cancelCh:
cancel = true
cancelCh = nil
h.Finish(finishErr)
}
if cancel && done {
return
}
if i == 0 {
assert.True(t, cancel && !done)
} else if i == 1 {
assert.True(t, cancel && done)
}
}
}()
h.Close()
assert.ErrorIs(t, h.Error(), finishErr)
<-ch
}
33 changes: 33 additions & 0 deletions internal/lognode/server/wal/helper/wal_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package helper

import (
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
"github.com/milvus-io/milvus/internal/proto/logpb"
"github.com/milvus-io/milvus/pkg/log"
)

// NewWALHelper creates a new WALHelper.
func NewWALHelper(opt *walimpls.OpenOption) *WALHelper {
return &WALHelper{
logger: log.With(zap.Any("channel", opt.Channel)),
channel: opt.Channel,
}
}

// WALHelper is a helper for WAL implementation.
type WALHelper struct {
logger *log.MLogger
channel *logpb.PChannelInfo
}

// Channel returns the channel of the WAL.
func (w *WALHelper) Channel() *logpb.PChannelInfo {
return w.channel
}

// Log returns the logger of the WAL.
func (w *WALHelper) Log() *log.MLogger {
return w.logger
}
24 changes: 24 additions & 0 deletions internal/lognode/server/wal/helper/wal_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package helper

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
"github.com/milvus-io/milvus/internal/proto/logpb"
)

func TestWALHelper(t *testing.T) {
h := NewWALHelper(&walimpls.OpenOption{
Channel: &logpb.PChannelInfo{
Name: "test",
Term: 1,
ServerID: 1,
VChannelInfos: []*logpb.VChannelInfo{},
},
})
assert.NotNil(t, h.Channel())
assert.Equal(t, h.Channel().Name, "test")
assert.NotNil(t, h.Log())
}
33 changes: 33 additions & 0 deletions internal/lognode/server/wal/registry/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package registry

import (
"github.com/milvus-io/milvus/internal/lognode/server/wal"
"github.com/milvus-io/milvus/internal/lognode/server/wal/adaptor"
"github.com/milvus-io/milvus/internal/lognode/server/wal/walimpls"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// builders is a map of registered wal builders.
var builders typeutil.ConcurrentMap[string, wal.OpenerBuilder]

// Register registers the wal builder.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), name of builder is lowercase. If multiple Builder are
// registered with the same name, panic will occur.
func RegisterBuilder(b walimpls.OpenerBuilderImpls) {
bb := adaptor.AdaptImplsToBuilder(b)
_, loaded := builders.GetOrInsert(bb.Name(), bb)
if loaded {
panic("wal builder already registered: " + b.Name())
}
}

// MustGetBuilder returns the wal builder by name.
func MustGetBuilder(name string) wal.OpenerBuilder {
b, ok := builders.Get(name)
if !ok {
panic("wal builder not found: " + name)
}
return b
}
Loading

0 comments on commit b839530

Please sign in to comment.