From 402bfae9becd34c05cd3bf3e07e60206e523ea78 Mon Sep 17 00:00:00 2001 From: chhy2009 Date: Tue, 9 Sep 2025 17:05:45 +0800 Subject: [PATCH] Bugfix: fixed the following bugs - fix the issue that program will cause a core dump at Promise::SetValue rarely when make rpc call from handle or io work thread - fix MQThreadPool memory leak when queue is full - fix the issue where the timeout parameter of HTTP asynchronous streaming ReadFullResponse interface did not take effect - fix the issue that stream length is 0 in HttpServiceProxy streaming request - correct type of timeout from int to uint32_t in http stream to avoid overflow during conversion - make tvar sampler remain valid before and after SamplerCollectorStop/Start --- trpc/future/future.h | 102 ++++++++---------- trpc/future/future_test.cc | 4 +- trpc/future/future_utility_test.cc | 20 ++++ trpc/stream/http/async/stream.cc | 10 +- trpc/stream/http/async/stream.h | 14 +-- .../stream/http/async/stream_reader_writer.cc | 83 +++++++------- trpc/stream/http/async/stream_reader_writer.h | 28 ++--- .../stream/http/http_client_stream_handler.cc | 1 + .../http/http_client_stream_handler_test.cc | 9 +- trpc/tvar/common/sampler.cc | 8 +- trpc/util/thread/mq_thread_pool.cc | 5 +- trpc/util/thread/mq_thread_pool_test.cc | 35 +++++- 12 files changed, 184 insertions(+), 135 deletions(-) diff --git a/trpc/future/future.h b/trpc/future/future.h index 718b0097..ba3da9c6 100644 --- a/trpc/future/future.h +++ b/trpc/future/future.h @@ -531,12 +531,12 @@ class FutureImpl : public FutureImplBase { state_.callback = new ContinuationWithValue(std::forward(func), std::forward(promise)); if (executor) state_.callback->SetExecutor(executor); - state_.has_callback = true; - std::lock_guard lock(state_.mtx); + std::unique_lock lock(state_.mtx); + state_.has_callback = true; // Got immediately executed. if (HasResult()) { - TrySchedule(); + TrySchedule(lock); } } @@ -549,12 +549,12 @@ class FutureImpl : public FutureImplBase { void SetTerminalCallback(Func&& func, Executor* executor) { state_.callback = new TerminalWithValue(std::forward(func)); if (executor) state_.callback->SetExecutor(executor); - state_.has_callback = true; - std::lock_guard lock(state_.mtx); + std::unique_lock lock(state_.mtx); + state_.has_callback = true; // Got immediately executed. if (HasResult()) { - TrySchedule(); + TrySchedule(lock); } } @@ -571,12 +571,12 @@ class FutureImpl : public FutureImplBase { state_.callback = new ContinuationWithFuture(std::forward(func), std::forward(promise)); if (executor) state_.callback->SetExecutor(executor); - state_.has_callback = true; - std::lock_guard lock(state_.mtx); + std::unique_lock lock(state_.mtx); + state_.has_callback = true; // Got immediately executed. if (HasResult()) { - TrySchedule(); + TrySchedule(lock); } } @@ -589,12 +589,12 @@ class FutureImpl : public FutureImplBase { void SetTerminalCallbackWrapped(Func&& func, Executor* executor) { state_.callback = new TerminalWithFuture(std::forward(func)); if (executor) state_.callback->SetExecutor(executor); - state_.has_callback = true; - std::lock_guard lock(state_.mtx); + std::unique_lock lock(state_.mtx); + state_.has_callback = true; // Got immediately executed. if (HasResult()) { - TrySchedule(); + TrySchedule(lock); } } @@ -603,11 +603,9 @@ class FutureImpl : public FutureImplBase { state_.value = std::move(value); state_.ready = true; // Result state may be looped by another thread. - { - std::lock_guard lock(state_.mtx); - state_.has_result = true; - } - TrySchedule(); + std::unique_lock lock(state_.mtx); + state_.has_result = true; + TrySchedule(lock); } /// @brief Exceptional value set through promise. @@ -615,29 +613,26 @@ class FutureImpl : public FutureImplBase { state_.exception = e; state_.failed = true; // Result state may be looped by another thread. - { - std::lock_guard lock(state_.mtx); - state_.has_result = true; - } - TrySchedule(); + std::unique_lock lock(state_.mtx); + state_.has_result = true; + TrySchedule(lock); } /// @brief Support non const parameter. void SetException(Exception&& e) { state_.exception = std::move(e); state_.failed = true; - { - std::lock_guard lock(state_.mtx); - state_.has_result = true; - } - TrySchedule(); + std::unique_lock lock(state_.mtx); + state_.has_result = true; + TrySchedule(lock); } private: /// @brief Future may or may not registered callback yet, check to inspire callback. /// @note Callback can only inspired once. - void TrySchedule() { + void TrySchedule(std::unique_lock& lock) { if (state_.has_callback) { + lock.unlock(); if (IsReady()) { if (state_.schedule_flag.test_and_set() == false) { state_.callback->SetValue(GetValue()); @@ -704,11 +699,11 @@ class FutureImpl : public FutureImplBase { state_.callback = new ContinuationWithValue(std::forward(func), std::forward(promise)); if (executor) state_.callback->SetExecutor(executor); - state_.has_callback = true; - std::lock_guard lock(state_.mtx); + std::unique_lock lock(state_.mtx); + state_.has_callback = true; if (HasResult()) { - TrySchedule(); + TrySchedule(lock); } } @@ -717,11 +712,11 @@ class FutureImpl : public FutureImplBase { void SetTerminalCallback(Func&& func, Executor* executor) { state_.callback = new TerminalWithValue(std::forward(func)); if (executor) state_.callback->SetExecutor(executor); - state_.has_callback = true; - std::lock_guard lock(state_.mtx); + std::unique_lock lock(state_.mtx); + state_.has_callback = true; if (HasResult()) { - TrySchedule(); + TrySchedule(lock); } } @@ -732,11 +727,11 @@ class FutureImpl : public FutureImplBase { state_.callback = new ContinuationWithFuture(std::forward(func), std::forward(promise)); if (executor) state_.callback->SetExecutor(executor); - state_.has_callback = true; - std::lock_guard lock(state_.mtx); + std::unique_lock lock(state_.mtx); + state_.has_callback = true; if (HasResult()) { - TrySchedule(); + TrySchedule(lock); } } @@ -745,11 +740,11 @@ class FutureImpl : public FutureImplBase { void SetTerminalCallbackWrapped(Func&& func, Executor* executor) { state_.callback = new TerminalWithFuture(std::forward(func)); if (executor) state_.callback->SetExecutor(executor); - state_.has_callback = true; - std::lock_guard lock(state_.mtx); + std::unique_lock lock(state_.mtx); + state_.has_callback = true; if (HasResult()) { - TrySchedule(); + TrySchedule(lock); } } @@ -757,39 +752,34 @@ class FutureImpl : public FutureImplBase { void SetValue(T&& value) { state_.value = std::move(value); state_.ready = true; - { - std::lock_guard lock(state_.mtx); - state_.has_result = true; - } - TrySchedule(); + std::unique_lock lock(state_.mtx); + state_.has_result = true; + TrySchedule(lock); } /// @brief Same as multiple version. void SetException(const Exception& e) { state_.exception = e; state_.failed = true; - { - std::lock_guard lock(state_.mtx); - state_.has_result = true; - } - TrySchedule(); + std::unique_lock lock(state_.mtx); + state_.has_result = true; + TrySchedule(lock); } /// @brief Same as multiple version. void SetException(Exception&& e) { state_.exception = std::move(e); state_.failed = true; - { - std::lock_guard lock(state_.mtx); - state_.has_result = true; - } - TrySchedule(); + std::unique_lock lock(state_.mtx); + state_.has_result = true; + TrySchedule(lock); } private: /// @brief Same as multiple version. - void TrySchedule() { + void TrySchedule(std::unique_lock& lock) { if (state_.has_callback) { + lock.unlock(); if (IsReady()) { if (state_.schedule_flag.test_and_set() == false) { state_.callback->SetValue(GetValue()); diff --git a/trpc/future/future_test.cc b/trpc/future/future_test.cc index 79543273..a5f75b38 100644 --- a/trpc/future/future_test.cc +++ b/trpc/future/future_test.cc @@ -641,15 +641,13 @@ TEST(Future, ThenCopyableExecutor) { // Test mltiple threads executor. TEST(Future, TestExecuteOkInDifferentThread) { static int exec_count = 0; - int loop_times = 50000; + int loop_times = 500; for (int i = 0; i < loop_times; i++) { Promise pr; auto fut = pr.GetFuture(); std::thread t([pr = std::move(pr)]() mutable { - std::this_thread::sleep_for(std::chrono::nanoseconds(1)); pr.SetValue(1); }); - std::this_thread::sleep_for(std::chrono::nanoseconds(10000)); fut.Then([](int&& val) { exec_count++; return MakeReadyFuture<>(); diff --git a/trpc/future/future_utility_test.cc b/trpc/future/future_utility_test.cc index c374157f..0a676ae1 100644 --- a/trpc/future/future_utility_test.cc +++ b/trpc/future/future_utility_test.cc @@ -629,4 +629,24 @@ TEST(BlockingTryGet, test) { ASSERT_TRUE(TestBlockingTryGet(100, 1000)); } +// Test in the capture promise by reference situation, the promise object is not destroyed during SetValue +TEST(BlockingGet, capture_promise_by_ref) { + constexpr int kMaxLoopTimes = 10000; + std::atomic execute_times = 0; + for (int i = 0; i < kMaxLoopTimes; ++i) { + std::unique_ptr t; + { + trpc::Promise<> pr; + auto fut = pr.GetFuture(); + t = std::make_unique([&]() { + pr.SetValue(); + execute_times++; + }); + future::BlockingGet(std::move(fut)); + } + t->join(); + } + EXPECT_EQ(execute_times.load(), kMaxLoopTimes); +} + } // namespace trpc diff --git a/trpc/stream/http/async/stream.cc b/trpc/stream/http/async/stream.cc index 99fdb6a0..ea3a12f0 100644 --- a/trpc/stream/http/async/stream.cc +++ b/trpc/stream/http/async/stream.cc @@ -56,7 +56,7 @@ Future<> HttpAsyncStream::PushSendMessage(HttpStreamFramePtr&& msg) { return AsyncWrite(std::move(out)); } -Future HttpAsyncStream::AsyncReadHeader(int timeout) { +Future HttpAsyncStream::AsyncReadHeader(uint32_t timeout) { pending_header_.val = Promise(); auto ft = pending_header_.val.value().GetFuture(); @@ -80,7 +80,7 @@ Future HttpAsyncStream::AsyncReadHeader(int timeout) { return ft; } -Future HttpAsyncStream::AsyncReadChunk(int timeout) { +Future HttpAsyncStream::AsyncReadChunk(uint32_t timeout) { // it can read only in chunked mode if (read_mode_ != DataMode::kChunked) { Status status{TRPC_STREAM_UNKNOWN_ERR, 0, "Can't read no chunk data"}; @@ -90,15 +90,15 @@ Future HttpAsyncStream::AsyncReadChunk(int timeout) { return AsyncReadInner(ReadOperation::kReadChunk, 0, timeout); } -Future HttpAsyncStream::AsyncReadAtMost(uint64_t len, int timeout) { +Future HttpAsyncStream::AsyncReadAtMost(uint64_t len, uint32_t timeout) { return AsyncReadInner(ReadOperation::kReadAtMost, len, timeout); } -Future HttpAsyncStream::AsyncReadExactly(uint64_t len, int timeout) { +Future HttpAsyncStream::AsyncReadExactly(uint64_t len, uint32_t timeout) { return AsyncReadInner(ReadOperation::kReadExactly, len, timeout); } -Future HttpAsyncStream::AsyncReadInner(ReadOperation op, uint64_t len, int timeout) { +Future HttpAsyncStream::AsyncReadInner(ReadOperation op, uint64_t len, uint32_t timeout) { if (read_mode_ == DataMode::kNoData) { // can not read data when content-length equal to 0 return MakeReadyFuture(NoncontiguousBuffer{}); diff --git a/trpc/stream/http/async/stream.h b/trpc/stream/http/async/stream.h index 79338dda..e2cd7909 100644 --- a/trpc/stream/http/async/stream.h +++ b/trpc/stream/http/async/stream.h @@ -36,11 +36,11 @@ class HttpAsyncStream : public HttpCommonStream { /// @brief Reads the header asynchronously. /// @param timeout time to wait for the header to be ready - Future AsyncReadHeader(int timeout = std::numeric_limits::max()); + Future AsyncReadHeader(uint32_t timeout = std::numeric_limits::max()); /// @brief Reads a chunk in chunked mode asynchronously, note that reading in non-chunked mode will fail /// @param timeout time to wait for the header to be ready - Future AsyncReadChunk(int timeout = std::numeric_limits::max()); + Future AsyncReadChunk(uint32_t timeout = std::numeric_limits::max()); /// @brief Reads at most len data asynchronously. /// @param len max size to read @@ -50,7 +50,7 @@ class HttpAsyncStream : public HttpCommonStream { /// An empty buffer means that the end has been read /// Usage scenario 1: Limits the maximum length of each read When the memory is limited. /// Usage scenario 2: Gets part of data in time and send it downstream on route server. - Future AsyncReadAtMost(uint64_t len, int timeout = std::numeric_limits::max()); + Future AsyncReadAtMost(uint64_t len, uint32_t timeout = std::numeric_limits::max()); /// @brief Reads data with a fixed length. If eof is read, it will return as much data as there is in the network /// @param len size to read @@ -58,7 +58,7 @@ class HttpAsyncStream : public HttpCommonStream { /// @note If the read buffer size is less than the required length, it means that eof has been read. /// Usage scenario 1: The requested data is compressed by a fixed size, and needs to be read and decompressed by /// a fixed size. - Future AsyncReadExactly(uint64_t len, int timeout = std::numeric_limits::max()); + Future AsyncReadExactly(uint64_t len, uint32_t timeout = std::numeric_limits::max()); protected: template @@ -98,7 +98,7 @@ class HttpAsyncStream : public HttpCommonStream { /// @brief Creates a scheduled waiting task template - void CreatePendingTimer(PendingVal* pending, int timeout); + void CreatePendingTimer(PendingVal* pending, uint32_t timeout); /// @brief Checks the pending state template @@ -110,7 +110,7 @@ class HttpAsyncStream : public HttpCommonStream { void NotifyPendingDataQueue(); - Future AsyncReadInner(ReadOperation op, uint64_t len, int timeout); + Future AsyncReadInner(ReadOperation op, uint64_t len, uint32_t timeout); protected: /// @brief Used to store asynchronous data request @@ -147,7 +147,7 @@ void HttpAsyncStream::PendingDone(PendingVal* pending) { } template -void HttpAsyncStream::CreatePendingTimer(PendingVal* pending, int timeout) { +void HttpAsyncStream::CreatePendingTimer(PendingVal* pending, uint32_t timeout) { TRPC_CHECK_EQ(pending->timer_id, iotimer::InvalidID); pending->timer_id = iotimer::Create(timeout, 0, [this, pending]() { if (!pending->val) { diff --git a/trpc/stream/http/async/stream_reader_writer.cc b/trpc/stream/http/async/stream_reader_writer.cc index c6099de1..addffee0 100644 --- a/trpc/stream/http/async/stream_reader_writer.cc +++ b/trpc/stream/http/async/stream_reader_writer.cc @@ -22,15 +22,15 @@ HttpAsyncStreamReader::HttpAsyncStreamReader(HttpAsyncStreamPtr stream) : stream TRPC_ASSERT(stream_ && "HttpAsyncStreamPtr can't be nullptr"); } -Future HttpAsyncStreamReader::ReadHeader(int timeout) { return stream_->AsyncReadHeader(timeout); } +Future HttpAsyncStreamReader::ReadHeader(uint32_t timeout) { return stream_->AsyncReadHeader(timeout); } -Future HttpAsyncStreamReader::ReadChunk(int timeout) { return stream_->AsyncReadChunk(timeout); } +Future HttpAsyncStreamReader::ReadChunk(uint32_t timeout) { return stream_->AsyncReadChunk(timeout); } -Future HttpAsyncStreamReader::ReadAtMost(uint64_t len, int timeout) { +Future HttpAsyncStreamReader::ReadAtMost(uint64_t len, uint32_t timeout) { return stream_->AsyncReadAtMost(len, timeout); } -Future HttpAsyncStreamReader::ReadExactly(uint64_t len, int timeout) { +Future HttpAsyncStreamReader::ReadExactly(uint64_t len, uint32_t timeout) { return stream_->AsyncReadExactly(len, timeout); } @@ -87,15 +87,15 @@ HttpAsyncStreamReaderWriter::HttpAsyncStreamReaderWriter(HttpAsyncStreamPtr stre TRPC_ASSERT(stream_ && "HttpAsyncStreamPtr can't be nullptr"); } -Future HttpAsyncStreamReaderWriter::ReadHeader(int timeout) { return reader_->ReadHeader(timeout); } +Future HttpAsyncStreamReaderWriter::ReadHeader(uint32_t timeout) { return reader_->ReadHeader(timeout); } -Future HttpAsyncStreamReaderWriter::ReadChunk(int timeout) { return reader_->ReadChunk(timeout); } +Future HttpAsyncStreamReaderWriter::ReadChunk(uint32_t timeout) { return reader_->ReadChunk(timeout); } -Future HttpAsyncStreamReaderWriter::ReadAtMost(uint64_t len, int timeout) { +Future HttpAsyncStreamReaderWriter::ReadAtMost(uint64_t len, uint32_t timeout) { return reader_->ReadAtMost(len, timeout); } -Future HttpAsyncStreamReaderWriter::ReadExactly(uint64_t len, int timeout) { +Future HttpAsyncStreamReaderWriter::ReadExactly(uint64_t len, uint32_t timeout) { return reader_->ReadExactly(len, timeout); } @@ -177,34 +177,34 @@ http::HttpRequestPtr ConstructHttpRequest(HttpRequestLine&& start_line, trpc::ht } template -Future ReadFullRequestImpl(T rw, int timeout) { +Future ReadFullRequestImpl(T rw, uint32_t timeout) { HttpRequestLine start_line = rw->GetRequestLine(); // reads request line http::PathParameters param = rw->GetParameters(); // Reads header first, then read the body. - return rw->ReadHeader(timeout).Then( - [rw, start_line = std::move(start_line), param = std::move(param)](trpc::http::HttpHeader&& header) mutable { - // Read full request body if it has any. - bool has_data = false; - uint64_t read_bytes = std::numeric_limits::max(); - if (header.Get(http::kHeaderTransferEncoding) == http::kTransferEncodingChunked) { - has_data = true; - } - if (const std::string& len = header.Get(http::kHeaderContentLength); !len.empty()) { - has_data = true; - // The |len| here is valid(a failure will occur if it has bad length). - read_bytes = trpc::TryParse(len).value(); - } - if (has_data) { - return rw->ReadExactly(read_bytes) - .Then([start_line = std::move(start_line), param = std::move(param), - header = std::move(header)](NoncontiguousBuffer&& data) mutable { - return MakeReadyFuture( - ConstructHttpRequest(std::move(start_line), std::move(param), std::move(header), std::move(data))); - }); - } - return MakeReadyFuture( - ConstructHttpRequest(std::move(start_line), std::move(param), std::move(header), NoncontiguousBuffer{})); - }); + return rw->ReadHeader(timeout).Then([rw, timeout, start_line = std::move(start_line), + param = std::move(param)](trpc::http::HttpHeader&& header) mutable { + // Read full request body if it has any. + bool has_data = false; + uint64_t read_bytes = std::numeric_limits::max(); + if (header.Get(http::kHeaderTransferEncoding) == http::kTransferEncodingChunked) { + has_data = true; + } + if (const std::string& len = header.Get(http::kHeaderContentLength); !len.empty()) { + has_data = true; + // The |len| here is valid(a failure will occur if it has bad length). + read_bytes = trpc::TryParse(len).value(); + } + if (has_data) { + return rw->ReadExactly(read_bytes, timeout) + .Then([start_line = std::move(start_line), param = std::move(param), + header = std::move(header)](NoncontiguousBuffer&& data) mutable { + return MakeReadyFuture( + ConstructHttpRequest(std::move(start_line), std::move(param), std::move(header), std::move(data))); + }); + } + return MakeReadyFuture( + ConstructHttpRequest(std::move(start_line), std::move(param), std::move(header), NoncontiguousBuffer{})); + }); } } // namespace @@ -217,11 +217,11 @@ Future<> WriteFullResponse(HttpServerAsyncStreamWriterPtr rw, http::HttpResponse return WriteFullResponseImpl(rw, std::move(rsp)); } -Future ReadFullRequest(HttpServerAsyncStreamReaderWriterPtr rw, int timeout) { +Future ReadFullRequest(HttpServerAsyncStreamReaderWriterPtr rw, uint32_t timeout) { return ReadFullRequestImpl(rw, timeout); } -Future ReadFullRequest(HttpServerAsyncStreamReaderPtr rw, int timeout) { +Future ReadFullRequest(HttpServerAsyncStreamReaderPtr rw, uint32_t timeout) { return ReadFullRequestImpl(rw, timeout); } @@ -283,9 +283,10 @@ trpc::http::HttpResponsePtr ConstructHttpResponse(HttpStatusLine&& staus_line, h } template -Future ReadFullResponseImpl(T rw, int timeout) { - return rw->ReadStatusLine().Then([rw](HttpStatusLine&& start_line) { - return rw->ReadHeader().Then([rw, start_line = std::move(start_line)](trpc::http::HttpHeader&& header) mutable { +Future ReadFullResponseImpl(T rw, uint32_t timeout) { + return rw->ReadStatusLine(timeout).Then([rw, timeout](HttpStatusLine&& start_line) { + return rw->ReadHeader(timeout).Then([rw, timeout, + start_line = std::move(start_line)](trpc::http::HttpHeader&& header) mutable { // Reads content if it has. bool has_data = false; uint64_t read_bytes = std::numeric_limits::max(); @@ -298,7 +299,7 @@ Future ReadFullResponseImpl(T rw, int timeout) { read_bytes = trpc::TryParse(len).value(); } if (has_data) { - return rw->ReadExactly(read_bytes) + return rw->ReadExactly(read_bytes, timeout) .Then([start_line = std::move(start_line), header = std::move(header)](NoncontiguousBuffer&& data) mutable { return MakeReadyFuture( ConstructHttpResponse(std::move(start_line), std::move(header), std::move(data))); @@ -311,10 +312,10 @@ Future ReadFullResponseImpl(T rw, int timeout) { } } // namespace -Future ReadFullResponse(HttpClientAsyncStreamReaderWriterPtr rw, int timeout) { +Future ReadFullResponse(HttpClientAsyncStreamReaderWriterPtr rw, uint32_t timeout) { return ReadFullResponseImpl(rw, timeout); } -Future ReadFullResponse(HttpClientAsyncStreamReaderPtr rw, int timeout) { +Future ReadFullResponse(HttpClientAsyncStreamReaderPtr rw, uint32_t timeout) { return ReadFullResponseImpl(rw, timeout); } diff --git a/trpc/stream/http/async/stream_reader_writer.h b/trpc/stream/http/async/stream_reader_writer.h index 6d4ee274..04d06318 100644 --- a/trpc/stream/http/async/stream_reader_writer.h +++ b/trpc/stream/http/async/stream_reader_writer.h @@ -32,11 +32,11 @@ class HttpAsyncStreamReader : public RefCounted { /// @brief Reads Header /// @param timeout time to wait for the header to be ready - Future ReadHeader(int timeout = std::numeric_limits::max()); + Future ReadHeader(uint32_t timeout = std::numeric_limits::max()); /// @brief Reads a chunk in chunked mode, note that reading in non-chunked mode will fail /// @param timeout time to wait for the header to be ready - Future ReadChunk(int timeout = std::numeric_limits::max()); + Future ReadChunk(uint32_t timeout = std::numeric_limits::max()); /// @brief Reads at most len data. /// @param len max size to read @@ -46,7 +46,7 @@ class HttpAsyncStreamReader : public RefCounted { /// An empty buffer means that the end has been read /// Usage scenario 1: Limits the maximum length of each read When the memory is limited. /// Usage scenario 2: Gets part of data in time and send it downstream on route server. - Future ReadAtMost(uint64_t len, int timeout = std::numeric_limits::max()); + Future ReadAtMost(uint64_t len, uint32_t timeout = std::numeric_limits::max()); /// @brief Reads data with a fixed length. If eof is read, it will return as much data as there is in the network /// @param len size to read @@ -54,7 +54,7 @@ class HttpAsyncStreamReader : public RefCounted { /// @note If the read buffer size is less than the required length, it means that eof has been read. /// Usage scenario 1: The requested data is compressed by a fixed size, and needs to be read and decompressed by /// a fixed size. - Future ReadExactly(uint64_t len, int timeout = std::numeric_limits::max()); + Future ReadExactly(uint64_t len, uint32_t timeout = std::numeric_limits::max()); private: HttpAsyncStreamPtr stream_{nullptr}; @@ -87,7 +87,7 @@ class HttpClientAsyncStreamReader : public HttpAsyncStreamReader { } /// @brief Reads the status line of response. - Future ReadStatusLine(int timeout = std::numeric_limits::max()) { + Future ReadStatusLine(uint32_t timeout = std::numeric_limits::max()) { return stream_->ReadStatusLine(timeout); } @@ -160,11 +160,11 @@ class HttpAsyncStreamReaderWriter : public RefCounted ReadHeader(int timeout = std::numeric_limits::max()); + Future ReadHeader(uint32_t timeout = std::numeric_limits::max()); /// @brief Reads a chunk in chunked mode, note that reading in non-chunked mode will fail /// @param timeout time to wait for the header to be ready - Future ReadChunk(int timeout = std::numeric_limits::max()); + Future ReadChunk(uint32_t timeout = std::numeric_limits::max()); /// @brief Reads at most len data. /// @param len max size to read @@ -174,7 +174,7 @@ class HttpAsyncStreamReaderWriter : public RefCounted ReadAtMost(uint64_t len, int timeout = std::numeric_limits::max()); + Future ReadAtMost(uint64_t len, uint32_t timeout = std::numeric_limits::max()); /// @brief Reads data with a fixed length. If eof is read, it will return as much data as there is in the network /// @param len size to read @@ -182,7 +182,7 @@ class HttpAsyncStreamReaderWriter : public RefCounted ReadExactly(uint64_t len, int timeout = std::numeric_limits::max()); + Future ReadExactly(uint64_t len, uint32_t timeout = std::numeric_limits::max()); /// @brief Writes header or trailer Future<> WriteHeader(http::HttpHeader&& header); @@ -247,7 +247,7 @@ class HttpClientAsyncStreamReaderWriter : public HttpAsyncStreamReaderWriter { Future<> WriteRequestLine(HttpRequestLine&& req_line) { return writer_->WriteRequestLine(std::move(req_line)); } /// @brief Reads status line of response. - Future ReadStatusLine(int timeout = std::numeric_limits::max()) { + Future ReadStatusLine(uint32_t timeout = std::numeric_limits::max()) { return reader_->ReadStatusLine(timeout); } @@ -279,14 +279,14 @@ Future<> WriteFullResponse(HttpServerAsyncStreamWriterPtr rw, http::HttpResponse /// @brief Reads a complete request from the stream. Future ReadFullRequest(HttpServerAsyncStreamReaderWriterPtr rw, - int timeout = std::numeric_limits::max()); + uint32_t timeout = std::numeric_limits::max()); Future ReadFullRequest(HttpServerAsyncStreamReaderPtr rw, - int timeout = std::numeric_limits::max()); + uint32_t timeout = std::numeric_limits::max()); /// @brief Reads a complete response from the stream. Future ReadFullResponse(HttpClientAsyncStreamReaderWriterPtr rw, - int timeout = std::numeric_limits::max()); + uint32_t timeout = std::numeric_limits::max()); Future ReadFullResponse(HttpClientAsyncStreamReaderPtr rw, - int timeout = std::numeric_limits::max()); + uint32_t timeout = std::numeric_limits::max()); } // namespace trpc::stream diff --git a/trpc/stream/http/http_client_stream_handler.cc b/trpc/stream/http/http_client_stream_handler.cc index c7a80943..23fc2cf0 100644 --- a/trpc/stream/http/http_client_stream_handler.cc +++ b/trpc/stream/http/http_client_stream_handler.cc @@ -25,6 +25,7 @@ StreamReaderWriterProviderPtr HttpClientStreamHandler::CreateStream(StreamOption int HttpClientStreamHandler::SendMessage(const std::any& msg, NoncontiguousBuffer&& send_data) { IoMessage io_message; + io_message.context_ext = std::any_cast(msg)->GetCurrentContextExt(); io_message.buffer = std::move(send_data); io_message.msg = msg; return options_.send(std::move(io_message)); diff --git a/trpc/stream/http/http_client_stream_handler_test.cc b/trpc/stream/http/http_client_stream_handler_test.cc index b63fbc53..fc74d41f 100644 --- a/trpc/stream/http/http_client_stream_handler_test.cc +++ b/trpc/stream/http/http_client_stream_handler_test.cc @@ -28,14 +28,17 @@ TEST(HttpClientStreamHandlerTest, Run) { stream::StreamOptions options; options.send = [](IoMessage&& message) { return 0; }; stream::HttpClientStreamHandler handler(std::move(options)); - EXPECT_TRUE(handler.Init()); - auto stream = handler.CreateStream(stream::StreamOptions{}); + + auto ctx = MakeRefCounted(); + stream::StreamOptions options2; + options2.context.context = ctx; + auto stream = handler.CreateStream(std::move(options2)); EXPECT_TRUE(stream); EXPECT_TRUE(handler.GetHttpStream()); NoncontiguousBuffer in = CreateBufferSlow("hello"); - EXPECT_EQ(0, handler.SendMessage("", std::move(in))); + EXPECT_EQ(0, handler.SendMessage(ctx, std::move(in))); // Pushes the response header to stream. http::HttpResponse http_response; diff --git a/trpc/tvar/common/sampler.cc b/trpc/tvar/common/sampler.cc index a8827ebe..9e58c3ee 100644 --- a/trpc/tvar/common/sampler.cc +++ b/trpc/tvar/common/sampler.cc @@ -124,22 +124,21 @@ class SamplerCollector : public WriteMostly { void Run() { SetCurrentThreadName("sampler_collector"); - std::list root; int consecutive_no_sleep = 0; while (!stop_) { auto abs_time = trpc::time::GetSteadyMicroSeconds(); if (auto s = Reset(); !s.empty()) { - root.splice(root.end(), s); + root_.splice(root_.end(), s); } int removed_num = 0; int sampled_num = 0; - for (auto itr = root.begin(); itr != root.end();) { + for (auto itr = root_.begin(); itr != root_.end();) { // We may remove p from the list, save next first. std::unique_lock lc((*itr)->mutex_); if (!(*itr)->used_) { lc.unlock(); - itr = root.erase(itr); + itr = root_.erase(itr); ++removed_num; } else { (*itr)->TakeSample(); @@ -174,6 +173,7 @@ class SamplerCollector : public WriteMostly { bool stop_{false}; std::unique_ptr thread_{nullptr}; int64_t calculated_time_us_{0}; + std::list root_{}; }; Sampler::Sampler() : used_(true) {} diff --git a/trpc/util/thread/mq_thread_pool.cc b/trpc/util/thread/mq_thread_pool.cc index 709b81e3..cd23b874 100644 --- a/trpc/util/thread/mq_thread_pool.cc +++ b/trpc/util/thread/mq_thread_pool.cc @@ -55,11 +55,14 @@ bool MQThreadPool::AddTask(Function&& job) { return true; } + auto task = new MQThreadPoolTask(std::move(job)); // Other threads add tasks to the global queue - if (global_task_queue_.Push(new MQThreadPoolTask(std::move(job)))) { + if (global_task_queue_.Push(task)) { // notify consume notifier_.Notify(false); return true; + } else { + delete task; } return false; diff --git a/trpc/util/thread/mq_thread_pool_test.cc b/trpc/util/thread/mq_thread_pool_test.cc index bac25681..3faf6e4c 100644 --- a/trpc/util/thread/mq_thread_pool_test.cc +++ b/trpc/util/thread/mq_thread_pool_test.cc @@ -3,6 +3,7 @@ // Tencent is pleased to support the open source community by making tRPC available. // // Copyright (C) 2023 Tencent. +// // All rights reserved. // // If you have downloaded a copy of the tRPC source code from Tencent, @@ -33,7 +34,7 @@ TEST(MQThreadPoolTest, All) { usleep(1000); } - std::promise waiters[10000]; + std::promise waiters[1000]; for (auto& waiter : waiters) { thread_pool.AddTask([&waiter] { int64_t res = 0; @@ -54,4 +55,36 @@ TEST(MQThreadPoolTest, All) { ASSERT_FALSE(thread_pool.AddTask([]() {})); } +TEST(MQThreadPoolTest, MQThreadPoolAddTaskFail) { + trpc::ThreadPoolOption thread_pool_option; + thread_pool_option.thread_num = 1; + thread_pool_option.task_queue_size = 2; + + trpc::MQThreadPool thread_pool(std::move(thread_pool_option)); + thread_pool.Start(); + + std::atomic count{0}; + std::atomic failed_count{0}; + for (int i = 0; i < 100; ++i) { + bool ret = thread_pool.AddTask([&count] { + int64_t res = 0; + for (unsigned int i = 1; i <= 1000000; i++) { + res += i; + } + ++count; + }); + if (!ret) { + ++failed_count; + } + } + + while ((count + failed_count) != 100) { + ::usleep(10000); + } + + EXPECT_TRUE(failed_count > 0); + + thread_pool.Stop(); +} + } // namespace trpc::testing