From d12bfe45f2c8d01277a1f8186844f0dd3598cbd7 Mon Sep 17 00:00:00 2001
From: James Sumners
Date: Wed, 29 Nov 2023 12:12:39 -0500
Subject: [PATCH] chore: Improve OpenAI mock server streams (#1890)

---
 test/versioned/openai/chat-completions.tap.js |  2 +-
 test/versioned/openai/mock-server.js          | 54 +++++++++++++------
 2 files changed, 38 insertions(+), 18 deletions(-)

diff --git a/test/versioned/openai/chat-completions.tap.js b/test/versioned/openai/chat-completions.tap.js
index 449d59835..f9dfa9bd3 100644
--- a/test/versioned/openai/chat-completions.tap.js
+++ b/test/versioned/openai/chat-completions.tap.js
@@ -233,7 +233,7 @@ tap.test('OpenAI instrumentation - chat completions', (t) => {
       // are asserted in other tests
       t.match(tx.exceptions[0], {
         customAttributes: {
-          'error.message': '"exceeded count"',
+          'error.message': 'Premature close',
           'completion_id': /\w{32}/
         }
       })
diff --git a/test/versioned/openai/mock-server.js b/test/versioned/openai/mock-server.js
index e43333f0b..7f03f7ba4 100644
--- a/test/versioned/openai/mock-server.js
+++ b/test/versioned/openai/mock-server.js
@@ -76,8 +76,21 @@ function handler(req, res) {
       // OpenAI streamed responses are double newline delimited lines that
       // are prefixed with the string `data: `. The end of the stream is
       // terminated with a `done: [DONE]` string.
-      const outStream =
-        streamData !== 'do random' ? goodStream(streamData, { ...body }) : badStream({ ...body })
+      let outStream
+      if (streamData !== 'do random') {
+        outStream = finiteStream(streamData, { ...body })
+      } else {
+        outStream = randomStream({ ...body })
+        let streamChunkCount = 0
+        outStream.on('data', () => {
+          if (streamChunkCount >= 100) {
+            outStream.destroy()
+            res.destroy()
+          }
+          streamChunkCount += 1
+        })
+      }
+
       outStream.pipe(res)
     } else {
       res.write(JSON.stringify(body))
@@ -86,7 +99,17 @@
   })
 }
 
-function goodStream(dataToStream, chunkTemplate) {
+/**
+ * Splits the provided `dataToStream` into chunks and returns a stream that
+ * sends those chunks as OpenAI data stream messages. This stream has a finite
+ * number of messages that will be sent.
+ *
+ * @param {string} dataToStream A fairly long string to split on space chars.
+ * @param {object} chunkTemplate An object that is shaped like an OpenAI stream
+ * data object.
+ * @returns {Readable} A paused stream.
+ */
+function finiteStream(dataToStream, chunkTemplate) {
   const parts = dataToStream.split(' ')
   let i = 0
   return new Readable({
@@ -105,28 +128,25 @@
         this.push(null)
       }
     }
-  })
+  }).pause()
 }
 
-function badStream(chunkTemplate) {
-  let count = 0
+/**
+ * Creates a stream that will stream an infinite number of OpenAI stream data
+ * chunks.
+ *
+ * @param {object} chunkTemplate An object that is shaped like an OpenAI stream
+ * data object.
+ * @returns {Readable} A paused stream.
+ */
+function randomStream(chunkTemplate) {
   return new Readable({
     read(size = 16) {
-      if (count > 100) {
-        // something is up with OpenAI
-        // you shouldn't have to do this. a throw would be enough
-        chunkTemplate.error = 'exceeded count'
-        this.push('data: ' + JSON.stringify(chunkTemplate) + '\n\n')
-        this.push(null)
-        return
-      }
-
       const data = crypto.randomBytes(size)
       chunkTemplate.choices[0].delta.content = data.toString('base64')
       this.push('data: ' + JSON.stringify(chunkTemplate) + '\n\n')
-      count += 1
     }
-  })
+  }).pause()
 }
 
 function getShortenedPrompt(reqBody) {