Skip to content
Permalink
Browse files

SERVER-30136 Move session stats tracking to the service entry point. …

…At the same time, unify logging of connection accepted/refused/closed in this class too to make the transport layers cleaner.
  • Loading branch information
henrikedin committed Jul 27, 2017
1 parent f1bf0b3 commit 699f4020febee8b8bf0e3cbc056c01776b52a238
@@ -96,6 +96,14 @@ class DummyServiceEntryPoint : public ServiceEntryPoint {
MONGO_UNREACHABLE;
}

Stats sessionStats() const override {
return {};
}

size_t numOpenSessions() const override {
return 0ULL;
}

void setReplyDelay(Milliseconds delay) {
_replyDelay = delay;
}
@@ -48,7 +48,7 @@
#include "mongo/db/stats/counters.h"
#include "mongo/platform/process_id.h"
#include "mongo/transport/message_compressor_registry.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostname_canonicalization.h"
#include "mongo/util/net/sock.h"
@@ -233,10 +233,11 @@ class Connections : public ServerStatusSection {

BSONObj generateSection(OperationContext* opCtx, const BSONElement& configElement) const {
BSONObjBuilder bb;
if (!opCtx->getServiceContext()->getTransportLayer()) {
return bb.obj();
}
auto stats = opCtx->getServiceContext()->getTransportLayer()->sessionStats();

auto serviceEntryPoint = opCtx->getServiceContext()->getServiceEntryPoint();
invariant(serviceEntryPoint);

auto stats = serviceEntryPoint->sessionStats();
bb.append("current", static_cast<int>(stats.numOpenSessions));
bb.append("available", static_cast<int>(stats.numAvailableSessions));
bb.append("totalCreated", static_cast<int>(stats.numCreatedSessions));
@@ -1073,7 +1073,7 @@ void shutdownTask() {
}

#if __has_feature(address_sanitizer)
auto sep = checked_cast<ServiceEntryPointImpl*>(serviceContext->getServiceEntryPoint());
auto sep = serviceContext->getServiceEntryPoint();
auto tl = serviceContext->getTransportLayer();
if (sep && tl) {
// When running under address sanitizer, we get false positive leaks due to disorder around
@@ -1097,7 +1097,7 @@ void shutdownTask() {
// There isn't currently a way to wait on the TicketHolder to have all its tickets back,
// unfortunately. So, busy wait in this detached thread.
while (true) {
const auto runningWorkers = sep->getNumberOfConnections();
const auto runningWorkers = sep->numOpenSessions();

if (runningWorkers == 0) {
log(LogComponent::kNetwork) << "shutdown: no running workers found...";
@@ -35,7 +35,7 @@
namespace mongo {

const int DEFAULT_UNIX_PERMS = 0700;
constexpr auto DEFAULT_MAX_CONN = 1000000;
constexpr size_t DEFAULT_MAX_CONN = 1000000;

enum class ClusterRole { None, ShardServer, ConfigServer };

@@ -79,7 +79,7 @@ struct ServerGlobalParams {
// --serviceExecutor ("adaptive", "synchronous", or "fixedForTesting")
std::string serviceExecutor;

int maxConns = DEFAULT_MAX_CONN; // Maximum number of simultaneous open connections.
size_t maxConns = DEFAULT_MAX_CONN; // Maximum number of simultaneous open connections.

int unixSocketPermissions = DEFAULT_UNIX_PERMS; // permissions for the UNIX domain socket

@@ -45,6 +45,29 @@ class ServiceEntryPoint {
MONGO_DISALLOW_COPYING(ServiceEntryPoint);

public:
/**
* Stats for sessions open.
*/
struct Stats {
/**
* Returns the number of sessions currently open.
*/
size_t numOpenSessions = 0;

/**
* Returns the total number of sessions that have ever been created.
*/
size_t numCreatedSessions = 0;

/**
* Returns the number of available sessions we could still open. Only relevant
* when we are operating under a transport::Session limit (for example, in the
* legacy implementation, we respect a maximum number of connections). If there
* is no session limit, returns std::numeric_limits<int>::max().
*/
size_t numAvailableSessions = 0;
};

virtual ~ServiceEntryPoint() = default;

/**
@@ -57,6 +80,16 @@ class ServiceEntryPoint {
*/
virtual void endAllSessions(transport::Session::TagMask tags) = 0;

/**
* Returns high-level stats about current sessions.
*/
virtual Stats sessionStats() const = 0;

/**
* Returns the number of sessions currently open.
*/
virtual size_t numOpenSessions() const = 0;

/**
* Processes a request and fills out a DbResponse.
*/
@@ -42,7 +42,37 @@
#include "mongo/util/processinfo.h"
#include "mongo/util/scopeguard.h"

#if !defined(_WIN32)
#include <sys/resource.h>
#endif

namespace mongo {
ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {

const auto supportedMax = [] {
#ifdef _WIN32
return serverGlobalParams.maxConns;
#else
struct rlimit limit;
verify(getrlimit(RLIMIT_NOFILE, &limit) == 0);

size_t max = (size_t)(limit.rlim_cur * .8);

LOG(1) << "fd limit"
<< " hard:" << limit.rlim_max << " soft:" << limit.rlim_cur << " max conn: " << max;

return std::min(max, serverGlobalParams.maxConns);
#endif
}();

// If we asked for more connections than supported, inform the user.
if (supportedMax < serverGlobalParams.maxConns &&
serverGlobalParams.maxConns != DEFAULT_MAX_CONN) {
log() << " --maxConns too high, can only handle " << supportedMax;
}

_maxNumConnections = supportedMax;
}

void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
@@ -56,15 +86,47 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
SSMListIterator ssmIt;

const auto sync = (_svcCtx->getServiceExecutor() == nullptr);
auto ssm = ServiceStateMachine::create(_svcCtx, std::move(session), sync);
const bool quiet = serverGlobalParams.quiet.load();
size_t connectionCount;

auto ssm = ServiceStateMachine::create(_svcCtx, session, sync);
{
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
ssmIt = _sessions.emplace(_sessions.begin(), ssm);
connectionCount = _sessions.size() + 1;
if (connectionCount <= _maxNumConnections) {
ssmIt = _sessions.emplace(_sessions.begin(), ssm);
_currentConnections.store(connectionCount);
_createdConnections.addAndFetch(1);
}
}

ssm->setCleanupHook([this, ssmIt] {
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
_sessions.erase(ssmIt);
// Checking if we successfully added a connection above. Separated from the lock so we don't log
// while holding it.
if (connectionCount > _maxNumConnections) {
if (!quiet) {
log() << "connection refused because too many open connections: " << connectionCount;
}
return;
}

if (!quiet) {
const auto word = (connectionCount == 1 ? " connection"_sd : " connections"_sd);
log() << "connection accepted from " << session->remote() << " #" << session->id() << " ("
<< connectionCount << word << " now open)";
}

ssm->setCleanupHook([ this, ssmIt, session = std::move(session) ] {
size_t connectionCount;
auto remote = session->remote();
{
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
_sessions.erase(ssmIt);
connectionCount = _sessions.size();
_currentConnections.store(connectionCount);
}
const auto word = (connectionCount == 1 ? " connection"_sd : " connections"_sd);
log() << "end connection " << remote << " (" << connectionCount << word << " now open)";

});

if (!sync) {
@@ -122,9 +184,15 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
}
}

std::size_t ServiceEntryPointImpl::getNumberOfConnections() const {
stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
return _sessions.size();
ServiceEntryPoint::Stats ServiceEntryPointImpl::sessionStats() const {

size_t sessionCount = _currentConnections.load();

ServiceEntryPoint::Stats ret;
ret.numOpenSessions = sessionCount;
ret.numCreatedSessions = _createdConnections.load();
ret.numAvailableSessions = _maxNumConnections - sessionCount;
return ret;
}

} // namespace mongo
@@ -54,13 +54,17 @@ class ServiceEntryPointImpl : public ServiceEntryPoint {
MONGO_DISALLOW_COPYING(ServiceEntryPointImpl);

public:
explicit ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {}
explicit ServiceEntryPointImpl(ServiceContext* svcCtx);

void startSession(transport::SessionHandle session) final;

void endAllSessions(transport::Session::TagMask tags) final;

std::size_t getNumberOfConnections() const;
Stats sessionStats() const final;

size_t numOpenSessions() const final {
return _currentConnections.load();
}

private:
using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>;
@@ -71,6 +75,10 @@ class ServiceEntryPointImpl : public ServiceEntryPoint {

mutable stdx::mutex _sessionsMutex;
SSMList _sessions;

size_t _maxNumConnections{DEFAULT_MAX_CONN};
AtomicWord<size_t> _currentConnections{0};
AtomicWord<size_t> _createdConnections{0};
};

} // namespace mongo
@@ -110,4 +110,12 @@ void ServiceEntryPointMock::endAllSessions(transport::Session::TagMask) {
}
}

ServiceEntryPoint::Stats ServiceEntryPointMock::sessionStats() const {
return {};
}

size_t ServiceEntryPointMock::numOpenSessions() const {
return 0ULL;
}

} // namespace mongo
@@ -67,6 +67,10 @@ class ServiceEntryPointMock : public ServiceEntryPoint {

void endAllSessions(transport::Session::TagMask tags) override;

Stats sessionStats() const override;

size_t numOpenSessions() const override;

DbResponse handleRequest(OperationContext* opCtx, const Message& request) override;

private:
@@ -131,10 +131,6 @@ void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket,
return _asyncWait(std::move(ticket), std::move(callback));
}

TransportLayer::Stats ServiceEntryPointTestSuite::MockTLHarness::sessionStats() {
return Stats();
}

void ServiceEntryPointTestSuite::MockTLHarness::end(const SessionHandle& session) {
return _end(session);
}
@@ -110,7 +110,6 @@ class ServiceEntryPointTestSuite : public mongo::unittest::Test {
Status wait(transport::Ticket&& ticket) override;
void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override;

Stats sessionStats() override;
void end(const transport::SessionHandle& session) override;
Status setup() override;
Status start() override;
@@ -463,24 +463,11 @@ ServiceStateMachine::State ServiceStateMachine::state() {
void ServiceStateMachine::_cleanupSession(ThreadGuard& guard) {
_state.store(State::Ended);

auto tl = _session()->getTransportLayer();
auto remote = _session()->remote();

_inMessage.reset();

// By ignoring the return value of Client::releaseCurrent() we destroy the session.
// _dbClient is now nullptr and _dbClientPtr is invalid and should never be accessed.
Client::releaseCurrent();

if (!serverGlobalParams.quiet.load()) {
// Get the number of open sessions minus 1 (this one will get cleaned up when
// this SSM gets destroyed)
// TODO Swich to using ServiceEntryPointImpl::getNumberOfConnections(), or move this
// into the ServiceEntryPoint
auto conns = tl->sessionStats().numOpenSessions - 1;
const char* word = (conns == 1 ? " connection" : " connections");
log() << "end connection " << remote << " (" << conns << word << " now open)";
}
}

} // namespace mongo
@@ -76,6 +76,14 @@ class MockSEP : public ServiceEntryPoint {

void endAllSessions(transport::Session::TagMask tags) override {}

Stats sessionStats() const override {
return {};
}

size_t numOpenSessions() const override {
return 0ULL;
}

void setUassertInHandler() {
_uassertInHandler = true;
}
@@ -75,10 +75,6 @@ class TransportLayerASIO::ASIOSession : public Session {
}
}

virtual ~ASIOSession() {
_tl->_currentConnections.subtractAndFetch(1);
}

TransportLayer* getTransportLayer() const override {
return _tl;
}
@@ -64,29 +64,6 @@ class TransportLayer {

friend class Session;

/**
* Stats for sessions open in the Transport Layer.
*/
struct Stats {
/**
* Returns the number of sessions currently open in the transport layer.
*/
size_t numOpenSessions = 0;

/**
* Returns the total number of sessions that have ever been created by this TransportLayer.
*/
size_t numCreatedSessions = 0;

/**
* Returns the number of available sessions we could still open. Only relevant
* when we are operating under a transport::Session limit (for example, in the
* legacy implementation, we respect a maximum number of connections). If there
* is no session limit, returns std::numeric_limits<int>::max().
*/
size_t numAvailableSessions = 0;
};

virtual ~TransportLayer() = default;

/**
@@ -147,11 +124,6 @@ class TransportLayer {
*/
virtual void asyncWait(Ticket&& ticket, TicketCallback callback) = 0;

/**
* Returns the number of sessions currently open in the transport layer.
*/
virtual Stats sessionStats() = 0;

/**
* End the given Session. Tickets for this Session that have already been
* started via wait() or asyncWait() will complete, but may return a failed Status.

0 comments on commit 699f402

Please sign in to comment.
You can’t perform that action at this time.