Skip to content
This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

shutdown-and-flushing: Improve shutdown, support/test FlushHtml #936

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
126 changes: 89 additions & 37 deletions src/ngx_base_fetch.cc
Expand Up @@ -16,6 +16,7 @@

// Author: jefftk@google.com (Jeff Kaufman)
#include <unistd.h> //for usleep
#include <inttypes.h> // PRId64

#include "ngx_base_fetch.h"
#include "ngx_event_connection.h"
Expand All @@ -36,7 +37,29 @@ const char kFlush = 'F';
const char kDone = 'D';

NgxEventConnection* NgxBaseFetch::event_connection = NULL;
int NgxBaseFetch::active_base_fetches = 0;
// We'll set this event to uncancelable to prevent nginx from exiting before we
// are up for it. See ngx_worker_process_cycle().
ngx_event_t* NgxBaseFetch::shutdown_event = NULL;
int64 NgxBaseFetch::active_base_fetches = 0;
int64 NgxBaseFetch::request_ctx_count = 0;


// Periodically checks whether the conditions are met for ::Terminate() to be
// called:
// - A forced/quick shutdown, e.g. SIGTERM was received.
// - A graceful shutdown, e.g. SIGQUIT was recevied, after all outstanding
// work as been finished.
// When one of those conditions is met, ev->cancelable will have been set to
// false by us, and nginx will end up calling ::Terminate() next after calling
// Done(false) on any outstanding proxy fetches.
// Terminate() will clear out any pending events to make sure we release all
// associated NgxBaseFetch instances.
static void ps_base_fetch_shutdown_event_handler(ngx_event_t *ev) {
NgxBaseFetch::CheckShutdownEvent();
if (!ev->cancelable) {
ngx_add_timer(ev, 1000);
}
}

NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
NgxServerContext* server_context,
Expand Down Expand Up @@ -65,40 +88,56 @@ NgxBaseFetch::~NgxBaseFetch() {
bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {
CHECK(event_connection == NULL) << "event connection already set";
event_connection = new NgxEventConnection(ReadCallback);

shutdown_event = reinterpret_cast<ngx_event_t*>(
ngx_pcalloc(cycle->pool, sizeof(ngx_event_t)));
shutdown_event->handler = ps_base_fetch_shutdown_event_handler;

shutdown_event->data = cycle;
shutdown_event->log = cycle->log;
// Prevents nginx from exiting until we are up for it.
shutdown_event->cancelable = 0;
ngx_add_timer(shutdown_event, 1000);

return event_connection->Init(cycle);
}

void NgxBaseFetch::Terminate() {
if (event_connection != NULL) {
GoogleMessageHandler handler;
PosixTimer timer;
int64 timeout_us = Timer::kSecondUs * 30;
// A second should be more then enough.
int64 timeout_us = Timer::kSecondUs;
int64 end_us = timer.NowUs() + timeout_us;
static unsigned int sleep_microseconds = 100;

handler.Message(
kInfo,"NgxBaseFetch::Terminate rounding up %d active base fetches.",
kInfo,"NgxBaseFetch::Terminate start: %" PRId64 " base fetches.",
NgxBaseFetch::active_base_fetches);

// Try to continue processing and get the active base fetch count to 0
// untill the timeout expires.
// TODO(oschaaf): This needs more work.
// Drain any events after a quick or graceful shutdown to clear out
// associated NgxBaseFetch instances.
while (NgxBaseFetch::active_base_fetches > 0 && end_us > timer.NowUs()) {
event_connection->Drain();
usleep(sleep_microseconds);
}

if (NgxBaseFetch::active_base_fetches != 0) {
DCHECK_EQ(NgxBaseFetch::active_base_fetches, 0);
DCHECK_EQ(NgxBaseFetch::request_ctx_count, 0);

if (NgxBaseFetch::active_base_fetches != 0 || NgxBaseFetch::request_ctx_count) {
handler.Message(
kWarning,"NgxBaseFetch::Terminate timed out with %d active base fetches.",
NgxBaseFetch::active_base_fetches);
kWarning,"NgxBaseFetch::Terminate exit: %" PRId64 " base fetches, "
"%" PRId64 " request contexts", NgxBaseFetch::active_base_fetches,
NgxBaseFetch::request_ctx_count);
}

// Close down the named pipe.
event_connection->Shutdown();
delete event_connection;
event_connection = NULL;
}
NgxBaseFetch::CheckShutdownEvent();
}

const char* BaseFetchTypeToCStr(NgxBaseFetchType type) {
Expand Down Expand Up @@ -133,36 +172,38 @@ void NgxBaseFetch::ReadCallback(const ps_event_data& data) {

// If we ended up destructing the base fetch, or the request context is
// detached, skip this event.
if (refcount == 0 || detached) {
return;
}
ps_request_ctx_t* ctx = ps_get_request_context(r);

CHECK(data.sender == ctx->base_fetch);
CHECK(r->count > 0) << "r->count: " << r->count;

int rc;
// If we are unlucky enough to have our connection finalized mid-ipro-lookup,
// we must enter a different flow. Also see ps_in_place_check_header_filter().
if ((ctx->base_fetch->base_fetch_type_ != kIproLookup)
&& r->connection->error) {
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] request already finalized", r);
rc = NGX_ERROR;
} else {
rc = ps_base_fetch::ps_base_fetch_handler(r);
}

if (refcount != 0 && !detached) {
ps_request_ctx_t* ctx = ps_get_request_context(r);

CHECK(data.sender == ctx->base_fetch);
CHECK(r->count > 0) << "r->count: " << r->count;

int rc;
// If we are unlucky enough to have our connection finalized mid-ipro-lookup,
// we must enter a different flow. Also see ps_in_place_check_header_filter().
if ((ctx->base_fetch->base_fetch_type_ != kIproLookup)
&& r->connection->error) {
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] request already finalized", r);
rc = NGX_ERROR;
} else {
rc = ps_base_fetch::ps_base_fetch_handler(r);
}
#if (NGX_DEBUG)
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] ps_base_fetch_handler() returned %d for %c",
r, rc, data.type);
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] ps_base_fetch_handler() returned %d for %c",
r, rc, data.type);
#endif
if (ngx_terminate) {
return;
}

ngx_connection_t* c = r->connection;
ngx_http_finalize_request(r, rc);
// See http://forum.nginx.org/read.php?2,253006,253061
ngx_http_run_posted_requests(c);
ngx_connection_t* c = r->connection;
ngx_http_finalize_request(r, rc);
// See http://forum.nginx.org/read.php?2,253006,253061
ngx_http_run_posted_requests(c);
}
NgxBaseFetch::CheckShutdownEvent();
}

void NgxBaseFetch::Lock() {
Expand Down Expand Up @@ -193,7 +234,8 @@ ngx_int_t NgxBaseFetch::CopyBufferToNginx(ngx_chain_t** link_ptr) {
}

int rc = string_piece_to_buffer_chain(
request_->pool, buffer_, link_ptr, done_called_ /* send_last_buf */);
request_->pool, buffer_, link_ptr, done_called_ /* send_last_buf */,
true /* flush */);
if (rc != NGX_OK) {
return rc;
}
Expand Down Expand Up @@ -292,6 +334,16 @@ int NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
return r;
}

void NgxBaseFetch::CheckShutdownEvent() {
bool quit = ngx_terminate || ((ngx_quit || ngx_exiting) &&
NgxBaseFetch::request_ctx_count == 0);
if (NgxBaseFetch::shutdown_event != NULL && quit) {
NgxBaseFetch::shutdown_event->cancelable = 1;
NgxBaseFetch::shutdown_event = NULL;
}
}


void NgxBaseFetch::HandleDone(bool success) {
// TODO(jefftk): it's possible that instead of locking here we can just modify
// CopyBufferToNginx to only read done_called_ once.
Expand Down
6 changes: 4 additions & 2 deletions src/ngx_base_fetch.h
Expand Up @@ -118,11 +118,13 @@ class NgxBaseFetch : public AsyncFetch {
// this to be able to handle events which nginx request context has been
// released while the event was in-flight.
void Detach() { detached_ = true; DecrementRefCount(); }
static void CheckShutdownEvent();

bool detached() { return detached_; }

ngx_http_request_t* request() { return request_; }
NgxBaseFetchType base_fetch_type() { return base_fetch_type_; }
static int64 request_ctx_count;

private:
virtual bool HandleWrite(const StringPiece& sp, MessageHandler* handler);
Expand All @@ -149,11 +151,11 @@ class NgxBaseFetch : public AsyncFetch {
// Called by Done() and Release(). Decrements our reference count, and if
// it's zero we delete ourself.
int DecrefAndDeleteIfUnreferenced();

static NgxEventConnection* event_connection;
static ngx_event_t* shutdown_event;

// Live count of NgxBaseFetch instances that are currently in use.
static int active_base_fetches;
static int64 active_base_fetches;

ngx_http_request_t* request_;
GoogleString buffer_;
Expand Down
6 changes: 4 additions & 2 deletions src/ngx_fetch.cc
Expand Up @@ -508,11 +508,13 @@ bool NgxFetch::ParseUrl() {
url_.url.len = str_url_.length();
url_.url.data = static_cast<u_char*>(ngx_palloc(pool_, url_.url.len));
if (url_.url.data == NULL) {
DCHECK(false) << "NgxFetch::ParseUrl() without data";
return false;
}
str_url_.copy(reinterpret_cast<char*>(url_.url.data), str_url_.length(), 0);

return NgxUrlAsyncFetcher::ParseUrl(&url_, pool_);
bool r = NgxUrlAsyncFetcher::ParseUrl(&url_, pool_);
DCHECK(r) << "NgxUrlAsyncFetcher::ParseUrl(&url_, pool_) failed";
return r;
}

// Issue a request after the resolver is done
Expand Down