Skip to content

Commit

Permalink
msgpack_rpc: Don't delay notifications when request is pending
Browse files Browse the repository at this point in the history
For compatibily, rplugin infrastructure reimplements this behavior
by default, it can be turned off using sync="urgent".
  • Loading branch information
bfredl committed Oct 27, 2017
1 parent f0c2f82 commit 1c84427
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 56 deletions.
32 changes: 29 additions & 3 deletions runtime/autoload/remote/define.vim
Expand Up @@ -169,14 +169,40 @@ function! remote#define#FunctionOnChannel(channel, method, sync, name, opts)
exe function_def
endfunction

let s:busy = {}
let s:pending_notifications = {}

function! s:GetRpcFunction(sync)
if a:sync
return 'rpcrequest'
if a:sync ==# 'urgent'
return 'rpcnotify'
elseif a:sync
return 'remote#define#request'
endif
return 'rpcnotify'
return 'remote#define#notify'
endfunction

function! remote#define#notify(chan, ...)
if get(s:busy, a:chan, 0) > 0
let pending = get(s:pending_notifications, a:chan, [])
call add(pending, deepcopy(a:000))
let s:pending_notifications[a:chan] = pending
else
call call('rpcnotify', [a:chan] + a:000)
endif
endfunction

function! remote#define#request(chan, ...)
let s:busy[a:chan] = get(s:busy, a:chan, 0)+1
let val = call('rpcrequest', [a:chan]+a:000)
let s:busy[a:chan] -= 1
if s:busy[a:chan] == 0
for msg in get(s:pending_notifications, a:chan, [])
call call('rpcnotify', [a:chan] + msg)
endfor
let s:pending_notifications[a:chan] = []
endif
return val
endfunction

function! s:GetCommandPrefix(name, opts)
return 'command!'.s:StringifyOpts(a:opts, ['nargs', 'complete', 'range',
Expand Down
1 change: 1 addition & 0 deletions src/clint.py
Expand Up @@ -2531,6 +2531,7 @@ def CheckSpacing(filename, clean_lines, linenum, nesting_state, error):
r'(?<!\bkhash_t)'
r'(?<!\bkbtree_t)'
r'(?<!\bkbitr_t)'
r'(?<!\bPMap)'
r'\((?:const )?(?:struct )?[a-zA-Z_]\w*(?: *\*(?:const)?)*\)'
r' +'
r'-?(?:\*+|&)?(?:\w+|\+\+|--|\()', cast_line)
Expand Down
36 changes: 2 additions & 34 deletions src/nvim/msgpack_rpc/channel.c
Expand Up @@ -56,7 +56,6 @@ typedef struct {
typedef struct {
uint64_t id;
size_t refcount;
size_t pending_requests;
PMap(cstr_t) *subscribed_events;
bool closed;
ChannelType type;
Expand All @@ -71,7 +70,6 @@ typedef struct {
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
MultiQueue *events;
} Channel;

Expand Down Expand Up @@ -205,14 +203,7 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
}

if (channel) {
if (channel->pending_requests) {
// Pending request, queue the notification for later sending.
const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
kv_push(channel->delayed_notifications, buffer);
} else {
send_event(channel, name, args);
}
send_event(channel, name, args);
} else {
broadcast_event(name, args);
}
Expand Down Expand Up @@ -248,10 +239,8 @@ Object channel_send_call(uint64_t id,
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL };
kv_push(channel->call_stack, &frame);
channel->pending_requests++;
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;

if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
Expand All @@ -276,10 +265,6 @@ Object channel_send_call(uint64_t id,
api_free_object(frame.result);
}

if (!channel->pending_requests) {
send_delayed_notifications(channel);
}

decref(channel);

return frame.errored ? NIL : frame.result;
Expand Down Expand Up @@ -704,11 +689,7 @@ static void broadcast_event(const char *name, Array args)

for (size_t i = 0; i < kv_size(subscribed); i++) {
Channel *channel = kv_A(subscribed, i);
if (channel->pending_requests) {
kv_push(channel->delayed_notifications, buffer);
} else {
channel_write(channel, buffer);
}
channel_write(channel, buffer);
}

end:
Expand Down Expand Up @@ -786,7 +767,6 @@ static void free_channel(Channel *channel)

pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
if (channel->type != kChannelTypeProc) {
multiqueue_free(channel->events);
}
Expand All @@ -811,11 +791,9 @@ static Channel *register_channel(ChannelType type, uint64_t id,
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
kv_init(rv->call_stack);
kv_init(rv->delayed_notifications);
pmap_put(uint64_t)(channels, rv->id, rv);

ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
Expand Down Expand Up @@ -912,16 +890,6 @@ static WBuffer *serialize_response(uint64_t channel_id,
return rv;
}

static void send_delayed_notifications(Channel* channel)
{
for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) {
WBuffer *buffer = kv_A(channel->delayed_notifications, i);
channel_write(channel, buffer);
}

kv_size(channel->delayed_notifications) = 0;
}

static void incref(Channel *channel)
{
channel->refcount++;
Expand Down
61 changes: 42 additions & 19 deletions test/functional/api/server_requests_spec.lua
Expand Up @@ -109,7 +109,28 @@ describe('server -> client', function()
end)

describe('requests and notifications interleaved', function()
-- This tests that the following scenario won't happen:
it('does not delay notifications during pending request', function()
local received = false
local function on_setup()
eq("retval", funcs.rpcrequest(cid, "doit"))
stop()
end
local function on_request(method)
if method == "doit" then
funcs.rpcnotify(cid, "headsup")
eq(true,received)
return "retval"
end
end
local function on_notification(method)
if method == "headsup" then
received = true
end
end
run(on_request, on_notification, on_setup)
end)

-- This tests the following scenario:
--
-- server->client [request ] (1)
-- client->server [request ] (2) triggered by (1)
Expand All @@ -124,36 +145,38 @@ describe('server -> client', function()
-- only deals with one server->client request at a time. (In other words,
-- the client cannot send a response to a request that is not at the top
-- of nvim's request stack).
--
-- But above scenario shoudn't happen by the way notifications are dealt in
-- Nvim: they are only sent after there are no pending server->client
-- request(the request stack fully unwinds). So (3) is only sent after the
-- client returns (6).
it('works', function()
local expected = 300
local notified = 0
it('will close connection if not properly synchronized', function()
local function on_setup()
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
end

local function on_request(method)
eq('notify', method)
eq(1, eval('rpcnotify('..cid..', "notification")'))
return 'notified!'
if method == "notify" then
eq(1, eval('rpcnotify('..cid..', "notification")'))
return 'notified!'
elseif method == "nested" then
-- do some busywork, so the first request will return
-- before this one
for _ = 1, 5 do
eq(2, eval("1+1"))
end
eq(1, eval('rpcnotify('..cid..', "nested_done")'))
return 'done!'
end
end

local function on_notification(method)
eq('notification', method)
if notified == expected then
stop()
return
if method == "notification" then
eq('done!', eval('rpcrequest('..cid..', "nested")'))
elseif method == "nested_done" then
-- this should never have been sent
ok(false)
end
notified = notified + 1
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
end

run(on_request, on_notification, on_setup)
eq(expected, notified)
-- ignore disconnect failure, otherwise detected by after_each
clear()
end)
end)

Expand Down

0 comments on commit 1c84427

Please sign in to comment.