Skip to content

Commit 963b96c

Browse files
brettchienclaude
andcommitted
refactor(dispatch): per-mode consumer idle timeout (10s for per-message)
Per-message mode (cap=1) doesn't benefit from holding consumers across message gaps — there is no batch window to preserve — so a 5-minute idle timeout left consumer tasks lingering long after they were useful. Add PER_MESSAGE_CONSUMER_IDLE_TIMEOUT (10s), wire it through main.rs based on each adapter's message_processing_mode, and drop the unused Dispatcher::new wrapper. By Little's Law (steady-state idle count = arrival rate × idle window), this cuts per-message-mode idle dispatcher footprint by 30x for the same arrival rate while keeping batched modes' 5-minute window so between-trigger lanes aren't torn down on every message. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 93765a1 commit 963b96c

2 files changed

Lines changed: 37 additions & 38 deletions

File tree

src/dispatch.rs

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,22 @@ impl DispatchTarget for AdapterRouter {
169169
// Dispatcher
170170
// ---------------------------------------------------------------------------
171171

172-
/// Default idle timeout for per-thread consumer tasks. When no message arrives
173-
/// within this window the consumer exits, allowing `per_thread` map cleanup on
174-
/// the next `submit` (via `SendError` → `try_evict_locked`). Prevents unbounded
175-
/// task/memory growth from one-shot thread keys (e.g. Slack non-thread messages).
172+
/// Default idle timeout for per-thread consumer tasks in batched modes (Thread / Lane).
173+
/// When no message arrives within this window the consumer exits, allowing `per_thread`
174+
/// map cleanup on the next `submit` (via `SendError` → `try_evict_locked`). Prevents
175+
/// unbounded task/memory growth from one-shot thread keys (e.g. Slack non-thread messages).
176+
///
177+
/// Batched modes need a longer window so a lane that's between trigger arrivals isn't
178+
/// torn down and respawned on every message.
176179
pub const DEFAULT_CONSUMER_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
177180

181+
/// Idle timeout for per-message mode (cap=1, no batching). Per-message dispatchers
182+
/// don't benefit from holding consumers across message gaps — there is no batch
183+
/// window to preserve — so a much shorter timeout reduces idle resource footprint
184+
/// from one-shot thread keys (Little's Law: steady-state idle count = arrival rate
185+
/// × idle window).
186+
pub const PER_MESSAGE_CONSUMER_IDLE_TIMEOUT: Duration = Duration::from_secs(10);
187+
178188
/// Per-thread message dispatcher for batched mode.
179189
///
180190
/// Constructed once in `main.rs` and shared via `Arc`. Platform adapters call
@@ -194,23 +204,9 @@ pub struct Dispatcher {
194204
}
195205

196206
impl Dispatcher {
197-
pub fn new(
198-
target: Arc<dyn DispatchTarget>,
199-
max_buffered_messages: usize,
200-
max_batch_tokens: usize,
201-
grouping: BatchGrouping,
202-
) -> Self {
203-
Self::with_idle_timeout(
204-
target,
205-
max_buffered_messages,
206-
max_batch_tokens,
207-
grouping,
208-
DEFAULT_CONSUMER_IDLE_TIMEOUT,
209-
)
210-
}
211-
212-
/// Like `new`, but with a custom consumer idle timeout. Test-only knob —
213-
/// production code should use `new` (which applies `DEFAULT_CONSUMER_IDLE_TIMEOUT`).
207+
/// Construct a dispatcher with an explicit consumer idle timeout. Per-mode
208+
/// callers in `main.rs` pass `PER_MESSAGE_CONSUMER_IDLE_TIMEOUT` for cap=1
209+
/// dispatchers and `DEFAULT_CONSUMER_IDLE_TIMEOUT` for batched modes.
214210
pub fn with_idle_timeout(
215211
target: Arc<dyn DispatchTarget>,
216212
max_buffered_messages: usize,
@@ -1027,7 +1023,7 @@ mod tests {
10271023
crate::config::ReactionsConfig::default(),
10281024
crate::markdown::TableMode::Off,
10291025
));
1030-
Dispatcher::new(router, 10, 24_000, grouping)
1026+
Dispatcher::with_idle_timeout(router, 10, 24_000, grouping, DEFAULT_CONSUMER_IDLE_TIMEOUT)
10311027
}
10321028

10331029
#[tokio::test]
@@ -1398,7 +1394,7 @@ mod tests {
13981394
// whose consumer is still parked but whose rx has been dropped.
13991395
let mock = Arc::new(MockDispatchTarget::new());
14001396
let target: Arc<dyn DispatchTarget> = mock.clone();
1401-
let d = Dispatcher::new(target, 10, 24_000, BatchGrouping::Thread);
1397+
let d = Dispatcher::with_idle_timeout(target, 10, 24_000, BatchGrouping::Thread, DEFAULT_CONSUMER_IDLE_TIMEOUT);
14021398
let adapter: Arc<dyn ChatAdapter> = Arc::new(MockChatAdapter);
14031399

14041400
let key = "mock:T".to_string();

src/main.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -202,19 +202,20 @@ async fn main() -> anyhow::Result<()> {
202202
// Dispatcher is the sole serialization path for all modes. Message = cap 1
203203
// (each message dispatches alone, FIFO). Thread / Lane = configured cap;
204204
// grouping decides whether senders share a buffer or get their own lane.
205-
let (slack_cap, slack_grouping) = match slack_cfg.message_processing_mode {
205+
let (slack_cap, slack_grouping, slack_idle) = match slack_cfg.message_processing_mode {
206206
config::MessageProcessingMode::Message =>
207-
(1, dispatch::BatchGrouping::Thread),
207+
(1, dispatch::BatchGrouping::Thread, dispatch::PER_MESSAGE_CONSUMER_IDLE_TIMEOUT),
208208
config::MessageProcessingMode::Thread =>
209-
(slack_cfg.max_buffered_messages, dispatch::BatchGrouping::Thread),
209+
(slack_cfg.max_buffered_messages, dispatch::BatchGrouping::Thread, dispatch::DEFAULT_CONSUMER_IDLE_TIMEOUT),
210210
config::MessageProcessingMode::Lane =>
211-
(slack_cfg.max_buffered_messages, dispatch::BatchGrouping::Lane),
211+
(slack_cfg.max_buffered_messages, dispatch::BatchGrouping::Lane, dispatch::DEFAULT_CONSUMER_IDLE_TIMEOUT),
212212
};
213-
let slack_dispatcher = Arc::new(dispatch::Dispatcher::new(
213+
let slack_dispatcher = Arc::new(dispatch::Dispatcher::with_idle_timeout(
214214
router.clone(),
215215
slack_cap,
216216
slack_cfg.max_batch_tokens,
217217
slack_grouping,
218+
slack_idle,
218219
));
219220
dispatchers.lock().unwrap().push(slack_dispatcher.clone());
220221
Some(tokio::spawn(async move {
@@ -247,19 +248,20 @@ async fn main() -> anyhow::Result<()> {
247248
let router = router.clone();
248249
let shutdown_rx = shutdown_rx.clone();
249250
info!(url = %gw_cfg.url, "starting gateway adapter");
250-
let (gw_cap, gw_grouping) = match gw_cfg.message_processing_mode {
251+
let (gw_cap, gw_grouping, gw_idle) = match gw_cfg.message_processing_mode {
251252
config::MessageProcessingMode::Message =>
252-
(1, dispatch::BatchGrouping::Thread),
253+
(1, dispatch::BatchGrouping::Thread, dispatch::PER_MESSAGE_CONSUMER_IDLE_TIMEOUT),
253254
config::MessageProcessingMode::Thread =>
254-
(gw_cfg.max_buffered_messages, dispatch::BatchGrouping::Thread),
255+
(gw_cfg.max_buffered_messages, dispatch::BatchGrouping::Thread, dispatch::DEFAULT_CONSUMER_IDLE_TIMEOUT),
255256
config::MessageProcessingMode::Lane =>
256-
(gw_cfg.max_buffered_messages, dispatch::BatchGrouping::Lane),
257+
(gw_cfg.max_buffered_messages, dispatch::BatchGrouping::Lane, dispatch::DEFAULT_CONSUMER_IDLE_TIMEOUT),
257258
};
258-
let gw_dispatcher = Arc::new(dispatch::Dispatcher::new(
259+
let gw_dispatcher = Arc::new(dispatch::Dispatcher::with_idle_timeout(
259260
router.clone(),
260261
gw_cap,
261262
gw_cfg.max_batch_tokens,
262263
gw_grouping,
264+
gw_idle,
263265
));
264266
dispatchers.lock().unwrap().push(gw_dispatcher.clone());
265267
let params = gateway::GatewayParams {
@@ -346,19 +348,20 @@ async fn main() -> anyhow::Result<()> {
346348
"starting discord adapter"
347349
);
348350

349-
let (discord_cap, discord_grouping) = match discord_cfg.message_processing_mode {
351+
let (discord_cap, discord_grouping, discord_idle) = match discord_cfg.message_processing_mode {
350352
config::MessageProcessingMode::Message =>
351-
(1, dispatch::BatchGrouping::Thread),
353+
(1, dispatch::BatchGrouping::Thread, dispatch::PER_MESSAGE_CONSUMER_IDLE_TIMEOUT),
352354
config::MessageProcessingMode::Thread =>
353-
(discord_cfg.max_buffered_messages, dispatch::BatchGrouping::Thread),
355+
(discord_cfg.max_buffered_messages, dispatch::BatchGrouping::Thread, dispatch::DEFAULT_CONSUMER_IDLE_TIMEOUT),
354356
config::MessageProcessingMode::Lane =>
355-
(discord_cfg.max_buffered_messages, dispatch::BatchGrouping::Lane),
357+
(discord_cfg.max_buffered_messages, dispatch::BatchGrouping::Lane, dispatch::DEFAULT_CONSUMER_IDLE_TIMEOUT),
356358
};
357-
let discord_dispatcher = Arc::new(dispatch::Dispatcher::new(
359+
let discord_dispatcher = Arc::new(dispatch::Dispatcher::with_idle_timeout(
358360
router.clone(),
359361
discord_cap,
360362
discord_cfg.max_batch_tokens,
361363
discord_grouping,
364+
discord_idle,
362365
));
363366
dispatchers.lock().unwrap().push(discord_dispatcher.clone());
364367

0 commit comments

Comments
 (0)