Async Keyed Leased Queue - An asyncio-friendly in-memory queue combining FIFO semantics, dictionary-style keyed access, and lease-based exclusive processing.
- FIFO Queue Semantics: Process items in order with
get() - Keyed Access: Target specific items with
take(key) - Lease-Based Processing: Exclusive access with explicit
ack/release - O(1) Operations: Constant-time enqueue, dequeue, and key-based removal
- Automatic Timeouts: Optional lease expiration with auto-retry
- Blocking Behavior: Async waiting for items or specific keys
- Type Safe: Full type hints with strict mypy compliance
- Zero Dependencies: Pure Python stdlib implementation
pip install leasedkeyqimport asyncio
from leasedkeyq import LeasedKeyQueue
async def main():
# Create queue with 30-second lease timeout
async with LeasedKeyQueue[str, dict](default_lease_timeout=30.0) as queue:
# Producer: add items
await queue.put("task-1", {"action": "process", "data": 123})
await queue.put("task-2", {"action": "send", "data": 456})
# Consumer: FIFO consumption
key, value, lease = await queue.get()
print(f"Processing {key}: {value}")
try:
# Do work...
await process(value)
# Success: acknowledge
await queue.ack(lease)
except Exception:
# Failure: release for retry
await queue.release(lease, requeue_front=True)
# Targeted consumption
key, value, lease = await queue.take("task-2")
print(f"Specifically got {key}: {value}")
await queue.ack(lease)
asyncio.run(main())Each key is in exactly one state:
- ABSENT: Not in queue
- AVAILABLE: Ready for consumption
- IN_FLIGHT: Leased to a consumer
Producer API
await queue.put(key, value, if_in_flight="update") # update|reject|bufferConsumer API
# FIFO consumption
key, value, lease = await queue.get(timeout=10.0, lease_timeout=30.0)
# Keyed consumption
key, value, lease = await queue.take("specific-key", timeout=10.0)Lease Control
await queue.ack(lease) # Permanent removal
await queue.release(lease, requeue_front=True) # RetryIntrospection
value = await queue.peek("key")
has_key = await queue.contains("key")
keys = await queue.available_keys()
inflight = await queue.inflight_keys()
size = await queue.qsize()Control behavior when putting a key that's currently leased:
update(default): Update the in-flight valuereject: RaiseKeyAlreadyInFlightErrorbuffer: Enqueue a second copy to available
await queue.put("key", new_value, if_in_flight="update")Prevent stuck items with automatic lease expiration:
# Queue-wide default timeout
queue = LeasedKeyQueue[str, int](default_lease_timeout=30.0)
await queue.start()
# Per-lease override
key, value, lease = await queue.get(lease_timeout=60.0)Expired leases are automatically released to the front of the queue for retry.
| Operation | Time Complexity |
|---|---|
put() |
O(1) |
get() |
O(1) |
take(key) |
O(1) |
ack() |
O(1) |
release() |
O(1) |
Achieved through intrusive doubly-linked list with direct node references.
- Task Queues: FIFO processing with retry on failure
- Job Scheduling: Target specific jobs while maintaining order
- Rate Limiting: Lease-based exclusive access prevents double-processing
- Event Processing: Handle events by ID with guaranteed exclusivity
- Workflow Engines: Track in-flight work with timeout-based recovery
# Install with dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Type checking
mypy src/
# Linting
ruff check src/ tests/MIT License - see LICENSE file for details.
Contributions welcome! Please open an issue or PR on GitHub.