Skip to content

Commit

Permalink
SERVER-30135 Added a synchronous executor to make the code path betwe…
Browse files Browse the repository at this point in the history
…en the two modes similar while still allowing customization in the execution. Should fix some perf regressions that came with unifying the service state machine.
  • Loading branch information
henrikedin committed Sep 22, 2017
1 parent ab7ceed commit 6732fbb
Show file tree
Hide file tree
Showing 21 changed files with 532 additions and 107 deletions.
1 change: 1 addition & 0 deletions buildscripts/cpplint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,7 @@ def make_polyfill_regex():
'lock_guard',
'make_unique',
'mutex',
'notify_all_at_thread_exit',
'packaged_task',
'placeholders',
'promise',
Expand Down
2 changes: 1 addition & 1 deletion jstests/noPassthrough/transportlayer_boot_cmdline.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
{dbpath: dbpath, transportLayer: 'legacy', serviceExecutor: 'adaptive'});
assert.isnull(
m,
'MongoDB with transportLayer=legacy and serviceExecutor=fixedForTesting managed to startup which is an unsupported combination');
'MongoDB with transportLayer=legacy and serviceExecutor=adaptive managed to startup which is an unsupported combination');
if (m) {
MongoRunner.stopMongod(m);
}
Expand Down
17 changes: 8 additions & 9 deletions src/mongo/db/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -814,12 +814,10 @@ ExitCode _initAndListen(int listenPort) {
return EXIT_NET_ERROR;
}

if (globalServiceContext->getServiceExecutor()) {
start = globalServiceContext->getServiceExecutor()->start();
if (!start.isOK()) {
error() << "Failed to start the service executor: " << start;
return EXIT_NET_ERROR;
}
start = globalServiceContext->getServiceExecutor()->start();
if (!start.isOK()) {
error() << "Failed to start the service executor: " << start;
return EXIT_NET_ERROR;
}

globalServiceContext->notifyStartupComplete();
Expand Down Expand Up @@ -1140,9 +1138,10 @@ void shutdownTask() {
}

// Shutdown and wait for the service executor to exit
auto svcExec = serviceContext->getServiceExecutor();
if (svcExec) {
fassertStatusOK(40550, svcExec->shutdown());
Status status = serviceContext->getServiceExecutor()->shutdown();
if (!status.isOK()) {
log(LogComponent::kExecutor) << "shutdown: service executor failed with: "
<< status.reason();
}
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/server_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct ServerGlobalParams {
std::string socket = "/tmp"; // UNIX domain socket directory
std::string transportLayer; // --transportLayer (must be either "asio" or "legacy")

// --serviceExecutor ("adaptive", "synchronous", or "fixedForTesting")
// --serviceExecutor ("adaptive", "synchronous")
std::string serviceExecutor;

size_t maxConns = DEFAULT_MAX_CONN; // Maximum number of simultaneous open connections.
Expand Down
99 changes: 99 additions & 0 deletions src/mongo/platform/bitwise_enum_operators.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/

#pragma once

#include <type_traits>

namespace mongo {

template <typename Enum>
struct EnableBitMaskOperators {
static const bool enable = false;
};

template <typename Enum>
typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum>::type operator&(Enum lhs,
Enum rhs) {
using underlying = typename std::underlying_type<Enum>::type;
return static_cast<Enum>(static_cast<underlying>(lhs) & static_cast<underlying>(rhs));
}

template <typename Enum>
typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum>::type operator|(Enum lhs,
Enum rhs) {
using underlying = typename std::underlying_type<Enum>::type;
return static_cast<Enum>(static_cast<underlying>(lhs) | static_cast<underlying>(rhs));
}

template <typename Enum>
typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum>::type operator^(Enum lhs,
Enum rhs) {
using underlying = typename std::underlying_type<Enum>::type;
return static_cast<Enum>(static_cast<underlying>(lhs) ^ static_cast<underlying>(rhs));
}

template <typename Enum>
typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum>::type operator~(Enum rhs) {
return static_cast<Enum>(~static_cast<typename std::underlying_type<Enum>::type>(rhs));
}

template <typename Enum>
typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum&>::type operator|=(Enum& lhs,
Enum rhs) {
using underlying = typename std::underlying_type<Enum>::type;
lhs = static_cast<Enum>(static_cast<underlying>(lhs) | static_cast<underlying>(rhs));
return lhs;
}

template <typename Enum>
typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum&>::type operator&=(Enum& lhs,
Enum rhs) {
using underlying = typename std::underlying_type<Enum>::type;
lhs = static_cast<Enum>(static_cast<underlying>(lhs) & static_cast<underlying>(rhs));
return lhs;
}

template <typename Enum>
typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum&>::type operator^=(Enum& lhs,
Enum rhs) {
using underlying = typename std::underlying_type<Enum>::type;
lhs = static_cast<Enum>(static_cast<underlying>(lhs) ^ static_cast<underlying>(rhs));
return lhs;
}

} // namespace mongo

#define ENABLE_BITMASK_OPERATORS(x) \
\
template <> \
\
struct EnableBitMaskOperators<x> { \
static_assert(std::is_enum<typename x>::value, "template parameter is not an enum type"); \
static const bool enable = true; \
};
10 changes: 4 additions & 6 deletions src/mongo/s/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,10 @@ static ExitCode runMongosServer() {
return EXIT_NET_ERROR;
}

if (auto svcExec = getGlobalServiceContext()->getServiceExecutor()) {
start = svcExec->start();
if (!start.isOK()) {
error() << "Failed to start the service executor: " << start;
return EXIT_NET_ERROR;
}
start = getGlobalServiceContext()->getServiceExecutor()->start();
if (!start.isOK()) {
error() << "Failed to start the service executor: " << start;
return EXIT_NET_ERROR;
}

getGlobalServiceContext()->notifyStartupComplete();
Expand Down
1 change: 1 addition & 0 deletions src/mongo/stdx/condition_variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace stdx {
using condition_variable = ::std::condition_variable; // NOLINT
using condition_variable_any = ::std::condition_variable_any; // NOLINT
using cv_status = ::std::cv_status; // NOLINT
using ::std::notify_all_at_thread_exit; // NOLINT

} // namespace stdx
} // namespace mongo
1 change: 1 addition & 0 deletions src/mongo/transport/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ tlEnv.Library(
target='service_executor',
source=[
'service_executor_adaptive.cpp',
'service_executor_synchronous.cpp'
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/service_context',
Expand Down
48 changes: 3 additions & 45 deletions src/mongo/transport/service_entry_point_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include <vector>

#include "mongo/db/auth/restriction_environment.h"
#include "mongo/transport/service_entry_point_utils.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/transport/session.h"
#include "mongo/util/log.h"
Expand Down Expand Up @@ -85,11 +84,11 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {

SSMListIterator ssmIt;

const auto sync = (_svcCtx->getServiceExecutor() == nullptr);
const bool quiet = serverGlobalParams.quiet.load();
size_t connectionCount;

auto ssm = ServiceStateMachine::create(_svcCtx, session, sync);
auto ssm = ServiceStateMachine::create(
_svcCtx, session, _svcCtx->getServiceExecutor()->transportMode());
{
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
connectionCount = _sessions.size() + 1;
Expand Down Expand Up @@ -129,48 +128,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {

});

if (!sync) {
dassert(_svcCtx->getServiceExecutor());
ssm->scheduleNext();
return;
}

auto workerTask = [this, ssm]() mutable {
_nWorkers.addAndFetch(1);
const auto guard = MakeGuard([this, &ssm] { _nWorkers.subtractAndFetch(1); });

const auto numCores = [] {
ProcessInfo p;
if (auto availCores = p.getNumAvailableCores()) {
return static_cast<unsigned>(*availCores);
}
return static_cast<unsigned>(p.getNumCores());
}();

while (ssm->state() != ServiceStateMachine::State::Ended) {
ssm->runNext();

/*
* In perf testing we found that yielding after running a each request produced
* at 5% performance boost in microbenchmarks if the number of worker threads
* was greater than the number of available cores.
*/
if (_nWorkers.load() > numCores)
stdx::this_thread::yield();
}
};

const auto launchResult = launchServiceWorkerThread(std::move(workerTask));
if (launchResult.isOK()) {
return;
}

// We never got off the ground. Manually remove the new SSM from
// the list of sessions and close the associated socket. The SSM
// will be destroyed.
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
_sessions.erase(ssmIt);
ssm->terminateIfTagsDontMatch(0);
ssm->scheduleNext();
}

void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
Expand Down
14 changes: 14 additions & 0 deletions src/mongo/transport/service_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@

#include "mongo/base/status.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/platform/bitwise_enum_operators.h"
#include "mongo/stdx/functional.h"
#include "mongo/transport/transport_mode.h"

namespace mongo {
// This needs to be forward declared here because the service_context.h is a circular dependency.
Expand All @@ -55,6 +57,10 @@ class ServiceExecutor {

// MayRecurse indicates that a task may be run recursively.
kMayRecurse = 1 << 2,

// MayYieldBeforeSchedule indicates that the executor may yield on the current thread before
// scheduling the task.
kMayYieldBeforeSchedule = 1 << 3,
};

/*
Expand All @@ -81,11 +87,19 @@ class ServiceExecutor {
*/
virtual Status shutdown() = 0;

/*
* Returns if this service executor is using asynchronous or synchronous networking.
*/
virtual Mode transportMode() const = 0;

/*
* Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output.
*/
virtual void appendStats(BSONObjBuilder* bob) const = 0;
};

} // namespace transport

ENABLE_BITMASK_OPERATORS(transport::ServiceExecutor::ScheduleFlags)

} // namespace mongo
4 changes: 4 additions & 0 deletions src/mongo/transport/service_executor_adaptive.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class ServiceExecutorAdaptive : public ServiceExecutor {
Status shutdown() final;
Status schedule(Task task, ScheduleFlags flags) final;

Mode transportMode() const final {
return Mode::kAsynchronous;
}

void appendStats(BSONObjBuilder* bob) const final;

int threadsRunning() {
Expand Down
63 changes: 63 additions & 0 deletions src/mongo/transport/service_executor_noop.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/

#pragma once

#include "mongo/base/status.h"
#include "mongo/transport/service_executor.h"

namespace mongo {
namespace transport {

/**
* The noop service executor provides the necessary interface for some unittests. Doesn't actually
* execute any work
*/
class ServiceExecutorNoop final : public ServiceExecutor {
public:
explicit ServiceExecutorNoop(ServiceContext* ctx) {}

Status start() override {
return Status::OK();
}
Status shutdown() override {
return Status::OK();
}
Status schedule(Task task, ScheduleFlags flags) override {
return Status::OK();
}

Mode transportMode() const override {
return Mode::kSynchronous;
}

void appendStats(BSONObjBuilder* bob) const override {}
};

} // namespace transport
} // namespace mongo
Loading

0 comments on commit 6732fbb

Please sign in to comment.