Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "rpc: Don't delay notifications when request is pending (#6544)" #7540

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 3 additions & 29 deletions runtime/autoload/remote/define.vim
Original file line number Diff line number Diff line change
Expand Up @@ -169,40 +169,14 @@ function! remote#define#FunctionOnChannel(channel, method, sync, name, opts)
exe function_def
endfunction

let s:busy = {}
let s:pending_notifications = {}

function! s:GetRpcFunction(sync)
if a:sync ==# 'urgent'
return 'rpcnotify'
elseif a:sync
return 'remote#define#request'
if a:sync
return 'rpcrequest'
endif
return 'remote#define#notify'
return 'rpcnotify'
endfunction

function! remote#define#notify(chan, ...)
if get(s:busy, a:chan, 0) > 0
let pending = get(s:pending_notifications, a:chan, [])
call add(pending, deepcopy(a:000))
let s:pending_notifications[a:chan] = pending
else
call call('rpcnotify', [a:chan] + a:000)
endif
endfunction

function! remote#define#request(chan, ...)
let s:busy[a:chan] = get(s:busy, a:chan, 0)+1
let val = call('rpcrequest', [a:chan]+a:000)
let s:busy[a:chan] -= 1
if s:busy[a:chan] == 0
for msg in get(s:pending_notifications, a:chan, [])
call call('rpcnotify', [a:chan] + msg)
endfor
let s:pending_notifications[a:chan] = []
endif
return val
endfunction

function! s:GetCommandPrefix(name, opts)
return 'command!'.s:StringifyOpts(a:opts, ['nargs', 'complete', 'range',
Expand Down
1 change: 0 additions & 1 deletion src/clint.py
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,6 @@ def CheckSpacing(filename, clean_lines, linenum, nesting_state, error):
r'(?<!\bkhash_t)'
r'(?<!\bkbtree_t)'
r'(?<!\bkbitr_t)'
r'(?<!\bPMap)'
r'\((?:const )?(?:struct )?[a-zA-Z_]\w*(?: *\*(?:const)?)*\)'
r' +'
r'-?(?:\*+|&)?(?:\w+|\+\+|--|\()', cast_line)
Expand Down
36 changes: 34 additions & 2 deletions src/nvim/msgpack_rpc/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef struct {
typedef struct {
uint64_t id;
size_t refcount;
size_t pending_requests;
PMap(cstr_t) *subscribed_events;
bool closed;
ChannelType type;
Expand All @@ -70,6 +71,7 @@ typedef struct {
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
MultiQueue *events;
} Channel;

Expand Down Expand Up @@ -203,7 +205,14 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
}

if (channel) {
send_event(channel, name, args);
if (channel->pending_requests) {
// Pending request, queue the notification for later sending.
const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
kv_push(channel->delayed_notifications, buffer);
} else {
send_event(channel, name, args);
}
} else {
broadcast_event(name, args);
}
Expand Down Expand Up @@ -239,8 +248,10 @@ Object channel_send_call(uint64_t id,
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL };
kv_push(channel->call_stack, &frame);
channel->pending_requests++;
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;

if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
Expand All @@ -265,6 +276,10 @@ Object channel_send_call(uint64_t id,
api_free_object(frame.result);
}

if (!channel->pending_requests) {
send_delayed_notifications(channel);
}

decref(channel);

return frame.errored ? NIL : frame.result;
Expand Down Expand Up @@ -689,7 +704,11 @@ static void broadcast_event(const char *name, Array args)

for (size_t i = 0; i < kv_size(subscribed); i++) {
Channel *channel = kv_A(subscribed, i);
channel_write(channel, buffer);
if (channel->pending_requests) {
kv_push(channel->delayed_notifications, buffer);
} else {
channel_write(channel, buffer);
}
}

end:
Expand Down Expand Up @@ -767,6 +786,7 @@ static void free_channel(Channel *channel)

pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
if (channel->type != kChannelTypeProc) {
multiqueue_free(channel->events);
}
Expand All @@ -791,9 +811,11 @@ static Channel *register_channel(ChannelType type, uint64_t id,
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
kv_init(rv->call_stack);
kv_init(rv->delayed_notifications);
pmap_put(uint64_t)(channels, rv->id, rv);

ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
Expand Down Expand Up @@ -890,6 +912,16 @@ static WBuffer *serialize_response(uint64_t channel_id,
return rv;
}

static void send_delayed_notifications(Channel* channel)
{
for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) {
WBuffer *buffer = kv_A(channel->delayed_notifications, i);
channel_write(channel, buffer);
}

kv_size(channel->delayed_notifications) = 0;
}

static void incref(Channel *channel)
{
channel->refcount++;
Expand Down
61 changes: 19 additions & 42 deletions test/functional/api/server_requests_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -109,28 +109,7 @@ describe('server -> client', function()
end)

describe('requests and notifications interleaved', function()
it('does not delay notifications during pending request', function()
local received = false
local function on_setup()
eq("retval", funcs.rpcrequest(cid, "doit"))
stop()
end
local function on_request(method)
if method == "doit" then
funcs.rpcnotify(cid, "headsup")
eq(true,received)
return "retval"
end
end
local function on_notification(method)
if method == "headsup" then
received = true
end
end
run(on_request, on_notification, on_setup)
end)

-- This tests the following scenario:
-- This tests that the following scenario won't happen:
--
-- server->client [request ] (1)
-- client->server [request ] (2) triggered by (1)
Expand All @@ -145,38 +124,36 @@ describe('server -> client', function()
-- only deals with one server->client request at a time. (In other words,
-- the client cannot send a response to a request that is not at the top
-- of nvim's request stack).
pending('will close connection if not properly synchronized', function()
--
-- But above scenario shoudn't happen by the way notifications are dealt in
-- Nvim: they are only sent after there are no pending server->client
-- request(the request stack fully unwinds). So (3) is only sent after the
-- client returns (6).
it('works', function()
local expected = 300
local notified = 0
local function on_setup()
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
end

local function on_request(method)
if method == "notify" then
eq(1, eval('rpcnotify('..cid..', "notification")'))
return 'notified!'
elseif method == "nested" then
-- do some busywork, so the first request will return
-- before this one
for _ = 1, 5 do
eq(2, eval("1+1"))
end
eq(1, eval('rpcnotify('..cid..', "nested_done")'))
return 'done!'
end
eq('notify', method)
eq(1, eval('rpcnotify('..cid..', "notification")'))
return 'notified!'
end

local function on_notification(method)
if method == "notification" then
eq('done!', eval('rpcrequest('..cid..', "nested")'))
elseif method == "nested_done" then
-- this should never have been sent
ok(false)
eq('notification', method)
if notified == expected then
stop()
return
end
notified = notified + 1
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
end

run(on_request, on_notification, on_setup)
-- ignore disconnect failure, otherwise detected by after_each
clear()
eq(expected, notified)
end)
end)

Expand Down