Skip to content

Commit

Permalink
be_filter: avoid data stuck under active watermarks
Browse files Browse the repository at this point in the history
Suppose we have bufferevent filter attached to bufferevent socket.
Read high watermark for bufferevent filter is configured to 4096 bytes.
Socket receives 4343 bytes. Due to watermark, 4096 bytes are transferred
from socket input buffer to filter input buffer and 247 bytes are left
in bufferevent socket.
Suppose that no more data is received through socket.

At this point 247 bytes will sit forever in input buffer of bufferevent
socket.
The patch attached solves this issue registering read callback to
filter's input buffer if it reaches its read high water mark and data
was left in corresponding underlying's input buffer.

This read callback calls filter process input function as soon as filter
input buffer falls below its read high watermark and there still is data
left in underlying input buffer. Callback is deregistered as soon as
filter input buffer falls below its read high watermark.
  • Loading branch information
Eduardo Panisset authored and azat committed Jun 19, 2016
1 parent 2851889 commit b627ad8
Showing 1 changed file with 73 additions and 6 deletions.
79 changes: 73 additions & 6 deletions bufferevent_filter.c
Expand Up @@ -71,6 +71,9 @@ static int be_filter_flush(struct bufferevent *bufev,
short iotype, enum bufferevent_flush_mode mode);
static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo, void *arg);

static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *info, void *arg);

Expand All @@ -79,6 +82,8 @@ struct bufferevent_filtered {

/** The bufferevent that we read/write filtered data from/to. */
struct bufferevent *underlying;
/** A callback on our inbuf to notice somebory removes data */
struct evbuffer_cb_entry *inbuf_cb;
/** A callback on our outbuf to notice when somebody adds data */
struct evbuffer_cb_entry *outbuf_cb;
/** True iff we have received an EOF callback from the underlying
Expand Down Expand Up @@ -203,6 +208,11 @@ bufferevent_filter_new(struct bufferevent *underlying,
bufferevent_setcb(bufev_f->underlying,
be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);

bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input,
bufferevent_filtered_inbuf_cb, bufev_f);
evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb,
EVBUFFER_CB_ENABLED);

bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
bufferevent_filtered_outbuf_cb, bufev_f);

Expand Down Expand Up @@ -251,6 +261,12 @@ be_filter_destruct(struct bufferevent *bev)
EVUTIL_ASSERT(bevf);
if (bevf->free_context)
bevf->free_context(bevf->context);

if (bevf->inbuf_cb)
evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb);

if (bevf->outbuf_cb)
evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb);
}

static int
Expand Down Expand Up @@ -418,9 +434,8 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
}
}

/* Called when the underlying socket has read. */
static void
be_filter_readcb(struct bufferevent *underlying, void *me_)
be_filter_read_nolock_(struct bufferevent *underlying, void *me_)
{
struct bufferevent_filtered *bevf = me_;
enum bufferevent_filter_result res;
Expand All @@ -429,8 +444,6 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
int processed_any = 0;

BEV_LOCK(bufev);

// It's possible our refcount is 0 at this point if another thread free'd our filterevent
EVUTIL_ASSERT(bufev_private->refcnt >= 0);

Expand All @@ -449,11 +462,65 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
/* XXX This should be in process_input, not here. There are
* other places that can call process-input, and they should
* force readcb calls as needed. */
if (processed_any)
if (processed_any) {
bufferevent_trigger_nolock_(bufev, EV_READ, 0);
if (evbuffer_get_length(underlying->input) > 0 &&
be_readbuf_full(bevf, state)) {
/* data left in underlying buffer and filter input buffer
* hit its read high watermark.
* Schedule callback to avoid data gets stuck in underlying
* input buffer.
*/
evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb,
EVBUFFER_CB_ENABLED);
}
}
}
}

BEV_UNLOCK(bufev);
/* Called when the size of our inbuf changes. */
static void
bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo, void *arg)
{
struct bufferevent_filtered *bevf = arg;
enum bufferevent_flush_mode state;
struct bufferevent *bev = downcast(bevf);

BEV_LOCK(bev);

if (bevf->got_eof)
state = BEV_FINISHED;
else
state = BEV_NORMAL;


if (!be_readbuf_full(bevf, state)) {
/* opportunity to read data which was left in underlying
* input buffer because filter input buffer hit read
* high watermark.
*/
evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb,
EVBUFFER_CB_ENABLED);
if (evbuffer_get_length(bevf->underlying->input) > 0)
be_filter_read_nolock_(bevf->underlying, bevf);
}

BEV_UNLOCK(bev);
}

/* Called when the underlying socket has read. */
static void
be_filter_readcb(struct bufferevent *underlying, void *me_)
{
struct bufferevent_filtered *bevf = me_;
struct bufferevent *bev = downcast(bevf);

BEV_LOCK(bev);

be_filter_read_nolock_(underlying, me_);

BEV_UNLOCK(bev);
}

/* Called when the underlying socket has drained enough that we can write to
Expand Down

0 comments on commit b627ad8

Please sign in to comment.