Skip to content

Commit

Permalink
fix keep remote lock
Browse files Browse the repository at this point in the history
  • Loading branch information
iamlinjunhong committed Apr 3, 2024
1 parent 1519f8e commit f8c9406
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
"the connection between CN and TN has been disconnected"},
ErrStreamClosed: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "stream closed"},
ErrNoAvailableBackend: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "no available backend"},
ErrBackendCannotConnect: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can not connect to remote backend"},
ErrBackendCannotConnect: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can not connect to remote backend, %v"},

// Group 6: txn
ErrTxnClosed: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "the transaction %s has been committed or aborted"},
Expand Down
7 changes: 5 additions & 2 deletions pkg/common/moerr/error_no_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,11 @@ func NewNoAvailableBackendNoCtx() *Error {
return newError(Context(), ErrNoAvailableBackend)
}

func NewBackendCannotConnectNoCtx() *Error {
return newError(Context(), ErrBackendCannotConnect)
func NewBackendCannotConnectNoCtx(args ...any) *Error {
if len(args) == 0 {
return newError(Context(), ErrBackendCannotConnect, "none")
}
return newError(Context(), ErrBackendCannotConnect, args...)
}

func NewTxnClosedNoCtx(txnID []byte) *Error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/morpc/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func (rb *remoteBackend) resetConn() error {
zap.Error(err))

if !canRetry {
return err
return moerr.NewBackendCannotConnectNoCtx(err)
}
duration := time.Duration(0)
for {
Expand Down
36 changes: 32 additions & 4 deletions pkg/lockservice/lock_table_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type lockTableAllocator struct {
keepBindTimeout time.Duration
address string
server Server
client Client

mu struct {
sync.RWMutex
Expand All @@ -50,6 +51,11 @@ func NewLockTableAllocator(
panic("invalid lock table bind timeout")
}

rpcClient, err := NewClient(cfg)
if err != nil {
panic(err)
}

logger := runtime.ProcessLevelRuntime().Logger()
tag := "lockservice.allocator"
la := &lockTableAllocator{
Expand All @@ -58,6 +64,7 @@ func NewLockTableAllocator(
stopper: stopper.NewStopper(tag,
stopper.WithLogger(logger.RawLogger().Named(tag))),
keepBindTimeout: keepBindTimeout,
client: rpcClient,
}
la.mu.lockTables = make(map[uint64]pb.LockTable, 10240)
la.mu.services = make(map[string]*serviceBinds)
Expand Down Expand Up @@ -124,9 +131,19 @@ func (l *lockTableAllocator) Valid(binds []pb.LockTable) []uint64 {

func (l *lockTableAllocator) Close() error {
l.stopper.Stop()
err := l.server.Close()
l.logger.Debug("lock service allocator closed",
var err error
err1 := l.server.Close()
l.logger.Debug("lock service allocator server closed",
zap.Error(err))
if err1 != nil {
err = err1
}
err2 := l.client.Close()
l.logger.Debug("lock service allocator client closed",
zap.Error(err))
if err2 != nil {
err = err2
}
return err
}

Expand Down Expand Up @@ -270,8 +287,19 @@ func (l *lockTableAllocator) checkInvalidBinds(ctx context.Context) {
zap.Int("count", len(timeoutBinds)))
}
for _, b := range timeoutBinds {
b.disable()
l.disableTableBinds(b)
succ := false
for i := 0; i < 3; i++ {
err := l.client.Ping(ctx, b.serviceID)
if err == nil {
succ = true
break
}
logPingFailed(b.serviceID, err)
}
if !succ {
b.disable()
l.disableTableBinds(b)
}
}
timer.Reset(l.keepBindTimeout)
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/lockservice/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,17 @@ func logKeepRemoteLocksFailed(
}
}

func logPingFailed(
serviceID string,
err error) {
logger := getWithSkipLogger()
if logger.Enabled(zap.ErrorLevel) {
logger.Error("failed to ping lock service",
zap.String("serviceID", serviceID),
zap.Error(err))
}
}

func logLocalBindsInvalid() {
logger := getWithSkipLogger()
logger.Error("all local lock table invalid")
Expand Down
25 changes: 25 additions & 0 deletions pkg/lockservice/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,31 @@ func (c *client) Close() error {
return c.client.Close()
}

func (c *client) Ping(ctx context.Context, serviceID string) error {
var address string
for i := 0; i < 2; i++ {
sid := getUUIDFromServiceIdentifier(serviceID)
c.cluster.GetCNServiceWithoutWorkingState(
clusterservice.NewServiceIDSelector(sid),
func(s metadata.CNService) bool {
address = s.LockServiceAddress
return false
})
if address != "" {
break
}
if i == 0 {
c.cluster.ForceRefresh(true)
}
}
if address == "" {
getLogger().Error("cannot ping lockservice address",
zap.String("serviceID", serviceID))

}
return c.client.Ping(ctx, address)
}

// WithServerMessageFilter set filter func. Requests can be modified or filtered out by the filter
// before they are processed by the handler.
func WithServerMessageFilter(filter func(*pb.Request) bool) ServerOption {
Expand Down
72 changes: 72 additions & 0 deletions pkg/lockservice/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,34 @@ func TestRPCSend(t *testing.T) {
)
}

func TestRPCSendErrBackendCannotConnect(t *testing.T) {
runRPCServerNoCloseTests(
t,
func(c Client, s Server) {
s.RegisterMethodHandler(
lock.Method_Lock,
func(
ctx context.Context,
cancel context.CancelFunc,
req *lock.Request,
resp *lock.Response,
cs morpc.ClientSession) {
writeResponse(ctx, cancel, resp, nil, cs)
})

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
err := s.Close()
require.NoError(t, err)
_, err = c.Send(ctx,
&lock.Request{
LockTable: lock.LockTable{ServiceID: "s1"},
Method: lock.Method_Lock})
require.True(t, moerr.IsMoErrCode(err, moerr.ErrBackendCannotConnect))
},
)
}

func TestRPCSendWithNotSupport(t *testing.T) {
runRPCTests(
t,
Expand Down Expand Up @@ -204,3 +232,47 @@ func runRPCTests(

fn(c, s)
}

func runRPCServerNoCloseTests(
t *testing.T,
fn func(Client, Server),
opts ...ServerOption) {
defer leaktest.AfterTest(t)()
testSockets := fmt.Sprintf("unix:///tmp/%d.sock", time.Now().Nanosecond())
assert.NoError(t, os.RemoveAll(testSockets[7:]))

runtime.SetupProcessLevelRuntime(runtime.DefaultRuntime())
cluster := clusterservice.NewMOCluster(
nil,
0,
clusterservice.WithDisableRefresh(),
clusterservice.WithServices(
[]metadata.CNService{
{
ServiceID: "s1",
LockServiceAddress: testSockets,
},
{
ServiceID: "s2",
LockServiceAddress: testSockets,
},
},
[]metadata.TNService{
{
LockServiceAddress: testSockets,
},
}))
runtime.ProcessLevelRuntime().SetGlobalVariables(runtime.ClusterService, cluster)

s, err := NewServer(testSockets, morpc.Config{}, opts...)
require.NoError(t, err)
require.NoError(t, s.Start())

c, err := NewClient(morpc.Config{})
require.NoError(t, err)
defer func() {
assert.NoError(t, c.Close())
}()

fn(c, s)
}
40 changes: 40 additions & 0 deletions pkg/lockservice/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,46 @@ func TestDeadLockWith2Txn(t *testing.T) {
}
}

func TestLockSuccWithKeepBindTimeout(t *testing.T) {
runLockServiceTestsWithLevel(
t,
zapcore.DebugLevel,
[]string{"s1"},
time.Second*1,
func(alloc *lockTableAllocator, s []*service) {
l := s[0]

ctx, cancel := context.WithTimeout(
context.Background(),
time.Second*10)
defer cancel()
option := pb.LockOptions{
Granularity: pb.Granularity_Row,
Mode: pb.LockMode_Exclusive,
Policy: pb.WaitPolicy_Wait,
}

_, err := l.Lock(
ctx,
0,
[][]byte{{1}},
[]byte("txn1"),
option)
require.NoError(t, err)

err = l.remote.keeper.Close()
require.NoError(t, err)

time.Sleep(time.Second * 3)

p := alloc.GetLatest(0)
require.True(t, p.Valid)
},
nil,
)

}

func TestLockResultWithNoConflict(t *testing.T) {
runLockServiceTests(
t,
Expand Down
2 changes: 1 addition & 1 deletion pkg/lockservice/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func RunLockServicesForTest(
defaultLazyCheckDuration.Store(time.Millisecond * 50)
testSockets := fmt.Sprintf("unix:///tmp/%d.sock", time.Now().Nanosecond())
runtime.SetupProcessLevelRuntime(runtime.DefaultRuntimeWithLevel(level))
allocator := NewLockTableAllocator(testSockets, lockTableBindTimeout, morpc.Config{})
services := make([]LockService, 0, len(serviceIDs))

cns := make([]metadata.CNService, 0, len(serviceIDs))
Expand Down Expand Up @@ -74,6 +73,7 @@ func RunLockServicesForTest(
services = append(services,
NewLockService(cfg).(*service))
}
allocator := NewLockTableAllocator(testSockets, lockTableBindTimeout, morpc.Config{})
fn(allocator.(*lockTableAllocator), services)

for _, s := range services {
Expand Down
2 changes: 2 additions & 0 deletions pkg/lockservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ type Client interface {
Send(context.Context, *pb.Request) (*pb.Response, error)
// AsyncSend async send request to other lock service.
AsyncSend(context.Context, *pb.Request) (*morpc.Future, error)
// Ping ping lock service from lock allocator.
Ping(ctx context.Context, backend string) error
// Close close the client
Close() error
}
Expand Down

0 comments on commit f8c9406

Please sign in to comment.