Skip to content

Commit

Permalink
Stop passing EVTHREAD_READ and EVTHREAD_WRITE to non-rw locks.
Browse files Browse the repository at this point in the history
Previously, our default lock model kind of assumed that every lock was
potentially a read-write lock.  This was a poor choice, since
read-write locks are far more expensive than regular locks, and so the
lock API should only use them when we can actually take advantage of
them.  Neither our pthreads or win32 lock implementation provided rw
locks.

Now that we have a way (not currently used!) to	indicate that we
really want a read-write lock, we shouldn't actually say "lock this
for reading" or "lock this for writing" unless we mean it.
  • Loading branch information
nmathewson committed Nov 27, 2009
1 parent 347952f commit 76cd2b7
Show file tree
Hide file tree
Showing 15 changed files with 155 additions and 144 deletions.
148 changes: 74 additions & 74 deletions buffer.c

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions buffer_iocp.c
Expand Up @@ -98,7 +98,7 @@ evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes)
struct evbuffer_iovec iov[2];
int n_vec;

EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
EVBUFFER_LOCK(evbuf);
EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress);
EVUTIL_ASSERT(nBytes >= 0); // XXXX Can this be false?

Expand Down Expand Up @@ -130,7 +130,7 @@ evbuffer_commit_write(struct evbuffer *evbuf, ev_ssize_t nBytes)
{
struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);

EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
EVBUFFER_LOCK(evbuf);
EVUTIL_ASSERT(buf->write_in_progress && !buf->read_in_progress);
evbuffer_unfreeze(evbuf, 1);
evbuffer_drain(evbuf, nBytes);
Expand Down Expand Up @@ -170,7 +170,7 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,
return -1;
}

EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
EVBUFFER_LOCK(buf);
EVUTIL_ASSERT(!buf_o->read_in_progress);
if (buf->freeze_start || buf_o->write_in_progress)
goto done;
Expand Down Expand Up @@ -221,7 +221,7 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,
buf_o->write_in_progress = 1;
r = 0;
done:
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
EVBUFFER_UNLOCK(buf);
return r;
}

Expand All @@ -240,7 +240,7 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most,

if (!buf_o)
return -1;
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
EVBUFFER_LOCK(buf);
EVUTIL_ASSERT(!buf_o->write_in_progress);
if (buf->freeze_end || buf_o->read_in_progress)
goto done;
Expand Down Expand Up @@ -286,7 +286,7 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most,
buf_o->read_in_progress = 1;
r = 0;
done:
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
EVBUFFER_UNLOCK(buf);
return r;
}

Expand All @@ -301,9 +301,9 @@ void
_evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd)
{
struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
EVBUFFER_LOCK(buf);
/* XXX is this right?, should it cancel current I/O operations? */
if (buf_o)
buf_o->fd = fd;
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
EVBUFFER_UNLOCK(buf);
}
4 changes: 2 additions & 2 deletions bufferevent-internal.h
Expand Up @@ -228,14 +228,14 @@ void _bufferevent_generic_adj_timeouts(struct bufferevent *bev);
#define BEV_LOCK(b) do { \
struct bufferevent_private *locking = BEV_UPCAST(b); \
if (locking->lock) \
EVLOCK_LOCK(locking->lock, EVTHREAD_WRITE); \
EVLOCK_LOCK(locking->lock, 0); \
} while(0)

/** Internal: Release the lock (if any) on a bufferevent */
#define BEV_UNLOCK(b) do { \
struct bufferevent_private *locking = BEV_UPCAST(b); \
if (locking->lock) \
EVLOCK_UNLOCK(locking->lock, EVTHREAD_WRITE); \
EVLOCK_UNLOCK(locking->lock, 0); \
} while(0)

#ifdef __cplusplus
Expand Down
4 changes: 2 additions & 2 deletions devpoll.c
Expand Up @@ -176,11 +176,11 @@ devpoll_dispatch(struct event_base *base, struct timeval *tv)
dvp.dp_nfds = devpollop->nevents;
dvp.dp_timeout = timeout;

EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, th_base_lock);

res = ioctl(devpollop->dpfd, DP_POLL, &dvp);

EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);

if (res == -1) {
if (errno != EINTR) {
Expand Down
4 changes: 2 additions & 2 deletions epoll.c
Expand Up @@ -145,11 +145,11 @@ epoll_dispatch(struct event_base *base, struct timeval *tv)
timeout = MAX_EPOLL_TIMEOUT_MSEC;
}

EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, th_base_lock);

res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);

EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);

if (res == -1) {
if (errno != EINTR) {
Expand Down
14 changes: 6 additions & 8 deletions evbuffer-internal.h
Expand Up @@ -222,30 +222,28 @@ struct evbuffer_chain_reference {
((struct evbuffer*)(buffer))->lock_count--; \
} while (0)

#define EVBUFFER_LOCK(buffer, mode) \
#define EVBUFFER_LOCK(buffer) \
do { \
EVLOCK_LOCK((buffer)->lock, (mode)); \
EVLOCK_LOCK((buffer)->lock, 0); \
_EVBUFFER_INCREMENT_LOCK_COUNT(buffer); \
} while(0)
#define EVBUFFER_UNLOCK(buffer, mode) \
#define EVBUFFER_UNLOCK(buffer) \
do { \
_EVBUFFER_DECREMENT_LOCK_COUNT(buffer); \
EVLOCK_UNLOCK((buffer)->lock, (mode)); \
EVLOCK_UNLOCK((buffer)->lock, 0); \
} while(0)

#define EVBUFFER_LOCK2(buffer1, buffer2) \
do { \
EVLOCK_LOCK2((buffer1)->lock, (buffer2)->lock, \
EVTHREAD_WRITE, EVTHREAD_WRITE); \
EVLOCK_LOCK2((buffer1)->lock, (buffer2)->lock, 0, 0); \
_EVBUFFER_INCREMENT_LOCK_COUNT(buffer1); \
_EVBUFFER_INCREMENT_LOCK_COUNT(buffer2); \
} while(0)
#define EVBUFFER_UNLOCK2(buffer1, buffer2) \
do { \
_EVBUFFER_DECREMENT_LOCK_COUNT(buffer1); \
_EVBUFFER_DECREMENT_LOCK_COUNT(buffer2); \
EVLOCK_UNLOCK2((buffer1)->lock, (buffer2)->lock, \
EVTHREAD_WRITE, EVTHREAD_WRITE); \
EVLOCK_UNLOCK2((buffer1)->lock, (buffer2)->lock, 0, 0); \
} while(0)

/** Increase the reference count of buf by one. */
Expand Down
6 changes: 3 additions & 3 deletions evdns.c
Expand Up @@ -421,16 +421,16 @@ static int strtoint(const char *const str);
#define EVDNS_LOCK(base) \
do { \
if ((base)->lock) { \
EVLOCK_LOCK((base)->lock, EVTHREAD_WRITE); \
EVLOCK_LOCK((base)->lock, 0); \
} \
++(base)->lock_count; \
} while (0)
#define EVDNS_UNLOCK(base) \
do { \
EVUTIL_ASSERT((base)->lock_count > 0); \
EVUTIL_ASSERT((base)->lock_count > 0); \
--(base)->lock_count; \
if ((base)->lock) { \
EVLOCK_UNLOCK((base)->lock, EVTHREAD_WRITE); \
EVLOCK_UNLOCK((base)->lock, 0); \
} \
} while (0)
#define ASSERT_LOCKED(base) EVUTIL_ASSERT((base)->lock_count > 0)
Expand Down
49 changes: 24 additions & 25 deletions event.c
Expand Up @@ -760,7 +760,7 @@ common_timeout_callback(evutil_socket_t fd, short what, void *arg)
struct common_timeout_list *ctl = arg;
struct event_base *base = ctl->base;
struct event *ev = NULL;
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
gettime(base, &now);
while (1) {
ev = TAILQ_FIRST(&ctl->events);
Expand All @@ -773,7 +773,7 @@ common_timeout_callback(evutil_socket_t fd, short what, void *arg)
}
if (ev)
common_timeout_schedule(ctl, &now, ev);
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, th_base_lock);
}

#define MAX_COMMON_TIMEOUTS 256
Expand All @@ -787,7 +787,7 @@ event_base_init_common_timeout(struct event_base *base,
const struct timeval *result=NULL;
struct common_timeout_list *new_ctl;

EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
if (duration->tv_usec > 1000000) {
memcpy(&tv, duration, sizeof(struct timeval));
if (is_common_timeout(duration, base))
Expand Down Expand Up @@ -848,7 +848,7 @@ event_base_init_common_timeout(struct event_base *base,
if (result)
EVUTIL_ASSERT(is_common_timeout(result, base));

EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return result;
}

Expand Down Expand Up @@ -915,10 +915,9 @@ event_process_active_single_queue(struct event_base *base,

base->current_event = ev;

EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, current_event_lock);
EVBASE_ACQUIRE_LOCK(base, current_event_lock);

EVBASE_RELEASE_LOCK(base,
EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, th_base_lock);

switch (ev->ev_closure) {
case EV_CLOSURE_SIGNAL:
Expand All @@ -934,8 +933,8 @@ event_process_active_single_queue(struct event_base *base,
break;
}

EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, current_event_lock);
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, current_event_lock);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
base->current_event = NULL;

if (base->event_break)
Expand Down Expand Up @@ -1053,9 +1052,9 @@ event_base_loopbreak(struct event_base *event_base)
if (event_base == NULL)
return (-1);

EVBASE_ACQUIRE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(event_base, th_base_lock);
event_base->event_break = 1;
EVBASE_RELEASE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(event_base, th_base_lock);

if (!EVBASE_IN_THREAD(event_base)) {
return evthread_notify_base(event_base);
Expand All @@ -1068,19 +1067,19 @@ int
event_base_got_break(struct event_base *event_base)
{
int res;
EVBASE_ACQUIRE_LOCK(event_base, EVTHREAD_READ, th_base_lock);
EVBASE_ACQUIRE_LOCK(event_base, th_base_lock);
res = event_base->event_break;
EVBASE_RELEASE_LOCK(event_base, EVTHREAD_READ, th_base_lock);
EVBASE_RELEASE_LOCK(event_base, th_base_lock);
return res;
}

int
event_base_got_exit(struct event_base *event_base)
{
int res;
EVBASE_ACQUIRE_LOCK(event_base, EVTHREAD_READ, th_base_lock);
EVBASE_ACQUIRE_LOCK(event_base, th_base_lock);
res = event_base->event_gotterm;
EVBASE_RELEASE_LOCK(event_base, EVTHREAD_READ, th_base_lock);
EVBASE_RELEASE_LOCK(event_base, th_base_lock);
return res;
}

Expand All @@ -1102,7 +1101,7 @@ event_base_loop(struct event_base *base, int flags)

/* Grab the lock. We will release it inside evsel.dispatch, and again
* as we invoke user callbacks. */
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);

clear_time_cache(base);

Expand Down Expand Up @@ -1169,7 +1168,7 @@ event_base_loop(struct event_base *base, int flags)

clear_time_cache(base);

EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, th_base_lock);

event_debug(("%s: asked to terminate loop.", __func__));
return (0);
Expand Down Expand Up @@ -1419,11 +1418,11 @@ event_add(struct event *ev, const struct timeval *tv)
{
int res;

EVBASE_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

res = event_add_internal(ev, tv, 0);

EVBASE_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

return (res);
}
Expand Down Expand Up @@ -1602,11 +1601,11 @@ event_del(struct event *ev)
{
int res;

EVBASE_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

res = event_del_internal(ev);

EVBASE_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

return (res);
}
Expand Down Expand Up @@ -1634,7 +1633,7 @@ event_del_internal(struct event *ev)
base = ev->ev_base;
need_cur_lock = (base->current_event == ev);
if (need_cur_lock)
EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, current_event_lock);
EVBASE_ACQUIRE_LOCK(base, current_event_lock);

EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

Expand Down Expand Up @@ -1678,19 +1677,19 @@ event_del_internal(struct event *ev)
evthread_notify_base(base);

if (need_cur_lock)
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, current_event_lock);
EVBASE_RELEASE_LOCK(base, current_event_lock);

return (res);
}

void
event_active(struct event *ev, int res, short ncalls)
{
EVBASE_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

event_active_nolock(ev, res, ncalls);

EVBASE_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}


Expand Down
4 changes: 2 additions & 2 deletions evport.c
Expand Up @@ -297,12 +297,12 @@ evport_dispatch(struct event_base *base, struct timeval *tv)
}
}

EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_RELEASE_LOCK(base, th_base_lock);

res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN,
(unsigned int *) &nevents, ts_p);

EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);

if (res == -1) {
if (errno == EINTR || errno == EAGAIN) {
Expand Down
14 changes: 7 additions & 7 deletions evthread-internal.h
Expand Up @@ -118,16 +118,16 @@ extern unsigned long (*_evthread_id_fn)(void);


/** Lock an event_base, if it is set up for locking. Acquires the lock
in the base structure whose field is named 'lck'. */
#define EVBASE_ACQUIRE_LOCK(base, mode, lockvar) do { \
in the base structure whose field is named 'lockvar'. */
#define EVBASE_ACQUIRE_LOCK(base, lockvar) do { \
if (EVBASE_USING_LOCKS(base)) \
_evthread_lock_fns.lock(mode, (base)->lockvar); \
_evthread_lock_fns.lock(0, (base)->lockvar); \
} while (0)

/** Unlock an event_base, if it is set up for locking. */
#define EVBASE_RELEASE_LOCK(base, mode, lockvar) do { \
#define EVBASE_RELEASE_LOCK(base, lockvar) do { \
if (EVBASE_USING_LOCKS(base)) \
_evthread_lock_fns.unlock(mode, (base)->lockvar); \
_evthread_lock_fns.unlock(0, (base)->lockvar); \
} while (0)
#else /* _EVENT_DISABLE_THREAD_SUPPORT */

Expand All @@ -141,8 +141,8 @@ extern unsigned long (*_evthread_id_fn)(void);
#define EVLOCK_UNLOCK2(lock1,lock2,mode1,mode2) _EVUTIL_NIL_STMT

#define EVBASE_IN_THREAD(base) 1
#define EVBASE_ACQUIRE_LOCK(base, mode, lock) _EVUTIL_NIL_STMT
#define EVBASE_RELEASE_LOCK(base, mode, lock) _EVUTIL_NIL_STMT
#define EVBASE_ACQUIRE_LOCK(base, lock) _EVUTIL_NIL_STMT
#define EVBASE_RELEASE_LOCK(base, lock) _EVUTIL_NIL_STMT

#endif

Expand Down

0 comments on commit 76cd2b7

Please sign in to comment.