Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 46 additions & 56 deletions trpc/future/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,12 @@ class FutureImpl : public FutureImplBase {
state_.callback = new ContinuationWithValue<Func, PromiseType, T...>(std::forward<Func>(func),
std::forward<PromiseType>(promise));
if (executor) state_.callback->SetExecutor(executor);
state_.has_callback = true;

std::lock_guard<std::mutex> lock(state_.mtx);
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_callback = true;
// Got immediately executed.
if (HasResult()) {
TrySchedule();
TrySchedule(lock);
}
}

Expand All @@ -549,12 +549,12 @@ class FutureImpl : public FutureImplBase {
void SetTerminalCallback(Func&& func, Executor* executor) {
state_.callback = new TerminalWithValue<Func, T...>(std::forward<Func>(func));
if (executor) state_.callback->SetExecutor(executor);
state_.has_callback = true;

std::lock_guard<std::mutex> lock(state_.mtx);
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_callback = true;
// Got immediately executed.
if (HasResult()) {
TrySchedule();
TrySchedule(lock);
}
}

Expand All @@ -571,12 +571,12 @@ class FutureImpl : public FutureImplBase {
state_.callback = new ContinuationWithFuture<Func, PromiseType, T...>(std::forward<Func>(func),
std::forward<PromiseType>(promise));
if (executor) state_.callback->SetExecutor(executor);
state_.has_callback = true;

std::lock_guard<std::mutex> lock(state_.mtx);
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_callback = true;
// Got immediately executed.
if (HasResult()) {
TrySchedule();
TrySchedule(lock);
}
}

Expand All @@ -589,12 +589,12 @@ class FutureImpl : public FutureImplBase {
void SetTerminalCallbackWrapped(Func&& func, Executor* executor) {
state_.callback = new TerminalWithFuture<Func, T...>(std::forward<Func>(func));
if (executor) state_.callback->SetExecutor(executor);
state_.has_callback = true;

std::lock_guard<std::mutex> lock(state_.mtx);
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_callback = true;
// Got immediately executed.
if (HasResult()) {
TrySchedule();
TrySchedule(lock);
}
}

Expand All @@ -603,41 +603,36 @@ class FutureImpl : public FutureImplBase {
state_.value = std::move(value);
state_.ready = true;
// Result state may be looped by another thread.
{
std::lock_guard<std::mutex> lock(state_.mtx);
state_.has_result = true;
}
TrySchedule();
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_result = true;
TrySchedule(lock);
}

/// @brief Exceptional value set through promise.
void SetException(const Exception& e) {
state_.exception = e;
state_.failed = true;
// Result state may be looped by another thread.
{
std::lock_guard<std::mutex> lock(state_.mtx);
state_.has_result = true;
}
TrySchedule();
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_result = true;
TrySchedule(lock);
}

/// @brief Support non const parameter.
void SetException(Exception&& e) {
state_.exception = std::move(e);
state_.failed = true;
{
std::lock_guard<std::mutex> lock(state_.mtx);
state_.has_result = true;
}
TrySchedule();
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_result = true;
TrySchedule(lock);
}

private:
/// @brief Future may or may not registered callback yet, check to inspire callback.
/// @note Callback can only inspired once.
void TrySchedule() {
void TrySchedule(std::unique_lock<std::mutex>& lock) {
if (state_.has_callback) {
lock.unlock();
if (IsReady()) {
if (state_.schedule_flag.test_and_set() == false) {
state_.callback->SetValue(GetValue());
Expand Down Expand Up @@ -704,11 +699,11 @@ class FutureImpl<T> : public FutureImplBase {
state_.callback =
new ContinuationWithValue<Func, PromiseType, T>(std::forward<Func>(func), std::forward<PromiseType>(promise));
if (executor) state_.callback->SetExecutor(executor);
state_.has_callback = true;

std::lock_guard<std::mutex> lock(state_.mtx);
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_callback = true;
if (HasResult()) {
TrySchedule();
TrySchedule(lock);
}
}

Expand All @@ -717,11 +712,11 @@ class FutureImpl<T> : public FutureImplBase {
void SetTerminalCallback(Func&& func, Executor* executor) {
state_.callback = new TerminalWithValue<Func, T>(std::forward<Func>(func));
if (executor) state_.callback->SetExecutor(executor);
state_.has_callback = true;

std::lock_guard<std::mutex> lock(state_.mtx);
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_callback = true;
if (HasResult()) {
TrySchedule();
TrySchedule(lock);
}
}

Expand All @@ -732,11 +727,11 @@ class FutureImpl<T> : public FutureImplBase {
state_.callback =
new ContinuationWithFuture<Func, PromiseType, T>(std::forward<Func>(func), std::forward<PromiseType>(promise));
if (executor) state_.callback->SetExecutor(executor);
state_.has_callback = true;

std::lock_guard<std::mutex> lock(state_.mtx);
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_callback = true;
if (HasResult()) {
TrySchedule();
TrySchedule(lock);
}
}

Expand All @@ -745,51 +740,46 @@ class FutureImpl<T> : public FutureImplBase {
void SetTerminalCallbackWrapped(Func&& func, Executor* executor) {
state_.callback = new TerminalWithFuture<Func, T>(std::forward<Func>(func));
if (executor) state_.callback->SetExecutor(executor);
state_.has_callback = true;

std::lock_guard<std::mutex> lock(state_.mtx);
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_callback = true;
if (HasResult()) {
TrySchedule();
TrySchedule(lock);
}
}

/// @brief Same as multiple version.
void SetValue(T&& value) {
state_.value = std::move(value);
state_.ready = true;
{
std::lock_guard<std::mutex> lock(state_.mtx);
state_.has_result = true;
}
TrySchedule();
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_result = true;
TrySchedule(lock);
}

/// @brief Same as multiple version.
void SetException(const Exception& e) {
state_.exception = e;
state_.failed = true;
{
std::lock_guard<std::mutex> lock(state_.mtx);
state_.has_result = true;
}
TrySchedule();
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_result = true;
TrySchedule(lock);
}

/// @brief Same as multiple version.
void SetException(Exception&& e) {
state_.exception = std::move(e);
state_.failed = true;
{
std::lock_guard<std::mutex> lock(state_.mtx);
state_.has_result = true;
}
TrySchedule();
std::unique_lock<std::mutex> lock(state_.mtx);
state_.has_result = true;
TrySchedule(lock);
}

private:
/// @brief Same as multiple version.
void TrySchedule() {
void TrySchedule(std::unique_lock<std::mutex>& lock) {
if (state_.has_callback) {
lock.unlock();
if (IsReady()) {
if (state_.schedule_flag.test_and_set() == false) {
state_.callback->SetValue(GetValue());
Expand Down
4 changes: 1 addition & 3 deletions trpc/future/future_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,15 +641,13 @@ TEST(Future, ThenCopyableExecutor) {
// Test mltiple threads executor.
TEST(Future, TestExecuteOkInDifferentThread) {
static int exec_count = 0;
int loop_times = 50000;
int loop_times = 500;
for (int i = 0; i < loop_times; i++) {
Promise<int> pr;
auto fut = pr.GetFuture();
std::thread t([pr = std::move(pr)]() mutable {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
pr.SetValue(1);
});
std::this_thread::sleep_for(std::chrono::nanoseconds(10000));
fut.Then([](int&& val) {
exec_count++;
return MakeReadyFuture<>();
Expand Down
20 changes: 20 additions & 0 deletions trpc/future/future_utility_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,4 +629,24 @@ TEST(BlockingTryGet, test) {
ASSERT_TRUE(TestBlockingTryGet(100, 1000));
}

// Test in the capture promise by reference situation, the promise object is not destroyed during SetValue
TEST(BlockingGet, capture_promise_by_ref) {
constexpr int kMaxLoopTimes = 10000;
std::atomic<int> execute_times = 0;
for (int i = 0; i < kMaxLoopTimes; ++i) {
std::unique_ptr<std::thread> t;
{
trpc::Promise<> pr;
auto fut = pr.GetFuture();
t = std::make_unique<std::thread>([&]() {
pr.SetValue();
execute_times++;
});
future::BlockingGet(std::move(fut));
}
t->join();
}
EXPECT_EQ(execute_times.load(), kMaxLoopTimes);
}

} // namespace trpc
10 changes: 5 additions & 5 deletions trpc/stream/http/async/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Future<> HttpAsyncStream::PushSendMessage(HttpStreamFramePtr&& msg) {
return AsyncWrite(std::move(out));
}

Future<http::HttpHeader> HttpAsyncStream::AsyncReadHeader(int timeout) {
Future<http::HttpHeader> HttpAsyncStream::AsyncReadHeader(uint32_t timeout) {
pending_header_.val = Promise<http::HttpHeader>();

auto ft = pending_header_.val.value().GetFuture();
Expand All @@ -80,7 +80,7 @@ Future<http::HttpHeader> HttpAsyncStream::AsyncReadHeader(int timeout) {
return ft;
}

Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadChunk(int timeout) {
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadChunk(uint32_t timeout) {
// it can read only in chunked mode
if (read_mode_ != DataMode::kChunked) {
Status status{TRPC_STREAM_UNKNOWN_ERR, 0, "Can't read no chunk data"};
Expand All @@ -90,15 +90,15 @@ Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadChunk(int timeout) {
return AsyncReadInner(ReadOperation::kReadChunk, 0, timeout);
}

Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadAtMost(uint64_t len, int timeout) {
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadAtMost(uint64_t len, uint32_t timeout) {
return AsyncReadInner(ReadOperation::kReadAtMost, len, timeout);
}

Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadExactly(uint64_t len, int timeout) {
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadExactly(uint64_t len, uint32_t timeout) {
return AsyncReadInner(ReadOperation::kReadExactly, len, timeout);
}

Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadInner(ReadOperation op, uint64_t len, int timeout) {
Future<NoncontiguousBuffer> HttpAsyncStream::AsyncReadInner(ReadOperation op, uint64_t len, uint32_t timeout) {
if (read_mode_ == DataMode::kNoData) {
// can not read data when content-length equal to 0
return MakeReadyFuture<NoncontiguousBuffer>(NoncontiguousBuffer{});
Expand Down
14 changes: 7 additions & 7 deletions trpc/stream/http/async/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ class HttpAsyncStream : public HttpCommonStream {

/// @brief Reads the header asynchronously.
/// @param timeout time to wait for the header to be ready
Future<http::HttpHeader> AsyncReadHeader(int timeout = std::numeric_limits<int>::max());
Future<http::HttpHeader> AsyncReadHeader(uint32_t timeout = std::numeric_limits<uint32_t>::max());

/// @brief Reads a chunk in chunked mode asynchronously, note that reading in non-chunked mode will fail
/// @param timeout time to wait for the header to be ready
Future<NoncontiguousBuffer> AsyncReadChunk(int timeout = std::numeric_limits<int>::max());
Future<NoncontiguousBuffer> AsyncReadChunk(uint32_t timeout = std::numeric_limits<int>::max());

/// @brief Reads at most len data asynchronously.
/// @param len max size to read
Expand All @@ -50,15 +50,15 @@ class HttpAsyncStream : public HttpCommonStream {
/// An empty buffer means that the end has been read
/// Usage scenario 1: Limits the maximum length of each read When the memory is limited.
/// Usage scenario 2: Gets part of data in time and send it downstream on route server.
Future<NoncontiguousBuffer> AsyncReadAtMost(uint64_t len, int timeout = std::numeric_limits<int>::max());
Future<NoncontiguousBuffer> AsyncReadAtMost(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());

/// @brief Reads data with a fixed length. If eof is read, it will return as much data as there is in the network
/// @param len size to read
/// @param timeout time to wait for the header to be ready
/// @note If the read buffer size is less than the required length, it means that eof has been read.
/// Usage scenario 1: The requested data is compressed by a fixed size, and needs to be read and decompressed by
/// a fixed size.
Future<NoncontiguousBuffer> AsyncReadExactly(uint64_t len, int timeout = std::numeric_limits<int>::max());
Future<NoncontiguousBuffer> AsyncReadExactly(uint64_t len, uint32_t timeout = std::numeric_limits<uint32_t>::max());

protected:
template <class T>
Expand Down Expand Up @@ -98,7 +98,7 @@ class HttpAsyncStream : public HttpCommonStream {

/// @brief Creates a scheduled waiting task
template <class T>
void CreatePendingTimer(PendingVal<T>* pending, int timeout);
void CreatePendingTimer(PendingVal<T>* pending, uint32_t timeout);

/// @brief Checks the pending state
template <class T>
Expand All @@ -110,7 +110,7 @@ class HttpAsyncStream : public HttpCommonStream {

void NotifyPendingDataQueue();

Future<NoncontiguousBuffer> AsyncReadInner(ReadOperation op, uint64_t len, int timeout);
Future<NoncontiguousBuffer> AsyncReadInner(ReadOperation op, uint64_t len, uint32_t timeout);

protected:
/// @brief Used to store asynchronous data request
Expand Down Expand Up @@ -147,7 +147,7 @@ void HttpAsyncStream::PendingDone(PendingVal<T>* pending) {
}

template <class T>
void HttpAsyncStream::CreatePendingTimer(PendingVal<T>* pending, int timeout) {
void HttpAsyncStream::CreatePendingTimer(PendingVal<T>* pending, uint32_t timeout) {
TRPC_CHECK_EQ(pending->timer_id, iotimer::InvalidID);
pending->timer_id = iotimer::Create(timeout, 0, [this, pending]() {
if (!pending->val) {
Expand Down
Loading