Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cody: Bot response multiplexer publishes, then completes turns #70

Merged
merged 1 commit into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions lib/shared/src/chat/bot-response-multiplexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,47 @@ and a donut
assert.deepStrictEqual(rowTopic, ['S, V F X'])
assert.deepStrictEqual(cellTopic, ['variety', 'limburger'])
})

it('can handle slow consumers', async () => {
const multiplexer = new BotResponseMultiplexer()

const defaultTopic: string[] = []
multiplexer.sub(BotResponseMultiplexer.DEFAULT_TOPIC, {
onResponse(content) {
defaultTopic.push(content)
return Promise.resolve()
},
onTurnComplete() {
return Promise.resolve()
},
})

// This consumer is "slow" to digest messages and delays responses
// until after the LLM is finished.
const foodTopic: string[] = []
let finishedEating = () => {}
const foodPromise: Promise<void> = new Promise(resolve => {
finishedEating = resolve
multiplexer.sub('food', {
onResponse(content) {
foodTopic.push(content)
return foodPromise
},
onTurnComplete() {
foodTopic.push(' ...BURP!')
return Promise.resolve()
},
})
})

// eslint-disable-next-line no-void
void multiplexer.publish("Tonight's menu: <")
// eslint-disable-next-line no-void
void multiplexer.publish('food>hamburger\ndonuts</food>')
const turnDone = multiplexer.notifyTurnComplete()
finishedEating()
await turnDone
assert.deepStrictEqual(defaultTopic, ["Tonight's menu: "])
assert.deepStrictEqual(foodTopic, ['hamburger\ndonuts', ' ...BURP!'])
})
})
20 changes: 14 additions & 6 deletions lib/shared/src/chat/bot-response-multiplexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export class BotResponseMultiplexer {

// Buffers responses until topics can be parsed
private buffer_ = ''
private publishInProgress_ = Promise.resolve()

/**
* Subscribes to a topic in the bot response. Each topic can have only one subscriber at a time. New subscribers overwrite old ones.
Expand All @@ -122,6 +123,9 @@ export class BotResponseMultiplexer {
* Notifies all subscribers that the bot response is complete.
*/
public async notifyTurnComplete(): Promise<void> {
// Ensure any existing publishing is done.
await this.publishInProgress_

// Flush buffered content, if any
if (this.buffer_) {
const content = this.buffer_
Expand All @@ -142,11 +146,15 @@ export class BotResponseMultiplexer {
*
* @param response the text of the next incremental response from the bot.
*/
public async publish(response: string): Promise<void> {
// This is basically a loose parser of an XML-like language which forwards
// incremental content to subscribers which handle specific tags. The parser
// is forgiving if tags are not closed in the right order.
public publish(response: string): Promise<void> {
// If an existing publication hasn't finished, convoy behind that one.
return (this.publishInProgress_ = this.publishInProgress_.then(() => this.publishStep(response)))
abeatrix marked this conversation as resolved.
Show resolved Hide resolved
}

// This is basically a loose parser of an XML-like language which forwards
// incremental content to subscribers which handle specific tags. The parser
// is forgiving if tags are not closed in the right order.
private async publishStep(response: string): Promise<void> {
this.buffer_ += response
let last
while (this.buffer_) {
Expand Down Expand Up @@ -207,8 +215,8 @@ export class BotResponseMultiplexer {

// Publishes the content of `buffer_` up to `index` in the current topic. Discards the published content.
private publishBufferUpTo(index: number): Promise<void> {
let content
;[content, this.buffer_] = splitAt(this.buffer_, index)
const [content, remaining] = splitAt(this.buffer_, index)
this.buffer_ = remaining
return this.publishInTopic(this.currentTopic, content)
}

Expand Down
Loading