Skip to content

Make adapters depend on chat as a real dep#289

Merged
cramforce merged 1 commit intomainfrom
deb-alignment
Mar 23, 2026
Merged

Make adapters depend on chat as a real dep#289
cramforce merged 1 commit intomainfrom
deb-alignment

Conversation

@cramforce
Copy link
Collaborator

Without this, changeset will make any dep change a major change

Without this, changeset will make any dep change a major change
@vercel
Copy link
Contributor

vercel bot commented Mar 23, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
chat Ready Ready Preview, Comment, Open in v0 Mar 23, 2026 2:55pm
chat-sdk-nextjs-chat Ready Ready Preview, Comment, Open in v0 Mar 23, 2026 2:55pm

@cramforce cramforce merged commit d778f72 into main Mar 23, 2026
11 checks passed
@cramforce cramforce deleted the deb-alignment branch March 23, 2026 14:57
cramforce added a commit that referenced this pull request Mar 23, 2026
Without this, changeset will make any dep change a major change
cramforce added a commit that referenced this pull request Mar 24, 2026
…ounce, concurrent) (#277)

* feat: add concurrency strategies for overlapping messages (queue, debounce, concurrent)

## Problem

When multiple messages arrive on the same thread while a handler is still
processing, the SDK has only one behavior: **lock-and-drop**. The incoming
message is silently discarded (or force-released, which creates uncontrolled
concurrency). This is insufficient for most real-world use cases:

- **AI chatbots** lose user follow-up messages sent while the model is streaming
- **Customer support bots** miss messages entirely, breaking conversation flow
- **Collaborative editing bots** need to coalesce rapid corrections into one action

## Solution

Introduce a new `concurrency` option on `ChatConfig` with four strategies:

### `'drop'` (default, backward-compatible)

Existing behavior. Lock acquired or `LockError` thrown. No changes.

### `'queue'`

Messages that arrive while a handler is running are enqueued in the state
adapter. When the current handler finishes, the queue is drained: **only the
latest message is dispatched**, with all intermediate messages provided as
`context.skipped`. This gives the handler full visibility into what happened
while it was busy, without forcing it to re-process every message sequentially.

```typescript
const chat = new Chat({
  concurrency: 'queue',
  // ...
});

chat.onNewMention(async (thread, message, context) => {
  if (context && context.skipped.length > 0) {
    // "You sent 4 messages while I was thinking. Responding to your latest."
    const allMessages = [...context.skipped, message];
    // Pass all messages to the LLM for full context
  }
});
```

Flow:
```
A arrives  → acquire lock → process A
B arrives  → lock busy → enqueue B
C arrives  → lock busy → enqueue C
D arrives  → lock busy → enqueue D
A done     → drain: [B, C, D] → handler(D, { skipped: [B, C] })
D done     → queue empty → release lock
```

### `'debounce'`

Every message (including the first) starts or resets a debounce timer. Only the
**final message in a burst** is processed. The lock-holding function stays alive
through `waitUntil` during the debounce window.

```typescript
const chat = new Chat({
  concurrency: { strategy: 'debounce', debounceMs: 1500 },
  // ...
});
```

Flow:
```
A arrives  → acquire lock → store A as pending → sleep(debounceMs)
B arrives  → lock busy → overwrite pending with B (A dropped)
C arrives  → lock busy → overwrite pending with C (B dropped)
             ... debounceMs elapses with no new message ...
           → process C → release lock
```

### `'concurrent'`

No locking at all. Every message is processed immediately in its own handler
invocation. Suitable for stateless handlers (lookups, translations) where
thread ordering doesn't matter.

```typescript
const chat = new Chat({
  concurrency: 'concurrent',
  // ...
});
```

## API Surface

### ChatConfig

```typescript
interface ChatConfig {
  concurrency?: ConcurrencyStrategy | ConcurrencyConfig;
  /** @deprecated Use `concurrency` instead */
  onLockConflict?: 'force' | 'drop' | ((threadId, message) => ...);
}

type ConcurrencyStrategy = 'drop' | 'queue' | 'debounce' | 'concurrent';

interface ConcurrencyConfig {
  strategy: ConcurrencyStrategy;
  maxQueueSize?: number;           // Default: 10
  onQueueFull?: 'drop-oldest' | 'drop-newest';  // Default: 'drop-oldest'
  queueEntryTtlMs?: number;        // Default: 90_000 (90s)
  debounceMs?: number;              // Default: 1500
  maxConcurrent?: number;           // Default: Infinity
}
```

### MessageContext (new, passed to handlers)

```typescript
interface MessageContext {
  skipped: Message[];               // Intermediate messages, chronological
  totalSinceLastHandler: number;    // skipped.length + 1
}
```

All handler types (`MentionHandler`, `MessageHandler`, `SubscribedMessageHandler`,
`DirectMessageHandler`) now accept an optional `MessageContext` as their last
parameter. Existing handlers that don't use it are unaffected.

### StateAdapter (new methods)

```typescript
interface StateAdapter {
  enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number>;
  dequeue(threadId: string): Promise<QueueEntry | null>;
  queueDepth(threadId: string): Promise<number>;
}
```

Implemented across all four state adapters:
- **MemoryStateAdapter**: in-process array
- **RedisStateAdapter**: Lua script (RPUSH + LTRIM + PEXPIRE)
- **IoRedisStateAdapter**: same Lua approach
- **PostgresStateAdapter**: new `chat_state_queues` table with atomic dequeue

## Architecture

`handleIncomingMessage` was refactored into composable pieces:

- `dispatchToHandlers()` — shared handler dispatch logic (mention detection,
  subscription routing, pattern matching). Extracted from the old monolithic
  method so all strategies reuse it.
- `handleDrop()` — original lock-or-fail path (preserves `onLockConflict` compat)
- `handleQueueOrDebounce()` — enqueue if busy, drain or debounce after
- `handleConcurrent()` — skip locking entirely
- `drainQueue()` — collect all pending, dispatch latest with skipped context
- `debounceLoop()` — sleep/check/repeat until no new messages arrive

## Queue Entry TTL

Queued messages have a configurable TTL (`queueEntryTtlMs`, default 90s). Stale
entries are discarded on dequeue with a `message-expired` log event. This
prevents unbounded accumulation and ensures handlers don't process messages
that are no longer relevant.

## Observability

All strategies emit structured log events at `info` level:

| Event                 | Strategy         | Data                                  |
|-----------------------|------------------|---------------------------------------|
| `message-queued`      | queue            | threadId, messageId, queueDepth       |
| `message-dequeued`    | queue, debounce  | threadId, messageId, skippedCount     |
| `message-dropped`     | drop, queue      | threadId, messageId, reason           |
| `message-expired`     | queue, debounce  | threadId, messageId                   |
| `message-superseded`  | debounce         | threadId, droppedId                   |
| `message-debouncing`  | debounce         | threadId, messageId, debounceMs       |
| `message-debounce-reset` | debounce      | threadId, messageId                   |

## Backward Compatibility

- Default remains `'drop'` — zero breaking changes for existing users
- `onLockConflict` continues to work but is marked `@deprecated`
- Handler signatures are backward-compatible (new `context` param is optional)
- Deduplication always runs regardless of strategy

## Files Changed

- `packages/chat/src/types.ts` — new types, updated handler signatures
- `packages/chat/src/chat.ts` — strategy routing, drain/debounce loops
- `packages/chat/src/index.ts` — export new types
- `packages/chat/src/mock-adapter.ts` — queue methods for test mock
- `packages/state-memory/src/index.ts` — in-memory queue
- `packages/state-redis/src/index.ts` — Redis queue (Lua)
- `packages/state-ioredis/src/index.ts` — ioredis queue (Lua)
- `packages/state-pg/src/index.ts` — Postgres queue table
- `packages/chat/src/chat.test.ts` — tests for queue, debounce, concurrent

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: comprehensive test coverage for concurrency strategies and queue methods

Add tests across all state adapters and the Chat class:

**MemoryStateAdapter** (8 new tests):
- enqueue/dequeue single entry
- dequeue from empty queue returns null
- dequeue from nonexistent thread returns null
- queueDepth returns 0 for empty queue
- FIFO ordering across multiple entries
- maxSize trimming (keeps newest)
- maxSize=1 debounce behavior (last-write-wins)
- queue isolation by thread
- queue cleared on disconnect

**PostgresStateAdapter** (8 new tests):
- INSERT query for enqueue
- overflow trimming query
- depth return value
- parsed entry from dequeue
- null from empty dequeue
- atomic DELETE-RETURNING for dequeue
- queueDepth return value
- zero depth for empty queue

**RedisStateAdapter / IoRedisStateAdapter** (3+3 existence checks):
- enqueue, dequeue, queueDepth method existence

**Chat concurrency** (5 new tests):
- drop-newest policy when queue is full
- drop-oldest policy evicts oldest entries
- expired entries skipped during drain
- onNewMessage pattern handlers receive context
- onSubscribedMessage handlers receive skipped context

Total new tests: 27 (780 chat + 33 memory + 59 pg)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Address feedback

* Support a channel locking strategy, make it default for WhatsApp and Telegram

* docs: fix typo "Committment" → "Commitment" (#274)

Co-authored-by: Arif Kobel <arif.kobel@phorax.com>

* Add webhook verification to GChat (#287)

- Issues a warning if required env vars are not present (also for telegram)
- Makes telegram use a time-safe verifier

* Make adapters depend on `chat` as a real dep (#289)

Without this, changeset will make any dep change a major change

* Version Packages (#290)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* Fix serialization

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Arif Kobel <102538661+ArifKobel@users.noreply.github.com>
Co-authored-by: Arif Kobel <arif.kobel@phorax.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant