v0.26.0

@janbjorge released this 01 Mar 17:50
a0bff06

PgQueuer v0.26.0

Hexagonal Architecture, In-Memory Adapter, and Prometheus Metrics

This release is the largest architectural overhaul since the project's inception. The internal codebase has been restructured around a hexagonal (ports & adapters) architecture, introducing formal protocol boundaries between domain logic, persistence, and infrastructure. There are no breaking changes -- all public imports and APIs remain backward-compatible.

Highlights

  • Hexagonal architecture with explicit port protocols and layered package structure
  • In-memory adapter for testing and development without PostgreSQL
  • Prometheus metrics module with a FastAPI integration
  • Dependency injection for QueueManager and SchedulerManager
  • Documentation rewrite migrated from Sphinx/ReadTheDocs to MkDocs Material/GitHub Pages
  • PostgreSQL 18 added to the CI test matrix

New Features

Port Protocols (pgqueuer.ports)

Six new typing.Protocol classes define the contracts between core logic and infrastructure. The existing Queries class satisfies all of them via structural subtyping -- no code changes are needed for standard usage.

| Protocol | Purpose |
| --- | --- |
| QueueRepositoryPort | Job queue persistence (enqueue, dequeue, log, cancel, heartbeat, etc.) |
| ScheduleRepositoryPort | Cron schedule persistence (insert, fetch, pick, delete) |
| NotificationPort | LISTEN/NOTIFY abstractions (RPS, cancellation, health check events) |
| SchemaManagementPort | DDL operations (install, uninstall, upgrade, introspection) |
| RepositoryPort | Combined protocol -- union of all four above |
| TracingProtocol | Moved from pgqueuer.tracing -- trace publishing and processing |

These protocols enable alternative backend implementations and simplify testing with mocks.
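
To illustrate what "satisfies via structural subtyping" means, here is a minimal sketch using only the standard library. EnqueuePort, DictBackend, and submit are hypothetical names invented for this example, and the method is synchronous for brevity; the real PgQueuer ports are larger and may be shaped differently.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class EnqueuePort(Protocol):
    """Hypothetical, simplified port: anything with a matching enqueue()."""

    def enqueue(self, entrypoint: str, payload: bytes) -> int: ...


class DictBackend:
    """Hypothetical backend. It never inherits from EnqueuePort; it only matches its shape."""

    def __init__(self) -> None:
        self.jobs: dict[int, tuple[str, bytes]] = {}

    def enqueue(self, entrypoint: str, payload: bytes) -> int:
        job_id = len(self.jobs) + 1
        self.jobs[job_id] = (entrypoint, payload)
        return job_id


def submit(port: EnqueuePort, payload: bytes) -> int:
    # Core logic depends on the protocol only, not on a concrete backend.
    return port.enqueue("my_job", payload)


backend = DictBackend()
job_id = submit(backend, b"hello")
```

Because the check is structural, a type checker accepts DictBackend wherever EnqueuePort is expected, which is the same mechanism that lets a mock or an alternative backend stand in for a port.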

In-Memory Adapter (pgqueuer.adapters.inmemory)

A pure-Python drop-in replacement for the PostgreSQL persistence layer:

from pgqueuer import PgQueuer, Job

app = PgQueuer.in_memory()

@app.entrypoint("my_job")
async def handler(job: Job) -> None:
    print(job.payload)

  • InMemoryDriver -- satisfies the Driver protocol with in-process notification delivery
  • InMemoryQueries -- satisfies all four repository port protocols using Python dicts
  • Both classes are exported from the top-level pgqueuer package
  • Use cases: unit tests, CI pipelines, local prototyping, short-lived batch containers

Dependency Injection for Managers

QueueManager and SchedulerManager now accept an optional queries parameter:

# Production (unchanged)
qm = QueueManager(connection=driver)

# Testing with a mock
qm = QueueManager(connection=driver, queries=mock_repository)

When queries is None (the default), Queries(self.connection) is auto-created as before. An optional tracer parameter was also added for injecting a custom TracingProtocol implementation.
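
The default-or-injected behaviour described above follows a common constructor pattern. The sketch below shows that pattern with stand-in classes; FakeDriver, DefaultRepository, and Manager are hypothetical and are not the PgQueuer implementation.

```python
from typing import Optional


class FakeDriver:
    """Hypothetical stand-in for a database driver."""


class DefaultRepository:
    """Hypothetical stand-in for the auto-created repository object."""

    def __init__(self, driver: FakeDriver) -> None:
        self.driver = driver


class Manager:
    """Sketch of a manager that accepts an optional injected repository."""

    def __init__(self, connection: FakeDriver, queries: Optional[object] = None) -> None:
        self.connection = connection
        # Fall back to the default repository only when nothing was injected.
        self.queries = queries if queries is not None else DefaultRepository(connection)


driver = FakeDriver()
default_manager = Manager(connection=driver)

mock_repository = object()
test_manager = Manager(connection=driver, queries=mock_repository)
```

Checking for None explicitly, rather than relying on truthiness, keeps an injected object that happens to be falsy from being silently replaced by the default.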

Prometheus Metrics (pgqueuer.metrics)

A new library-level metrics module replaces the old Docker-only approach:

from pgqueuer.metrics.prometheus import collect_metrics

content = await collect_metrics(repository)  # Returns Prometheus exposition format

  • collect_metrics(repository) -- collects queue size and log statistics, returns Prometheus text format
  • MetricNames -- configurable metric names (defaults: pgqueuer_queue_count, pgqueuer_logs_count)
  • FastAPI integration: create_metrics_router(repository) returns a ready-to-mount APIRouter with a /metrics endpoint
  • Depends on QueueRepositoryPort, not the concrete Queries class
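
For readers unfamiliar with the Prometheus text format, the following sketch renders a gauge by hand. render_gauge and the entrypoint/status labels are assumptions made for illustration; only the metric name pgqueuer_queue_count comes from the defaults listed above, and the labels PgQueuer actually emits may differ. Label values are not escaped here, which a real exporter would need to handle.

```python
def render_gauge(name: str, help_text: str, samples: list[tuple[dict[str, str], float]]) -> str:
    """Render one gauge metric in the Prometheus text exposition format."""
    lines = [f"# HELP {name} {help_text}", f"# TYPE {name} gauge"]
    for labels, value in samples:
        # Sort labels so the output is deterministic.
        label_text = ",".join(f'{key}="{val}"' for key, val in sorted(labels.items()))
        lines.append(f"{name}{{{label_text}}} {value}")
    return "\n".join(lines) + "\n"


content = render_gauge(
    "pgqueuer_queue_count",
    "Number of jobs in the queue",
    [({"entrypoint": "my_job", "status": "queued"}, 3)],  # hypothetical labels
)
```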

Deprecations

The following fields on executor parameter dataclasses are deprecated and emit DeprecationWarning when used. The framework still passes real values during the deprecation window, so existing custom executors continue to work without changes. These fields will be removed in a future major version.

EntrypointExecutorParameters:

  • connection, channel, queries, shutdown -- these infrastructure fields are unused by the built-in executors

ScheduleExecutorFactoryParameters:

  • connection, queries, shutdown

If you have custom executors that read self.parameters.connection or self.parameters.queries, plan to restructure that code before the next major release. The deprecation warning will remind you.
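
One way a field can warn on access while still returning a real value is a property that wraps a private attribute. The sketch below is an assumption about the general technique, not the mechanism PgQueuer uses; ExecutorParameters and its fields are hypothetical.

```python
import warnings
from dataclasses import dataclass, field


@dataclass
class ExecutorParameters:
    """Hypothetical parameters object with one deprecated field."""

    name: str
    _connection: object = field(default=None, repr=False)

    @property
    def connection(self) -> object:
        # Warn, then return the real value so existing callers keep working.
        warnings.warn(
            "connection is deprecated and will be removed in a future major version",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._connection


params = ExecutorParameters(name="my_job", _connection="fake-connection")

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is not filtered out
    value = params.connection
```

Python's default filters hide DeprecationWarning unless it is triggered by code in __main__, so running a test suite with `python -W error::DeprecationWarning` is a practical way to find every place that still reads a deprecated field.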


Internal Restructure

The flat pgqueuer/ package was reorganized into a hexagonal layout:

| Layer | Path | Contents |
| --- | --- | --- |
| domain | pgqueuer/domain/ | models.py, types.py, errors.py, settings.py |
| ports | pgqueuer/ports/ | repository.py, driver.py, tracing.py |
| core | pgqueuer/core/ | qm.py, sm.py, executors.py, applications.py, buffers.py, heartbeat.py, ... |
| adapters | pgqueuer/adapters/ | drivers/, persistence/, tracing/, cli/, inmemory/ |

Every original module (pgqueuer/qm.py, pgqueuer/models.py, etc.) is now a backward-compatibility shim that re-exports from the canonical location. All existing imports continue to work.

All internal imports were converted from relative to absolute. Import-linter CI enforcement ensures layer boundaries are respected (domain does not import from adapters/core, ports do not import from adapters/core, core does not import from adapters).
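
The layer rules above can be restated as a small lookup. This is a toy restatement for clarity only: import-linter is configured through its own contract definitions, and FORBIDDEN, layer_of, and violates are hypothetical helpers that do not appear in the project.

```python
# Layer -> layers it must not import from, as stated in the release notes.
FORBIDDEN: dict[str, set[str]] = {
    "domain": {"adapters", "core"},
    "ports": {"adapters", "core"},
    "core": {"adapters"},
}


def layer_of(module: str) -> str:
    """Return the layer segment of a dotted path such as 'pgqueuer.core.qm'."""
    parts = module.split(".")
    return parts[1] if len(parts) > 1 else ""


def violates(importer: str, imported: str) -> bool:
    """True when `importer` reaching into `imported` crosses a forbidden boundary."""
    return layer_of(imported) in FORBIDDEN.get(layer_of(importer), set())


domain_to_adapters = violates("pgqueuer.domain.models", "pgqueuer.adapters.inmemory")
core_to_ports = violates("pgqueuer.core.qm", "pgqueuer.ports.repository")
```

Dependencies point inward: adapters may import from core, ports, and domain, while nothing closer to the domain may import from adapters.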


Bug Fixes

  • Benchmark hang resolved -- ThroughputStrategy.run() now uses a grace period with forced cancellation instead of asyncio.gather(), preventing indefinite hangs on shutdown (#528)
  • Silent benchmark timeout failures -- Safe .get() defaults and max(..., 1) guards prevent KeyError and division-by-zero when tqdm never updates
  • Health check task leak -- periodic_health_check_task in QueueManager is now explicitly cancelled when the main loop exits
  • Mypy type checking errors in the metrics module resolved with cast() instead of # type: ignore
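
The "grace period with forced cancellation" idea from the first fix can be sketched with asyncio.wait. This is a generic illustration under assumed names (stop_with_grace and main are hypothetical), not the code in ThroughputStrategy.run().

```python
import asyncio


async def stop_with_grace(tasks: list[asyncio.Task], grace: float) -> tuple[int, int]:
    """Wait up to `grace` seconds, then cancel whatever is still running."""
    done, pending = await asyncio.wait(tasks, timeout=grace)
    for task in pending:
        task.cancel()
    # Let cancelled tasks unwind; return_exceptions collects their CancelledError.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def main() -> tuple[int, int]:
    quick = asyncio.create_task(asyncio.sleep(0.01))
    stuck = asyncio.create_task(asyncio.sleep(3600))  # would hang a plain gather()
    return await stop_with_grace([quick, stuck], grace=0.2)


finished, cancelled = asyncio.run(main())
```

A bare asyncio.gather() over the same tasks would wait for the slowest one, whereas the timeout bounds shutdown time regardless of how a task behaves.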

Documentation

  • Migrated from Sphinx/ReadTheDocs to MkDocs Material/GitHub Pages with dark/light theme toggle, search, tabbed content, and Mermaid diagram support
  • 25 documentation pages organized into Guides, Reference, Integrations, and Development sections (up from 13 flat files)
  • New guides: Reliability Model, Performance Tuning, Production Deployment, Database Permissions, Troubleshooting
  • README rewritten from ~60 lines to ~375 lines with categorized features, architectural overview, and runnable code examples for common patterns
  • 13 redirect mappings preserve old Sphinx URLs

CI & Infrastructure

  • PostgreSQL 18 added to the test matrix (now testing PG 13-18)
  • Codecov integration -- pytest-cov with XML reporting
  • Import-linter enforces hexagonal architecture boundaries in CI
  • In-memory driver ("mem") added to the benchmark matrix
  • Uniform YAML formatting across all CI and config files

Preparing for the Future

This section covers changes that are not breaking today but are signaled by deprecation warnings, TODO comments, and the ongoing migration plan. Acting on these now will make future upgrades seamless.

Migrate imports away from shim files

All 19 original top-level modules (pgqueuer/qm.py, pgqueuer/models.py, pgqueuer/db.py, etc.) are now thin re-export shims. They work today but are earmarked for removal in a future major version. Start migrating to canonical paths:

# Old (works today via shim, will break in a future major version)
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager
from pgqueuer.db import AsyncpgDriver

# Canonical (future-proof)
from pgqueuer.domain.models import Job
from pgqueuer.core.qm import QueueManager
from pgqueuer.adapters.drivers.asyncpg import AsyncpgDriver

# Safest (top-level package, unlikely to change)
from pgqueuer import QueueManager, Job, AsyncpgDriver

Replace PGChannel with Channel

The PGChannel type alias is pending deprecation (PGChannel = Channel). Switch to Channel now.

Move from set_tracing_class() to injected tracing

The global tracing.TRACER singleton and set_tracing_class() function will be deprecated in favor of constructor injection. The tracer parameter already exists on QueueManager:

# Current (global singleton -- will be deprecated)
from pgqueuer.tracing import set_tracing_class
set_tracing_class(LogfireTracing)

# Future-proof (injected)
qm = QueueManager(connection=driver, tracer=LogfireTracing())

Auto-creation of Queries may require explicit injection

Currently QueueManager(connection=driver) auto-creates Queries(self.connection) when no queries argument is passed. A future version may require explicit injection:

# Current (auto-creates Queries internally)
qm = QueueManager(connection=driver)

# Future-proof (explicit injection)
qm = QueueManager(connection=driver, queries=Queries(driver))

# Or use the PgQueuer facade (will remain convenient)
app = PgQueuer.from_asyncpg_pool(pool)

Scheduled functions will gain a context parameter

A future version will add optional ScheduleContext support for scheduled tasks, similar to how queue entrypoints support accepts_context=True. This will be backward-compatible via arity inspection, but users should be aware the pattern is coming:

# Current
@app.schedule("cleanup", "*/5 * * * *")
async def cleanup(schedule: Schedule) -> None: ...

# Future (optional second parameter)
@app.schedule("cleanup", "*/5 * * * *")
async def cleanup(schedule: Schedule, context: ScheduleContext) -> None: ...
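
Arity inspection of this kind can be done with inspect.signature. The sketch below is one possible approach, assuming a hypothetical dispatcher named call_scheduled; how PgQueuer will actually detect the second parameter is not specified in these notes.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_scheduled(func: Callable[..., Any], schedule: Any, context: Any) -> Any:
    """Pass `context` only to functions that declare a second positional parameter."""
    positional = [
        param
        for param in inspect.signature(func).parameters.values()
        if param.kind
        in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    if len(positional) >= 2:
        return await func(schedule, context)
    return await func(schedule)


async def old_style(schedule: str) -> str:
    return f"old:{schedule}"


async def new_style(schedule: str, context: dict) -> str:
    return f"new:{schedule}:{context['run']}"


async def main() -> list[str]:
    return [
        await call_scheduled(old_style, "cleanup", {"run": 1}),
        await call_scheduled(new_style, "cleanup", {"run": 1}),
    ]


results = asyncio.run(main())
```

Existing one-parameter functions keep working unchanged because the dispatcher never passes an argument they did not declare.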

statistics_table_status_type in DBSettings will be removed

This setting and the related PostgreSQL ENUM type are slated for removal in a future release.


Migration Guide

For most users: no changes needed. All public imports (from pgqueuer import QueueManager, Job, Queries, ...) continue to work through backward-compatibility shims. Custom executors that access infrastructure fields on their parameters will continue to work but will emit deprecation warnings.

If you want to adopt the new architecture: You can now import from canonical paths (e.g., from pgqueuer.core.qm import QueueManager, from pgqueuer.ports import RepositoryPort) and implement custom adapters against the port protocols.

If you want to test without PostgreSQL: Use PgQueuer.in_memory() or inject InMemoryQueries directly into a manager.