Skip to content

Commit

Permalink
input.c: Process only safe events before blocking.
Browse files Browse the repository at this point in the history
Introduce multiqueue_process_priority() to process only events at or
above a certain priority.
  • Loading branch information
justinmk committed Mar 14, 2017
1 parent 72b9471 commit 0707650
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 64 deletions.
5 changes: 5 additions & 0 deletions src/nvim/event/defs.h
Expand Up @@ -6,6 +6,11 @@

#define EVENT_HANDLER_MAX_ARGC 6

typedef enum {
kEvPriorityNormal = 1,
kEvPriorityAsync = 2, // safe to run in any state
} EventPriority;

typedef void (*argv_callback)(void **argv);
typedef struct message {
int priority;
Expand Down
93 changes: 50 additions & 43 deletions src/nvim/event/multiqueue.c
Expand Up @@ -124,6 +124,7 @@ void multiqueue_free(MultiQueue *this)
xfree(this);
}

/// Removes the next item and returns its Event.
Event multiqueue_get(MultiQueue *this)
{
return multiqueue_empty(this) ? NILEVENT : multiqueue_remove(this);
Expand All @@ -142,45 +143,38 @@ void multiqueue_process_events(MultiQueue *this)
{
assert(this);
while (!multiqueue_empty(this)) {
Event event = multiqueue_get(this);
Event event = multiqueue_remove(this);
if (event.handler) {
event.handler(event.argv);
}
}
}

void multiqueue_process_debug(MultiQueue *this)
void multiqueue_process_priority(MultiQueue *this, int priority)
{
assert(this);
QUEUE *start = QUEUE_HEAD(&this->headtail);
QUEUE *cur = start;
// MultiQueue *start = this;
// MultiQueue *cur = start;
do {
QUEUE *cur = start;
while (!multiqueue_empty(this)) {
MultiQueueItem *item = multiqueue_node_data(cur);
Event ev;
if (item->link) {
assert(!this->parent);
// get the next node in the linked queue
MultiQueue *linked = item->data.queue;
assert(!multiqueue_empty(linked));
MultiQueueItem *child =
multiqueue_node_data(QUEUE_HEAD(&linked->headtail));
ev = child->data.item.event;
assert(!item->link || !this->parent); // Only a parent queue has link-nodes
Event ev = multiqueueitem_get_event(item, false);
ILOG("ev: priority=%d, handler=%p", ev.priority, ev.handler);

if (ev.priority >= priority) {
// processed
if (ev.handler) {
ev.handler(ev.argv);
}
multiqueue_remove(this);
} else {
ev = item->data.item.event;
// not processed
cur = cur->next->next;
if (!cur || cur == start) {
break;
}
}

// Event event = multiqueue_get(this);
// if (event.handler) {
// event.handler(event.argv);
// }

ILOG("ev: priority=%d, handler=%p arg1=%s", ev.priority, ev.handler,
ev.argv ? ev.argv[0] : "(null)");

cur = cur->next;
} while (cur && cur != start);
}
}

/// Removes all events without processing them.
Expand Down Expand Up @@ -210,43 +204,56 @@ size_t multiqueue_size(MultiQueue *this)
return this->size;
}

static Event multiqueue_remove(MultiQueue *this)
/// Gets the item's event.
///
/// @param remove Free and remove the item node from its queue.
static Event multiqueueitem_get_event(MultiQueueItem *item, bool remove)
{
assert(!multiqueue_empty(this));
QUEUE *h = QUEUE_HEAD(&this->headtail);
QUEUE_REMOVE(h);
MultiQueueItem *item = multiqueue_node_data(h);
Event rv;

assert(item != NULL);
Event ev;
if (item->link) {
assert(!this->parent);
// remove the next node in the linked queue
// get the next node in the linked queue
MultiQueue *linked = item->data.queue;
assert(!multiqueue_empty(linked));
MultiQueueItem *child =
multiqueue_node_data(QUEUE_HEAD(&linked->headtail));
QUEUE_REMOVE(&child->node);
rv = child->data.item.event;
xfree(child);
ev = child->data.item.event;
// remove the child node
if (remove) {
QUEUE_REMOVE(&child->node);
xfree(child);
}
} else {
if (this->parent) {
// remove the corresponding link node in the parent queue
// remove the corresponding link node in the parent queue
if (remove && item->data.item.parent_item) {
QUEUE_REMOVE(&item->data.item.parent_item->node);
xfree(item->data.item.parent_item);
item->data.item.parent_item = NULL;
}
rv = item->data.item.event;
ev = item->data.item.event;
}
return ev;
}

static Event multiqueue_remove(MultiQueue *this)
{
assert(!multiqueue_empty(this));
QUEUE *h = QUEUE_HEAD(&this->headtail);
QUEUE_REMOVE(h);
MultiQueueItem *item = multiqueue_node_data(h);
assert(!item->link || !this->parent); // Only a parent queue has link-nodes
Event ev = multiqueueitem_get_event(item, true);
this->size--;
xfree(item);
return rv;
return ev;
}

static void multiqueue_push(MultiQueue *this, Event event)
{
MultiQueueItem *item = xmalloc(sizeof(MultiQueueItem));
item->link = false;
item->data.item.event = event;
item->data.item.parent_item = NULL;
QUEUE_INSERT_TAIL(&this->headtail, &item->node);
if (this->parent) {
// push link node to the parent queue
Expand Down
2 changes: 1 addition & 1 deletion src/nvim/event/multiqueue.h
Expand Up @@ -10,7 +10,7 @@ typedef struct multiqueue MultiQueue;
typedef void (*put_callback)(MultiQueue *multiq, void *data);

#define multiqueue_put(q, h, ...) \
multiqueue_put_event(q, event_create(1, h, __VA_ARGS__));
multiqueue_put_event(q, event_create(kEvPriorityNormal, h, __VA_ARGS__));


#ifdef INCLUDE_GENERATED_DECLARATIONS
Expand Down
32 changes: 17 additions & 15 deletions src/nvim/msgpack_rpc/channel.c
Expand Up @@ -432,24 +432,26 @@ static void handle_request(Channel *channel, msgpack_object *request)
handler.async = true;
}

if (handler.async) {
char buf[256] = { 0 };
memcpy(buf, method->via.bin.ptr, MIN(255, method->via.bin.size));
if (strcmp("nvim_get_mode", buf) == 0) {
handler.async = input_blocking();
}
}

RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
event_data->channel = channel;
event_data->handler = handler;
event_data->args = args;
event_data->request_id = request_id;
RequestEvent *evdata = xmalloc(sizeof(RequestEvent));
evdata->channel = channel;
evdata->handler = handler;
evdata->args = args;
evdata->request_id = request_id;
incref(channel);
if (handler.async) {
on_request_event((void **)&event_data);
bool is_get_mode = sizeof("nvim_get_mode") == method->via.bin.size
&& !strncmp("nvim_get_mode", method->via.bin.ptr, method->via.bin.size);

if (input_blocking() || is_get_mode) {
// Invoke immediately.
on_request_event((void **)&evdata);
} else {
// Schedule on the main loop with special priority. #6247
Event ev = event_create(kEvPriorityAsync, on_request_event, 1, evdata);
multiqueue_put_event(channel->events, ev);
}
} else {
multiqueue_put(channel->events, on_request_event, 1, event_data);
multiqueue_put(channel->events, on_request_event, 1, evdata);
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/nvim/os/input.c
Expand Up @@ -337,12 +337,12 @@ static bool input_poll(int ms)
prof_inchar_enter();
}

if ((ms == - 1 || ms > 0)
&& !(events_enabled || input_ready() || input_eof)
) {
if ((ms == - 1 || ms > 0) && !events_enabled && !input_eof) {
// We have discovered that the pending input will provoke a blocking wait.
// Process any events marked with priority `kEvPriorityAsync`: these events
// must be handled after flushing input. See channel.c:handle_request #6247
blocking = true;
multiqueue_process_debug(main_loop.events);
multiqueue_process_events(main_loop.events);
multiqueue_process_priority(main_loop.events, kEvPriorityAsync);
}
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, ms, input_ready() || input_eof);
blocking = false;
Expand Down

0 comments on commit 0707650

Please sign in to comment.