Skip to content

Commit

Permalink
box: export box_session_push to the public C API
Browse files Browse the repository at this point in the history
API is different from box.session.push() - sync argument was
removed. It will disappear from Lua API as well, because it just
is not needed here. Session is omitted as well. Indeed, a user
can't push to a foreign session, and the current session can be
obtained inside box_session_push(). And anyway session is not in
the public C API.

Internally dump into iproto is done using obuf_dup(), just like
tuple_to_obuf() does. obuf_alloc() would be a bad call here,
because it wouldn't be able to split the pushed data into several
obuf chunks, and would cause obuf fragmentation.

Dump into plain text behaves just like a Lua push - it produces a
YAML formatted text or Lua text depending on output format. But to
turn MessagePack into YAML or Lua text an intermediate Lua
representation is used, because there are no a MessagePack -> YAML
and MessagePack -> Lua text translators yet.

Closes #4734

@TarantoolBot document
Title: box_session_push() C API

There is a new function in the public C API:
```C
    int
    box_session_push(const char *data, const char *data_end);
```

It takes raw MessagePack, and behaves just like Lua
`box.session.push()`.
  • Loading branch information
Gerold103 authored and kyukhin committed Apr 14, 2020
1 parent ce8c6ef commit 5d79552
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 10 deletions.
1 change: 1 addition & 0 deletions extra/exports
Expand Up @@ -223,6 +223,7 @@ box_sequence_next
box_sequence_current
box_sequence_set
box_sequence_reset
box_session_push
box_index_iterator
box_iterator_next
box_iterator_free
Expand Down
14 changes: 14 additions & 0 deletions src/box/box.cc
Expand Up @@ -1455,6 +1455,20 @@ box_sequence_reset(uint32_t seq_id)
return sequence_data_delete(seq_id);
}

int
box_session_push(const char *data, const char *data_end)
{
struct session *session = current_session();
if (session == NULL)
return -1;
struct port_msgpack port;
struct port *base = (struct port *)&port;
port_msgpack_create(base, data, data_end - data);
int rc = session_push(session, session_sync(session), base);
port_msgpack_destroy(base);
return rc;
}

static inline void
box_register_replica(uint32_t id, const struct tt_uuid *uuid)
{
Expand Down
14 changes: 14 additions & 0 deletions src/box/box.h
Expand Up @@ -457,6 +457,20 @@ box_sequence_set(uint32_t seq_id, int64_t value);
API_EXPORT int
box_sequence_reset(uint32_t seq_id);

/**
* Push MessagePack data into a session data channel - socket,
* console or whatever is behind the session. Note, that
* successful push does not guarantee delivery in case it was sent
* into the network. Just like with write()/send() system calls.
*
* \param data begin of MessagePack to push
* \param data_end end of MessagePack to push
* \retval -1 on error (check box_error_last())
* \retval 0 on success
*/
API_EXPORT int
box_session_push(const char *data, const char *data_end);

/** \endcond public */

/**
Expand Down
44 changes: 41 additions & 3 deletions src/box/call.c
Expand Up @@ -64,17 +64,55 @@ port_msgpack_get_msgpack(struct port *base, uint32_t *size)
return port->data;
}

static int
port_msgpack_dump_msgpack(struct port *base, struct obuf *out)
{
struct port_msgpack *port = (struct port_msgpack *)base;
assert(port->vtab == &port_msgpack_vtab);
size_t size = port->data_sz;
if (obuf_dup(out, port->data, size) == size)
return 0;
diag_set(OutOfMemory, size, "obuf_dup", "port->data");
return -1;
}

extern void
port_msgpack_dump_lua(struct port *base, struct lua_State *L, bool is_flat);

extern const char *
port_msgpack_dump_plain(struct port *base, uint32_t *size);

void
port_msgpack_destroy(struct port *base)
{
struct port_msgpack *port = (struct port_msgpack *)base;
assert(port->vtab == &port_msgpack_vtab);
free(port->plain);
}

int
port_msgpack_set_plain(struct port *base, const char *plain, uint32_t len)
{
struct port_msgpack *port = (struct port_msgpack *)base;
assert(port->plain == NULL);
port->plain = (char *)malloc(len + 1);
if (port->plain == NULL) {
diag_set(OutOfMemory, len + 1, "malloc", "port->plain");
return -1;
}
memcpy(port->plain, plain, len);
port->plain[len] = 0;
return 0;
}

static const struct port_vtab port_msgpack_vtab = {
.dump_msgpack = NULL,
.dump_msgpack = port_msgpack_dump_msgpack,
.dump_msgpack_16 = NULL,
.dump_lua = port_msgpack_dump_lua,
.dump_plain = NULL,
.dump_plain = port_msgpack_dump_plain,
.get_msgpack = port_msgpack_get_msgpack,
.get_vdbemem = NULL,
.destroy = NULL,
.destroy = port_msgpack_destroy,
};

int
Expand Down
85 changes: 78 additions & 7 deletions src/box/lua/console.c
Expand Up @@ -37,6 +37,7 @@
#include "lua/fiber.h"
#include "fiber.h"
#include "coio.h"
#include "lua/msgpack.h"
#include "lua-yaml/lyaml.h"
#include <lua.h>
#include <lauxlib.h>
Expand Down Expand Up @@ -390,19 +391,17 @@ console_set_output_format(enum output_format output_format)
}

/**
* Dump port lua data with respect to output format:
* Dump Lua data to text with respect to output format:
* YAML document tagged with !push! global tag or Lua string.
* @param port Port lua.
* @param L Lua state.
* @param[out] size Size of the result.
*
* @retval not NULL Tagged YAML document.
* @retval not NULL Tagged YAML document or Lua text.
* @retval NULL Error.
*/
const char *
port_lua_dump_plain(struct port *port, uint32_t *size)
static const char *
console_dump_plain(struct lua_State *L, uint32_t *size)
{
struct port_lua *port_lua = (struct port_lua *) port;
struct lua_State *L = port_lua->L;
enum output_format fmt = console_get_output_format();
if (fmt == OUTPUT_FORMAT_YAML) {
int rc = lua_yaml_encode(L, luaL_yaml_default, "!push!",
Expand Down Expand Up @@ -435,6 +434,78 @@ port_lua_dump_plain(struct port *port, uint32_t *size)
return result;
}

/** Plain text converter for port Lua data. */
const char *
port_lua_dump_plain(struct port *base, uint32_t *size)
{
return console_dump_plain(((struct port_lua *)base)->L, size);
}

/**
* A helper for port_msgpack_dump_plain() to execute it safely
* regarding Lua errors.
*/
static int
port_msgpack_dump_plain_via_lua(struct lua_State *L)
{
void **ctx = (void **)lua_touserdata(L, 1);
struct port_msgpack *port = (struct port_msgpack *)ctx[0];
uint32_t *size = (uint32_t *)ctx[1];
const char *data = port->data;
/*
* Need to pop, because YAML decoder will consume all what
* can be found on the stack.
*/
lua_pop(L, 1);
/*
* MessagePack -> Lua object -> YAML/Lua text. The middle
* is not really needed here, but there is no
* MessagePack -> YAML encoder yet. Neither
* MessagePack -> Lua text.
*/
luamp_decode(L, luaL_msgpack_default, &data);
data = console_dump_plain(L, size);
if (data == NULL) {
assert(port->plain == NULL);
} else {
/*
* Result is ignored, because in case of an error
* port->plain will stay NULL. And it will be
* returned by port_msgpack_dump_plain() as is.
*/
port_msgpack_set_plain((struct port *)port, data, *size);
}
return 0;
}

/** Plain text converter for raw MessagePack. */
const char *
port_msgpack_dump_plain(struct port *base, uint32_t *size)
{
struct lua_State *L = tarantool_L;
void *ctx[2] = {(void *)base, (void *)size};
/*
* lua_cpcall() protects from errors thrown from Lua which
* may break a caller, not knowing about Lua and not
* expecting any exceptions.
*/
if (lua_cpcall(L, port_msgpack_dump_plain_via_lua, ctx) != 0) {
/*
* Error string is pushed in case it was a Lua
* error.
*/
assert(lua_isstring(L, -1));
diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
lua_pop(L, 1);
return NULL;
}
/*
* If there was an error, port->plain stayed NULL with
* installed diag.
*/
return ((struct port_msgpack *)base)->plain;
}

/**
* Push a tagged YAML document or a Lua string into a console
* socket.
Expand Down
15 changes: 15 additions & 0 deletions src/box/port.h
Expand Up @@ -86,6 +86,11 @@ struct port_msgpack {
const struct port_vtab *vtab;
const char *data;
uint32_t data_sz;
/**
* Buffer for dump_plain() function. It is created during
* dump on demand and is deleted together with the port.
*/
char *plain;
};

static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
Expand All @@ -95,6 +100,16 @@ static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
void
port_msgpack_create(struct port *port, const char *data, uint32_t data_sz);

/** Destroy a MessagePack port. */
void
port_msgpack_destroy(struct port *base);

/**
* Set plain text version of data in the given port. It is copied.
*/
int
port_msgpack_set_plain(struct port *base, const char *plain, uint32_t len);

/** Port for storing the result of a Lua CALL/EVAL. */
struct port_lua {
const struct port_vtab *vtab;
Expand Down
7 changes: 7 additions & 0 deletions test/box/function1.c
Expand Up @@ -245,3 +245,10 @@ test_sleep(box_function_ctx_t *ctx, const char *args, const char *args_end)
fiber_sleep(0);
return 0;
}

int
test_push(box_function_ctx_t *ctx, const char *args, const char *args_end)
{
(void)ctx;
return box_session_push(args, args_end);
}
121 changes: 121 additions & 0 deletions test/box/push.result
Expand Up @@ -563,3 +563,124 @@ box.schema.func.drop('do_long_and_push')
box.session.on_disconnect(nil, on_disconnect)
---
...
--
-- gh-4734: C API for session push.
--
build_path = os.getenv("BUILDDIR")
---
...
old_cpath = package.cpath
---
...
package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath
---
...
box.schema.func.create('function1.test_push', {language = 'C'})
---
...
box.schema.user.grant('guest', 'super')
---
...
c = netbox.connect(box.cfg.listen)
---
...
messages = {}
---
...
c:call('function1.test_push', \
{1, 2, 3}, \
{on_push = table.insert, \
on_push_ctx = messages})
---
- []
...
messages
---
- - [1, 2, 3]
...
c:close()
---
...
--
-- C can push to the console.
--
-- A string having 0 byte inside. Check that it is handled fine.
s = '\x41\x00\x43'
---
...
console = require('console')
---
...
fio = require('fio')
---
...
socket = require('socket')
---
...
sock_path = fio.pathjoin(fio.cwd(), 'console.sock')
---
...
_ = fio.unlink(sock_path)
---
...
server = console.listen(sock_path)
---
...
client = socket.tcp_connect('unix/', sock_path)
---
...
_ = client:read({chunk = 128})
---
...
_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
---
...
client:read("\n...\n")
---
- '%TAG !push! tag:tarantool.io/push,2018

--- [1, 2, 3, "A\0C"]

...

'
...
_ = client:read("\n...\n")
---
...
-- Lua output format is supported too.
_ = client:write("\\set output lua\n")
---
...
_ = client:read(";")
---
...
_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
---
...
client:read(";")
---
- '-- Push

{1, 2, 3, "A\0C"};'
...
_ = client:read(";")
---
...
client:close()
---
- true
...
server:close()
---
- true
...
box.schema.user.revoke('guest', 'super')
---
...
box.schema.func.drop('function1.test_push')
---
...
package.cpath = old_cpath
---
...

0 comments on commit 5d79552

Please sign in to comment.