@@ -209,7 +209,7 @@ Group::spawnThreadOOBWRequest(GroupPtr self, ProcessPtr process) {

assert(process->oobwStatus == Process::OOBW_IN_PROGRESS);
assert(process->sessions == 0);
socket = process->findSessionSocketWithLowestBusyness();
socket = process->findSocketsAcceptingHttpRequestsAndWithLowestBusyness();
}

UPDATE_TRACE_POINT();
@@ -109,8 +109,8 @@ Group::spawnThreadRealMain(const SpawningKit::SpawnerPtr &spawner,
} catch (const boost::thread_interrupted &) {
break;
} catch (SpawningKit::SpawnException &e) {
exception = copyException(e);
processAndLogNewSpawnException(e, options, pool->getContext());
exception = copyException(e);
} catch (const tracable_exception &e) {
exception = copyException(e);
// Let other (unexpected) exceptions crash the program so
@@ -165,7 +165,7 @@ rethrowException(const ExceptionPtr &e) {
}

void processAndLogNewSpawnException(SpawningKit::SpawnException &e, const Options &options,
Context *context)
const Context *context)
{
TRACE_POINT();
UnionStation::TransactionPtr transaction;
@@ -1,6 +1,6 @@
/*
* Phusion Passenger - https://www.phusionpassenger.com/
* Copyright (c) 2011-2015 Phusion Holding B.V.
* Copyright (c) 2011-2017 Phusion Holding B.V.
*
* "Passenger", "Phusion Passenger" and "Union Station" are registered
* trademarks of Phusion Holding B.V.
@@ -108,7 +108,7 @@ Pool::inspectProcessList(const InspectOptions &options, stringstream &result,
}

const Socket *socket;
if (options.verbose && (socket = process->getSockets().findSocketWithName("http")) != NULL) {
if (options.verbose && (socket = process->getSockets().findFirstSocketWithProtocol("http")) != NULL) {
result << " URL : http://" << replaceString(socket->address, "tcp://", "") << endl;
result << " Password: " << group->getApiKey().toStaticString() << endl;
}
@@ -100,7 +100,7 @@ typedef boost::container::vector<ProcessPtr> ProcessList;
*/
class Process {
public:
static const unsigned int MAX_SESSION_SOCKETS = 3;
static const unsigned int MAX_SOCKETS_ACCEPTING_HTTP_REQUESTS = 3;

private:
/*************************************************************
@@ -119,11 +119,11 @@ class Process {
int concurrency;

/**
* A subset of 'sockets': all sockets that speak the
* "session" or "http_session" protocol.
* A subset of 'sockets': all sockets that accept HTTP requests
* from the Passenger Core controller.
*/
unsigned int sessionSocketCount;
Socket *sessionSockets[MAX_SESSION_SOCKETS];
unsigned int socketsAcceptingHttpRequestsCount;
Socket *socketsAcceptingHttpRequests[MAX_SOCKETS_ACCEPTING_HTTP_REQUESTS];

/** Input pipe. See Process class description. */
FileDescriptor inputPipe;
@@ -198,19 +198,25 @@ class Process {
};

struct SocketStringOffsets {
String name;
String address;
String protocol;
String description;
};

vector<SocketStringOffsets> socketStringOffsets;
String codeRevision;
};

void appendJsonFieldToBuffer(std::string &buffer, const Json::Value &json,
const char *key, InitializationLog::String &str) const
const char *key, InitializationLog::String &str, bool required = true) const
{
StaticString value = getJsonStaticStringField(json, key);
StaticString value;
if (required) {
value = getJsonStaticStringField(json, key);
} else {
value = getJsonStaticStringField(json, Json::StaticString(key),
StaticString());
}
str.offset = buffer.size();
str.size = value.size();
buffer.append(value.data(), value.size());
@@ -247,9 +253,10 @@ class Process {
const Json::Value &socket = *it;
InitializationLog::SocketStringOffsets offsets;

appendJsonFieldToBuffer(buffer, socket, "name", offsets.name);
appendJsonFieldToBuffer(buffer, socket, "address", offsets.address);
appendJsonFieldToBuffer(buffer, socket, "protocol", offsets.protocol);
appendJsonFieldToBuffer(buffer, socket, "description", offsets.description,
false);

log.socketStringOffsets.push_back(offsets);
}
@@ -276,13 +283,14 @@ class Process {
const Json::Value &socket = *it;
this->sockets.add(
info.pid,
StaticString(base + log.socketStringOffsets[i].name.offset,
log.socketStringOffsets[i].name.size),
StaticString(base + log.socketStringOffsets[i].address.offset,
log.socketStringOffsets[i].address.size),
StaticString(base + log.socketStringOffsets[i].protocol.offset,
log.socketStringOffsets[i].protocol.size),
getJsonIntField(socket, "concurrency")
StaticString(base + log.socketStringOffsets[i].description.offset,
log.socketStringOffsets[i].description.size),
getJsonIntField(socket, "concurrency"),
getJsonBoolField(socket, "accept_http_requests")
);
}

@@ -292,37 +300,37 @@ class Process {
}
}

void indexSessionSockets() {
void indexSocketsAcceptingHttpRequests() {
SocketList::iterator it;

concurrency = 0;
memset(sessionSockets, 0, sizeof(sessionSockets));
memset(socketsAcceptingHttpRequests, 0, sizeof(socketsAcceptingHttpRequests));

for (it = sockets.begin(); it != sockets.end(); it++) {
Socket *socket = &(*it);
if (socket->protocol == "session" || socket->protocol == "http_session") {
if (sessionSocketCount == MAX_SESSION_SOCKETS) {
throw RuntimeException("The process has too many session sockets. "
"A maximum of " + toString(MAX_SESSION_SOCKETS) + " is allowed");
if (socket->acceptHttpRequests) {
if (socketsAcceptingHttpRequestsCount == MAX_SOCKETS_ACCEPTING_HTTP_REQUESTS) {
throw RuntimeException("The process has too many sockets that accept HTTP requests. "
"A maximum of " + toString(MAX_SOCKETS_ACCEPTING_HTTP_REQUESTS) + " is allowed");
}
sessionSockets[sessionSocketCount] = socket;
sessionSocketCount++;
socketsAcceptingHttpRequests[socketsAcceptingHttpRequestsCount] = socket;
socketsAcceptingHttpRequestsCount++;

if (concurrency != -1) {
if (concurrency != -999) {
if (socket->concurrency == 0) {
// If one of the sockets has a concurrency of
// 0 (unlimited) then we mark this entire Process
// as having a concurrency of 0.
concurrency = -1;
// < 0 (so either unlimited or unknown) then we mark
// this entire Process as having a concurrency of -1 (unknown).
concurrency = -999;
} else {
concurrency += socket->concurrency;
}
}
}
}

if (concurrency == -1) {
concurrency = 0;
if (concurrency == -999) {
concurrency = -1;
}
}

@@ -436,7 +444,7 @@ class Process {

Process(const BasicGroupInfo *groupInfo, const Json::Value &args)
: info(this, groupInfo, args),
sessionSocketCount(0),
socketsAcceptingHttpRequestsCount(0),
spawnerCreationTime(getJsonUint64Field(args, "spawner_creation_time")),
spawnStartTime(getJsonUint64Field(args, "spawn_start_time")),
spawnEndTime(SystemTime::getUsec()),
@@ -455,13 +463,13 @@ class Process {
shutdownStartTime(0)
{
initializeSocketsAndStringFields(args);
indexSessionSockets();
indexSocketsAcceptingHttpRequests();
}

Process(const BasicGroupInfo *groupInfo, const SpawningKit::Result &skResult,
const Json::Value &args)
: info(this, groupInfo, skResult),
sessionSocketCount(0),
socketsAcceptingHttpRequestsCount(0),
spawnerCreationTime(getJsonUint64Field(args, "spawner_creation_time")),
spawnStartTime(skResult.spawnStartTime),
spawnEndTime(skResult.spawnEndTime),
@@ -480,7 +488,7 @@ class Process {
shutdownStartTime(0)
{
initializeSocketsAndStringFields(skResult);
indexSessionSockets();
indexSocketsAcceptingHttpRequests();

inputPipe = skResult.stdinFd;
outputPipe = skResult.stdoutAndErrFd;
@@ -510,8 +518,8 @@ class Process {
void forceMaxConcurrency(int value) {
assert(value >= 0);
concurrency = value;
for (unsigned i = 0; i < sessionSocketCount; i++) {
sessionSockets[i]->concurrency = concurrency;
for (unsigned i = 0; i < socketsAcceptingHttpRequestsCount; i++) {
socketsAcceptingHttpRequests[i]->concurrency = concurrency;
}
}

@@ -675,23 +683,23 @@ class Process {
return sockets;
}

Socket *findSessionSocketWithLowestBusyness() const {
if (OXT_UNLIKELY(sessionSocketCount == 0)) {
Socket *findSocketsAcceptingHttpRequestsAndWithLowestBusyness() const {
if (OXT_UNLIKELY(socketsAcceptingHttpRequestsCount == 0)) {
return NULL;
} else if (sessionSocketCount == 1) {
return sessionSockets[0];
} else if (socketsAcceptingHttpRequestsCount == 1) {
return socketsAcceptingHttpRequests[0];
} else {
int leastBusySessionSocketIndex = 0;
int lowestBusyness = sessionSockets[0]->busyness();
int leastBusySocketIndex = 0;
int lowestBusyness = socketsAcceptingHttpRequests[0]->busyness();

for (unsigned i = 1; i < sessionSocketCount; i++) {
if (sessionSockets[i]->busyness() < lowestBusyness) {
leastBusySessionSocketIndex = i;
lowestBusyness = sessionSockets[i]->busyness();
for (unsigned i = 1; i < socketsAcceptingHttpRequestsCount; i++) {
if (socketsAcceptingHttpRequests[i]->busyness() < lowestBusyness) {
leastBusySocketIndex = i;
lowestBusyness = socketsAcceptingHttpRequests[i]->busyness();
}
}

return sessionSockets[leastBusySessionSocketIndex];
return socketsAcceptingHttpRequests[leastBusySocketIndex];
}
}

@@ -731,15 +739,15 @@ class Process {
/* Different processes within a Group may have different
* 'concurrency' values. We want:
* - the process with the smallest busyness to be be picked for routing.
* - to give processes with concurrency == 0 more priority (in general)
* - to give processes with concurrency == 0 or -1 more priority (in general)
* over processes with concurrency > 0.
* Therefore, in case of processes with concurrency > 0, we describe our
* busyness as a percentage of 'concurrency', with the percentage value
* in [0..INT_MAX] instead of [0..1]. That way, the busyness value
* of processes with concurrency > 0 is usually higher than that of processes
* with concurrency == 0.
* with concurrency == 0 or -1.
*/
if (concurrency == 0) {
if (concurrency <= 0) {
return sessions;
} else {
return (int) (((long long) sessions * INT_MAX) / (double) concurrency);
@@ -776,7 +784,7 @@ class Process {
* not result in any harmful behavior.
*/
SessionPtr newSession(unsigned long long now = 0) {
Socket *socket = findSessionSocketWithLowestBusyness();
Socket *socket = findSocketsAcceptingHttpRequestsAndWithLowestBusyness();
if (socket->isTotallyBusy()) {
return SessionPtr();
} else {
@@ -913,10 +921,13 @@ class Process {
for (it = sockets.begin(); it != sockets.end(); it++) {
const Socket &socket = *it;
stream << "<socket>";
stream << "<name>" << escapeForXml(socket.name) << "</name>";
stream << "<address>" << escapeForXml(socket.address) << "</address>";
stream << "<protocol>" << escapeForXml(socket.protocol) << "</protocol>";
if (!socket.description.empty()) {
stream << "<description>" << escapeForXml(socket.description) << "</description>";
}
stream << "<concurrency>" << socket.concurrency << "</concurrency>";
stream << "<accept_http_requests>" << socket.acceptHttpRequests << "</accept_http_requests>";
stream << "<sessions>" << socket.sessions << "</sessions>";
stream << "</socket>";
}