Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 58 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<a href="http://tarantool.org">
<img src="https://avatars2.githubusercontent.com/u/2344919?v=2&s=250"
<img src="https://avatars2.githubusercontent.com/u/2344919?v=2&s=250"
align="right">
</a>
<a href="https://travis-ci.org/tarantool/queue">
<img src="https://travis-ci.org/tarantool/queue.png?branch=master"
<img src="https://travis-ci.org/tarantool/queue.png?branch=master"
align="right">
</a>

Expand Down Expand Up @@ -45,8 +45,8 @@ align="right">
Features:

* If there is only one consumer, tasks are scheduled in strict FIFO order.
* If there are many concurrent consumers, FIFO order is preserved on
average, but is less strict: concurrent consumers may complete tasks in
* If there are many concurrent consumers, FIFO order is preserved on
average, but is less strict: concurrent consumers may complete tasks in
a different order.

The following options can be specified when creating a `fifo` queue:
Expand All @@ -63,7 +63,7 @@ operation; the expected function syntax is `function(task, stats_data)`, where
* task priorities (`pri`)
* task time to live (`ttl`)
* task time to execute (`ttr`)
* delayed execution (`delay`)
* delayed execution (`delay`)

Example:

Expand Down Expand Up @@ -97,12 +97,12 @@ operation

The following options can be specified when putting a task in a `fifottl` queue:
* `pri` - task priority (`0` is the highest priority and is the default)
* `ttl` - numeric - time to live for a task put into the queue, in
* `ttl` - numeric - time to live for a task put into the queue, in
seconds. if `ttl` is not specified, it is set to infinity
(if a task exists in a queue for longer than ttl seconds, it is removed)
* `ttr` - numeric - time allotted to the worker to work on a task, in
* `ttr` - numeric - time allotted to the worker to work on a task, in
seconds; if `ttr` is not specified, it is set to the same as `ttl`
(if a task is being worked on for more than `ttr` seconds, its status
(if a task is being worked on for more than `ttr` seconds, its status
is changed to 'ready' so another worker may take it)
* `delay` - time to wait before starting to execute the task, in seconds

Expand All @@ -115,17 +115,17 @@ queue.tube.tube_name:put('my_task_data', { ttl = 60.1, delay = 80 })

```

In the example above, the task has 60.1 seconds to live, but the start
of execution is delayed for 80 seconds. Thus the task actually will
In the example above, the task has 60.1 seconds to live, but the start
of execution is delayed for 80 seconds. Thus the task actually will
exist for up to (60.1 + 80) 140.1 seconds.

A smaller priority value indicates a higher priority, so a task with
priority 1 will be executed after a task with priority 0, if all other
A smaller priority value indicates a higher priority, so a task with
priority 1 will be executed after a task with priority 0, if all other
options are equal.

## `utube` - a queue with sub-queues inside

The main idea of this queue backend is the same as in a `fifo` queue:
The main idea of this queue backend is the same as in a `fifo` queue:
the tasks are executed in FIFO order.
However, tasks may be grouped into sub-queues.

Expand All @@ -137,12 +137,12 @@ already exists
* `on_task_change` - function name - a callback to be executed on every
operation

The following options can be specified when putting a task in a `utube`
The following options can be specified when putting a task in a `utube`
queue:
* `utube` - the name of the sub-queue.
Sub-queues split the task stream according to the sub-queue name: it is
Sub-queues split the task stream according to the sub-queue name: it is
not possible to take two tasks
out of a sub-queue concurrently, each sub-queue is executed in strict
out of a sub-queue concurrently, each sub-queue is executed in strict
FIFO order, one task at a time.

`utube` queue does not support:
Expand All @@ -153,9 +153,9 @@ FIFO order, one task at a time.

Example:

Imagine a web crawler, fetching pages from the Internet and finding URLs
Imagine a web crawler, fetching pages from the Internet and finding URLs
to fetch more pages.
The web crawler is based on a queue, and each task in the queue refers
The web crawler is based on a queue, and each task in the queue refers
to a URL which the web crawler must download and process.
If the web crawler is split into many worker processes, then the same URL may
show up in the queue many times, because a single URL may be referred to by many
Expand All @@ -180,11 +180,17 @@ already exists
* `on_task_change` - function name - a callback to be executed on every
operation

The following options can be specified when putting a task in a
The following options can be specified when putting a task in a
`utubettl` queue:
* `utube` - the name of the sub-queue
* `ttl` - time to live for a task put into the queue; if `ttl` is not
specified, it is set to infinity
* `ttl` - numeric - time to live for a task put into the queue, in
seconds. if `ttl` is not specified, it is set to infinity
(if a task exists in a queue for longer than ttl seconds, it is removed)
* `ttr` - numeric - time allotted to the worker to work on a task, in
seconds; if `ttr` is not specified, it is set to the same as `ttl`
(if a task is being worked on for more than `ttr` seconds, its status
is changed to 'ready' so another worker may take it)
* `delay` - time to wait before starting to execute the task, in seconds

# The underlying spaces

Expand All @@ -193,38 +199,38 @@ Here is how queues map to spaces in a Tarantool database.

The `_queue` space contains tuples for each queue and its properties.
This space is created automatically when the queue system is initialized for
the first time (for example, by "require 'queue'"), and is re-used on
the first time (for example, by "require 'queue'"), and is re-used on
later occasions.

## Fields of the `_queue` space

1. `tube` - the name of the queue
1. `tube_id` - queue ID, numeric
1. `space` - the name of a space associated with the queue, which
1. `space` - the name of a space associated with the queue, which
contains one tuple for each queue task
1. `type` - the queue type ('fifo', 'fifottl', 'utube', 'utubettl')
1. `opts` - additional options supplied when creating the queue, for
1. `opts` - additional options supplied when creating the queue, for
example 'ttl'

The `_queue_consumers` temporary space contains tuples for each job
The `_queue_consumers` temporary space contains tuples for each job
which is working on a queue.
Consumers may be simply waiting for tasks to be put in the queues.

## Fields of the `_queue_consumers` space

1. `session` - session (connection) ID of the client
1. `fid` - client fiber ID
1. `tube_id` - queue ID, referring to the `tube_id` field in the `_queue`
1. `tube_id` - queue ID, referring to the `tube_id` field in the `_queue`
space; the client waits for tasks in this queue
1. `timeout` - the client wait timeout
1. `time` - the time when the client took a task

The `_queue_taken` temporary space contains tuples for each job which is
The `_queue_taken` temporary space contains tuples for each job which is
processing a task in the queue.

## Fields of the `_queue_taken` space

1. `session` - session (connection) ID of the client, referring to the
1. `session` - session (connection) ID of the client, referring to the
`session_id` field of the `_queue_consumers` space
1. `tube_id` - queue ID, to which the task belongs
1. `task_id` - task ID (of the task being taken)
Expand All @@ -239,11 +245,11 @@ The associated space contains one tuple for each task.
1. task_id - numeric - see below
2. task_state - 'r' for ready, 't' for taken, etc. - see below
3. task_data - the contents of the task, usually a long string
x. (additional fields if the queue type has options for ttl, priority,
x. (additional fields if the queue type has options for ttl, priority,
or delay)

The `task_id` value is assigned to a task when it's inserted into a queue.
Currently, `task_id` values are simple integers for `fifo` and `fifottl`
Currently, `task_id` values are simple integers for `fifo` and `fifottl`
queues.

The `task_state` field takes one of the following values
Expand All @@ -253,7 +259,7 @@ sets of `task_state` values, so this is a superset):
* 'r' - the task is ready for execution (the first consumer executing
a `take` request will get it)
* 't' - the task has been taken by a consumer
* '-' - the task has been executed (a task is removed from the queue
* '-' - the task has been executed (a task is removed from the queue
after it
has been executed, so this value will rarely be seen)
* '!' - the task is buried (disabled temporarily until further changes)
Expand All @@ -262,10 +268,10 @@ after it
# Installing

There are three alternative ways of installation.
* Get the `tarantool_queue` package from a repository. For example, on
* Get the `tarantool_queue` package from a repository. For example, on
Ubuntu, say: sudo apt-get install tarantool-queue
* Take the Lua rock from rocks.tarantool.org.
* Take the source files from https://github.com/tarantool/queue, then
* Take the source files from https://github.com/tarantool/queue, then
build and install.

# Using the `queue` module
Expand Down Expand Up @@ -315,9 +321,9 @@ The `tube_name` must be the name which was specified by `queue.create_tube`.
The `task_data` contents are the user-defined description of the task,
usually a long string.

The options, if specified, must be one or more of the options described
The options, if specified, must be one or more of the options described
above
(`ttl` and/or `ttr` and/or `pri` and/or `delay` and/or `utube`, depending on the queue
(`ttl` and/or `ttr` and/or `pri` and/or `delay` and/or `utube`, depending on the queue
type).
If an option is not specified, the default is what was specified during
`queue.create_tube`, and if that was not specified, then the default is what
Expand All @@ -332,7 +338,7 @@ task_data = whatever the user put in the `task_data` parameter
Returns: the value of the new tuple in the queue's associated space,
also called the "created task".

Example: queue.tube.list_of_sites:put('Your task is to do something',
Example: queue.tube.list_of_sites:put('Your task is to do something',
{pri=2})

After a task has been put in a queue, one of these things may happen:
Expand Down Expand Up @@ -368,6 +374,22 @@ instructions for what to do with the task.

Example: t_value = queue.tube.list_of_sites:take(15)

## Increasing TTR and/or TTL for tasks

```lua
queue.tube.tube_name:touch(task_id, increment)
```

Increase `ttr` of running task. Useful if you can't predict in advance
time needed to work on task.

Effect: the value of `ttr` and `ttl` increased by `increment` seconds. If queue
does not support ttr, error will be thrown. If `increment` is lower than zero,
error will be thrown. If `increment` is zero effect is no op. If `increment` is
`nil` it will be equal to current `ttr` of task. If current `ttr` of task is
500 years or greater then operation is noop.

Example: t_value = queue.tube.list_of_sites:touch(15, 60)

## Acknowledging the completion of a task

Expand Down
22 changes: 21 additions & 1 deletion queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ function tube.take(self, timeout)
end
end

function tube.touch(self, id, increment)
if increment < 0 then
error("Increment can't be less than zero")
end

if increment == 0 then
return
end

local task = self:peek(id)
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
if _taken == nil then
error("Task was not taken in the session")
end

queue.stat[space_name]:inc('touch')
return self.raw:normalize_task(self.raw:touch(id, increment))
end

function tube.ack(self, id)
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
if _taken == nil then
Expand Down Expand Up @@ -397,7 +416,8 @@ local function build_stats(space)
kick = 0,
put = 0,
release = 0,
take = 0
take = 0,
touch = 0
}}

local st = rawget(queue.stat, space) or {}
Expand Down
5 changes: 5 additions & 0 deletions queue/abstract/driver/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ function method.take(self)
end
end

-- touch task
function method.touch(self, id, ttr)
error('fifo queue does not support touch')
end

-- delete task
function method.delete(self, id)
local task = self.space:delete(id)
Expand Down
47 changes: 39 additions & 8 deletions queue/abstract/driver/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,50 @@ function method.put(self, data, opts)
end

local task = self.space:insert{
id,
status,
next_event,
time(ttl),
time(ttr),
pri,
time(),
data
id,
status,
next_event,
time(ttl),
time(ttr),
pri,
time(),
data
}
self:on_task_change(task, 'put')
return task
end

local TIMEOUT_INFINITY_TIME = time(TIMEOUT_INFINITY)

-- touch task
function method.touch(self, id, increment_seconds)
if increment_seconds < 0 then
error("Increment can't be less than zero")
elseif increment_seconds > TIMEOUT_INFINITY then
increment_seconds = TIMEOUT_INFINITY
end

local task = self:peek{id}
if increment_seconds == 0 or task[i_ttr] >= TIMEOUT_INFINITY_TIME then
return task
end

local increment = 0ULL
if increment_seconds == nil then
increment = task[i_ttr] or task[i_ttr]
else
increment = time(increment_seconds)
end

task = self.space:update{
id,
{{i_ttl, '+', increment}},
{{i_ttr, '+', increment}}
}

self:on_task_change(task, 'touch')
return task
end

-- take task
function method.take(self)
Expand Down
5 changes: 5 additions & 0 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ function method.take(self)
end
end

-- touch task
function method.touch(self, id, ttr)
error('utube queue does not support touch')
end

-- delete task
function method.delete(self, id)
local task = self.space:delete(id)
Expand Down
Loading