Skip to content

Commit

Permalink
Locking support for bufferevents.
Browse files Browse the repository at this point in the history
svn:r1170
  • Loading branch information
nmathewson committed Apr 13, 2009
1 parent 1becc4c commit 915193e
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 19 deletions.
19 changes: 19 additions & 0 deletions bufferevent-internal.h
Expand Up @@ -33,6 +33,8 @@ extern "C" {
#include "event-config.h"
#include "evutil.h"
#include "defer-internal.h"
#include "evthread-internal.h"
#include "event2/thread.h"

struct bufferevent_private {
struct bufferevent bev;
Expand All @@ -42,6 +44,7 @@ struct bufferevent_private {

/** If set, read is suspended until evbuffer some. */
unsigned read_suspended : 1;
unsigned own_lock : 1;

enum bufferevent_options options;

Expand Down Expand Up @@ -106,6 +109,22 @@ void bufferevent_wm_suspend_read(struct bufferevent *bufev);
* read buffer is too full. */
void bufferevent_wm_unsuspend_read(struct bufferevent *bufev);

int bufferevent_enable_locking(struct bufferevent *bufev, void *lock);

#define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev)

#define BEV_LOCK(b) do { \
struct bufferevent_private *locking = BEV_UPCAST(b); \
if (locking->lock) \
EVLOCK_LOCK(locking->lock, EVTHREAD_WRITE); \
} while(0)

#define BEV_UNLOCK(b) do { \
struct bufferevent_private *locking = BEV_UPCAST(b); \
if (locking->lock) \
EVLOCK_UNLOCK(locking->lock, EVTHREAD_WRITE); \
} while(0)

#ifdef __cplusplus
}
#endif
Expand Down
77 changes: 68 additions & 9 deletions bufferevent.c
Expand Up @@ -59,28 +59,32 @@
#include "bufferevent-internal.h"
#include "util-internal.h"


void
bufferevent_wm_suspend_read(struct bufferevent *bufev)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
if (!bufev_private->read_suspended) {
bufev->be_ops->disable(bufev, EV_READ);
bufev_private->read_suspended = 1;
}
BEV_LOCK(bufev);
}

void
bufferevent_wm_unsuspend_read(struct bufferevent *bufev)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

BEV_LOCK(bufev);
if (bufev_private->read_suspended) {
bufev_private->read_suspended = 0;
if (bufev->enabled & EV_READ)
bufev->be_ops->enable(bufev, EV_READ);
}
BEV_LOCK(bufev);
}

/* Callback to implement watermarks on the input buffer. Only enabled
Expand All @@ -91,7 +95,9 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf,
void *arg)
{
struct bufferevent *bufev = arg;
size_t size = evbuffer_get_length(buf);
size_t size;

size = evbuffer_get_length(buf);

if (cbinfo->n_added > cbinfo->n_deleted) {
/* Data got added. If it put us over the watermark, stop
Expand Down Expand Up @@ -137,6 +143,15 @@ bufferevent_init_common(struct bufferevent_private *bufev_private,
*/
bufev->enabled = EV_WRITE;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (options & BEV_OPT_THREADSAFE) {
if (bufferevent_enable_locking(bufev, NULL) < 0) {
/* cleanup */
return -1;
}
}
#endif

bufev_private->options = options;

return 0;
Expand All @@ -146,11 +161,14 @@ void
bufferevent_setcb(struct bufferevent *bufev,
evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
{
BEV_LOCK(bufev);

bufev->readcb = readcb;
bufev->writecb = writecb;
bufev->errorcb = errorcb;

bufev->cbarg = cbarg;
BEV_UNLOCK(bufev);
}

struct evbuffer *
Expand Down Expand Up @@ -206,22 +224,27 @@ bufferevent_enable(struct bufferevent *bufev, short event)
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
short impl_events = event;
int r = 0;

BEV_LOCK(bufev);
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;

bufev->enabled |= event;

if (bufev->be_ops->enable(bufev, impl_events) < 0)
return -1;
r = -1;

return (0);
BEV_UNLOCK(bufev);
return r;
}

void
bufferevent_set_timeouts(struct bufferevent *bufev,
const struct timeval *tv_read,
const struct timeval *tv_write)
{
BEV_LOCK(bufev);
if (tv_read) {
bufev->timeout_read = *tv_read;
} else {
Expand All @@ -235,6 +258,7 @@ bufferevent_set_timeouts(struct bufferevent *bufev,

if (bufev->be_ops->adj_timeouts)
bufev->be_ops->adj_timeouts(bufev);
BEV_UNLOCK(bufev);
}


Expand Down Expand Up @@ -265,12 +289,16 @@ bufferevent_settimeout(struct bufferevent *bufev,
int
bufferevent_disable(struct bufferevent *bufev, short event)
{
int r = 0;

BEV_LOCK(bufev);
bufev->enabled &= ~event;

if (bufev->be_ops->disable(bufev, event) < 0)
return (-1);
r = -1;

return (0);
BEV_UNLOCK(bufev);
return r;
}

/*
Expand All @@ -284,6 +312,7 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events,
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

BEV_LOCK(bufev);
if (events & EV_WRITE) {
bufev->wm_write.low = lowmark;
bufev->wm_write.high = highmark;
Expand Down Expand Up @@ -321,17 +350,20 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events,
bufferevent_wm_unsuspend_read(bufev);
}
}
BEV_UNLOCK(bufev);
}

int
bufferevent_flush(struct bufferevent *bufev,
short iotype,
enum bufferevent_flush_mode mode)
{
int r = -1;
BEV_LOCK(bufev);
if (bufev->be_ops->flush)
return bufev->be_ops->flush(bufev, iotype, mode);
else
return -1;
r = bufev->be_ops->flush(bufev, iotype, mode);
BEV_UNLOCK(bufev);
return r;
}

void
Expand All @@ -347,5 +379,32 @@ bufferevent_free(struct bufferevent *bufev)

/* Free the actual allocated memory. */
mm_free(bufev - bufev->be_ops->mem_offset);
/* Free lock XXX */
}

int
bufferevent_enable_locking(struct bufferevent *bufev, void *lock)
{
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
return -1;
#else
if (BEV_UPCAST(bufev)->lock)
return -1;

if (!lock) {
EVTHREAD_ALLOC_LOCK(lock);
if (!lock)
return -1;
BEV_UPCAST(bufev)->lock = lock;
BEV_UPCAST(bufev)->own_lock = 1;
} else {
BEV_UPCAST(bufev)->lock = lock;
BEV_UPCAST(bufev)->own_lock = 0;
}
evbuffer_enable_locking(bufev->input, lock);
evbuffer_enable_locking(bufev->output, lock);

return 0;
#endif
}

11 changes: 10 additions & 1 deletion bufferevent_filter.c
Expand Up @@ -170,6 +170,7 @@ bufferevent_filter_new(struct bufferevent *underlying,
void *ctx)
{
struct bufferevent_filtered *bufev_f;
enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;

if (!input_filter)
input_filter = be_null_filter;
Expand All @@ -181,10 +182,18 @@ bufferevent_filter_new(struct bufferevent *underlying,
return NULL;

if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
&bufferevent_ops_filter, options) < 0) {
&bufferevent_ops_filter, tmp_options) < 0) {
mm_free(bufev_f);
return NULL;
}
if (options & BEV_OPT_THREADSAFE) {
void *lock = BEV_UPCAST(underlying)->lock;
if (!lock) {
bufferevent_enable_locking(underlying, NULL);
lock = BEV_UPCAST(underlying)->lock;
}
bufferevent_enable_locking(downcast(bufev_f), lock);
}

bufev_f->underlying = underlying;
bufev_f->process_in = input_filter;
Expand Down
10 changes: 9 additions & 1 deletion bufferevent_pair.c
Expand Up @@ -75,6 +75,7 @@ run_callback(struct deferred_cb *cb, void *arg)
struct bufferevent_pair *bufev = arg;
struct bufferevent *bev = downcast(bufev);

BEV_LOCK(bev);
if (cb == &bufev->deferred_read_cb) {
if (bev->readcb) {
bev->readcb(bev, bev->cbarg);
Expand All @@ -84,6 +85,7 @@ run_callback(struct deferred_cb *cb, void *arg)
bev->writecb(bev, bev->cbarg);
}
}
BEV_UNLOCK(bev);
}

static struct bufferevent_pair *
Expand Down Expand Up @@ -115,16 +117,22 @@ bufferevent_pair_new(struct event_base *base, enum bufferevent_options options,
struct bufferevent *pair[2])
{
struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;

bufev1 = bufferevent_pair_elt_new(base, options);
if (!bufev1)
return -1;
bufev2 = bufferevent_pair_elt_new(base, options);
bufev2 = bufferevent_pair_elt_new(base, tmp_options);
if (!bufev2) {
bufferevent_free(downcast(bufev1));
return -1;
}

if (options & BEV_OPT_THREADSAFE) {
/*XXXX check return */
bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock);
}

bufev1->partner = bufev2;
bufev2->partner = bufev1;

Expand Down
28 changes: 20 additions & 8 deletions bufferevent_sock.c
Expand Up @@ -342,6 +342,7 @@ be_socket_flush(struct bufferevent *bev, short iotype,
void
bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
BEV_LOCK(bufev);
assert(bufev->be_ops == &bufferevent_ops_socket);

event_del(&bufev->ev_read);
Expand All @@ -351,37 +352,48 @@ bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
BEV_UNLOCK(bufev);
}

/* XXXX Should non-socket buffferevents support this? */
int
bufferevent_priority_set(struct bufferevent *bufev, int priority)
{
int r = -1;

BEV_LOCK(bufev);
if (bufev->be_ops != &bufferevent_ops_socket)
return -1;
goto done;

if (event_priority_set(&bufev->ev_read, priority) == -1)
return (-1);
goto done;
if (event_priority_set(&bufev->ev_write, priority) == -1)
return (-1);
goto done;

return (0);
r = 0;
done:
BEV_UNLOCK(bufev);
return r;
}

/* XXXX Should non-socket buffferevents support this? */
int
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
{
int res;
int res = -1;

BEV_LOCK(bufev);
if (bufev->be_ops != &bufferevent_ops_socket)
return -1;
goto done;

bufev->ev_base = base;

res = event_base_set(base, &bufev->ev_read);
if (res == -1)
return (res);
goto done;

res = event_base_set(base, &bufev->ev_write);
return (res);
done:
BEV_UNLOCK(bufev);
return res;
}
1 change: 1 addition & 0 deletions include/event2/bufferevent.h
Expand Up @@ -118,6 +118,7 @@ typedef void (*everrorcb)(struct bufferevent *bev, short what, void *ctx);

enum bufferevent_options {
BEV_OPT_CLOSE_ON_FREE = (1<<0),
BEV_OPT_THREADSAFE = (1<<1),
};

/**
Expand Down

0 comments on commit 915193e

Please sign in to comment.