Skip to content

Commit

Permalink
[BACKPORT 2.14] Track and automatically recover from stuck outbound c…
Browse files Browse the repository at this point in the history
…alls

Summary:
Track all outbound calls submitted to a Reactor in a multi-index container in that reactor. Remove them from the container as soon as we learn the callback has been called, using a multiple-producer single-consumer (MPSC) queue where the consumer is the reactor thread. Report stuck calls with detailed debug information. A call is considered stuck if it is a certain time past its expiration (specified by reactor_based_outbound_call_expiration_delay_ms -- set it to 0 to disable this feature). If the call does not have a timeout, the default value stuck_outbound_call_default_timeout_sec is used for reporting only, but not for timeout enforcement.

OutboundCall now keeps track of its expiration deadline, and an additional field active_call_state_, which indicates whether the call was added to the connection's active_calls_ in the corresponding connection, or removed from it, and the removal reason.

If in the OutboundCall destructor the state of the call is not final, or the callback has not been called, we will log that even in release build.

In OutboundCall::SetState, if the current state is already finished, treat this the same as any other invalid state transition. Logically, there should never be a race between e.g. processing a call response and a call timeout, because both events happen on the reactor thread.

Fixing error handling in DoQueueOutboundData. If there is an error sending the call, we destroy the connection.

Introduce a typedef CallHandle and a special value kUnknownCallHandle instead of just using the max value of size_t in case the call handle is unknown or not being used. Change the error handling logic in Connection::DoQueueOutboundData to avoid returning kUnknownCallHandle in case of errors, and make sure the callback is called on the call in case of those errors. The connection was already being shut down.

Update the YB_STRUCT_TO_STRING macro implementation to always add parentheses around an explicitly specified field value.

We introduce multiple ways to simulate stuck outbound calls for testing.

- TEST_simulated_sent_stuck_call_probability specifies the probability of pretending a call is sent to the remote server in -Connection::QueueOutboundCall, but instead just transitioning it to a SENT state. This is similar to the situation that we have observed.
- TEST_simulated_failure_to_send_call_probability specifies the probability of a network error in stream_->Send() called from Connection::DoQueueOutboundData. This will cause the connection to be closed. Prior to this revision, this kind of an error would cause Connection::QueueOutboundCall to attempt to schedule a timer in a connection that has already been shut down.
- TEST_outbound_call_skip_callback_probability specifies the probability of skipping calling the callback on a particular remote RPC call. We don't do this for local calls.

Also replacing a DFATAL with an WARNING log in InboundCall::QueueResponse to avoid crashing tests during shutdown. Failures to queue a response on the server side should just result in timeouts on the client side in the worst case.

We are combining multiple original revisions from the master branch to avoid introducing and then fixing a memory leak bug, and to keep the resulting patch cleaner.

This revision was backported from the 2.18 backport D29215 ( d028fe2 ).

Original revisions in master:

- D27735 ( 1fbff49 ) [yugabyte#18685] Track stuck OutboundCalls and enforce timeouts at Reactor level
- D27919 ( ddb817b ) [yugabyte#18808] A write-once lock-free weak pointer class: it is frequently necessary to have a weak pointer equivalent that can only be written to once, but can be read without locking for performance.
- D28085 ( a5ba08f ) [yugabyte#18855] Add functions for formatting a CoarseTimePoint relative to the current time. Useful for logging deadlines.
- D28138 ( d0a44db ) [yugabyte#18877] Log invalid OutboundCall state transitions and other OutboundCall improvements. OutboundCall will track invalid state transitions and report them in the DebugString and in the destructor, in addition to reporting them as they happen. Do not allow OutboundCall callback to be invoked more than once and log occurrences of that. Use a compare-and-swap loop in RpcCall::Transferred and log invalid transfer_state transitions. invoke_callback_time_ and sent_time_ in OutboundCall were being accessed without synchronization, so making them atomic. OutboundCall::DebugString should not queue a task in the reactor to print the connection details in relation to the call anymore. That is now done by a separate method, QueueDumpConnectionState, called explicitly.
- D28531 ( 0648341 ) [yugabyte#19090] Fix a memory leak in tracked outbound calls in Reactor. When an OutboundCall callback is invoked, notify the reactor through a multi-producer single-consumer queue that is accessed from the OutboundCall via a weak pointer. Also switching OutboundCall allocation away from using make_shared to prevent long-lived weak OutboundCall pointers from consuming memory.

Also, parts of D23468 ( 4879cfb ) "[yugabyte#16380] Use thread safety analysis annotations to enforce the reactor thread requirement" are included. That commit is present in 2.18 but not in 2.16.

Test Plan:
Jenkins

---

Manual testing details below.

First test mode
===============

```
bin/yb-ctl wipe_restart  --tserver_flags="TEST_simulated_sent_stuck_call_probability=0.0001,reactor_based_outbound_call_expiration_delay_ms=1000" --rf=3

java -jar yb-sample-apps.jar  --workload CassandraBatchKeyValue --nodes 127.0.0.1:9042,127.0.0.2:9042,127.0.0.3:9042 --num_threads_read 16 --num_threads_write 16 --num_reads 1000000000000 --num_writes 1000000000000
```

Look for output like this in the logs:

```
W1024 21:29:58.941679 1731793 connection.cc:307] Connection (0x0000564dbd812298) client 127.0.0.1:39689 => 127.0.0.2:9100: Simulating a call stuck in SENT state: RPC call 0x0000564dbbf3e580: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 6 protocol: 0x00007fd157f9cd70 -> tcpc }, id: 85493, state: SENT, transfer_state: PENDING, start_time: 455775.550s (0.001s ago), sent_time: -inf, trigger_callback_time: -inf, invoke_callback_time: -inf, expiration_time: +inf, now: 455775.551s, connection: Connection (0x0000564dbd812298) client 127.0.0.1:39689 => 127.0.0.2:9100, active_call_state: kNotAdded
...
W1024 21:30:03.017571 1731793 reactor.cc:1142] TabletServer_R002: Stuck OutboundCall: RPC call 0x0000564dbbf3e580: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 6 protocol: 0x00007fd157f9cd70 -> tcpc }, id: 85493, state: SENT, transfer_state: PENDING, start_time: 455775.550s (4.077s ago), sent_time: -inf, trigger_callback_time: -inf, invoke_callback_time: -inf, expiration_time: 455778.551s (1.076s ago), now: 455779.628s, connection: Connection (0x0000564dbd812298) client 127.0.0.1:39689 => 127.0.0.2:9100, active_call_state: kNotAdded (forcing a timeout)
W1024 21:30:03.017947 1731995 consensus_peers.cc:611] T 9735d87712974ea9b848cc606db8d664 P 9dcaaf7955ea4ca78667b1fa92c89192 -> Peer b6dbb1d832e14b0080d2143ac43fbaae ([host: "127.0.0.2" port: 9100], []): Couldn't send request.  Status: Timed out (yb/rpc/outbound_call.cc:612): UpdateConsensus RPC (request call id 85493) to 127.0.0.2:9100 timed out after 3.000s. Retrying in the next heartbeat period. Already tried 1 times. State: 2
```

Ensure that all stuck outbound calls are eventually detected. After stopping the workload, the output of the command below should gradually decrease to zero after a couple of minutes:

```
log_path="$HOME/yugabyte-data/node-1/disk-1/yb-data/tserver/logs/yb-tserver.INFO"; grep "Simulating a call stuck in SENT state" "$log_path" | egrep -Eo 'id: [0-9]+' | awk '{print $NF}' | sort -n | while read c; do if ! grep "id: $c," "$log_path" | grep -q "Stuck OutboundCall"; then echo "Undetected stuck OutboundCall: id=$c"; fi; done | wc -l
```

Also, these stuck calls should stop appearing in the log because they would be forcibly timed out. If they were not, they would be logged every minute.

Second test mode
================

```
bin/yb-ctl wipe_restart  --tserver_flags="TEST_simulated_failure_to_send_call_probability=0.00001,reactor_based_outbound_call_expiration_delay_ms=1000" --rf=3
```

Same workload as above.

Look for "Simulated failure to send outbound data" in the tablet server log:

```
W1024 22:33:22.772235 1739663 connection.cc:401] Simulated network failure: Network error (yb/rpc/connection.cc:400): Simulated failure to send outbound data for 0x000055999706f040 -> CQL Call from 127.0.0.1:50690, stream id: 9792
...
W1024 22:33:22.772346 1739663 inbound_call.cc:104] 0x000055999706f040 -> CQL Call from 127.0.0.1:50690, stream id: 9792: Connection torn down before CQL Call from 127.0.0.1:50690, stream id: 9792 could send its response: Network error (yb/rpc/connection.cc:400): Simulated failure to send outbound data for 0x000055999706f040 -> CQL Call from 127.0.0.1:50690, stream id: 9792
```

There should be no stuck outbound calls in the log, because these simulated failures will just result in the connection being closed. Depending on the value TEST_simulated_failure_to_send_call_probability, the workload may succeed or fail. With the value 0.00001, it manages to make progress in my experience.

Third test mode
===============

```
bin/yb-ctl wipe_restart  --tserver_flags="TEST_outbound_call_skip_callback_probability=0.0001,reactor_based_outbound_call_expiration_delay_ms=1000" --rf=3
```

Same workload as above.

Look for "Skipping OutboundCall callback as a test" and "Stuck OutboundCall" in the log.

Output:
```
W1024 22:45:47.389113 1743081 outbound_call.cc:419] OutboundCall@0x0000562012975600: Skipping OutboundCall callback as a test: RPC call 0x0000562012975600: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 1 protocol: 0x00007fbff47fed70 -> tcpc }, id: 3815, state: FINISHED_SUCCESS, transfer_state: FINISHED, start_time: 460323.999s (0.001s ago), sent_time: 460323.999s (0.001s ago), trigger_callback_time: 460323.999s (0.001s ago), invoke_callback_time: -inf, expiration_time: 460326.999s (2.998s from now), now: 460324.000s, connection: Connection (0x00005620102b0798) client 127.0.0.1:55971 => 127.0.0.2:9100, active_call_state: kErasedOnResponse
. . .
W1024 22:45:51.463742 1742860 reactor.cc:1142] TabletServer_R001: Stuck OutboundCall: RPC call 0x0000562012975600: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 1 protocol: 0x00007fbff47fed70 -> tcpc }, id: 3815, state: FINISHED_SUCCESS, transfer_state: FINISHED, start_time: 460323.999s (4.075s ago), sent_time: 460323.999s (4.075s ago), trigger_callback_time: 460323.999s (4.075s ago), invoke_callback_time: -inf, expiration_time: 460326.999s (1.075s ago), now: 460328.074s, connection: Connection (0x00005620102b0798) client 127.0.0.1:55971 => 127.0.0.2:9100, active_call_state: kErasedOnResponse
. . .
W1024 22:46:21.564029 1742860 reactor.cc:1142] TabletServer_R001: Stuck OutboundCall: RPC call 0x0000562012975600: yb.consensus.ConsensusService.UpdateConsensus -> { remote: 127.0.0.2:9100 idx: 1 protocol: 0x00007fbff47fed70 -> tcpc }, id: 3815, state: FINISHED_SUCCESS, transfer_state: FINISHED, start_time: 460323.999s (34.175s ago), sent_time: 460323.999s (34.175s ago), trigger_callback_time: 460323.999s (34.175s ago), invoke_callback_time: -inf, expiration_time: 460326.999s (31.175s ago), now: 460358.174s, connection: Connection (0x00005620102b0798) client 127.0.0.1:55971 => 127.0.0.2:9100, active_call_state: kErasedOnResponse
```

Similarly to the first test, wait for all stuck outbound calls to get reported:
```
log_path="$HOME/yugabyte-data/node-1/disk-1/yb-data/tserver/logs/yb-tserver.INFO"; grep "Skipping OutboundCall callback as a test" "$log_path" | egrep -Eo 'id: [0-9]+' | awk '{print $NF}' | sort -n | while read c; do if ! grep "id: $c," "$log_path" | grep -q "Stuck OutboundCall"; then echo "Undetected stuck OutboundCall: id=$c"; fi; done | wc -l
```

In this mode, because we are skipping callbacks but allowing state transitions to the finished state, forced expiration at reactor level does not work.

Reviewers: sergei, bogdan

Subscribers: bogdan

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D29679
  • Loading branch information
mbautin committed Oct 25, 2023
1 parent 531310d commit aa262ff
Show file tree
Hide file tree
Showing 52 changed files with 1,938 additions and 814 deletions.
77 changes: 37 additions & 40 deletions requirements_frozen.txt
@@ -1,47 +1,44 @@
attrs==21.4.0
Deprecated==1.2.14
PyGithub==2.1.1
PyJWT==2.8.0
PyNaCl==1.5.0
autorepr==0.3.0
bashlex==0.16
bashlex==0.18
boto==2.49.0
botocore==1.4.59
certifi==2021.5.30
cffi==1.14.6
charset-normalizer==2.0.6
click==8.0.1
codecheck==1.1.3
botocore==1.31.70
certifi==2023.7.22
cffi==1.16.0
charset-normalizer==3.3.1
click==8.1.7
codecheck==1.3.1
compiledb==0.10.1
compiler-identification==1.0.3
Deprecated==1.2.13
distro==1.6.0
downloadutil==1.0.2
idna==3.2
importlib-metadata==4.8.3
iniconfig==1.1.1
mypy==0.910
mypy-extensions==0.4.3
overrides==6.1.0
packaging==21.0
pluggy==1.0.0
psutil==5.8.0
py==1.11.0
pycodestyle==2.7.0
pycparser==2.20
PyGithub==1.55
PyJWT==2.1.0
PyNaCl==1.4.0
pyparsing==2.4.7
pytest==6.2.5
requests==2.26.0
ruamel.yaml==0.17.16
ruamel.yaml.clib==0.2.6
semantic-version==2.8.5
cryptography==41.0.5
distro==1.8.0
downloadutil==1.0.4
exceptiongroup==1.1.3
idna==3.4
iniconfig==2.0.0
jmespath==1.0.1
mypy-extensions==1.0.0
mypy==1.6.1
overrides==7.4.0
packaging==23.2
pluggy==1.3.0
psutil==5.9.6
pycodestyle==2.11.1
pycparser==2.21
pytest==7.4.3
python-dateutil==2.8.2
requests==2.31.0
ruamel.yaml.clib==0.2.8
ruamel.yaml==0.18.2
semantic-version==2.10.0
shutilwhich==1.1.0
six==1.16.0
sys-detection==1.2.0
toml==0.10.2
typed-ast==1.5.1
typing-extensions==3.10.0.2
typing-utils==0.1.0
urllib3==1.26.7
wrapt==1.12.1
sys-detection==1.3.0
tomli==2.0.1
typing_extensions==4.8.0
urllib3==1.26.18
wrapt==1.15.0
yugabyte-pycommon==1.9.15
zipp==3.6.0
3 changes: 2 additions & 1 deletion src/yb/docdb/deadline_info.cc
Expand Up @@ -45,8 +45,9 @@ bool DeadlineInfo::CheckAndSetDeadlinePassed() {
}

std::string DeadlineInfo::ToString() const {
auto now = CoarseMonoClock::now();
return Format("{ now: $0 deadline: $1 counter: $2 }",
CoarseMonoClock::now(), deadline_, counter_);
now, ToStringRelativeToNow(deadline_, now), counter_);
}

void SimulateTimeoutIfTesting(CoarseTimePoint* deadline) {
Expand Down
15 changes: 8 additions & 7 deletions src/yb/master/async_rpc_tasks.cc
Expand Up @@ -388,17 +388,18 @@ bool RetryingTSRpcTask::RescheduleWithBackoffDelay() {
LOG_WITH_PREFIX(WARNING) << "Unable to mark this task as MonitoredTaskState::kScheduling";
return false;
}
auto task_id = master_->messenger()->ScheduleOnReactor(
auto task_id_result = master_->messenger()->ScheduleOnReactor(
std::bind(&RetryingTSRpcTask::RunDelayedTask, shared_from(this), _1),
MonoDelta::FromMilliseconds(delay_millis), SOURCE_LOCATION(), master_->messenger());
VLOG_WITH_PREFIX_AND_FUNC(4) << "Task id: " << task_id;
reactor_task_id_.store(task_id, std::memory_order_release);

if (task_id == rpc::kInvalidTaskId) {
AbortTask(STATUS(Aborted, "Messenger closing"));
MonoDelta::FromMilliseconds(delay_millis), SOURCE_LOCATION());
if (!task_id_result.ok()) {
AbortTask(task_id_result.status());
UnregisterAsyncTask();
return false;
}
auto task_id = *task_id_result;

VLOG_WITH_PREFIX_AND_FUNC(4) << "Task id: " << task_id;
reactor_task_id_.store(task_id, std::memory_order_release);

return TransitionToWaitingState(MonitoredTaskState::kScheduling);
}
Expand Down
3 changes: 2 additions & 1 deletion src/yb/rpc/binary_call_parser.h
Expand Up @@ -20,6 +20,7 @@

#include "yb/rpc/rpc_fwd.h"
#include "yb/rpc/call_data.h"
#include "yb/rpc/reactor_thread_role.h"

namespace yb {
namespace rpc {
Expand Down Expand Up @@ -47,7 +48,7 @@ class BinaryCallParser {
// (or any of its ancestors) exceeds soft memory limit.
Result<ProcessCallsResult> Parse(const rpc::ConnectionPtr& connection, const IoVecs& data,
ReadBufferFull read_buffer_full,
const MemTrackerPtr* tracker_for_throttle);
const MemTrackerPtr* tracker_for_throttle) ON_REACTOR_THREAD;

private:
MemTrackerPtr buffer_tracker_;
Expand Down
19 changes: 12 additions & 7 deletions src/yb/rpc/compressed_stream.cc
Expand Up @@ -26,6 +26,7 @@
#include "yb/rpc/circular_read_buffer.h"
#include "yb/rpc/outbound_data.h"
#include "yb/rpc/refined_stream.h"
#include "yb/rpc/reactor_thread_role.h"

#include "yb/util/logging.h"
#include "yb/util/result.h"
Expand Down Expand Up @@ -53,7 +54,8 @@ class Compressor {

// Compress specified vector of input buffers into single output buffer.
virtual Status Compress(
const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) = 0;
const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data)
ON_REACTOR_THREAD = 0;

// Decompress specified input slice to specified output buffer.
virtual Result<ReadBufferFull> Decompress(StreamReadBuffer* inp, StreamReadBuffer* out) = 0;
Expand Down Expand Up @@ -177,7 +179,8 @@ class ZlibCompressor : public Compressor {
}

Status Compress(
const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) override {
const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data)
ON_REACTOR_THREAD override {
RefCntBuffer output(deflateBound(&deflate_stream_, TotalLen(input)));
deflate_stream_.avail_out = static_cast<unsigned int>(output.size());
deflate_stream_.next_out = output.udata();
Expand Down Expand Up @@ -398,7 +401,8 @@ class SnappyCompressor : public Compressor {
}

Status Compress(
const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) override {
const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data)
ON_REACTOR_THREAD override {
RangeSource<SmallRefCntBuffers::const_iterator> source(input.begin(), input.end());
auto input_size = source.Available();
bool stop = false;
Expand Down Expand Up @@ -685,7 +689,8 @@ class LZ4Compressor : public Compressor {
}

Status Compress(
const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data) override {
const SmallRefCntBuffers& input, RefinedStream* stream, OutboundDataPtr data)
ON_REACTOR_THREAD override {
// Increment iterator in loop body to be able to check whether it is last iteration or not.
VLOG_WITH_FUNC(4) << "input: " << CollectionToString(input, [](const auto& buf) {
return buf.size();
Expand Down Expand Up @@ -772,7 +777,7 @@ class CompressedRefiner : public StreamRefiner {
stream_ = stream;
}

Status ProcessHeader() override {
Status ProcessHeader() ON_REACTOR_THREAD override {
constexpr int kHeaderLen = 3;

auto data = stream_->ReadBuffer().AppendedVecs();
Expand All @@ -797,13 +802,13 @@ class CompressedRefiner : public StreamRefiner {
return stream_->Established(RefinedStreamState::kDisabled);
}

Status Send(OutboundDataPtr data) override {
Status Send(OutboundDataPtr data) ON_REACTOR_THREAD override {
boost::container::small_vector<RefCntBuffer, 10> input;
data->Serialize(&input);
return compressor_->Compress(input, stream_, std::move(data));
}

Status Handshake() override {
Status Handshake() ON_REACTOR_THREAD override {
if (stream_->local_side() == LocalSide::kClient) {
compressor_ = CreateOutboundCompressor(stream_->buffer_tracker());
if (!compressor_) {
Expand Down

0 comments on commit aa262ff

Please sign in to comment.