Skip to content

Commit

Permalink
Revert "rpc: Don't delay notifications when request is pending (neovi…
Browse files Browse the repository at this point in the history
…m#6544)"

This reverts commit 2a3bcd1.
  • Loading branch information
justinmk committed Nov 11, 2017
1 parent b6a603f commit ad7e990
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 74 deletions.
32 changes: 3 additions & 29 deletions runtime/autoload/remote/define.vim
Expand Up @@ -169,40 +169,14 @@ function! remote#define#FunctionOnChannel(channel, method, sync, name, opts)
exe function_def
endfunction

let s:busy = {}
let s:pending_notifications = {}

function! s:GetRpcFunction(sync)
if a:sync ==# 'urgent'
return 'rpcnotify'
elseif a:sync
return 'remote#define#request'
if a:sync
return 'rpcrequest'
endif
return 'remote#define#notify'
return 'rpcnotify'
endfunction

function! remote#define#notify(chan, ...)
if get(s:busy, a:chan, 0) > 0
let pending = get(s:pending_notifications, a:chan, [])
call add(pending, deepcopy(a:000))
let s:pending_notifications[a:chan] = pending
else
call call('rpcnotify', [a:chan] + a:000)
endif
endfunction

function! remote#define#request(chan, ...)
let s:busy[a:chan] = get(s:busy, a:chan, 0)+1
let val = call('rpcrequest', [a:chan]+a:000)
let s:busy[a:chan] -= 1
if s:busy[a:chan] == 0
for msg in get(s:pending_notifications, a:chan, [])
call call('rpcnotify', [a:chan] + msg)
endfor
let s:pending_notifications[a:chan] = []
endif
return val
endfunction

function! s:GetCommandPrefix(name, opts)
return 'command!'.s:StringifyOpts(a:opts, ['nargs', 'complete', 'range',
Expand Down
1 change: 0 additions & 1 deletion src/clint.py
Expand Up @@ -2531,7 +2531,6 @@ def CheckSpacing(filename, clean_lines, linenum, nesting_state, error):
r'(?<!\bkhash_t)'
r'(?<!\bkbtree_t)'
r'(?<!\bkbitr_t)'
r'(?<!\bPMap)'
r'\((?:const )?(?:struct )?[a-zA-Z_]\w*(?: *\*(?:const)?)*\)'
r' +'
r'-?(?:\*+|&)?(?:\w+|\+\+|--|\()', cast_line)
Expand Down
36 changes: 34 additions & 2 deletions src/nvim/msgpack_rpc/channel.c
Expand Up @@ -56,6 +56,7 @@ typedef struct {
typedef struct {
uint64_t id;
size_t refcount;
size_t pending_requests;
PMap(cstr_t) *subscribed_events;
bool closed;
ChannelType type;
Expand All @@ -70,6 +71,7 @@ typedef struct {
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
MultiQueue *events;
} Channel;

Expand Down Expand Up @@ -203,7 +205,14 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
}

if (channel) {
send_event(channel, name, args);
if (channel->pending_requests) {
// Pending request, queue the notification for later sending.
const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
kv_push(channel->delayed_notifications, buffer);
} else {
send_event(channel, name, args);
}
} else {
broadcast_event(name, args);
}
Expand Down Expand Up @@ -239,8 +248,10 @@ Object channel_send_call(uint64_t id,
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL };
kv_push(channel->call_stack, &frame);
channel->pending_requests++;
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;

if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
Expand All @@ -265,6 +276,10 @@ Object channel_send_call(uint64_t id,
api_free_object(frame.result);
}

if (!channel->pending_requests) {
send_delayed_notifications(channel);
}

decref(channel);

return frame.errored ? NIL : frame.result;
Expand Down Expand Up @@ -689,7 +704,11 @@ static void broadcast_event(const char *name, Array args)

for (size_t i = 0; i < kv_size(subscribed); i++) {
Channel *channel = kv_A(subscribed, i);
channel_write(channel, buffer);
if (channel->pending_requests) {
kv_push(channel->delayed_notifications, buffer);
} else {
channel_write(channel, buffer);
}
}

end:
Expand Down Expand Up @@ -767,6 +786,7 @@ static void free_channel(Channel *channel)

pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
if (channel->type != kChannelTypeProc) {
multiqueue_free(channel->events);
}
Expand All @@ -791,9 +811,11 @@ static Channel *register_channel(ChannelType type, uint64_t id,
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
kv_init(rv->call_stack);
kv_init(rv->delayed_notifications);
pmap_put(uint64_t)(channels, rv->id, rv);

ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
Expand Down Expand Up @@ -890,6 +912,16 @@ static WBuffer *serialize_response(uint64_t channel_id,
return rv;
}

static void send_delayed_notifications(Channel* channel)
{
for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) {
WBuffer *buffer = kv_A(channel->delayed_notifications, i);
channel_write(channel, buffer);
}

kv_size(channel->delayed_notifications) = 0;
}

static void incref(Channel *channel)
{
channel->refcount++;
Expand Down
61 changes: 19 additions & 42 deletions test/functional/api/server_requests_spec.lua
Expand Up @@ -109,28 +109,7 @@ describe('server -> client', function()
end)

describe('requests and notifications interleaved', function()
it('does not delay notifications during pending request', function()
local received = false
local function on_setup()
eq("retval", funcs.rpcrequest(cid, "doit"))
stop()
end
local function on_request(method)
if method == "doit" then
funcs.rpcnotify(cid, "headsup")
eq(true,received)
return "retval"
end
end
local function on_notification(method)
if method == "headsup" then
received = true
end
end
run(on_request, on_notification, on_setup)
end)

-- This tests the following scenario:
-- This tests that the following scenario won't happen:
--
-- server->client [request ] (1)
-- client->server [request ] (2) triggered by (1)
Expand All @@ -145,38 +124,36 @@ describe('server -> client', function()
-- only deals with one server->client request at a time. (In other words,
-- the client cannot send a response to a request that is not at the top
-- of nvim's request stack).
pending('will close connection if not properly synchronized', function()
--
-- But above scenario shoudn't happen by the way notifications are dealt in
-- Nvim: they are only sent after there are no pending server->client
-- request(the request stack fully unwinds). So (3) is only sent after the
-- client returns (6).
it('works', function()
local expected = 300
local notified = 0
local function on_setup()
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
end

local function on_request(method)
if method == "notify" then
eq(1, eval('rpcnotify('..cid..', "notification")'))
return 'notified!'
elseif method == "nested" then
-- do some busywork, so the first request will return
-- before this one
for _ = 1, 5 do
eq(2, eval("1+1"))
end
eq(1, eval('rpcnotify('..cid..', "nested_done")'))
return 'done!'
end
eq('notify', method)
eq(1, eval('rpcnotify('..cid..', "notification")'))
return 'notified!'
end

local function on_notification(method)
if method == "notification" then
eq('done!', eval('rpcrequest('..cid..', "nested")'))
elseif method == "nested_done" then
-- this should never have been sent
ok(false)
eq('notification', method)
if notified == expected then
stop()
return
end
notified = notified + 1
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
end

run(on_request, on_notification, on_setup)
-- ignore disconnect failure, otherwise detected by after_each
clear()
eq(expected, notified)
end)
end)

Expand Down

0 comments on commit ad7e990

Please sign in to comment.