Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: out of band requests #67

Merged
merged 2 commits into from Nov 19, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions ext/common/ApplicationPool2/Group.h
Expand Up @@ -107,6 +107,9 @@ class Group: public enable_shared_from_this<Group> {
static string generateSecret(const SuperGroupPtr &superGroup);
void onSessionInitiateFailure(const ProcessPtr &process, Session *session);
void onSessionClose(const ProcessPtr &process, Session *session);
void lockAndAsyncOOBWRequestIfNeeded(const ProcessPtr &process, DisableResult result, GroupPtr self);
void asyncOOBWRequestIfNeeded(const ProcessPtr &process);
void spawnThreadOOBWRequest(GroupPtr self, const ProcessPtr &process);
void spawnThreadMain(GroupPtr self, SpawnerPtr spawner, Options options);
void spawnThreadRealMain(const SpawnerPtr &spawner, const Options &options);
void finalizeRestart(GroupPtr self, Options options, SpawnerFactoryPtr spawnerFactory,
Expand Down Expand Up @@ -589,6 +592,9 @@ class Group: public enable_shared_from_this<Group> {
return getSuperGroup() == NULL;
}

// Thread-safe.
void requestOOBW(const ProcessPtr &process);

/**
* Attaches the given process to this Group and mark it as enabled. This
* function doesn't touch getWaitlist so be sure to fix its invariants
Expand Down
190 changes: 187 additions & 3 deletions ext/common/ApplicationPool2/Implementation.cpp
Expand Up @@ -30,6 +30,7 @@
#include <ApplicationPool2/Group.h>
#include <ApplicationPool2/PipeWatcher.h>
#include <Exceptions.h>
#include <MessageReadersWriters.h>

namespace Passenger {
namespace ApplicationPool2 {
Expand Down Expand Up @@ -262,13 +263,18 @@ Group::onSessionClose(const ProcessPtr &process, Session *session) {
pqueue.decrease(process->pqHandle, process->utilization());
}

bool maxRequestsReached = options.maxRequests > 0
&& process->processed >= options.maxRequests;
asyncOOBWRequestIfNeeded(process);
if (process->enabled == Process::DISABLED) {
return;
}

/* This group now has a process that's guaranteed to be not at
* full utilization...
*/
assert(!process->atFullUtilization());

bool maxRequestsReached = options.maxRequests > 0
&& process->processed >= options.maxRequests;
if (!getWaitlist.empty() && !maxRequestsReached) {
/* ...so if there are clients waiting for a process to
* become available, call them now.
Expand Down Expand Up @@ -323,7 +329,7 @@ Group::onSessionClose(const ProcessPtr &process, Session *session) {
*/
vector<Callback> actions;
removeProcessFromList(process, disablingProcesses);
addProcessToList(process, enabledProcesses);
addProcessToList(process, disabledProcesses);
removeFromDisableWaitlist(process, DR_SUCCESS, actions);
pool->verifyInvariants();
verifyInvariants();
Expand All @@ -333,6 +339,175 @@ Group::onSessionClose(const ProcessPtr &process, Session *session) {
}
}

void
Group::requestOOBW(const ProcessPtr &process) {
// Standard resource management boilerplate stuff...
PoolPtr pool = getPool();
if (OXT_UNLIKELY(pool == NULL)) {
return;
}
unique_lock<boost::mutex> lock(pool->syncher);
pool = getPool();
if (OXT_UNLIKELY(pool == NULL || process->detached())) {
return;
}

process->oobwRequested = true;
}

// The 'self' parameter is for keeping the current Group object alive
void
Group::lockAndAsyncOOBWRequestIfNeeded(const ProcessPtr &process, DisableResult result, GroupPtr self) {
TRACE_POINT();

if (result != DR_SUCCESS && result != DR_CANCELED) {
return;
}

// Standard resource management boilerplate stuff...
PoolPtr pool = getPool();
if (OXT_UNLIKELY(pool == NULL)) {
return;
}
unique_lock<boost::mutex> lock(pool->syncher);
pool = getPool();
if (OXT_UNLIKELY(pool == NULL || process->detached())) {
return;
}

asyncOOBWRequestIfNeeded(process);
}

void
Group::asyncOOBWRequestIfNeeded(const ProcessPtr &process) {
if (process->detached()) {
return;
}

if (!process->oobwRequested) {
// The process has not requested oobw, so nothing to do here.
return;
}

if (process->enabled == Process::ENABLED) {
// We want the process to be disabled. However, disabling a process is potentially
// asynchronous, so we pass a callback which will re-aquire the lock and call this
// method again.
DisableResult result = disable(process,
boost::bind(&Group::lockAndAsyncOOBWRequestIfNeeded, this, _1, _2, shared_from_this()));
if (result == DR_DEFERRED) { return; }
}

if (process->enabled != Process::DISABLED) {
return;
}

if (process->sessions > 0) {
// Finally, all outstanding sessions must be finished.
return;
}

createInterruptableThread(
boost::bind(&Group::spawnThreadOOBWRequest, this, shared_from_this(), process),
"oobw request thread for process " + process->pid,
POOL_HELPER_THREAD_STACK_SIZE);
}

// The 'self' parameter is for keeping the current Group object alive while this thread is running.
void
Group::spawnThreadOOBWRequest(GroupPtr self, const ProcessPtr &process) {
TRACE_POINT();

Socket *socket;
Connection connection;

{
// Standard resource management boilerplate stuff...
PoolPtr pool = getPool();
if (OXT_UNLIKELY(pool == NULL)) {
return;
}
unique_lock<boost::mutex> lock(pool->syncher);
pool = getPool();
if (OXT_UNLIKELY(pool == NULL || process->detached())) {
return;
}

assert(!process->detached());
assert(process->oobwRequested);
assert(process->sessions == 0);
assert(process->enabled == Process::DISABLED);
socket = process->sessionSockets.top();
assert(socket != NULL);
}

unsigned long long timeout = 1000 * 1000 * 60; // 1 min
try {
ScopeGuard guard(boost::bind(&Socket::checkinConnection, socket, connection));

// Grab a connection. The connection is marked as fail in order to
// ensure it is closed / recycled after this request (otherwise we'd
// need to completely read the response).
connection = socket->checkoutConnection();
connection.fail = true;


// This is copied from RequestHandler when it is sending data using the
// "session" protocol.
char sizeField[sizeof(uint32_t)];
SmallVector<StaticString, 10> data;

data.push_back(StaticString(sizeField, sizeof(uint32_t)));
data.push_back(makeStaticStringWithNull("REQUEST_METHOD"));
data.push_back(makeStaticStringWithNull("OOBW"));

data.push_back(makeStaticStringWithNull("PASSENGER_CONNECT_PASSWORD"));
data.push_back(makeStaticStringWithNull(process->connectPassword));

uint32_t dataSize = 0;
for (unsigned int i = 1; i < data.size(); i++) {
dataSize += (uint32_t) data[i].size();
}
Uint32Message::generate(sizeField, dataSize);

gatheredWrite(connection.fd, &data[0], data.size(), &timeout);

// We do not care what the actual response is ... just wait for it.
waitUntilReadable(connection.fd, &timeout);
} catch (const SystemException &e) {
P_ERROR("*** ERROR: " << e.what() << "\n" << e.backtrace());
} catch (const TimeoutException &e) {
P_ERROR("*** ERROR: " << e.what() << "\n" << e.backtrace());
}

vector<Callback> actions;
{
// Standard resource management boilerplate stuff...
PoolPtr pool = getPool();
if (OXT_UNLIKELY(pool == NULL)) {
return;
}
unique_lock<boost::mutex> lock(pool->syncher);
pool = getPool();
if (OXT_UNLIKELY(pool == NULL)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too: OXT_UNLIKELY(pool == NULL || process->detached())

return;
}

process->oobwRequested = false;
if (process->detached()) {
return;
}

enable(process, actions);
assignSessionsToGetWaiters(actions);

verifyInvariants();
verifyExpensiveInvariants();
pool->verifyInvariants();
}
runAllActions(actions);
}

// The 'self' parameter is for keeping the current Group object alive while this thread is running.
void
Group::spawnThreadMain(GroupPtr self, SpawnerPtr spawner, Options options) {
Expand Down Expand Up @@ -554,6 +729,15 @@ Session::getGupid() const {
return process->gupid;
}

void
Session::requestOOBW() {
GroupPtr group = process->getGroup();
if (OXT_UNLIKELY(group == NULL)) {
return;
}

group->requestOOBW(process);
}

PipeWatcher::PipeWatcher(
const SafeLibevPtr &_libev,
Expand Down
9 changes: 6 additions & 3 deletions ext/common/ApplicationPool2/Process.h
Expand Up @@ -177,7 +177,6 @@ class Process: public enable_shared_from_this<Process> {
* 0 means unlimited. */
int concurrency;


/*************************************************************
* Information used by Pool. Do not write to these from
* outside the Pool. If you read these make sure the Pool
Expand All @@ -203,6 +202,9 @@ class Process: public enable_shared_from_this<Process> {
} enabled;
ProcessMetrics metrics;

/** Marks whether the process requested oobw. We need to wait until all sessions
* end and the process has been disabled. */
bool oobwRequested;

Process(const SafeLibevPtr _libev,
pid_t _pid,
Expand Down Expand Up @@ -249,8 +251,9 @@ class Process: public enable_shared_from_this<Process> {
indexSessionSockets();
}

lastUsed = SystemTime::getUsec();
spawnEndTime = lastUsed;
lastUsed = SystemTime::getUsec();
spawnEndTime = lastUsed;
oobwRequested = false;
}

~Process() {
Expand Down
1 change: 1 addition & 0 deletions ext/common/ApplicationPool2/Session.h
Expand Up @@ -109,6 +109,7 @@ class Session {
const string &getConnectPassword() const;
pid_t getPid() const;
const string &getGupid() const;
void requestOOBW();

const ProcessPtr &getProcess() const {
return process;
Expand Down
10 changes: 10 additions & 0 deletions ext/common/Utils/StrIntUtils.cpp
Expand Up @@ -432,4 +432,14 @@ escapeHTML(const StaticString &input) {
return result;
}

StaticString
makeStaticStringWithNull(const char *data) {
return StaticString(data, strlen(data) + 1);
}

StaticString
makeStaticStringWithNull(const string &data) {
return StaticString(data.c_str(), data.size() + 1);
}

} // namespace Passenger
3 changes: 3 additions & 0 deletions ext/common/Utils/StrIntUtils.h
Expand Up @@ -245,6 +245,9 @@ string cEscapeString(const StaticString &input);
*/
string escapeHTML(const StaticString &input);

StaticString makeStaticStringWithNull(const char *data);

StaticString makeStaticStringWithNull(const string &data);

} // namespace Passenger

Expand Down
18 changes: 10 additions & 8 deletions ext/common/agents/HelperAgent/RequestHandler.h
Expand Up @@ -913,6 +913,16 @@ class RequestHandler {
headerData.append("X-Powered-By: Phusion Passenger\r\n");
}

// Detect out of band work request
Header oobw = lookupHeader(headerData, "X-Passenger-Request-OOB-Work", "x-passenger-request-oob-work");
if (!oobw.empty()) {
P_TRACE(3, "Response with oobw detected.");
if (client->session != NULL) {
client->session->requestOOBW();
}
removeHeader(headerData, oobw);
}

headerData.append("\r\n");
writeToClientOutputPipe(client, headerData);
return true;
Expand Down Expand Up @@ -1815,14 +1825,6 @@ class RequestHandler {

/******* State: SENDING_HEADER_TO_APP *******/

static StaticString makeStaticStringWithNull(const char *data) {
return StaticString(data, strlen(data) + 1);
}

static StaticString makeStaticStringWithNull(const string &data) {
return StaticString(data.c_str(), data.size() + 1);
}

void state_sendingHeaderToApp_verifyInvariants(const ClientPtr &client) {
assert(!client->clientInput->isStarted());
assert(!client->clientBodyBuffer->isStarted());
Expand Down
3 changes: 3 additions & 0 deletions lib/phusion_passenger/public_api.rb
Expand Up @@ -27,6 +27,7 @@ class << self
@@event_stopping_worker_process = []
@@event_credentials = []
@@event_after_installing_signal_handlers = []
@@event_oob_work = []

def on_event(name, &block)
callback_list_for_event(name) << block
Expand Down Expand Up @@ -92,6 +93,8 @@ def callback_list_for_event(name)
@@event_credentials
when :after_installing_signal_handlers
@@event_after_installing_signal_handlers
when :oob_work
@@event_oob_work
else
raise ArgumentError, "Unknown event name '#{name}'"
end
Expand Down