Skip to content

Commit

Permalink
box: introduce box.watch_once Lua function
Browse files Browse the repository at this point in the history
Part of #6493

@TarantoolBot document
Title: Document `box.watch_once()`

The function takes a notification key and returns the value currently
associated with it.

```yaml
tarantool> box.watch_once('foo')
---
...

tarantool> box.broadcast('foo', {a = 1, b = 2})
---
...

tarantool> box.watch_once('foo')
---
- {'a': 1, 'b': 2}
...
```

The new function can be used instead of `box.watch()` in case the caller
only needs to retrieve the current value without subscribing to future
changes.
  • Loading branch information
locker committed Jun 13, 2023
1 parent 2e936e4 commit f899bb6
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 5 deletions.
5 changes: 5 additions & 0 deletions changelogs/unreleased/gh-6493-box-watch-once.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## feature/core

* Introduced the `box.watch_once()` function to get the value currently
associated with a notification key on the local instance without subscribing
to future changes (gh-6493).
1 change: 1 addition & 0 deletions src/box/lua/load_cfg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,7 @@ local box_cfg_guard_whitelist = {
malloc = true;
ctl = true;
watch = true;
watch_once = true;
broadcast = true;
txn_isolation_level = true;
NULL = true;
Expand Down
24 changes: 24 additions & 0 deletions src/box/lua/watcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,27 @@ lbox_watch(struct lua_State *L)
return 1;
}

/**
* Lua wrapper around box_watch_once().
*/
static int
lbox_watch_once(struct lua_State *L)
{
if (lua_gettop(L) != 1)
return luaL_error(L, "Usage: box.watch_once(key)");
size_t key_len;
const char *key = luaL_checklstring(L, 1, &key_len);
const char *data_end;
const char *data = box_watch_once(key, key_len, &data_end);
if (data == NULL) {
assert(data_end == NULL);
return 0;
}
luamp_decode(L, luaL_msgpack_default, &data);
assert(data == data_end);
return 1;
}

/**
* Lua wrapper around box_broadcast().
*/
Expand Down Expand Up @@ -215,6 +236,9 @@ box_lua_watcher_init(struct lua_State *L)
lua_pushstring(L, "watch");
lua_pushcfunction(L, lbox_watch);
lua_settable(L, -3);
lua_pushstring(L, "watch_once");
lua_pushcfunction(L, lbox_watch_once);
lua_settable(L, -3);
lua_pushstring(L, "broadcast");
lua_pushcfunction(L, lbox_broadcast);
lua_settable(L, -3);
Expand Down
32 changes: 32 additions & 0 deletions src/box/watcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ watchable_node_delete(struct watchable_node *node)
free(node);
}

/**
* Looks up an returns a node by key name. Returns NULL if not found.
*/
static struct watchable_node *
watchable_find_node(struct watchable *watchable,
const char *key, size_t key_len)
{
struct mh_strnptr_t *h = watchable->node_by_key;
uint32_t key_hash = mh_strn_hash(key, key_len);
struct mh_strnptr_key_t k = {key, key_len, key_hash};
mh_int_t i = mh_strnptr_find(h, &k, NULL);
if (i != mh_end(h)) {
struct watchable_node *node = mh_strnptr_node(h, i)->val;
assert(strncmp(node->key, key, key_len) == 0);
return node;
}
return NULL;
}

/**
* Looks up and returns a node by key name. Creates a new node if not found.
*/
Expand Down Expand Up @@ -433,6 +452,19 @@ box_broadcast_fmt(const char *key, const char *format, ...)
box_broadcast(key, strlen(key), data, data + size);
}

const char *
box_watch_once(const char *key, size_t key_len, const char **end)
{
struct watchable_node *node = watchable_find_node(&box_watchable,
key, key_len);
if (node == NULL) {
*end = NULL;
return NULL;
}
*end = node->data_end;
return node->data;
}

void
box_watcher_init(void)
{
Expand Down
19 changes: 19 additions & 0 deletions src/box/watcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,25 @@ box_broadcast(const char *key, size_t key_len,
void
box_broadcast_fmt(const char *key, const char *format, ...);

/**
* Returns the data attached to a notification key.
*
* @param key Name of the notification key
* @param key_len Length of the key name
* @param[out] end End of the key data
* @retval Key data
*
* The function never fails.
*
* If there's no data attached to the given notification key (box_broadcast()
* has never been called for this key), the function returns NULL.
*
* Note that the data returned by this function may be updated by a concurrent
* call to box_broadcast() so the caller must copy it if it intends to yield.
*/
const char *
box_watch_once(const char *key, size_t key_len, const char **end);

void
box_watcher_init(void);

Expand Down
31 changes: 31 additions & 0 deletions test/box-luatest/watcher_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ g.test_no_box_cfg = function()
box.broadcast('foo')
t.helpers.retrying({}, function() t.assert_equals(count, 2) end)
w:unregister()
t.assert_equals(box.watch_once('foo'), nil)
end

-- Invalid arguments.
Expand All @@ -42,6 +43,13 @@ g.test_invalid_args = function(cg)
t.assert_error_msg_equals(
"bad argument #1 to '?' (string expected, got table)",
box.broadcast, {})
t.assert_error_msg_equals("Usage: box.watch_once(key)",
box.watch_once)
t.assert_error_msg_equals("Usage: box.watch_once(key)",
box.watch_once, 'a', 'b')
t.assert_error_msg_equals(
"bad argument #1 to '?' (string expected, got table)",
box.watch_once, {})
end)
end

Expand Down Expand Up @@ -96,6 +104,29 @@ g.after_test('test_basic', function(cg)
end)
end)

g.test_once = function(cg)
cg.server:exec(function()
t.assert_equals(box.watch_once('foo'), nil)
t.assert_equals(box.watch_once('bar'), nil)
box.broadcast('foo', {1, 2, 3})
t.assert_equals(box.watch_once('foo'), {1, 2, 3})
t.assert_equals(box.watch_once('bar'), nil)
box.broadcast('bar', 'baz')
t.assert_equals(box.watch_once('foo'), {1, 2, 3})
t.assert_equals(box.watch_once('bar'), 'baz')
box.broadcast('foo')
t.assert_equals(box.watch_once('foo'), nil)
t.assert_equals(box.watch_once('bar'), 'baz')
end)
end

g.after_test('test_once', function(cg)
cg.server:exec(function()
box.broadcast('foo')
box.broadcast('bar')
end)
end)

-- Callback is garbage collected.
g.test_callback_gc = function(cg)
cg.server:exec(function()
Expand Down
1 change: 1 addition & 0 deletions test/box/misc.result
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ t
- txn_isolation_level
- unprepare
- watch
- watch_once
...
t = nil
---
Expand Down
53 changes: 48 additions & 5 deletions test/unit/watcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,18 @@ test_watcher_key_equal(struct test_watcher *w, const char *key)
}

static bool
test_watcher_value_equal(struct test_watcher *w, const char *value)
test_value_equal(const char *data, const char *data_end, const char *value)
{
size_t data_size = w->data_end - w->data;
return ((value == NULL && w->data == NULL && w->data_end == NULL) ||
size_t data_size = data_end - data;
return ((value == NULL && data == NULL && data_end == NULL) ||
(value != NULL && strlen((value)) == data_size &&
strncmp(value, w->data, data_size) == 0));
strncmp(value, data, data_size) == 0));
}

static bool
test_watcher_value_equal(struct test_watcher *w, const char *value)
{
return test_value_equal(w->data, w->data_end, value);
}

#define test_watcher_check_args(w, key, value) \
Expand All @@ -165,6 +171,13 @@ test_broadcast(const char *key, const char *value)
value != NULL ? value + strlen(value) : NULL);
}

#define test_watch_once(key, value) \
do { \
const char *data, *data_end; \
data = box_watch_once((key), strlen(key), &data_end); \
ok(test_value_equal(data, data_end, value), "value"); \
} while (0)

/**
* Checks that watchers are invoked with correct arguments on broadcast.
*/
Expand Down Expand Up @@ -500,11 +513,40 @@ test_free(void)
footer();
}

static void
test_value(void)
{
header();
plan(8);

test_watch_once("foo", NULL);
test_watch_once("fuzz", NULL);

test_broadcast("foo", "bar");
test_broadcast("fuzz", "buzz");

test_watch_once("foo", "bar");
test_watch_once("fuzz", "buzz");

test_broadcast("foo", NULL);

test_watch_once("foo", NULL);
test_watch_once("fuzz", "buzz");

test_broadcast("fuzz", NULL);

test_watch_once("foo", NULL);
test_watch_once("fuzz", NULL);

check_plan();
footer();
}

static int
main_f(va_list ap)
{
header();
plan(8);
plan(9);
box_watcher_init();
test_basic();
test_async();
Expand All @@ -513,6 +555,7 @@ main_f(va_list ap)
test_ack();
test_ack_unregistered();
test_parallel();
test_value();
test_free(); /* must be last */
test_result = check_plan();
footer();
Expand Down

0 comments on commit f899bb6

Please sign in to comment.