diff --git a/README.md b/README.md new file mode 100644 index 0000000..cc749a8 --- /dev/null +++ b/README.md @@ -0,0 +1,158 @@ +# pgqueue + +PostgreSQL-only background job queue for Go. No Redis, no SQS — just your existing PostgreSQL. + +## Features + +- **No additional infrastructure** — uses PostgreSQL as the broker +- **LISTEN/NOTIFY** — real-time job pickup without polling +- **`SELECT ... FOR UPDATE SKIP LOCKED`** — lock-free concurrent dequeue +- **Transactional enqueue** — Outbox pattern with `EnqueueTx` +- **At-least-once delivery** — exponential backoff retry with dead letter +- **Job deduplication** — `unique_key` prevents duplicate active jobs +- **Scheduled jobs** — `WithRunAt` for future execution + +## Install + +```sh +go get github.com/mickamy/pgqueue +``` + +## Quick Start + +```go +package main + +import ( + "context" + "fmt" + "log" + "os/signal" + "syscall" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/mickamy/pgqueue" +) + +func main() { + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + pool, err := pgxpool.New(ctx, "postgres://localhost:5432/mydb") + if err != nil { + log.Fatal(err) + } + defer pool.Close() + + // Create table and indexes + if err := pgqueue.Migrate(ctx, pool); err != nil { + log.Fatal(err) + } + + // Enqueue a job + client := pgqueue.NewClient(pool) + job, err := client.Enqueue(ctx, "email", []byte(`{"to":"user@example.com"}`)) + if err != nil { + log.Fatal(err) + } + fmt.Printf("enqueued job %d\n", job.ID) + + // Process jobs + worker := pgqueue.NewWorker(pool) + worker.Handle("email", func(ctx context.Context, job *pgqueue.Job) error { + fmt.Printf("processing job %d: %s\n", job.ID, job.Payload) + return nil + }) + + if err := worker.Start(ctx); err != nil { + log.Fatal(err) + } +} +``` + +## Transactional Enqueue (Outbox Pattern) + +Enqueue a job within the same transaction as your business logic. The job is only visible after the transaction commits. + +```go +tx, err := pool.Begin(ctx) +if err != nil { + return err +} +defer tx.Rollback(ctx) + +// Business logic +_, err = tx.Exec(ctx, "INSERT INTO orders (id, total) VALUES ($1, $2)", orderID, total) +if err != nil { + return err +} + +// Enqueue in the same transaction +client := pgqueue.NewClient(pool) +_, err = client.EnqueueTx(ctx, tx, "send_receipt", []byte(`{"order_id":"123"}`)) +if err != nil { + return err +} + +return tx.Commit(ctx) +``` + +## Enqueue Options + +```go +// Schedule for later +client.Enqueue(ctx, "report", payload, pgqueue.WithRunAt(time.Now().Add(24*time.Hour))) + +// Custom retry count +client.Enqueue(ctx, "webhook", payload, pgqueue.WithMaxRetries(5)) + +// Deduplication +client.Enqueue(ctx, "sync", payload, pgqueue.WithUniqueKey("user:123")) +``` + +## Worker Options + +```go +worker := pgqueue.NewWorker(pool, + pgqueue.WithConcurrency(20), // max concurrent jobs (default: 10) + pgqueue.WithPollInterval(3*time.Second), // fallback poll interval (default: 5s) + pgqueue.WithRetryBaseWait(2*time.Second), // retry backoff base (default: 1s) +) +``` + +## Job Lifecycle + +``` +pending ──▶ running ──▶ completed + │ + ▼ + pending (retry with backoff) + │ + ▼ + dead (max retries exceeded) +``` + +## Retry Strategy + +Failed jobs are retried with exponential backoff: + +| Attempt | Delay (base = 1s) | +|---------|-------------------| +| 1 | 1s | +| 2 | 2s | +| 3 | 4s | + +After `max_retries` (default: 3), the job moves to `dead` state. + +## Schema + +`pgqueue.Migrate` creates the following: + +- **`pgqueue_jobs`** table +- **Dequeue index** — partial index on `(queue, run_at) WHERE state = 'pending'` +- **Unique index** — `(queue, unique_key) WHERE unique_key IS NOT NULL AND state NOT IN ('completed', 'dead')` +- **NOTIFY trigger** — automatically notifies workers on INSERT + +## License + +[MIT](./LICENSE) diff --git a/client.go b/client.go new file mode 100644 index 0000000..2bf8c4f --- /dev/null +++ b/client.go @@ -0,0 +1,84 @@ +package pgqueue + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5" +) + +const enqueueQuery = ` +INSERT INTO pgqueue_jobs (queue, payload, state, run_at, max_retries, unique_key) +VALUES ($1, $2, $3, $4, $5, $6) +RETURNING id, queue, payload, state, run_at, attempts, max_retries, unique_key, last_error, created_at, updated_at +` + +// Querier is the minimal interface required to execute queries that return rows. +// Satisfied by *pgxpool.Pool, *pgx.Conn, and pgx.Tx. +type Querier interface { + Executor + QueryRow(ctx context.Context, sql string, args ...any) pgx.Row +} + +// Client enqueues jobs into the queue. +type Client struct { + db Querier +} + +// NewClient creates a new Client. +func NewClient(db Querier) *Client { + return &Client{db: db} +} + +// Enqueue adds a new job to the specified queue. +func (c *Client) Enqueue(ctx context.Context, queue string, payload []byte, opts ...EnqueueOption) (*Job, error) { + return enqueue(ctx, c.db, queue, payload, opts...) +} + +// EnqueueTx adds a new job within an existing transaction. +// This enables the Outbox pattern — enqueue and business logic in the same transaction. +func (c *Client) EnqueueTx( + ctx context.Context, tx Querier, queue string, payload []byte, opts ...EnqueueOption, +) (*Job, error) { + return enqueue(ctx, tx, queue, payload, opts...) +} + +func enqueue(ctx context.Context, q Querier, queue string, payload []byte, opts ...EnqueueOption) (*Job, error) { + p := defaultEnqueueParams() + for _, opt := range opts { + opt(&p) + } + + runAt := p.runAt + if runAt.IsZero() { + runAt = time.Now() + } + + var job Job + err := q.QueryRow(ctx, enqueueQuery, + queue, + payload, + JobStatePending, + runAt, + p.maxRetries, + p.uniqueKey, + ).Scan( + &job.ID, + &job.Queue, + &job.Payload, + &job.State, + &job.RunAt, + &job.Attempts, + &job.MaxRetries, + &job.UniqueKey, + &job.LastError, + &job.CreatedAt, + &job.UpdatedAt, + ) + if err != nil { + return nil, fmt.Errorf("pgqueue: enqueue: %w", err) + } + + return &job, nil +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..52684ff --- /dev/null +++ b/client_test.go @@ -0,0 +1,169 @@ +package pgqueue_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + + "github.com/mickamy/pgqueue" +) + +type stubRow struct { + scanFn func(dest ...any) error +} + +func (s *stubRow) Scan(dest ...any) error { + return s.scanFn(dest...) +} + +type stubQuerier struct { + execFn func(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) + queryRowFn func(ctx context.Context, sql string, args ...any) pgx.Row +} + +func (s *stubQuerier) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) { + return s.execFn(ctx, sql, arguments...) +} + +func (s *stubQuerier) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row { + return s.queryRowFn(ctx, sql, args...) +} + +func newSuccessQuerier(t *testing.T) *stubQuerier { + t.Helper() + now := time.Now() + return &stubQuerier{ + execFn: func(_ context.Context, _ string, _ ...any) (pgconn.CommandTag, error) { + return pgconn.NewCommandTag(""), nil + }, + queryRowFn: func(_ context.Context, _ string, args ...any) pgx.Row { + return &stubRow{ + scanFn: func(dest ...any) error { + *mustAs[*int64](t, dest[0]) = 1 + *mustAs[*string](t, dest[1]) = mustAs[string](t, args[0]) + *mustAs[*[]byte](t, dest[2]) = mustAs[[]byte](t, args[1]) + *mustAs[*pgqueue.JobState](t, dest[3]) = mustAs[pgqueue.JobState](t, args[2]) + *mustAs[*time.Time](t, dest[4]) = mustAs[time.Time](t, args[3]) + *mustAs[*int](t, dest[5]) = 0 + *mustAs[*int](t, dest[6]) = mustAs[int](t, args[4]) + *mustAs[**string](t, dest[7]) = mustAs[*string](t, args[5]) + *mustAs[**string](t, dest[8]) = nil + *mustAs[*time.Time](t, dest[9]) = now + *mustAs[*time.Time](t, dest[10]) = now + return nil + }, + } + }, + } +} + +func TestClientEnqueue(t *testing.T) { + t.Parallel() + + t.Run("default options", func(t *testing.T) { + t.Parallel() + + q := newSuccessQuerier(t) + client := pgqueue.NewClient(q) + + job, err := client.Enqueue(t.Context(), "email", []byte(`{"to":"a@b.com"}`)) + if err != nil { + t.Fatalf("Enqueue() error = %v", err) + } + + if job.ID != 1 { + t.Errorf("ID = %d, want 1", job.ID) + } + if job.Queue != "email" { + t.Errorf("Queue = %q, want %q", job.Queue, "email") + } + if string(job.Payload) != `{"to":"a@b.com"}` { + t.Errorf("Payload = %q, want %q", job.Payload, `{"to":"a@b.com"}`) + } + if job.State != pgqueue.JobStatePending { + t.Errorf("State = %q, want %q", job.State, pgqueue.JobStatePending) + } + if job.MaxRetries != 3 { + t.Errorf("MaxRetries = %d, want 3", job.MaxRetries) + } + if job.UniqueKey != nil { + t.Errorf("UniqueKey = %v, want nil", job.UniqueKey) + } + }) + + t.Run("with options", func(t *testing.T) { + t.Parallel() + + q := newSuccessQuerier(t) + client := pgqueue.NewClient(q) + + scheduled := time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC) + job, err := client.Enqueue(t.Context(), "report", + []byte(`{}`), + pgqueue.WithRunAt(scheduled), + pgqueue.WithMaxRetries(5), + pgqueue.WithUniqueKey("daily:2026-06-01"), + ) + if err != nil { + t.Fatalf("Enqueue() error = %v", err) + } + + if !job.RunAt.Equal(scheduled) { + t.Errorf("RunAt = %v, want %v", job.RunAt, scheduled) + } + if job.MaxRetries != 5 { + t.Errorf("MaxRetries = %d, want 5", job.MaxRetries) + } + if job.UniqueKey == nil || *job.UniqueKey != "daily:2026-06-01" { + t.Errorf("UniqueKey = %v, want %q", job.UniqueKey, "daily:2026-06-01") + } + }) + + t.Run("query error", func(t *testing.T) { + t.Parallel() + + queryErr := errors.New("unique violation") + q := &stubQuerier{ + execFn: func(_ context.Context, _ string, _ ...any) (pgconn.CommandTag, error) { + return pgconn.NewCommandTag(""), nil + }, + queryRowFn: func(_ context.Context, _ string, _ ...any) pgx.Row { + return &stubRow{ + scanFn: func(_ ...any) error { + return queryErr + }, + } + }, + } + client := pgqueue.NewClient(q) + + _, err := client.Enqueue(t.Context(), "email", []byte(`{}`)) + if err == nil { + t.Fatal("Enqueue() error = nil, want error") + } + if !errors.Is(err, queryErr) { + t.Errorf("Enqueue() error = %v, want wrapping %v", err, queryErr) + } + }) +} + +func TestClientEnqueueTx(t *testing.T) { + t.Parallel() + + q := newSuccessQuerier(t) + tx := newSuccessQuerier(t) + client := pgqueue.NewClient(q) + + job, err := client.EnqueueTx(t.Context(), tx, "webhook", []byte(`{"url":"https://example.com"}`)) + if err != nil { + t.Fatalf("EnqueueTx() error = %v", err) + } + + if job.Queue != "webhook" { + t.Errorf("Queue = %q, want %q", job.Queue, "webhook") + } +} diff --git a/export_test.go b/export_test.go new file mode 100644 index 0000000..fbe0a4c --- /dev/null +++ b/export_test.go @@ -0,0 +1,35 @@ +package pgqueue + +import "time" + +// EnqueueParamsResult holds the result of applying enqueue options. +// Exported only for testing. +type EnqueueParamsResult struct { + RunAt time.Time + MaxRetries int + UniqueKey *string +} + +// ApplyEnqueueOptions applies options to default params and returns the result. +func ApplyEnqueueOptions(opts ...EnqueueOption) EnqueueParamsResult { + p := defaultEnqueueParams() + for _, opt := range opts { + opt(&p) + } + return EnqueueParamsResult{ + RunAt: p.runAt, + MaxRetries: p.maxRetries, + UniqueKey: p.uniqueKey, + } +} + +// Worker internals exported for testing. +var ( + DequeueOne = dequeueOne + CompleteJob = completeJob + FailJob = failJob + MarkDead = markDead + RetryDelay = retryDelay + NotifyChannel = notifyChannel + ErrNoJobs = errNoJobs +) diff --git a/go.mod b/go.mod index 6c72482..f78524e 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,13 @@ module github.com/mickamy/pgqueue go 1.25.0 + +require github.com/jackc/pgx/v5 v5.8.0 + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..87a6c8a --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/job.go b/job.go new file mode 100644 index 0000000..9de2e32 --- /dev/null +++ b/job.go @@ -0,0 +1,71 @@ +package pgqueue + +import "time" + +// JobState represents the lifecycle state of a job. +type JobState string + +const ( + JobStatePending JobState = "pending" + JobStateRunning JobState = "running" + JobStateCompleted JobState = "completed" + JobStateFailed JobState = "failed" + JobStateDead JobState = "dead" +) + +// Job represents a unit of work in the queue. +type Job struct { + ID int64 + Queue string + Payload []byte + State JobState + RunAt time.Time + Attempts int + MaxRetries int + UniqueKey *string + LastError *string + CreatedAt time.Time + UpdatedAt time.Time +} + +// enqueueParams holds optional parameters for enqueuing a job. +type enqueueParams struct { + runAt time.Time + maxRetries int + uniqueKey *string +} + +const defaultMaxRetries = 3 + +func defaultEnqueueParams() enqueueParams { + return enqueueParams{ + maxRetries: defaultMaxRetries, + } +} + +// EnqueueOption configures how a job is enqueued. +type EnqueueOption func(*enqueueParams) + +// WithRunAt schedules the job to run at a specific time. +// If not set, the job runs immediately. +func WithRunAt(t time.Time) EnqueueOption { + return func(p *enqueueParams) { + p.runAt = t + } +} + +// WithMaxRetries sets the maximum number of retry attempts. +// Defaults to 3. +func WithMaxRetries(n int) EnqueueOption { + return func(p *enqueueParams) { + p.maxRetries = n + } +} + +// WithUniqueKey sets a deduplication key for the job. +// Only one active job (not completed or dead) with the same queue and unique key can exist. +func WithUniqueKey(key string) EnqueueOption { + return func(p *enqueueParams) { + p.uniqueKey = &key + } +} diff --git a/job_test.go b/job_test.go new file mode 100644 index 0000000..b2b591a --- /dev/null +++ b/job_test.go @@ -0,0 +1,103 @@ +package pgqueue_test + +import ( + "testing" + "time" + + "github.com/mickamy/pgqueue" +) + +func TestJobState(t *testing.T) { + t.Parallel() + + tests := []struct { + state pgqueue.JobState + want string + }{ + {pgqueue.JobStatePending, "pending"}, + {pgqueue.JobStateRunning, "running"}, + {pgqueue.JobStateCompleted, "completed"}, + {pgqueue.JobStateFailed, "failed"}, + {pgqueue.JobStateDead, "dead"}, + } + + for _, tt := range tests { + t.Run(tt.want, func(t *testing.T) { + t.Parallel() + if got := string(tt.state); got != tt.want { + t.Errorf("JobState = %q, want %q", got, tt.want) + } + }) + } +} + +func TestEnqueueOptionDefaults(t *testing.T) { + t.Parallel() + + result := pgqueue.ApplyEnqueueOptions() + + if result.RunAt != (time.Time{}) { + t.Errorf("default RunAt = %v, want zero value", result.RunAt) + } + if result.MaxRetries != 3 { + t.Errorf("default MaxRetries = %d, want 3", result.MaxRetries) + } + if result.UniqueKey != nil { + t.Errorf("default UniqueKey = %v, want nil", result.UniqueKey) + } +} + +func TestWithRunAt(t *testing.T) { + t.Parallel() + + scheduled := time.Date(2026, 3, 10, 9, 0, 0, 0, time.UTC) + result := pgqueue.ApplyEnqueueOptions(pgqueue.WithRunAt(scheduled)) + + if !result.RunAt.Equal(scheduled) { + t.Errorf("RunAt = %v, want %v", result.RunAt, scheduled) + } +} + +func TestWithMaxRetries(t *testing.T) { + t.Parallel() + + result := pgqueue.ApplyEnqueueOptions(pgqueue.WithMaxRetries(5)) + + if result.MaxRetries != 5 { + t.Errorf("MaxRetries = %d, want 5", result.MaxRetries) + } +} + +func TestWithUniqueKey(t *testing.T) { + t.Parallel() + + result := pgqueue.ApplyEnqueueOptions(pgqueue.WithUniqueKey("user:123")) + + if result.UniqueKey == nil { + t.Fatal("UniqueKey is nil, want non-nil") + } + if *result.UniqueKey != "user:123" { + t.Errorf("UniqueKey = %q, want %q", *result.UniqueKey, "user:123") + } +} + +func TestEnqueueOptionsComposition(t *testing.T) { + t.Parallel() + + scheduled := time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC) + result := pgqueue.ApplyEnqueueOptions( + pgqueue.WithRunAt(scheduled), + pgqueue.WithMaxRetries(10), + pgqueue.WithUniqueKey("order:456"), + ) + + if !result.RunAt.Equal(scheduled) { + t.Errorf("RunAt = %v, want %v", result.RunAt, scheduled) + } + if result.MaxRetries != 10 { + t.Errorf("MaxRetries = %d, want 10", result.MaxRetries) + } + if result.UniqueKey == nil || *result.UniqueKey != "order:456" { + t.Errorf("UniqueKey = %v, want %q", result.UniqueKey, "order:456") + } +} diff --git a/migrate.go b/migrate.go new file mode 100644 index 0000000..7bf7454 --- /dev/null +++ b/migrate.go @@ -0,0 +1,59 @@ +package pgqueue + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5/pgconn" +) + +const schema = ` +CREATE TABLE IF NOT EXISTS pgqueue_jobs ( + id BIGSERIAL PRIMARY KEY, + queue TEXT NOT NULL, + payload BYTEA NOT NULL, + state TEXT NOT NULL DEFAULT 'pending', + run_at TIMESTAMPTZ NOT NULL DEFAULT now(), + attempts INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 3, + unique_key TEXT, + last_error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_pgqueue_jobs_dequeue + ON pgqueue_jobs (queue, run_at) + WHERE state = 'pending'; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_pgqueue_jobs_unique_key + ON pgqueue_jobs (queue, unique_key) + WHERE unique_key IS NOT NULL AND state NOT IN ('completed', 'dead'); + +CREATE OR REPLACE FUNCTION pgqueue_notify() RETURNS trigger AS $$ +BEGIN + PERFORM pg_notify('pgqueue_' || NEW.queue, NEW.id::text); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS pgqueue_jobs_notify ON pgqueue_jobs; +CREATE TRIGGER pgqueue_jobs_notify + AFTER INSERT ON pgqueue_jobs + FOR EACH ROW + EXECUTE FUNCTION pgqueue_notify(); +` + +// Executor is the minimal interface required to execute SQL statements. +// Satisfied by *pgxpool.Pool, *pgx.Conn, and pgx.Tx. +type Executor interface { + Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) +} + +// Migrate creates the pgqueue_jobs table and indexes if they do not exist. +func Migrate(ctx context.Context, exec Executor) error { + if _, err := exec.Exec(ctx, schema); err != nil { + return fmt.Errorf("pgqueue: migrate: %w", err) + } + return nil +} diff --git a/migrate_test.go b/migrate_test.go new file mode 100644 index 0000000..22aa17e --- /dev/null +++ b/migrate_test.go @@ -0,0 +1,59 @@ +package pgqueue_test + +import ( + "context" + "errors" + "testing" + + "github.com/jackc/pgx/v5/pgconn" + + "github.com/mickamy/pgqueue" +) + +type stubExecutor struct { + execFn func(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) +} + +func (s *stubExecutor) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) { + return s.execFn(ctx, sql, arguments...) +} + +func TestMigrate(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + exec := &stubExecutor{ + execFn: func(_ context.Context, sql string, _ ...any) (pgconn.CommandTag, error) { + if sql == "" { + t.Error("expected non-empty SQL") + } + return pgconn.NewCommandTag(""), nil + }, + } + + if err := pgqueue.Migrate(t.Context(), exec); err != nil { + t.Fatalf("Migrate() error = %v, want nil", err) + } + }) + + t.Run("exec error", func(t *testing.T) { + t.Parallel() + + execErr := errors.New("connection refused") + exec := &stubExecutor{ + execFn: func(_ context.Context, _ string, _ ...any) (pgconn.CommandTag, error) { + return pgconn.NewCommandTag(""), execErr + }, + } + + err := pgqueue.Migrate(t.Context(), exec) + if err == nil { + t.Fatal("Migrate() error = nil, want error") + } + if !errors.Is(err, execErr) { + t.Errorf("Migrate() error = %v, want wrapping %v", err, execErr) + } + }) +} diff --git a/test_helper_test.go b/test_helper_test.go new file mode 100644 index 0000000..934bcff --- /dev/null +++ b/test_helper_test.go @@ -0,0 +1,12 @@ +package pgqueue_test + +import "testing" + +func mustAs[T any](t *testing.T, v any) T { + t.Helper() + val, ok := v.(T) + if !ok { + t.Fatalf("type assertion failed: want %T, got %T", val, v) + } + return val +} diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..24baafc --- /dev/null +++ b/worker.go @@ -0,0 +1,271 @@ +package pgqueue + +import ( + "context" + "errors" + "fmt" + "log/slog" + "math" + "sync" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +var ( + errNoHandlers = errors.New("pgqueue: no handlers registered") + errNoJobs = errors.New("pgqueue: no jobs available") +) + +// HandlerFunc processes a job. Return nil on success, or an error to trigger a retry. +type HandlerFunc func(ctx context.Context, job *Job) error + +// WorkerOption configures Worker behavior. +type WorkerOption func(*workerConfig) + +type workerConfig struct { + pollInterval time.Duration + concurrency int + retryBaseWait time.Duration +} + +const ( + defaultPollInterval = 5 * time.Second + defaultConcurrency = 10 + defaultRetryBaseWait = 1 * time.Second +) + +// WithPollInterval sets the fallback polling interval for checking new jobs. +// Default: 5 seconds. +func WithPollInterval(d time.Duration) WorkerOption { + return func(c *workerConfig) { + c.pollInterval = d + } +} + +// WithConcurrency sets the maximum number of jobs processed concurrently. +// Default: 10. +func WithConcurrency(n int) WorkerOption { + return func(c *workerConfig) { + c.concurrency = n + } +} + +// WithRetryBaseWait sets the base wait duration for exponential backoff. +// Actual wait = base * 2^(attempts-1). +// Default: 1 second. +func WithRetryBaseWait(d time.Duration) WorkerOption { + return func(c *workerConfig) { + c.retryBaseWait = d + } +} + +// Worker processes jobs from registered queues. +type Worker struct { + pool *pgxpool.Pool + handlers map[string]HandlerFunc + config workerConfig +} + +// NewWorker creates a new Worker. +func NewWorker(pool *pgxpool.Pool, opts ...WorkerOption) *Worker { + config := workerConfig{ + pollInterval: defaultPollInterval, + concurrency: defaultConcurrency, + retryBaseWait: defaultRetryBaseWait, + } + for _, opt := range opts { + opt(&config) + } + return &Worker{ + pool: pool, + handlers: make(map[string]HandlerFunc), + config: config, + } +} + +// Handle registers a handler for the given queue name. +func (w *Worker) Handle(queue string, handler HandlerFunc) { + w.handlers[queue] = handler +} + +// Start begins processing jobs and blocks until ctx is cancelled. +func (w *Worker) Start(ctx context.Context) error { + if len(w.handlers) == 0 { + return errNoHandlers + } + + queues := make([]string, 0, len(w.handlers)) + for q := range w.handlers { + queues = append(queues, q) + } + + listenConn, err := w.pool.Acquire(ctx) + if err != nil { + return fmt.Errorf("pgqueue: acquire listen connection: %w", err) + } + defer listenConn.Release() + + for _, q := range queues { + channel := pgx.Identifier{notifyChannel(q)}.Sanitize() + if _, err := listenConn.Exec(ctx, "LISTEN "+channel); err != nil { + return fmt.Errorf("pgqueue: listen %q: %w", q, err) + } + } + + sem := make(chan struct{}, w.config.concurrency) + var wg sync.WaitGroup + defer wg.Wait() + + for { + processed := false + for _, queue := range queues { + if err := ctx.Err(); err != nil { + return fmt.Errorf("pgqueue: %w", err) + } + + job, deqErr := dequeueOne(ctx, w.pool, queue) + if deqErr != nil { + if !errors.Is(deqErr, errNoJobs) { + slog.ErrorContext(ctx, "pgqueue: dequeue error", "queue", queue, "error", deqErr) + } + continue + } + + processed = true + + select { + case <-ctx.Done(): + return fmt.Errorf("pgqueue: %w", ctx.Err()) + case sem <- struct{}{}: + } + + wg.Go(func() { + defer func() { <-sem }() + w.processJob(ctx, job) + }) + } + + if processed { + continue + } + + waitCtx, cancel := context.WithTimeout(ctx, w.config.pollInterval) + _, _ = listenConn.Conn().WaitForNotification(waitCtx) + cancel() + + if err := ctx.Err(); err != nil { + return fmt.Errorf("pgqueue: %w", err) + } + } +} + +func (w *Worker) processJob(ctx context.Context, job *Job) { + handler := w.handlers[job.Queue] + + err := safeCall(ctx, handler, job) + if err == nil { + if completeErr := completeJob(ctx, w.pool, job.ID); completeErr != nil { + slog.ErrorContext(ctx, "pgqueue: complete job error", "job_id", job.ID, "error", completeErr) + } + return + } + + if job.Attempts >= job.MaxRetries { + if deadErr := markDead(ctx, w.pool, job.ID, err); deadErr != nil { + slog.ErrorContext(ctx, "pgqueue: mark dead error", "job_id", job.ID, "error", deadErr) + } + return + } + + delay := retryDelay(w.config.retryBaseWait, job.Attempts) + retryAt := time.Now().Add(delay) + if failErr := failJob(ctx, w.pool, job.ID, retryAt, err); failErr != nil { + slog.ErrorContext(ctx, "pgqueue: fail job error", "job_id", job.ID, "error", failErr) + } +} + +func safeCall(ctx context.Context, handler HandlerFunc, job *Job) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("pgqueue: panic: %v", r) + } + }() + return handler(ctx, job) +} + +// notifyChannel returns the PostgreSQL LISTEN/NOTIFY channel name for a queue. +func notifyChannel(queue string) string { + return "pgqueue_" + queue +} + +// retryDelay calculates exponential backoff: base * 2^(attempts-1). +func retryDelay(base time.Duration, attempts int) time.Duration { + if attempts <= 1 { + return base + } + return base * time.Duration(math.Pow(2, float64(attempts-1))) +} + +const dequeueOneQuery = ` +UPDATE pgqueue_jobs +SET state = $1, attempts = attempts + 1, updated_at = now() +WHERE id = ( + SELECT id FROM pgqueue_jobs + WHERE queue = $2 AND state = $3 AND run_at <= now() + ORDER BY run_at + LIMIT 1 + FOR UPDATE SKIP LOCKED +) +RETURNING id, queue, payload, state, run_at, attempts, max_retries, unique_key, last_error, created_at, updated_at +` + +func dequeueOne(ctx context.Context, db Querier, queue string) (*Job, error) { + var job Job + err := db.QueryRow(ctx, dequeueOneQuery, JobStateRunning, queue, JobStatePending).Scan( + &job.ID, &job.Queue, &job.Payload, &job.State, &job.RunAt, + &job.Attempts, &job.MaxRetries, &job.UniqueKey, &job.LastError, + &job.CreatedAt, &job.UpdatedAt, + ) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, errNoJobs + } + return nil, fmt.Errorf("pgqueue: dequeue: %w", err) + } + return &job, nil +} + +func completeJob(ctx context.Context, db Executor, id int64) error { + _, err := db.Exec(ctx, + `UPDATE pgqueue_jobs SET state = $1, updated_at = now() WHERE id = $2`, + JobStateCompleted, id, + ) + if err != nil { + return fmt.Errorf("pgqueue: complete: %w", err) + } + return nil +} + +func failJob(ctx context.Context, db Executor, id int64, retryAt time.Time, lastErr error) error { + _, err := db.Exec(ctx, + `UPDATE pgqueue_jobs SET state = $1, run_at = $2, last_error = $3, updated_at = now() WHERE id = $4`, + JobStatePending, retryAt, lastErr.Error(), id, + ) + if err != nil { + return fmt.Errorf("pgqueue: fail: %w", err) + } + return nil +} + +func markDead(ctx context.Context, db Executor, id int64, lastErr error) error { + _, err := db.Exec(ctx, + `UPDATE pgqueue_jobs SET state = $1, last_error = $2, updated_at = now() WHERE id = $3`, + JobStateDead, lastErr.Error(), id, + ) + if err != nil { + return fmt.Errorf("pgqueue: mark dead: %w", err) + } + return nil +} diff --git a/worker_test.go b/worker_test.go new file mode 100644 index 0000000..140ff3d --- /dev/null +++ b/worker_test.go @@ -0,0 +1,261 @@ +package pgqueue_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + + "github.com/mickamy/pgqueue" +) + +func TestNotifyChannel(t *testing.T) { + t.Parallel() + + tests := []struct { + queue string + want string + }{ + {"email", "pgqueue_email"}, + {"webhook", "pgqueue_webhook"}, + {"report_daily", "pgqueue_report_daily"}, + } + + for _, tt := range tests { + t.Run(tt.queue, func(t *testing.T) { + t.Parallel() + if got := pgqueue.NotifyChannel(tt.queue); got != tt.want { + t.Errorf("NotifyChannel(%q) = %q, want %q", tt.queue, got, tt.want) + } + }) + } +} + +func TestRetryDelay(t *testing.T) { + t.Parallel() + + base := 1 * time.Second + tests := []struct { + name string + attempts int + want time.Duration + }{ + {"zero attempts", 0, 1 * time.Second}, + {"first attempt", 1, 1 * time.Second}, + {"second attempt", 2, 2 * time.Second}, + {"third attempt", 3, 4 * time.Second}, + {"fourth attempt", 4, 8 * time.Second}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + if got := pgqueue.RetryDelay(base, tt.attempts); got != tt.want { + t.Errorf("RetryDelay(%v, %d) = %v, want %v", base, tt.attempts, got, tt.want) + } + }) + } +} + +func TestDequeueOne(t *testing.T) { + t.Parallel() + + t.Run("returns job", func(t *testing.T) { + t.Parallel() + + now := time.Now() + q := &stubQuerier{ + execFn: func(_ context.Context, _ string, _ ...any) (pgconn.CommandTag, error) { + return pgconn.NewCommandTag(""), nil + }, + queryRowFn: func(_ context.Context, _ string, _ ...any) pgx.Row { + return &stubRow{ + scanFn: func(dest ...any) error { + *mustAs[*int64](t, dest[0]) = 42 + *mustAs[*string](t, dest[1]) = "email" + *mustAs[*[]byte](t, dest[2]) = []byte(`{"to":"a@b.com"}`) + *mustAs[*pgqueue.JobState](t, dest[3]) = pgqueue.JobStateRunning + *mustAs[*time.Time](t, dest[4]) = now + *mustAs[*int](t, dest[5]) = 1 + *mustAs[*int](t, dest[6]) = 3 + *mustAs[**string](t, dest[7]) = nil + *mustAs[**string](t, dest[8]) = nil + *mustAs[*time.Time](t, dest[9]) = now + *mustAs[*time.Time](t, dest[10]) = now + return nil + }, + } + }, + } + + job, err := pgqueue.DequeueOne(t.Context(), q, "email") + if err != nil { + t.Fatalf("DequeueOne() error = %v", err) + } + if job == nil { + t.Fatal("DequeueOne() returned nil job") + } + if job.ID != 42 { + t.Errorf("ID = %d, want 42", job.ID) + } + if job.Queue != "email" { + t.Errorf("Queue = %q, want %q", job.Queue, "email") + } + if job.State != pgqueue.JobStateRunning { + t.Errorf("State = %q, want %q", job.State, pgqueue.JobStateRunning) + } + }) + + t.Run("no rows returns errNoJobs", func(t *testing.T) { + t.Parallel() + + q := &stubQuerier{ + execFn: func(_ context.Context, _ string, _ ...any) (pgconn.CommandTag, error) { + return pgconn.NewCommandTag(""), nil + }, + queryRowFn: func(_ context.Context, _ string, _ ...any) pgx.Row { + return &stubRow{ + scanFn: func(_ ...any) error { + return pgx.ErrNoRows + }, + } + }, + } + + job, err := pgqueue.DequeueOne(t.Context(), q, "email") + if !errors.Is(err, pgqueue.ErrNoJobs) { + t.Fatalf("DequeueOne() error = %v, want %v", err, pgqueue.ErrNoJobs) + } + if job != nil { + t.Errorf("DequeueOne() = %v, want nil", job) + } + }) + + t.Run("query error", func(t *testing.T) { + t.Parallel() + + dbErr := errors.New("connection lost") + q := &stubQuerier{ + execFn: func(_ context.Context, _ string, _ ...any) (pgconn.CommandTag, error) { + return pgconn.NewCommandTag(""), nil + }, + queryRowFn: func(_ context.Context, _ string, _ ...any) pgx.Row { + return &stubRow{ + scanFn: func(_ ...any) error { + return dbErr + }, + } + }, + } + + _, err := pgqueue.DequeueOne(t.Context(), q, "email") + if err == nil { + t.Fatal("DequeueOne() error = nil, want error") + } + if !errors.Is(err, dbErr) { + t.Errorf("DequeueOne() error = %v, want wrapping %v", err, dbErr) + } + }) +} + +func TestCompleteJob(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + exec := &stubExecutor{ + execFn: func(_ context.Context, _ string, args ...any) (pgconn.CommandTag, error) { + state := mustAs[pgqueue.JobState](t, args[0]) + if state != pgqueue.JobStateCompleted { + t.Errorf("state = %q, want %q", state, pgqueue.JobStateCompleted) + } + id := mustAs[int64](t, args[1]) + if id != 42 { + t.Errorf("id = %d, want 42", id) + } + return pgconn.NewCommandTag("UPDATE 1"), nil + }, + } + + if err := pgqueue.CompleteJob(t.Context(), exec, 42); err != nil { + t.Fatalf("CompleteJob() error = %v", err) + } + }) + + t.Run("exec error", func(t *testing.T) { + t.Parallel() + + dbErr := errors.New("connection lost") + exec := &stubExecutor{ + execFn: func(_ context.Context, _ string, _ ...any) (pgconn.CommandTag, error) { + return pgconn.NewCommandTag(""), dbErr + }, + } + + err := pgqueue.CompleteJob(t.Context(), exec, 42) + if !errors.Is(err, dbErr) { + t.Errorf("CompleteJob() error = %v, want wrapping %v", err, dbErr) + } + }) +} + +func TestFailJob(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + retryAt := time.Now().Add(2 * time.Second) + handlerErr := errors.New("temporary failure") + + exec := &stubExecutor{ + execFn: func(_ context.Context, _ string, args ...any) (pgconn.CommandTag, error) { + state := mustAs[pgqueue.JobState](t, args[0]) + if state != pgqueue.JobStatePending { + t.Errorf("state = %q, want %q", state, pgqueue.JobStatePending) + } + lastError := mustAs[string](t, args[2]) + if lastError != "temporary failure" { + t.Errorf("last_error = %q, want %q", lastError, "temporary failure") + } + return pgconn.NewCommandTag("UPDATE 1"), nil + }, + } + + if err := pgqueue.FailJob(t.Context(), exec, 42, retryAt, handlerErr); err != nil { + t.Fatalf("FailJob() error = %v", err) + } + }) +} + +func TestMarkDead(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + handlerErr := errors.New("permanent failure") + + exec := &stubExecutor{ + execFn: func(_ context.Context, _ string, args ...any) (pgconn.CommandTag, error) { + state := mustAs[pgqueue.JobState](t, args[0]) + if state != pgqueue.JobStateDead { + t.Errorf("state = %q, want %q", state, pgqueue.JobStateDead) + } + lastError := mustAs[string](t, args[1]) + if lastError != "permanent failure" { + t.Errorf("last_error = %q, want %q", lastError, "permanent failure") + } + return pgconn.NewCommandTag("UPDATE 1"), nil + }, + } + + if err := pgqueue.MarkDead(t.Context(), exec, 42, handlerErr); err != nil { + t.Fatalf("MarkDead() error = %v", err) + } + }) +}