Skip to content

Commit

Permalink
Async machine: Prevent dangling message handlers (#3418)
Browse files Browse the repository at this point in the history
The asynchronous state machine accepts a `ctx` parameter that is used to
control both message retransmission and message handling routines. The
state machine's `Execute` method sets up the message handling process by
registering a buffered message handler in the broadcast channel and
triggering a message processing loop that takes messages from that
handler and passes them to specific states.

It may happen that the state machine completes before their `ctx` is
done. In that case, the message handler remains registered in the
channel until the `ctx` is canceled but, at the same time, the machine's
message processing loop is stopped and no longer drains the message
handler's buffer. Such a situation causes an overflow of the message
handler's buffer as incoming messages are still passed to the message
handler. Once the buffer is full, the broadcast channel's routine that
manages the handler's lifecycle and feeds it with incoming messages gets
blocked as it waits for a buffer slot. However, as mentioned above, the
machine's processing loop that normally drains the handler's buffer does
not work due to the machine exit so the buffer remains full forever as
well as the channel's managing routine. This way, the given handler is
never unregistered from the channel, even if their `ctx` is done. This
causes a resource leak that manifests with thousands of `message handler
is too slow; dropping message;` warnings.

Here we aim to fix that by introducing a separate child context
(`recvCtx`) that is used to control the message handler lifetime and is
automatically canceled upon `Execute` method exit. This way, the message
processing loop, and the message handler are torn down simultaneously.
  • Loading branch information
pdyraga committed Nov 25, 2022
2 parents 0b9317e + ab361a0 commit 3a446af
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion pkg/protocol/state/async_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@ func NewAsyncMachine(
// Execute state machine starting with initial state up to finalization. It
// requires the broadcast channel to be pre-initialized.
func (am *AsyncMachine) Execute() (AsyncState, error) {
recvCtx, cancelRecvCtx := context.WithCancel(am.ctx)
defer cancelRecvCtx()

recvChan := make(chan net.Message, asyncReceiveBuffer)
handler := func(msg net.Message) {
recvChan <- msg
}
am.channel.Recv(am.ctx, handler)
am.channel.Recv(recvCtx, handler)

currentState := am.initialState

Expand Down

0 comments on commit 3a446af

Please sign in to comment.