diff --git a/README.md b/README.md
index 3f32885b..6ee94422 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,9 @@
-
-
@@ -45,8 +45,8 @@ align="right">
Features:
* If there is only one consumer, tasks are scheduled in strict FIFO order.
-* If there are many concurrent consumers, FIFO order is preserved on
-average, but is less strict: concurrent consumers may complete tasks in
+* If there are many concurrent consumers, FIFO order is preserved on
+average, but is less strict: concurrent consumers may complete tasks in
a different order.
The following options can be specified when creating a `fifo` queue:
@@ -63,7 +63,7 @@ operation; the expected function syntax is `function(task, stats_data)`, where
* task priorities (`pri`)
* task time to live (`ttl`)
* task time to execute (`ttr`)
- * delayed execution (`delay`)
+ * delayed execution (`delay`)
Example:
@@ -97,12 +97,12 @@ operation
The following options can be specified when putting a task in a `fifottl` queue:
* `pri` - task priority (`0` is the highest priority and is the default)
- * `ttl` - numeric - time to live for a task put into the queue, in
+ * `ttl` - numeric - time to live for a task put into the queue, in
seconds. if `ttl` is not specified, it is set to infinity
(if a task exists in a queue for longer than ttl seconds, it is removed)
- * `ttr` - numeric - time allotted to the worker to work on a task, in
+ * `ttr` - numeric - time allotted to the worker to work on a task, in
seconds; if `ttr` is not specified, it is set to the same as `ttl`
- (if a task is being worked on for more than `ttr` seconds, its status
+ (if a task is being worked on for more than `ttr` seconds, its status
is changed to 'ready' so another worker may take it)
* `delay` - time to wait before starting to execute the task, in seconds
@@ -115,17 +115,17 @@ queue.tube.tube_name:put('my_task_data', { ttl = 60.1, delay = 80 })
```
-In the example above, the task has 60.1 seconds to live, but the start
-of execution is delayed for 80 seconds. Thus the task actually will
+In the example above, the task has 60.1 seconds to live, but the start
+of execution is delayed for 80 seconds. Thus the task actually will
exist for up to (60.1 + 80) 140.1 seconds.
-A smaller priority value indicates a higher priority, so a task with
-priority 1 will be executed after a task with priority 0, if all other
+A smaller priority value indicates a higher priority, so a task with
+priority 1 will be executed after a task with priority 0, if all other
options are equal.
## `utube` - a queue with sub-queues inside
-The main idea of this queue backend is the same as in a `fifo` queue:
+The main idea of this queue backend is the same as in a `fifo` queue:
the tasks are executed in FIFO order.
However, tasks may be grouped into sub-queues.
@@ -137,12 +137,12 @@ already exists
* `on_task_change` - function name - a callback to be executed on every
operation
-The following options can be specified when putting a task in a `utube`
+The following options can be specified when putting a task in a `utube`
queue:
* `utube` - the name of the sub-queue.
-Sub-queues split the task stream according to the sub-queue name: it is
+Sub-queues split the task stream according to the sub-queue name: it is
not possible to take two tasks
-out of a sub-queue concurrently, each sub-queue is executed in strict
+out of a sub-queue concurrently, each sub-queue is executed in strict
FIFO order, one task at a time.
`utube` queue does not support:
@@ -153,9 +153,9 @@ FIFO order, one task at a time.
Example:
-Imagine a web crawler, fetching pages from the Internet and finding URLs
+Imagine a web crawler, fetching pages from the Internet and finding URLs
to fetch more pages.
-The web crawler is based on a queue, and each task in the queue refers
+The web crawler is based on a queue, and each task in the queue refers
to a URL which the web crawler must download and process.
If the web crawler is split into many worker processes, then the same URL may
show up in the queue many times, because a single URL may be referred to by many
@@ -180,11 +180,17 @@ already exists
* `on_task_change` - function name - a callback to be executed on every
operation
-The following options can be specified when putting a task in a
+The following options can be specified when putting a task in a
`utubettl` queue:
* `utube` - the name of the sub-queue
- * `ttl` - time to live for a task put into the queue; if `ttl` is not
-specified, it is set to infinity
+ * `ttl` - numeric - time to live for a task put into the queue, in
+seconds. if `ttl` is not specified, it is set to infinity
+ (if a task exists in a queue for longer than ttl seconds, it is removed)
+ * `ttr` - numeric - time allotted to the worker to work on a task, in
+seconds; if `ttr` is not specified, it is set to the same as `ttl`
+ (if a task is being worked on for more than `ttr` seconds, its status
+is changed to 'ready' so another worker may take it)
+ * `delay` - time to wait before starting to execute the task, in seconds
# The underlying spaces
@@ -193,20 +199,20 @@ Here is how queues map to spaces in a Tarantool database.
The `_queue` space contains tuples for each queue and its properties.
This space is created automatically when the queue system is initialized for
-the first time (for example, by "require 'queue'"), and is re-used on
+the first time (for example, by "require 'queue'"), and is re-used on
later occasions.
## Fields of the `_queue` space
1. `tube` - the name of the queue
1. `tube_id` - queue ID, numeric
-1. `space` - the name of a space associated with the queue, which
+1. `space` - the name of a space associated with the queue, which
contains one tuple for each queue task
1. `type` - the queue type ('fifo', 'fifottl', 'utube', 'utubettl')
-1. `opts` - additional options supplied when creating the queue, for
+1. `opts` - additional options supplied when creating the queue, for
example 'ttl'
-The `_queue_consumers` temporary space contains tuples for each job
+The `_queue_consumers` temporary space contains tuples for each job
which is working on a queue.
Consumers may be simply waiting for tasks to be put in the queues.
@@ -214,17 +220,17 @@ Consumers may be simply waiting for tasks to be put in the queues.
1. `session` - session (connection) ID of the client
1. `fid` - client fiber ID
-1. `tube_id` - queue ID, referring to the `tube_id` field in the `_queue`
+1. `tube_id` - queue ID, referring to the `tube_id` field in the `_queue`
space; the client waits for tasks in this queue
1. `timeout` - the client wait timeout
1. `time` - the time when the client took a task
-The `_queue_taken` temporary space contains tuples for each job which is
+The `_queue_taken` temporary space contains tuples for each job which is
processing a task in the queue.
## Fields of the `_queue_taken` space
-1. `session` - session (connection) ID of the client, referring to the
+1. `session` - session (connection) ID of the client, referring to the
`session_id` field of the `_queue_consumers` space
1. `tube_id` - queue ID, to which the task belongs
1. `task_id` - task ID (of the task being taken)
@@ -239,11 +245,11 @@ The associated space contains one tuple for each task.
1. task_id - numeric - see below
2. task_state - 'r' for ready, 't' for taken, etc. - see below
3. task_data - the contents of the task, usually a long string
-x. (additional fields if the queue type has options for ttl, priority,
+x. (additional fields if the queue type has options for ttl, priority,
or delay)
The `task_id` value is assigned to a task when it's inserted into a queue.
-Currently, `task_id` values are simple integers for `fifo` and `fifottl`
+Currently, `task_id` values are simple integers for `fifo` and `fifottl`
queues.
The `task_state` field takes one of the following values
@@ -253,7 +259,7 @@ sets of `task_state` values, so this is a superset):
* 'r' - the task is ready for execution (the first consumer executing
a `take` request will get it)
* 't' - the task has been taken by a consumer
-* '-' - the task has been executed (a task is removed from the queue
+* '-' - the task has been executed (a task is removed from the queue
after it
has been executed, so this value will rarely be seen)
* '!' - the task is buried (disabled temporarily until further changes)
@@ -262,10 +268,10 @@ after it
# Installing
There are three alternative ways of installation.
-* Get the `tarantool_queue` package from a repository. For example, on
+* Get the `tarantool_queue` package from a repository. For example, on
Ubuntu, say: sudo apt-get install tarantool-queue
* Take the Lua rock from rocks.tarantool.org.
-* Take the source files from https://github.com/tarantool/queue, then
+* Take the source files from https://github.com/tarantool/queue, then
build and install.
# Using the `queue` module
@@ -315,9 +321,9 @@ The `tube_name` must be the name which was specified by `queue.create_tube`.
The `task_data` contents are the user-defined description of the task,
usually a long string.
-The options, if specified, must be one or more of the options described
+The options, if specified, must be one or more of the options described
above
-(`ttl` and/or `ttr` and/or `pri` and/or `delay` and/or `utube`, depending on the queue
+(`ttl` and/or `ttr` and/or `pri` and/or `delay` and/or `utube`, depending on the queue
type).
If an option is not specified, the default is what was specified during
`queue.create_tube`, and if that was not specified, then the default is what
@@ -332,7 +338,7 @@ task_data = whatever the user put in the `task_data` parameter
Returns: the value of the new tuple in the queue's associated space,
also called the "created task".
-Example: queue.tube.list_of_sites:put('Your task is to do something',
+Example: queue.tube.list_of_sites:put('Your task is to do something',
{pri=2})
After a task has been put in a queue, one of these things may happen:
@@ -368,6 +374,22 @@ instructions for what to do with the task.
Example: t_value = queue.tube.list_of_sites:take(15)
+## Increasing TTR and/or TTL for tasks
+
+```lua
+queue.tube.tube_name:touch(task_id, increment)
+```
+
+Increase `ttr` of running task. Useful if you can't predict in advance
+time needed to work on task.
+
+Effect: the value of `ttr` and `ttl` increased by `increment` seconds. If queue
+does not support ttr, error will be thrown. If `increment` is lower than zero,
+error will be thrown. If `increment` is zero effect is no op. If `increment` is
+`nil` it will be equal to current `ttr` of task. If current `ttr` of task is
+500 years or greater then operation is noop.
+
+Example: t_value = queue.tube.list_of_sites:touch(15, 60)
## Acknowledging the completion of a task
diff --git a/queue/abstract.lua b/queue/abstract.lua
index 190aa3f7..8214b144 100644
--- a/queue/abstract.lua
+++ b/queue/abstract.lua
@@ -68,6 +68,25 @@ function tube.take(self, timeout)
end
end
+function tube.touch(self, id, increment)
+ if increment < 0 then
+ error("Increment can't be less than zero")
+ end
+
+ if increment == 0 then
+ return
+ end
+
+ local task = self:peek(id)
+ local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
+ if _taken == nil then
+ error("Task was not taken in the session")
+ end
+
+ queue.stat[space_name]:inc('touch')
+ return self.raw:normalize_task(self.raw:touch(id, increment))
+end
+
function tube.ack(self, id)
local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id}
if _taken == nil then
@@ -397,7 +416,8 @@ local function build_stats(space)
kick = 0,
put = 0,
release = 0,
- take = 0
+ take = 0,
+ touch = 0
}}
local st = rawget(queue.stat, space) or {}
diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua
index c23b8db4..8692a26e 100644
--- a/queue/abstract/driver/fifo.lua
+++ b/queue/abstract/driver/fifo.lua
@@ -61,6 +61,11 @@ function method.take(self)
end
end
+-- touch task
+function method.touch(self, id, ttr)
+ error('fifo queue does not support touch')
+end
+
-- delete task
function method.delete(self, id)
local task = self.space:delete(id)
diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua
index c2d9c097..ca8595b0 100644
--- a/queue/abstract/driver/fifottl.lua
+++ b/queue/abstract/driver/fifottl.lua
@@ -196,19 +196,50 @@ function method.put(self, data, opts)
end
local task = self.space:insert{
- id,
- status,
- next_event,
- time(ttl),
- time(ttr),
- pri,
- time(),
- data
+ id,
+ status,
+ next_event,
+ time(ttl),
+ time(ttr),
+ pri,
+ time(),
+ data
}
self:on_task_change(task, 'put')
return task
end
+local TIMEOUT_INFINITY_TIME = time(TIMEOUT_INFINITY)
+
+-- touch task
+function method.touch(self, id, increment_seconds)
+ if increment_seconds < 0 then
+ error("Increment can't be less than zero")
+ elseif increment_seconds > TIMEOUT_INFINITY then
+ increment_seconds = TIMEOUT_INFINITY
+ end
+
+ local task = self:peek{id}
+ if increment_seconds == 0 or task[i_ttr] >= TIMEOUT_INFINITY_TIME then
+ return task
+ end
+
+ local increment = 0ULL
+ if increment_seconds == nil then
+ increment = task[i_ttr] or task[i_ttr]
+ else
+ increment = time(increment_seconds)
+ end
+
+ task = self.space:update{
+ id,
+ {{i_ttl, '+', increment}},
+ {{i_ttr, '+', increment}}
+ }
+
+ self:on_task_change(task, 'touch')
+ return task
+end
-- take task
function method.take(self)
diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua
index c268fa01..a747ba3d 100644
--- a/queue/abstract/driver/utube.lua
+++ b/queue/abstract/driver/utube.lua
@@ -75,6 +75,11 @@ function method.take(self)
end
end
+-- touch task
+function method.touch(self, id, ttr)
+ error('utube queue does not support touch')
+end
+
-- delete task
function method.delete(self, id)
local task = self.space:delete(id)
diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua
index f85a80cc..58992323 100644
--- a/queue/abstract/driver/utubettl.lua
+++ b/queue/abstract/driver/utubettl.lua
@@ -211,20 +211,52 @@ function method.put(self, data, opts)
end
local task = self.space:insert{
- id,
- status,
- next_event,
- time(ttl),
- time(ttr),
- pri,
- time(),
- tostring(opts.utube),
- data
+ id,
+ status,
+ next_event,
+ time(ttl),
+ time(ttr),
+ pri,
+ time(),
+ tostring(opts.utube),
+ data
}
self:on_task_change(task, 'put')
return task
end
+local TIMEOUT_INFINITY_TIME = time(TIMEOUT_INFINITY)
+
+-- touch task
+function method.touch(self, id, increment_seconds)
+ if increment_seconds < 0 then
+ error("Increment can't be less than zero")
+ elseif increment_seconds > TIMEOUT_INFINITY then
+ increment_seconds = TIMEOUT_INFINITY
+ end
+
+ local task = self:peek{id}
+ if increment_seconds == 0 or task[i_ttr] >= TIMEOUT_INFINITY_TIME then
+ return task
+ end
+
+ local increment = 0ULL
+ if increment_seconds == nil then
+ increment = task[i_ttr] or task[i_ttr]
+ else
+ increment = time(increment_seconds)
+ end
+
+ task = self.space:update{
+ id,
+ {{i_ttl, '+', increment}},
+ {{i_ttr, '+', increment}}
+ }
+
+ self:on_task_change(task, 'touch')
+ return task
+end
+
-- take task
function method.take(self)
for s, t in self.space.index.status:pairs(state.READY, {iterator = 'GE'}) do