Skip to content

Commit afd6fff

Browse files
超渡法師brettchien
authored andcommitted
fix(dispatch): proactive stale-entry cleanup + transparent retry on idle exit
- submit() now checks consumer.is_finished() before using an existing handle, removing stale entries proactively (fixes map leak for one-shot thread keys that never get a second submit) - On SendError, transparently evict + rebuild + retry once instead of surfacing an error to the user (fixes first-message-after-idle being treated as ConsumerDead) - Only report ConsumerDead if the retry also fails (truly unexpected)
1 parent 81850c1 commit afd6fff

1 file changed

Lines changed: 62 additions & 14 deletions

File tree

src/dispatch.rs

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,17 @@ impl Dispatcher {
149149

150150
let (tx, my_generation) = {
151151
let mut map = self.per_thread.lock().unwrap();
152+
153+
// Proactive stale-entry cleanup: if the consumer has exited (idle
154+
// timeout or unexpected), remove the entry so `or_insert_with`
155+
// creates a fresh one. Prevents map leak from one-shot thread keys
156+
// and avoids the first-message-after-idle being treated as an error.
157+
if let Some(handle) = map.get(&thread_key) {
158+
if handle.consumer.is_finished() {
159+
map.remove(&thread_key);
160+
}
161+
}
162+
152163
let entry = map.entry(thread_key.clone()).or_insert_with(|| {
153164
let (tx, rx) = tokio::sync::mpsc::channel(cap);
154165
let consumer = tokio::spawn(consumer_loop(
@@ -172,25 +183,62 @@ impl Dispatcher {
172183
};
173184

174185
if let Err(e) = tx.send(msg).await {
175-
// Consumer has exited — race-safe eviction under lock (§2.5).
186+
// Consumer has exited between our check and the send — race-safe
187+
// eviction under lock (§2.5), then transparent retry once.
176188
{
177189
let mut map = self.per_thread.lock().unwrap();
178190
Self::try_evict_locked(&mut map, &thread_key, my_generation);
179191
}
180192
let failed_msg = e.0;
181-
let _ = adapter
182-
.add_reaction(
183-
&failed_msg.trigger_msg,
184-
&self.router.reactions_config().emojis.error,
185-
)
186-
.await;
187-
let _ = adapter
188-
.send_message(
189-
&thread_channel,
190-
&format!("⚠️ {}", format_user_error("dispatch consumer exited unexpectedly")),
191-
)
192-
.await;
193-
return Err(DispatchError::ConsumerDead);
193+
194+
// Retry: spawn a fresh consumer and re-send. If this also fails,
195+
// surface the error to the user.
196+
let retry_g = self.next_generation.fetch_add(1, Ordering::Relaxed);
197+
let (retry_tx, retry_gen) = {
198+
let mut map = self.per_thread.lock().unwrap();
199+
let entry = map.entry(thread_key.clone()).or_insert_with(|| {
200+
let (tx, rx) = tokio::sync::mpsc::channel(cap);
201+
let consumer = tokio::spawn(consumer_loop(
202+
thread_key.clone(),
203+
thread_channel.clone(),
204+
rx,
205+
Arc::clone(&router),
206+
Arc::clone(&adapter),
207+
cap,
208+
max_tokens,
209+
));
210+
ThreadHandle {
211+
tx,
212+
consumer,
213+
generation: retry_g,
214+
channel_id: thread_channel.channel_id.clone(),
215+
adapter_kind: adapter.platform().to_string(),
216+
}
217+
});
218+
(entry.tx.clone(), entry.generation)
219+
};
220+
221+
if let Err(e2) = retry_tx.send(failed_msg).await {
222+
// Retry also failed — truly unexpected. Surface error.
223+
{
224+
let mut map = self.per_thread.lock().unwrap();
225+
Self::try_evict_locked(&mut map, &thread_key, retry_gen);
226+
}
227+
let failed_msg = e2.0;
228+
let _ = adapter
229+
.add_reaction(
230+
&failed_msg.trigger_msg,
231+
&self.router.reactions_config().emojis.error,
232+
)
233+
.await;
234+
let _ = adapter
235+
.send_message(
236+
&thread_channel,
237+
&format!("⚠️ {}", format_user_error("dispatch consumer exited unexpectedly")),
238+
)
239+
.await;
240+
return Err(DispatchError::ConsumerDead);
241+
}
194242
}
195243
Ok(())
196244
}

0 commit comments

Comments
 (0)