diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 8349fe45be42d..8e18bfb106a55 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -183,8 +183,9 @@ tlEnv.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/db/dbmessage', - '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/service_context_test_fixture', '$BUILD_DIR/mongo/rpc/protocol', '$BUILD_DIR/mongo/rpc/rpc', '$BUILD_DIR/mongo/unittest/unittest', diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 11282830b3023..3593cc7673ad2 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -33,6 +33,7 @@ #include "mongo/transport/transport_layer_asio.h" +#include #include #include @@ -58,6 +59,7 @@ #include "mongo/util/net/ssl_manager.h" #include "mongo/util/net/ssl_options.h" #include "mongo/util/options_parser/startup_options.h" +#include "mongo/util/strong_weak_finish_line.h" #ifdef MONGO_CONFIG_SSL #include "mongo/util/net/ssl.hpp" @@ -286,6 +288,60 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params) maxConns(params->maxConns) { } +TransportLayerASIO::TimerService::TimerService() + : _reactor(std::make_shared()) {} + +TransportLayerASIO::TimerService::~TimerService() { + stop(); +} + +void TransportLayerASIO::TimerService::start() { + // Skip the expensive lock acquisition and `compareAndSwap` in the common path. + if (MONGO_likely(_state.load() != State::kInitialized)) + return; + + // The following ensures only one thread continues to spawn a thread to run the reactor. It also + // ensures concurrent `start()` and `stop()` invocations are serialized. Holding the lock + // guarantees that the following runs either before or after running `stop()`. Note that using + // `compareAndSwap` while holding the lock is for simplicity and not necessary. + auto lk = stdx::lock_guard(_mutex); + auto precondition = State::kInitialized; + if (_state.compareAndSwap(&precondition, State::kStarted)) { + _thread = stdx::thread([reactor = _reactor] { + LOGV2_INFO(5490002, "Started a new thread for the timer service"); + reactor->run(); + LOGV2_INFO(5490003, "Returning from the timer service thread"); + }); + } +} + +void TransportLayerASIO::TimerService::stop() { + // It's possible for `stop()` to be called without `start()` having been called (or for them to + // be called concurrently), so we only proceed with stopping the reactor and joining the thread + // if we've already transitioned to the `kStarted` state. + auto lk = stdx::lock_guard(_mutex); + if (_state.swap(State::kStopped) != State::kStarted) + return; + + _reactor->stop(); + _thread.join(); +} + +std::unique_ptr TransportLayerASIO::TimerService::makeTimer() { + return _getReactor()->makeTimer(); +} + +Date_t TransportLayerASIO::TimerService::now() { + return _getReactor()->now(); +} + +Reactor* TransportLayerASIO::TimerService::_getReactor() { + // TODO SERVER-57253 We can start this service as part of starting `TransportLayerASIO`. + // Then, we can remove the following invocation of `start()`. + start(); + return _reactor.get(); +} + TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts, ServiceEntryPoint* sep) : _ingressReactor(std::make_shared()), @@ -296,7 +352,8 @@ TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts, _egressSSLContext(nullptr), #endif _sep(sep), - _listenerOptions(opts) { + _listenerOptions(opts), + _timerService(std::make_unique()) { } TransportLayerASIO::~TransportLayerASIO() = default; @@ -524,13 +581,47 @@ StatusWith TransportLayerASIO::connect(HostAndPort peer, (sslMode == kGlobalSSLMode && ((globalSSLMode == SSLParams::SSLMode_preferSSL) || (globalSSLMode == SSLParams::SSLMode_requireSSL)))) { + // The handshake is complete once either of the following passes the finish line: + // - The thread running the handshake returns from `handshakeSSLForEgress`. + // - The thread running `TimerService` cancels the handshake due to a timeout. + auto finishLine = std::make_shared(2); + + // Schedules a task to cancel the synchronous handshake if it does not complete before the + // specified timeout. + auto timer = _timerService->makeTimer(); +#ifndef _WIN32 + // TODO SERVER-62035: enable the following on Windows. + if (timeout > Milliseconds(0)) { + timer->waitUntil(_timerService->now() + timeout) + .getAsync([finishLine, session](Status status) { + if (status.isOK() && finishLine->arriveStrongly()) + session->end(); + }); + } +#endif + Date_t timeBefore = Date_t::now(); auto sslStatus = session->handshakeSSLForEgress(peer).getNoThrow(); Date_t timeAfter = Date_t::now(); + if (timeAfter - timeBefore > kSlowOperationThreshold) { networkCounter.incrementNumSlowSSLOperations(); } + if (finishLine->arriveStrongly()) { + timer->cancel(); + } else if (!sslStatus.isOK()) { + // We only take this path if the handshake times out. Overwrite the socket exception + // with a network timeout. + auto errMsg = fmt::format("SSL handshake timed out after {}", + (timeAfter - timeBefore).toString()); + sslStatus = Status(ErrorCodes::NetworkTimeout, errMsg); + LOGV2(5490001, + "Timed out while running handshake", + "peer"_attr = peer, + "timeout"_attr = timeout); + } + if (!sslStatus.isOK()) { return sslStatus; } @@ -1141,6 +1232,10 @@ void TransportLayerASIO::shutdown() { return; } + lk.unlock(); + _timerService->stop(); + lk.lock(); + if (!_listenerOptions.isIngress()) { // Egress only reactors never start a listener return; diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index f11b0dbc27435..e10d050319ca5 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -117,6 +117,57 @@ class TransportLayerASIO final : public TransportLayer { TransportLayerASIO(const Options& opts, ServiceEntryPoint* sep); + /** + * A service, internal to `TransportLayerASIO`, that allows creating timers and running `Future` + * continuations when a timeout occurs. This allows setting up timeouts for synchronous + * operations, such as a synchronous SSL handshake. A separate thread is assigned to run these + * timers to: + * - Ensure there is always a thread running the timers, regardless of using a synchronous or + * asynchronous listener. + * - Avoid any performance implications on other reactors (e.g., the `egressReactor`). + * The public visibility is only for testing purposes and this service is not intended to be + * used outside `TransportLayerASIO`. + */ + class TimerService { + public: + TimerService(); + ~TimerService(); + + /** + * Spawns a thread to run the reactor. + * Immediately returns if the service has already started. + * May be called more than once, and concurrently. + */ + void start(); + + /** + * Stops the reactor and joins the thread. + * Immediately returns if the service is not started, or already stopped. + * May be called more than once, and concurrently. + */ + void stop(); + + std::unique_ptr makeTimer(); + + Date_t now(); + + private: + Reactor* _getReactor(); + + const std::shared_ptr _reactor; + + // Serializes invocations of `start()` and `stop()`, and allows updating `_state` and + // `_thread` as a single atomic operation. + Mutex _mutex = MONGO_MAKE_LATCH("TransportLayerASIO::TimerService::_mutex"); + + // State transitions: `kInitialized` --> `kStarted` --> `kStopped` + // |_______________________________^ + enum class State { kInitialized, kStarted, kStopped }; + AtomicWord _state; + + stdx::thread _thread; + }; + ~TransportLayerASIO() override; StatusWith connect(HostAndPort peer, @@ -216,6 +267,8 @@ class TransportLayerASIO final : public TransportLayer { int _listenerPort = 0; bool _isShutdown = false; + + const std::unique_ptr _timerService; }; } // namespace transport diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index eafc0fc777b9f..664e9e4df11d2 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -30,6 +30,7 @@ #include "mongo/transport/transport_layer_asio.h" +#include #include #include #include @@ -37,7 +38,10 @@ #include +#include "mongo/client/dbclient_connection.h" +#include "mongo/config.h" #include "mongo/db/server_options.h" +#include "mongo/db/service_context_test_fixture.h" #include "mongo/logv2/log.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/basic.h" @@ -47,6 +51,7 @@ #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/net/sock.h" +#include "mongo/util/static_immortal.h" #include "mongo/util/synchronized_value.h" #include "mongo/util/time_support.h" @@ -253,13 +258,29 @@ class MockSEP : public ServiceEntryPoint { synchronized_value>> _sessions; }; +std::unique_ptr makeTLA(ServiceEntryPoint* sep) { + auto options = [] { + ServerGlobalParams params; + params.noUnixSocket = true; + transport::TransportLayerASIO::Options opts(¶ms); + // TODO SERVER-30212 should clean this up and assign a port from the supplied port range + // provided by resmoke. + opts.port = 0; + return opts; + }(); + auto tla = std::make_unique(options, sep); + ASSERT_OK(tla->setup()); + ASSERT_OK(tla->start()); + return tla; +} + /** * Properly setting up and tearing down the MockSEP and TransportLayerASIO is * tricky. Most tests can delegate the details to this TestFixture. */ class TestFixture { public: - TestFixture() : _tla{_makeTLA()} {} + TestFixture() : _tla{makeTLA(&_sep)} {} ~TestFixture() { _sep.endAllSessions({}); @@ -275,22 +296,6 @@ class TestFixture { } private: - std::unique_ptr _makeTLA() { - auto options = [] { - ServerGlobalParams params; - params.noUnixSocket = true; - transport::TransportLayerASIO::Options opts(¶ms); - // TODO SERVER-30212 should clean this up and assign a port from the supplied port range - // provided by resmoke. - opts.port = 0; - return opts; - }(); - auto tla = std::make_unique(options, &_sep); - ASSERT_OK(tla->setup()); - ASSERT_OK(tla->start()); - return tla; - } - MockSEP _sep; std::unique_ptr _tla; }; @@ -373,5 +378,149 @@ TEST(TransportLayerASIO, SwitchTimeoutModes) { } } +class TransportLayerASIOWithServiceContextTest : public ServiceContextTest { +public: + /** + * `ThreadCounter` and `ThreadToken` allow tracking the number of active (running) threads. + * For each thread, a `ThreadToken` is created. The token notifies `ThreadCounter` about + * creation and destruction of its associated thread. This allows maintaining the number of + * active threads at any point during the execution of this unit-test. + */ + class ThreadCounter { + public: + static ThreadCounter& get() { + static StaticImmortal instance; + return *instance; + } + + int64_t count() const { + const auto count = _count.load(); + invariant(count > 0); + return count; + } + + void onCreateThread() { + _count.fetchAndAdd(1); + } + + void onDestroyThread() { + _count.fetchAndAdd(-1); + } + + private: + AtomicWord _count; + }; + + struct ThreadToken { + ThreadToken() { + ThreadCounter::get().onCreateThread(); + } + + ~ThreadToken() { + ThreadCounter::get().onDestroyThread(); + } + }; + + void setUp() override { + auto sep = std::make_unique(); + auto tl = makeTLA(sep.get()); + getServiceContext()->setServiceEntryPoint(std::move(sep)); + getServiceContext()->setTransportLayer(std::move(tl)); + } + + void tearDown() override { + getServiceContext()->getTransportLayer()->shutdown(); + } + + transport::TransportLayerASIO& tla() { + auto tl = getServiceContext()->getTransportLayer(); + return *dynamic_cast(tl); + } +}; + +#if 0 +const auto getThreadToken = + ThreadContext::declareDecoration(); + +TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotSpawnThreadsBeforeStart) { + const auto beforeThreadCount = ThreadCounter::get().count(); + transport::TransportLayerASIO::TimerService service; + // Note that the following is a best-effort and not deterministic as we don't have control over + // when threads may start running and advance the thread count. + const auto afterThreadCount = ThreadCounter::get().count(); + ASSERT_EQ(beforeThreadCount, afterThreadCount); +} + +TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceOneShotStart) { + const auto beforeThreadCount = ThreadCounter::get().count(); + transport::TransportLayerASIO::TimerService service; + service.start(); + LOGV2(5490004, "Waiting for the timer thread to start", "threads"_attr = beforeThreadCount); + while (ThreadCounter::get().count() == beforeThreadCount) { + sleepFor(Milliseconds(1)); + } + const auto afterThreadCount = ThreadCounter::get().count(); + LOGV2(5490005, "Returned from waiting for the timer thread", "threads"_attr = afterThreadCount); + + // Start the service a few times and verify that the thread count has not changed. Note that the + // following is a best-effort and not deterministic as we don't have control over when threads + // may start running and advance the thread count. + service.start(); + service.start(); + service.start(); + ASSERT_EQ(afterThreadCount, ThreadCounter::get().count()); +} + +TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotStartAfterStop) { + const auto beforeThreadCount = ThreadCounter::get().count(); + transport::TransportLayerASIO::TimerService service; + service.stop(); + service.start(); + const auto afterThreadCount = ThreadCounter::get().count(); + // The test would fail if `start` proceeds to spawn a thread for `service`. + ASSERT_EQ(beforeThreadCount, afterThreadCount); +} + +TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceCanStopMoreThanOnce) { + // Verifying that it is safe to have multiple calls to `stop()`. + { + transport::TransportLayerASIO::TimerService service; + service.start(); + service.stop(); + service.stop(); + } + { + transport::TransportLayerASIO::TimerService service; + service.stop(); + service.stop(); + } +} +#endif + +#ifdef MONGO_CONFIG_SSL +#ifndef _WIN32 +// TODO SERVER-62035: enable the following on Windows. +TEST_F(TransportLayerASIOWithServiceContextTest, ShutdownDuringSSLHandshake) { + /** + * Creates a server and a client thread: + * - The server listens for incoming connections, but doesn't participate in SSL handshake. + * - The client connects to the server, and is configured to perform SSL handshake. + * The server never writes on the socket in response to the handshake request, thus the client + * should block until it is timed out. + * The goal is to simulate a server crash, and verify the behavior of the client, during the + * handshake process. + */ + int port = tla().listenerPort(); + + auto uri = uassertStatusOK(MongoURI::parse("mongodb://localhost/?ssl=true")); + DBClientConnection conn(false, 1 /* this is ignored */, std::move(uri)); + conn.setSoTimeout(1); // 1 second timeout + + auto status = conn.connectSocketOnly({"localhost", port}); + ASSERT_EQ(status, ErrorCodes::HostUnreachable); +} +#endif // _WIN32 +#endif // MONGO_CONFIG_SSL + } // namespace } // namespace mongo