Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# pgqueue

PostgreSQL-only background job queue for Go. No Redis, no SQS — just your existing PostgreSQL.

## Features

- **No additional infrastructure** — uses PostgreSQL as the broker
- **LISTEN/NOTIFY** — real-time job pickup without polling
- **`SELECT ... FOR UPDATE SKIP LOCKED`** — lock-free concurrent dequeue
- **Transactional enqueue** — Outbox pattern with `EnqueueTx`
- **At-least-once delivery** — exponential backoff retry with dead letter
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The README claims "At-least-once delivery," but the current implementation has no mechanism to reclaim jobs stuck in the running state (e.g., after a worker crash or ungraceful shutdown). The dequeueOne query only selects jobs with state = 'pending', so a job that was dequeued but never completed, failed, or marked dead will remain in running state indefinitely.

To support the "at-least-once delivery" guarantee, consider adding a stale-job recovery mechanism, such as a periodic sweep that resets running jobs back to pending if they have not been updated within a timeout window.

Suggested change
- **At-least-once delivery** — exponential backoff retry with dead letter
- **Retries with backoff and dead letter** — exponential backoff retry with dead letter

Copilot uses AI. Check for mistakes.
- **Job deduplication** — `unique_key` prevents duplicate active jobs
- **Scheduled jobs** — `WithRunAt` for future execution

## Install

```sh
go get github.com/mickamy/pgqueue
```

## Quick Start

```go
package main

import (
"context"
"fmt"
"log"
"os/signal"
"syscall"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/mickamy/pgqueue"
)

func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

pool, err := pgxpool.New(ctx, "postgres://localhost:5432/mydb")
if err != nil {
log.Fatal(err)
}
defer pool.Close()

// Create table and indexes
if err := pgqueue.Migrate(ctx, pool); err != nil {
log.Fatal(err)
}

// Enqueue a job
client := pgqueue.NewClient(pool)
job, err := client.Enqueue(ctx, "email", []byte(`{"to":"user@example.com"}`))
if err != nil {
log.Fatal(err)
}
fmt.Printf("enqueued job %d\n", job.ID)

// Process jobs
worker := pgqueue.NewWorker(pool)
worker.Handle("email", func(ctx context.Context, job *pgqueue.Job) error {
fmt.Printf("processing job %d: %s\n", job.ID, job.Payload)
return nil
})

if err := worker.Start(ctx); err != nil {
log.Fatal(err)
}
}
```

## Transactional Enqueue (Outbox Pattern)

Enqueue a job within the same transaction as your business logic. The job is only visible after the transaction commits.

```go
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)

// Business logic
_, err = tx.Exec(ctx, "INSERT INTO orders (id, total) VALUES ($1, $2)", orderID, total)
if err != nil {
return err
}

// Enqueue in the same transaction
client := pgqueue.NewClient(pool)
_, err = client.EnqueueTx(ctx, tx, "send_receipt", []byte(`{"order_id":"123"}`))
if err != nil {
return err
}

return tx.Commit(ctx)
```

## Enqueue Options

```go
// Schedule for later
client.Enqueue(ctx, "report", payload, pgqueue.WithRunAt(time.Now().Add(24*time.Hour)))

// Custom retry count
client.Enqueue(ctx, "webhook", payload, pgqueue.WithMaxRetries(5))

// Deduplication
client.Enqueue(ctx, "sync", payload, pgqueue.WithUniqueKey("user:123"))
```

## Worker Options

```go
worker := pgqueue.NewWorker(pool,
pgqueue.WithConcurrency(20), // max concurrent jobs (default: 10)
pgqueue.WithPollInterval(3*time.Second), // fallback poll interval (default: 5s)
pgqueue.WithRetryBaseWait(2*time.Second), // retry backoff base (default: 1s)
)
```

## Job Lifecycle

```
pending ──▶ running ──▶ completed
pending (retry with backoff)
dead (max retries exceeded)
```

## Retry Strategy

Failed jobs are retried with exponential backoff:

| Attempt | Delay (base = 1s) |
|---------|-------------------|
| 1 | 1s |
| 2 | 2s |
| 3 | 4s |

After `max_retries` (default: 3), the job moves to `dead` state.

## Schema

`pgqueue.Migrate` creates the following:

- **`pgqueue_jobs`** table
- **Dequeue index** — partial index on `(queue, run_at) WHERE state = 'pending'`
- **Unique index** — `(queue, unique_key) WHERE unique_key IS NOT NULL AND state NOT IN ('completed', 'dead')`
- **NOTIFY trigger** — automatically notifies workers on INSERT

## License

[MIT](./LICENSE)
84 changes: 84 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package pgqueue

import (
"context"
"fmt"
"time"

"github.com/jackc/pgx/v5"
)

const enqueueQuery = `
INSERT INTO pgqueue_jobs (queue, payload, state, run_at, max_retries, unique_key)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, queue, payload, state, run_at, attempts, max_retries, unique_key, last_error, created_at, updated_at
`

// Querier is the minimal interface required to execute queries that return rows.
// Satisfied by *pgxpool.Pool, *pgx.Conn, and pgx.Tx.
type Querier interface {
Executor
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

// Client enqueues jobs into the queue.
type Client struct {
db Querier
}

// NewClient creates a new Client.
func NewClient(db Querier) *Client {
return &Client{db: db}
}

// Enqueue adds a new job to the specified queue.
func (c *Client) Enqueue(ctx context.Context, queue string, payload []byte, opts ...EnqueueOption) (*Job, error) {
return enqueue(ctx, c.db, queue, payload, opts...)
}

// EnqueueTx adds a new job within an existing transaction.
// This enables the Outbox pattern — enqueue and business logic in the same transaction.
func (c *Client) EnqueueTx(
ctx context.Context, tx Querier, queue string, payload []byte, opts ...EnqueueOption,
) (*Job, error) {
return enqueue(ctx, tx, queue, payload, opts...)
}

func enqueue(ctx context.Context, q Querier, queue string, payload []byte, opts ...EnqueueOption) (*Job, error) {
p := defaultEnqueueParams()
for _, opt := range opts {
opt(&p)
}

runAt := p.runAt
if runAt.IsZero() {
runAt = time.Now()
}

var job Job
err := q.QueryRow(ctx, enqueueQuery,
queue,
payload,
JobStatePending,
runAt,
p.maxRetries,
p.uniqueKey,
).Scan(
&job.ID,
&job.Queue,
&job.Payload,
&job.State,
&job.RunAt,
&job.Attempts,
&job.MaxRetries,
&job.UniqueKey,
&job.LastError,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return nil, fmt.Errorf("pgqueue: enqueue: %w", err)
}

return &job, nil
}
Loading