Skip to content

Commit

Permalink
p2p(ticdc): remove message peer when capture is removed (#8923)
Browse files Browse the repository at this point in the history
close #8922
  • Loading branch information
sdojjy committed May 10, 2023
1 parent 5bc19cd commit 603cecc
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
9 changes: 9 additions & 0 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,15 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
})
globalState.SetOnCaptureRemoved(func(captureID model.CaptureID) {
c.MessageRouter.RemovePeer(captureID)
// If an owner is killed by "kill -19", other CDC nodes will remove that capture,
// but the peer in the message server will not be removed, so the message server still sends
// ack message to that peer, until the write buffer is full. So we need to deregister the peer
// when the capture is removed.
if err := c.MessageServer.ScheduleDeregisterPeerTask(ctx, captureID); err != nil {
log.Warn("deregister peer failed",
zap.String("captureID", captureID),
zap.Error(err))
}
})

err = c.runEtcdWorker(ownerCtx, owner,
Expand Down
22 changes: 22 additions & 0 deletions pkg/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ type taskOnRegisterPeer struct {
clientAddr string // for logging
}

// taskOnDeregisterPeer is the task submitted through scheduleTask to request
// that a peer be deregistered. The run loop handles it by calling
// deregisterPeerByID with the carried peerID.
type taskOnDeregisterPeer struct {
// peerID is the key used to look the peer up in MessageServer.peers.
peerID string
}

type taskOnRegisterHandler struct {
topic string
handler workerpool.EventHandle
Expand Down Expand Up @@ -296,6 +300,9 @@ func (m *MessageServer) run(ctx context.Context) error {
}
return errors.Trace(err)
}
case taskOnDeregisterPeer:
log.Info("taskOnDeregisterPeer", zap.String("peerID", task.peerID))
m.deregisterPeerByID(ctx, task.peerID)
case taskDebugDelay:
log.Info("taskDebugDelay started")
select {
Expand Down Expand Up @@ -365,6 +372,21 @@ func (m *MessageServer) deregisterPeer(ctx context.Context, peer *cdcPeer, err e
}
}

// deregisterPeerByID looks up the peer registered under peerID and, if one
// exists, deregisters it by calling deregisterPeer with a nil error.
//
// If no peer is registered under peerID, a warning is logged and the function
// returns without calling deregisterPeer, so a nil peer is never passed on.
func (m *MessageServer) deregisterPeerByID(ctx context.Context, peerID string) {
	// Only the map lookup needs the lock; release it before deregistering.
	m.peerLock.Lock()
	peer, ok := m.peers[peerID]
	m.peerLock.Unlock()
	if !ok {
		log.Warn("peer not found", zap.String("peerID", peerID))
		// Nothing to deregister: the peer may never have connected to this
		// server, or it has already been removed.
		return
	}
	m.deregisterPeer(ctx, peer, nil)
}

// ScheduleDeregisterPeerTask enqueues a taskOnDeregisterPeer for the given
// peerID via scheduleTask and returns the error reported by scheduleTask.
func (m *MessageServer) ScheduleDeregisterPeerTask(ctx context.Context, peerID string) error {
	task := taskOnDeregisterPeer{peerID: peerID}
	return m.scheduleTask(ctx, task)
}

// We use an empty interface to hold the information on the type of the object
// that we want to deserialize a message to.
// We pass an object of the desired type, and use `reflect.TypeOf` to extract the type,
Expand Down
53 changes: 53 additions & 0 deletions pkg/p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,3 +912,56 @@ func TestServerDataLossAfterUnregisterHandle(t *testing.T) {
cancel()
wg.Wait()
}

// TestServerDeregisterPeer checks that a peer which has opened a stream to the
// server appears in server.peers, and that ScheduleDeregisterPeerTask removes
// it again.
func TestServerDeregisterPeer(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.TODO(), defaultTimeout)
	defer cancel()

	server, newClient, closer := newServerForTesting(t, "test-server-2")
	defer closer()

	// Avoids server returning error due to congested topic.
	server.config.MaxPendingMessageCountPerTopic = math.MaxInt64

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := server.Run(ctx)
		require.Regexp(t, ".*context canceled.*", err)
	}()

	// Open the stream and send the first packet on the test goroutine itself:
	// require.* calls FailNow, which must run on the goroutine executing the
	// test, and the original helper goroutine was waited on immediately anyway.
	client, closeClient := newClient()
	stream, err := client.SendMessage(ctx)
	require.NoError(t, err)

	err = stream.Send(&p2p.MessagePacket{
		Meta: &p2p.StreamMeta{
			SenderId:   "test-client-1",
			ReceiverId: "test-server-2",
			Epoch:      0,
		},
	})
	require.NoError(t, err)

	// peerCount reads the number of registered peers and releases the lock
	// before returning, so a failing assertion never exits with peerLock held.
	peerCount := func() int {
		server.peerLock.Lock()
		defer server.peerLock.Unlock()
		return len(server.peers)
	}

	// Poll rather than sleep for a fixed time: registration and deregistration
	// are handled asynchronously by the server's run loop.
	const tick = 10 * time.Millisecond
	require.Eventually(t, func() bool {
		return peerCount() == 1
	}, defaultTimeout, tick, "peer was not registered")

	require.Nil(t, server.ScheduleDeregisterPeerTask(ctx, "test-client-1"))

	require.Eventually(t, func() bool {
		return peerCount() == 0
	}, defaultTimeout, tick, "peer was not deregistered")

	closeClient()
	closer()
	cancel()
	wg.Wait()
}

0 comments on commit 603cecc

Please sign in to comment.