diff --git a/bufferevent-internal.h b/bufferevent-internal.h index f369659ffa..b1026f6813 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -33,6 +33,8 @@ extern "C" { #include "event-config.h" #include "evutil.h" #include "defer-internal.h" +#include "evthread-internal.h" +#include "event2/thread.h" struct bufferevent_private { struct bufferevent bev; @@ -42,6 +44,7 @@ struct bufferevent_private { /** If set, read is suspended until evbuffer some. */ unsigned read_suspended : 1; + unsigned own_lock : 1; enum bufferevent_options options; @@ -106,6 +109,22 @@ void bufferevent_wm_suspend_read(struct bufferevent *bufev); * read buffer is too full. */ void bufferevent_wm_unsuspend_read(struct bufferevent *bufev); +int bufferevent_enable_locking(struct bufferevent *bufev, void *lock); + +#define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev) + +#define BEV_LOCK(b) do { \ + struct bufferevent_private *locking = BEV_UPCAST(b); \ + if (locking->lock) \ + EVLOCK_LOCK(locking->lock, EVTHREAD_WRITE); \ + } while(0) + +#define BEV_UNLOCK(b) do { \ + struct bufferevent_private *locking = BEV_UPCAST(b); \ + if (locking->lock) \ + EVLOCK_UNLOCK(locking->lock, EVTHREAD_WRITE); \ + } while(0) + #ifdef __cplusplus } #endif diff --git a/bufferevent.c b/bufferevent.c index 69b8670243..f92a52bcd9 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -59,16 +59,17 @@ #include "bufferevent-internal.h" #include "util-internal.h" - void bufferevent_wm_suspend_read(struct bufferevent *bufev) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + BEV_LOCK(bufev); if (!bufev_private->read_suspended) { bufev->be_ops->disable(bufev, EV_READ); bufev_private->read_suspended = 1; } + BEV_LOCK(bufev); } void @@ -76,11 +77,14 @@ bufferevent_wm_unsuspend_read(struct bufferevent *bufev) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + + BEV_LOCK(bufev); if (bufev_private->read_suspended) { bufev_private->read_suspended = 0; if (bufev->enabled & EV_READ) bufev->be_ops->enable(bufev, EV_READ); } + BEV_LOCK(bufev); } /* Callback to implement watermarks on the input buffer. Only enabled @@ -91,7 +95,9 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf, void *arg) { struct bufferevent *bufev = arg; - size_t size = evbuffer_get_length(buf); + size_t size; + + size = evbuffer_get_length(buf); if (cbinfo->n_added > cbinfo->n_deleted) { /* Data got added. If it put us over the watermark, stop @@ -137,6 +143,15 @@ bufferevent_init_common(struct bufferevent_private *bufev_private, */ bufev->enabled = EV_WRITE; +#ifndef _EVENT_DISABLE_THREAD_SUPPORT + if (options & BEV_OPT_THREADSAFE) { + if (bufferevent_enable_locking(bufev, NULL) < 0) { + /* cleanup */ + return -1; + } + } +#endif + bufev_private->options = options; return 0; @@ -146,11 +161,14 @@ void bufferevent_setcb(struct bufferevent *bufev, evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg) { + BEV_LOCK(bufev); + bufev->readcb = readcb; bufev->writecb = writecb; bufev->errorcb = errorcb; bufev->cbarg = cbarg; + BEV_UNLOCK(bufev); } struct evbuffer * @@ -206,15 +224,19 @@ bufferevent_enable(struct bufferevent *bufev, short event) struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); short impl_events = event; + int r = 0; + + BEV_LOCK(bufev); if (bufev_private->read_suspended) impl_events &= ~EV_READ; bufev->enabled |= event; if (bufev->be_ops->enable(bufev, impl_events) < 0) - return -1; + r = -1; - return (0); + BEV_UNLOCK(bufev); + return r; } void @@ -222,6 +244,7 @@ bufferevent_set_timeouts(struct bufferevent *bufev, const struct timeval *tv_read, const struct timeval *tv_write) { + BEV_LOCK(bufev); if (tv_read) { bufev->timeout_read = *tv_read; } else { @@ -235,6 +258,7 @@ bufferevent_set_timeouts(struct bufferevent *bufev, if (bufev->be_ops->adj_timeouts) bufev->be_ops->adj_timeouts(bufev); + BEV_UNLOCK(bufev); } @@ -265,12 +289,16 @@ bufferevent_settimeout(struct bufferevent *bufev, int bufferevent_disable(struct bufferevent *bufev, short event) { + int r = 0; + + BEV_LOCK(bufev); bufev->enabled &= ~event; if (bufev->be_ops->disable(bufev, event) < 0) - return (-1); + r = -1; - return (0); + BEV_UNLOCK(bufev); + return r; } /* @@ -284,6 +312,7 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events, struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + BEV_LOCK(bufev); if (events & EV_WRITE) { bufev->wm_write.low = lowmark; bufev->wm_write.high = highmark; @@ -321,6 +350,7 @@ bufferevent_setwatermark(struct bufferevent *bufev, short events, bufferevent_wm_unsuspend_read(bufev); } } + BEV_UNLOCK(bufev); } int @@ -328,10 +358,12 @@ bufferevent_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode mode) { + int r = -1; + BEV_LOCK(bufev); if (bufev->be_ops->flush) - return bufev->be_ops->flush(bufev, iotype, mode); - else - return -1; + r = bufev->be_ops->flush(bufev, iotype, mode); + BEV_UNLOCK(bufev); + return r; } void @@ -347,5 +379,32 @@ bufferevent_free(struct bufferevent *bufev) /* Free the actual allocated memory. */ mm_free(bufev - bufev->be_ops->mem_offset); + /* Free lock XXX */ +} + +int +bufferevent_enable_locking(struct bufferevent *bufev, void *lock) +{ +#ifdef _EVENT_DISABLE_THREAD_SUPPORT + return -1; +#else + if (BEV_UPCAST(bufev)->lock) + return -1; + + if (!lock) { + EVTHREAD_ALLOC_LOCK(lock); + if (!lock) + return -1; + BEV_UPCAST(bufev)->lock = lock; + BEV_UPCAST(bufev)->own_lock = 1; + } else { + BEV_UPCAST(bufev)->lock = lock; + BEV_UPCAST(bufev)->own_lock = 0; + } + evbuffer_enable_locking(bufev->input, lock); + evbuffer_enable_locking(bufev->output, lock); + + return 0; +#endif } diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 54fd23c828..b427cfb207 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -170,6 +170,7 @@ bufferevent_filter_new(struct bufferevent *underlying, void *ctx) { struct bufferevent_filtered *bufev_f; + enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE; if (!input_filter) input_filter = be_null_filter; @@ -181,10 +182,18 @@ bufferevent_filter_new(struct bufferevent *underlying, return NULL; if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base, - &bufferevent_ops_filter, options) < 0) { + &bufferevent_ops_filter, tmp_options) < 0) { mm_free(bufev_f); return NULL; } + if (options & BEV_OPT_THREADSAFE) { + void *lock = BEV_UPCAST(underlying)->lock; + if (!lock) { + bufferevent_enable_locking(underlying, NULL); + lock = BEV_UPCAST(underlying)->lock; + } + bufferevent_enable_locking(downcast(bufev_f), lock); + } bufev_f->underlying = underlying; bufev_f->process_in = input_filter; diff --git a/bufferevent_pair.c b/bufferevent_pair.c index 6fed0e0331..6080421c0c 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -75,6 +75,7 @@ run_callback(struct deferred_cb *cb, void *arg) struct bufferevent_pair *bufev = arg; struct bufferevent *bev = downcast(bufev); + BEV_LOCK(bev); if (cb == &bufev->deferred_read_cb) { if (bev->readcb) { bev->readcb(bev, bev->cbarg); @@ -84,6 +85,7 @@ run_callback(struct deferred_cb *cb, void *arg) bev->writecb(bev, bev->cbarg); } } + BEV_UNLOCK(bev); } static struct bufferevent_pair * @@ -115,16 +117,22 @@ bufferevent_pair_new(struct event_base *base, enum bufferevent_options options, struct bufferevent *pair[2]) { struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; + enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE; bufev1 = bufferevent_pair_elt_new(base, options); if (!bufev1) return -1; - bufev2 = bufferevent_pair_elt_new(base, options); + bufev2 = bufferevent_pair_elt_new(base, tmp_options); if (!bufev2) { bufferevent_free(downcast(bufev1)); return -1; } + if (options & BEV_OPT_THREADSAFE) { + /*XXXX check return */ + bufferevent_enable_locking(downcast(bufev2), bufev1->bev.lock); + } + bufev1->partner = bufev2; bufev2->partner = bufev1; diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 0e5e3a7b77..5d6b92be27 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -342,6 +342,7 @@ be_socket_flush(struct bufferevent *bev, short iotype, void bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd) { + BEV_LOCK(bufev); assert(bufev->be_ops == &bufferevent_ops_socket); event_del(&bufev->ev_read); @@ -351,37 +352,48 @@ bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd) EV_READ|EV_PERSIST, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + BEV_UNLOCK(bufev); } /* XXXX Should non-socket buffferevents support this? */ int bufferevent_priority_set(struct bufferevent *bufev, int priority) { + int r = -1; + + BEV_LOCK(bufev); if (bufev->be_ops != &bufferevent_ops_socket) - return -1; + goto done; if (event_priority_set(&bufev->ev_read, priority) == -1) - return (-1); + goto done; if (event_priority_set(&bufev->ev_write, priority) == -1) - return (-1); + goto done; - return (0); + r = 0; +done: + BEV_UNLOCK(bufev); + return r; } /* XXXX Should non-socket buffferevents support this? */ int bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) { - int res; + int res = -1; + + BEV_LOCK(bufev); if (bufev->be_ops != &bufferevent_ops_socket) - return -1; + goto done; bufev->ev_base = base; res = event_base_set(base, &bufev->ev_read); if (res == -1) - return (res); + goto done; res = event_base_set(base, &bufev->ev_write); - return (res); +done: + BEV_UNLOCK(bufev); + return res; } diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h index bbb70ad86d..41f976b68d 100644 --- a/include/event2/bufferevent.h +++ b/include/event2/bufferevent.h @@ -118,6 +118,7 @@ typedef void (*everrorcb)(struct bufferevent *bev, short what, void *ctx); enum bufferevent_options { BEV_OPT_CLOSE_ON_FREE = (1<<0), + BEV_OPT_THREADSAFE = (1<<1), }; /**