pgqueue

PostgreSQL-only background job queue for Go. No Redis, no SQS — just your existing PostgreSQL.

Features

  • No additional infrastructure — uses PostgreSQL as the broker
  • LISTEN/NOTIFY — real-time job pickup without polling
  • SELECT ... FOR UPDATE SKIP LOCKED — concurrent dequeue in which workers skip rows already locked by another worker instead of waiting on them
  • Transactional enqueue — Outbox pattern with EnqueueTx
  • At-least-once delivery — exponential backoff retry with dead letter
  • Job deduplication — unique_key prevents duplicate active jobs
  • Scheduled jobs — WithRunAt for future execution

Install

go get github.com/mickamy/pgqueue

Quick Start

package main

import (
	"context"
	"fmt"
	"log"
	"os/signal"
	"syscall"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/mickamy/pgqueue"
)

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	pool, err := pgxpool.New(ctx, "postgres://localhost:5432/mydb")
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	// Create table and indexes
	if err := pgqueue.Migrate(ctx, pool); err != nil {
		log.Fatal(err)
	}

	// Enqueue a job
	client := pgqueue.NewClient(pool)
	job, err := client.Enqueue(ctx, "email", []byte(`{"to":"user@example.com"}`))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("enqueued job %d\n", job.ID)

	// Process jobs
	worker := pgqueue.NewWorker(pool)
	worker.Handle("email", func(ctx context.Context, job *pgqueue.Job) error {
		fmt.Printf("processing job %d: %s\n", job.ID, job.Payload)
		return nil
	})

	if err := worker.Start(ctx); err != nil {
		log.Fatal(err)
	}
}
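
The Quick Start passes the payload as raw JSON bytes. A minimal sketch of a typed payload follows, assuming the handler receives the same bytes back in job.Payload. EmailPayload, encodeEmail, and decodeEmail are hypothetical names for illustration and are not part of pgqueue.

```go
package main

import "encoding/json"

// EmailPayload is a hypothetical payload type for the "email" queue.
type EmailPayload struct {
	To string `json:"to"`
}

// encodeEmail builds the []byte payload that would be passed to Enqueue.
func encodeEmail(p EmailPayload) ([]byte, error) {
	return json.Marshal(p)
}

// decodeEmail parses the payload bytes a handler receives
// (assumed to be job.Payload, as printed in the Quick Start).
func decodeEmail(b []byte) (EmailPayload, error) {
	var p EmailPayload
	err := json.Unmarshal(b, &p)
	return p, err
}
```

Inside a handler, returning the decode error would let the job go through the normal retry path rather than being silently dropped.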

Transactional Enqueue (Outbox Pattern)

Enqueue a job within the same transaction as your business logic. The job becomes visible to workers only after the transaction commits; if the transaction rolls back, the job is never enqueued.

tx, err := pool.Begin(ctx)
if err != nil {
	return err
}
defer tx.Rollback(ctx) // safe to call after a successful Commit; it then has no effect

// Business logic
_, err = tx.Exec(ctx, "INSERT INTO orders (id, total) VALUES ($1, $2)", orderID, total)
if err != nil {
	return err
}

// Enqueue in the same transaction
client := pgqueue.NewClient(pool)
_, err = client.EnqueueTx(ctx, tx, "send_receipt", []byte(`{"order_id":"123"}`))
if err != nil {
	return err
}

return tx.Commit(ctx)

Enqueue Options

// Schedule for later
client.Enqueue(ctx, "report", payload, pgqueue.WithRunAt(time.Now().Add(24*time.Hour)))

// Custom retry count
client.Enqueue(ctx, "webhook", payload, pgqueue.WithMaxRetries(5))

// Deduplication
client.Enqueue(ctx, "sync", payload, pgqueue.WithUniqueKey("user:123"))

Worker Options

worker := pgqueue.NewWorker(pool,
	pgqueue.WithConcurrency(20),              // max concurrent jobs (default: 10)
	pgqueue.WithPollInterval(3*time.Second),   // fallback poll interval (default: 5s)
	pgqueue.WithRetryBaseWait(2*time.Second),  // retry backoff base (default: 1s)
)

Job Lifecycle

pending  ──▶  running  ──▶  completed
                │
                ▼
             pending   (retry with backoff)
                │
                ▼
              dead     (max retries exceeded)
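
The transitions out of running can be sketched as a small function. The state names come from the diagram; the exact cutoff is an assumption (here a job is dead once it fails after maxRetries retries have already been used), and next is a hypothetical helper, not a pgqueue function.

```go
package main

// State names mirror the lifecycle diagram.
type State string

const (
	Pending   State = "pending"
	Running   State = "running"
	Completed State = "completed"
	Dead      State = "dead"
)

// next returns the state a running job moves to once its handler returns.
// attempt counts handler runs so far, including the one that just finished.
// Assumption: a failed job is retried while attempt <= maxRetries and is
// marked dead on the first failure after that.
func next(succeeded bool, attempt, maxRetries int) State {
	switch {
	case succeeded:
		return Completed
	case attempt > maxRetries:
		return Dead
	default:
		return Pending // rescheduled with backoff
	}
}
```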

Retry Strategy

Failed jobs are retried with exponential backoff:

Attempt   Delay (base = 1s)
1         1s
2         2s
3         4s

Once a job has used up its max_retries (default: 3), it moves to the dead state.

Schema

pgqueue.Migrate creates the following:

  • pgqueue_jobs table
  • Dequeue index — partial index on (queue, run_at) WHERE state = 'pending'
  • Unique index — partial unique index on (queue, unique_key) WHERE unique_key IS NOT NULL AND state NOT IN ('completed', 'dead')
  • NOTIFY trigger — automatically notifies workers on INSERT
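
To show how the dequeue index and SKIP LOCKED fit together, here is a sketch of the kind of query a worker could run to claim one job. The table name and the queue, run_at, and state columns come from the list above; the id column and the overall shape of the statement are assumptions, not the query pgqueue actually issues.

```go
package main

// dequeueSQL claims the oldest runnable job in a queue.
// The inner SELECT filters on state = 'pending' and orders by run_at, so it
// can use the partial index on (queue, run_at). FOR UPDATE SKIP LOCKED makes
// concurrent workers pass over rows another transaction has already locked.
// Assumption: the table has an id primary key column.
const dequeueSQL = `
UPDATE pgqueue_jobs
SET state = 'running'
WHERE id = (
    SELECT id
    FROM pgqueue_jobs
    WHERE queue = $1
      AND state = 'pending'
      AND run_at <= now()
    ORDER BY run_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id`
```

Because locked rows are skipped rather than waited on, two workers running this at the same moment each get a different job or no row at all.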

License

MIT
