Skip to content

Commit

Permalink
[CP] add _stream_rpc_max_wait_timeout to avoid tenant worker hung for…
Browse files Browse the repository at this point in the history
… waiting next request
  • Loading branch information
liucc1997 authored and ob-robot committed Nov 28, 2023
1 parent 647341e commit df33c1c
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 2 deletions.
11 changes: 11 additions & 0 deletions deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ int __attribute__((weak)) check_arb_white_list(int64_t cluster_id, bool& is_arb)
return ret;
}

int64_t __attribute__((weak)) get_stream_rpc_max_wait_timeout(int64_t tenant_id)
{
//do nothing
UNUSED(tenant_id);
return ObRpcProcessorBase::DEFAULT_WAIT_NEXT_PACKET_TIMEOUT;
}
void ObRpcProcessorBase::reuse()
{
rpc_pkt_ = NULL;
Expand Down Expand Up @@ -542,6 +548,11 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout)
rpc::ObRequest *req = NULL;
UNIS_VERSION_GUARD(unis_version_);

const int64_t stream_rpc_max_wait_timeout = get_stream_rpc_max_wait_timeout(tenant_id_);
if (0 == wait_timeout || wait_timeout > stream_rpc_max_wait_timeout) {
wait_timeout = stream_rpc_max_wait_timeout;
}

if (nullptr == sc_) {
sc_ = OB_NEWx(ObRpcStreamCond, (&lib::this_worker().get_sql_arena_allocator()), *sh_);
if (nullptr == sc_) {
Expand Down
2 changes: 1 addition & 1 deletion deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class ObRpcProcessorBase : public rpc::frame::ObReqProcessor
virtual int deserialize();
virtual int serialize();
virtual int response(const int retcode) { return part_response(retcode, true); }
virtual int flush(int64_t wait_timeout = DEFAULT_WAIT_NEXT_PACKET_TIMEOUT);
virtual int flush(int64_t wait_timeout = 0);

void set_preserve_recv_data() { preserve_recv_data_ = true; }
void set_result_compress_type(common::ObCompressorType t) { result_compress_type_ = t; }
Expand Down
2 changes: 1 addition & 1 deletion deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ using namespace oceanbase::obrpc;

ObRpcSessionHandler::ObRpcSessionHandler()
{
sessid_ = 0;
sessid_ = ObTimeUtility::current_time();
ObMemAttr attr(OB_SERVER_TENANT_ID, ObModIds::OB_HASH_NODE_NEXT_WAIT_MAP);
SET_USE_500(attr);
next_wait_map_.create(MAX_COND_COUNT, attr, attr);
Expand Down
10 changes: 10 additions & 0 deletions src/share/config/ob_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,16 @@ int64_t get_max_rpc_packet_size()
{
return GCONF._max_rpc_packet_size;
}

int64_t get_stream_rpc_max_wait_timeout(int64_t tenant_id)
{
int64_t stream_rpc_max_wait_timeout = ObRpcProcessorBase::DEFAULT_WAIT_NEXT_PACKET_TIMEOUT;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (OB_LIKELY(tenant_config.is_valid())) {
stream_rpc_max_wait_timeout = tenant_config->_stream_rpc_max_wait_timeout;
}
return stream_rpc_max_wait_timeout;
}
} // end of namespace obrpc
} // end of namespace oceanbase

Expand Down
3 changes: 3 additions & 0 deletions src/share/parameter/ob_parameter_seed.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,9 @@ DEF_TIME(_ob_get_gts_ahead_interval, OB_CLUSTER_PARAMETER, "0s", "[0s, 1s]",
DEF_TIME(rpc_timeout, OB_CLUSTER_PARAMETER, "2s",
"the time during which a RPC request is permitted to execute before it is terminated",
ObParameterAttr(Section::RPC, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(_stream_rpc_max_wait_timeout, OB_TENANT_PARAMETER, "30s", "[1s,)",
"the maximum timeout for a tenant worker thread to wait for the next request while processing streaming RPC",
ObParameterAttr(Section::RPC, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_enable_pkt_nio, OB_CLUSTER_PARAMETER, "True",
"enable pkt-nio, the new RPC framework"
"Value: True:turned on; False: turned off",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ _sqlexec_disable_hash_based_distagg_tiv
_sql_insert_multi_values_split_opt
_stall_threshold_for_dynamic_worker
_storage_meta_memory_limit_percentage
_stream_rpc_max_wait_timeout
_temporary_file_io_area_size
_temporary_file_meta_memory_limit_percentage
_trace_control_info
Expand Down

0 comments on commit df33c1c

Please sign in to comment.