Skip to content

Minnzen/sqliteq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

sqliteq

Durable message queue over SQLite, in TypeScript.

Inspired by goqite. Zero dependencies.

Install

npm install @minnzen/sqliteq better-sqlite3

sqliteq has zero runtime dependencies. You provide the SQLite binding — better-sqlite3 is recommended. Bun users can use the built-in bun:sqlite instead.

Usage

import Database from 'better-sqlite3'
import { Queue } from '@minnzen/sqliteq'

const db = new Database('app.db')
const q = new Queue(db, 'emails')

q.send({ to: 'user@example.com', subject: 'Hello' })

const msg = q.receive()
if (msg) {
  console.log(msg.body) // { to: 'user@example.com', subject: 'Hello' }
  q.delete(msg.id, msg.received)
}

The table and indexes are created automatically on first use. WAL mode is enabled by default.

Features

  • Single sqliteq table holds all queues -- no migrations per queue
  • Atomic receive via UPDATE ... WHERE id = (SELECT ...) -- no advisory locks
  • Visibility timeout with automatic redelivery on failure
  • Priority queues (higher priority messages are received first)
  • Delayed messages
  • Batch send in a single transaction
  • Dead letter detection (messages exceeding max receive count)
  • Typed messages with generics (Queue<T>)
  • Bring your own SQLite driver -- tested with better-sqlite3 and bun:sqlite

API

new Queue<T>(db, name, options?)

Create or connect to a named queue. Schema is created if it does not exist.

Option Type Default Description
timeout number 5000 Visibility timeout in ms
maxReceive number 3 Max receives before dead-lettering
maxBodyBytes number 1048576 Max body size in bytes after JSON serialization

The db parameter accepts any object matching the Database interface (a subset of better-sqlite3 and bun:sqlite):

interface Database {
  prepare(sql: string): Statement
  exec(sql: string): void
  transaction<R>(fn: (...args: unknown[]) => R): (...args: unknown[]) => R
}

queue.send(body: T, options?): string

Send a message. Returns the message ID.

Option Type Default Description
delay number 0 Delivery delay in ms
priority number 0 Higher values are received first

queue.sendBatch(messages): string[]

Send multiple messages in one transaction. Each entry is { body: T, options?: SendOptions }. Returns an array of IDs.

queue.receive(): Message<T> | null

Claim the next available message. Returns null when the queue is empty or all messages are in-flight. The message becomes invisible to other consumers for timeout ms.

interface Message<T> {
  id: string
  body: T
  received: number  // receive count (1 on first delivery)
}

queue.extend(id, received, delay): boolean

Extend a message's visibility timeout by delay ms. Returns false if the message was already redelivered to another consumer (stale handle).

queue.delete(id, received): boolean

Acknowledge and remove a message. Returns false on stale handle (safe no-op).

queue.size(): number

Total messages in the queue (all states).

queue.purge(): number

Delete all messages. Returns the count removed.

queue.deadLetters(): Message<T>[]

Get messages that exceeded maxReceive. These will never be delivered again and should be inspected or moved.

new Processor<T>(queue, options)

Long-running consumer that polls, processes, and auto-deletes messages.

import { Queue, Processor } from '@minnzen/sqliteq'

const q = new Queue(db, 'jobs')
const p = new Processor(q, {
  handler(msg) {
    console.log('processing', msg.body)
    // return normally = auto-delete
    // throw = leave for retry after timeout
  },
  pollInterval: 200,
  concurrency: 4,
})

p.start()
// later:
await p.stop() // waits for in-flight handlers to finish
Option Type Default Description
handler (msg) => void | Promise<void> required Message handler
pollInterval number 100 Poll interval in ms
concurrency number 1 Max simultaneous handlers
extendInterval number queue timeout Auto-extend period; 0 to disable
onError (error, context) => void console.error Error callback

The processor auto-extends visibility at 4/5 of the extend interval so long-running handlers don't lose their message. Note: auto-extend relies on setInterval, so handlers must yield the event loop (use async/await). CPU-bound synchronous handlers will block the timer and may cause unintended redelivery.

Benchmarks

On a MacBook Pro M-series, with better-sqlite3, file-backed database:

Operation Throughput Latency
send + receive + delete ~20,000 ops/sec 49 us/op
send only ~31,000 ops/sec --
receive + delete (100K row table, 10 queues) ~18,000 ops/sec 55 us/op
sendBatch (100 msgs/transaction) ~120,000 ops/sec --

Run npx tsx bench/run.ts to reproduce.

Design

Messages live in a single sqliteq table shared across all queues. Each message has a timeout timestamp; receiving a message atomically bumps that timestamp forward, making the message invisible. If the consumer crashes or fails to delete the message, it becomes visible again after the timeout expires. After maxReceive deliveries, the message is dead-lettered.

This is the same model as SQS and goqite, adapted for synchronous SQLite.

Delivery guarantee is at-least-once. Handlers must be idempotent because a message can be delivered more than once if the previous consumer times out.

License

MIT

About

Durable message queue backed by SQLite. Atomic claim, visibility timeout, fencing. Zero dependencies.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors