Merged
**visyat** approved these changes on Mar 21, 2026.
**cramforce** pushed a commit that referenced this pull request on Mar 23, 2026 (Co-authored-by: Arif Kobel <arif.kobel@phorax.com>).
**cramforce** added a commit that referenced this pull request on Mar 24, 2026:
…ounce, concurrent) (#277)

* feat: add concurrency strategies for overlapping messages (queue, debounce, concurrent)

## Problem

When multiple messages arrive on the same thread while a handler is still processing, the SDK has only one behavior: **lock-and-drop**. The incoming message is silently discarded (or force-released, which creates uncontrolled concurrency). This is insufficient for most real-world use cases:

- **AI chatbots** lose user follow-up messages sent while the model is streaming
- **Customer support bots** miss messages entirely, breaking conversation flow
- **Collaborative editing bots** need to coalesce rapid corrections into one action

## Solution

Introduce a new `concurrency` option on `ChatConfig` with four strategies:

### `'drop'` (default, backward-compatible)

Existing behavior. Lock acquired or `LockError` thrown. No changes.

### `'queue'`

Messages that arrive while a handler is running are enqueued in the state adapter. When the current handler finishes, the queue is drained: **only the latest message is dispatched**, with all intermediate messages provided as `context.skipped`. This gives the handler full visibility into what happened while it was busy, without forcing it to re-process every message sequentially.

```typescript
const chat = new Chat({
  concurrency: 'queue',
  // ...
});

chat.onNewMention(async (thread, message, context) => {
  if (context && context.skipped.length > 0) {
    // "You sent 4 messages while I was thinking. Responding to your latest."
    const allMessages = [...context.skipped, message];
    // Pass all messages to the LLM for full context
  }
});
```

Flow:

```
A arrives → acquire lock → process A
B arrives → lock busy → enqueue B
C arrives → lock busy → enqueue C
D arrives → lock busy → enqueue D
A done → drain: [B, C, D] → handler(D, { skipped: [B, C] })
D done → queue empty → release lock
```

### `'debounce'`

Every message (including the first) starts or resets a debounce timer.
Only the **final message in a burst** is processed. The lock-holding function stays alive through `waitUntil` during the debounce window.

```typescript
const chat = new Chat({
  concurrency: { strategy: 'debounce', debounceMs: 1500 },
  // ...
});
```

Flow:

```
A arrives → acquire lock → store A as pending → sleep(debounceMs)
B arrives → lock busy → overwrite pending with B (A dropped)
C arrives → lock busy → overwrite pending with C (B dropped)
... debounceMs elapses with no new message ...
→ process C → release lock
```

### `'concurrent'`

No locking at all. Every message is processed immediately in its own handler invocation. Suitable for stateless handlers (lookups, translations) where thread ordering doesn't matter.

```typescript
const chat = new Chat({
  concurrency: 'concurrent',
  // ...
});
```

## API Surface

### ChatConfig

```typescript
interface ChatConfig {
  concurrency?: ConcurrencyStrategy | ConcurrencyConfig;
  /** @deprecated Use `concurrency` instead */
  onLockConflict?: 'force' | 'drop' | ((threadId, message) => ...);
}

type ConcurrencyStrategy = 'drop' | 'queue' | 'debounce' | 'concurrent';

interface ConcurrencyConfig {
  strategy: ConcurrencyStrategy;
  maxQueueSize?: number;                       // Default: 10
  onQueueFull?: 'drop-oldest' | 'drop-newest'; // Default: 'drop-oldest'
  queueEntryTtlMs?: number;                    // Default: 90_000 (90s)
  debounceMs?: number;                         // Default: 1500
  maxConcurrent?: number;                      // Default: Infinity
}
```

### MessageContext (new, passed to handlers)

```typescript
interface MessageContext {
  skipped: Message[];            // Intermediate messages, chronological
  totalSinceLastHandler: number; // skipped.length + 1
}
```

All handler types (`MentionHandler`, `MessageHandler`, `SubscribedMessageHandler`, `DirectMessageHandler`) now accept an optional `MessageContext` as their last parameter. Existing handlers that don't use it are unaffected.
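As an illustration of how the string shorthand and the defaults listed above could interact, here is a minimal normalization sketch. The helper name `resolveConcurrency` is hypothetical (not part of the package); the default values are the ones documented in `ConcurrencyConfig`:

```typescript
type ConcurrencyStrategy = 'drop' | 'queue' | 'debounce' | 'concurrent';

interface ConcurrencyConfig {
  strategy: ConcurrencyStrategy;
  maxQueueSize?: number;
  onQueueFull?: 'drop-oldest' | 'drop-newest';
  queueEntryTtlMs?: number;
  debounceMs?: number;
  maxConcurrent?: number;
}

// Hypothetical helper: expand a string shorthand like 'queue' into a
// full config, filling gaps with the documented defaults.
function resolveConcurrency(
  input: ConcurrencyStrategy | ConcurrencyConfig = 'drop'
): Required<ConcurrencyConfig> {
  const partial = typeof input === 'string' ? { strategy: input } : input;
  return {
    strategy: partial.strategy,
    maxQueueSize: partial.maxQueueSize ?? 10,
    onQueueFull: partial.onQueueFull ?? 'drop-oldest',
    queueEntryTtlMs: partial.queueEntryTtlMs ?? 90_000,
    debounceMs: partial.debounceMs ?? 1500,
    maxConcurrent: partial.maxConcurrent ?? Infinity,
  };
}
```

Either form then flows through the same strategy routing, which is what keeps the default `'drop'` path untouched for existing users.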
### StateAdapter (new methods)

```typescript
interface StateAdapter {
  enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number>;
  dequeue(threadId: string): Promise<QueueEntry | null>;
  queueDepth(threadId: string): Promise<number>;
}
```

Implemented across all four state adapters:

- **MemoryStateAdapter**: in-process array
- **RedisStateAdapter**: Lua script (RPUSH + LTRIM + PEXPIRE)
- **IoRedisStateAdapter**: same Lua approach
- **PostgresStateAdapter**: new `chat_state_queues` table with atomic dequeue

## Architecture

`handleIncomingMessage` was refactored into composable pieces:

- `dispatchToHandlers()` — shared handler dispatch logic (mention detection, subscription routing, pattern matching). Extracted from the old monolithic method so all strategies reuse it.
- `handleDrop()` — original lock-or-fail path (preserves `onLockConflict` compat)
- `handleQueueOrDebounce()` — enqueue if busy, drain or debounce after
- `handleConcurrent()` — skip locking entirely
- `drainQueue()` — collect all pending, dispatch latest with skipped context
- `debounceLoop()` — sleep/check/repeat until no new messages arrive

## Queue Entry TTL

Queued messages have a configurable TTL (`queueEntryTtlMs`, default 90s). Stale entries are discarded on dequeue with a `message-expired` log event. This prevents unbounded accumulation and ensures handlers don't process messages that are no longer relevant.
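To make the adapter contract concrete, here is a minimal in-memory sketch of the three queue methods, including drop-oldest trimming and TTL discard on dequeue. It is a simplified illustration, not the package's `MemoryStateAdapter`: the `QueueEntry` shape is assumed, and the TTL is passed as a parameter here rather than read from config.

```typescript
// Assumed entry shape: a payload plus an enqueue timestamp so stale
// entries can be detected on dequeue.
interface QueueEntry {
  message: unknown;
  enqueuedAt: number;
}

// Minimal in-memory sketch of the StateAdapter queue methods.
class MemoryQueueSketch {
  private queues = new Map<string, QueueEntry[]>();

  // Append an entry, then trim from the front so only the newest
  // `maxSize` entries survive (drop-oldest). Returns the new depth.
  async enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number> {
    const q = this.queues.get(threadId) ?? [];
    q.push(entry);
    while (q.length > maxSize) q.shift();
    this.queues.set(threadId, q);
    return q.length;
  }

  // Pop the oldest entry, silently discarding any older than `ttlMs`.
  async dequeue(threadId: string, ttlMs = 90_000): Promise<QueueEntry | null> {
    const q = this.queues.get(threadId) ?? [];
    while (q.length > 0) {
      const entry = q.shift()!;
      if (Date.now() - entry.enqueuedAt <= ttlMs) return entry;
      // Expired: drop it and keep looking.
    }
    return null;
  }

  async queueDepth(threadId: string): Promise<number> {
    return (this.queues.get(threadId) ?? []).length;
  }
}
```

The Redis and Postgres adapters have to make the same push-and-trim and pop steps atomic (a Lua script and DELETE ... RETURNING respectively, per the list above), which is why the interface returns the depth from `enqueue` instead of requiring a separate read.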
## Observability

All strategies emit structured log events at `info` level:

| Event                    | Strategy        | Data                              |
|--------------------------|-----------------|-----------------------------------|
| `message-queued`         | queue           | threadId, messageId, queueDepth   |
| `message-dequeued`       | queue, debounce | threadId, messageId, skippedCount |
| `message-dropped`        | drop, queue     | threadId, messageId, reason       |
| `message-expired`        | queue, debounce | threadId, messageId               |
| `message-superseded`     | debounce        | threadId, droppedId               |
| `message-debouncing`     | debounce        | threadId, messageId, debounceMs   |
| `message-debounce-reset` | debounce        | threadId, messageId               |

## Backward Compatibility

- Default remains `'drop'` — zero breaking changes for existing users
- `onLockConflict` continues to work but is marked `@deprecated`
- Handler signatures are backward-compatible (new `context` param is optional)
- Deduplication always runs regardless of strategy

## Files Changed

- `packages/chat/src/types.ts` — new types, updated handler signatures
- `packages/chat/src/chat.ts` — strategy routing, drain/debounce loops
- `packages/chat/src/index.ts` — export new types
- `packages/chat/src/mock-adapter.ts` — queue methods for test mock
- `packages/state-memory/src/index.ts` — in-memory queue
- `packages/state-redis/src/index.ts` — Redis queue (Lua)
- `packages/state-ioredis/src/index.ts` — ioredis queue (Lua)
- `packages/state-pg/src/index.ts` — Postgres queue table
- `packages/chat/src/chat.test.ts` — tests for queue, debounce, concurrent

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: comprehensive test coverage for concurrency strategies and queue methods

Add tests across all state adapters and the Chat class:

**MemoryStateAdapter** (8 new tests):
- enqueue/dequeue single entry
- dequeue from empty queue returns null
- dequeue from nonexistent thread returns null
- queueDepth returns 0 for empty queue
- FIFO ordering across multiple entries
- maxSize trimming (keeps newest)
- maxSize=1 debounce behavior (last-write-wins)
- queue isolation by thread
- queue cleared on disconnect

**PostgresStateAdapter** (8 new tests):
- INSERT query for enqueue
- overflow trimming query
- depth return value
- parsed entry from dequeue
- null from empty dequeue
- atomic DELETE-RETURNING for dequeue
- queueDepth return value
- zero depth for empty queue

**RedisStateAdapter / IoRedisStateAdapter** (3+3 existence checks):
- enqueue, dequeue, queueDepth method existence

**Chat concurrency** (5 new tests):
- drop-newest policy when queue is full
- drop-oldest policy evicts oldest entries
- expired entries skipped during drain
- onNewMessage pattern handlers receive context
- onSubscribedMessage handlers receive skipped context

Total new tests: 27 (780 chat + 33 memory + 59 pg)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Address feedback

* Support a channel locking strategy, make it default for WhatsApp and Telegram

* docs: fix typo "Committment" → "Commitment" (#274)

Co-authored-by: Arif Kobel <arif.kobel@phorax.com>

* Add webhook verification to GChat (#287)

- Issues a warning if required env vars are not present (also for telegram)
- Makes telegram use a time-safe verifier

* Make adapters depend on `chat` as a real dep (#289)

Without this, changeset will make any dep change a major change

* Version Packages (#290)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* Fix serialization

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Arif Kobel <102538661+ArifKobel@users.noreply.github.com>
Co-authored-by: Arif Kobel <arif.kobel@phorax.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
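The "time-safe verifier" mentioned for Telegram refers to comparing a shared webhook secret in constant time rather than with `===`, which can leak timing information. A sketch using Node's `crypto.timingSafeEqual` follows; the function name `verifySecretToken` and the hashing-for-equal-length trick are illustrative, not the package's actual implementation:

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Constant-time comparison of a received webhook secret against the
// expected value. Both sides are HMAC-hashed first so the buffers are
// always the same length, which `timingSafeEqual` requires.
function verifySecretToken(received: string, expected: string): boolean {
  const key = 'length-normalizer'; // any fixed key works for this purpose
  const a = createHmac('sha256', key).update(received).digest();
  const b = createHmac('sha256', key).update(expected).digest();
  return timingSafeEqual(a, b);
}
```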
## Summary

Fixed typo in the contributing guide: "Committment" → "Commitment"

## Type

Documentation fix

## Files changed

- apps/docs/content/docs/contributing/building.mdx