Skip to content

Commit

Permalink
net.box: introduce conn:watch_once method
Browse files Browse the repository at this point in the history
Closes tarantool#6493

@TarantoolBot document
Title: Document `conn:watch_once()` net.box connection method

The new method takes a notification key and returns the value currently
associated with it.

For example, let's assume that a Tarantool server was started with the
following script:

```lua
box.cfg{listen = 3301}
box.broadcast('foo', {1, 2, 3})
```

Then the `conn:watch_once()` method would yield the following results:

```yaml
tarantool> conn = require('net.box').connect(3301)
---
...

tarantool> conn:watch_once('foo')
---
- [1, 2, 3]
...

tarantool> conn:watch_once('bar')
---
- null
...
```

The new method can be used instead of `conn:watch()` in case the caller
only needs to retrieve the current associated with a notification key
value without subscribing to future changes.

The method can also take a net.box options table as a second argument.
It supports all the standard request options: `is_async`, `return_raw`,
`timeout`, and others. They work exactly in the same way as with other
net.box method, for example `conn:call`. For example,

```lua
local future = conn:watch_once('foo', {is_async = true})
future:wait_result()

local obj = conn:watch_once('foo', {return_raw = true})
require('msgpack').is_object(obj)
```

Like `conn:watch()`, the new method doesn't require authentication.

Like `conn:watch()`, the new method can be executed in a stream
(see `conn:new_stream()`), but it isn't streamlined (i.e. calling it
as a stream method has the same effect as calling it as a connection
method).

The net.box connection will set `conn.peer_protocol_features.watch_once`
to true if the remote end supports `conn:watch_once()`.

The new method is implemented using the `IPROTO_WATCH_ONCE` request.
  • Loading branch information
locker committed Jun 6, 2023
1 parent 0f6c6ba commit 0900816
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 1 deletion.
6 changes: 6 additions & 0 deletions changelogs/unreleased/gh-6493-net-box-watch-once.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
## feature/core

* Introduced the `conn:watch_once()` net.box connection method to get the value
currently associated with a notification key on a remote instance without
subscribing to future changes. The new method is implemented using the
`IPROTO_WATCH_ONCE` request type (gh-6493).
26 changes: 25 additions & 1 deletion src/box/lua/net_box.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ enum {
/**
* IPROTO protocol version supported by the netbox connector.
*/
NETBOX_IPROTO_VERSION = 5,
NETBOX_IPROTO_VERSION = 6,
};

/**
Expand Down Expand Up @@ -109,6 +109,7 @@ static struct iproto_features NETBOX_IPROTO_FEATURES;
_(BEGIN) \
_(COMMIT) \
_(ROLLBACK) \
_(WATCH_ONCE) \
_(INJECT) \

#define NETBOX_METHOD_MEMBER(s) \
Expand Down Expand Up @@ -1334,6 +1335,25 @@ netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
sync, stream_id);
}

/**
* Encodes an IPROTO_WATCH_ONCE request for the notification key stored in
* the Lua stack at the index idx.
*/
static int
netbox_encode_watch_once(struct lua_State *L, int idx, struct mpstream *stream,
uint64_t sync, uint64_t stream_id)
{
size_t key_len;
const char *key = lua_tolstring(L, idx, &key_len);
size_t svp = netbox_begin_encode(stream, sync, IPROTO_WATCH_ONCE,
stream_id);
mpstream_encode_map(stream, 1);
mpstream_encode_uint(stream, IPROTO_EVENT_KEY);
mpstream_encode_strn(stream, key, key_len);
netbox_end_encode(stream, svp);
return 0;
}

static int
netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
uint64_t sync, uint64_t stream_id)
Expand Down Expand Up @@ -1385,6 +1405,7 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
[NETBOX_BEGIN] = netbox_encode_begin,
[NETBOX_COMMIT] = netbox_encode_commit,
[NETBOX_ROLLBACK] = netbox_encode_rollback,
[NETBOX_WATCH_ONCE] = netbox_encode_watch_once,
[NETBOX_INJECT] = netbox_encode_inject,
};
struct mpstream stream;
Expand Down Expand Up @@ -1862,6 +1883,7 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method,
[NETBOX_BEGIN] = netbox_decode_nil,
[NETBOX_COMMIT] = netbox_decode_nil,
[NETBOX_ROLLBACK] = netbox_decode_nil,
[NETBOX_WATCH_ONCE] = netbox_decode_value,
[NETBOX_INJECT] = netbox_decode_table,
};
method_decoder[method](L, data, data_end, return_raw, format);
Expand Down Expand Up @@ -3054,6 +3076,8 @@ luaopen_net_box(struct lua_State *L)
IPROTO_FEATURE_PAGINATION);
iproto_features_set(&NETBOX_IPROTO_FEATURES,
IPROTO_FEATURE_SPACE_AND_INDEX_NAMES);
iproto_features_set(&NETBOX_IPROTO_FEATURES,
IPROTO_FEATURE_WATCH_ONCE);

lua_pushcfunction(L, luaT_netbox_request_iterator_next);
luaT_netbox_request_iterator_next_ref = luaL_ref(L, LUA_REGISTRYINDEX);
Expand Down
9 changes: 9 additions & 0 deletions src/box/lua/net_box.lua
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,15 @@ function remote_methods:watch(key, func)
return watcher
end

function remote_methods:watch_once(key, opts)
check_remote_arg(self, 'watch_once')
if type(key) ~= 'string' then
box.error(E_PROC_LUA, 'key must be a string')
end
check_param_table(opts, REQUEST_OPTION_TYPES)
return self:_request('WATCH_ONCE', opts, nil, nil, key)
end

function remote_methods:close()
check_remote_arg(self, 'close')
self._transport:stop(true)
Expand Down
1 change: 1 addition & 0 deletions test/box-luatest/gh_6305_net_box_autocomplete_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ g.test_autocomplete = function()
'conn1:on_shutdown(',
'conn1:wait_connected(',
'conn1:watch(',
'conn1:watch_once(',
'conn1:execute(',
'conn1:wait_state(',
'conn1:ping(',
Expand Down
123 changes: 123 additions & 0 deletions test/box-luatest/net_box_watcher_test.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local fiber = require('fiber')
local msgpack = require('msgpack')
local net = require('net.box')
local server = require('luatest.server')
local t = require('luatest')
Expand Down Expand Up @@ -29,6 +30,15 @@ g.test_invalid_args = function(cg)
conn.watch, conn, 123, function() end)
t.assert_error_msg_equals("func must be a function",
conn.watch, conn, 'abc', 123)
t.assert_error_msg_equals(
"Use remote:watch_once(...) instead of remote.watch_once(...):",
conn.watch_once)
t.assert_error_msg_equals("key must be a string",
conn.watch_once, conn, 123)
t.assert_error_msg_equals("Illegal parameters, options should be a table",
conn.watch_once, conn, 'abc', 123)
t.assert_error_msg_equals("Illegal parameters, unexpected option 'foo'",
conn.watch_once, conn, 'abc', {foo = 'bar'})
conn:close()
end

Expand Down Expand Up @@ -305,3 +315,116 @@ g.after_test('test_reconnect', function(cg)
box.broadcast('foo')
end)
end)

-- Check peer protocol features reported by net.box connection.
g.test_features = function(cg)
local conn = net.connect(cg.server.net_box_uri)
t.assert_ge(conn.peer_protocol_version, 6)
t.assert(conn.peer_protocol_features.watchers)
t.assert(conn.peer_protocol_features.watch_once)
conn:close()
end

-- Check that conn.watch_once returns the actual state value.
g.test_watch_once = function(cg)
local conn = net.connect(cg.server.net_box_uri)
t.assert_equals(conn:watch_once('foo'), nil)
t.assert_equals(conn:watch_once('bar'), nil)
cg.broadcast('foo', {1, 2, 3})
t.assert_equals(conn:watch_once('foo'), {1, 2, 3})
t.assert_equals(conn:watch_once('bar'), nil)
cg.broadcast('bar', 'test')
t.assert_equals(conn:watch_once('foo'), {1, 2, 3})
t.assert_equals(conn:watch_once('bar'), 'test')
cg.broadcast('foo', nil)
t.assert_equals(conn:watch_once('foo'), nil)
t.assert_equals(conn:watch_once('bar'), 'test')
conn:close()
end

g.after_test('test_watch_once', function(cg)
cg.broadcast('foo', nil)
cg.broadcast('bar', nil)
end)

-- Check that conn.watch_once may be used with return_raw.
g.test_watch_once_raw = function(cg)
local conn = net.connect(cg.server.net_box_uri)
cg.broadcast('foo', {a = 1, b = 2})
local o = conn:watch_once('foo', {return_raw = true})
t.assert(msgpack.is_object(o))
t.assert_equals(o:decode(), {a = 1, b = 2})
conn:close()
end

g.after_test('test_watch_once_raw', function(cg)
cg.broadcast('foo', nil)
end)

-- Check that conn.watch_once may be used with is_async.
g.test_watch_once_async = function(cg)
local conn = net.connect(cg.server.net_box_uri)
cg.broadcast('foo', {'foo', 'bar'})
local f = conn:watch_once('foo', {is_async = true})
t.assert(f:wait_result(), {'foo', 'bar'})
conn:close()
end

g.after_test('test_watch_once_async', function(cg)
cg.broadcast('foo', nil)
end)

-- Check that conn.watch and conn.watch_once may be used in a stream but
-- they do not participate in request streamlining.
g.before_test('test_streams', function(cg)
cg.server:exec(function()
local ch = require('fiber').channel()
rawset(_G, 'wait', function() return ch:get() end)
rawset(_G, 'wakeup', function() ch:put(true) end)
end)
end)

g.test_streams = function(cg)
local conn = net.connect(cg.server.net_box_uri)
local stream = conn:new_stream()

-- Block the stream.
local f1 = stream:call('wait', {}, {is_async = true})
local f2 = stream:eval('return true', {}, {is_async = true})
t.assert_not(f1:wait_result(0.01)) -- blocked until woken up
t.assert_not(f2:wait_result(0.01)) -- blocked by f1

-- Check that watch and watch_once still work.
local f = stream:watch_once('foo', {is_async = true})
t.assert_equals(f:wait_result(60), nil)
local count = 0
local key, value
stream:watch('foo', function(k, v)
count = count + 1
key = k
value = v
end)
t.helpers.retrying({}, function() t.assert_equals(count, 1) end)
t.assert_equals(key, 'foo')
t.assert_equals(value, nil)
cg.broadcast('foo', 'bar')
f = stream:watch_once('foo', {is_async = true})
t.assert_equals(f:wait_result(60), 'bar')
t.helpers.retrying({}, function() t.assert_equals(count, 2) end)
t.assert_equals(key, 'foo')
t.assert_equals(value, 'bar')

-- Unblock the stream.
conn:call('wakeup')
t.assert(f1:wait_result(60))
t.assert(f2:wait_result(60))
conn:close()
end

g.after_test('test_streams', function(cg)
cg.server:exec(function()
rawset(_G, 'wait', nil)
rawset(_G, 'wakeup', nil)
box.broadcast('foo', nil)
end)
end)

0 comments on commit 0900816

Please sign in to comment.