From 2f9fe7138028ff989c07b4c261bf42724a26f9f8 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 11 May 2026 16:15:03 +0000 Subject: [PATCH] docs: update subagents docs for PgmqBus (PR #21) - Add PgmqBus section to api-reference/pipecat-subagents/bus.mdx - Update fundamentals/agent-bus.mdx to list PgmqBus - Update learn/distributed-agents.mdx with PgmqBus setup guide --- subagents/fundamentals/agent-bus.mdx | 27 +++++++++++++++++++- subagents/learn/distributed-agents.mdx | 34 +++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/subagents/fundamentals/agent-bus.mdx b/subagents/fundamentals/agent-bus.mdx index 06501163..146ab7eb 100644 --- a/subagents/fundamentals/agent-bus.mdx +++ b/subagents/fundamentals/agent-bus.mdx @@ -11,7 +11,7 @@ Think of it as an internal event bus -- agents publish messages and other agents ## Bus implementations -Pipecat Subagents provides two bus implementations: +Pipecat Subagents provides three bus implementations: ### AsyncQueueBus (local) @@ -42,6 +42,31 @@ All processes that share the same Redis channel can exchange messages. The progr `RedisBus` requires the `redis` extra: `uv add "pipecat-ai-subagents[redis]"` +### PgmqBus (distributed) + +An alternative distributed bus backed by PGMQ (PostgreSQL Message Queue). Each instance creates its own queue and broadcasts to peer queues. + +```python +from pgmq.async_queue import PGMQueue +from pipecat_subagents.bus.network.pgmq import PgmqBus + +pgmq = PGMQueue( + host="localhost", + port="5432", + database="postgres", + username="postgres", + password="...", + pool_size=4, +) +await pgmq.init() +bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app") +runner = AgentRunner(bus=bus) +``` + + + `PgmqBus` requires the `pgmq` extra: `uv add "pipecat-ai-subagents[pgmq]"` + + ## Message types Messages on the bus fall into three categories: diff --git a/subagents/learn/distributed-agents.mdx b/subagents/learn/distributed-agents.mdx index 9a0d1e48..51bf25f4 100644 --- a/subagents/learn/distributed-agents.mdx +++ b/subagents/learn/distributed-agents.mdx @@ -13,7 +13,11 @@ Distributed agents are agents connected to the **same bus** but running in diffe Agents](/subagents/learn/proxy-agents) instead. -## Setting up RedisBus +## Setting up a distributed bus + +Pipecat Subagents provides two distributed bus implementations: RedisBus and PgmqBus. Choose based on your infrastructure. + +### RedisBus Each process creates its own `AgentRunner` with a `RedisBus` connected to the same Redis channel: @@ -31,6 +35,30 @@ All runners sharing the same `channel` can exchange messages. Agents discover ea Install the Redis extra: `uv add "pipecat-ai-subagents[redis]"` +### PgmqBus + +Alternatively, use `PgmqBus` backed by PostgreSQL Message Queue: + +```python +from pgmq.async_queue import PGMQueue +from pipecat_subagents.bus.network.pgmq import PgmqBus +from pipecat_subagents.runner import AgentRunner + +pgmq = PGMQueue( + host="localhost", + port="5432", + database="postgres", + username="postgres", + password="...", + pool_size=4, +) +await pgmq.init() +bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app") +runner = AgentRunner(bus=bus, handle_sigint=True) +``` + +Install the PGMQ extra: `uv add "pipecat-ai-subagents[pgmq]"` + ## Example: distributed handoff This example splits the two-agent handoff across separate processes. The main agent handles transport on one machine, and LLM agents run independently on other machines. @@ -157,6 +185,6 @@ Runners exchange registry information automatically over the shared bus. To get ## Considerations -- **Latency**: Redis adds network overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and the Redis instance. -- **Serialization**: `RedisBus` serializes messages to JSON. Custom frame types need to be registered with the serializer. +- **Latency**: Network buses add overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and the bus server (Redis or PostgreSQL). +- **Serialization**: Both `RedisBus` and `PgmqBus` serialize messages to JSON. Custom frame types need to be registered with the serializer. - **Single channel**: All agents on the same channel see all messages. Use different channels for different sessions or applications.