Skip to content

Commit

Permalink
[bug:#1019]: fix storage node goes dead when gc pause (#1026)
Browse files Browse the repository at this point in the history
* [opt]: upgrade golangci version

* [chore]: change log

* [bug:#1019]: fix storage node goes dead when gc pause
  • Loading branch information
stone1100 committed May 7, 2024
1 parent 2ea8de1 commit bad9228
Show file tree
Hide file tree
Showing 36 changed files with 403 additions and 480 deletions.
6 changes: 3 additions & 3 deletions app/broker/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ func (r *runtime) Run() error {
r.master = newMasterController(masterCfg)

// register broker node info
r.registry = newRegistry(r.repo, constants.LiveNodesPath, r.config.Coordinator.LeaseTTL.Duration())
err = r.registry.Register(r.node)
r.registry = newRegistry(r.repo, constants.GetLiveNodePath(r.node.Indicator()), r.node, r.config.Coordinator.LeaseTTL.Duration())
err = r.registry.Register()
if err != nil {
r.state = server.Failed
return fmt.Errorf("register broker node error:%s", err)
Expand Down Expand Up @@ -306,7 +306,7 @@ func (r *runtime) Stop() {
// close registry, deregister broker node from active list
if r.registry != nil {
r.logger.Info("closing discovery-registry...")
if err := r.registry.Deregister(r.node); err != nil {
if err := r.registry.Deregister(); err != nil {
r.logger.Error("unregister broker node error", logger.Error(err))
}
if err := r.registry.Close(); err != nil {
Expand Down
42 changes: 24 additions & 18 deletions app/broker/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
"time"

"github.com/gin-gonic/gin"
"github.com/lindb/common/pkg/logger"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"

"github.com/lindb/common/pkg/logger"

"github.com/lindb/lindb/config"
"github.com/lindb/lindb/coordinator"
brokerpkg "github.com/lindb/lindb/coordinator/broker"
Expand Down Expand Up @@ -58,7 +57,8 @@ var cfg = config.Broker{
GRPC: config.GRPC{
Port: 2881,
},
}}
},
}

func init() {
gin.SetMode(gin.ReleaseMode)
Expand Down Expand Up @@ -112,8 +112,8 @@ func TestBrokerRuntime_Run(t *testing.T) {
prepare: func() {
repoFct.EXPECT().CreateNormalRepo(gomock.Any()).Return(repo, nil)
registry := discovery.NewMockRegistry(ctrl)
registry.EXPECT().Register(gomock.Any()).Return(fmt.Errorf("err"))
newRegistry = func(repo state.Repository, prefixPath string, ttl time.Duration) discovery.Registry {
registry.EXPECT().Register().Return(fmt.Errorf("err"))
newRegistry = func(repo state.Repository, path string, node models.Node, ttl time.Duration) discovery.Registry {
return registry
}
},
Expand All @@ -124,8 +124,8 @@ func TestBrokerRuntime_Run(t *testing.T) {
prepare: func() {
repoFct.EXPECT().CreateNormalRepo(gomock.Any()).Return(repo, nil)
registry := discovery.NewMockRegistry(ctrl)
registry.EXPECT().Register(gomock.Any()).Return(nil)
newRegistry = func(repo state.Repository, prefixPath string, ttl time.Duration) discovery.Registry {
registry.EXPECT().Register().Return(nil)
newRegistry = func(repo state.Repository, path string, node models.Node, ttl time.Duration) discovery.Registry {
return registry
}
mc := coordinator.NewMockMasterController(ctrl)
Expand All @@ -142,8 +142,8 @@ func TestBrokerRuntime_Run(t *testing.T) {
prepare: func() {
repoFct.EXPECT().CreateNormalRepo(gomock.Any()).Return(repo, nil)
registry := discovery.NewMockRegistry(ctrl)
registry.EXPECT().Register(gomock.Any()).Return(nil)
newRegistry = func(repo state.Repository, prefixPath string, ttl time.Duration) discovery.Registry {
registry.EXPECT().Register().Return(nil)
newRegistry = func(repo state.Repository, path string, node models.Node, ttl time.Duration) discovery.Registry {
return registry
}
mc := coordinator.NewMockMasterController(ctrl)
Expand All @@ -157,7 +157,8 @@ func TestBrokerRuntime_Run(t *testing.T) {
smFct := discovery.NewMockStateMachineFactory(ctrl)
smFct.EXPECT().Start().Return(fmt.Errorf("err"))
newStateMachineFactory = func(ctx context.Context, discoveryFactory discovery.Factory,
stateMgr brokerpkg.StateManager) discovery.StateMachineFactory {
stateMgr brokerpkg.StateManager,
) discovery.StateMachineFactory {
return smFct
}
},
Expand All @@ -168,8 +169,8 @@ func TestBrokerRuntime_Run(t *testing.T) {
prepare: func() {
repoFct.EXPECT().CreateNormalRepo(gomock.Any()).Return(repo, nil)
registry := discovery.NewMockRegistry(ctrl)
registry.EXPECT().Register(gomock.Any()).Return(nil)
newRegistry = func(repo state.Repository, prefixPath string, ttl time.Duration) discovery.Registry {
registry.EXPECT().Register().Return(nil)
newRegistry = func(repo state.Repository, path string, node models.Node, ttl time.Duration) discovery.Registry {
return registry
}
mc := coordinator.NewMockMasterController(ctrl)
Expand All @@ -183,7 +184,8 @@ func TestBrokerRuntime_Run(t *testing.T) {
smFct := discovery.NewMockStateMachineFactory(ctrl)
smFct.EXPECT().Start().Return(nil)
newStateMachineFactory = func(ctx context.Context, discoveryFactory discovery.Factory,
stateMgr brokerpkg.StateManager) discovery.StateMachineFactory {
stateMgr brokerpkg.StateManager,
) discovery.StateMachineFactory {
return smFct
}
httpSrv := httppkg.NewMockServer(ctrl)
Expand Down Expand Up @@ -253,7 +255,7 @@ func TestBrokerRuntime_Stop(t *testing.T) {
connectionMgr := rpc.NewMockConnectionManager(ctrl)
channelMgr := replica.NewMockChannelManager(ctrl)
grpcServer := rpc.NewMockGRPCServer(ctrl)
registry.EXPECT().Deregister(gomock.Any()).Return(fmt.Errorf("err")).AnyTimes()
registry.EXPECT().Deregister().Return(fmt.Errorf("err")).AnyTimes()

cases := []struct {
name string
Expand Down Expand Up @@ -330,6 +332,7 @@ func TestBrokerRuntime_startGrpcServer(t *testing.T) {
serveGRPC(grpcServer)
})
}

func TestBrokerRuntime_RunHTTPServer(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand All @@ -354,23 +357,26 @@ func TestBrokerRuntime_RunHTTPServer(t *testing.T) {
func resetNewDepsMock() {
newStateManager = func(ctx context.Context, currentNode models.StatelessNode,
connectionManager rpc.ConnectionManager,
taskClientFactory rpc.TaskClientFactory) brokerpkg.StateManager {
taskClientFactory rpc.TaskClientFactory,
) brokerpkg.StateManager {
return nil
}
newChannelManager = func(ctx context.Context, fct rpc.ClientStreamFactory,
stateMgr brokerpkg.StateManager) replica.ChannelManager {
stateMgr brokerpkg.StateManager,
) replica.ChannelManager {
return nil
}
newMasterController = func(cfg *coordinator.MasterCfg) coordinator.MasterController {
return nil
}
newRegistry = func(repo state.Repository, prefixPath string, ttl time.Duration) discovery.Registry {
newRegistry = func(repo state.Repository, path string, node models.Node, ttl time.Duration) discovery.Registry {
return nil
}
serveGRPCFn = func(grpc rpc.GRPCServer) {
}
newStateMachineFactory = func(ctx context.Context, discoveryFactory discovery.Factory,
stateMgr brokerpkg.StateManager) discovery.StateMachineFactory {
stateMgr brokerpkg.StateManager,
) discovery.StateMachineFactory {
return nil
}
}
6 changes: 3 additions & 3 deletions app/root/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (r *runtime) Run() error {
return err
}
// register root node info
r.registry = newRegistry(r.repo, constants.LiveNodesPath, r.config.Coordinator.LeaseTTL.Duration())
r.registry = newRegistry(r.repo, constants.GetLiveNodePath(r.node.Indicator()), r.node, r.config.Coordinator.LeaseTTL.Duration())

if err = r.MustRegisterStatelessNode(); err != nil {
r.state = server.Failed
Expand Down Expand Up @@ -197,7 +197,7 @@ func (r *runtime) Run() error {

// MustRegisterStatelessNode makes sure the root node is registered to etcd.
func (r *runtime) MustRegisterStatelessNode() error {
if err := r.registry.Register(r.node); err != nil {
if err := r.registry.Register(); err != nil {
return fmt.Errorf("register root node error:%s", err)
}
// sometimes the lease hasn't expired yet when storage restarts, so retrying registration is necessary
Expand Down Expand Up @@ -235,7 +235,7 @@ func (r *runtime) Stop() {
// close registry, deregister root node from active list
if r.registry != nil {
r.logger.Info("closing discovery-registry...")
if err := r.registry.Deregister(r.node); err != nil {
if err := r.registry.Deregister(); err != nil {
r.logger.Error("unregister root node error", logger.Error(err))
}
if err := r.registry.Close(); err != nil {
Expand Down
28 changes: 14 additions & 14 deletions app/root/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ import (
"time"

"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"

"github.com/lindb/common/pkg/logger"
"github.com/lindb/common/pkg/ltoml"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"

"github.com/lindb/lindb/config"
"github.com/lindb/lindb/coordinator/discovery"
"github.com/lindb/lindb/coordinator/root"
"github.com/lindb/lindb/internal/linmetric"
"github.com/lindb/lindb/internal/server"
"github.com/lindb/lindb/models"
"github.com/lindb/lindb/pkg/hostutil"
httppkg "github.com/lindb/lindb/pkg/http"
"github.com/lindb/lindb/pkg/state"
Expand All @@ -57,12 +57,12 @@ func TestRootRun(t *testing.T) {
ctrl.Finish()
}()
registry := discovery.NewMockRegistry(ctrl)
newRegistry = func(_ state.Repository, _ string, _ time.Duration) discovery.Registry {
newRegistry = func(_ state.Repository, _ string, _ models.Node, _ time.Duration) discovery.Registry {
return registry
}
registry.EXPECT().Register(gomock.Any()).Return(nil)
registry.EXPECT().Register().Return(nil)
registry.EXPECT().IsSuccess().Return(true)
registry.EXPECT().Deregister(gomock.Any()).Return(fmt.Errorf("err"))
registry.EXPECT().Deregister().Return(fmt.Errorf("err"))
registry.EXPECT().Close().Return(fmt.Errorf("err"))
repoFct := state.NewMockRepositoryFactory(ctrl)
newRepositoryFactory = func(_ string) state.RepositoryFactory {
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestRootRun_Err(t *testing.T) {
}()
registry := discovery.NewMockRegistry(ctrl)
registry.EXPECT().IsSuccess().Return(true)
newRegistry = func(_ state.Repository, _ string, _ time.Duration) discovery.Registry {
newRegistry = func(_ state.Repository, _ string, _ models.Node, _ time.Duration) discovery.Registry {
return registry
}
cfg.HTTP.Port = 3991
Expand Down Expand Up @@ -141,19 +141,19 @@ func TestRootRun_Err(t *testing.T) {
})
t.Run("register node fail", func(t *testing.T) {
repoFct.EXPECT().CreateRootRepo(gomock.Any()).Return(nil, nil)
registry.EXPECT().Register(gomock.Any()).Return(fmt.Errorf("err"))
registry.EXPECT().Register().Return(fmt.Errorf("err"))
r := NewRootRuntime("test-version", &cfg)
err := r.Run()
assert.Error(t, err)
})
t.Run("start state machine fail", func(t *testing.T) {
repoFct.EXPECT().CreateRootRepo(gomock.Any()).Return(nil, nil)
registry.EXPECT().Register(gomock.Any()).Return(nil)
registry.EXPECT().Register().Return(nil)
stateMachineFct.EXPECT().Start().Return(fmt.Errorf("err"))
r := NewRootRuntime("test-version", &cfg)
err := r.Run()
assert.Error(t, err)
registry.EXPECT().Deregister(gomock.Any()).Return(nil)
registry.EXPECT().Deregister().Return(nil)
registry.EXPECT().Close().Return(nil)
r.Stop()
})
Expand Down Expand Up @@ -237,22 +237,22 @@ func TestRuntime_MustRegisterNode(t *testing.T) {
ctx: ctx,
registry: register,
}
register.EXPECT().Register(gomock.Any()).Return(fmt.Errorf("err"))
register.EXPECT().Register().Return(fmt.Errorf("err"))
err := r.MustRegisterStatelessNode()
assert.Error(t, err)

register.EXPECT().Register(gomock.Any()).Return(nil)
register.EXPECT().Register().Return(nil)
register.EXPECT().IsSuccess().Return(true)
err = r.MustRegisterStatelessNode()
assert.NoError(t, err)

register.EXPECT().Register(gomock.Any()).Return(nil)
register.EXPECT().Register().Return(nil)
register.EXPECT().IsSuccess().Return(false).MaxTimes(2)
err = r.MustRegisterStatelessNode()
assert.Error(t, err)

cancel()
register.EXPECT().Register(gomock.Any()).Return(nil)
register.EXPECT().Register().Return(nil)
err = r.MustRegisterStatelessNode()
assert.NoError(t, err)
}
7 changes: 3 additions & 4 deletions app/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ var (
// BaseRuntime represents the common logic of runtime.
type BaseRuntime struct {
ctx context.Context
monitor config.Monitor
registry *linmetric.Registry
pusher monitoring.NativePusher
logger logger.Logger
registry *linmetric.Registry
monitor config.Monitor
globalKeyValues tag.Tags

logger logger.Logger
}

// NewBaseRuntime creates a base runtime instance.
Expand Down
Loading

0 comments on commit bad9228

Please sign in to comment.