Skip to content

Commit

Permalink
msgpack_rpc: don't delay notifications when request is pending
Browse files Browse the repository at this point in the history
for compatibily, rplugin infrastructure reimplements behavior by
default, it can be turned off using sync="urgent"
  • Loading branch information
bfredl committed Oct 24, 2017
1 parent fdd9b19 commit fca8fcc
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 47 deletions.
32 changes: 29 additions & 3 deletions runtime/autoload/remote/define.vim
Expand Up @@ -169,14 +169,40 @@ function! remote#define#FunctionOnChannel(channel, method, sync, name, opts)
exe function_def
endfunction

let s:busy = {}
let s:pending_notifications = {}

function! s:GetRpcFunction(sync)
if a:sync
return 'rpcrequest'
if a:sync ==# 'urgent'
return 'rpcnotify'
elseif a:sync
return 'remote#define#request'
endif
return 'rpcnotify'
return 'remote#define#notify'
endfunction

function! remote#define#notify(chan, ...)
if get(s:busy, a:chan, 0) > 0
let pending = get(s:pending_notifications, a:chan, [])
call add(pending, deepcopy(a:000))
let s:pending_notifications[a:chan] = pending
else
call call('rpcnotify', [a:chan] + a:000)
endif
endfunction

function! remote#define#request(chan, ...)
let s:busy[a:chan] = get(s:busy, a:chan, 0)+1
let val = call('rpcrequest', [a:chan]+a:000)
let s:busy[a:chan] -= 1
if s:busy[a:chan] == 0
for msg in get(s:pending_notifications, a:chan, [])
call call('rpcnotify', [a:chan] + msg)
endfor
let s:pending_notifications[a:chan] = []
endif
return val
endfunction

function! s:GetCommandPrefix(name, opts)
return 'command!'.s:StringifyOpts(a:opts, ['nargs', 'complete', 'range',
Expand Down
36 changes: 2 additions & 34 deletions src/nvim/msgpack_rpc/channel.c
Expand Up @@ -56,7 +56,6 @@ typedef struct {
typedef struct {
uint64_t id;
size_t refcount;
size_t pending_requests;
PMap(cstr_t) *subscribed_events;
bool closed;
ChannelType type;
Expand All @@ -71,7 +70,6 @@ typedef struct {
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
MultiQueue *events;
} Channel;

Expand Down Expand Up @@ -205,14 +203,7 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
}

if (channel) {
if (channel->pending_requests) {
// Pending request, queue the notification for later sending.
const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
kv_push(channel->delayed_notifications, buffer);
} else {
send_event(channel, name, args);
}
send_event(channel, name, args);
} else {
broadcast_event(name, args);
}
Expand Down Expand Up @@ -248,10 +239,8 @@ Object channel_send_call(uint64_t id,
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL };
kv_push(channel->call_stack, &frame);
channel->pending_requests++;
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;

if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
Expand All @@ -276,10 +265,6 @@ Object channel_send_call(uint64_t id,
api_free_object(frame.result);
}

if (!channel->pending_requests) {
send_delayed_notifications(channel);
}

decref(channel);

return frame.errored ? NIL : frame.result;
Expand Down Expand Up @@ -704,11 +689,7 @@ static void broadcast_event(const char *name, Array args)

for (size_t i = 0; i < kv_size(subscribed); i++) {
Channel *channel = kv_A(subscribed, i);
if (channel->pending_requests) {
kv_push(channel->delayed_notifications, buffer);
} else {
channel_write(channel, buffer);
}
channel_write(channel, buffer);
}

end:
Expand Down Expand Up @@ -786,7 +767,6 @@ static void free_channel(Channel *channel)

pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
if (channel->type != kChannelTypeProc) {
multiqueue_free(channel->events);
}
Expand All @@ -811,11 +791,9 @@ static Channel *register_channel(ChannelType type, uint64_t id,
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
kv_init(rv->call_stack);
kv_init(rv->delayed_notifications);
pmap_put(uint64_t)(channels, rv->id, rv);

ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
Expand Down Expand Up @@ -912,16 +890,6 @@ static WBuffer *serialize_response(uint64_t channel_id,
return rv;
}

static void send_delayed_notifications(Channel* channel)
{
for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) {
WBuffer *buffer = kv_A(channel->delayed_notifications, i);
channel_write(channel, buffer);
}

kv_size(channel->delayed_notifications) = 0;
}

static void incref(Channel *channel)
{
channel->refcount++;
Expand Down
23 changes: 22 additions & 1 deletion test/functional/api/server_notifications_spec.lua
Expand Up @@ -2,7 +2,7 @@ local helpers = require('test.functional.helpers')(after_each)
local eq, clear, eval, command, nvim, next_message =
helpers.eq, helpers.clear, helpers.eval, helpers.command, helpers.nvim,
helpers.next_message
local meths = helpers.meths
local meths, run, funcs = helpers.meths, helpers.run, helpers.funcs

describe('notify', function()
local channel
Expand All @@ -22,6 +22,27 @@ describe('notify', function()
end)
end)

it('does not delay notifications during pending request', function()
local received = false
local function on_setup()
eq("retval", funcs.rpcrequest(channel, "doit"))
helpers.stop()
end
local function on_request(method)
if method == "doit" then
funcs.rpcnotify(channel, "headsup")
eq(true,received)
return "retval"
end
end
local function on_notification(method)
if method == "headsup" then
received = true
end
end
run(on_request, on_notification, on_setup)
end)

describe('passing 0 as the channel id', function()
it('sends the notification/args to all subscribed channels', function()
nvim('subscribe', 'event2')
Expand Down
13 changes: 4 additions & 9 deletions test/functional/api/server_requests_spec.lua
Expand Up @@ -109,7 +109,7 @@ describe('server -> client', function()
end)

describe('requests and notifications interleaved', function()
-- This tests that the following scenario won't happen:
-- This tests that the following scenario:
--
-- server->client [request ] (1)
-- client->server [request ] (2) triggered by (1)
Expand All @@ -124,13 +124,7 @@ describe('server -> client', function()
-- only deals with one server->client request at a time. (In other words,
-- the client cannot send a response to a request that is not at the top
-- of nvim's request stack).
--
-- But above scenario shoudn't happen by the way notifications are dealt in
-- Nvim: they are only sent after there are no pending server->client
-- request(the request stack fully unwinds). So (3) is only sent after the
-- client returns (6).
it('works', function()
local expected = 300
local notified = 0
local function on_setup()
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
Expand All @@ -144,7 +138,7 @@ describe('server -> client', function()

local function on_notification(method)
eq('notification', method)
if notified == expected then
if notified > 1 then
stop()
return
end
Expand All @@ -153,7 +147,8 @@ describe('server -> client', function()
end

run(on_request, on_notification, on_setup)
eq(expected, notified)
-- the recursion was aborted.
eq(1, notified)
end)
end)

Expand Down

0 comments on commit fca8fcc

Please sign in to comment.