From fa37de138654418fd90a9c9103bda5509f3de82e Mon Sep 17 00:00:00 2001 From: Paul Kmiec Date: Tue, 23 Oct 2012 18:42:56 -0700 Subject: [PATCH 1/2] feature: out of band requests --- ext/common/ApplicationPool2/Group.h | 6 + .../ApplicationPool2/Implementation.cpp | 188 +++++++++++++++++- ext/common/ApplicationPool2/Process.h | 9 +- ext/common/ApplicationPool2/Session.h | 1 + ext/common/Utils/StrIntUtils.cpp | 10 + ext/common/Utils/StrIntUtils.h | 3 + .../agents/HelperAgent/RequestHandler.h | 18 +- lib/phusion_passenger/public_api.rb | 3 + lib/phusion_passenger/rack/out_of_band_gc.rb | 61 ++++++ .../request_handler/thread_handler.rb | 8 + test/cxx/RequestHandlerTest.cpp | 61 ++++++ test/integration_tests/nginx_tests.rb | 48 +++++ test/stub/wsgi/passenger_wsgi.py | 15 +- 13 files changed, 415 insertions(+), 16 deletions(-) create mode 100644 lib/phusion_passenger/rack/out_of_band_gc.rb diff --git a/ext/common/ApplicationPool2/Group.h b/ext/common/ApplicationPool2/Group.h index 9b0f98485b..2eedd7452a 100644 --- a/ext/common/ApplicationPool2/Group.h +++ b/ext/common/ApplicationPool2/Group.h @@ -107,6 +107,9 @@ class Group: public enable_shared_from_this { static string generateSecret(const SuperGroupPtr &superGroup); void onSessionInitiateFailure(const ProcessPtr &process, Session *session); void onSessionClose(const ProcessPtr &process, Session *session); + void lockAndAsyncOOBWRequestIfNeeded(GroupPtr self, const ProcessPtr &process); + void asyncOOBWRequestIfNeeded(const ProcessPtr &process); + void spawnThreadOOBWRequest(GroupPtr self, const ProcessPtr &process); void spawnThreadMain(GroupPtr self, SpawnerPtr spawner, Options options); void spawnThreadRealMain(const SpawnerPtr &spawner, const Options &options); void finalizeRestart(GroupPtr self, Options options, SpawnerFactoryPtr spawnerFactory, @@ -589,6 +592,9 @@ class Group: public enable_shared_from_this { return getSuperGroup() == NULL; } + // Thread-safe. + void requestOOBW(const ProcessPtr &process); + /** * Attaches the given process to this Group and mark it as enabled. This * function doesn't touch getWaitlist so be sure to fix its invariants diff --git a/ext/common/ApplicationPool2/Implementation.cpp b/ext/common/ApplicationPool2/Implementation.cpp index cf22d04d37..74de59db8b 100644 --- a/ext/common/ApplicationPool2/Implementation.cpp +++ b/ext/common/ApplicationPool2/Implementation.cpp @@ -30,6 +30,7 @@ #include #include #include +#include namespace Passenger { namespace ApplicationPool2 { @@ -262,13 +263,18 @@ Group::onSessionClose(const ProcessPtr &process, Session *session) { pqueue.decrease(process->pqHandle, process->utilization()); } - bool maxRequestsReached = options.maxRequests > 0 - && process->processed >= options.maxRequests; + asyncOOBWRequestIfNeeded(process); + if (process->enabled == Process::DISABLED) { + return; + } /* This group now has a process that's guaranteed to be not at * full utilization... */ assert(!process->atFullUtilization()); + + bool maxRequestsReached = options.maxRequests > 0 + && process->processed >= options.maxRequests; if (!getWaitlist.empty() && !maxRequestsReached) { /* ...so if there are clients waiting for a process to * become available, call them now. @@ -333,6 +339,175 @@ Group::onSessionClose(const ProcessPtr &process, Session *session) { } } +void +Group::requestOOBW(const ProcessPtr &process) { + // Standard resource management boilerplate stuff... + PoolPtr pool = getPool(); + if (OXT_UNLIKELY(pool == NULL)) { + return; + } + unique_lock lock(pool->syncher); + pool = getPool(); + if (OXT_UNLIKELY(pool == NULL)) { + return; + } + + process->oobwRequested = true; +} + +// The 'self' parameter is for keeping the current Group object alive +void +Group::lockAndAsyncOOBWRequestIfNeeded(GroupPtr self, const ProcessPtr &process) { + TRACE_POINT(); + + // Standard resource management boilerplate stuff... + PoolPtr pool = getPool(); + if (OXT_UNLIKELY(pool == NULL)) { + return; + } + unique_lock lock(pool->syncher); + pool = getPool(); + if (OXT_UNLIKELY(pool == NULL)) { + return; + } + + assert(process->enabled == Process::DISABLED); + asyncOOBWRequestIfNeeded(process); +} + +void +Group::asyncOOBWRequestIfNeeded(const ProcessPtr &process) { + if (process->detached()) { + return; + } + + if (!process->oobwRequested) { + // The process has not requested oobw, so nothing to do here. + return; + } + + if (process->enabled == Process::ENABLED) { + // We want the process to be disabled. However, disabling a process is potentially + // asynchronous, so we pass a callback which will re-aquire the lock and call this + // method again. + disable(process, boost::bind(&Group::lockAndAsyncOOBWRequestIfNeeded, this, + shared_from_this(), process)); + } + + if (process->enabled != Process::DISABLED) { + // The process is still not disabled (perhaps it is in the process of disabling). + return; + } + + if (process->sessions > 0) { + // Finally, all outstanding sessions must be finished. + return; + } + + createInterruptableThread( + boost::bind(&Group::spawnThreadOOBWRequest, this, shared_from_this(), process), + "oobw request thread for process " + process->pid, + POOL_HELPER_THREAD_STACK_SIZE); +} + +// The 'self' parameter is for keeping the current Group object alive while this thread is running. +void +Group::spawnThreadOOBWRequest(GroupPtr self, const ProcessPtr &process) { + TRACE_POINT(); + + Socket *socket; + Connection connection; + + { + // Standard resource management boilerplate stuff... + PoolPtr pool = getPool(); + if (OXT_UNLIKELY(pool == NULL)) { + return; + } + unique_lock lock(pool->syncher); + pool = getPool(); + if (OXT_UNLIKELY(pool == NULL)) { + return; + } + + assert(!process->detached()); + assert(process->oobwRequested); + assert(process->sessions == 0); + assert(process->enabled == Process::DISABLED); + socket = process->sessionSockets.top(); + + lock.unlock(); + } + + unsigned long long timeout = 1000 * 1000 * 60; // 1 min + try { + // Grab a connection. The connection is marked as fail in order to + // ensure it is closed / recycled after this request (otherwise we'd + // need to completely read the response). + connection = socket->checkoutConnection(); + connection.fail = true; + FileDescriptor theFd = FileDescriptor(connection.fd, false); + + // This is copied from RequestHandler when it is sending data using the + // "session" protocol. + char sizeField[sizeof(uint32_t)]; + SmallVector data; + + data.push_back(StaticString(sizeField, sizeof(uint32_t))); + data.push_back(makeStaticStringWithNull("REQUEST_METHOD")); + data.push_back(makeStaticStringWithNull("OOBW")); + + data.push_back(makeStaticStringWithNull("PASSENGER_CONNECT_PASSWORD")); + data.push_back(makeStaticStringWithNull(process->connectPassword)); + + uint32_t dataSize = 0; + for (unsigned int i = 1; i < data.size(); i++) { + dataSize += (uint32_t) data[i].size(); + } + Uint32Message::generate(sizeField, dataSize); + + gatheredWrite(theFd, &data[0], data.size(), &timeout); + + // We do not care what the actual response is ... just wait for it. + waitUntilReadable(theFd, &timeout); + } catch (const SystemException &e) { + P_ERROR("*** ERROR: " << e.what() << "\n" << e.backtrace()); + } catch (const TimeoutException &e) { + P_ERROR("*** ERROR: " << e.what() << "\n" << e.backtrace()); + } + + // Clean up + if (socket != NULL && connection.fd != -1) { + socket->checkinConnection(connection); + } + + vector actions; + { + // Standard resource management boilerplate stuff... + PoolPtr pool = getPool(); + if (OXT_UNLIKELY(pool == NULL)) { + return; + } + unique_lock lock(pool->syncher); + pool = getPool(); + if (OXT_UNLIKELY(pool == NULL)) { + return; + } + + process->oobwRequested = false; + if (process->detached()) { + return; + } + + enable(process, actions); + assignSessionsToGetWaiters(actions); + + verifyInvariants(); + lock.unlock(); + } + runAllActions(actions); +} + // The 'self' parameter is for keeping the current Group object alive while this thread is running. void Group::spawnThreadMain(GroupPtr self, SpawnerPtr spawner, Options options) { @@ -554,6 +729,15 @@ Session::getGupid() const { return process->gupid; } +void +Session::requestOOBW() { + GroupPtr group = process->getGroup(); + if (OXT_UNLIKELY(group == NULL)) { + return; + } + + group->requestOOBW(process); +} PipeWatcher::PipeWatcher( const SafeLibevPtr &_libev, diff --git a/ext/common/ApplicationPool2/Process.h b/ext/common/ApplicationPool2/Process.h index 268039aa84..efb5fe4089 100644 --- a/ext/common/ApplicationPool2/Process.h +++ b/ext/common/ApplicationPool2/Process.h @@ -177,7 +177,6 @@ class Process: public enable_shared_from_this { * 0 means unlimited. */ int concurrency; - /************************************************************* * Information used by Pool. Do not write to these from * outside the Pool. If you read these make sure the Pool @@ -203,6 +202,9 @@ class Process: public enable_shared_from_this { } enabled; ProcessMetrics metrics; + /** Marks whether the process requested oobw. We need to wait until all sessions + * end and the process has been disabled. */ + bool oobwRequested; Process(const SafeLibevPtr _libev, pid_t _pid, @@ -249,8 +251,9 @@ class Process: public enable_shared_from_this { indexSessionSockets(); } - lastUsed = SystemTime::getUsec(); - spawnEndTime = lastUsed; + lastUsed = SystemTime::getUsec(); + spawnEndTime = lastUsed; + oobwRequested = false; } ~Process() { diff --git a/ext/common/ApplicationPool2/Session.h b/ext/common/ApplicationPool2/Session.h index ba2cda18e2..2cf8a91b58 100644 --- a/ext/common/ApplicationPool2/Session.h +++ b/ext/common/ApplicationPool2/Session.h @@ -109,6 +109,7 @@ class Session { const string &getConnectPassword() const; pid_t getPid() const; const string &getGupid() const; + void requestOOBW(); const ProcessPtr &getProcess() const { return process; diff --git a/ext/common/Utils/StrIntUtils.cpp b/ext/common/Utils/StrIntUtils.cpp index d79b7c57ac..58ca8c280b 100644 --- a/ext/common/Utils/StrIntUtils.cpp +++ b/ext/common/Utils/StrIntUtils.cpp @@ -432,4 +432,14 @@ escapeHTML(const StaticString &input) { return result; } +StaticString +makeStaticStringWithNull(const char *data) { + return StaticString(data, strlen(data) + 1); +} + +StaticString +makeStaticStringWithNull(const string &data) { + return StaticString(data.c_str(), data.size() + 1); +} + } // namespace Passenger diff --git a/ext/common/Utils/StrIntUtils.h b/ext/common/Utils/StrIntUtils.h index 4f475a7e1d..7841891808 100644 --- a/ext/common/Utils/StrIntUtils.h +++ b/ext/common/Utils/StrIntUtils.h @@ -245,6 +245,9 @@ string cEscapeString(const StaticString &input); */ string escapeHTML(const StaticString &input); +StaticString makeStaticStringWithNull(const char *data); + +StaticString makeStaticStringWithNull(const string &data); } // namespace Passenger diff --git a/ext/common/agents/HelperAgent/RequestHandler.h b/ext/common/agents/HelperAgent/RequestHandler.h index c8c0af2c82..1f951d287f 100644 --- a/ext/common/agents/HelperAgent/RequestHandler.h +++ b/ext/common/agents/HelperAgent/RequestHandler.h @@ -913,6 +913,16 @@ class RequestHandler { headerData.append("X-Powered-By: Phusion Passenger\r\n"); } + // Detect out of band work request + Header oobw = lookupHeader(headerData, "X-Passenger-Request-OOB-Work", "x-passenger-request-oob-work"); + if (!oobw.empty()) { + P_TRACE(3, "Response with oobw detected."); + if (client->session != NULL) { + client->session->requestOOBW(); + } + removeHeader(headerData, oobw); + } + headerData.append("\r\n"); writeToClientOutputPipe(client, headerData); return true; @@ -1815,14 +1825,6 @@ class RequestHandler { /******* State: SENDING_HEADER_TO_APP *******/ - static StaticString makeStaticStringWithNull(const char *data) { - return StaticString(data, strlen(data) + 1); - } - - static StaticString makeStaticStringWithNull(const string &data) { - return StaticString(data.c_str(), data.size() + 1); - } - void state_sendingHeaderToApp_verifyInvariants(const ClientPtr &client) { assert(!client->clientInput->isStarted()); assert(!client->clientBodyBuffer->isStarted()); diff --git a/lib/phusion_passenger/public_api.rb b/lib/phusion_passenger/public_api.rb index 9c5df1058f..5327d8450e 100644 --- a/lib/phusion_passenger/public_api.rb +++ b/lib/phusion_passenger/public_api.rb @@ -27,6 +27,7 @@ class << self @@event_stopping_worker_process = [] @@event_credentials = [] @@event_after_installing_signal_handlers = [] + @@event_oob_work = [] def on_event(name, &block) callback_list_for_event(name) << block @@ -92,6 +93,8 @@ def callback_list_for_event(name) @@event_credentials when :after_installing_signal_handlers @@event_after_installing_signal_handlers + when :oob_work + @@event_oob_work else raise ArgumentError, "Unknown event name '#{name}'" end diff --git a/lib/phusion_passenger/rack/out_of_band_gc.rb b/lib/phusion_passenger/rack/out_of_band_gc.rb new file mode 100644 index 0000000000..c36bd6a125 --- /dev/null +++ b/lib/phusion_passenger/rack/out_of_band_gc.rb @@ -0,0 +1,61 @@ +# encoding: binary +# Phusion Passenger - http://www.modrails.com/ +# Copyright (c) 2010-2012 Phusion +# +# "Phusion Passenger" is a trademark of Hongli Lai & Ninh Bui. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +require 'thread' + +module PhusionPassenger +module Rack + +class OutOfBandGc + def initialize(app, frequency, logger = nil) + @app = app + @frequency = frequency + @request_count = 0 + @mutex = Mutex.new + + ::PhusionPassenger.on_event(:oob_work) do + t0 = Time.now + GC.start + logger.info "Out Of Band GC finished in #{Time.now - t0} sec" if logger + end + end + + def call(env) + status, headers, body = @app.call(env) + + @mutex.synchronize do + @request_count += 1 + if @request_count == @frequency + @request_count = 0 + headers['X-Passenger-Request-OOB-Work'] = 'true' + end + end + + [status, headers, body] + end +end + +end # module Rack +end # module PhusionPassenger + diff --git a/lib/phusion_passenger/request_handler/thread_handler.rb b/lib/phusion_passenger/request_handler/thread_handler.rb index 963c7692a9..018154b922 100644 --- a/lib/phusion_passenger/request_handler/thread_handler.rb +++ b/lib/phusion_passenger/request_handler/thread_handler.rb @@ -39,6 +39,7 @@ class ThreadHandler REQUEST_METHOD = 'REQUEST_METHOD'.freeze PING = 'PING'.freeze + OOBW = 'OOBW'.freeze PASSENGER_CONNECT_PASSWORD = 'PASSENGER_CONNECT_PASSWORD'.freeze MAX_HEADER_SIZE = 128 * 1024 @@ -122,6 +123,8 @@ def accept_and_process_next_request(socket_wrapper, channel, buffer) begin if headers[REQUEST_METHOD] == PING process_ping(headers, connection) + elsif headers[REQUEST_METHOD] == OOBW + process_oobw(headers, connection) else process_request(headers, connection, @protocol == :http) end @@ -249,6 +252,11 @@ def process_ping(env, connection) connection.write("pong") end + def process_oobw(env, connection) + PhusionPassenger.call_event(:oob_work) + connection.write("oobw done") + end + # def process_request(env, connection, full_http_response) # raise NotImplementedError, "Override with your own implementation!" # end diff --git a/test/cxx/RequestHandlerTest.cpp b/test/cxx/RequestHandlerTest.cpp index 9aa3c6db63..5b4b5e3cfd 100644 --- a/test/cxx/RequestHandlerTest.cpp +++ b/test/cxx/RequestHandlerTest.cpp @@ -777,6 +777,67 @@ namespace tut { ensure_equals(buf.st_size, (off_t) 0); } + TEST_METHOD(46) { + // If the application outputs a request oobw header, handler should remove the header, mark + // the process as oobw requested. The process should continue to process requests until the + // spawner spawns another process (to avoid the group being empty). As soon as the new + // process is spawned, the original process will make the oobw request. Afterwards, the + // original process is re-enabled. + init(); + connect(); + sendHeaders(defaultHeaders, + "PASSENGER_APP_ROOT", wsgiAppPath.c_str(), + "PATH_INFO", "/oobw", + NULL); + string response = readAll(connection); + ensure(containsSubstring(response, "Status: 200 OK\r\n")); + ensure(!containsSubstring(response, "X-Passenger-Request-OOB-Work:")); + pid_t origPid = atoi(stripHeaders(response)); + + // Get a reference to the orignal process and verify oobw has been requested. + ProcessPtr origProcess; + { + unique_lock lock(pool->syncher); + origProcess = pool->superGroups.get(wsgiAppPath)->defaultGroup->processes.front(); + ensure(origProcess->oobwRequested); + lock.unlock(); + } + ensure(origPid == origProcess->pid); // just a sanity check + + // Issue requests until the new process handles it. + pid_t pid; + EVENTUALLY(2, + connect(); + sendHeaders(defaultHeaders, + "PASSENGER_APP_ROOT", wsgiAppPath.c_str(), + "PATH_INFO", "/pid", + NULL); + string response = readAll(connection); + ensure(containsSubstring(response, "Status: 200 OK\r\n")); + pid = atoi(stripHeaders(response)); + + result = (pid != origPid); + ); + + // Wait for the original process to finish oobw request. + EVENTUALLY(2, + { + unique_lock lock(pool->syncher); + result = (origProcess->oobwRequested == 0); + lock.unlock(); + } + ); + + // Final asserts. + { + unique_lock lock(pool->syncher); + ensure(pool->superGroups.get(wsgiAppPath)->defaultGroup->processes.size() == 2); + ensure(origProcess->oobwRequested == 0); + ensure(origProcess->enabled == Process::ENABLED); + lock.unlock(); + } + } + // Test small response buffering. // Test large response buffering. } diff --git a/test/integration_tests/nginx_tests.rb b/test/integration_tests/nginx_tests.rb index 8b66b180c7..767110a3cb 100644 --- a/test/integration_tests/nginx_tests.rb +++ b/test/integration_tests/nginx_tests.rb @@ -262,6 +262,54 @@ end end + describe "oob work" do + + before :all do + @server = "http://passenger.test:#{@nginx.port}" + @stub = RackStub.new('rack') + @nginx.add_server do |server| + server[:server_name] = "passenger.test" + server[:root] = "#{@stub.full_app_root}/public" + end + end + + before :each do + @stub.reset + + File.write("#{@stub.app_root}/config.ru", <<-RUBY) + PhusionPassenger.on_event(:oob_work) do + f = File.open("#{@stub.full_app_root}/oob_work.\#{$$}", 'w') + f.close + sleep 1 + end + app = lambda do |env| + if env['PATH_INFO'] == '/oobw' + [200, { "Content-Type" => "text/html", "X-Passenger-Request-OOB-Work" => 'true' }, [$$]] + else + [200, { "Content-Type" => "text/html" }, [$$]] + end + end + run app + RUBY + + @nginx.start + end + + it "invokes oobw when requested by the app process" do + pid = get("/oobw") + sleep 0.5 # wait for oobw callback to be invoked + File.exists?("#{@stub.app_root}/oob_work.#{pid}").should == true + end + + it "does not block client while invoking oob work" do + get("/") # ensure there are spawned app processes + t0 = Time.now + get("/oobw") + secs = Time.now - t0 + secs.should <= 0.1 + end + + end ##### Helper methods ##### diff --git a/test/stub/wsgi/passenger_wsgi.py b/test/stub/wsgi/passenger_wsgi.py index 6709b5acb3..0c82f13c64 100644 --- a/test/stub/wsgi/passenger_wsgi.py +++ b/test/stub/wsgi/passenger_wsgi.py @@ -8,15 +8,21 @@ def file_exist(filename): return False def application(env, start_response): - path = env['PATH_INFO'] status = '200 OK' body = None - + + method = env.get('REQUEST_METHOD') + if method == 'OOBW': + time.sleep(1) + start_response(status, [('Content-Type', 'text/html')]) + return [str('oobw ok')] + filename = env.get('HTTP_X_WAIT_FOR_FILE') if filename is not None: while not file_exist(filename): time.sleep(0.01) - + + path = env['PATH_INFO'] if path == '/pid': body = os.getpid() elif path == '/env': @@ -64,6 +70,9 @@ def body(): written += len(data) start_response(status, [('Content-Type', 'text/plain')]) return body() + elif path == '/oobw': + start_response(status, [('Content-Type', 'text/plain'), ('X-Passenger-Request-OOB-Work', 'true')]) + return [str(os.getpid())] else: body = 'hello world' From a92312eeed71034a27d98c67af0c651600acdb29 Mon Sep 17 00:00:00 2001 From: Paul Kmiec Date: Fri, 9 Nov 2012 16:04:29 -0800 Subject: [PATCH 2/2] Adapted code to use the new DisableResult, avoiding nasty race condition with || process->detached() --- ext/common/ApplicationPool2/Group.h | 2 +- .../ApplicationPool2/Implementation.cpp | 42 +++++++++---------- test/cxx/RequestHandlerTest.cpp | 18 ++++---- 3 files changed, 29 insertions(+), 33 deletions(-) diff --git a/ext/common/ApplicationPool2/Group.h b/ext/common/ApplicationPool2/Group.h index 2eedd7452a..6d710fe39c 100644 --- a/ext/common/ApplicationPool2/Group.h +++ b/ext/common/ApplicationPool2/Group.h @@ -107,7 +107,7 @@ class Group: public enable_shared_from_this { static string generateSecret(const SuperGroupPtr &superGroup); void onSessionInitiateFailure(const ProcessPtr &process, Session *session); void onSessionClose(const ProcessPtr &process, Session *session); - void lockAndAsyncOOBWRequestIfNeeded(GroupPtr self, const ProcessPtr &process); + void lockAndAsyncOOBWRequestIfNeeded(const ProcessPtr &process, DisableResult result, GroupPtr self); void asyncOOBWRequestIfNeeded(const ProcessPtr &process); void spawnThreadOOBWRequest(GroupPtr self, const ProcessPtr &process); void spawnThreadMain(GroupPtr self, SpawnerPtr spawner, Options options); diff --git a/ext/common/ApplicationPool2/Implementation.cpp b/ext/common/ApplicationPool2/Implementation.cpp index 74de59db8b..e6db4fdd4b 100644 --- a/ext/common/ApplicationPool2/Implementation.cpp +++ b/ext/common/ApplicationPool2/Implementation.cpp @@ -329,7 +329,7 @@ Group::onSessionClose(const ProcessPtr &process, Session *session) { */ vector actions; removeProcessFromList(process, disablingProcesses); - addProcessToList(process, enabledProcesses); + addProcessToList(process, disabledProcesses); removeFromDisableWaitlist(process, DR_SUCCESS, actions); pool->verifyInvariants(); verifyInvariants(); @@ -348,7 +348,7 @@ Group::requestOOBW(const ProcessPtr &process) { } unique_lock lock(pool->syncher); pool = getPool(); - if (OXT_UNLIKELY(pool == NULL)) { + if (OXT_UNLIKELY(pool == NULL || process->detached())) { return; } @@ -357,9 +357,13 @@ Group::requestOOBW(const ProcessPtr &process) { // The 'self' parameter is for keeping the current Group object alive void -Group::lockAndAsyncOOBWRequestIfNeeded(GroupPtr self, const ProcessPtr &process) { +Group::lockAndAsyncOOBWRequestIfNeeded(const ProcessPtr &process, DisableResult result, GroupPtr self) { TRACE_POINT(); + if (result != DR_SUCCESS && result != DR_CANCELED) { + return; + } + // Standard resource management boilerplate stuff... PoolPtr pool = getPool(); if (OXT_UNLIKELY(pool == NULL)) { @@ -367,11 +371,10 @@ Group::lockAndAsyncOOBWRequestIfNeeded(GroupPtr self, const ProcessPtr &process) } unique_lock lock(pool->syncher); pool = getPool(); - if (OXT_UNLIKELY(pool == NULL)) { + if (OXT_UNLIKELY(pool == NULL || process->detached())) { return; } - assert(process->enabled == Process::DISABLED); asyncOOBWRequestIfNeeded(process); } @@ -390,12 +393,12 @@ Group::asyncOOBWRequestIfNeeded(const ProcessPtr &process) { // We want the process to be disabled. However, disabling a process is potentially // asynchronous, so we pass a callback which will re-aquire the lock and call this // method again. - disable(process, boost::bind(&Group::lockAndAsyncOOBWRequestIfNeeded, this, - shared_from_this(), process)); + DisableResult result = disable(process, + boost::bind(&Group::lockAndAsyncOOBWRequestIfNeeded, this, _1, _2, shared_from_this())); + if (result == DR_DEFERRED) { return; } } if (process->enabled != Process::DISABLED) { - // The process is still not disabled (perhaps it is in the process of disabling). return; } @@ -426,7 +429,7 @@ Group::spawnThreadOOBWRequest(GroupPtr self, const ProcessPtr &process) { } unique_lock lock(pool->syncher); pool = getPool(); - if (OXT_UNLIKELY(pool == NULL)) { + if (OXT_UNLIKELY(pool == NULL || process->detached())) { return; } @@ -435,18 +438,19 @@ Group::spawnThreadOOBWRequest(GroupPtr self, const ProcessPtr &process) { assert(process->sessions == 0); assert(process->enabled == Process::DISABLED); socket = process->sessionSockets.top(); - - lock.unlock(); + assert(socket != NULL); } unsigned long long timeout = 1000 * 1000 * 60; // 1 min try { + ScopeGuard guard(boost::bind(&Socket::checkinConnection, socket, connection)); + // Grab a connection. The connection is marked as fail in order to // ensure it is closed / recycled after this request (otherwise we'd // need to completely read the response). connection = socket->checkoutConnection(); connection.fail = true; - FileDescriptor theFd = FileDescriptor(connection.fd, false); + // This is copied from RequestHandler when it is sending data using the // "session" protocol. @@ -466,21 +470,16 @@ Group::spawnThreadOOBWRequest(GroupPtr self, const ProcessPtr &process) { } Uint32Message::generate(sizeField, dataSize); - gatheredWrite(theFd, &data[0], data.size(), &timeout); + gatheredWrite(connection.fd, &data[0], data.size(), &timeout); // We do not care what the actual response is ... just wait for it. - waitUntilReadable(theFd, &timeout); + waitUntilReadable(connection.fd, &timeout); } catch (const SystemException &e) { P_ERROR("*** ERROR: " << e.what() << "\n" << e.backtrace()); } catch (const TimeoutException &e) { P_ERROR("*** ERROR: " << e.what() << "\n" << e.backtrace()); } - - // Clean up - if (socket != NULL && connection.fd != -1) { - socket->checkinConnection(connection); - } - + vector actions; { // Standard resource management boilerplate stuff... @@ -503,7 +502,8 @@ Group::spawnThreadOOBWRequest(GroupPtr self, const ProcessPtr &process) { assignSessionsToGetWaiters(actions); verifyInvariants(); - lock.unlock(); + verifyExpensiveInvariants(); + pool->verifyInvariants(); } runAllActions(actions); } diff --git a/test/cxx/RequestHandlerTest.cpp b/test/cxx/RequestHandlerTest.cpp index 5b4b5e3cfd..f7dac76240 100644 --- a/test/cxx/RequestHandlerTest.cpp +++ b/test/cxx/RequestHandlerTest.cpp @@ -790,19 +790,18 @@ namespace tut { "PATH_INFO", "/oobw", NULL); string response = readAll(connection); - ensure(containsSubstring(response, "Status: 200 OK\r\n")); - ensure(!containsSubstring(response, "X-Passenger-Request-OOB-Work:")); + ensure("status is not 200", containsSubstring(response, "Status: 200 OK\r\n")); + ensure("contains oowb header", !containsSubstring(response, "X-Passenger-Request-OOB-Work:")); pid_t origPid = atoi(stripHeaders(response)); // Get a reference to the orignal process and verify oobw has been requested. ProcessPtr origProcess; { unique_lock lock(pool->syncher); - origProcess = pool->superGroups.get(wsgiAppPath)->defaultGroup->processes.front(); + origProcess = pool->superGroups.get(wsgiAppPath)->defaultGroup->disablingProcesses.front(); ensure(origProcess->oobwRequested); - lock.unlock(); } - ensure(origPid == origProcess->pid); // just a sanity check + ensure("sanity check", origPid == origProcess->pid); // just a sanity check // Issue requests until the new process handles it. pid_t pid; @@ -815,7 +814,6 @@ namespace tut { string response = readAll(connection); ensure(containsSubstring(response, "Status: 200 OK\r\n")); pid = atoi(stripHeaders(response)); - result = (pid != origPid); ); @@ -824,17 +822,15 @@ namespace tut { { unique_lock lock(pool->syncher); result = (origProcess->oobwRequested == 0); - lock.unlock(); } ); // Final asserts. { unique_lock lock(pool->syncher); - ensure(pool->superGroups.get(wsgiAppPath)->defaultGroup->processes.size() == 2); - ensure(origProcess->oobwRequested == 0); - ensure(origProcess->enabled == Process::ENABLED); - lock.unlock(); + ensure("2 enabled processes", pool->superGroups.get(wsgiAppPath)->defaultGroup->enabledProcesses.size() == 2); + ensure("oobw is reset", origProcess->oobwRequested == 0); + ensure("process is enabled", origProcess->enabled == Process::ENABLED); } }