Skip to content

thruflo/ntorque

Repository files navigation

nTorque - web hook task queue

nTorque is a task queue service that uses web hooks. It is free, open source software released into the public domain that you can use from any programming language (that speaks HTTP) to queue up and reliably execute idempotent tasks. For example, in Python:

import os
import requests

params = {'url': 'http://example.com/myhooks/send_email'}
data = {'user_id': 1234}

endpoint = os.environ.get('NTORQUE_URL')
response = requests.post(endpoint, data=data, params=params)

Rationale

nTorque is designed to be a good solution when you need more reliability than fire-and-forget but you don't need an AMPQ / ESB sledgehammer to crack your "do this later" nut.

Because it uses web hooks, you can:

  1. use it from (and to integrate) applications written in any language
  2. use DNS / web server load balancing to distribute tasks
  3. bootstrap your task execution environment the way you bootstrap a web application -- i.e.: once at startup, re-using your web app's configuration and middleware

Functionality

nTorque provides the following endpoints:

  • POST / to enqueue a task
  • GET /tasks/:id to view task status

And the following features:

  • persistent task storage
  • non-blocking, concurrent task execution
  • HTTPS and redirect support
  • configurable (linear or exponential) backoff

Implementation

nTorque is a Python application comprising of a web application and one or more worker processes. These use a PostgreSQL database to persist tasks and a Redis database as a notification channel.

+------+  |  +--------+    +--------+    +--------+  |
|POST /|     |Frontend|    |Web app |    |Postgres|
|------|  |  |--------|    |--------|    |--------|  |
|- url |+- ->|- auth  |+-->|- store |+-->|- tasks |
|- data|  |  |- rate  |    |- notify|    |        |  |
|      |     |  limits|    |        |    |        |
+------+  |  +--------+    +--------+    +--------+
                               +           ^    +    |
          |                    |           |   url
                             rpush        get  data  |
          |                    |           |    |
                               v           +    v    |
          |                 +--------+   +--------+     +---------+
                            |Redis   |   |Worker  |  |  |Web hook |
          |                 +--------+   |--------|     |---------|
                               |         |- POST  |+-|->|- perform|
          | nTorque            +-blpop-> |  data  |     |  task   |
                                         +--------+  |  +---------+

In the event of a response with status code:

  • 200 or 201: the task is marked as successfully completed
  • 202 - 499: the task is marked as failed and is not retried
  • 500 (or network error): the task is retried

Hack here if you'd like a different strategy.

Algorithm

The real crux of nTorque is a trade-off between request timeout and retry delay. It's worth understanding this before deploying -- and how to simply mitigate it by a) specifying an appropriate default timeout and b) overriding this as necessary on a task by task basis.

Like RQ and Resque, nTorque uses Redis as a push messaging channel. A request comes in, a notification is rpushd onto a channel and blpopd off. This means that tasks are executed immediately, with a nice evented / push notification pattern.

Unlike RQ and Resque, nTorque doesn't trust Redis as a persistence layer. Instead, it relies on good-old-fashioned PostgreSQL: the first thing nTorque does when a new task arrives is write it to disk. It then notifies a consumer process using Redis BLPOP. The consumer then reads the data from disk and performs the task by making an HTTP request to its url.

In most cases, this request will succeed, the task will be marked as completed and no more needs to be done. However, this won't happen every time, e.g.: when there's a network error or the webhook server is temporarily down. Because there are edge case failure scenarios where the web hook response is unreliable, nTorque refuses to rely on it as the source of truth™ about a task's status. Instead, the single source of truth is the PostgreSQL database.

This is achieved by automatically setting a task to retry every time it's read ("acquired") from the database. Specifically, the query that reads the task data is performed within a transaction that also updates the task's due date and retry count. This means that in any failure scenario, nTorque can always just be restarted (potentially on a new server as long as it connects to the same database) and you can be sure that tasks will be performed at least once no matter where they were in the pipeline when whatever it was fell over.

Incidentally, tasks due to be retried are picked up by a background process that polls the database every NTORQUE_REQUEUE_INTERVAL seconds.

More importantly, and where this description has been heading, is the relation between the due date of the task as it lies, gloriously in repose, and the timeout of the web hook call. For there is one thing we don't want to do, and that is keep retrying tasks before they've had a chance to complete.

In order to prevent this behaviour, we impose a simple constraint:

The due date set when the task is transactionally read and incremented must be longer than the web hook timeout.

This means that, in the worst case (when a web hook request does timeout or fail to respond), you must wait for the full timeout duration before your task is retried. So whilst you may naturally want to set a relatively high timeout for long running tasks, you may want to keep it shorter for simper tasks like sending your new user's welcome or reset password email: so that they're retried faster.

The good news is that, in addition to the global NTORQUE_DEFAULT_TIMEOUT configuration variable, you can set an appropriate timeout for different tasks using the timeout query parameter.

Simple -- once you know how the system works.

Installation

Clone the repo, install the Python app using:

bash pip_install.sh

You need Redis and Postgres running. If necessary, create the database:

createdb -T template0 -E UTF8 ntorque

If you like, install Foreman, to run the multiple processes, using:

bundle install

Run the migrations:

foreman run alembic upgrade head

Bootstrap an app (if you'd like to authenticate access with an API key):

foreman run python alembic/scripts/create_application.py --name YOURAPP

You should then be able to:

foreman start

Alternatively, skip the Foreman stuff and run the commands listed in Processes manually / using a Docker / Chef / init.d wrapper. Or push to Heroku, run the migrations and it should just work.

Configuration

Algorithm / Behaviour:

  • NTORQUE_BACKOFF: exponential (default) or linear
  • NTORQUE_CLEANUP_AFTER_DAYS: how many days to leave tasks in the db for, defaults to 7
  • NTORQUE_DEFAULT_TIMEOUT: how long, in seconds, to wait before treating a web hook request as having failed -- defaults to 60 see the algorithm section above for details
  • NTORQUE_MIN_DUE_DELAY: minimum delay before retrying -- don't set any lower than 2
  • NTORQUE_MAX_DUE_DELAY: maximum retry delay -- defaults to 7200 but you should make sure its longer than NTORQUE_DEFAULT_TIMEOUT
  • NTORQUE_MAX_RETRIES: how many attempts before giving up on a task -- defaults to 36
  • NTORQUE_REQUEUE_INTERVAL: how often, in seconds, to poll the database for tasks to requeue -- defaults to 5
  • NTORQUE_TRANSIENT_REQUEST_ERRORS: 4xx errors which ntorque should retry -- defaults to '408,423,429,449'

Deployment:

  • NTORQUE_AUTHENTICATE: whether to require authentication; defaults to True -- see authentication section in Usage below
  • NTORQUE_ENABLE_HSTS: set this to True if you're using HSTS
  • HSTS_PROTOCOL_HEADER: set this to, e.g.: X-Forwarded-Proto if you're running behind an https proxy frontend (see pyramid_hsts for more details)
  • MODE: if set to development this will run Gunicorn in watch mode (so the app server restarts when a Python file changes) and will raise HTTP exceptions in the API views (rather than returning them). If set to production it will run Gunicorn behind a newrelic client. If this isn't quite what you want then either don't set it or set it to any other string (or hack the run.sh and / or gunicorn.py scripts)

Redis:

  • NTORQUE_REDIS_CHANNEL: name of your Redis list used as a notification channel; defaults to ntorque
  • REDIS_URL, etc.: see pyramid_redis for details on how to configure your Redis connection

Database:

  • DATABASE_URL, defaults to postgresql:///ntorque
  • SQLALCHEMY_MAX_OVERFLOW, SQLALCHEMY_POOL_CLASS, SQLALCHEMY_POOL_SIZE and SQLALCHEMY_POOL_RECYCLE -- see the SQLAlchemy docs on engine configuration and pyramid_basemodel for more information; if you don't provide these then SQLAlchemy will use sensible defaults, also note that if you're using pgbouncer you should set SQLALCHEMY_POOL_CLASS=sqlalchemy.pool.NullPool

Usage / API

Authentication

If you set NTORQUE_AUTHENTICATE to True then you need to create at least one application (e.g.: using the alembic/scripts/create_application.py script) and provide its api key in the NTORQUE_API_KEY header when enqueuing a task.

POST /

To enqueue a task, make a POST request to the root path of your nTorque installation.

Required:

  • a url query parameter; this is the url to your web hook that you want nTorque to call to perform your task

Optional:

  • a method query parameter; which http method to use when calling the webhook -- the default is POST, but you can alternatively specify DELETE, PUT or PATCH.
  • a timeout query parameter; how long, in seconds, to wait before treating the web hook call as having timed out -- see the Algorithm section above for context

Data:

This aside, you can pass through any POST data, encoded as any content type you like. The data, content type and character encoding will be passed on in the POST (or DELETE, PUT or PATCH) request to your web hook.

Headers:

Aside from the content type, length and charset headers, derived from your request, you can specify headers to pass through to your web hook, by prefixing the header name with NTORQUE-PASSTHROUGH-. So, for example, to pass through a FOO: Bar header, you would provide NTORQUE-PASSTHROUGH-FOO: Bar in your request headers.

Response:

You should receive a 201 response with the url to the task in the Location header.

GET /task/:id

Returns a JSON data dict with status information about a task.

POST /task/:id/push

Pushes a task onto the redis notification channel to be consumed, aquired and performed. You should not normally need to use this. It's exposed as an optimisation for hybrid integrations.

Pro-Tips

nTorque is a system for reliably calling web hook task handlers: not for implementing them. You are responsible for implementing and exposing your own web hooks. In most languages and frameworks this is simple, e.g.: in Ruby using Sinatra:

post '/hooks/foo' do
    # your code here
end

Or in Python using Flask:

@app.route('/hooks/foo', methods=['POST'])
def foo():
    # your code here

Key things to bear in mind are:

Status Code

After successfully performing their task, your web hooks are expected to return an HTTP response with a 200 or 201 status code. If not, nTorque will keep retrying the task.

Avoid Timeouts

Your web server must be configured with a high enough timeout to allow tasks enough time to complete. If not, you may be responding with an error when tasks are actually being performed successfully.

For example, for a 30 minute timeout with Apache as a proxy:

Timeout 1800
ProxyTimeout 1800

Or with Nginx:

send_timeout 1800;
proxy_send_timeout 1800;

Secure Public Hooks

If your web hooks are exposed on a public IP, you are likely to want to secure them, e.g.: using HTTPS and an authentication credential like an API key.

It's also worth noting that you may need to turn off CSRF validation.

Support

Raise bugs / issues on GitHub.

Releases

No releases published

Packages

No packages published

Languages