Skip to content

Commit

Permalink
iproto: introduce IPROTO_WATCH_ONCE request
Browse files Browse the repository at this point in the history
Part of #6493

@TarantoolBot document
Title: Document `IPROTO_WATCH_ONCE`

The new request type `IPROTO_WATCH_ONCE` has code 77 and can be used to
synchronously fetch the value currently associated with a specified
notification key. You can use it instead of `IPROTO_WATCH` if you just
need to get the current value without subscribing to future changes.

The new request is a standard synchronous request. Like any other
synchronous request (for example, `IPROTO_SELECT`), it takes a sync
number (`IPROTO_SYNC`) and optionally the desired schema version
(`IPROTO_SCHEMA_VERSION`) in the header. The same sync number and
the actual schema version are returned in the response header.

The request body is supposed to contain a single key `IPROTO_EVENT_KEY`
(which is also used by `IPROTO_WATCH`) with a string value that stores
the notification key of interest. The actual value is returned in the
response body, in the first entry of the `IPROTO_DATA` array. If there's
no value associated with a notification key (the key has never been
broadcast or was last set to nil), the `IPROTO_DATA` array will be
empty. (Note that `IPROTO_DATA` is also used by most other synchronous
requests. For example, `IPROTO_SELECT` returns the selected tuple array
in it.)

For example, suppose a key was broadcast with the following command on
the server:

```Lua
box.broadcast('foo', {1, 2, 3})
```

Then `IPROTO_WATCH_ONCE` for `IPROTO_EVENT_KEY` equal to 'foo' will
return `IPROTO_DATA' equal to `[[1, 2, 3]]` (an array of one entry
containing the current value).

If the key didn't exist or was set to nil with

```Lua
box.broadcast('foo', nil)
```

then `IPROTO_WATCH_ONCE` would return `IPROTO_DATA` equal to `[]`
(an empty array).

The request shouldn't normally fail. It may fail only on some sort of
system error (out of memory; socket error), on schema version mismatch,
or on invalid input.

Like `IPROTO_WATCH`, the new request doesn't require authentication.

Like `IPROTO_WATCH`, the new request can't processed in a stream
(`IPROTO_STREAM_ID` must not be set in the request header).

If a server supports the `IPROTO_WATCH_ONCE` request, it'll set the
`IPROTO_FEATURE_WATCH_ONCE = 6` bit in the protocol feature mask and
report the protocol version >= 6 in response to `IPROTO_ID`.
  • Loading branch information
locker committed Jun 13, 2023
1 parent f899bb6 commit 6dc1433
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 18 deletions.
5 changes: 5 additions & 0 deletions changelogs/unreleased/gh-6493-iproto-watch-once.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## feature/core

* Introduced the new `IPROTO_WATCH_ONCE` request to get the value currently
associated with a notification key on a remote instance without subscribing
to future changes (gh-6493).
14 changes: 14 additions & 0 deletions src/box/iproto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
#include "on_shutdown.h"
#include "flightrec.h"
#include "security.h"
#include "watcher.h"

enum {
IPROTO_SALT_SIZE = 32,
Expand Down Expand Up @@ -1696,6 +1697,7 @@ iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route)
return 0;
case IPROTO_WATCH:
case IPROTO_UNWATCH:
case IPROTO_WATCH_ONCE:
*route = iproto_thread->misc_route;
ERROR_INJECT(ERRINJ_IPROTO_DISABLE_WATCH, {
*route = NULL;
Expand Down Expand Up @@ -2457,6 +2459,18 @@ tx_process_misc(struct cmsg *m)
msg->watch.key_len);
/* Sic: no reply. */
break;
case IPROTO_WATCH_ONCE: {
const char *data, *data_end;
data = box_watch_once(msg->watch.key,
msg->watch.key_len, &data_end);
if (iproto_prepare_select(out, &header) != 0)
diag_raise();
obuf_dup_xc(out, data, data_end - data);
iproto_reply_select(out, &header, msg->header.sync,
::schema_version,
data != NULL ? 1 : 0);
break;
}
default:
unreachable();
}
Expand Down
5 changes: 5 additions & 0 deletions src/box/iproto_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ iproto_key_bit(unsigned char key)
_(WATCH, 74) \
_(UNWATCH, 75) \
_(EVENT, 76) \
/**
* Synchronous request to fetch the data that is currently attached to
* a notification key without subscribing to changes.
*/ \
_(WATCH_ONCE, 77) \
\
/**
* The following three requests are reserved for vinyl types.
Expand Down
2 changes: 2 additions & 0 deletions src/box/iproto_features.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,6 @@ iproto_features_init(void)
IPROTO_FEATURE_PAGINATION);
iproto_features_set(&IPROTO_CURRENT_FEATURES,
IPROTO_FEATURE_SPACE_AND_INDEX_NAMES);
iproto_features_set(&IPROTO_CURRENT_FEATURES,
IPROTO_FEATURE_WATCH_ONCE);
}
4 changes: 3 additions & 1 deletion src/box/iproto_features.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ extern "C" {
* IPROTO_UPDATE and IPROTO_UPSERT request body.
*/ \
_(SPACE_AND_INDEX_NAMES, 5) \
/** IPROTO_WATCH_ONCE request support. */ \
_(WATCH_ONCE, 6) \

#define IPROTO_FEATURE_MEMBER(s, v) IPROTO_FEATURE_ ## s = v,

Expand All @@ -81,7 +83,7 @@ struct iproto_features {
* `box.iproto.protocol_version` needs to be updated correspondingly.
*/
enum {
IPROTO_CURRENT_VERSION = 5,
IPROTO_CURRENT_VERSION = 6,
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ local reference_table = {
WATCH = 74,
UNWATCH = 75,
EVENT = 76,
WATCH_ONCE = 77,
CHUNK = 128,
TYPE_ERROR = bit.lshift(1, 15),
UNKNOWN = -1,
Expand All @@ -163,7 +164,7 @@ local reference_table = {
},

-- `IPROTO_CURRENT_VERSION` constant
protocol_version = 5,
protocol_version = 6,

-- `feature_id` enumeration
protocol_features = {
Expand All @@ -173,6 +174,7 @@ local reference_table = {
watchers = true,
pagination = true,
space_and_index_names = true,
watch_once = true,
},
feature = {
streams = 0,
Expand All @@ -181,6 +183,7 @@ local reference_table = {
watchers = 3,
pagination = 4,
space_and_index_names = 5,
watch_once = 6,
},
}

Expand Down
38 changes: 36 additions & 2 deletions test/box-py/iproto.result
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ Invalid MsgPack - request body
# Invalid auth_type
Invalid MsgPack - request body
# Empty request body
version=5, features=[0, 1, 2, 3, 4, 5], auth_type=chap-sha1
version=6, features=[0, 1, 2, 3, 4, 5, 6], auth_type=chap-sha1
# Unknown version and features
version=5, features=[0, 1, 2, 3, 4, 5], auth_type=chap-sha1
version=6, features=[0, 1, 2, 3, 4, 5, 6], auth_type=chap-sha1

#
# gh-6257 Watchers
Expand Down Expand Up @@ -260,6 +260,40 @@ box.broadcast('foo', nil)
box.broadcast('bar', nil)
---
...

#
# gh-6493 IPROTO_WATCH_ONCE
#

# Schema version
sync=1001
Error on invalid schema_id: True
Got schema_id: True
sync=1002
Success on valid schema_id: True
Same schema_id: True
# Streaming unsupported
sync=1003, error: Unable to process WATCH_ONCE request in stream
# Invalid key
sync=1004, error: Missing mandatory field 'EVENT_KEY' in request
sync=1005, error: Invalid MsgPack - packet body
# Missing key
sync=1006, ok
Data: []
Existing key
box.broadcast('foo', {1, 2, 3})
---
...
sync=1007, ok
Data: [[1, 2, 3]]
box.broadcast('foo', nil)
---
...

#
# gh-7639 Pagination
#

space = box.schema.space.create('test', { id = 567 })
---
...
Expand Down
102 changes: 91 additions & 11 deletions test/box-py/iproto.test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# FIXME: Remove after the new constants are added to the Python connector.
if not 'REQUEST_TYPE_ID' in locals():
REQUEST_TYPE_ID = 73
IPROTO_SCHEMA_VERSION = 0x05
IPROTO_STREAM_ID = 0x0a
IPROTO_VERSION = 0x54
IPROTO_FEATURES = 0x55
IPROTO_AUTH_TYPE = 0x5b
Expand All @@ -23,6 +25,7 @@
REQUEST_TYPE_WATCH = 74
REQUEST_TYPE_UNWATCH = 75
REQUEST_TYPE_EVENT = 76
REQUEST_TYPE_WATCH_ONCE = 77
IPROTO_EVENT_KEY = 0x57
IPROTO_EVENT_DATA = 0x58

Expand Down Expand Up @@ -301,30 +304,34 @@ def resp_status(resp):
IPROTO_LIMIT: 1 }
resp = test_request(header, body)
print("Normal connect done w/o errors:", resp["header"][0] == 0)
print("Got schema_id:", resp["header"][5] > 0)
schema_id = resp["header"][5]
print("Got schema_id:", resp["header"][IPROTO_SCHEMA_VERSION] > 0)
schema_id = resp["header"][IPROTO_SCHEMA_VERSION]

header = { IPROTO_CODE : REQUEST_TYPE_SELECT, 5 : 0 }
header = { IPROTO_CODE : REQUEST_TYPE_SELECT, IPROTO_SCHEMA_VERSION : 0 }
resp = test_request(header, body)
print("Zero-schema_id connect done w/o errors:", resp["header"][0] == 0)
print("Same schema_id:", resp["header"][5] == schema_id)
print("Same schema_id:", resp["header"][IPROTO_SCHEMA_VERSION] == schema_id)

header = { IPROTO_CODE : REQUEST_TYPE_SELECT, 5 : schema_id }
header = { IPROTO_CODE : REQUEST_TYPE_SELECT,
IPROTO_SCHEMA_VERSION : schema_id }
resp = test_request(header, body)
print("Normal connect done w/o errors:", resp["header"][0] == 0)
print("Same schema_id:", resp["header"][5] == schema_id)
print("Same schema_id:", resp["header"][IPROTO_SCHEMA_VERSION] == schema_id)

header = { IPROTO_CODE : REQUEST_TYPE_SELECT, 5 : schema_id + 1 }
header = { IPROTO_CODE : REQUEST_TYPE_SELECT,
IPROTO_SCHEMA_VERSION : schema_id + 1 }
resp = test_request(header, body)
print("Wrong schema_id leads to error:", resp["header"][0] != 0)
print("Same schema_id:", resp["header"][5] == schema_id)
print("Same schema_id:", resp["header"][IPROTO_SCHEMA_VERSION] == schema_id)

admin("space2 = box.schema.create_space('test2')")

header = { IPROTO_CODE : REQUEST_TYPE_SELECT, 5 : schema_id }
header = { IPROTO_CODE : REQUEST_TYPE_SELECT,
IPROTO_SCHEMA_VERSION : schema_id }
resp = test_request(header, body)
print("Schema changed -> error:", resp["header"][0] != 0)
print("Got another schema_id:", resp["header"][5] != schema_id)
print("Got another schema_id:",
resp["header"][IPROTO_SCHEMA_VERSION] != schema_id)

#
# gh-2334 Lost SYNC in JOIN response.
Expand Down Expand Up @@ -592,7 +599,80 @@ def check_no_event():
admin("box.broadcast('foo', nil)")
admin("box.broadcast('bar', nil)")

# Pagination
print("""
#
# gh-6493 IPROTO_WATCH_ONCE
#
""")
c = Connection(None, server.iproto.port)
c.connect()
s = c._socket
sync = 1000

print("# Schema version")
sync += 1
header = { IPROTO_CODE: REQUEST_TYPE_WATCH_ONCE, IPROTO_SYNC: sync,
IPROTO_SCHEMA_VERSION: 9999 }
body = { IPROTO_EVENT_KEY: 'foo' }
resp = test_request(header, body)
print("sync={}".format(resp["header"][IPROTO_SYNC]))
print("Error on invalid schema_id:", resp["header"][IPROTO_CODE] != 0)
schema_id = resp["header"][IPROTO_SCHEMA_VERSION]
print("Got schema_id:", schema_id > 0)
sync += 1
header[IPROTO_SYNC] = sync
header[IPROTO_SCHEMA_VERSION] = schema_id
resp = test_request(header, body)
print("sync={}".format(resp["header"][IPROTO_SYNC]))
print("Success on valid schema_id:", resp["header"][IPROTO_CODE] == 0)
print("Same schema_id:", resp["header"][IPROTO_SCHEMA_VERSION] == schema_id)

print("# Streaming unsupported")
sync += 1
header = { IPROTO_CODE: REQUEST_TYPE_WATCH_ONCE, IPROTO_SYNC: sync,
IPROTO_STREAM_ID: 555 }
body = { IPROTO_EVENT_KEY: 'foo' }
resp = test_request(header, body)
print("sync={}, {}".format(resp["header"][IPROTO_SYNC], resp_status(resp)))

print("# Invalid key")
sync += 1
header = { IPROTO_CODE: REQUEST_TYPE_WATCH_ONCE, IPROTO_SYNC: sync }
body = {}
resp = test_request(header, body)
print("sync={}, {}".format(resp["header"][IPROTO_SYNC], resp_status(resp)))
sync += 1
header[IPROTO_SYNC] = sync
body[IPROTO_EVENT_KEY] = 123
resp = test_request(header, body)
print("sync={}, {}".format(resp["header"][IPROTO_SYNC], resp_status(resp)))

print("# Missing key")
sync += 1
header = { IPROTO_CODE: REQUEST_TYPE_WATCH_ONCE, IPROTO_SYNC: sync }
body = { IPROTO_EVENT_KEY: 'foo' }
resp = test_request(header, body)
print("sync={}, {}".format(resp["header"][IPROTO_SYNC], resp_status(resp)))
print("Data:", resp["body"][IPROTO_DATA])

print(" Existing key")
admin("box.broadcast('foo', {1, 2, 3})")
sync += 1
header = { IPROTO_CODE: REQUEST_TYPE_WATCH_ONCE, IPROTO_SYNC: sync }
body = { IPROTO_EVENT_KEY: 'foo' }
resp = test_request(header, body)
print("sync={}, {}".format(resp["header"][IPROTO_SYNC], resp_status(resp)))
print("Data:", resp["body"][IPROTO_DATA])
admin("box.broadcast('foo', nil)")

# Cleanup
c.close()

print("""
#
# gh-7639 Pagination
#
""")
admin("space = box.schema.space.create('test', { id = 567 })")
admin("index = space:create_index('primary', { type = 'tree' })")
admin("box.schema.user.grant('guest', 'read,write,execute', 'space', 'test')")
Expand Down
11 changes: 8 additions & 3 deletions test/box/net.box_iproto_id.result
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ c = net.connect(box.cfg.listen)
| ...
c.peer_protocol_version
| ---
| - 5
| - 6
| ...
c.peer_protocol_features
| ---
Expand All @@ -25,6 +25,7 @@ c.peer_protocol_features
| streams: true
| pagination: true
| space_and_index_names: true
| watch_once: true
| ...
c:close()
| ---
Expand Down Expand Up @@ -54,6 +55,7 @@ c.peer_protocol_features
| streams: false
| pagination: false
| space_and_index_names: false
| watch_once: false
| ...
errinj.set('ERRINJ_IPROTO_DISABLE_ID', false)
| ---
Expand Down Expand Up @@ -104,6 +106,7 @@ c.peer_protocol_features
| streams: true
| pagination: true
| space_and_index_names: true
| watch_once: true
| ...
c:close()
| ---
Expand Down Expand Up @@ -149,7 +152,7 @@ c.error -- error
| ...
c.peer_protocol_version
| ---
| - 5
| - 6
| ...
c.peer_protocol_features
| ---
Expand All @@ -159,6 +162,7 @@ c.peer_protocol_features
| streams: true
| pagination: true
| space_and_index_names: true
| watch_once: true
| ...
c:close()
| ---
Expand All @@ -177,7 +181,7 @@ c.error -- error
| ...
c.peer_protocol_version
| ---
| - 5
| - 6
| ...
c.peer_protocol_features
| ---
Expand All @@ -187,6 +191,7 @@ c.peer_protocol_features
| streams: true
| pagination: true
| space_and_index_names: true
| watch_once: true
| ...
c:close()
| ---
Expand Down

0 comments on commit 6dc1433

Please sign in to comment.