Skip to content

Commit

Permalink
SERVER-54900 Cancel ASIO session when SSL handshake times out
Browse files Browse the repository at this point in the history
(cherry picked from commit 263c063)
  • Loading branch information
samanca authored and Evergreen Agent committed Jun 8, 2022
1 parent 1e4c6f2 commit 65134ea
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 19 deletions.
3 changes: 2 additions & 1 deletion src/mongo/transport/SConscript
Expand Up @@ -183,8 +183,9 @@ tlEnv.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/dbmessage',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
'$BUILD_DIR/mongo/rpc/protocol',
'$BUILD_DIR/mongo/rpc/rpc',
'$BUILD_DIR/mongo/unittest/unittest',
Expand Down
97 changes: 96 additions & 1 deletion src/mongo/transport/transport_layer_asio.cpp
Expand Up @@ -33,6 +33,7 @@

#include "mongo/transport/transport_layer_asio.h"

#include <fmt/format.h>
#include <fstream>

#include <asio.hpp>
Expand All @@ -58,6 +59,7 @@
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/net/ssl_options.h"
#include "mongo/util/options_parser/startup_options.h"
#include "mongo/util/strong_weak_finish_line.h"

#ifdef MONGO_CONFIG_SSL
#include "mongo/util/net/ssl.hpp"
Expand Down Expand Up @@ -286,6 +288,60 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params)
maxConns(params->maxConns) {
}

TransportLayerASIO::TimerService::TimerService()
: _reactor(std::make_shared<TransportLayerASIO::ASIOReactor>()) {}

TransportLayerASIO::TimerService::~TimerService() {
stop();
}

void TransportLayerASIO::TimerService::start() {
// Skip the expensive lock acquisition and `compareAndSwap` in the common path.
if (MONGO_likely(_state.load() != State::kInitialized))
return;

// The following ensures only one thread continues to spawn a thread to run the reactor. It also
// ensures concurrent `start()` and `stop()` invocations are serialized. Holding the lock
// guarantees that the following runs either before or after running `stop()`. Note that using
// `compareAndSwap` while holding the lock is for simplicity and not necessary.
auto lk = stdx::lock_guard(_mutex);
auto precondition = State::kInitialized;
if (_state.compareAndSwap(&precondition, State::kStarted)) {
_thread = stdx::thread([reactor = _reactor] {
LOGV2_INFO(5490002, "Started a new thread for the timer service");
reactor->run();
LOGV2_INFO(5490003, "Returning from the timer service thread");
});
}
}

void TransportLayerASIO::TimerService::stop() {
// It's possible for `stop()` to be called without `start()` having been called (or for them to
// be called concurrently), so we only proceed with stopping the reactor and joining the thread
// if we've already transitioned to the `kStarted` state.
auto lk = stdx::lock_guard(_mutex);
if (_state.swap(State::kStopped) != State::kStarted)
return;

_reactor->stop();
_thread.join();
}

std::unique_ptr<ReactorTimer> TransportLayerASIO::TimerService::makeTimer() {
return _getReactor()->makeTimer();
}

Date_t TransportLayerASIO::TimerService::now() {
return _getReactor()->now();
}

Reactor* TransportLayerASIO::TimerService::_getReactor() {
// TODO SERVER-57253 We can start this service as part of starting `TransportLayerASIO`.
// Then, we can remove the following invocation of `start()`.
start();
return _reactor.get();
}

TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
ServiceEntryPoint* sep)
: _ingressReactor(std::make_shared<ASIOReactor>()),
Expand All @@ -296,7 +352,8 @@ TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
_egressSSLContext(nullptr),
#endif
_sep(sep),
_listenerOptions(opts) {
_listenerOptions(opts),
_timerService(std::make_unique<TimerService>()) {
}

TransportLayerASIO::~TransportLayerASIO() = default;
Expand Down Expand Up @@ -524,13 +581,47 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer,
(sslMode == kGlobalSSLMode &&
((globalSSLMode == SSLParams::SSLMode_preferSSL) ||
(globalSSLMode == SSLParams::SSLMode_requireSSL)))) {
// The handshake is complete once either of the following passes the finish line:
// - The thread running the handshake returns from `handshakeSSLForEgress`.
// - The thread running `TimerService` cancels the handshake due to a timeout.
auto finishLine = std::make_shared<StrongWeakFinishLine>(2);

// Schedules a task to cancel the synchronous handshake if it does not complete before the
// specified timeout.
auto timer = _timerService->makeTimer();
#ifndef _WIN32
// TODO SERVER-62035: enable the following on Windows.
if (timeout > Milliseconds(0)) {
timer->waitUntil(_timerService->now() + timeout)
.getAsync([finishLine, session](Status status) {
if (status.isOK() && finishLine->arriveStrongly())
session->end();
});
}
#endif

Date_t timeBefore = Date_t::now();
auto sslStatus = session->handshakeSSLForEgress(peer).getNoThrow();
Date_t timeAfter = Date_t::now();

if (timeAfter - timeBefore > kSlowOperationThreshold) {
networkCounter.incrementNumSlowSSLOperations();
}

if (finishLine->arriveStrongly()) {
timer->cancel();
} else if (!sslStatus.isOK()) {
// We only take this path if the handshake times out. Overwrite the socket exception
// with a network timeout.
auto errMsg = fmt::format("SSL handshake timed out after {}",
(timeAfter - timeBefore).toString());
sslStatus = Status(ErrorCodes::NetworkTimeout, errMsg);
LOGV2(5490001,
"Timed out while running handshake",
"peer"_attr = peer,
"timeout"_attr = timeout);
}

if (!sslStatus.isOK()) {
return sslStatus;
}
Expand Down Expand Up @@ -1141,6 +1232,10 @@ void TransportLayerASIO::shutdown() {
return;
}

lk.unlock();
_timerService->stop();
lk.lock();

if (!_listenerOptions.isIngress()) {
// Egress only reactors never start a listener
return;
Expand Down
53 changes: 53 additions & 0 deletions src/mongo/transport/transport_layer_asio.h
Expand Up @@ -117,6 +117,57 @@ class TransportLayerASIO final : public TransportLayer {

TransportLayerASIO(const Options& opts, ServiceEntryPoint* sep);

/**
* A service, internal to `TransportLayerASIO`, that allows creating timers and running `Future`
* continuations when a timeout occurs. This allows setting up timeouts for synchronous
* operations, such as a synchronous SSL handshake. A separate thread is assigned to run these
* timers to:
* - Ensure there is always a thread running the timers, regardless of using a synchronous or
* asynchronous listener.
* - Avoid any performance implications on other reactors (e.g., the `egressReactor`).
* The public visibility is only for testing purposes and this service is not intended to be
* used outside `TransportLayerASIO`.
*/
class TimerService {
public:
TimerService();
~TimerService();

/**
* Spawns a thread to run the reactor.
* Immediately returns if the service has already started.
* May be called more than once, and concurrently.
*/
void start();

/**
* Stops the reactor and joins the thread.
* Immediately returns if the service is not started, or already stopped.
* May be called more than once, and concurrently.
*/
void stop();

std::unique_ptr<ReactorTimer> makeTimer();

Date_t now();

private:
Reactor* _getReactor();

const std::shared_ptr<Reactor> _reactor;

// Serializes invocations of `start()` and `stop()`, and allows updating `_state` and
// `_thread` as a single atomic operation.
Mutex _mutex = MONGO_MAKE_LATCH("TransportLayerASIO::TimerService::_mutex");

// State transitions: `kInitialized` --> `kStarted` --> `kStopped`
// |_______________________________^
enum class State { kInitialized, kStarted, kStopped };
AtomicWord<State> _state;

stdx::thread _thread;
};

~TransportLayerASIO() override;

StatusWith<SessionHandle> connect(HostAndPort peer,
Expand Down Expand Up @@ -216,6 +267,8 @@ class TransportLayerASIO final : public TransportLayer {
int _listenerPort = 0;

bool _isShutdown = false;

const std::unique_ptr<TimerService> _timerService;
};

} // namespace transport
Expand Down

0 comments on commit 65134ea

Please sign in to comment.