Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion trpc/compressor/lz4/lz4_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ bool DoCompress(LZ4F_compressionContext_t& ctx, const NoncontiguousBuffer& in, N
TRPC_FMT_ERROR("CompressedToOutputStream error, compressed_size={}", compressed_size);
return false;
}
left_to_copy = -current_size;
left_to_copy -= current_size;
current_pos += current_size;
}
}
Expand Down
1 change: 1 addition & 0 deletions trpc/runtime/iomodel/reactor/fiber/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ cc_test(
"//trpc/runtime:fiber_runtime",
"//trpc/runtime/iomodel/reactor/common:default_io_handler",
"//trpc/util:latch",
"//trpc/util:time",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
Expand Down
10 changes: 7 additions & 3 deletions trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ FiberTcpConnection::~FiberTcpConnection() {
}

void FiberTcpConnection::Established() {
SetEstablishTimestamp(trpc::time::GetMilliSeconds());
SetConnectionState(ConnectionState::kConnected);
SetConnActiveTime(trpc::time::GetMilliSeconds());
if (IsClient()) {
// The server's connection status are set separately before calling Established
auto now_ms = trpc::time::GetMilliSeconds();
SetEstablishTimestamp(now_ms);
SetConnectionState(ConnectionState::kConnected);
SetConnActiveTime(now_ms);
}

TRPC_ASSERT(GetIoHandler());
TRPC_ASSERT(GetConnectionHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "trpc/runtime/iomodel/reactor/fiber/fiber_reactor.h"
#include "trpc/util/latch.h"
#include "trpc/util/net_util.h"
#include "trpc/util/time.h"

namespace trpc {

Expand Down Expand Up @@ -105,6 +106,10 @@ class FiberTcpConnectionTestImpl {
auto io_handle = std::make_unique<DefaultIoHandler>(server_conn_.Get());
server_conn_->SetIoHandler(std::move(io_handle));

auto now_ms = trpc::time::GetMilliSeconds();
server_conn_->SetEstablishTimestamp(now_ms);
server_conn_->SetConnectionState(ConnectionState::kConnected);
server_conn_->SetConnActiveTime(now_ms);
server_conn_->Established();
server_conn_->StartHandshaking();

Expand Down
9 changes: 7 additions & 2 deletions trpc/tools/gdb_plugin/gdb_fiber_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,13 @@ def _get_memory_maps_in_core_dump():
break # TODO(luobogao): Parse executable sections.

splited = seg.split()
start = int(splited[0], 16)
end = int(splited[2], 16)
try:
start = int(splited[0], 16)
end = int(splited[2], 16)
except ValueError:
# May encounter data like: While running this, GDB does not access memory
#skip them
break
objfile = splited[-1] if not splited[-1].startswith('load') else ''
segs.append(MemorySegment(start, end, objfile))
return segs
Expand Down
6 changes: 3 additions & 3 deletions trpc/transport/server/fiber/fiber_bind_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ uint64_t FiberBindAdapter::GenConnectionId() {
return (static_cast<uint64_t>(scheduling_group_index_) << 32) | ++conn_id;
}

void FiberBindAdapter::AddConnection(RefPtr<FiberTcpConnection>&& conn) {
connection_manager_.Add(conn->GetConnId(), std::move(conn));
void FiberBindAdapter::AddConnection(const RefPtr<FiberTcpConnection>& conn) {
connection_manager_.Add(conn->GetConnId(), conn);
}

RefPtr<FiberTcpConnection> FiberBindAdapter::GetConnection(uint64_t conn_id) {
Expand Down Expand Up @@ -410,7 +410,7 @@ void FiberBindAdapter::DoClose(const CloseConnectionInfo& close_connection_info)
", conn.ip:" << fiber_conn->GetPeerIp() << ", info.ip:" << close_connection_info.client_ip <<
", conn.port:" << fiber_conn->GetPeerPort() << ", info.port:" << close_connection_info.client_port <<
", conn.fd:" << fiber_conn->GetFd() << ", info.fd:" << close_connection_info.fd);
connection_manager_.Add(fiber_conn->GetConnId(), std::move(fiber_conn));
connection_manager_.Add(fiber_conn->GetConnId(), fiber_conn);
}
}

Expand Down
2 changes: 1 addition & 1 deletion trpc/transport/server/fiber/fiber_bind_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class FiberBindAdapter : public RefCounted<FiberBindAdapter> {

uint64_t GenConnectionId();

void AddConnection(RefPtr<FiberTcpConnection>&& conn);
void AddConnection(const RefPtr<FiberTcpConnection>& conn);

void UpdateConnection(Connection* conn) {}

Expand Down
4 changes: 2 additions & 2 deletions trpc/transport/server/fiber/fiber_connection_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ FiberConnectionManager::~FiberConnectionManager() {
}
}

void FiberConnectionManager::Add(uint64_t conn_id, RefPtr<FiberTcpConnection>&& conn) {
void FiberConnectionManager::Add(uint64_t conn_id, const RefPtr<FiberTcpConnection>& conn) {
auto&& shard = conn_shards_[GetHashIndex(conn_id, kShards)];

std::scoped_lock _(shard.lock);
auto&& [it, inserted] = shard.map.emplace(conn_id, std::move(conn));
auto&& [it, inserted] = shard.map.insert(std::make_pair(conn_id, conn));
(void)it; // Suppresses compilation warnings.

TRPC_ASSERT(inserted && "insert FiberConnectionManager with Duplicate conn_id");
Expand Down
2 changes: 1 addition & 1 deletion trpc/transport/server/fiber/fiber_connection_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class FiberConnectionManager {

~FiberConnectionManager();

void Add(uint64_t conn_id, RefPtr<FiberTcpConnection>&& conn);
void Add(uint64_t conn_id, const RefPtr<FiberTcpConnection>& conn);

RefPtr<FiberTcpConnection> Del(uint64_t conn_id);

Expand Down
11 changes: 9 additions & 2 deletions trpc/transport/server/fiber/fiber_server_transport_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "trpc/transport/server/fiber/fiber_server_connection_handler_factory.h"
#include "trpc/util/log/logging.h"
#include "trpc/util/random.h"
#include "trpc/util/time.h"

namespace trpc {

Expand Down Expand Up @@ -159,12 +160,18 @@ bool FiberServerTransportImpl::AcceptConnection(AcceptConnectionInfo& connection
conn_handler->Init();

conn->SetConnectionHandler(std::move(conn_handler));
// The connection active time and status must be updated before executing AddConnection to avoid
// the newly created connection being cleared as an idle connection, causing the assertion to fail.
auto now_ms = trpc::time::GetMilliSeconds();
conn->SetEstablishTimestamp(now_ms);
conn->SetConnectionState(ConnectionState::kConnected);
conn->SetConnActiveTime(now_ms);

bind_adapters_[scheduling_group_index]->AddConnection(conn);

conn->Established();
conn->StartHandshaking();

bind_adapters_[scheduling_group_index]->AddConnection(std::move(conn));

FrameStats::GetInstance()->GetServerStats().AddConnCount(1);

return true;
Expand Down
13 changes: 7 additions & 6 deletions trpc/util/queue/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,12 @@ class alignas(64) LockFreeQueue {
}
}

// add count_ before store data to avoid Size becomes negative when dequeue execute more faster
count_.fetch_add(1, std::memory_order_release);

elem->data = val;
elem->sequence.store(pos + 1, std::memory_order_release);

count_.fetch_add(1, std::memory_order_release);

return RT_OK;
}

Expand Down Expand Up @@ -130,17 +131,17 @@ class alignas(64) LockFreeQueue {
}
}

count_.fetch_sub(1, std::memory_order_release);

val = elem->data;
elem->sequence.store(pos + mask_ + 1, std::memory_order_release);

count_.fetch_sub(1, std::memory_order_release);

return RT_OK;
}

/// @brief The size of the queue
/// @return uint32_t
uint32_t Size() const { return count_.load(std::memory_order_relaxed); }
/// @return int
int Size() const { return count_.load(std::memory_order_relaxed); }

/// @brief The capacity of the queue
/// @return uint32_t
Expand Down