
feat(gateway): queue inbound messages during drain and recover interrupted turns on restart #41597

Closed
amittell wants to merge 34 commits into openclaw:main from amittell:feat/inbound-drain-queue

Conversation

@amittell
Contributor

Summary

Two complementary fixes that together eliminate silent message loss when the gateway restarts:

1. Inbound drain queue — when the gateway starts draining for a restart, new inbound messages from Telegram and Discord are captured to a persistent store instead of being rejected with a 503, and each is acknowledged immediately (200 OK) to the provider. On the next startup, captured messages are replayed before the channel pollers resume, so there are no provider-side retry races and no user messages are silently dropped.

2. Active-turn recovery — when a turn is interrupted mid-run by a restart, the session receives a system event on next startup prompting the user to resend. Delivery target is resolved from the session store; if no target can be found (scheduler sessions, dead sessions), recovery is skipped silently. This is inspired by the approach in #41248 by @riftzen-bit, reimplemented using the existing writeJsonAtomic infra and a single shared store (pending-inbound.json) to keep both fixes atomic.

Related: #41547 addresses a related restart race in the session event queue.
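For reference, a minimal sketch of the shared store shape implied by the description above. Field and key names here are inferred from this PR discussion, not copied from the actual src/infra/pending-inbound-store.ts:

```typescript
// Hypothetical shape of the shared pending-inbound.json store; names are
// inferred from the PR text, not the real source.
interface PendingInboundEntry {
  channel: "telegram" | "discord";
  id: string;
  sessionKey?: string; // optional field added later in review (cf736d223)
  payload: Record<string, unknown>;
}

interface ActiveTurn {
  sessionId: string;
  sessionKey: string;
  channel: string;
  startedAt: number;
}

interface PendingInboundFile {
  entries: Record<string, PendingInboundEntry>; // keyed by `${channel}:${id}`
  activeTurns: Record<string, ActiveTurn>; // keyed by sessionId
}

// Dedupe key used when persisting a captured message.
const storeKey = (e: PendingInboundEntry): string => `${e.channel}:${e.id}`;

const entry: PendingInboundEntry = {
  channel: "telegram",
  id: "12345",
  payload: { chatId: 42, text: "hello" },
};
console.log(storeKey(entry)); // telegram:12345
```

Keeping both sections in one file is what makes the two fixes atomic: a single writeJsonAtomic call updates the queue and the turn markers together.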

Files changed

| File | What |
| --- | --- |
| src/process/command-queue.ts | Export isGatewayDraining() getter |
| src/infra/pending-inbound-store.ts | New — atomic read/write for inbound queue + active turns |
| src/infra/pending-inbound-store.test.ts | 12 unit tests |
| src/telegram/bot-handlers.ts | Drain guard in handleInboundMessageLike |
| src/discord/monitor/message-handler.process.ts | Drain guard before dispatchInboundMessage |
| src/agents/pi-embedded-runner/run/attempt.ts | Write/clear active turn on run start/end |
| src/gateway/server-startup.ts | Replay queued inbound + stale turn recovery on startup |
| src/telegram/bot/delivery.replies.ts | Empty-chunk guard in sendPendingFollowUpText |
| src/telegram/bot/delivery.send.ts | Empty-text error handling hardening |

Commits

7b42beb01 fix(gateway): skip active-turn recovery when no delivery target resolves
609e8610b fix(gateway): skip scheduler sessions in active-turn recovery
f5cfe97ae feat(gateway): add active-turn recovery on restart using shared store
5a3f67837 feat(gateway): queue inbound messages during drain for lossless restart

Copilot AI review requested due to automatic review settings March 10, 2026 01:43
@aisle-research-bot

aisle-research-bot Bot commented Mar 10, 2026

🔒 Aisle Security Analysis

We found 3 potential security issue(s) in this PR:

| # | Severity | Title |
| --- | --- | --- |
| 1 | 🟠 High | Telegram drain capture uses non-isolated sessionKey for named-account DMs (cross-account message mixup) |
| 2 | 🟡 Medium | Prompt-injection risk: untrusted user message replayed as trusted System: event |
| 3 | 🔵 Low | Symlink/TOCTOU risk in atomic writer due to path-based chmod on directory and final file |

1. 🟠 Telegram drain capture uses non-isolated sessionKey for named-account DMs (cross-account message mixup)

| Property | Value |
| --- | --- |
| Severity | High |
| CWE | CWE-284 |
| Location | extensions/telegram/src/bot-handlers.ts:1749-1785 |

Description

In the Telegram drain path (isGatewayDraining()), the code persists inbound messages to pending-inbound.json with a sessionKey computed from resolveTelegramConversationRoute().route.sessionKey.

For named (non-default) Telegram accounts without explicit bindings, the normal (non-drain) path intentionally overrides the base session key via resolveTelegramConversationBaseSessionKey() to force per-account isolation (see buildTelegramMessageContext).

However, the drain path does not apply this override. As a result:

  • A DM received by a non-default bot account (e.g. accountId opie) during drain can be persisted with a shared session key (often agent:<agent>:main or a key that lacks accountId)
  • On restart, server-startup.ts replays pending entries into the stored sessionKey, so the message can be replayed into the wrong session/agent, mixing conversations across accounts

This breaks account isolation and can lead to cross-account information exposure and misrouting.

Vulnerable flow:

  • Input: Telegram DM to non-default bot account during drain
  • Sink: writePendingInbound(..., { sessionKey: drainSessionKey })
  • Replay: server-startup.ts uses entry.sessionKey to enqueue system events into that session

Recommendation

Mirror the normal message path’s session-key derivation in the drain path.

Specifically:

  1. Compute the base session key using resolveTelegramConversationBaseSessionKey() (which applies the named-account DM fallback isolation).
  2. Then apply resolveThreadSessionKeys() for DM topics (as is done today).

Example fix:

```typescript
const baseSessionKey = resolveTelegramConversationBaseSessionKey({
  cfg,
  route: drainRoute,
  chatId: event.chatId,
  isGroup: event.isGroup,
  senderId: event.senderId,
});

const threadKeys =
  dmThreadId != null
    ? resolveThreadSessionKeys({ baseSessionKey, threadId: `${event.chatId}:${dmThreadId}` })
    : null;

const drainSessionKey = threadKeys?.sessionKey ?? baseSessionKey;
```

Add a regression test covering drain capture for non-default accountId DMs (no explicit bindings) asserting the stored sessionKey contains the per-account isolation segment (e.g. agent:main:telegram:<accountId>:direct:<senderId>).


2. 🟡 Prompt-injection risk: untrusted user message replayed as trusted System: event

| Property | Value |
| --- | --- |
| Severity | Medium |
| CWE | CWE-74 |
| Location | src/gateway/server-startup.ts:184-240 |

Description

The gateway startup replay logic queues missed inbound user messages as system events. System events are later formatted and prepended to the model prompt as trusted System: lines (see drainFormattedSystemEvents() behavior), which crosses a trust boundary:

  • Input (untrusted): payload.senderName/senderUsername/senderId and payload.text come from the original inbound message.
  • Transformation: only sanitizeInboundSystemTags() is applied, which only rewrites a small set of spoofable markers (e.g. System: prefix, [System]), but does not make the content safe to include as trusted system context.
  • Sink (high-trust prompt context): the resulting eventText is enqueued via enqueueSystemEvent(...) and later inserted into the prompt as System: ....

Because the content is presented to the model as a system line, an attacker can craft message text that attempts instruction-following ("ignore previous instructions", tool-use coercion, data exfiltration requests, etc.). Even though it is wrapped in quotes, LLMs frequently still follow instructions that appear in higher-authority context.

Vulnerable code:

```typescript
const rawPreview = (payload.text ?? "").slice(0, 200).replace(/\n/g, "\\n");
const textPreview = sanitizeInboundSystemTags(String(rawPreview));
// ...
const eventText = `[pending-inbound:${entry.id}] Missed message during restart from ${senderLabel}: "${textPreview || "(no text)"}"`;
// ...
enqueueSystemEvent(eventText, { sessionKey, contextKey: `pending-inbound:${entry.channel}:${entry.id}` });
```

Recommendation

Treat replayed user content as untrusted user content, not as a trusted system message.

Recommended fixes (pick one):

  1. Do not use system events for user text. Instead, inject the replay as a normal user message (or a dedicated Untrusted replay: block) in the prompt-building layer.

  2. If you must use the system-event mechanism for delivery reasons, explicitly downgrade the authority in the text and in formatting:

    • Change the event text to clearly mark untrusted content, and
    • Adjust drainFormattedSystemEvents() to prefix these as System (untrusted): or Note (untrusted): rather than System:.

Example safer event text + context key:

````typescript
const replayText = `Untrusted replay (missed during restart) from ${senderLabel}:\n` +
  "```\n" + (payload.text ?? "") + "\n```";

enqueueSystemEvent(replayText, {
  sessionKey,
  contextKey: `pending-inbound:${entry.channel}:${entry.id}`,
});
````

Additionally, consider avoiding inclusion of raw user text entirely (store only a pointer/metadata like message id), and let the channel re-fetch if possible.


3. 🔵 Symlink/TOCTOU risk in atomic writer due to path-based chmod on directory and final file

| Property | Value |
| --- | --- |
| Severity | Low |
| CWE | CWE-59 |
| Location | src/infra/json-files.ts:39-67 |

Description

writeTextAtomic() attempts to enforce permissions by calling chmod() on the parent directory and again on the destination file path after an atomic rename.

Because these chmod() calls operate on paths (not file descriptors) and do not check for symlinks, a local attacker who can manipulate the filesystem entries in the target directory can exploit TOCTOU windows to redirect the chmod() operation to an unintended target.

Impact (requires attacker write access to the configured state directory or ability to swap it for a symlink):

  • Permission tampering / DoS: attacker can cause OpenClaw to chmod() an arbitrary file or directory (the symlink target) to 0o600 or ensureDirMode by racing to replace the destination path with a symlink between rename() and chmod().
  • Symlink-following chmod: fs.chmod(path.dirname(filePath), ...) will follow symlinks on POSIX, so if the state directory is a symlink (or is swapped to one), the permissions applied may affect an unexpected directory.

Vulnerable code:

```typescript
await fs.mkdir(path.dirname(filePath), mkdirOptions);
await fs.chmod(path.dirname(filePath), options.ensureDirMode);
// ...
await fs.rename(tmp, filePath);
await fs.chmod(filePath, mode);
```

Recommendation

Avoid path-based chmod() in this atomic-write helper, or harden it against symlink/TOCTOU:

  1. Prefer eliminating the destination chmod(filePath, mode) entirely.

    • Because the temp file is created and then chmod(tmp, mode) is applied before rename(), the final file will typically already have the correct mode after the atomic rename.

  2. If directory permission enforcement is required, validate and refuse symlinked directories before chmod():

```typescript
const dir = path.dirname(filePath);
const st = await fs.lstat(dir);
if (!st.isDirectory() || st.isSymbolicLink()) {
  throw new Error(`Refusing to chmod non-directory or symlinked dir: ${dir}`);
}
await fs.chmod(dir, options.ensureDirMode);
```

  3. On platforms where it is available, use descriptor-based operations to avoid link races (e.g., open the directory/file with O_NOFOLLOW and then fchmod()).

Also consider failing closed (throwing) if ensureDirMode cannot be enforced for sensitive stores (instead of silently ignoring chmod failures), so the application does not continue writing into an attacker-controlled/wrongly-permissioned directory.
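The descriptor-based approach in point 3 can be sketched in Node as follows. This is an illustrative helper under stated assumptions, not the repo's code; chmodNoFollow and the demo paths are hypothetical:

```typescript
import { open, mkdtemp, writeFile, stat } from "node:fs/promises";
import { constants } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Hedged sketch: O_NOFOLLOW makes open() fail (ELOOP) when the final path
// component is a symlink; FileHandle.chmod() then issues fchmod(2) against
// the already-opened inode, so a later path swap cannot redirect the chmod.
async function chmodNoFollow(target: string, mode: number): Promise<void> {
  const fh = await open(target, constants.O_RDONLY | constants.O_NOFOLLOW);
  try {
    await fh.chmod(mode);
  } finally {
    await fh.close();
  }
}

async function demo(): Promise<number> {
  const dir = await mkdtemp(join(tmpdir(), "pending-inbound-"));
  const file = join(dir, "pending-inbound.json");
  await writeFile(file, "{}\n");
  await chmodNoFollow(file, 0o600);
  return (await stat(file)).mode & 0o777;
}

demo().then((m) => console.log(m.toString(8))); // "600" on POSIX systems
```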


Analyzed PR: #41597 at commit 5220193

Last updated on: 2026-03-18T05:02:01Z

@openclaw-barnacle openclaw-barnacle Bot added channel: discord Channel integration: discord channel: telegram Channel integration: telegram gateway Gateway runtime agents Agent runtime and tooling size: L labels Mar 10, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7b42beb014

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/gateway/server-startup.ts Outdated
Comment on lines +160 to +162
```typescript
// Consume-then-process: clear first to prevent infinite retry on crash.
await clearPendingInbound(stateDir);
for (const entry of pending) {
```


P1 — Preserve active-turn state when clearing pending inbound

clearPendingInbound deletes the whole pending-inbound.json file, but that same file also stores activeTurns; in the startup flow, this call runs before stale-turn recovery, so any restart that has both queued inbound messages and stale turns will silently drop the stale-turn records and skip user recovery notifications. This breaks the new recovery path specifically when drain-queued traffic is present.


Contributor Author


Fixed in cf736d223. Added clearPendingInboundEntries() and clearAllActiveTurns() that each only touch their own key in the shared store, leaving the other intact. server-startup.ts now uses the scoped functions. The old clearPendingInbound() is kept but marked @deprecated.
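The scoped-clear idea can be illustrated as pure functions over the store shape. This is a hedged sketch only; the real clearPendingInboundEntries()/clearAllActiveTurns() take the state dir, hold the file lock, and write via writeJsonAtomic:

```typescript
// Simplified store shape; the real one lives in src/infra/pending-inbound-store.ts.
type Store = {
  entries: Record<string, unknown>;
  activeTurns: Record<string, unknown>;
};

// Clears only the inbound-queue section, leaving active turns intact.
function clearEntriesSection(store: Store): Store {
  return { ...store, entries: {} };
}

// Clears only the active-turn section, leaving queued inbound intact.
function clearActiveTurnsSection(store: Store): Store {
  return { ...store, activeTurns: {} };
}

const store: Store = {
  entries: { "telegram:1": { text: "hi" } },
  activeTurns: { "sess-1": { sessionKey: "agent:main" } },
};
const afterClear = clearEntriesSection(store);
console.log(Object.keys(afterClear.entries).length); // 0
console.log(Object.keys(afterClear.activeTurns).length); // 1 — turn survives
```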

Comment thread src/infra/pending-inbound-store.ts Outdated
Comment on lines +53 to +56
```typescript
const existing = await readPendingInboundFile(filePath);
const key = storeKey(entry);
existing.entries[key] = entry;
await writeJsonAtomic(filePath, existing, {
```


P1 — Serialize read-modify-write updates to the shared store

This write path performs readPendingInboundFile followed by writeJsonAtomic without any lock, so concurrent drain-time writes can race and overwrite each other (last writer wins). In practice, two inbound events arriving close together can each read the same snapshot and one queued message is lost, which defeats the lossless-drain guarantee.


Contributor Author


Fixed in cf736d223. All read-modify-write operations in pending-inbound-store.ts (writePendingInbound, writeActiveTurn, clearActiveTurn, clearPendingInboundEntries, clearAllActiveTurns) are now wrapped with withFileLock() using the same pattern as updateAuthProfileStoreWithLock. Added concurrent write tests (10 parallel writes of each type) confirming no data loss.
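The serialization this fix provides can be illustrated with a minimal in-process mutex. This is a sketch only; the repo's withFileLock helper also coordinates across processes via the filesystem:

```typescript
// Minimal promise-chain mutex: each run() waits for the previous one.
class Mutex {
  private tail: Promise<void> = Promise.resolve();
  run<T>(fn: () => Promise<T>): Promise<T> {
    const result = this.tail.then(fn);
    // Keep the chain alive even if fn rejects.
    this.tail = result.then(() => undefined, () => undefined);
    return result;
  }
}

const lock = new Mutex();
let snapshot: Record<string, number> = {};

// Read-modify-write that would lose updates if two writers interleaved.
async function writeEntry(key: string): Promise<void> {
  await lock.run(async () => {
    const current = { ...snapshot }; // read
    await new Promise((resolve) => setTimeout(resolve, 1)); // widen the race window
    current[key] = 1; // modify
    snapshot = current; // write back
  });
}

async function demo(): Promise<number> {
  await Promise.all(Array.from({ length: 10 }, (_, i) => writeEntry(`k${i}`)));
  return Object.keys(snapshot).length;
}

demo().then((n) => console.log(n)); // 10 — no lost writes
```

Without the lock, all ten writers would read the same empty snapshot and the last writer would win, leaving a single key — exactly the last-writer-wins loss the review describes.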

Comment thread src/telegram/bot-handlers.ts Outdated
Comment on lines +1419 to +1421
```typescript
channel: "telegram",
id: String(event.msg.message_id ?? Date.now()),
payload: {
```


P2 — Include chat identity in Telegram pending-entry key

The queued Telegram entry ID is based only on message_id, but Telegram message IDs are not globally unique across chats; during drain, messages from different chats with the same message_id will map to the same dedupe key (telegram:<id>) and one overwrites the other. This causes cross-chat message loss in multi-chat bots.


Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in cf736d223. Added optional sessionKey field to PendingInboundEntry. Telegram bot-handlers.ts now resolves the session key via resolveTelegramConversationRoute() at capture time. Discord message-handler.process.ts stores boundSessionKey ?? baseSessionKey (already available in ctx). Replay in server-startup.ts uses the stored key with fallback to the fabricated key for backward compat with any entries written before this change.
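The stored-key-with-fallback replay logic can be sketched as follows. replaySessionKey is a hypothetical helper name; the fallback branch mirrors the fabricated keys in the pre-existing replay code:

```typescript
// Simplified entry shape; sessionKey is present only for entries written
// after the cf736d223 fix.
interface PendingEntry {
  channel: "telegram" | "discord";
  id: string;
  sessionKey?: string;
  payload: { chatId?: number | string; channelId?: string };
}

// Replay prefers the session key resolved at capture time; older entries
// fall back to the fabricated channel-scoped key for backward compat.
function replaySessionKey(entry: PendingEntry): string {
  if (entry.sessionKey) return entry.sessionKey;
  return entry.channel === "telegram"
    ? `telegram:${entry.payload.chatId ?? "unknown"}`
    : `discord:channel:${entry.payload.channelId ?? "unknown"}`;
}

console.log(
  replaySessionKey({
    channel: "telegram",
    id: "1",
    sessionKey: "agent:main:telegram:opie:direct:99",
    payload: { chatId: 42 },
  }),
); // agent:main:telegram:opie:direct:99
console.log(
  replaySessionKey({ channel: "telegram", id: "2", payload: { chatId: 42 } }),
); // telegram:42
```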

@greptile-apps
Contributor

greptile-apps Bot commented Mar 10, 2026

Greptile Summary

This PR introduces two complementary restart-safety features: a drain-time inbound queue that captures Telegram/Discord messages during gateway shutdown (replaying them on next startup), and active-turn recovery that notifies sessions interrupted mid-run. The delivery hardening changes (empty-chunk guards, extraction of EMPTY_TEXT_ERR_RE, sentAnyChunk flag) are clean, well-tested, and correct.

Critical issue found:

clearPendingInbound at line 161 deletes the entire shared pending-inbound.json file. Because both inbound queue entries and active-turn data live in the same file, this deletion destroys all active turns before readStaleActiveTurns (line 204) can read them. In the exact scenario both features are designed to handle simultaneously (drain with both queued messages and an in-flight turn), active-turn recovery silently does nothing. The fix is to read both pending and staleTurns before clearing anything, or change clearPendingInbound to zero out only the entries section rather than deleting the whole file.

Confidence Score: 1/5

  • Not safe to merge as-is. The shared-store design has a critical startup sequencing bug that causes silent data loss.
  • The startup code has a sequencing bug where clearPendingInbound() deletes the entire pending-inbound.json file before readStaleActiveTurns() can read the active-turn data from it. In the exact scenario both features are designed to handle together (restart with both queued messages and in-flight turns), the active-turn recovery silently does nothing because the data was deleted at line 161 before it could be read at line 204. All other changes (delivery hardening, store implementation) are solid, but this fundamental issue blocks the feature.
  • src/gateway/server-startup.ts (startup sequencing bug) and src/infra/pending-inbound-store.ts (clearPendingInbound deletes entire file instead of only the entries section).

Last reviewed commit: 7b42beb

Comment on lines +154 to +256
```typescript
  const pending = await readPendingInbound(stateDir);
  if (pending.length > 0) {
    params.log.warn(`replaying ${pending.length} inbound message(s) captured during drain`);
    // Consume-then-process: clear first to prevent infinite retry on crash.
    await clearPendingInbound(stateDir);
    for (const entry of pending) {
      try {
        const payload = entry.payload as {
          chatId?: number | string;
          channelId?: string;
          senderId?: string;
          senderName?: string;
          senderUsername?: string;
          text?: string;
        };
        const senderLabel =
          payload.senderName ?? payload.senderUsername ?? payload.senderId ?? "unknown";
        const textPreview = (payload.text ?? "").slice(0, 200).replace(/\n/g, "\\n");
        const sessionKey =
          entry.channel === "telegram"
            ? `telegram:${payload.chatId ?? "unknown"}`
            : entry.channel === "discord"
              ? `discord:channel:${payload.channelId ?? "unknown"}`
              : `${entry.channel}:unknown`;
        const eventText = `[pending-inbound] Missed message during restart from ${senderLabel}: "${textPreview || "(no text)"}"`;
        enqueueSystemEvent(eventText, {
          sessionKey,
          contextKey: `pending-inbound:${entry.channel}:${entry.id}`,
        });
        params.log.warn(
          `pending-inbound: replayed ${entry.channel}:${entry.id} → session ${sessionKey}`,
        );
      } catch (err) {
        params.log.warn(
          `pending-inbound: replay failed for ${entry.channel}:${entry.id}: ${String(err)}`,
        );
      }
    }
  }
} catch (err) {
  params.log.warn(`pending-inbound: replay startup failed: ${String(err)}`);
}

// Recover stale active turns — runs that were in-flight when the process died.
// Notify the originating session so the user knows to resend.
try {
  const stateDir = resolveStateDir(process.env);
  const staleTurns = await readStaleActiveTurns(stateDir);
  if (staleTurns.length > 0) {
    params.log.warn(
      `active-turn recovery: found ${staleTurns.length} stale turn(s) from previous process`,
    );
    for (const turn of staleTurns) {
      // Consume first to prevent infinite retry on crash.
      await clearActiveTurn(stateDir, turn.sessionId);

      // Skip probe sessions — they are synthetic health-check runs.
      if (turn.sessionId.startsWith("probe-")) {
        continue;
      }

      // Attempt to resolve a delivery target for the session. Sessions without
      // a resolvable channel target (isolated scheduler sessions, orphaned sessions,
      // or any session with no channel mapping) are skipped — the originating system
      // (e.g. a scheduler's drain-retry) handles recovery for those.
      const { baseSessionKey } = parseSessionThreadInfo(turn.sessionKey);
      const parsedTarget = resolveAnnounceTargetFromKey(baseSessionKey ?? turn.sessionKey ?? "");
      const { entry: turnEntry } = loadSessionEntry(turn.sessionKey ?? "");
      let deliveryCtx = deliveryContextFromSession(turnEntry);
      if (!deliveryCtx && baseSessionKey && baseSessionKey !== turn.sessionKey) {
        const { entry: baseEntry } = loadSessionEntry(baseSessionKey);
        deliveryCtx = deliveryContextFromSession(baseEntry);
      }
      const origin = mergeDeliveryContext(deliveryCtx, parsedTarget ?? undefined);
      const resolvedChannel = origin?.channel ? normalizeChannelId(origin.channel) : null;
      const resolvedTo = origin?.to;
      if (!resolvedChannel || !resolvedTo) {
        continue;
      }

      try {
        const recoveryMessage =
          "⚠️ I was restarted mid-conversation. Please resend your last message.";
        enqueueSystemEvent(`[active-turn-recovery] ${recoveryMessage}`, {
          sessionKey: turn.sessionKey,
          contextKey: `active-turn-recovery:${turn.sessionId}`,
        });
        params.log.warn(
          `active-turn recovery: notified session ${turn.sessionKey} (sessionId=${turn.sessionId}, channel=${turn.channel})`,
        );
      } catch (err) {
        params.log.warn(
          `active-turn recovery: notify failed for session ${turn.sessionKey}: ${String(err)}`,
        );
      }
    }
  }
} catch (err) {
  params.log.warn(`active-turn recovery: startup failed: ${String(err)}`);
}
```
Contributor


Critical — startup sequencing bug loses active-turn recovery data

clearPendingInbound (line 161) calls fs.unlink() to delete the entire shared pending-inbound.json file. Because both inbound queue (entries) and active-turn data (activeTurns) live in the same file, this deletion permanently removes any active turns before readStaleActiveTurns (line 204) can read them.

Failure scenario (exactly what the PR is designed to handle):

  1. Gateway drains while a turn is mid-flight → both entries (queued inbound) and activeTurns written to the same file
  2. Restart begins → line 157 reads pending inbound (non-empty)
  3. Line 161 calls clearPendingInbound() → file deleted entirely
  4. Line 204 calls readStaleActiveTurns() → file no longer exists, returns [] silently
  5. Active-turn recovery skipped even though stale turns existed

The two features are advertised as complementary and atomic, but they actively sabotage each other at startup in the exact scenario they're designed to handle.

Fix: Read both pending and staleTurns before clearing anything:

```typescript
// Read both sections before any clearing
const stateDir = resolveStateDir(process.env);
const [pending, staleTurns] = await Promise.all([
  readPendingInbound(stateDir),
  readStaleActiveTurns(stateDir),
]);
// ... replay pending ...
if (pending.length > 0) {
  await clearPendingInbound(stateDir);
  // ... process ...
}
// ... recover stale turns (data already read before file was deleted) ...
```

Alternatively, modify clearPendingInbound to only zero out the entries section rather than deleting the whole file.


Contributor Author


Fixed in cf736d223. Added clearPendingInboundEntries() and clearAllActiveTurns() that each only touch their own key in the shared store, leaving the other intact. server-startup.ts now uses the scoped functions. The old clearPendingInbound() is kept but marked @deprecated.

Contributor

Copilot AI left a comment


Pull request overview

This PR aims to prevent silent message loss across gateway restarts by (1) persisting inbound messages received while the gateway is draining and replaying them on next startup, and (2) tracking “active turns” on disk so interrupted runs can be detected and a recovery prompt can be issued after restart. It also hardens Telegram sending to better handle “empty text” rejection cases and to skip empty chunks during chunked delivery.

Changes:

  • Add a disk-backed pending-inbound.json store for drain-captured inbound messages and active-turn markers.
  • Capture inbound messages during drain in Telegram/Discord handlers and replay them on gateway startup; detect stale active turns and enqueue recovery notices.
  • Improve Telegram send/delivery behavior around empty-text errors and chunk handling, with added tests.
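The chunk-handling hardening mentioned above can be sketched as follows. This is illustrative only; sendChunks and its callback are stand-ins for the real delivery.replies.ts logic, and the sentAnyChunk flag mirrors the one named in the reviews:

```typescript
// Hedged sketch: skip empty chunks and attach first-chunk behavior (buttons,
// reply threading) to the first chunk actually sent, not the first in the array.
async function sendChunks(
  chunks: string[],
  sendChunk: (text: string, isFirst: boolean) => Promise<void>,
): Promise<boolean> {
  let sentAnyChunk = false;
  for (const chunk of chunks) {
    if (chunk.trim().length === 0) continue; // empty-chunk guard
    await sendChunk(chunk, !sentAnyChunk);
    sentAnyChunk = true;
  }
  return sentAnyChunk; // false → nothing was delivered
}

async function demo(): Promise<Array<[string, boolean]>> {
  const sent: Array<[string, boolean]> = [];
  await sendChunks(["", "hello", "world"], async (text, isFirst) => {
    sent.push([text, isFirst]);
  });
  return sent;
}

demo().then((sent) => console.log(sent.length)); // 2 — the empty chunk is skipped
```

Note that the leading empty chunk does not consume the isFirst slot: "hello" is sent with isFirst=true.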

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 7 comments.

Show a summary per file
| File | Description |
| --- | --- |
| src/process/command-queue.ts | Exposes isGatewayDraining() so channel handlers can detect drain state. |
| src/infra/pending-inbound-store.ts | New atomic JSON store for pending inbound entries + active turns. |
| src/infra/pending-inbound-store.test.ts | Unit tests for the pending inbound / active turn store. |
| src/telegram/bot-handlers.ts | Captures inbound Telegram messages to pending store during drain. |
| src/discord/monitor/message-handler.process.ts | Captures inbound Discord messages to pending store during drain. |
| src/gateway/server-startup.ts | Replays pending inbound entries and attempts active-turn recovery on startup. |
| src/agents/pi-embedded-runner/run/attempt.ts | Writes/clears active-turn markers at run start/end. |
| src/telegram/network-errors.ts | Adds shared regex for Telegram "empty text" error variants. |
| src/telegram/send.ts | Treats "empty text" errors like HTML parse errors to trigger plain-text fallback. |
| src/telegram/send.test.ts | Adds test coverage for the new empty-text fallback in send path. |
| src/telegram/bot/delivery.send.ts | Hardens Telegram send logic to skip/return undefined on unrecoverable empty text scenarios. |
| src/telegram/bot/delivery.replies.ts | Skips empty chunks; ensures buttons/reply threading apply to first sent chunk. |
| src/telegram/bot/delivery.test.ts | Adds tests for empty chunk handling + empty-text fallback modes. |

Comment on lines +121 to +129
```typescript
if (isGatewayDraining()) {
  const stateDir = resolveStateDir(process.env);
  await writePendingInbound(stateDir, {
    channel: "discord",
    id: String(message.id ?? Date.now()),
    payload: {
      channelId: messageChannelId,
      messageId: message.id,
      text: baseText ?? "",
```

Copilot AI Mar 10, 2026


Drain capture already has route/baseSessionKey/boundSessionKey in ctx, but the stored payload omits the effective session key. Startup replay currently fabricates discord:channel:<id> keys, which won’t match the agent-scoped session keys used by routing (e.g. agent:<id>:discord:channel:<id>). Persist the effective session key in the pending entry so replay targets the correct session.

Contributor Author


Fixed in cf736d223. Added optional sessionKey field to PendingInboundEntry. Telegram bot-handlers.ts now resolves the session key via resolveTelegramConversationRoute() at capture time. Discord message-handler.process.ts stores boundSessionKey ?? baseSessionKey (already available in ctx). Replay in server-startup.ts uses the stored key with fallback to the fabricated key for backward compat with any entries written before this change.

Comment on lines +158 to +162
```typescript
if (pending.length > 0) {
  params.log.warn(`replaying ${pending.length} inbound message(s) captured during drain`);
  // Consume-then-process: clear first to prevent infinite retry on crash.
  await clearPendingInbound(stateDir);
  for (const entry of pending) {
```

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clearPendingInbound(stateDir) happens before any replay events are queued. Because replay uses enqueueSystemEvent (in-memory only), a crash/restart during startup can permanently drop the persisted inbound entries with no retry path. Consider delaying deletion until after a durable write (or tracking per-entry replay attempts).

Contributor Author


Fixed in cf736d223. Added clearPendingInboundEntries() and clearAllActiveTurns() that each only touch their own key in the shared store, leaving the other intact. server-startup.ts now uses the scoped functions. The old clearPendingInbound() is kept but marked @deprecated.

Comment on lines +209 to +213
```typescript
for (const turn of staleTurns) {
  // Consume first to prevent infinite retry on crash.
  await clearActiveTurn(stateDir, turn.sessionId);

  // Skip probe sessions — they are synthetic health-check runs.
```

Copilot AI Mar 10, 2026


Active-turn recovery clears the on-disk marker (clearActiveTurn) before the recovery event is enqueued. Since enqueueSystemEvent is in-memory only, a crash during startup can drop the notice and also remove the marker so it won’t retry next time. Consider clearing only after a durable notification is recorded, or tracking retry attempts safely.

Contributor Author


Fixed in cf736d223. Added clearPendingInboundEntries() and clearAllActiveTurns() that each only touch their own key in the shared store, leaving the other intact. server-startup.ts now uses the scoped functions. The old clearPendingInbound() is kept but marked @deprecated.

Comment thread src/gateway/server-startup.ts Outdated
Comment on lines +175 to +180
const sessionKey =
entry.channel === "telegram"
? `telegram:${payload.chatId ?? "unknown"}`
: entry.channel === "discord"
? `discord:channel:${payload.channelId ?? "unknown"}`
: `${entry.channel}:unknown`;
Copilot AI Mar 10, 2026


The replay path synthesizes sessionKey values like telegram:<chatId> / discord:channel:<channelId>, but system events are queued under the exact string key and most of the gateway uses agent-scoped session keys (e.g. agent:<id>:telegram:...). These keys likely won’t match, so replayed events may never be applied. Prefer persisting the exact sessionKey at capture time and using it here (or canonicalizing to the routed session key form).

Contributor Author


Fixed in cf736d223. Added optional sessionKey field to PendingInboundEntry. Telegram bot-handlers.ts now resolves the session key via resolveTelegramConversationRoute() at capture time. Discord message-handler.process.ts stores boundSessionKey ?? baseSessionKey (already available in ctx). Replay in server-startup.ts uses the stored key with fallback to the fabricated key for backward compat with any entries written before this change.
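The capture-time key with fabricated-key fallback can be sketched like this (the entry shape is illustrative, not the actual PendingInboundEntry definition):

```typescript
// Hypothetical shape of a stored entry; the real PendingInboundEntry may differ.
interface StoredEntry {
  channel: string;
  sessionKey?: string; // persisted at capture time by newer gateway versions
  payload: { chatId?: number | string; channelId?: string };
}

// Prefer the exact key captured at drain time; fall back to the old
// fabricated key only for entries written before sessionKey existed.
function resolveReplaySessionKey(entry: StoredEntry): string {
  if (entry.sessionKey) {
    return entry.sessionKey;
  }
  if (entry.channel === "telegram") {
    return `telegram:${entry.payload.chatId ?? "unknown"}`;
  }
  if (entry.channel === "discord") {
    return `discord:channel:${entry.payload.channelId ?? "unknown"}`;
  }
  return `${entry.channel}:unknown`;
}
```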

Comment thread src/infra/pending-inbound-store.ts Outdated
Comment on lines +52 to +60
const filePath = resolveStorePath(stateDir);
const existing = await readPendingInboundFile(filePath);
const key = storeKey(entry);
existing.entries[key] = entry;
await writeJsonAtomic(filePath, existing, {
mode: 0o600,
trailingNewline: true,
ensureDirMode: 0o700,
});
Copilot AI Mar 10, 2026


writePendingInbound does a read-modify-write without any locking. If multiple inbound messages are captured concurrently during drain, the last writer can overwrite the file based on a stale read and drop other entries. Consider guarding updates with an async lock (e.g. createAsyncLock() around the RMW section).

Contributor Author


Fixed in cf736d223. All read-modify-write operations in pending-inbound-store.ts (writePendingInbound, writeActiveTurn, clearActiveTurn, clearPendingInboundEntries, clearAllActiveTurns) are now wrapped with withFileLock() using the same pattern as updateAuthProfileStoreWithLock. Added concurrent write tests (10 parallel writes of each type) confirming no data loss.

Comment thread src/infra/pending-inbound-store.ts Outdated
Comment on lines +92 to +102
const filePath = resolveStorePath(stateDir);
const existing = await readPendingInboundFile(filePath);
if (!existing.activeTurns) {
existing.activeTurns = {};
}
existing.activeTurns[entry.sessionId] = entry;
await writeJsonAtomic(filePath, existing, {
mode: 0o600,
trailingNewline: true,
ensureDirMode: 0o700,
});
Copilot AI Mar 10, 2026


writeActiveTurn also does a read-modify-write without locking. Concurrent turn starts/ends (or overlap with pending-inbound writes) can race and lose updates to activeTurns. Consider reusing the same async lock as writePendingInbound to serialize modifications to pending-inbound.json.

Contributor Author


Fixed in cf736d223. All read-modify-write operations in pending-inbound-store.ts (writePendingInbound, writeActiveTurn, clearActiveTurn, clearPendingInboundEntries, clearAllActiveTurns) are now wrapped with withFileLock() using the same pattern as updateAuthProfileStoreWithLock. Added concurrent write tests (10 parallel writes of each type) confirming no data loss.

Comment thread src/telegram/bot-handlers.ts Outdated
Comment on lines +1416 to +1425
if (isGatewayDraining()) {
const stateDir = resolveStateDir(process.env);
await writePendingInbound(stateDir, {
channel: "telegram",
id: String(event.msg.message_id ?? Date.now()),
payload: {
chatId: event.chatId,
messageId: event.msg.message_id,
text: event.msg.text ?? event.msg.caption ?? "",
senderId: event.senderId,
Copilot AI Mar 10, 2026


Drain capture here stores only chat/message metadata, but not the resolved session key (which is typically agent-scoped like agent:<id>:telegram:... and can vary via bindings). Startup replay currently fabricates telegram:<chatId> keys, which won’t match real session keys and can prevent the replayed event from being consumed. Capture and persist the effective sessionKey (and any needed routing inputs) at drain time.

Contributor Author


Fixed in cf736d223. Added optional sessionKey field to PendingInboundEntry. Telegram bot-handlers.ts now resolves the session key via resolveTelegramConversationRoute() at capture time. Discord message-handler.process.ts stores boundSessionKey ?? baseSessionKey (already available in ctx). Replay in server-startup.ts uses the stored key with fallback to the fabricated key for backward compat with any entries written before this change.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: cf736d223c

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread src/telegram/bot-handlers.ts Outdated
Comment on lines +1416 to +1420
if (isGatewayDraining()) {
const stateDir = resolveStateDir(process.env);
// Resolve the session key now so replay routes to the correct agent-scoped session.
const { route: drainRoute } = resolveTelegramConversationRoute({
cfg,

P1: Apply Telegram auth checks before persisting drain events

The drain fast-path writes to pending-inbound and returns before resolveTelegramEventAuthorizationContext runs, so messages that would normally be blocked by DM/group allowlist policy are still captured and later replayed as system events. In group chats with sender restrictions, an unauthorized user posting during restart can therefore inject text into the next prompt for that session, which bypasses the existing authorization gate for that message path.

Contributor Author


Fixed in cf736d223. Added optional sessionKey field to PendingInboundEntry. Telegram bot-handlers.ts now resolves the session key via resolveTelegramConversationRoute() at capture time. Discord message-handler.process.ts stores boundSessionKey ?? baseSessionKey (already available in ctx). Replay in server-startup.ts uses the stored key with fallback to the fabricated key for backward compat with any entries written before this change.

Contributor Author


Fixed in bccb4f9eef239c139e24b349d07782998f8eff79. Moved isGatewayDraining() guard to after resolveTelegramEventAuthorizationContext, shouldSkipGroupMessage, enforceTelegramDmAccess, and the inline DM/requireTopic auth checks. Only messages that pass all authorization gates are now written to the pending-inbound store.

Comment on lines +214 to +216
for (const turn of staleTurns) {
// Consume first to prevent infinite retry on crash.
await clearActiveTurn(stateDir, turn.sessionId);

P2: Filter out fresh turns in startup stale-turn recovery

This recovery loop clears every activeTurns entry without checking whether it actually predates the current process, but it runs after channel startup, so new inbound traffic can create fresh active-turn records before this loop executes. Those live turns can then be cleared and treated as stale, causing incorrect restart warnings and dropping crash-recovery tracking for the currently running turn.

Contributor Author


Fixed in cf736d223. Added optional sessionKey field to PendingInboundEntry. Telegram bot-handlers.ts now resolves the session key via resolveTelegramConversationRoute() at capture time. Discord message-handler.process.ts stores boundSessionKey ?? baseSessionKey (already available in ctx). Replay in server-startup.ts uses the stored key with fallback to the fabricated key for backward compat with any entries written before this change.

Contributor Author


Good catch. Will add a processStartedAt guard: store the gateway process start time at boot and skip any activeTurns entry whose startedAt is newer than processStartedAt. That way turns started after the recovery loop can schedule are never incorrectly treated as stale. Fix incoming in the next commit.

Contributor Author


Fixed in db1608c59. processStartedAt is recorded at the top of the startup function before channel pollers are started. The active-turn recovery loop now skips any entry where entry.startedAt >= processStartedAt — those were written by the current process and are live, not stale. Only turns that predate the current boot are recovered and cleared.

Contributor Author


Fixed in a335294dbe1b77e63c76f5558cbc6f95a3c07bd1. Added processStartedAt = Date.now() captured at the top of startGatewaySidecars (before channel pollers start). The active-turn recovery loop now skips any entry where turn.startedAt >= processStartedAt — those turns were written by the current process after channel startup and are live, not stale.
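The guard described above boils down to a timestamp filter; a minimal sketch with assumed field names:

```typescript
// Illustrative sketch of the processStartedAt guard; field names assumed.
interface ActiveTurn {
  sessionId: string;
  startedAt: number; // epoch millis recorded when the turn began
}

// Only turns that predate the current boot are candidates for recovery;
// anything written by the current process is live, not stale.
function selectStaleTurns(turns: ActiveTurn[], processStartedAt: number): ActiveTurn[] {
  return turns.filter((turn) => turn.startedAt < processStartedAt);
}
```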

@amittell
Contributor Author

amittell commented Mar 10, 2026

Thanks for the analysis — all three issues have been addressed in commit a2dc479bf:

1. High: Prompt injection via system event replay
Applied sanitizeInboundSystemTags() to both senderLabel and textPreview before building eventText in the startup replay loop, preventing any spoofed [System Message] / System: tags in user-controlled payload from being treated as trusted.

2. Medium: Auth bypass before persist
Moved the drain guard in handleInboundMessageLike to after resolveTelegramEventAuthorizationContext, shouldSkipGroupMessage, and enforceTelegramDmAccess. Only messages that would pass the normal authorization path are now persisted. Discord was already post-auth (auth verified in message-handler.preflight.ts before processDiscordMessage is called — added a comment to make this clear).

3. Medium: Unbounded store
Added MAX_PENDING_ENTRIES = 200 and MAX_ACTIVE_TURNS = 50 caps. writePendingInbound now prunes the oldest entries by capturedAt when over cap. writeActiveTurn prunes oldest turns by startedAt.
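Item 3's pruning can be sketched as an oldest-first eviction by capturedAt (the constant matches the value quoted above; the entry shape is illustrative):

```typescript
const MAX_PENDING_ENTRIES = 200;

// Illustrative entry shape; the real store record carries more fields.
interface Pending {
  capturedAt: number;
}

// Mutates the store in place, deleting the oldest entries until the cap holds.
function pruneOldestEntries(entries: Record<string, Pending>): void {
  const keys = Object.keys(entries);
  if (keys.length <= MAX_PENDING_ENTRIES) return;
  keys.sort((a, b) => entries[a].capturedAt - entries[b].capturedAt);
  for (const key of keys.slice(0, keys.length - MAX_PENDING_ENTRIES)) {
    delete entries[key];
  }
}
```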


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: a2dc479bf5


Comment thread src/infra/pending-inbound-store.ts Outdated
entry: PendingInboundEntry,
): Promise<void> {
const filePath = resolveStorePath(stateDir);
await withFileLock(filePath, PENDING_INBOUND_LOCK_OPTIONS, async () => {

P1: Replace reentrant locking for pending-inbound updates

Using withFileLock here does not actually serialize concurrent in-process read-modify-write calls, because the lock helper is reentrant for the same process; overlapping writes can still read the same snapshot and clobber each other. Fresh evidence: concurrent writePendingInbound runs on this revision intermittently lose entries (for example, 79 persisted out of 80 writes), so the queue can still drop inbound messages under burst traffic.

Contributor Author


Fixed in f7efdcbae. Added a module-level Map<string, Promise<void>> in pending-inbound-store.ts that serializes all operations on the same file path as a promise chain, preventing any two in-process calls from interleaving their read-modify-write. The existing withFileLock is kept for cross-process safety. Added concurrent write tests (10 parallel writePendingInbound calls) — all 10 entries now persist reliably.
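A per-path promise chain of this kind can be sketched as follows (a simplified stand-in for the store's actual helper; withFileLock and error logging are omitted, and names are illustrative):

```typescript
// One promise chain per file path; each new op waits for the previous one.
const opQueue = new Map<string, Promise<void>>();

function withInProcessQueue<T>(filePath: string, op: () => Promise<T>): Promise<T> {
  const prev = opQueue.get(filePath) ?? Promise.resolve();
  // Run op once prev settles, regardless of whether prev succeeded.
  const result = prev.then(op, op);
  // Track settlement only, so a failed op never poisons the chain
  // (and the tracked promise itself can never reject unhandled).
  const next = result.then(() => undefined, () => undefined);
  opQueue.set(filePath, next);
  void next.then(() => {
    // Drop the map entry once the chain drains, if no newer op replaced it.
    if (opQueue.get(filePath) === next) opQueue.delete(filePath);
  });
  return result;
}
```

Because callers receive `result` (not the settlement tracker), rejections still propagate to whoever awaits the operation.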

// Fire-and-forget: disk write failure must not block the run.
if (!params.sessionId?.startsWith("probe-")) {
const runtimeChannel = params.messageChannel ?? params.messageProvider ?? "unknown";
writeActiveTurn(resolveStateDir(process.env), {

P1: Await active-turn persistence to keep write/clear ordered

The active-turn lifecycle is now fire-and-forget on both start and cleanup, so a short run can call clearActiveTurn before writeActiveTurn finishes; that leaves a completed turn recorded as active and triggers false stale-turn recovery on next startup. This is especially likely during lock contention because neither operation is awaited before proceeding.

Contributor Author


Fixed in f7efdcbae. Both writeActiveTurn at run start and clearActiveTurn in the finally block in attempt.ts are now properly awaited, ensuring the write completes before the run proceeds and the clear completes before cleanup exits. No more write/clear race on short runs.

Contributor Author


Fixed in 685565ede3a40605c9e5a103cdc19953fb0d3e47. Both writeActiveTurn at run start and clearActiveTurn in the finally block are now properly await-ed with try/catch, replacing the previous fire-and-forget .catch() pattern. This guarantees the write settles before the clear runs, preventing ghost active-turn records on short runs under lock contention.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: db1608c599


Comment thread src/telegram/bot-handlers.ts Outdated
});
await writePendingInbound(stateDir, {
channel: "telegram",
id: String(event.msg.message_id ?? Date.now()),

P2: Include chat/session identity in Telegram drain dedupe key

Fresh evidence on this revision: drain capture still sets id to only event.msg.message_id, and the store deduplicates by channel:id, so Telegram messages from different chats that share the same message_id overwrite each other in pending-inbound.json. In multi-chat bots this drops one queued message during restart drain/replay instead of preserving both.

Contributor Author


Fixed in f96c75e4e — dedupe key now ${chatId}:${messageId} in bot-handlers.ts.

Comment thread src/telegram/bot-handlers.ts Outdated
date: event.msg.date,
},
capturedAt: Date.now(),
sessionKey: drainRoute.sessionKey,

P2: Preserve Telegram thread-scoped session key during drain

The drain path persists sessionKey directly from drainRoute.sessionKey, but this bypasses the thread-session derivation used in normal Telegram handling (for example DM topics via resolveThreadSessionKeys). Messages captured while draining from a threaded DM can therefore be replayed into the base DM session after restart, mixing thread contexts and misrouting the recovery event.

Contributor Author


Fixed in f96c75e4e: resolveThreadSessionKeys() is now called in the drain path before storing sessionKey, so DM topic routing is preserved on replay.

Comment on lines +161 to +163
// Replay inbound messages captured during the previous drain.
try {
const stateDir = resolveStateDir(process.env);

P2: Replay queued inbound before starting channel intake

Queued inbound replay is executed only after startChannels() returns, so new live inbound events can be accepted and processed before older drain-captured events are replayed. If users send messages during this startup window, the "missed during restart" system event is injected late/out of order, which undermines the intended recovery flow.

Contributor Author


Fixed in f96c75e4e — replay is now called before startChannels() in server-startup.ts so drain-captured messages are processed before new live events can race ahead.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f96c75e4eb


Comment thread src/telegram/bot-handlers.ts Outdated
Comment on lines +1485 to +1490
const { route: drainRoute } = resolveTelegramConversationRoute({
cfg,
accountId,
chatId: event.chatId,
isGroup: event.isGroup,
resolvedThreadId: resolveTelegramForumThreadId({

P1: Pass topic routing params when capturing Telegram drain events

The drain capture route resolution omits replyThreadId and topicAgentId, even though normal Telegram routing passes them to apply topic conversation bindings and topic-level agent overrides. In forum topics (or any topic-bound session), this can resolve drainSessionKey to the base chat session instead of the topic session, so the replayed "missed during restart" event is injected into the wrong session/agent and the intended topic loses that queued inbound context.

Contributor Author


Fixed in fb7291629 — drain path now passes replyThreadId and topicAgentId to resolveTelegramConversationRoute().

Comment thread src/telegram/bot-handlers.ts Outdated
await writePendingInbound(stateDir, {
channel: "telegram",
// Include chatId in the id to prevent collisions across chats with the same message_id.
id: `${event.chatId}:${event.msg.message_id ?? Date.now()}`,

P2: Include account identity in Telegram drain entry IDs

Pending inbound deduplication is keyed by channel:id, but this ID only includes chatId:message_id. In multi-account Telegram deployments, two bot accounts in the same chat observe the same message_id, so one account's queued drain entry overwrites the other and only one replay event survives restart.

Contributor Author


Fixed in fb7291629 — dedupe key is now ${accountId}::${chatId}:${message_id}.

@amittell
Contributor Author

Note for reviewers: This branch was built on top of fix/empty-text-silent-skip (#40384) and includes those commits. If #40384 merges first, this branch will be rebased onto main and the duplicate commits dropped before merge. If this PR merges first, #40384 becomes a no-op. Either order is safe — just flagging so there is no surprise on the merge conflict in delivery.replies.ts / delivery.send.ts.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: fb72916297


Comment thread src/infra/pending-inbound-store.ts Outdated
inProcessOpQueue.set(filePath, next);
// Clean up the map entry once the chain settles to avoid unbounded growth
// for paths that are written once and never again.
void next.finally(() => {

P1: Avoid unhandled rejection from queue cleanup finalizer

withInProcessQueue launches next.finally(...) and discards the returned promise, so when a queued operation fails (for example lock timeout or atomic write error), callers can handle the next rejection but the separate promise from .finally still rejects unhandled. In Node 22 this can surface as unhandledRejection and may terminate the gateway process during transient store failures.

Contributor Author


Fixed in bd2c0988f — added .catch(() => {}) on the finally wrapper promise to prevent unhandled rejection in Node 22.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: dfe4810e68


capturedAt: Date.now(),
sessionKey: drainSessionKey,
});
return; // provider already got 200, no retry needed

P1: Apply full Telegram gating before queuing drain events

The drain fast-path returns immediately after writePendingInbound, so these messages never run through processInboundMessage and buildTelegramMessageContext checks (for example mention gating, DM requireTopic, named-account explicit binding, and configured ACP route readiness). In group chats with requireMention enabled, any non-mention message received during drain is currently persisted and replayed as a [pending-inbound] system event on restart, even though it would be dropped in normal processing, which can inject unrelated chat text into an agent session.

Contributor Author


Fixed in bd2c0988f — drain path now runs mention gate, requireTopic, and named-account binding checks before calling writePendingInbound. Messages that would fail these gates are dropped, not queued.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: bd2c0988f6


Comment thread src/gateway/server-startup.ts Outdated
Comment on lines +186 to +189
const eventText = `[pending-inbound] Missed message during restart from ${senderLabel}: "${textPreview || "(no text)"}"`;
enqueueSystemEvent(eventText, {
sessionKey,
contextKey: `pending-inbound:${entry.channel}:${entry.id}`,

P2: Preserve distinct repeated messages in pending-inbound replay

Each drained inbound entry is converted to eventText and sent through enqueueSystemEvent, but that queue deduplicates consecutive entries by text only (it ignores contextKey). If a user sends the same message twice during restart (or two different long messages that share the same 200-char preview), the second replay event is dropped even though it has a different inbound ID, and because entries are cleared first this loss is permanent.

Contributor Author


Fixed in bd2c0988f — replay event text now includes a unique timestamp suffix per entry so consecutive identical messages are not collapsed by enqueueSystemEvent deduplication.

Comment on lines +187 to +189
enqueueSystemEvent(eventText, {
sessionKey,
contextKey: `pending-inbound:${entry.channel}:${entry.id}`,

P2: Avoid truncating replay when a session has >20 queued entries

This loop replays every pending entry via enqueueSystemEvent, but the system-event queue keeps only 20 events per session (MAX_EVENTS in src/infra/system-events.ts). During longer drains, a busy chat can easily exceed 20 queued messages, causing older replayed entries to be shifted out and silently lost; since clearPendingInboundEntries runs before replay, those dropped entries cannot be recovered.

Contributor Author


Fixed in bd2c0988f — replay is capped at MAX_EVENTS per session. When a session has more queued entries, only the most recent N are replayed and a [N older messages skipped during restart] notice is prepended.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 77419f089c


Comment thread src/gateway/server-startup.ts Outdated

// Per-session cap: system-event queue holds at most MAX_EVENTS entries.
// Reserve one slot for a "skipped" summary so we never silently drop events.
const REPLAY_CAP_PER_SESSION = MAX_EVENTS - 1;

P2: Replay all pending entries when queue is at capacity

The per-session replay cap is set to MAX_EVENTS - 1, which means a session with exactly MAX_EVENTS queued inbound entries is treated as overflow: one real message is dropped and replaced with the "older messages skipped" summary even though the queue can still hold all entries. This introduces avoidable message loss during restart for sessions that have exactly 20 pending messages.


Contributor Author

Fixed in 6f038bddf — cap threshold changed from MAX_EVENTS - 1 to MAX_EVENTS. Sessions with exactly MAX_EVENTS queued entries now replay all of them without triggering the skip notice. Only sessions with strictly more than MAX_EVENTS entries truncate.
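The final cap semantics can be sketched as pure logic. This is a minimal illustration, not the PR's actual code: `planReplay`, the hard-coded `MAX_EVENTS = 20`, and the notice wording are stand-ins for the real `server-startup.ts` / `system-events.ts` implementation.

```typescript
// Assumed queue cap, mirroring MAX_EVENTS in src/infra/system-events.ts.
const MAX_EVENTS = 20;

// Plan which pending entries to replay for one session.
// At or under the cap: replay everything, no notice.
// Over the cap: keep the newest MAX_EVENTS - 1 entries and prepend a
// skip notice, so the total enqueued stays at exactly MAX_EVENTS.
function planReplay(entries: string[]): string[] {
  if (entries.length <= MAX_EVENTS) return entries;
  const kept = entries.slice(entries.length - (MAX_EVENTS - 1));
  const skipped = entries.length - kept.length;
  return [
    `[${skipped} older messages skipped during restart (queue cap ${MAX_EVENTS})]`,
    ...kept,
  ];
}
```

A session with exactly 20 entries replays all 20 with no notice; a session with 25 replays the newest 19 plus a "[6 older messages skipped…]" summary.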

amittell added 19 commits March 17, 2026 22:22
Fix 1 (comment 2909025398): Include chatId in pending-inbound dedupe key
to prevent collisions when multiple chats have messages with the same
message_id. Change id from String(message_id) to `${chatId}:${message_id}`.

Fix 2 (comment 2909025402): Apply resolveThreadSessionKeys() in the drain
path for DM topics, mirroring resolveTelegramSessionState(). The drain path
was storing drainRoute.sessionKey directly without thread-scoping, which
caused replayed messages in DM topics to route to the wrong session.
dmThreadId is already resolved from eventAuthContext earlier in
handleInboundMessageLike, so we can apply the same thread key derivation
used in the normal path.

Fix 3 (comment 2909025405): Move replayPendingInbound() to run BEFORE
startChannels() in startGatewaySidecars(). Previously the replay ran after
channel startup, creating a race where live inbound events could be
processed ahead of drain-captured replays. enqueueSystemEvent() is a pure
in-memory operation requiring no channel infrastructure, so it is safe to
call before channels start. Queued events are consumed by the first
matching turn after channels become active.
…ion; prefix entry id with accountId

Fix 1 (P1 — comment 2911716662): resolveTelegramConversationRoute() in the
drain path was omitting replyThreadId and topicAgentId. Normal Telegram routing
passes both to apply topic conversation bindings (thread binding lookup via
`${chatId}:topic:${replyThreadId}` and agent override from topic config).
Without them, drain events in topic sessions resolved to the base chat session
instead of the correct agent-scoped session.

Compute drainReplyThreadId = resolvedThreadId ?? dmThreadId (mirrors the
threadSpec.id used in the normal path) and pass topicAgentId: topicConfig?.agentId
(already available from eventAuthContext) so the drain route exactly matches
the non-drain route for topic messages.

Fix 2 (P2 — comment 2911716667): The pending-inbound dedupe key was
`${chatId}:message_id`. In multi-account deployments two bot accounts in the
same chat observe the same message_id, so one account's drain entry overwrote
the other's. Prefix the key with accountId (available in closure scope) using
the separator '::' so the resulting key is unique per-account:
  `${accountId}::${chatId}:${message_id}`
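The account-scoped key above can be sketched as follows; the helper name `pendingInboundKey` is illustrative, but the `::` separator and key shape match the commit description.

```typescript
// Account-scoped dedupe key: '::' separates the account prefix from the
// chat-scoped part, so two bot accounts observing the same message_id in
// the same chat produce distinct entries instead of overwriting each other.
function pendingInboundKey(accountId: string, chatId: number, messageId: number): string {
  return `${accountId}::${chatId}:${messageId}`;
}
```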
- network-errors.ts: extend isSafeToRetrySendError to handle 429 rate-limit
  errors (safe to retry since Telegram rejected before delivery) and grammY
  'Network request failed after N attempts' envelope errors (network-layer
  failure, message was never delivered).
  Fixes: 'retries on transient errors with retry_after' and
  'retries when grammY network envelope message includes failed-after wording'
  in src/telegram/send.test.ts.

- bot-native-commands.ts: fix sendAuthMessage to not pass empty {} as 3rd
  argument to bot.api.sendMessage when there are no thread params.
  buildTelegramThreadParams returns undefined (not {}) for no-thread cases;
  wrapping with ?? {} caused an extraneous empty options object to be passed.
  Fixes: 'blocks native DM commands for unpaired users' in bot.test.ts.

- bot.create-telegram-bot.test.ts: update stale 'drops non-default account
  DMs without explicit bindings' test to reflect behaviour restored by
  fix(telegram): restore named-account DM fallback routing (6337666).
  Named-account DMs without explicit bindings are now routed through with
  a per-account isolated session key, not dropped. Updated test name and
  assertions accordingly.
P1 (inline #2912274100): suppress unhandled rejection from withInProcessQueue finalizer
- next.finally() mirrors next's rejection on the cleanup wrapper promise
- void-discarding that promise causes Node 22 unhandled-rejection in lock timeout
  or atomic write error scenarios, which can terminate the gateway
- Fixed: chain .catch(() => {}) on the cleanup wrapper; rejection is still
  surfaced to the caller via the returned  promise
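The finalizer fix can be sketched as below. This is a minimal sketch assuming an in-process queue whose cleanup runs via `.finally()`; `runWithCleanup` is an illustrative name, not the PR's actual wrapper.

```typescript
// next.finally(cleanup) returns a wrapper promise that mirrors next's
// rejection; discarding that wrapper unhandled can terminate Node 22 via
// the unhandled-rejection handler. Chaining .catch(() => {}) on the
// wrapper swallows only the mirrored copy — the caller still observes
// the failure on `next` itself.
function runWithCleanup<T>(next: Promise<T>, cleanup: () => void): Promise<T> {
  void next.finally(cleanup).catch(() => {});
  return next;
}
```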

Aisle Low (#2): unauthorized plain-text DMs could reach the drain store
- enforceTelegramDmAccess only runs for DMs with media; text-only DMs had
  no auth gate before the drain guard
- Fixed: inline dmPolicy + isAllowlistAuthorized check in handleInboundMessageLike
  for text-only DMs, mirroring the check buildTelegramMessageContext performs
  in the normal (non-drain) path

Aisle Low (openclaw#3): temp file and state dir created with permissive permissions
- fs.writeFile(tmp, payload, 'utf8') created the temp file with default
  (umask-based) permissions before a subsequent chmod, leaving a brief
  window where sensitive payload was world-readable
- fs.mkdir with recursive:true silently ignores the mode option on macOS
- Fixed: pass { encoding, mode } to writeFile so the file is never created
  with permissive permissions; add explicit fs.chmod after mkdir to ensure
  the state dir carries 0o700 regardless of platform mkdir behavior

Tests: add 3 new cases to pending-inbound-store.test.ts covering permissions
(file 0o600, dir 0o700) and unhandled rejection non-leakage
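The permissions fix can be sketched like this. It is a minimal illustration, not the store's real `writeJsonAtomic` code: `writePrivateFile` and its paths are assumptions; only the `{ encoding, mode }` creation-time mode and the explicit post-mkdir chmod come from the commit description.

```typescript
import { promises as fs } from "node:fs";
import * as path from "node:path";

async function writePrivateFile(dir: string, name: string, payload: string): Promise<string> {
  await fs.mkdir(dir, { recursive: true });
  // Recursive mkdir may ignore the `mode` option on some platforms
  // (e.g. macOS), so enforce 0o700 explicitly afterwards.
  await fs.chmod(dir, 0o700);
  const target = path.join(dir, name);
  const tmp = `${target}.tmp`;
  // Passing mode at creation time means the temp file is never
  // world-readable, even briefly, before a later chmod could run.
  await fs.writeFile(tmp, payload, { encoding: "utf8", mode: 0o600 });
  await fs.rename(tmp, target); // atomic replace on POSIX filesystems
  return target;
}
```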
…(round 3)

P1 (2912417168): apply full Telegram gating before queuing drain events

The drain fast-path was missing three gates that buildTelegramMessageContext
enforces in the normal message path, causing the pending-inbound store to
accumulate entries the session would discard on replay.

1. requireTopic (DMs): add check before the drain guard — if the direct config
   has requireTopic=true and no dmThreadId is present, return early.  This
   mirrors the topicRequiredButMissing guard in buildTelegramMessageContext.

2. Named-account explicit binding (groups): inside the drain guard, after
   resolveTelegramConversationRoute, check drainRoute.accountId !== DEFAULT_ACCOUNT_ID
   && drainRoute.matchedBy === 'default'.  Named-account groups without an explicit
   binding are dropped in buildTelegramMessageContext; mirror that here.  DMs use
   a per-account isolated session key (not dropped), so the gate is groups-only.

3. Mention gating (groups): inside the drain guard, after resolving drainSessionKey,
   call resolveGroupActivation + resolveGroupRequireMention + firstDefined to
   compute drainRequireMention the same way buildTelegramMessageContext does.  When
   a mention is required and canDetectMention, use matchesMentionWithExplicit +
   hasBotMention to verify the message contains the bot mention before persisting.
   resolveGroupRequireMention and resolveGroupActivation are added to
   RegisterTelegramHandlerParams and passed through from bot.ts.

P2 (2912623799): preserve distinct repeated messages in pending-inbound replay

enqueueSystemEvent deduplicates consecutive entries by text.  Two identical
messages sent during a drain window (same text, same sender) collapsed to one.
Fix: include entry.id in the eventText:
  '[pending-inbound:<id>] Missed message...'
Since entry.id encodes accountId::chatId:messageId, each entry produces unique
text, preventing dedup even across sessions with matching message bodies.
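The interaction with consecutive-duplicate dedup can be sketched as below; `buildReplayText` and `enqueueDeduped` are illustrative stand-ins for the replay formatter and `enqueueSystemEvent`'s dedup behavior as described above.

```typescript
// Each entry id encodes accountId::chatId:messageId, so even identical
// message bodies yield unique event text.
function buildReplayText(entryId: string, body: string): string {
  return `[pending-inbound:${entryId}] Missed message during restart: ${body}`;
}

// Consecutive-duplicate dedup, as enqueueSystemEvent is described to do:
// only an exact repeat of the previous text is collapsed.
function enqueueDeduped(queue: string[], text: string): void {
  if (queue[queue.length - 1] !== text) queue.push(text);
}
```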

P2 (2912623802): avoid truncating replay when session has >20 queued entries

The system-event queue caps at MAX_EVENTS=20.  Replaying >20 entries for a
single session silently dropped the oldest via queue.shift().  Fix:
- Export MAX_EVENTS from system-events.ts
- In the replay loop (server-startup.ts), group pending entries by sessionKey
- Cap per-session delivery at MAX_EVENTS-1 (REPLAY_CAP_PER_SESSION)
- When skipped > 0, prepend a '[N older messages skipped during restart
  (queue cap 20)]' summary so the agent knows messages were omitted
- Log a warn for trimmed entries

Tests: add 3 new cases to system-events.test.ts
- MAX_EVENTS exported and positive
- distinct entries with same text but different IDs are not deduplicated
- queue caps at MAX_EVENTS and drops oldest on overflow
REPLAY_CAP_PER_SESSION was MAX_EVENTS - 1, which caused a session with
exactly MAX_EVENTS queued entries to be treated as overflow — the oldest
entry was unnecessarily skipped and a 'skipped' notice was injected.

Fix: set REPLAY_CAP_PER_SESSION = MAX_EVENTS. Sessions with exactly
MAX_EVENTS entries now replay all of them; only sessions with MORE than
MAX_EVENTS entries trigger truncation.

Fixes review comment #2912819041 on PR openclaw#41597.
…ireMention check

The drain-time requireMention gate used matchesMentionWithExplicit (explicit
text/entity matching only) but dropped drained messages when wasMentioned was
false, without consulting the implicit-mention signal used by the normal
(non-drain) message processing path.

In normal flow, resolveTelegramInboundBody also computes implicitMention:
  const replyToBotMessage = botId != null && replyFromId === botId;
  const implicitMention = replyToBotMessage && !isReplyToServiceMessage;

This commit mirrors that same logic inside the drain guard so that:
- replies to a (non-service) bot message pass requireMention at drain time
- consistent with how the same message would be handled in live (non-drain) mode

Fixes review comment #2914066766 on PR openclaw#41597.
… 2914517448

[2914066766 / 2914517448] Use resolveMentionGatingWithBypass in drain mention
gate instead of plain wasMentioned || implicitMention check. This preserves
the command bypass path so that slash commands (e.g. /status) bypass mention
gating during drain — matching the normal (non-drain) message processing path
in resolveTelegramInboundBody. The sender has already passed group access
checks at the drain guard entry point, so commandAuthorized=true is correct.

[2914404221] Fix replay overflow: when a session has >MAX_EVENTS pending
entries the skip notice consumes a queue slot, so the body must be capped at
REPLAY_CAP_PER_SESSION-1 to keep the total ≤ MAX_EVENTS. Previously toReplay
was sized to MAX_EVENTS and the skip notice made it MAX_EVENTS+1.

Add src/telegram/bot-handlers.test.ts to cover drain-time mention gating:
slash command bypass, regular message drop, explicit mention, implicit mention
(reply to bot), and no-drain pass-through.
…mention-gating path

The hasControlCommand call in the drain-time requireMention bypass check was
passing an invalid field (one that does not exist in CommandNormalizeOptions)
instead of the correct value from the Telegram bot context.

This caused hasControlCommand to call normalizeCommandBody without a botUsername,
meaning commands like /status@botname would not be stripped of the @mention
suffix before matching against known command aliases. The bot username was
already available in the same scope (event.ctx.me?.username?.toLowerCase()) and
is consistent with how the normal (non-drain) message processing path passes it.

Fixes review comment 2914717825.
P1 (#2915493782): Re-validate active turn before clearing to fix TOCTOU race.
The stale-turn recovery loop in server-startup.ts read all turns into a
snapshot, then called clearActiveTurn for each. If a new turn started under
the same sessionId after the snapshot was taken (startedAt check using
snapshot value would still pass since < processStartedAt), clearActiveTurn
would silently delete the live turn.

Fix: add readActiveTurn(stateDir, sessionId) to pending-inbound-store and
call it inside the recovery loop before clearing. If the on-disk startedAt
no longer matches the snapshot value, the entry was refreshed by a new turn
and is skipped.
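The TOCTOU guard can be sketched as pure logic; `ActiveTurn`, `recoverStaleTurns`, and the injected callbacks are illustrative stand-ins for the pending-inbound store API described above.

```typescript
interface ActiveTurn { sessionId: string; startedAt: number; }

async function recoverStaleTurns(
  snapshot: ActiveTurn[],
  processStartedAt: number,
  readActiveTurn: (sessionId: string) => Promise<ActiveTurn | undefined>,
  clearActiveTurn: (sessionId: string) => Promise<void>,
): Promise<string[]> {
  const cleared: string[] = [];
  for (const turn of snapshot) {
    if (turn.startedAt >= processStartedAt) continue; // not interrupted by this restart
    // Re-read before clearing: a new turn may have started under the
    // same sessionId after the snapshot was taken.
    const current = await readActiveTurn(turn.sessionId);
    if (!current || current.startedAt !== turn.startedAt) continue; // refreshed — skip
    await clearActiveTurn(turn.sessionId);
    cleared.push(turn.sessionId);
  }
  return cleared;
}
```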

P2 (#2915493785): Port audio-preflight bypass into drain-time mention gate.
The drain mention gate only checked text/caption/entities, so voice-only
group messages (no text, no caption) were always dropped even when mention
patterns are configured. In normal Telegram processing, audio-only messages
trigger preflight transcription before mention gating when mentionRegexes
exist — at drain time we cannot transcribe, so defer the check to replay.

Fix: inside the drainRequireMention block, detect voice/audio-only messages
(audio present, no user text) matching the needsPreflightTranscription
conditions (mentionRegexes.length > 0, !disableAudioPreflight) and skip
the drain-time gate, letting the message through to be replayed later when
the audio can be transcribed and mentions checked properly.

Tests: 5 new tests for readActiveTurn + TOCTOU guard in
pending-inbound-store.test.ts; 4 new tests for voice-only drain mention
preflight in bot-handlers.test.ts.
…#2915554218)

In multi-account Discord setups two bot accounts watching the same channel
both capture the same message.  The previous dedup key (channel:messageId)
caused the second capture to silently overwrite the first.

Fix:
- Add optional `accountId` field to PendingInboundEntry
- Update storeKey() to produce `channel:accountId:messageId` when
  accountId is present, keeping `channel:id` for channels without it
  (backward-compatible)
- Pass `accountId` from the Discord drain site in
  message-handler.process.ts so each account's capture gets its own slot

Tests:
- multi-account Discord: same message seen by two accounts stored independently
- same Discord account + same message still deduplicates (regression guard)
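The backward-compatible key update can be sketched as follows; the function shape is illustrative, but the two key layouts match the commit description.

```typescript
// With accountId: channel:accountId:messageId (one slot per account).
// Without it: the legacy channel:id shape, so existing entries from
// channels that never supply accountId keep deduplicating as before.
function storeKey(channel: string, id: string, accountId?: string): string {
  return accountId ? `${channel}:${accountId}:${id}` : `${channel}:${id}`;
}
```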
When the drain fast-path captures a Discord message before normal routing
runs, it was storing sessionKey: boundSessionKey ?? baseSessionKey.
Normal routing uses the broader chain:
  boundSessionKey ?? autoThreadContext?.SessionKey ?? threadKeys.sessionKey

This meant thread channels without an explicit binding could have their
drain-captured messages replayed into the parent/base session instead of
the correct thread session after restart.

Compute threadKeys (synchronous) and autoThreadContext (synchronous, but
null at drain time since no thread is created during drain) before the
drain guard and use the full resolution chain:
  boundSessionKey ?? drainAutoThreadContext?.SessionKey ?? drainThreadKeys.sessionKey ?? baseSessionKey

Also fix a duplicate import of ensureGlobalUndiciStreamTimeouts in
attempt.ts introduced by rebase conflict resolution.

Addresses review comment 2921255125 on PR openclaw#41597.
…ldest; persist voice/media

Three fixes for PR review comments:

1. writePendingInbound: reject new entries when store is at capacity (200)
   instead of silently dropping oldest messages. Dedup overwrites are still
   allowed. Returns false when rejected so callers can log a warning.
   (#2927515913)

2. writeActiveTurn: reject new entries when tracking map is at capacity (50)
   instead of evicting oldest live active turns. Overwrites of the same
   sessionId are still allowed. This prevents crash-recovery from missing
   turns that were evicted while still running. (#2927515915)

3. Telegram drain capture: persist voice, audio, video, video_note, photo,
   document, and sticker metadata in the queued payload so replay can
   recover media-only messages that have no text/caption. (#2928317074)
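The reject-at-capacity policy (fixes 1 and 2) can be sketched with a Map-backed store. A minimal sketch: `writePendingEntry` is an illustrative name, and only the cap of 200, the allowed dedup overwrite, and the boolean return for caller-side warning logging come from the description above.

```typescript
function writePendingEntry(
  store: Map<string, string>,
  key: string,
  payload: string,
  cap = 200,
): boolean {
  // Overwrites of an existing key (dedup) are always allowed; only a
  // genuinely new entry is rejected once the store is full. Returning
  // false (instead of evicting the oldest entry) lets the caller log
  // a warning rather than silently dropping captured messages.
  if (!store.has(key) && store.size >= cap) return false;
  store.set(key, payload);
  return true;
}
```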
The text-only DM gate was executing unconditionally, causing unauthorized
senders with dmPolicy: pairing to be silently dropped before reaching
processInboundMessage/buildTelegramMessageContext where
enforceTelegramDmPolicy issues pairing challenges.

Wrap the short-circuit in isGatewayDraining() so it only fires when
actually draining.
@chatgpt-codex-connector Bot left a comment

💡 Codex Review

Reviewed commit: 375927d06b

Comment thread src/process/command-queue.ts Outdated
* Returns whether the gateway is currently draining for restart.
*/
export function isGatewayDraining(): boolean {
return gatewayDraining;

P1: Return shared drain flag from isGatewayDraining

isGatewayDraining() currently returns gatewayDraining, but that identifier is never defined in this module. Any inbound path that calls the new drain guard (Telegram/Discord handlers) will throw a ReferenceError at runtime instead of a boolean, breaking message intake during normal operation and restart drain windows.


Contributor Author

Fixed: isGatewayDraining() now reads from queueState.gatewayDraining instead of the undefined bare gatewayDraining symbol.
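The corrected getter can be sketched as below; `queueState` is an assumed stand-in for the command-queue module's shared state object, and the real module's field layout may differ.

```typescript
// Module-level shared state (illustrative).
const queueState = { gatewayDraining: false };

function isGatewayDraining(): boolean {
  // Read the shared flag — the original bug returned a bare
  // `gatewayDraining` identifier never defined in the module,
  // which would throw a ReferenceError at runtime.
  return queueState.gatewayDraining;
}
```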

import { convertMarkdownTables } from "openclaw/plugin-sdk/text-runtime";
import { stripReasoningTagsFromText } from "openclaw/plugin-sdk/text-runtime";
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-runtime";
} from "../../channels/status-reactions.js";

P1: Restore valid module paths in Discord message handler

These new ../../... imports resolve from extensions/discord/src/monitor to extensions/discord/..., but those modules are not present there (for example channels/status-reactions.js exists under src/channels). This causes module resolution failure when loading the Discord monitor, so Discord inbound processing cannot start.


Contributor Author

Fixed: all ../../ imports in message-handler.process.ts have been corrected to use openclaw/plugin-sdk/* paths (matching the pattern used by sister files like agent-components.ts). The two new modules not yet in plugin-sdk (writePendingInbound, isGatewayDraining) use ../../../../src/ relative paths until they're surfaced in plugin-sdk exports.

textMode: "html",
plainText: chunk.text,
linkPreview: params.linkPreview,
replyMarkup: shouldAttachButtons ? params.replyMarkup : undefined,

P2: Preserve silent delivery in chunked Telegram sends

The refactored chunk loops no longer pass silent into sendTelegramText, so replies requested as silent are sent with normal notifications. This changes user-visible behavior in muted setups (including follow-up and voice-fallback chunk paths in this file), producing unexpected pings whenever a response is chunked.


Contributor Author

Fixed: restored silent: params.silent forwarding in all chunked send paths (deliverTextReply, sendPendingFollowUpText, and sendTelegramVoiceFallbackText).

@chatgpt-codex-connector Bot left a comment

💡 Codex Review

Reviewed commit: 6ff4edd927

type StatusReactionAdapter,
} from "openclaw/plugin-sdk/channel-runtime";
import { createTypingCallbacks } from "openclaw/plugin-sdk/channel-runtime";
} from "openclaw/plugin-sdk/channels/status-reactions";

P1: Revert to exported plugin-sdk imports

This import (and the other new openclaw/plugin-sdk/.../... imports in this file) points to subpaths that are not exported by package.json, so the module fails to load at runtime with ERR_PACKAGE_PATH_NOT_EXPORTED and Discord message handling cannot start. Fresh evidence: openclaw/plugin-sdk/channels/status-reactions, .../channels/typing, .../config/discord-preview-streaming, .../logger, and .../shared/text/reasoning-tags are all currently unexported, while the plugin loader aliasing only maps exported one-segment plugin-sdk/<entry> paths.


Contributor Author

Fixed: replaced all 5 unexported openclaw/plugin-sdk/* deep subpaths with the same import patterns used by the original refactor commit (9ebe38b) and other files in extensions/discord/src/:

  • channels/status-reactions → channel-runtime (re-exports)
  • channels/typing → channel-runtime (re-exports)
  • config/discord-preview-streaming → config-runtime (re-exports)
  • shared/text/reasoning-tags → text-runtime (re-exports)
  • openclaw/plugin-sdk/logger → ../../../../src/logger.js (not exported by plugin-sdk; matches telegram extension pattern)

Also restored 8 missing reply-runtime imports (createReplyDispatcherWithTyping, dispatchInboundMessage, resolveEnvelopeFormatOptions, formatInboundEnvelope, buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, resolveChunkMode, finalizeInboundContext, ReplyPayload type) that were dropped when the drain feature replaced the original import block.

…er.process.ts

Replace unexported openclaw/plugin-sdk/* deep subpaths with properly
exported plugin-sdk entries, matching patterns from the refactor commit
(9ebe38b) and other files in extensions/discord/src/:

- channels/status-reactions → channel-runtime (re-exports)
- channels/typing → channel-runtime (re-exports)
- config/discord-preview-streaming → config-runtime (re-exports)
- shared/text/reasoning-tags → text-runtime (re-exports)
- openclaw/plugin-sdk/logger → ../../../../src/logger.js (not in sdk)

Also restore missing reply-runtime imports (createReplyDispatcherWithTyping,
dispatchInboundMessage, resolveEnvelopeFormatOptions, formatInboundEnvelope,
buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled,
resolveChunkMode, finalizeInboundContext, ReplyPayload type) that were
dropped when the drain feature replaced the original import block.
@chatgpt-codex-connector Bot left a comment

💡 Codex Review

Reviewed commit: f26f162080

Comment thread src/gateway/server-startup.ts Outdated
Comment on lines +154 to +158
if (pending.length > 0) {
params.log.warn(`replaying ${pending.length} inbound message(s) captured during drain`);
// Consume-then-process: clear only inbound entries to prevent infinite retry on crash.
// Active turns remain intact in the shared file.
await clearPendingInboundEntries(stateDir);

P1: Claim pending drain entries atomically before clearing

This replay block reads pending and then clears the store in a separate operation, so if another gateway process is still draining and writes a new entry between those two calls, clearPendingInboundEntries can erase that newly captured message before it is replayed. In restart overlap scenarios (or accidental dual-process runs), this reintroduces silent message loss in the exact path this feature is meant to protect; the read+consume step needs to be atomic under the same lock/queue.


Contributor Author

Fixed: pending drain entries are now claimed atomically via a new claimPendingInboundEntries function that reads and clears entries in a single file-lock-protected operation. This replaces the separate readPendingInbound → clearPendingInboundEntries sequence, preventing the race where a concurrent drain writer inserts a new entry between the read and clear calls, which would have been erased before replay.
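The atomic claim can be sketched with an in-process queue serializing store access. A minimal sketch under that assumption: `withStoreQueue`, `appendPending`, and `claimPendingEntries` are illustrative names, and the real store uses a file lock rather than this in-memory array.

```typescript
type PendingEntry = { id: string; body: string };

let storeQueue: Promise<unknown> = Promise.resolve();
let pendingStore: PendingEntry[] = [];

// Serialize store operations so read+clear forms one critical section.
function withStoreQueue<T>(fn: () => Promise<T>): Promise<T> {
  const run = storeQueue.then(fn);
  storeQueue = run.catch(() => {}); // keep the chain alive after failures
  return run;
}

function appendPending(entry: PendingEntry): Promise<void> {
  return withStoreQueue(async () => { pendingStore.push(entry); });
}

// Claim = read and clear atomically: a concurrent append either lands
// before the claim (and is claimed) or after it (and survives for the
// next replay) — it can never be erased between a separate read and clear.
function claimPendingEntries(): Promise<PendingEntry[]> {
  return withStoreQueue(async () => {
    const claimed = pendingStore;
    pendingStore = [];
    return claimed;
  });
}
```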

@amittell
Contributor Author

Superseded by #49407 with a clean rebase onto current main.

@chatgpt-codex-connector Bot left a comment

💡 Codex Review

Reviewed commit: 52201932a1

writePendingInbound: vi.fn<(...args: unknown[]) => Promise<void>>(() => Promise.resolve()),
}));

vi.mock("../process/command-queue.js", async (importOriginal) => {

P2: Mock the module specifiers used by bot-handlers

Update these vi.mock paths to match the actual imports in bot-handlers.ts; right now the test mocks ../process/command-queue.js (and similarly ../infra/pending-inbound-store.js / ../config/paths.js), but production code imports ../../../src/process/command-queue.js, ../../../src/infra/pending-inbound-store.js, and openclaw/plugin-sdk/state-paths. Because the specifiers differ, the hoisted stubs are not applied, so drain-gating assertions execute against real modules and can fail or pass for the wrong reason.


Contributor Author

Fixed in the latest rebase. The test mocks now target the actual module specifiers used by bot-handlers.runtime.ts: openclaw/plugin-sdk/process-runtime, openclaw/plugin-sdk/infra-runtime, and openclaw/plugin-sdk/state-paths — matching the plugin-sdk imports in the production code.


Labels

agents Agent runtime and tooling channel: discord Channel integration: discord channel: telegram Channel integration: telegram gateway Gateway runtime size: XL
