diff --git a/aeon_crud.proto b/aeon_crud.proto index 3cdee31..dfc3f4e 100644 --- a/aeon_crud.proto +++ b/aeon_crud.proto @@ -43,6 +43,63 @@ service CRUDService { // Transactionally executes a set of read and write operations. message ExecuteRequest { + // Message queue push section in a transaction request. + message Push { + // Definition of messages pushed to a shard-local queue. + message Message { + // Topic of the message. + string topic = 1; + // Data of the message. + Value data = 2; + // Time to live of the message. + double ttl = 3; + } + // The source code of the Lua function that will be used to generate + // messages. It's optional: if omitted, messages from the `messages` + // field will be sent instead. + // + // The function is passed three arguments: the resulting read set, + // the resulting write set, and an optional additional argument specified + // as `func_arg`. If the function raises an error, the error will be + // returned in the `push_err` field. The transaction will not be affected. + // + // The function must return an array of messages. Message is an array + // of `{topic, data, ttl}`, where `topic` is required and is a string + // value, `data` is required and is of any supported value type, + // and `ttl` is optional and represents a number of seconds. + // + // A read/write operation is passed in an array: {space, key, tuple}. + // (without string key names). + // + // Below is an example of a Lua function that returns messages + // with `read_topic` as topic and the first fields of the read tuples + // as data if the optional argument is `true`, and `write_topic` + // as topic and the first fields of the written tuples as data otherwise. + // + // function(rs, ws, arg) + // local messages = {} + // if arg == true then + // for _, r in ipairs(rs) do + // if r.tuple ~= nil then + // table.insert(messages, {'read_topic', r.tuple[1]}) + // end + // end + // else + // for _, w in ipairs(ws) do + // if w.tuple ~= nil then + // table.insert(messages, {'write_topic', w.tuple[1]}) + // end + // end + // end + // return messages + // end + // + string func = 1; + // Additional argument to the push function. Optional. + Value func_arg = 2; + // Messages to send if the push function is not provided. + repeated Message messages = 3; + } // Array of read operations. repeated Operation read_set = 1; // Array of write operations. @@ -89,6 +146,8 @@ message ExecuteRequest { // Map : space name -> tuple format. // Contains formats of all provided tuples. Optional. map tuple_formats = 6; + // Description of messages to push when executing a transaction. + Push push = 7; } message ExecuteResponse { @@ -103,6 +162,9 @@ message ExecuteResponse { // Map : space name -> tuple format. // Contains formats of all returned tuples. map tuple_formats = 5; + // PushErr is the error returned by the push function, or nil + // if no error occurred. + Error push_err = 6; } // Transactionally inserts tuples into a space. diff --git a/aeon_queue.proto b/aeon_queue.proto new file mode 100644 index 0000000..b1d3b65 --- /dev/null +++ b/aeon_queue.proto @@ -0,0 +1,112 @@ +syntax = "proto3"; + +import "aeon_error.proto"; +import "aeon_value.proto"; + +package aeon; + +// Queue API to Aeon - a distributed database based on Tarantool. +service QueueService { + // Takes messages from a shard-local queue. The messages should be + // released after processing. Taken messages are not available + // to other consumers until they are released. If a consumer attempts + // to take messages again without releasing a previously accepted batch, + // the router will attempt to return the same batch of messages, although + // this is not guaranteed. If a consumer takes messages exclusively, + // no other consumer will be able to accept messages from the same shard + // with the same topic until the taken messages are released. + rpc TakeMessages(TakeMessagesRequest) returns (TakeMessagesResponse) {} + + // Releases messages. If messages are released with the `done` flag set, + // they will not be available to consumers subsequently. Otherwise, + // messages may be re-taken by this consumer or taken by other consumers. + rpc ReleaseMessages(ReleaseMessagesRequest) + returns (ReleaseMessagesResponse) {} + + // Returns the oldest message for all storages, or the oldest message + // for each storage. + rpc GetOldestMessages(GetOldestMessagesRequest) + returns (GetOldestMessagesResponse) {} +} + +// Description of returned messages. +message Message { + // The shard where the message was taken from. + string shard = 1; + // The serial number of the message on the shard. + uint64 lsn = 2; + // Data of the message. + Value data = 3; +} + +// Consumer description. +message ConsumerRef { + // The shard where the consumer took messages from and where + // it was registered. + string shard = 1; + // Topic of taken messages. + string topic = 2; + // Consumer name. + string consumer = 3; +} + +message TakeMessagesRequest { + // Topic name. + string topic = 1; + // The unique name of the consumer that takes messages for processing. + // This name will be used to return a reference to the consumer, + // which should be used in `ReleaseMessages` to release taken messages. + string consumer = 2; + // Max number of returned messages. + uint64 limit = 3; + // Time for the consumer to process the messages. After this time, + // messages will be released with the status `undone`. + double ttl = 4; + // Exclusive mode flag. If set, no other consumer can take messages from + // the same topic on the same shard. + bool exclusive = 5; + // Time to wait for messages. + double timeout = 6; +} + +message TakeMessagesResponse { + // Error information. Set only on failure. + Error error = 1; + // Returned messages. + repeated Message messages = 2; + // Consumer reference used to release messages. + ConsumerRef ref = 3; + // True if these messages have already been taken by the same consumer, + // false otherwise. + bool taken_earlier = 4; +} + +message ReleaseMessagesRequest { + // Consumer reference. Should be the same as returned by `TakeMessages`. + ConsumerRef ref = 1; + // If true, released messages can no longer be taken by consumers. + // Otherwise, the released messages may be taken again. + bool done = 2; +} + +message ReleaseMessagesResponse { + // Error information. Set only on failure. + Error error = 1; + // True if messages were already released, false otherwise. + bool released_earlier = 2; +} + +message GetOldestMessagesRequest { + // Topic name. + string topic = 1; + // True if the oldest messages for each shard should be returned, + // false if the oldest messages for all shards should be returned. + bool for_each_shard = 2; +} + +message GetOldestMessagesResponse { + // Error information. Set only on failure. + Error error = 1; + // Returned messages. + repeated Message messages = 2; +}