From c49498d5d25be34fa18bf47e2a69f7067a785fe3 Mon Sep 17 00:00:00 2001 From: Anatoly Popov Date: Fri, 16 Dec 2016 17:29:46 +0300 Subject: [PATCH 1/3] Support extend_ttr function --- README.md | 91 ++++++++++++++++++------------ queue/abstract.lua | 14 ++++- queue/abstract/driver/fifo.lua | 5 ++ queue/abstract/driver/fifottl.lua | 29 +++++++--- queue/abstract/driver/utube.lua | 5 ++ queue/abstract/driver/utubettl.lua | 14 +++++ 6 files changed, 113 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index 3f32885..a16a39c 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@ - - @@ -45,8 +45,8 @@ align="right"> Features: * If there is only one consumer, tasks are scheduled in strict FIFO order. -* If there are many concurrent consumers, FIFO order is preserved on -average, but is less strict: concurrent consumers may complete tasks in +* If there are many concurrent consumers, FIFO order is preserved on +average, but is less strict: concurrent consumers may complete tasks in a different order. The following options can be specified when creating a `fifo` queue: @@ -63,7 +63,7 @@ operation; the expected function syntax is `function(task, stats_data)`, where * task priorities (`pri`) * task time to live (`ttl`) * task time to execute (`ttr`) - * delayed execution (`delay`) + * delayed execution (`delay`) Example: @@ -97,12 +97,12 @@ operation The following options can be specified when putting a task in a `fifottl` queue: * `pri` - task priority (`0` is the highest priority and is the default) - * `ttl` - numeric - time to live for a task put into the queue, in + * `ttl` - numeric - time to live for a task put into the queue, in seconds. if `ttl` is not specified, it is set to infinity (if a task exists in a queue for longer than ttl seconds, it is removed) - * `ttr` - numeric - time allotted to the worker to work on a task, in + * `ttr` - numeric - time allotted to the worker to work on a task, in seconds; if `ttr` is not specified, it is set to the same as `ttl` - (if a task is being worked on for more than `ttr` seconds, its status + (if a task is being worked on for more than `ttr` seconds, its status is changed to 'ready' so another worker may take it) * `delay` - time to wait before starting to execute the task, in seconds @@ -115,17 +115,17 @@ queue.tube.tube_name:put('my_task_data', { ttl = 60.1, delay = 80 }) ``` -In the example above, the task has 60.1 seconds to live, but the start -of execution is delayed for 80 seconds. Thus the task actually will +In the example above, the task has 60.1 seconds to live, but the start +of execution is delayed for 80 seconds. Thus the task actually will exist for up to (60.1 + 80) 140.1 seconds. -A smaller priority value indicates a higher priority, so a task with -priority 1 will be executed after a task with priority 0, if all other +A smaller priority value indicates a higher priority, so a task with +priority 1 will be executed after a task with priority 0, if all other options are equal. ## `utube` - a queue with sub-queues inside -The main idea of this queue backend is the same as in a `fifo` queue: +The main idea of this queue backend is the same as in a `fifo` queue: the tasks are executed in FIFO order. However, tasks may be grouped into sub-queues. @@ -137,12 +137,12 @@ already exists * `on_task_change` - function name - a callback to be executed on every operation -The following options can be specified when putting a task in a `utube` +The following options can be specified when putting a task in a `utube` queue: * `utube` - the name of the sub-queue. -Sub-queues split the task stream according to the sub-queue name: it is +Sub-queues split the task stream according to the sub-queue name: it is not possible to take two tasks -out of a sub-queue concurrently, each sub-queue is executed in strict +out of a sub-queue concurrently, each sub-queue is executed in strict FIFO order, one task at a time. `utube` queue does not support: @@ -153,9 +153,9 @@ FIFO order, one task at a time. Example: -Imagine a web crawler, fetching pages from the Internet and finding URLs +Imagine a web crawler, fetching pages from the Internet and finding URLs to fetch more pages. -The web crawler is based on a queue, and each task in the queue refers +The web crawler is based on a queue, and each task in the queue refers to a URL which the web crawler must download and process. If the web crawler is split into many worker processes, then the same URL may show up in the queue many times, because a single URL may be referred to by many @@ -180,11 +180,17 @@ already exists * `on_task_change` - function name - a callback to be executed on every operation -The following options can be specified when putting a task in a +The following options can be specified when putting a task in a `utubettl` queue: * `utube` - the name of the sub-queue - * `ttl` - time to live for a task put into the queue; if `ttl` is not -specified, it is set to infinity + * `ttl` - numeric - time to live for a task put into the queue, in +seconds. if `ttl` is not specified, it is set to infinity + (if a task exists in a queue for longer than ttl seconds, it is removed) + * `ttr` - numeric - time allotted to the worker to work on a task, in +seconds; if `ttr` is not specified, it is set to the same as `ttl` + (if a task is being worked on for more than `ttr` seconds, its status +is changed to 'ready' so another worker may take it) + * `delay` - time to wait before starting to execute the task, in seconds # The underlying spaces @@ -193,20 +199,20 @@ Here is how queues map to spaces in a Tarantool database. The `_queue` space contains tuples for each queue and its properties. This space is created automatically when the queue system is initialized for -the first time (for example, by "require 'queue'"), and is re-used on +the first time (for example, by "require 'queue'"), and is re-used on later occasions. ## Fields of the `_queue` space 1. `tube` - the name of the queue 1. `tube_id` - queue ID, numeric -1. `space` - the name of a space associated with the queue, which +1. `space` - the name of a space associated with the queue, which contains one tuple for each queue task 1. `type` - the queue type ('fifo', 'fifottl', 'utube', 'utubettl') -1. `opts` - additional options supplied when creating the queue, for +1. `opts` - additional options supplied when creating the queue, for example 'ttl' -The `_queue_consumers` temporary space contains tuples for each job +The `_queue_consumers` temporary space contains tuples for each job which is working on a queue. Consumers may be simply waiting for tasks to be put in the queues. @@ -214,17 +220,17 @@ Consumers may be simply waiting for tasks to be put in the queues. 1. `session` - session (connection) ID of the client 1. `fid` - client fiber ID -1. `tube_id` - queue ID, referring to the `tube_id` field in the `_queue` +1. `tube_id` - queue ID, referring to the `tube_id` field in the `_queue` space; the client waits for tasks in this queue 1. `timeout` - the client wait timeout 1. `time` - the time when the client took a task -The `_queue_taken` temporary space contains tuples for each job which is +The `_queue_taken` temporary space contains tuples for each job which is processing a task in the queue. ## Fields of the `_queue_taken` space -1. `session` - session (connection) ID of the client, referring to the +1. `session` - session (connection) ID of the client, referring to the `session_id` field of the `_queue_consumers` space 1. `tube_id` - queue ID, to which the task belongs 1. `task_id` - task ID (of the task being taken) @@ -239,11 +245,11 @@ The associated space contains one tuple for each task. 1. task_id - numeric - see below 2. task_state - 'r' for ready, 't' for taken, etc. - see below 3. task_data - the contents of the task, usually a long string -x. (additional fields if the queue type has options for ttl, priority, +x. (additional fields if the queue type has options for ttl, priority, or delay) The `task_id` value is assigned to a task when it's inserted into a queue. -Currently, `task_id` values are simple integers for `fifo` and `fifottl` +Currently, `task_id` values are simple integers for `fifo` and `fifottl` queues. The `task_state` field takes one of the following values @@ -253,7 +259,7 @@ sets of `task_state` values, so this is a superset): * 'r' - the task is ready for execution (the first consumer executing a `take` request will get it) * 't' - the task has been taken by a consumer -* '-' - the task has been executed (a task is removed from the queue +* '-' - the task has been executed (a task is removed from the queue after it has been executed, so this value will rarely be seen) * '!' - the task is buried (disabled temporarily until further changes) @@ -262,10 +268,10 @@ after it # Installing There are three alternative ways of installation. -* Get the `tarantool_queue` package from a repository. For example, on +* Get the `tarantool_queue` package from a repository. For example, on Ubuntu, say: sudo apt-get install tarantool-queue * Take the Lua rock from rocks.tarantool.org. -* Take the source files from https://github.com/tarantool/queue, then +* Take the source files from https://github.com/tarantool/queue, then build and install. # Using the `queue` module @@ -315,9 +321,9 @@ The `tube_name` must be the name which was specified by `queue.create_tube`. The `task_data` contents are the user-defined description of the task, usually a long string. -The options, if specified, must be one or more of the options described +The options, if specified, must be one or more of the options described above -(`ttl` and/or `ttr` and/or `pri` and/or `delay` and/or `utube`, depending on the queue +(`ttl` and/or `ttr` and/or `pri` and/or `delay` and/or `utube`, depending on the queue type). If an option is not specified, the default is what was specified during `queue.create_tube`, and if that was not specified, then the default is what @@ -332,7 +338,7 @@ task_data = whatever the user put in the `task_data` parameter Returns: the value of the new tuple in the queue's associated space, also called the "created task". -Example: queue.tube.list_of_sites:put('Your task is to do something', +Example: queue.tube.list_of_sites:put('Your task is to do something', {pri=2}) After a task has been put in a queue, one of these things may happen: @@ -368,6 +374,19 @@ instructions for what to do with the task. Example: t_value = queue.tube.list_of_sites:take(15) +## Extending TTR for tasks + +```lua +queue.tube.tube_name:extend_ttr(task_id, ttr_change) +``` + +Extend `ttr` of running task. Useful if you can't predict in advance +time needed to work on task. + +Effect: the value of `ttr` increased by `ttr_change` seconds. If queue +does not support ttr, error will be thrown. + +Example: t_value = queue.tube.list_of_sites:extend_ttr(15, 60) ## Acknowledging the completion of a task diff --git a/queue/abstract.lua b/queue/abstract.lua index 190aa3f..ac621df 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -68,6 +68,17 @@ function tube.take(self, timeout) end end +function tube.extend_ttr(self, id, ttr) + local task = self:peek(id) + local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id} + if _taken == nil then + error("Task was not taken in the session") + end + + queue.stat[space_name]:inc('extend_ttr') + return self.raw:normalize_task(self.raw:extend_ttr(id, ttr)) +end + function tube.ack(self, id) local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id} if _taken == nil then @@ -397,7 +408,8 @@ local function build_stats(space) kick = 0, put = 0, release = 0, - take = 0 + take = 0, + extend_ttr = 0 }} local st = rawget(queue.stat, space) or {} diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index c23b8db..944074c 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -61,6 +61,11 @@ function method.take(self) end end +-- extend TTR of task +function method.extend_ttr(self, id, ttr) + error('fifo queue does not support ttr') +end + -- delete task function method.delete(self, id) local task = self.space:delete(id) diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index c2d9c09..ffc3816 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -196,19 +196,32 @@ function method.put(self, data, opts) end local task = self.space:insert{ - id, - status, - next_event, - time(ttl), - time(ttr), - pri, - time(), - data + id, + status, + next_event, + time(ttl), + time(ttr), + pri, + time(), + data } self:on_task_change(task, 'put') return task end +-- extend TTR of task +function method.extend_ttr(self, id, ttr) + if ttr <= 0 then + error('ttr should be greater that zero to extend ttr') + end + + local task = self.space:update{ + id, + {{5, '+', time(ttr)}} + } + self:on_task_change(task, 'extend_ttr') + return task +end -- take task function method.take(self) diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index c268fa0..2e529ff 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -75,6 +75,11 @@ function method.take(self) end end +-- extend TTR of task +function method.extend_ttr(self, id, ttr) + error('utube queue does not support ttr') +end + -- delete task function method.delete(self, id) local task = self.space:delete(id) diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index f85a80c..1e5ca4b 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -225,6 +225,20 @@ function method.put(self, data, opts) return task end +-- extend TTR of task +function method.extend_ttr(self, id, ttr) + if ttr <= 0 then + error('ttr should be greater that zero to extend ttr') + end + + local task = self.space:update{ + id, + {{5, '+', time(ttr)}} + } + self:on_task_change(task, 'extend_ttr') + return task +end + -- take task function method.take(self) for s, t in self.space.index.status:pairs(state.READY, {iterator = 'GE'}) do From 5ef82f100e17a3cfda955ed91764cbc2bf24376f Mon Sep 17 00:00:00 2001 From: Anatoly Popov Date: Tue, 27 Dec 2016 16:45:26 +0300 Subject: [PATCH 2/3] Rename extend_ttr to touch --- README.md | 13 +++++++------ queue/abstract.lua | 8 ++++---- queue/abstract/driver/fifo.lua | 6 +++--- queue/abstract/driver/fifottl.lua | 8 ++++---- queue/abstract/driver/utube.lua | 6 +++--- queue/abstract/driver/utubettl.lua | 8 ++++---- 6 files changed, 25 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index a16a39c..df078f6 100644 --- a/README.md +++ b/README.md @@ -374,19 +374,20 @@ instructions for what to do with the task. Example: t_value = queue.tube.list_of_sites:take(15) -## Extending TTR for tasks +## Increasing TTR and/or TTL for tasks ```lua -queue.tube.tube_name:extend_ttr(task_id, ttr_change) +queue.tube.tube_name:touch(task_id, increment) ``` -Extend `ttr` of running task. Useful if you can't predict in advance +Increase `ttr` of running task. Useful if you can't predict in advance time needed to work on task. -Effect: the value of `ttr` increased by `ttr_change` seconds. If queue -does not support ttr, error will be thrown. +Effect: the value of `ttr` and `ttl` increased by `increment` seconds. If queue +does not support ttr, error will be thrown. If `increment` is lower than zero, +error will be thrown. -Example: t_value = queue.tube.list_of_sites:extend_ttr(15, 60) +Example: t_value = queue.tube.list_of_sites:touch(15, 60) ## Acknowledging the completion of a task diff --git a/queue/abstract.lua b/queue/abstract.lua index ac621df..ebf0a0d 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -68,15 +68,15 @@ function tube.take(self, timeout) end end -function tube.extend_ttr(self, id, ttr) +function tube.touch(self, id, ttr) local task = self:peek(id) local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id} if _taken == nil then error("Task was not taken in the session") end - queue.stat[space_name]:inc('extend_ttr') - return self.raw:normalize_task(self.raw:extend_ttr(id, ttr)) + queue.stat[space_name]:inc('touch') + return self.raw:normalize_task(self.raw:touch(id, ttr)) end function tube.ack(self, id) @@ -409,7 +409,7 @@ local function build_stats(space) put = 0, release = 0, take = 0, - extend_ttr = 0 + touch = 0 }} local st = rawget(queue.stat, space) or {} diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index 944074c..8692a26 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -61,9 +61,9 @@ function method.take(self) end end --- extend TTR of task -function method.extend_ttr(self, id, ttr) - error('fifo queue does not support ttr') +-- touch task +function method.touch(self, id, ttr) + error('fifo queue does not support touch') end -- delete task diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index ffc3816..b85c279 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -209,17 +209,17 @@ function method.put(self, data, opts) return task end --- extend TTR of task -function method.extend_ttr(self, id, ttr) +-- touch TTR of task +function method.touch(self, id, ttr) if ttr <= 0 then - error('ttr should be greater that zero to extend ttr') + error('ttr should be greater that zero to touch') end local task = self.space:update{ id, {{5, '+', time(ttr)}} } - self:on_task_change(task, 'extend_ttr') + self:on_task_change(task, 'touch') return task end diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index 2e529ff..a747ba3 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -75,9 +75,9 @@ function method.take(self) end end --- extend TTR of task -function method.extend_ttr(self, id, ttr) - error('utube queue does not support ttr') +-- touch task +function method.touch(self, id, ttr) + error('utube queue does not support touch') end -- delete task diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 1e5ca4b..daccbb3 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -225,17 +225,17 @@ function method.put(self, data, opts) return task end --- extend TTR of task -function method.extend_ttr(self, id, ttr) +-- touch task +function method.touch(self, id, ttr) if ttr <= 0 then - error('ttr should be greater that zero to extend ttr') + error('ttr should be greater that zero to touch') end local task = self.space:update{ id, {{5, '+', time(ttr)}} } - self:on_task_change(task, 'extend_ttr') + self:on_task_change(task, 'touch') return task end From 1d76e00dfd46563751a3b6e29e3d80aedc0c0117 Mon Sep 17 00:00:00 2001 From: Anatoly Popov Date: Tue, 27 Dec 2016 17:18:54 +0300 Subject: [PATCH 3/3] Increasing ttl with ttr. Add checks for overflow. --- README.md | 4 ++- queue/abstract.lua | 12 ++++++-- queue/abstract/driver/fifottl.lua | 30 +++++++++++++++---- queue/abstract/driver/utubettl.lua | 46 +++++++++++++++++++++--------- 4 files changed, 69 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index df078f6..6ee9442 100644 --- a/README.md +++ b/README.md @@ -385,7 +385,9 @@ time needed to work on task. Effect: the value of `ttr` and `ttl` increased by `increment` seconds. If queue does not support ttr, error will be thrown. If `increment` is lower than zero, -error will be thrown. +error will be thrown. If `increment` is zero effect is no op. If `increment` is +`nil` it will be equal to current `ttr` of task. If current `ttr` of task is +500 years or greater then operation is noop. Example: t_value = queue.tube.list_of_sites:touch(15, 60) diff --git a/queue/abstract.lua b/queue/abstract.lua index ebf0a0d..8214b14 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -68,7 +68,15 @@ function tube.take(self, timeout) end end -function tube.touch(self, id, ttr) +function tube.touch(self, id, increment) + if increment < 0 then + error("Increment can't be less than zero") + end + + if increment == 0 then + return + end + local task = self:peek(id) local _taken = box.space._queue_taken:get{session.id(), self.tube_id, id} if _taken == nil then @@ -76,7 +84,7 @@ function tube.touch(self, id, ttr) end queue.stat[space_name]:inc('touch') - return self.raw:normalize_task(self.raw:touch(id, ttr)) + return self.raw:normalize_task(self.raw:touch(id, increment)) end function tube.ack(self, id) diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index b85c279..ca8595b 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -209,16 +209,34 @@ function method.put(self, data, opts) return task end --- touch TTR of task -function method.touch(self, id, ttr) - if ttr <= 0 then - error('ttr should be greater that zero to touch') +local TIMEOUT_INFINITY_TIME = time(TIMEOUT_INFINITY) + +-- touch task +function method.touch(self, id, increment_seconds) + if increment_seconds < 0 then + error("Increment can't be less than zero") + elseif increment_seconds > TIMEOUT_INFINITY then + increment_seconds = TIMEOUT_INFINITY end - local task = self.space:update{ + local task = self:peek{id} + if increment_seconds == 0 or task[i_ttr] >= TIMEOUT_INFINITY_TIME then + return task + end + + local increment = 0ULL + if increment_seconds == nil then + increment = task[i_ttr] or task[i_ttr] + else + increment = time(increment_seconds) + end + + task = self.space:update{ id, - {{5, '+', time(ttr)}} + {{i_ttl, '+', increment}}, + {{i_ttr, '+', increment}} } + self:on_task_change(task, 'touch') return task end diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index daccbb3..5899232 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -211,30 +211,48 @@ function method.put(self, data, opts) end local task = self.space:insert{ - id, - status, - next_event, - time(ttl), - time(ttr), - pri, - time(), - tostring(opts.utube), - data + id, + status, + next_event, + time(ttl), + time(ttr), + pri, + time(), + tostring(opts.utube), + data } self:on_task_change(task, 'put') return task end +local TIMEOUT_INFINITY_TIME = time(TIMEOUT_INFINITY) + -- touch task -function method.touch(self, id, ttr) - if ttr <= 0 then - error('ttr should be greater that zero to touch') +function method.touch(self, id, increment_seconds) + if increment_seconds < 0 then + error("Increment can't be less than zero") + elseif increment_seconds > TIMEOUT_INFINITY then + increment_seconds = TIMEOUT_INFINITY + end + + local task = self:peek{id} + if increment_seconds == 0 or task[i_ttr] >= TIMEOUT_INFINITY_TIME then + return task end - local task = self.space:update{ + local increment = 0ULL + if increment_seconds == nil then + increment = task[i_ttr] or task[i_ttr] + else + increment = time(increment_seconds) + end + + task = self.space:update{ id, - {{5, '+', time(ttr)}} + {{i_ttl, '+', increment}}, + {{i_ttr, '+', increment}} } + self:on_task_change(task, 'touch') return task end