diff --git a/.release-please-manifest.json b/.release-please-manifest.json
index 13708fa..64f3cdd 100644
--- a/.release-please-manifest.json
+++ b/.release-please-manifest.json
@@ -1,3 +1,3 @@
 {
-  ".": "0.7.1"
+  ".": "0.8.0"
 }
diff --git a/.stats.yml b/.stats.yml
index bb12a99..eb05b46 100644
--- a/.stats.yml
+++ b/.stats.yml
@@ -1,4 +1,4 @@
 configured_endpoints: 5
 openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/perplexity-ai%2Fperplexity-aba0c21c569842e93e17b69cae9cee58d389fce9c2482f4d251fd8727db05679.yml
 openapi_spec_hash: 3d01b1c1425f7d43a8acf8b99bb9b321
-config_hash: 43831e7f153f67b8f23f7091d14db066
+config_hash: 0be7520657a7a0fb6b5a839e716fe30c
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 031dedd..1db148d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
 # Changelog
 
+## 0.8.0 (2025-10-02)
+
+Full Changelog: [v0.7.1...v0.8.0](https://github.com/perplexityai/perplexity-node/compare/v0.7.1...v0.8.0)
+
+### Features
+
+* **api:** manual updates ([743c3aa](https://github.com/perplexityai/perplexity-node/commit/743c3aad0ea1021edca19f345c67c33665eaf074))
+
 ## 0.7.1 (2025-10-01)
 
 Full Changelog: [v0.7.0...v0.7.1](https://github.com/perplexityai/perplexity-node/compare/v0.7.0...v0.7.1)
diff --git a/README.md b/README.md
index 3f10fa1..433dc08 100644
--- a/README.md
+++ b/README.md
@@ -50,12 +50,12 @@ const client = new Perplexity({
   apiKey: process.env['PERPLEXITY_API_KEY'], // This is the default and can be omitted
 });
 
-const completion = await client.chat.completions.create({
+const streamChunk = await client.chat.completions.create({
   messages: [{ role: 'user', content: 'Tell me about the latest developments in AI' }],
   model: 'sonar',
 });
 
-console.log(completion.choices[0].message.content);
+console.log(streamChunk.content);
 ```
 
 ### Advanced Search Features
@@ -139,6 +139,28 @@ const localSearch = await client.search.create({
 });
 ```
 
+## Streaming responses
+
+We provide support for streaming responses using Server Sent Events (SSE).
+
+```ts
+import Perplexity from '@perplexity-ai/perplexity_ai';
+
+const client = new Perplexity();
+
+const stream = await client.chat.completions.create({
+  messages: [{ role: 'user', content: 'What is the capital of France?' }],
+  model: 'sonar',
+  stream: true,
+});
+for await (const streamChunk of stream) {
+  console.log(streamChunk.id);
+}
+```
+
+If you need to cancel a stream, you can `break` from the loop
+or call `stream.controller.abort()`.
+
 ### Request & Response types
 
 This library includes TypeScript definitions for all request params and response fields. You may import and use them like so:
@@ -169,7 +191,7 @@ const chatParams: Perplexity.Chat.CompletionCreateParams = {
   messages: [{ role: 'user', content: 'What is the capital of France?' }],
   model: 'sonar',
 };
-const chatResponse: Perplexity.Chat.CompletionCreateResponse = await client.chat.completions.create(chatParams);
+const streamChunk: Perplexity.StreamChunk = await client.chat.completions.create(chatParams);
 ```
 
 Documentation for each method, request param, and response field are available in docstrings and will appear on hover in most modern editors.
@@ -195,7 +217,7 @@ const search = await client.search
   .create({
 });
 // Chat completions error handling
-const completion = await client.chat.completions
+const streamChunk = await client.chat.completions
   .create({ messages: [{ role: 'user', content: 'What is the capital of France?' }], model: 'sonar' })
   .catch(async (err) => {
     if (err instanceof Perplexity.APIError) {
@@ -302,11 +324,11 @@ const chatResponse = await client.chat.completions
 console.log(chatResponse.headers.get('X-My-Header'));
 console.log(chatResponse.statusText); // access the underlying Response object
 
-const { data: completion, response: rawChatResponse } = await client.chat.completions
+const { data: streamChunk, response: rawChatResponse } = await client.chat.completions
   .create({ messages: [{ role: 'user', content: 'What is the capital of France?' }], model: 'sonar' })
   .withResponse();
 console.log(rawChatResponse.headers.get('X-My-Header'));
-console.log(completion.id);
+console.log(streamChunk.id);
 ```
 
 ### Logging
diff --git a/api.md b/api.md
index ba68caf..bd3164f 100644
--- a/api.md
+++ b/api.md
@@ -10,15 +10,15 @@ Types:
 
 # Chat
 
-## Completions
-
 Types:
 
-- CompletionCreateResponse
+- StreamChunk
+
+## Completions
 
 Methods:
 
-- client.chat.completions.create({ ...params }) -> CompletionCreateResponse
+- client.chat.completions.create({ ...params }) -> StreamChunk
 
 # Async
diff --git a/package.json b/package.json
index 78661bf..6bc9cf3 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@perplexity-ai/perplexity_ai",
-  "version": "0.7.1",
+  "version": "0.8.0",
   "description": "The official TypeScript library for the Perplexity API",
   "author": "Perplexity <>",
   "types": "dist/index.d.ts",
diff --git a/src/client.ts b/src/client.ts
index 75d82a3..c450d7b 100644
--- a/src/client.ts
+++ b/src/client.ts
@@ -18,7 +18,7 @@ import * as API from './resources/index';
 import { APIPromise } from './core/api-promise';
 import { Search, SearchCreateParams, SearchCreateResponse } from './resources/search';
 import { Async } from './resources/async/async';
-import { Chat } from './resources/chat/chat';
+import { Chat, StreamChunk } from './resources/chat/chat';
 import { type Fetch } from './internal/builtin-types';
 import { HeadersLike, NullableHeaders, buildHeaders } from './internal/headers';
 import { FinalRequestOptions, RequestOptions } from './internal/request-options';
@@ -728,7 +728,7 @@ Perplexity.Search = Search;
 export declare namespace Perplexity {
   export type RequestOptions = Opts.RequestOptions;
 
-  export { Chat as Chat };
+  export { Chat as Chat, type StreamChunk as StreamChunk };
 
   export { Async as Async };
diff --git a/src/core/streaming.ts b/src/core/streaming.ts
new file mode 100644
index 0000000..4a04d77
--- /dev/null
+++ b/src/core/streaming.ts
@@ -0,0 +1,331 @@
+import { PerplexityError } from './error';
+import { type ReadableStream } from '../internal/shim-types';
+import { makeReadableStream } from '../internal/shims';
+import { findDoubleNewlineIndex, LineDecoder } from '../internal/decoders/line';
+import { ReadableStreamToAsyncIterable } from '../internal/shims';
+import { isAbortError } from '../internal/errors';
+import { safeJSON } from '../internal/utils/values';
+import { encodeUTF8 } from '../internal/utils/bytes';
+import { loggerFor } from '../internal/utils/log';
+import type { Perplexity } from '../client';
+
+import { APIError } from './error';
+
+type Bytes = string | ArrayBuffer | Uint8Array | null | undefined;
+
+export type ServerSentEvent = {
+  event: string | null;
+  data: string;
+  raw: string[];
+};
+
+export class Stream implements AsyncIterable {
+  controller: AbortController;
+  #client: Perplexity | undefined;
+
+  constructor(
+    private iterator: () => AsyncIterator,
+    controller: AbortController,
+    client?: Perplexity,
+  ) {
+    this.controller =
controller; + this.#client = client; + } + + static fromSSEResponse( + response: Response, + controller: AbortController, + client?: Perplexity, + ): Stream { + let consumed = false; + const logger = client ? loggerFor(client) : console; + + async function* iterator(): AsyncIterator { + if (consumed) { + throw new PerplexityError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.'); + } + consumed = true; + let done = false; + try { + for await (const sse of _iterSSEMessages(response, controller)) { + if (done) continue; + + if (sse.data.startsWith('[DONE]')) { + done = true; + continue; + } + + if (sse.event === 'error') { + throw new APIError(undefined, safeJSON(sse.data) ?? sse.data, undefined, response.headers); + } + + if (sse.event === null) { + try { + yield JSON.parse(sse.data); + } catch (e) { + logger.error(`Could not parse message into JSON:`, sse.data); + logger.error(`From chunk:`, sse.raw); + throw e; + } + } + } + done = true; + } catch (e) { + // If the user calls `stream.controller.abort()`, we should exit without throwing. + if (isAbortError(e)) return; + throw e; + } finally { + // If the user `break`s, abort the ongoing request. + if (!done) controller.abort(); + } + } + + return new Stream(iterator, controller, client); + } + + /** + * Generates a Stream from a newline-separated ReadableStream + * where each item is a JSON value. + */ + static fromReadableStream( + readableStream: ReadableStream, + controller: AbortController, + client?: Perplexity, + ): Stream { + let consumed = false; + + async function* iterLines(): AsyncGenerator { + const lineDecoder = new LineDecoder(); + + const iter = ReadableStreamToAsyncIterable(readableStream); + for await (const chunk of iter) { + for (const line of lineDecoder.decode(chunk)) { + yield line; + } + } + + for (const line of lineDecoder.flush()) { + yield line; + } + } + + async function* iterator(): AsyncIterator { + if (consumed) { + throw new PerplexityError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.'); + } + consumed = true; + let done = false; + try { + for await (const line of iterLines()) { + if (done) continue; + if (line) yield JSON.parse(line); + } + done = true; + } catch (e) { + // If the user calls `stream.controller.abort()`, we should exit without throwing. + if (isAbortError(e)) return; + throw e; + } finally { + // If the user `break`s, abort the ongoing request. + if (!done) controller.abort(); + } + } + + return new Stream(iterator, controller, client); + } + + [Symbol.asyncIterator](): AsyncIterator { + return this.iterator(); + } + + /** + * Splits the stream into two streams which can be + * independently read from at different speeds. + */ + tee(): [Stream, Stream] { + const left: Array>> = []; + const right: Array>> = []; + const iterator = this.iterator(); + + const teeIterator = (queue: Array>>): AsyncIterator => { + return { + next: () => { + if (queue.length === 0) { + const result = iterator.next(); + left.push(result); + right.push(result); + } + return queue.shift()!; + }, + }; + }; + + return [ + new Stream(() => teeIterator(left), this.controller, this.#client), + new Stream(() => teeIterator(right), this.controller, this.#client), + ]; + } + + /** + * Converts this stream to a newline-separated ReadableStream of + * JSON stringified values in the stream + * which can be turned back into a Stream with `Stream.fromReadableStream()`. 
+ */ + toReadableStream(): ReadableStream { + const self = this; + let iter: AsyncIterator; + + return makeReadableStream({ + async start() { + iter = self[Symbol.asyncIterator](); + }, + async pull(ctrl: any) { + try { + const { value, done } = await iter.next(); + if (done) return ctrl.close(); + + const bytes = encodeUTF8(JSON.stringify(value) + '\n'); + + ctrl.enqueue(bytes); + } catch (err) { + ctrl.error(err); + } + }, + async cancel() { + await iter.return?.(); + }, + }); + } +} + +export async function* _iterSSEMessages( + response: Response, + controller: AbortController, +): AsyncGenerator { + if (!response.body) { + controller.abort(); + if ( + typeof (globalThis as any).navigator !== 'undefined' && + (globalThis as any).navigator.product === 'ReactNative' + ) { + throw new PerplexityError( + `The default react-native fetch implementation does not support streaming. Please use expo/fetch: https://docs.expo.dev/versions/latest/sdk/expo/#expofetch-api`, + ); + } + throw new PerplexityError(`Attempted to iterate over a response with no body`); + } + + const sseDecoder = new SSEDecoder(); + const lineDecoder = new LineDecoder(); + + const iter = ReadableStreamToAsyncIterable(response.body); + for await (const sseChunk of iterSSEChunks(iter)) { + for (const line of lineDecoder.decode(sseChunk)) { + const sse = sseDecoder.decode(line); + if (sse) yield sse; + } + } + + for (const line of lineDecoder.flush()) { + const sse = sseDecoder.decode(line); + if (sse) yield sse; + } +} + +/** + * Given an async iterable iterator, iterates over it and yields full + * SSE chunks, i.e. yields when a double new-line is encountered. + */ +async function* iterSSEChunks(iterator: AsyncIterableIterator): AsyncGenerator { + let data = new Uint8Array(); + + for await (const chunk of iterator) { + if (chunk == null) { + continue; + } + + const binaryChunk = + chunk instanceof ArrayBuffer ? new Uint8Array(chunk) + : typeof chunk === 'string' ? 
encodeUTF8(chunk) + : chunk; + + let newData = new Uint8Array(data.length + binaryChunk.length); + newData.set(data); + newData.set(binaryChunk, data.length); + data = newData; + + let patternIndex; + while ((patternIndex = findDoubleNewlineIndex(data)) !== -1) { + yield data.slice(0, patternIndex); + data = data.slice(patternIndex); + } + } + + if (data.length > 0) { + yield data; + } +} + +class SSEDecoder { + private data: string[]; + private event: string | null; + private chunks: string[]; + + constructor() { + this.event = null; + this.data = []; + this.chunks = []; + } + + decode(line: string) { + if (line.endsWith('\r')) { + line = line.substring(0, line.length - 1); + } + + if (!line) { + // empty line and we didn't previously encounter any messages + if (!this.event && !this.data.length) return null; + + const sse: ServerSentEvent = { + event: this.event, + data: this.data.join('\n'), + raw: this.chunks, + }; + + this.event = null; + this.data = []; + this.chunks = []; + + return sse; + } + + this.chunks.push(line); + + if (line.startsWith(':')) { + return null; + } + + let [fieldname, _, value] = partition(line, ':'); + + if (value.startsWith(' ')) { + value = value.substring(1); + } + + if (fieldname === 'event') { + this.event = value; + } else if (fieldname === 'data') { + this.data.push(value); + } + + return null; + } +} + +function partition(str: string, delimiter: string): [string, string, string] { + const index = str.indexOf(delimiter); + if (index !== -1) { + return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)]; + } + + return [str, '', '']; +} diff --git a/src/internal/decoders/line.ts b/src/internal/decoders/line.ts new file mode 100644 index 0000000..b3bfa97 --- /dev/null +++ b/src/internal/decoders/line.ts @@ -0,0 +1,135 @@ +import { concatBytes, decodeUTF8, encodeUTF8 } from '../utils/bytes'; + +export type Bytes = string | ArrayBuffer | Uint8Array | null | undefined; + +/** + * A re-implementation of httpx's `LineDecoder` in Python that handles incrementally + * reading lines from text. + * + * https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258 + */ +export class LineDecoder { + // prettier-ignore + static NEWLINE_CHARS = new Set(['\n', '\r']); + static NEWLINE_REGEXP = /\r\n|[\n\r]/g; + + #buffer: Uint8Array; + #carriageReturnIndex: number | null; + + constructor() { + this.#buffer = new Uint8Array(); + this.#carriageReturnIndex = null; + } + + decode(chunk: Bytes): string[] { + if (chunk == null) { + return []; + } + + const binaryChunk = + chunk instanceof ArrayBuffer ? new Uint8Array(chunk) + : typeof chunk === 'string' ? 
encodeUTF8(chunk) + : chunk; + + this.#buffer = concatBytes([this.#buffer, binaryChunk]); + + const lines: string[] = []; + let patternIndex; + while ((patternIndex = findNewlineIndex(this.#buffer, this.#carriageReturnIndex)) != null) { + if (patternIndex.carriage && this.#carriageReturnIndex == null) { + // skip until we either get a corresponding `\n`, a new `\r` or nothing + this.#carriageReturnIndex = patternIndex.index; + continue; + } + + // we got double \r or \rtext\n + if ( + this.#carriageReturnIndex != null && + (patternIndex.index !== this.#carriageReturnIndex + 1 || patternIndex.carriage) + ) { + lines.push(decodeUTF8(this.#buffer.subarray(0, this.#carriageReturnIndex - 1))); + this.#buffer = this.#buffer.subarray(this.#carriageReturnIndex); + this.#carriageReturnIndex = null; + continue; + } + + const endIndex = + this.#carriageReturnIndex !== null ? patternIndex.preceding - 1 : patternIndex.preceding; + + const line = decodeUTF8(this.#buffer.subarray(0, endIndex)); + lines.push(line); + + this.#buffer = this.#buffer.subarray(patternIndex.index); + this.#carriageReturnIndex = null; + } + + return lines; + } + + flush(): string[] { + if (!this.#buffer.length) { + return []; + } + return this.decode('\n'); + } +} + +/** + * This function searches the buffer for the end patterns, (\r or \n) + * and returns an object with the index preceding the matched newline and the + * index after the newline char. `null` is returned if no new line is found. + * + * ```ts + * findNewLineIndex('abc\ndef') -> { preceding: 2, index: 3 } + * ``` + */ +function findNewlineIndex( + buffer: Uint8Array, + startIndex: number | null, +): { preceding: number; index: number; carriage: boolean } | null { + const newline = 0x0a; // \n + const carriage = 0x0d; // \r + + for (let i = startIndex ?? 0; i < buffer.length; i++) { + if (buffer[i] === newline) { + return { preceding: i, index: i + 1, carriage: false }; + } + + if (buffer[i] === carriage) { + return { preceding: i, index: i + 1, carriage: true }; + } + } + + return null; +} + +export function findDoubleNewlineIndex(buffer: Uint8Array): number { + // This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n) + // and returns the index right after the first occurrence of any pattern, + // or -1 if none of the patterns are found. + const newline = 0x0a; // \n + const carriage = 0x0d; // \r + + for (let i = 0; i < buffer.length - 1; i++) { + if (buffer[i] === newline && buffer[i + 1] === newline) { + // \n\n + return i + 2; + } + if (buffer[i] === carriage && buffer[i + 1] === carriage) { + // \r\r + return i + 2; + } + if ( + buffer[i] === carriage && + buffer[i + 1] === newline && + i + 3 < buffer.length && + buffer[i + 2] === carriage && + buffer[i + 3] === newline + ) { + // \r\n\r\n + return i + 4; + } + } + + return -1; +} diff --git a/src/internal/parse.ts b/src/internal/parse.ts index 42d2c66..a9270fb 100644 --- a/src/internal/parse.ts +++ b/src/internal/parse.ts @@ -1,6 +1,7 @@ // File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. 
import type { FinalRequestOptions } from './request-options'; +import { Stream } from '../core/streaming'; import { type Perplexity } from '../client'; import { formatRequestDetails, loggerFor } from './utils/log'; @@ -16,6 +17,19 @@ export type APIResponseProps = { export async function defaultParseResponse(client: Perplexity, props: APIResponseProps): Promise { const { response, requestLogID, retryOfRequestLogID, startTime } = props; const body = await (async () => { + if (props.options.stream) { + loggerFor(client).debug('response', response.status, response.url, response.headers, response.body); + + // Note: there is an invariant here that isn't represented in the type system + // that if you set `stream: true` the response type must also be `Stream` + + if (props.options.__streamClass) { + return props.options.__streamClass.fromSSEResponse(response, props.controller, client) as any; + } + + return Stream.fromSSEResponse(response, props.controller, client) as any; + } + // fetch refuses to read the body when the status code is 204. if (response.status === 204) { return null as T; diff --git a/src/internal/request-options.ts b/src/internal/request-options.ts index 2aabf9a..56765e5 100644 --- a/src/internal/request-options.ts +++ b/src/internal/request-options.ts @@ -3,6 +3,7 @@ import { NullableHeaders } from './headers'; import type { BodyInit } from './builtin-types'; +import { Stream } from '../core/streaming'; import type { HTTPMethod, MergedRequestInit } from './types'; import { type HeadersLike } from './headers'; @@ -76,6 +77,7 @@ export type RequestOptions = { defaultBaseURL?: string | undefined; __binaryResponse?: boolean | undefined; + __streamClass?: typeof Stream; }; export type EncodedContent = { bodyHeaders: HeadersLike; body: BodyInit }; diff --git a/src/resources/async/chat/completions.ts b/src/resources/async/chat/completions.ts index e35018f..5e3dfda 100644 --- a/src/resources/async/chat/completions.ts +++ b/src/resources/async/chat/completions.ts @@ -2,6 +2,7 @@ import { APIResource } from '../../../core/resource'; import * as Shared from '../../shared'; +import * as ChatAPI from '../../chat/chat'; import { APIPromise } from '../../../core/api-promise'; import { buildHeaders } from '../../../internal/headers'; import { RequestOptions } from '../../../internal/request-options'; @@ -76,35 +77,11 @@ export interface CompletionCreateResponse { failed_at?: number | null; - response?: CompletionCreateResponse.Response | null; + response?: ChatAPI.StreamChunk | null; started_at?: number | null; } -export namespace CompletionCreateResponse { - export interface Response { - id: string; - - choices: Array; - - created: number; - - model: string; - - usage: Shared.UsageInfo; - - citations?: Array | null; - - object?: string; - - search_results?: Array | null; - - status?: 'PENDING' | 'COMPLETED' | null; - - type?: 'message' | 'info' | 'end_of_stream' | null; - } -} - export interface CompletionListResponse { requests: Array; @@ -150,35 +127,11 @@ export interface CompletionGetResponse { failed_at?: number | null; - response?: CompletionGetResponse.Response | null; + response?: ChatAPI.StreamChunk | null; started_at?: number | null; } -export namespace CompletionGetResponse { - export interface Response { - id: string; - - choices: Array; - - created: number; - - model: string; - - usage: Shared.UsageInfo; - - citations?: Array | null; - - object?: string; - - search_results?: Array | null; - - status?: 'PENDING' | 'COMPLETED' | null; - - type?: 'message' | 'info' | 
'end_of_stream' | null; - } -} - export interface CompletionCreateParams { request: CompletionCreateParams.Request; diff --git a/src/resources/chat/chat.ts b/src/resources/chat/chat.ts index 4214d94..753ea43 100644 --- a/src/resources/chat/chat.ts +++ b/src/resources/chat/chat.ts @@ -1,19 +1,50 @@ // File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. import { APIResource } from '../../core/resource'; +import * as Shared from '../shared'; import * as CompletionsAPI from './completions'; -import { CompletionCreateParams, CompletionCreateResponse, Completions } from './completions'; +import { + CompletionCreateParams, + CompletionCreateParamsNonStreaming, + CompletionCreateParamsStreaming, + Completions, +} from './completions'; export class Chat extends APIResource { completions: CompletionsAPI.Completions = new CompletionsAPI.Completions(this._client); } +export interface StreamChunk { + id: string; + + choices: Array; + + created: number; + + model: string; + + usage: Shared.UsageInfo; + + citations?: Array | null; + + object?: string; + + search_results?: Array | null; + + status?: 'PENDING' | 'COMPLETED' | null; + + type?: 'message' | 'info' | 'end_of_stream' | null; +} + Chat.Completions = Completions; export declare namespace Chat { + export { type StreamChunk as StreamChunk }; + export { Completions as Completions, - type CompletionCreateResponse as CompletionCreateResponse, type CompletionCreateParams as CompletionCreateParams, + type CompletionCreateParamsNonStreaming as CompletionCreateParamsNonStreaming, + type CompletionCreateParamsStreaming as CompletionCreateParamsStreaming, }; } diff --git a/src/resources/chat/completions.ts b/src/resources/chat/completions.ts index 39e76dc..99285d9 100644 --- a/src/resources/chat/completions.ts +++ b/src/resources/chat/completions.ts @@ -1,42 +1,39 @@ // File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. import { APIResource } from '../../core/resource'; +import * as CompletionsAPI from './completions'; import * as Shared from '../shared'; +import * as ChatAPI from './chat'; import { APIPromise } from '../../core/api-promise'; +import { Stream } from '../../core/streaming'; import { RequestOptions } from '../../internal/request-options'; export class Completions extends APIResource { /** * FastAPI wrapper around chat completions */ - create(body: CompletionCreateParams, options?: RequestOptions): APIPromise { - return this._client.post('/chat/completions', { body, ...options }); + create(body: CompletionCreateParamsNonStreaming, options?: RequestOptions): APIPromise; + create( + body: CompletionCreateParamsStreaming, + options?: RequestOptions, + ): APIPromise>; + create( + body: CompletionCreateParamsBase, + options?: RequestOptions, + ): APIPromise | ChatAPI.StreamChunk>; + create( + body: CompletionCreateParams, + options?: RequestOptions, + ): APIPromise | APIPromise> { + return this._client.post('/chat/completions', { body, ...options, stream: body.stream ?? 
false }) as + | APIPromise + | APIPromise>; } } -export interface CompletionCreateResponse { - id: string; +export type CompletionCreateParams = CompletionCreateParamsNonStreaming | CompletionCreateParamsStreaming; - choices: Array; - - created: number; - - model: string; - - usage: Shared.UsageInfo; - - citations?: Array | null; - - object?: string; - - search_results?: Array | null; - - status?: 'PENDING' | 'COMPLETED' | null; - - type?: 'message' | 'info' | 'end_of_stream' | null; -} - -export interface CompletionCreateParams { +export interface CompletionCreateParamsBase { messages: Array; model: string; @@ -250,11 +247,23 @@ export namespace CompletionCreateParams { region?: string | null; } } + + export type CompletionCreateParamsNonStreaming = CompletionsAPI.CompletionCreateParamsNonStreaming; + export type CompletionCreateParamsStreaming = CompletionsAPI.CompletionCreateParamsStreaming; +} + +export interface CompletionCreateParamsNonStreaming extends CompletionCreateParamsBase { + stream?: false | null; +} + +export interface CompletionCreateParamsStreaming extends CompletionCreateParamsBase { + stream: true; } export declare namespace Completions { export { - type CompletionCreateResponse as CompletionCreateResponse, type CompletionCreateParams as CompletionCreateParams, + type CompletionCreateParamsNonStreaming as CompletionCreateParamsNonStreaming, + type CompletionCreateParamsStreaming as CompletionCreateParamsStreaming, }; } diff --git a/src/resources/chat/index.ts b/src/resources/chat/index.ts index aeea357..1a4bddc 100644 --- a/src/resources/chat/index.ts +++ b/src/resources/chat/index.ts @@ -1,4 +1,9 @@ // File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. -export { Chat } from './chat'; -export { Completions, type CompletionCreateResponse, type CompletionCreateParams } from './completions'; +export { Chat, type StreamChunk } from './chat'; +export { + Completions, + type CompletionCreateParams, + type CompletionCreateParamsNonStreaming, + type CompletionCreateParamsStreaming, +} from './completions'; diff --git a/src/resources/index.ts b/src/resources/index.ts index 67d2556..da22cc3 100644 --- a/src/resources/index.ts +++ b/src/resources/index.ts @@ -2,5 +2,5 @@ export * from './shared'; export { Async } from './async/async'; -export { Chat } from './chat/chat'; +export { Chat, type StreamChunk } from './chat/chat'; export { Search, type SearchCreateResponse, type SearchCreateParams } from './search'; diff --git a/src/streaming.ts b/src/streaming.ts new file mode 100644 index 0000000..9e6da10 --- /dev/null +++ b/src/streaming.ts @@ -0,0 +1,2 @@ +/** @deprecated Import from ./core/streaming instead */ +export * from './core/streaming'; diff --git a/src/version.ts b/src/version.ts index 5e85c4b..23f967c 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1 +1 @@ -export const VERSION = '0.7.1'; // x-release-please-version +export const VERSION = '0.8.0'; // x-release-please-version diff --git a/tests/api-resources/chat/completions.test.ts b/tests/api-resources/chat/completions.test.ts index bb0d8f3..0aaeac1 100644 --- a/tests/api-resources/chat/completions.test.ts +++ b/tests/api-resources/chat/completions.test.ts @@ -113,7 +113,7 @@ describe('resource completions', () => { search_recency_filter: 'hour', search_tenant: 'search_tenant', stop: 'string', - stream: true, + stream: false, temperature: 0, tool_choice: 'none', tools: [ diff --git a/tests/internal/decoders/line.test.ts b/tests/internal/decoders/line.test.ts new file mode 100644 
index 0000000..b444e4c --- /dev/null +++ b/tests/internal/decoders/line.test.ts @@ -0,0 +1,128 @@ +import { findDoubleNewlineIndex, LineDecoder } from '@perplexity-ai/perplexity_ai/internal/decoders/line'; + +function decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] { + const decoder = new LineDecoder(); + const lines: string[] = []; + for (const chunk of chunks) { + lines.push(...decoder.decode(chunk)); + } + + if (flush) { + lines.push(...decoder.flush()); + } + + return lines; +} + +describe('line decoder', () => { + test('basic', () => { + // baz is not included because the line hasn't ended yet + expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']); + }); + + test('basic with \\r', () => { + expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']); + expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']); + }); + + test('trailing new lines', () => { + expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']); + }); + + test('trailing new lines with \\r', () => { + expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']); + }); + + test('escaped new lines', () => { + expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']); + }); + + test('escaped new lines with \\r', () => { + expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']); + }); + + test('\\r & \\n split across multiple chunks', () => { + expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']); + }); + + test('single \\r', () => { + expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']); + }); + + test('double \\r', () => { + expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']); + expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']); + // implementation detail that we don't yield the single \r line until a new \r or \n is encountered + expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']); + }); + + test('double \\r then \\r\\n', () => { + expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']); + expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']); + }); + + test('double newline', () => { + expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']); + expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']); + expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']); + expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']); + }); + + test('multi-byte characters across chunks', () => { + const decoder = new LineDecoder(); + + // bytes taken from the string 'известни' and arbitrarily split + // so that some multi-byte characters span multiple chunks + expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0); + expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0); + expect( + decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])), + ).toHaveLength(0); + + const decoded = decoder.decode(new Uint8Array([0xa])); + expect(decoded).toEqual(['известни']); + }); + + test('flushing trailing newlines', () => { + expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']); + }); + + 
test('flushing empty buffer', () => { + expect(decodeChunks([], { flush: true })).toEqual([]); + }); +}); + +describe('findDoubleNewlineIndex', () => { + test('finds \\n\\n', () => { + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\nbar'))).toBe(5); + expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\nbar'))).toBe(2); + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\n'))).toBe(5); + expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\n'))).toBe(2); + }); + + test('finds \\r\\r', () => { + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\rbar'))).toBe(5); + expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\rbar'))).toBe(2); + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\r'))).toBe(5); + expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\r'))).toBe(2); + }); + + test('finds \\r\\n\\r\\n', () => { + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\nbar'))).toBe(7); + expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\nbar'))).toBe(4); + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\n'))).toBe(7); + expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\n'))).toBe(4); + }); + + test('returns -1 when no double newline found', () => { + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\nbar'))).toBe(-1); + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\rbar'))).toBe(-1); + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\nbar'))).toBe(-1); + expect(findDoubleNewlineIndex(new TextEncoder().encode(''))).toBe(-1); + }); + + test('handles incomplete patterns', () => { + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r'))).toBe(-1); + expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n'))).toBe(-1); + }); +}); diff --git a/tests/streaming.test.ts b/tests/streaming.test.ts new file mode 100644 index 0000000..5c1599c --- /dev/null +++ b/tests/streaming.test.ts @@ -0,0 +1,245 @@ +import assert from 'assert'; +import { Stream, _iterSSEMessages } from '@perplexity-ai/perplexity_ai/core/streaming'; +import { APIError } from '@perplexity-ai/perplexity_ai/core/error'; +import { ReadableStreamFrom } from '@perplexity-ai/perplexity_ai/internal/shims'; + +describe('streaming decoding', () => { + test('basic', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('event: completion\n'); + yield Buffer.from('data: {"foo":true}\n'); + yield Buffer.from('\n'); + } + + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ + Symbol.asyncIterator + ](); + + let event = await stream.next(); + assert(event.value); + expect(JSON.parse(event.value.data)).toEqual({ foo: true }); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); + + test('data without event', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('data: {"foo":true}\n'); + yield Buffer.from('\n'); + } + + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ + Symbol.asyncIterator + ](); + + let event = await stream.next(); + assert(event.value); + expect(event.value.event).toBeNull(); + expect(JSON.parse(event.value.data)).toEqual({ foo: true }); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); + + test('event without data', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('event: foo\n'); + yield Buffer.from('\n'); + } + + const stream = 
_iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ + Symbol.asyncIterator + ](); + + let event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('foo'); + expect(event.value.data).toEqual(''); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); + + test('multiple events', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('event: foo\n'); + yield Buffer.from('\n'); + yield Buffer.from('event: ping\n'); + yield Buffer.from('\n'); + } + + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ + Symbol.asyncIterator + ](); + + let event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('foo'); + expect(event.value.data).toEqual(''); + + event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('ping'); + expect(event.value.data).toEqual(''); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); + + test('multiple events with data', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('event: foo\n'); + yield Buffer.from('data: {"foo":true}\n'); + yield Buffer.from('\n'); + yield Buffer.from('event: ping\n'); + yield Buffer.from('data: {"bar":false}\n'); + yield Buffer.from('\n'); + } + + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ + Symbol.asyncIterator + ](); + + let event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('foo'); + expect(JSON.parse(event.value.data)).toEqual({ foo: true }); + + event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('ping'); + expect(JSON.parse(event.value.data)).toEqual({ bar: false }); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); + + test('multiple data lines with empty line', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('event: ping\n'); + yield Buffer.from('data: {\n'); + yield Buffer.from('data: "foo":\n'); + yield Buffer.from('data: \n'); + yield Buffer.from('data:\n'); + yield Buffer.from('data: true}\n'); + yield Buffer.from('\n\n'); + } + + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ + Symbol.asyncIterator + ](); + + let event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('ping'); + expect(JSON.parse(event.value.data)).toEqual({ foo: true }); + expect(event.value.data).toEqual('{\n"foo":\n\n\ntrue}'); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); + + test('data json escaped double new line', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('event: ping\n'); + yield Buffer.from('data: {"foo": "my long\\n\\ncontent"}'); + yield Buffer.from('\n\n'); + } + + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ + Symbol.asyncIterator + ](); + + let event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('ping'); + expect(JSON.parse(event.value.data)).toEqual({ foo: 'my long\n\ncontent' }); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); + + test('special new line characters', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('data: {"content": "culpa "}\n'); + yield Buffer.from('\n'); + yield Buffer.from('data: {"content": "'); + yield 
Buffer.from([0xe2, 0x80, 0xa8]); + yield Buffer.from('"}\n'); + yield Buffer.from('\n'); + yield Buffer.from('data: {"content": "foo"}\n'); + yield Buffer.from('\n'); + } + + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ + Symbol.asyncIterator + ](); + + let event = await stream.next(); + assert(event.value); + expect(JSON.parse(event.value.data)).toEqual({ content: 'culpa ' }); + + event = await stream.next(); + assert(event.value); + expect(JSON.parse(event.value.data)).toEqual({ content: Buffer.from([0xe2, 0x80, 0xa8]).toString() }); + + event = await stream.next(); + assert(event.value); + expect(JSON.parse(event.value.data)).toEqual({ content: 'foo' }); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); + + test('multi-byte characters across chunks', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('event: completion\n'); + yield Buffer.from('data: {"content": "'); + // bytes taken from the string 'известни' and arbitrarily split + // so that some multi-byte characters span multiple chunks + yield Buffer.from([0xd0]); + yield Buffer.from([0xb8, 0xd0, 0xb7, 0xd0]); + yield Buffer.from([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8]); + yield Buffer.from('"}\n'); + yield Buffer.from('\n'); + } + + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), new AbortController())[ + Symbol.asyncIterator + ](); + + let event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('completion'); + expect(JSON.parse(event.value.data)).toEqual({ content: 'известни' }); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); +}); + +test('error handling', async () => { + async function* body(): AsyncGenerator { + yield Buffer.from('event: error\n'); + yield Buffer.from('data: {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}'); + yield Buffer.from('\n\n'); + } + + const stream = Stream.fromSSEResponse( + new Response(await ReadableStreamFrom(body())), + new AbortController(), + ); + + const err = expect( + (async () => { + for await (const _event of stream) { + } + })(), + ).rejects; + + await err.toMatchInlineSnapshot( + `[Error: {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}]`, + ); + await err.toBeInstanceOf(APIError); +});
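A consumer-side sketch of the streaming surface introduced above: the `stream: true` overload of `client.chat.completions.create()`, which resolves to an async-iterable `Stream` of `StreamChunk` values, and `Stream.tee()`. The import path follows the README; the two-consumer split and the chunk counting are illustrative assumptions, not SDK code:

```ts
import Perplexity from '@perplexity-ai/perplexity_ai';

const client = new Perplexity();

async function main() {
  // `stream: true` selects the streaming overload, so the awaited result is a
  // Stream of StreamChunk values rather than a single completion object.
  const stream = await client.chat.completions.create({
    messages: [{ role: 'user', content: 'What is the capital of France?' }],
    model: 'sonar',
    stream: true,
  });

  // tee() lets two consumers read the same chunks independently:
  // one branch logs chunk ids while the other counts chunks.
  const [forLogging, forCounting] = stream.tee();

  const [, chunkCount] = await Promise.all([
    (async () => {
      for await (const chunk of forLogging) {
        console.log(chunk.id);
      }
    })(),
    (async () => {
      let n = 0;
      for await (const _chunk of forCounting) n += 1;
      return n;
    })(),
  ]);

  console.log(`received ${chunkCount} chunks`);
}

main();
```

Both halves returned by `tee()` share the original `AbortController`, so `stream.controller.abort()` still cancels the underlying request.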