diff --git a/trpc/compressor/lz4/lz4_util.cc b/trpc/compressor/lz4/lz4_util.cc index fd17e27d..5827cb0a 100644 --- a/trpc/compressor/lz4/lz4_util.cc +++ b/trpc/compressor/lz4/lz4_util.cc @@ -147,7 +147,7 @@ bool DoCompress(LZ4F_compressionContext_t& ctx, const NoncontiguousBuffer& in, N TRPC_FMT_ERROR("CompressedToOutputStream error, compressed_size={}", compressed_size); return false; } - left_to_copy = -current_size; + left_to_copy -= current_size; current_pos += current_size; } } diff --git a/trpc/runtime/iomodel/reactor/fiber/BUILD b/trpc/runtime/iomodel/reactor/fiber/BUILD index 489475f8..c5293365 100644 --- a/trpc/runtime/iomodel/reactor/fiber/BUILD +++ b/trpc/runtime/iomodel/reactor/fiber/BUILD @@ -150,6 +150,7 @@ cc_test( "//trpc/runtime:fiber_runtime", "//trpc/runtime/iomodel/reactor/common:default_io_handler", "//trpc/util:latch", + "//trpc/util:time", "@com_google_googletest//:gtest", "@com_google_googletest//:gtest_main", ], diff --git a/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc b/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc index 07635a08..2c79f9e3 100644 --- a/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc +++ b/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc @@ -36,9 +36,13 @@ FiberTcpConnection::~FiberTcpConnection() { } void FiberTcpConnection::Established() { - SetEstablishTimestamp(trpc::time::GetMilliSeconds()); - SetConnectionState(ConnectionState::kConnected); - SetConnActiveTime(trpc::time::GetMilliSeconds()); + if (IsClient()) { + // The server's connection status are set separately before calling Established + auto now_ms = trpc::time::GetMilliSeconds(); + SetEstablishTimestamp(now_ms); + SetConnectionState(ConnectionState::kConnected); + SetConnActiveTime(now_ms); + } TRPC_ASSERT(GetIoHandler()); TRPC_ASSERT(GetConnectionHandler()); diff --git a/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection_test.cc b/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection_test.cc index d7c3cd92..51a64d98 100644 --- a/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection_test.cc +++ b/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection_test.cc @@ -28,6 +28,7 @@ #include "trpc/runtime/iomodel/reactor/fiber/fiber_reactor.h" #include "trpc/util/latch.h" #include "trpc/util/net_util.h" +#include "trpc/util/time.h" namespace trpc { @@ -105,6 +106,10 @@ class FiberTcpConnectionTestImpl { auto io_handle = std::make_unique(server_conn_.Get()); server_conn_->SetIoHandler(std::move(io_handle)); + auto now_ms = trpc::time::GetMilliSeconds(); + server_conn_->SetEstablishTimestamp(now_ms); + server_conn_->SetConnectionState(ConnectionState::kConnected); + server_conn_->SetConnActiveTime(now_ms); server_conn_->Established(); server_conn_->StartHandshaking(); diff --git a/trpc/tools/gdb_plugin/gdb_fiber_plugin.py b/trpc/tools/gdb_plugin/gdb_fiber_plugin.py index 3807598c..7f369f21 100644 --- a/trpc/tools/gdb_plugin/gdb_fiber_plugin.py +++ b/trpc/tools/gdb_plugin/gdb_fiber_plugin.py @@ -287,8 +287,13 @@ def _get_memory_maps_in_core_dump(): break # TODO(luobogao): Parse executable sections. splited = seg.split() - start = int(splited[0], 16) - end = int(splited[2], 16) + try: + start = int(splited[0], 16) + end = int(splited[2], 16) + except ValueError: + # May encounter data like: While running this, GDB does not access memory + #skip them + break objfile = splited[-1] if not splited[-1].startswith('load') else '' segs.append(MemorySegment(start, end, objfile)) return segs diff --git a/trpc/transport/server/fiber/fiber_bind_adapter.cc b/trpc/transport/server/fiber/fiber_bind_adapter.cc index 8d17468d..363dbce7 100644 --- a/trpc/transport/server/fiber/fiber_bind_adapter.cc +++ b/trpc/transport/server/fiber/fiber_bind_adapter.cc @@ -304,8 +304,8 @@ uint64_t FiberBindAdapter::GenConnectionId() { return (static_cast(scheduling_group_index_) << 32) | ++conn_id; } -void FiberBindAdapter::AddConnection(RefPtr&& conn) { - connection_manager_.Add(conn->GetConnId(), std::move(conn)); +void FiberBindAdapter::AddConnection(const RefPtr& conn) { + connection_manager_.Add(conn->GetConnId(), conn); } RefPtr FiberBindAdapter::GetConnection(uint64_t conn_id) { @@ -410,7 +410,7 @@ void FiberBindAdapter::DoClose(const CloseConnectionInfo& close_connection_info) ", conn.ip:" << fiber_conn->GetPeerIp() << ", info.ip:" << close_connection_info.client_ip << ", conn.port:" << fiber_conn->GetPeerPort() << ", info.port:" << close_connection_info.client_port << ", conn.fd:" << fiber_conn->GetFd() << ", info.fd:" << close_connection_info.fd); - connection_manager_.Add(fiber_conn->GetConnId(), std::move(fiber_conn)); + connection_manager_.Add(fiber_conn->GetConnId(), fiber_conn); } } diff --git a/trpc/transport/server/fiber/fiber_bind_adapter.h b/trpc/transport/server/fiber/fiber_bind_adapter.h index d85539a9..cdb82051 100644 --- a/trpc/transport/server/fiber/fiber_bind_adapter.h +++ b/trpc/transport/server/fiber/fiber_bind_adapter.h @@ -50,7 +50,7 @@ class FiberBindAdapter : public RefCounted { uint64_t GenConnectionId(); - void AddConnection(RefPtr&& conn); + void AddConnection(const RefPtr& conn); void UpdateConnection(Connection* conn) {} diff --git a/trpc/transport/server/fiber/fiber_connection_manager.cc b/trpc/transport/server/fiber/fiber_connection_manager.cc index a2371853..383ac661 100644 --- a/trpc/transport/server/fiber/fiber_connection_manager.cc +++ b/trpc/transport/server/fiber/fiber_connection_manager.cc @@ -37,11 +37,11 @@ FiberConnectionManager::~FiberConnectionManager() { } } -void FiberConnectionManager::Add(uint64_t conn_id, RefPtr&& conn) { +void FiberConnectionManager::Add(uint64_t conn_id, const RefPtr& conn) { auto&& shard = conn_shards_[GetHashIndex(conn_id, kShards)]; std::scoped_lock _(shard.lock); - auto&& [it, inserted] = shard.map.emplace(conn_id, std::move(conn)); + auto&& [it, inserted] = shard.map.insert(std::make_pair(conn_id, conn)); (void)it; // Suppresses compilation warnings. TRPC_ASSERT(inserted && "insert FiberConnectionManager with Duplicate conn_id"); diff --git a/trpc/transport/server/fiber/fiber_connection_manager.h b/trpc/transport/server/fiber/fiber_connection_manager.h index b9bd18d6..112693fe 100644 --- a/trpc/transport/server/fiber/fiber_connection_manager.h +++ b/trpc/transport/server/fiber/fiber_connection_manager.h @@ -30,7 +30,7 @@ class FiberConnectionManager { ~FiberConnectionManager(); - void Add(uint64_t conn_id, RefPtr&& conn); + void Add(uint64_t conn_id, const RefPtr& conn); RefPtr Del(uint64_t conn_id); diff --git a/trpc/transport/server/fiber/fiber_server_transport_impl.cc b/trpc/transport/server/fiber/fiber_server_transport_impl.cc index 4651f846..88cadec4 100644 --- a/trpc/transport/server/fiber/fiber_server_transport_impl.cc +++ b/trpc/transport/server/fiber/fiber_server_transport_impl.cc @@ -27,6 +27,7 @@ #include "trpc/transport/server/fiber/fiber_server_connection_handler_factory.h" #include "trpc/util/log/logging.h" #include "trpc/util/random.h" +#include "trpc/util/time.h" namespace trpc { @@ -159,12 +160,18 @@ bool FiberServerTransportImpl::AcceptConnection(AcceptConnectionInfo& connection conn_handler->Init(); conn->SetConnectionHandler(std::move(conn_handler)); + // The connection active time and status must be updated before executing AddConnection to avoid + // the newly created connection being cleared as an idle connection, causing the assertion to fail. + auto now_ms = trpc::time::GetMilliSeconds(); + conn->SetEstablishTimestamp(now_ms); + conn->SetConnectionState(ConnectionState::kConnected); + conn->SetConnActiveTime(now_ms); + + bind_adapters_[scheduling_group_index]->AddConnection(conn); conn->Established(); conn->StartHandshaking(); - bind_adapters_[scheduling_group_index]->AddConnection(std::move(conn)); - FrameStats::GetInstance()->GetServerStats().AddConnCount(1); return true; diff --git a/trpc/util/queue/lockfree_queue.h b/trpc/util/queue/lockfree_queue.h index 894fc7e0..3fbd7f86 100644 --- a/trpc/util/queue/lockfree_queue.h +++ b/trpc/util/queue/lockfree_queue.h @@ -97,11 +97,12 @@ class alignas(64) LockFreeQueue { } } + // add count_ before store data to avoid Size becomes negative when dequeue execute more faster + count_.fetch_add(1, std::memory_order_release); + elem->data = val; elem->sequence.store(pos + 1, std::memory_order_release); - count_.fetch_add(1, std::memory_order_release); - return RT_OK; } @@ -130,17 +131,17 @@ class alignas(64) LockFreeQueue { } } + count_.fetch_sub(1, std::memory_order_release); + val = elem->data; elem->sequence.store(pos + mask_ + 1, std::memory_order_release); - count_.fetch_sub(1, std::memory_order_release); - return RT_OK; } /// @brief The size of the queue - /// @return uint32_t - uint32_t Size() const { return count_.load(std::memory_order_relaxed); } + /// @return int + int Size() const { return count_.load(std::memory_order_relaxed); } /// @brief The capacity of the queue /// @return uint32_t