diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 5d5f992bdc..d47f9452bb 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -71,6 +71,9 @@ static int be_filter_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode mode); static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); +static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf, + const struct evbuffer_cb_info *cbinfo, void *arg); + static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *info, void *arg); @@ -79,6 +82,8 @@ struct bufferevent_filtered { /** The bufferevent that we read/write filtered data from/to. */ struct bufferevent *underlying; + /** A callback on our inbuf to notice somebory removes data */ + struct evbuffer_cb_entry *inbuf_cb; /** A callback on our outbuf to notice when somebody adds data */ struct evbuffer_cb_entry *outbuf_cb; /** True iff we have received an EOF callback from the underlying @@ -203,6 +208,11 @@ bufferevent_filter_new(struct bufferevent *underlying, bufferevent_setcb(bufev_f->underlying, be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f); + bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input, + bufferevent_filtered_inbuf_cb, bufev_f); + evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb, + EVBUFFER_CB_ENABLED); + bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, bufferevent_filtered_outbuf_cb, bufev_f); @@ -251,6 +261,12 @@ be_filter_destruct(struct bufferevent *bev) EVUTIL_ASSERT(bevf); if (bevf->free_context) bevf->free_context(bevf->context); + + if (bevf->inbuf_cb) + evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb); + + if (bevf->outbuf_cb) + evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb); } static int @@ -418,9 +434,8 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf, } } -/* Called when the underlying socket has read. */ static void -be_filter_readcb(struct bufferevent *underlying, void *me_) +be_filter_read_nolock_(struct bufferevent *underlying, void *me_) { struct bufferevent_filtered *bevf = me_; enum bufferevent_filter_result res; @@ -429,8 +444,6 @@ be_filter_readcb(struct bufferevent *underlying, void *me_) struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); int processed_any = 0; - BEV_LOCK(bufev); - // It's possible our refcount is 0 at this point if another thread free'd our filterevent EVUTIL_ASSERT(bufev_private->refcnt >= 0); @@ -449,11 +462,65 @@ be_filter_readcb(struct bufferevent *underlying, void *me_) /* XXX This should be in process_input, not here. There are * other places that can call process-input, and they should * force readcb calls as needed. */ - if (processed_any) + if (processed_any) { bufferevent_trigger_nolock_(bufev, EV_READ, 0); + if (evbuffer_get_length(underlying->input) > 0 && + be_readbuf_full(bevf, state)) { + /* data left in underlying buffer and filter input buffer + * hit its read high watermark. + * Schedule callback to avoid data gets stuck in underlying + * input buffer. + */ + evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb, + EVBUFFER_CB_ENABLED); + } + } } +} - BEV_UNLOCK(bufev); +/* Called when the size of our inbuf changes. */ +static void +bufferevent_filtered_inbuf_cb(struct evbuffer *buf, + const struct evbuffer_cb_info *cbinfo, void *arg) +{ + struct bufferevent_filtered *bevf = arg; + enum bufferevent_flush_mode state; + struct bufferevent *bev = downcast(bevf); + + BEV_LOCK(bev); + + if (bevf->got_eof) + state = BEV_FINISHED; + else + state = BEV_NORMAL; + + + if (!be_readbuf_full(bevf, state)) { + /* opportunity to read data which was left in underlying + * input buffer because filter input buffer hit read + * high watermark. + */ + evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb, + EVBUFFER_CB_ENABLED); + if (evbuffer_get_length(bevf->underlying->input) > 0) + be_filter_read_nolock_(bevf->underlying, bevf); + } + + BEV_UNLOCK(bev); +} + +/* Called when the underlying socket has read. */ +static void +be_filter_readcb(struct bufferevent *underlying, void *me_) +{ + struct bufferevent_filtered *bevf = me_; + struct bufferevent *bev = downcast(bevf); + + BEV_LOCK(bev); + + be_filter_read_nolock_(underlying, me_); + + BEV_UNLOCK(bev); } /* Called when the underlying socket has drained enough that we can write to