Permalink
Browse files

Restore our priority-inversion-prevention code with deferreds

Back when deferred_cb stuff had its own queue, the queue was always
executed, but we never ran more than 16 callbacks per iteration.
That made for two problems:

1: Because deferred_cb stuff would always run, and had no priority,
it could cause priority inversion.

2: It doesn't respect the max_dispatch_interval code.

Then, when I refactored deferred_cb to be a special case of
event_callback, that solved the above issues, but made for two more
issues:

3: Because deferred_cb stuff would always get the default priority,
it could cause low-priority bufferevents to get too much priority.

4: With code like bufferevent_pair, it's easy to get into a
situation where two deferreds keep adding one another, preventing
the event loop from ever actually scanning for more events.

This commit fixes the above by giving deferreds a better notion of
priorities, and by limiting the number of deferreds that can be
added to the _current_ loop iteration's active queues.  (Extra
deferreds are put into the active_later state.)

That isn't an all-purpose priority inversion solution, of course: for
that, you may need to mess around with max_dispatch_interval.
  • Loading branch information...
1 parent 581b5be commit c0e425abdcfc883fa70b6deafdf7327bfb75f02d Nick Mathewson committed May 9, 2012
Showing with 80 additions and 23 deletions.
  1. +2 −1 buffer.c
  2. +2 −2 bufferevent.c
  3. +2 −1 bufferevent_pair.c
  4. +4 −0 bufferevent_sock.c
  5. +6 −1 defer-internal.h
  6. +4 −2 evdns.c
  7. +6 −0 event-internal.h
  8. +29 −7 event.c
  9. +2 −1 http.c
  10. +2 −1 listener.c
  11. +21 −7 test/regress_thread.c
View
3 buffer.c
@@ -404,7 +404,8 @@ evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
EVBUFFER_LOCK(buffer);
buffer->cb_queue = base;
buffer->deferred_cbs = 1;
- event_deferred_cb_init_(base, &buffer->deferred,
+ event_deferred_cb_init_(&buffer->deferred,
+ event_base_get_npriorities(base) / 2,
evbuffer_deferred_callback, buffer);
EVBUFFER_UNLOCK(buffer);
return 0;
View
4 bufferevent.c
@@ -324,14 +324,14 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
if (options & BEV_OPT_DEFER_CALLBACKS) {
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init_(
- bufev->ev_base,
&bufev_private->deferred,
+ event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
event_deferred_cb_init_(
- bufev->ev_base,
&bufev_private->deferred,
+ event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_locked,
bufev_private);
}
View
3 bufferevent_pair.c
@@ -260,8 +260,9 @@ be_pair_disable(struct bufferevent *bev, short events)
if (events & EV_READ) {
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
}
- if (events & EV_WRITE)
+ if (events & EV_WRITE) {
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
+ }
return 0;
}
View
4 bufferevent_sock.c
@@ -636,6 +636,8 @@ int
bufferevent_priority_set(struct bufferevent *bufev, int priority)
{
int r = -1;
+ struct bufferevent_private *bufev_p =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
BEV_LOCK(bufev);
if (bufev->be_ops != &bufferevent_ops_socket)
@@ -646,6 +648,8 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority)
if (event_priority_set(&bufev->ev_write, priority) == -1)
goto done;
+ event_deferred_cb_set_priority_(&bufev_p->deferred, priority);
+
r = 0;
done:
BEV_UNLOCK(bufev);
View
7 defer-internal.h
@@ -42,10 +42,15 @@ typedef void (*deferred_cb_fn)(struct event_callback *, void *);
Initialize an empty, non-pending event_callback.
@param deferred The struct event_callback structure to initialize.
+ @param priority The priority that the callback should run at.
@param cb The function to run when the struct event_callback executes.
@param arg The function's second argument.
*/
-void event_deferred_cb_init_(struct event_base *base, struct event_callback *, deferred_cb_fn, void *);
+void event_deferred_cb_init_(struct event_callback *, ev_uint8_t, deferred_cb_fn, void *);
+/**
+ Change the priority of a non-pending event_callback.
+ */
+void event_deferred_cb_set_priority_(struct event_callback *, ev_uint8_t);
/**
Cancel a struct event_callback if it is currently scheduled in an event_base.
*/
View
6 evdns.c
@@ -836,8 +836,10 @@ reply_schedule_callback(struct request *const req, u32 ttl, u32 err, struct repl
d->handle = req->handle;
}
- event_deferred_cb_init_(req->base->event_base,
- &d->deferred, reply_run_callback,
+ event_deferred_cb_init_(
+ &d->deferred,
+ event_get_priority(&req->timeout_event),
+ reply_run_callback,
req->user_pointer);
event_deferred_cb_schedule_(
req->base->event_base,
View
6 event-internal.h
@@ -218,6 +218,12 @@ struct event_base {
* reentrant invocation. */
int running_loop;
+ /** Set to the number of deferred_cbs we've made 'active' in the
+ * loop. This is a hack to prevent starvation; it would be smarter
+ * to just use event_config_set_max_dispatch_interval's max_callbacks
+ * feature */
+ int n_deferreds_queued;
+
/* Active event management. */
/** An array of nactivequeues queues for active event_callbacks (ones
* that have triggered, and whose callbacks need to be called). Low
View
36 event.c
@@ -1072,8 +1072,8 @@ event_config_set_max_dispatch_interval(struct event_config *cfg,
cfg->max_dispatch_interval.tv_sec = -1;
cfg->max_dispatch_callbacks =
max_callbacks >= 0 ? max_callbacks : INT_MAX;
- if (min_priority <= 0)
- min_priority = 1;
+ if (min_priority < 0)
+ min_priority = 0;
cfg->limit_callbacks_after_prio = min_priority;
return (0);
}
@@ -1683,6 +1683,7 @@ event_base_loop(struct event_base *base, int flags)
while (!done) {
base->event_continue = 0;
+ base->n_deferreds_queued = 0;
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
@@ -2593,35 +2594,52 @@ event_callback_cancel_nolock_(struct event_base *base,
case 0:
break;
}
+
+ event_base_assert_ok_(base);
+
return 0;
}
void
-event_deferred_cb_init_(struct event_base *base, struct event_callback *cb, deferred_cb_fn fn, void *arg)
+event_deferred_cb_init_(struct event_callback *cb, ev_uint8_t priority, deferred_cb_fn fn, void *arg)
{
- if (!base)
- base = current_base;
memset(cb, 0, sizeof(*cb));
cb->evcb_cb_union.evcb_selfcb = fn;
cb->evcb_arg = arg;
- cb->evcb_pri = base->nactivequeues - 1;
+ cb->evcb_pri = priority;
cb->evcb_closure = EV_CLOSURE_CB_SELF;
}
void
+event_deferred_cb_set_priority_(struct event_callback *cb, ev_uint8_t priority)
+{
+ cb->evcb_pri = priority;
+}
+
+void
event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb)
{
if (!base)
base = current_base;
event_callback_cancel_(base, cb);
}
+#define MAX_DEFERREDS_QUEUED 32
int
event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb)
{
+ int r = 1;
if (!base)
base = current_base;
- return event_callback_activate_(base, cb);
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ if (base->n_deferreds_queued > MAX_DEFERREDS_QUEUED) {
+ event_callback_activate_later_nolock_(base, cb);
+ } else {
+ ++base->n_deferreds_queued;
+ r = event_callback_activate_nolock_(base, cb);
+ }
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
+ return r;
}
static int
@@ -2868,6 +2886,7 @@ event_queue_insert_active(struct event_base *base, struct event_callback *evcb)
evcb->evcb_flags |= EVLIST_ACTIVE;
base->event_count_active++;
+ EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri],
evcb, evcb_active_next);
}
@@ -2884,6 +2903,7 @@ event_queue_insert_active_later(struct event_base *base, struct event_callback *
INCR_EVENT_COUNT(base, evcb->evcb_flags);
evcb->evcb_flags |= EVLIST_ACTIVE_LATER;
base->event_count_active++;
+ EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next);
}
@@ -2920,7 +2940,9 @@ event_queue_make_later_events_active(struct event_base *base)
while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE;
+ EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next);
+ base->n_deferreds_queued += (evcb->evcb_closure == EV_CLOSURE_CB_SELF);
}
}
View
3 http.c
@@ -2197,8 +2197,9 @@ evhttp_connection_base_bufferevent_new(struct event_base *base, struct evdns_bas
bufferevent_base_set(base, evcon->bufev);
}
- event_deferred_cb_init_(evcon->base,
+ event_deferred_cb_init_(
&evcon->read_more_deferred_cb,
+ bufferevent_get_priority(bev),
evhttp_deferred_read_cb, evcon);
evcon->dns_base = dnsbase;
View
3 listener.c
@@ -498,7 +498,8 @@ new_accepting_socket(struct evconnlistener_iocp *lev, int family)
res->family = family;
event_deferred_cb_init_(&res->deferred,
- accepted_socket_invoke_user_cb, res);
+ event_base_get_npriorities(base) / 2,
+ accepted_socket_invoke_user_cb, res);
InitializeCriticalSectionAndSpinCount(&res->lock, 1000);
View
28 test/regress_thread.c
@@ -439,7 +439,8 @@ load_deferred_queue(void *arg)
size_t i;
for (i = 0; i < CB_COUNT; ++i) {
- event_deferred_cb_init_(data->queue, &data->cbs[i], deferred_callback, NULL);
+ event_deferred_cb_init_(&data->cbs[i], 0, deferred_callback,
+ NULL);
event_deferred_cb_schedule_(data->queue, &data->cbs[i]);
SLEEP_MS(1);
}
@@ -469,20 +470,28 @@ thread_deferred_cb_skew(void *arg)
{
struct basic_test_data *data = arg;
struct timeval tv_timer = {1, 0};
- struct event_base *queue = data->base;
+ struct event_base *base = NULL;
+ struct event_config *cfg = NULL;
struct timeval elapsed;
int elapsed_usec;
int i;
+ cfg = event_config_new();
+ tt_assert(cfg);
+ event_config_set_max_dispatch_interval(cfg, NULL, 16, 0);
+
+ base = event_base_new_with_config(cfg);
+ tt_assert(base);
+
for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
- deferred_data[i].queue = queue;
+ deferred_data[i].queue = base;
evutil_gettimeofday(&timer_start, NULL);
- event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL,
+ event_base_once(base, -1, EV_TIMEOUT, timer_callback, NULL,
&tv_timer);
- event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback,
+ event_base_once(base, -1, EV_TIMEOUT, start_threads_callback,
NULL, NULL);
- event_base_dispatch(data->base);
+ event_base_dispatch(base);
evutil_timersub(&timer_end, &timer_start, &elapsed);
TT_BLATHER(("callback count, %u", callback_count));
@@ -497,6 +506,10 @@ thread_deferred_cb_skew(void *arg)
end:
for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
THREAD_JOIN(load_threads[i]);
+ if (base)
+ event_base_free(base);
+ if (cfg)
+ event_config_free(cfg);
}
static struct event time_events[5];
@@ -580,7 +593,8 @@ struct testcase_t thread_testcases[] = {
&basic_setup, (char*)"forking" },
#endif
TEST(conditions_simple),
- TEST(deferred_cb_skew),
+ { "deferred_cb_skew", thread_deferred_cb_skew, TT_FORK|TT_NEED_THREADS,
+ &basic_setup, NULL },
TEST(no_events),
END_OF_TESTCASES
};

0 comments on commit c0e425a

Please sign in to comment.