Skip to content

shyim/go-queue

Repository files navigation

go-queue

A type-safe message queue library for Go with generics. Inspired by asynq and Symfony Messenger.

Messages are plain Go structs — no interfaces to implement, no manual JSON encoding. The library handles serialization, routing, retries, and transport abstraction.

Features

  • GenericsDispatch[T], HandleFunc[T] — fully typed, no interface{}
  • Envelope + Stamps — metadata (delay, retry info, tracing) travels with the message without polluting your structs
  • Pluggable transportsAMQP, PostgreSQL, in-memory, or bring your own
  • Retry with backoff — exponential backoff with jitter, per-message override, configurable strategy
  • Failure handling — callback for permanently failed messages (dead-letter, alerting, replay)
  • Middleware — wrap handler calls for logging, metrics, tracing, validation
  • Graceful shutdown — stops consuming, waits for in-flight messages to finish
  • Per-message timeout — prevent hung handlers from blocking worker slots
  • Panic recovery — panics in handlers become unrecoverable errors, worker stays alive
  • OpenTelemetry — optional tracing and metrics

Install

go get github.com/shyim/go-queue

Quick Start

package main

import (
	"context"
	"fmt"
	"log"
	"os/signal"
	"syscall"

	"github.com/shyim/go-queue"
	"github.com/shyim/go-queue/transport/amqp"
)

type SendEmail struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	Body    string `json:"body"`
}

type ResizeImage struct {
	URL    string `json:"url"`
	Width  int    `json:"width"`
	Height int    `json:"height"`
}

func main() {
	transport := amqp.NewTransport(amqp.Config{
		DSN:      "amqp://guest:guest@localhost:5672/",
		Exchange: "app",
		Queue:    "default",
	})
	defer transport.Close()

	bus := queue.NewBus()
	bus.AddTransport("async", transport)

	queue.HandleFunc[SendEmail](bus, "async", func(ctx context.Context, msg SendEmail) error {
		fmt.Printf("Sending email to %s: %s\n", msg.To, msg.Subject)
		return nil
	})

	queue.HandleFunc[ResizeImage](bus, "async", func(ctx context.Context, msg ResizeImage) error {
		fmt.Printf("Resizing %s to %dx%d\n", msg.URL, msg.Width, msg.Height)
		return nil
	})

	if err := bus.Setup(context.Background()); err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	if err := queue.Dispatch(ctx, bus, SendEmail{
		To:      "user@example.com",
		Subject: "Welcome!",
		Body:    "Hello!",
	}); err != nil {
		log.Fatal(err)
	}

	ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	worker := queue.NewWorker(bus, queue.WorkerConfig{Concurrency: 4})
	if err := worker.Run(ctx); err != nil && err != context.Canceled {
		log.Fatal(err)
	}
}

Dispatch Options

queue.Dispatch(ctx, bus, msg, queue.WithDelay(5*time.Minute))
queue.Dispatch(ctx, bus, msg, queue.WithMaxRetries(10))
queue.Dispatch(ctx, bus, msg, queue.WithQueue("high-priority"))
queue.Dispatch(ctx, bus, msg, queue.WithHeader("x-tenant", "acme"))

Worker Configuration

worker := queue.NewWorker(bus, queue.WorkerConfig{
	Concurrency:     8,
	HandlerTimeout:  30 * time.Second,
	ShutdownTimeout: 60 * time.Second, // default 30s

	RetryStrategy: queue.RetryStrategy{
		MaxRetries: 5,
		Delay:      1 * time.Second,
		Multiplier: 2.0,
		MaxDelay:   1 * time.Minute,
		Jitter:     0.1,
	},

	ErrorHandler: func(ctx context.Context, env *queue.Envelope, err error) {
		slog.Error("message failed", "error", err, "type", env.Type)
	},

	FailureHandler: func(ctx context.Context, env *queue.Envelope, err error) {
		// dead-letter, alert, etc.
	},

	Middleware: []queue.Middleware{myLoggingMiddleware},
})

Middleware

func loggingMiddleware(ctx context.Context, env *queue.Envelope, next func(context.Context, *queue.Envelope) error) error {
	slog.Info("processing", "type", env.Type, "id", env.ID)
	start := time.Now()
	err := next(ctx, env)
	slog.Info("done", "type", env.Type, "duration", time.Since(start), "error", err)
	return err
}

Envelope Context

Handlers can access the full envelope (ID, type, headers, stamps) from the context:

queue.HandleFunc[MyMsg](bus, "async", func(ctx context.Context, msg MyMsg) error {
	env := queue.EnvelopeFromContext(ctx)
	slog.Info("processing", "id", env.ID, "type", env.Type)
	return nil
})

Unrecoverable Errors

queue.HandleFunc[MyMsg](bus, "async", func(ctx context.Context, msg MyMsg) error {
	if msg.Invalid() {
		return queue.Unrecoverable(fmt.Errorf("invalid message: %s", msg.ID))
	}
	return process(msg)
})

Transports

Transport Package
RabbitMQ / LavinMQ transport/amqp
PostgreSQL transport/postgres
In-memory (testing) transport/memory

Custom Transport

Implement the Transport interface to add a new broker:

type Transport interface {
	Send(ctx context.Context, envelope *queue.Envelope) error
	Receive(ctx context.Context) (<-chan *queue.Envelope, error)
	Ack(ctx context.Context, envelope *queue.Envelope) error
	Nack(ctx context.Context, envelope *queue.Envelope, requeue bool) error
	Retry(ctx context.Context, envelope *queue.Envelope) error
}

Optionally implement SetupTransport for infrastructure auto-creation:

type SetupTransport interface {
	Setup(ctx context.Context) error
}

OpenTelemetry

Register once — all dispatch and consume calls are traced automatically:

import queueotel "github.com/shyim/go-queue/middleware/otel"

bus.AddDispatchMiddleware(queueotel.DispatchMiddleware())

worker := queue.NewWorker(bus, queue.WorkerConfig{
	Middleware: []queue.Middleware{
		queueotel.Middleware(),        // tracing
		queueotel.MetricsMiddleware(), // metrics
	},
})

See middleware/otel for details on spans, attributes, and metrics.

Testing

Use the in-memory transport for unit tests:

import "github.com/shyim/go-queue/transport/memory"

bus := queue.NewBus()
transport := memory.NewTransport()
bus.AddTransport("async", transport)

queue.HandleFunc[MyMsg](bus, "async", handler)
queue.Dispatch(ctx, bus, MyMsg{...})

ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
worker := queue.NewWorker(bus, queue.WorkerConfig{})
worker.Run(ctx)

acked := transport.Acked()
nacked := transport.Nacked()

About

Type-safe message queue library for Go with generics

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages