Skip to content

Commit

Permalink
Refactor our 'suspend operation' logic on bufferevents.
Browse files Browse the repository at this point in the history
There are lots of things we do internally in bufferevents to indicate
"the user would like this operation to happen, but we aren't going to
try until some other condition goes away."  Our logic here has gotten
entirely too complicated.

This patch tries to fix that by adding the idea of 'suspend flags' for
read and write.  To say "don't bother reading or writing until
condition X no longer holds," bufferevent_suspend_read/write(bev,
BEV_SUSPEND_X).  When X no longer holds, call
bufferevent_unsuspend_read/write(bev, BEV_SUSPEND_X).

Right now, only the read-watermark logic uses this.
  • Loading branch information
nmathewson committed Dec 4, 2009
1 parent 438f9ed commit 0d744aa
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 19 deletions.
60 changes: 52 additions & 8 deletions bufferevent-internal.h
Expand Up @@ -36,6 +36,28 @@ extern "C" {
#include "evthread-internal.h"
#include "event2/thread.h"

/* These flags are reasons that we might be declining to actually enable
reading or writing on a bufferevent.
*/

/* On a all bufferevents, for reading: used when we have read up to the
watermark value.
On a filtering bufferxevent, for writing: used when the underlying
bufferevent's write buffer has been filled up to its watermark
value.
*/
#define BEV_SUSPEND_WM 0x01
/* On a base bufferevent: when we have used up our bandwidth buckets. */
#define BEV_SUSPEND_BW 0x02

struct token_bucket {
uint32_t limit;
uint32_t rate;
uint32_t burst;
unsigned last_updated;
};

/** Parts of the bufferevent structure that are shared among all bufferevent
* types, but not exposed in bufferevent_struct.h. */
struct bufferevent_private {
Expand All @@ -45,8 +67,6 @@ struct bufferevent_private {
/** Evbuffer callback to enforce watermarks on input. */
struct evbuffer_cb_entry *read_watermarks_cb;

/** If set, read is suspended until evbuffer some. */
unsigned read_suspended : 1;
/** If set, we should free the lock when we free the bufferevent. */
unsigned own_lock : 1;

Expand All @@ -61,9 +81,21 @@ struct bufferevent_private {
/** Set to the events pending if we have deferred callbacks and
* an events callback is pending. */
short eventcb_pending;

/** If set, read is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
short read_suspended;

/** If set, writing is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
short write_suspended;

/** Set to the current socket errno if we have deferred callbacks and
* an events callback is pending. */
int errno_pending;

/** Used to implement deferred callbacks */
struct deferred_cb deferred;

Expand Down Expand Up @@ -155,12 +187,24 @@ extern const struct bufferevent_ops bufferevent_ops_async;
/** Initialize the shared parts of a bufferevent. */
int bufferevent_init_common(struct bufferevent_private *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options);

/** For internal use: temporarily stop all reads on bufev, because its
* read buffer is too full. */
void bufferevent_wm_suspend_read(struct bufferevent *bufev);
/** For internal use: temporarily stop all reads on bufev, because its
* read buffer is too full. */
void bufferevent_wm_unsuspend_read(struct bufferevent *bufev);
/** For internal use: temporarily stop all reads on bufev, until the conditions
* in 'what' are over. */
void bufferevent_suspend_read(struct bufferevent *bufev, short what);
/** For internal use: clear the conditions 'what' on bufev, and re-enable
* reading if there are no conditions left. */
void bufferevent_unsuspend_read(struct bufferevent *bufev, short what);

/** For internal use: temporarily stop all writes on bufev, until the conditions
* in 'what' are over. */
void bufferevent_suspend_write(struct bufferevent *bufev, short what);
/** For internal use: clear the conditions 'what' on bufev, and re-enable
* writing if there are no conditions left. */
void bufferevent_unsuspend_write(struct bufferevent *bufev, short what);

#define bufferevent_wm_suspend_read(b) \
bufferevent_suspend_read((b), BEV_SUSPEND_WM)
#define bufferevent_wm_unsuspend_read(b) \
bufferevent_unsuspend_read((b), BEV_SUSPEND_WM)

/** Internal: Set up locking on a bufferevent. If lock is set, use it.
* Otherwise, use a new lock. */
Expand Down
45 changes: 34 additions & 11 deletions bufferevent.c
Expand Up @@ -60,33 +60,54 @@
#include "util-internal.h"

void
bufferevent_wm_suspend_read(struct bufferevent *bufev)
bufferevent_suspend_read(struct bufferevent *bufev, short what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
if (!bufev_private->read_suspended) {
if (!bufev_private->read_suspended)
bufev->be_ops->disable(bufev, EV_READ);
bufev_private->read_suspended = 1;
}
bufev_private->read_suspended |= what;
BEV_UNLOCK(bufev);
}

void
bufferevent_wm_unsuspend_read(struct bufferevent *bufev)
bufferevent_unsuspend_read(struct bufferevent *bufev, short what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
bufev_private->read_suspended &= ~what;
if (!bufev_private->read_suspended)
bufev->be_ops->enable(bufev, EV_READ);
BEV_UNLOCK(bufev);
}

void
bufferevent_suspend_write(struct bufferevent *bufev, short what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
if (bufev_private->read_suspended) {
bufev_private->read_suspended = 0;
if (bufev->enabled & EV_READ)
bufev->be_ops->enable(bufev, EV_READ);
}
if (!bufev_private->write_suspended)
bufev->be_ops->disable(bufev, EV_WRITE);
bufev_private->write_suspended |= what;
BEV_UNLOCK(bufev);
}

void
bufferevent_unsuspend_write(struct bufferevent *bufev, short what)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
bufev_private->write_suspended &= ~what;
if (!bufev_private->write_suspended)
bufev->be_ops->enable(bufev, EV_WRITE);
BEV_UNLOCK(bufev);
}


/* Callback to implement watermarks on the input buffer. Only enabled
* if the watermark is set. */
static void
Expand Down Expand Up @@ -338,10 +359,12 @@ bufferevent_enable(struct bufferevent *bufev, short event)
_bufferevent_incref_and_lock(bufev);
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
if (bufev_private->write_suspended)
impl_events &= ~EV_WRITE;

bufev->enabled |= event;

if (bufev->be_ops->enable(bufev, impl_events) < 0)
if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1;

_bufferevent_decref_and_unlock(bufev);
Expand Down

0 comments on commit 0d744aa

Please sign in to comment.