Skip to content
This repository has been archived by the owner on Apr 21, 2023. It is now read-only.

Oschaaf single fd fixed #876

Merged
merged 2 commits into from
Jan 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ if [ $ngx_found = yes ]; then
$ps_src/log_message_handler.h \
$ps_src/ngx_base_fetch.h \
$ps_src/ngx_caching_headers.h \
$ps_src/ngx_event_connection.h \
$ps_src/ngx_fetch.h \
$ps_src/ngx_gzip_setter.h \
$ps_src/ngx_list_iterator.h \
Expand All @@ -183,6 +184,7 @@ if [ $ngx_found = yes ]; then
$ps_src/log_message_handler.cc \
$ps_src/ngx_base_fetch.cc \
$ps_src/ngx_caching_headers.cc \
$ps_src/ngx_event_connection.cc \
$ps_src/ngx_fetch.cc \
$ps_src/ngx_gzip_setter.cc \
$ps_src/ngx_list_iterator.cc \
Expand Down
124 changes: 95 additions & 29 deletions src/ngx_base_fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// Author: jefftk@google.com (Jeff Kaufman)

#include "ngx_base_fetch.h"
#include "ngx_event_connection.h"
#include "ngx_list_iterator.h"

#include "ngx_pagespeed.h"
Expand All @@ -28,7 +29,13 @@

namespace net_instaweb {

NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
const char kHeadersComplete = 'H';
const char kFlush = 'F';
const char kDone = 'D';

NgxEventConnection* NgxBaseFetch::event_connection = NULL;

NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r,
NgxServerContext* server_context,
const RequestContextPtr& request_ctx,
PreserveCachingHeaders preserve_caching_headers)
Expand All @@ -37,17 +44,78 @@ NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
server_context_(server_context),
done_called_(false),
last_buf_sent_(false),
pipe_fd_(pipe_fd),
references_(2),
ipro_lookup_(false),
preserve_caching_headers_(preserve_caching_headers) {
preserve_caching_headers_(preserve_caching_headers),
detached_(false) {
if (pthread_mutex_init(&mutex_, NULL)) CHECK(0);
}

NgxBaseFetch::~NgxBaseFetch() {
pthread_mutex_destroy(&mutex_);
}

bool NgxBaseFetch::Initialize(ngx_cycle_t* cycle) {
CHECK(event_connection == NULL) << "event connection already set";
event_connection = new NgxEventConnection(ReadCallback);
return event_connection->Init(cycle);
}

void NgxBaseFetch::Terminate() {
if (event_connection != NULL) {
event_connection->Shutdown();
delete event_connection;
event_connection = NULL;
}
}

void NgxBaseFetch::ReadCallback(const ps_event_data& data) {
NgxBaseFetch* base_fetch = reinterpret_cast<NgxBaseFetch*>(data.sender);
ngx_http_request_t* r = base_fetch->request();
bool detached = base_fetch->detached();
int refcount = base_fetch->DecrementRefCount();

#if (NGX_DEBUG)
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] event: %c. bf:%p - refcnt:%d - det: %c", r,
data.type, base_fetch, refcount, detached ? 'Y': 'N');
#endif

// If we ended up destructing the base fetch, or the request context is
// detached, skip this event.
if (refcount == 0 || detached) {
return;
}
ps_request_ctx_t* ctx = ps_get_request_context(r);
CHECK(data.sender == ctx->base_fetch);

// ngx_base_fetch_handler() ends up setting ctx->fetch_done, which
// means we shouldn't call it anymore.
if (ctx->fetch_done) {
return;
}

CHECK(r->count > 0) << "r->count: " << r->count;

// If we are unlucky enough to have our connection finalized mid-ipro-lookup,
// we must enter a different flow. Also see ps_in_place_check_header_filter().
if (!ctx->base_fetch->ipro_lookup_ && r->connection->error) {
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] request already finalized", r);
ngx_http_finalize_request(r, NGX_ERROR);
return;
}

int rc = ps_base_fetch::ps_base_fetch_handler(r);

#if (NGX_DEBUG)
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
"pagespeed [%p] ps_base_fetch_handler() returned %d for %c",
r, rc, data.type);
#endif
ngx_http_finalize_request(r, rc);
}

void NgxBaseFetch::Lock() {
pthread_mutex_lock(&mutex_);
}
Expand Down Expand Up @@ -115,21 +183,15 @@ ngx_int_t NgxBaseFetch::CollectHeaders(ngx_http_headers_out_t* headers_out) {
preserve_caching_headers_);
}

void NgxBaseFetch::RequestCollection() {
int rc;
char c = 'A'; // What byte we write is arbitrary.
while (true) {
rc = write(pipe_fd_, &c, 1);
if (rc == 1) {
break;
} else if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// TODO(jefftk): is this rare enough that spinning isn't a problem? Could
// we get into a case where the pipe fills up and we spin forever?

} else {
perror("NgxBaseFetch::RequestCollection");
break;
}
void NgxBaseFetch::RequestCollection(char type) {
// We must optimistically increment the refcount, and decrement it
// when we conclude we failed. If we only increment on a successfull write,
// there's a small chance that between writing and adding to the refcount
// both pagespeed and nginx will release their refcount -- destructing
// this NgxBaseFetch instance.
IncrementRefCount();
if (!event_connection->WriteEvent(type, this)) {
DecrementRefCount();
}
}

Expand All @@ -147,38 +209,42 @@ void NgxBaseFetch::HandleHeadersComplete() {
// For the IPRO lookup, supress notification of the nginx side here.
// If we send both this event and the one from done, nasty stuff will happen
// if we loose the race with with the nginx side destructing this base fetch
// instance (and thereby clearing the byte and its pending extraneous event.
// instance (and thereby clearing the byte and its pending extraneous event).
if (!ipro_lookup_) {
RequestCollection(); // Headers available.
RequestCollection(kHeadersComplete); // Headers available.
}
}

bool NgxBaseFetch::HandleFlush(MessageHandler* handler) {
RequestCollection(); // A new part of the response body is available.
RequestCollection(kFlush); // A new part of the response body is available
return true;
}

void NgxBaseFetch::Release() {
DecrefAndDeleteIfUnreferenced();
int NgxBaseFetch::DecrementRefCount() {
return DecrefAndDeleteIfUnreferenced();
}

int NgxBaseFetch::IncrementRefCount() {
return __sync_add_and_fetch(&references_, 1);
}

void NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
int NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
// Creates a full memory barrier.
if (__sync_add_and_fetch(&references_, -1) == 0) {
int r = __sync_add_and_fetch(&references_, -1);
if (r == 0) {
delete this;
}
return r;
}

void NgxBaseFetch::HandleDone(bool success) {
// TODO(jefftk): it's possible that instead of locking here we can just modify
// CopyBufferToNginx to only read done_called_ once.
CHECK(!done_called_) << "Done already called!";
Lock();
done_called_ = true;
Unlock();

close(pipe_fd_); // Indicates to nginx that we're done with the rewrite.
pipe_fd_ = -1;

RequestCollection(kDone);
DecrefAndDeleteIfUnreferenced();
}

Expand Down
72 changes: 54 additions & 18 deletions src/ngx_base_fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,32 @@
// Author: jefftk@google.com (Jeff Kaufman)
//
// Collects output from pagespeed and buffers it until nginx asks for it.
// Notifies nginx via pipe to call CollectAccumulatedWrites() on flush.
// Notifies nginx via NgxEventConnection to call ReadCallback() when
// the headers are computed, when a flush should be performed, and when done.
//
// - nginx creates a base fetch and passes it to a new proxy fetch.
// - The proxy fetch manages rewriting and thread complexity, and through
// several chained steps passes rewritten html to HandleWrite().
// - Written data is buffered.
// - When Flush() is called the base fetch writes a byte to a pipe nginx is
// watching so nginx knows to call CollectAccumulatedWrites() to pick up the
// rewritten html.
// - When Done() is called the base fetch closes the pipe, which tells nginx to
// make a final call to CollectAccumulatedWrites().
// - When HandleHeadersComplete(), HandleFlush(), or HandleDone() is called by
// PSOL, events are written to NgxEventConnection which will end up being
// handled by ReadCallback() on nginx's thread.
// When applicable, request processing will be continued via a call to
// ps_base_fetch_handler().
// - ps_base_fetch_handler() will pull the header and body bytes from PSOL
// via CollectAccumulatedWrites() and write those to the module's output.
//
// This class is referred two in two places: the proxy fetch and nginx's
// request. It must stay alive until both are finished. The proxy fetch will
// call Done() to indicate this; nginx will call Release(). Once both Done()
// and Release() have been called this class will delete itself.
// This class is referred to in three places: the proxy fetch, nginx's request,
// and pending events written to the associated NgxEventConnection. It must stay
// alive until the proxy fetch and nginx request are finished, and no more
// events are pending.
// - The proxy fetch will call Done() to indicate this.
// - nginx will call Detach() when the associated request is handled
// completely (e.g. the request context is about to be destroyed).
// - ReadCallback() will call DecrementRefCount() on instances associated to
// events it handles.
//
// When the last reference is dropped, this class will delete itself.

#ifndef NGX_BASE_FETCH_H_
#define NGX_BASE_FETCH_H_
Expand All @@ -45,6 +55,7 @@ extern "C" {

#include "ngx_pagespeed.h"

#include "ngx_event_connection.h"
#include "ngx_server_context.h"

#include "net/instaweb/http/public/async_fetch.h"
Expand All @@ -53,13 +64,19 @@ extern "C" {

namespace net_instaweb {


class NgxBaseFetch : public AsyncFetch {
public:
NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
NgxServerContext* server_context,
NgxBaseFetch(ngx_http_request_t* r, NgxServerContext* server_context,
const RequestContextPtr& request_ctx,
PreserveCachingHeaders preserve_caching_headers);
virtual ~NgxBaseFetch();
// Statically initializes event_connection, require for PSOL and nginx to
// communicate.
static bool Initialize(ngx_cycle_t* cycle);
// Statically terminates and NULLS event_connection.
static void Terminate();
static void ReadCallback(const ps_event_data& data);

// Puts a chain in link_ptr if we have any output data buffered. Returns
// NGX_OK on success, NGX_ERROR on errors. If there's no data to send, sends
Expand All @@ -77,10 +94,24 @@ class NgxBaseFetch : public AsyncFetch {
// time for resource fetches. Not called at all for proxy fetches.
ngx_int_t CollectHeaders(ngx_http_headers_out_t* headers_out);

// Called by nginx when it's done with us.
void Release();
// Called by nginx to decrement the refcount.
int DecrementRefCount();

// Called by pagespeed to increment the refcount.
int IncrementRefCount();

void set_ipro_lookup(bool x) { ipro_lookup_ = x; }

// Detach() is called when the nginx side releases this base fetch. It
// sets detached_ to true and decrements the refcount. We need to know
// this to be able to handle events which nginx request context has been
// released while the event was in-flight.
void Detach() { detached_ = true; DecrementRefCount(); }

bool detached() { return detached_; }

ngx_http_request_t* request() { return request_; }

private:
virtual bool HandleWrite(const StringPiece& sp, MessageHandler* handler);
virtual bool HandleFlush(MessageHandler* handler);
Expand All @@ -89,7 +120,7 @@ class NgxBaseFetch : public AsyncFetch {

// Indicate to nginx that we would like it to call
// CollectAccumulatedWrites().
void RequestCollection();
void RequestCollection(char type);

// Lock must be acquired first.
// Returns:
Expand All @@ -105,20 +136,25 @@ class NgxBaseFetch : public AsyncFetch {

// Called by Done() and Release(). Decrements our reference count, and if
// it's zero we delete ourself.
void DecrefAndDeleteIfUnreferenced();
int DecrefAndDeleteIfUnreferenced();

static NgxEventConnection* event_connection;

ngx_http_request_t* request_;
GoogleString buffer_;
NgxServerContext* server_context_;
bool done_called_;
bool last_buf_sent_;
int pipe_fd_;
// How many active references there are to this fetch. Starts at two,
// decremented once when Done() is called and once when Release() is called.
// decremented once when Done() is called and once when Detach() is called.
// Incremented for each event written by pagespeed for this NgxBaseFetch, and
// decremented on the nginx side for each event read for it.
int references_;
pthread_mutex_t mutex_;
bool ipro_lookup_;
PreserveCachingHeaders preserve_caching_headers_;
// Set to true just before the nginx side releases its reference
bool detached_;

DISALLOW_COPY_AND_ASSIGN(NgxBaseFetch);
};
Expand Down
Loading