Skip to content

Commit

Permalink
router: wrap is_async futures completely
Browse files Browse the repository at this point in the history
Router used to override :result() method of netbox futures. It is
needed because user functions are called via vshard.storage.call()
which returns some metadata - it must be truncated before
returning the user's data.

It worked fine while netbox futures were implemented as tables.
But in the newest Tarantool most of netbox state machine code is
moved into C. The futures now are C structs.

They allow to add new members, but can't override their methods.
As a result, on the newest Tarantool is_async in
vshard.router.call() simply didn't work.

The patch wraps netbox futures completely with a Lua table, not
just overrides one method. Now it works the same on all Tarantool
versions starting from 1.10.

Closes #294
  • Loading branch information
Gerold103 committed Sep 30, 2021
1 parent 2e50d4e commit 6f7d0a5
Show file tree
Hide file tree
Showing 4 changed files with 542 additions and 38 deletions.
11 changes: 11 additions & 0 deletions test/lua_libs/storage_template.lua
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ function bootstrap_storage(engine)
box.schema.role.grant('public', 'execute', 'function', 'raise_client_error')
box.schema.func.create('do_push')
box.schema.role.grant('public', 'execute', 'function', 'do_push')
box.schema.func.create('do_push_wait')
box.schema.role.grant('public', 'execute', 'function', 'do_push_wait')
box.snapshot()
end)
end
Expand Down Expand Up @@ -152,6 +154,15 @@ function do_push(push, retval)
return retval
end

is_push_wait_blocked = true
function do_push_wait(push, retval_arr)
box.session.push(push)
while is_push_wait_blocked do
fiber.sleep(0.001)
end
return unpack(retval_arr)
end

--
-- Wait a specified log message.
-- Requirements:
Expand Down
301 changes: 300 additions & 1 deletion test/router/router.result
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ messages
- - 100
...
--
-- gh-171: support is_async.
-- gh-171, gh-294: support is_async.
--
future = vshard.router.callro(bucket_id, 'space_get', {'test', {1}}, {is_async = true})
---
Expand All @@ -632,6 +632,11 @@ future = vshard.router.callrw(bucket_id, 'raise_client_error', {}, {is_async = t
res, err = future:wait_result()
---
...
-- VShard wraps all errors.
assert(type(err) == 'table')
---
- true
...
util.portable_error(err)
---
- type: ClientError
Expand Down Expand Up @@ -690,6 +695,300 @@ future:wait_result()
- [[1, 1]]
...
--
-- Error as a result of discard.
--
future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}}, \
{is_async = true})
---
...
future:discard()
---
...
res, err = future:result()
---
...
assert(not res and err.message:match('discarded') ~= nil)
---
- true
...
assert(type(err) == 'table')
---
- true
...
res, err = future:wait_result()
---
...
assert(not res and err.message:match('discarded') ~= nil)
---
- true
...
assert(type(err) == 'table')
---
- true
...
--
-- See how pairs behaves when the final result is not immediately ready.
--
future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {20}}, \
{is_async = true})
---
...
assert(not future:is_ready())
---
- true
...
-- Get the push successfully.
func, iter, i = future:pairs()
---
...
i, res = func(iter, i)
---
...
assert(i == 1)
---
- true
...
assert(res == 10)
---
- true
...
-- Fail to get the final result during the timeout. It is supposed to test how
-- the router knows which result is final and which is just a push. Even before
-- the request ends.
func, iter, i = future:pairs(0.001)
---
...
i, res = func(iter, i)
---
...
i, res = func(iter, i)
---
...
assert(not i and res.code == box.error.TIMEOUT)
---
- true
...
assert(type(res) == 'table')
---
- true
...
res, err = future:wait_result(0.001)
---
...
assert(not res and err.code == box.error.TIMEOUT)
---
- true
...
assert(type(err) == 'table')
---
- true
...
test_run:switch('storage_1_a')
---
- true
...
is_push_wait_blocked = false
---
...
test_run:switch('storage_2_a')
---
- true
...
is_push_wait_blocked = false
---
...
test_run:switch('router_1')
---
- true
...
func, iter, i = future:pairs()
---
...
i, res = func(iter, i)
---
...
assert(i == 1)
---
- true
...
assert(res == 10)
---
- true
...
i, res = func(iter, i)
---
...
assert(i == 2)
---
- true
...
assert(res[1] == 20 and not res[2])
---
- true
...
assert(future:is_ready())
---
- true
...
i, res = func(iter, i)
---
...
assert(not i)
---
- true
...
assert(not res)
---
- true
...
-- Repeat the same to ensure it returns the same.
i, res = func(iter, 1)
---
...
assert(i == 2)
---
- true
...
assert(res[1] == 20 and not res[2])
---
- true
...
-- Non-pairs functions return correctly unpacked successful results.
res, err = future:wait_result()
---
...
assert(res[1] == 20 and not res[2] and not err)
---
- true
...
res, err = future:result()
---
...
assert(res[1] == 20 and not res[2] and not err)
---
- true
...
-- Return 2 nils - shouldn't be treated as an error.
future = vshard.router.callrw(bucket_id, 'do_push_wait', \
{10, {nil, nil}}, {is_async = true})
---
...
res, err = future:wait_result()
---
...
assert(res[1] == nil and res[2] == nil and not err)
---
- true
...
res, err = future:result()
---
...
assert(res[1] == nil and res[2] == nil and not err)
---
- true
...
func, iter, i = future:pairs()
---
...
i, res = func(iter, i)
---
...
i, res = func(iter, i)
---
...
assert(res[1] == nil and res[2] == nil and not err)
---
- true
...
-- Serialize and tostring.
future
---
- []
...
future.key = 'value'
---
...
future
---
- key: value
...
tostring(future)
---
- vshard.net.box.request
...
--
-- The same, but the push function returns an error.
--
future = vshard.router.callrw(bucket_id, 'do_push_wait', {10, {nil, 'err'}}, \
{is_async = true})
---
...
func, iter, i = future:pairs()
---
...
i, res = func(iter, i)
---
...
assert(i == 1)
---
- true
...
assert(res == 10)
---
- true
...
i, res = func(iter, i)
---
...
-- This test is for the sake of checking how the async request handles nil,err
-- result.
assert(i == 2)
---
- true
...
assert(not res[1] and res[2].message == 'err')
---
- true
...
assert(type(res[2]) == 'table')
---
- true
...
i, res = func(iter, i)
---
...
assert(not i)
---
- true
...
assert(not res)
---
- true
...
-- Non-pairs getting of an error.
res, err = future:wait_result()
---
...
assert(not res and err.message == 'err')
---
- true
...
assert(type(err) == 'table')
---
- true
...
res, err = future:result()
---
...
assert(not res and err.message == 'err')
---
- true
...
assert(type(err) == 'table')
---
- true
...
--
-- Test errors from router call.
--
new_bid = vshard.consts.DEFAULT_BUCKET_COUNT + 1
Expand Down

0 comments on commit 6f7d0a5

Please sign in to comment.