Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"author": "Layercode",
"license": "MIT",
"name": "@layercode/node-server-sdk",
"version": "1.2.0",
"version": "1.2.2",
"description": "Layercode Node.js Server Side SDK",
"type": "module",
"main": "./dist/cjs/index.js",
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { streamResponse } from './streamResponse';
export { verifySignature } from './verifySignature';
export { ttsWorkersAIStream } from './ttsWorkersAIStream';
111 changes: 111 additions & 0 deletions src/sseParse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
export interface SseMessage {
id?: string;
event: string;
data: string;
retry?: number;
}

export function messageListFromString(input: string): {
messages: SseMessage[];
leftoverData: string | undefined;
} {
const messages: SseMessage[] = [];
let line: string = "";
let ignoreNextNewline = false;
let data: string | undefined;
let id: string | undefined;
let event: string | undefined;
let retry: number | undefined;
let previousChar: string | undefined;
let pendingIndex = 0;
let isEndOfMessage = false;
function handleParseLine(pIndex: number) {
const result = parseLine(line);
data = result.data ?? data;
id = result.id ?? id;
event = result.event ?? event;
retry = result.retry ?? retry;
if (isEndOfMessage) {
if (typeof data === "string") {
messages.push({
id: id,
data: data,
event: event ?? "message",
retry: retry,
});
}
id = undefined;
data = undefined;
event = undefined;
retry = undefined;
pendingIndex = pIndex;
}
line = "";
}
for (let i = 0; i < input.length; i++) {
const char = input[i];
switch (char) {
case "\r": {
isEndOfMessage = previousChar === "\n" || previousChar === "\r";
ignoreNextNewline = true;
const pIndex = input[i + 1] === "\n" ? i + 2 : i + 1;
handleParseLine(pIndex);
break;
}
case "\n": {
if (ignoreNextNewline) {
ignoreNextNewline = false;
break;
}
isEndOfMessage = previousChar === "\n";
handleParseLine(i + 1);
break;
}
default:
line += char;
break;
}
previousChar = char;
}
return {
messages,
leftoverData: input.substring(pendingIndex),
};
}

export function parseLine(input: string): Partial<SseMessage> {
if (input.startsWith("data:")) {
return { data: input.substring(5).trim() };
}
if (input.startsWith("id:")) {
return { id: input.substring(3).trim() };
}
if (input.startsWith("event:")) {
return {
event: input.substring(6).trim(),
};
}
if (input.startsWith("retry:")) {
const val = Number(input.substring(6).trim());
if (!Number.isNaN(val)) {
if (Number.isInteger(val)) {
return { retry: val };
} else {
return { retry: Math.round(val) };
}
}
}
return {};
}

export async function getBytes(
controller: AbortController,
stream: ReadableStream<Uint8Array>,
onChunk: (arr: Uint8Array) => void,
) {
const reader = stream.getReader();
let result: ReadableStreamReadResult<Uint8Array>;
while (!controller.signal.aborted && !(result = await reader.read()).done) {
onChunk(result.value);
}
}
9 changes: 9 additions & 0 deletions src/streamResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@
* @returns Response object
*/

import { ttsWorkersAIStream as ttsWorkersAIStreamUtil } from './ttsWorkersAIStream';

export interface StreamResponseHandlerHelpers {
stream: {
tts: (content: string) => void;
ttsTextStream: (textStream: AsyncIterable<string>) => Promise<void>;
/**
* Parse a Workers AI (SSE JSON) stream and speak chunks via TTS.
* Returns the concatenated text that was spoken.
*/
ttsWorkersAIStream: (llmResponseStream: ReadableStream<Uint8Array>) => Promise<string>;
data: (content: any) => void;
// other?: (type: string, payload: any) => void;
end: () => void;
Expand Down Expand Up @@ -38,6 +45,8 @@ export function streamResponse(requestBody: Record<string, any>, handler: Stream
stream.tts(chunk);
}
},
ttsWorkersAIStream: async (llmResponseStream: ReadableStream<Uint8Array>): Promise<string> =>
ttsWorkersAIStreamUtil(llmResponseStream, (text) => stream.tts(text)),
data: (content: any) => sendEvent('response.data', { content }),
// other: (type: string, payload: any) => sendEvent(type, payload),
end: () => {
Expand Down
41 changes: 41 additions & 0 deletions src/ttsWorkersAIStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { getBytes, messageListFromString } from './sseParse';

/**
* Parses a Workers AI SSE JSON stream and invokes the provided TTS callback
* with each text chunk. Returns the concatenated text.
*/
export async function ttsWorkersAIStream(
llmResponseStream: ReadableStream<Uint8Array>,
tts: (content: string) => void,
): Promise<string> {
const decoder = new TextDecoder();
const abort = new AbortController();
let pendingData = '';
let combinedText = '';

await getBytes(abort, llmResponseStream, (arr) => {
const text = pendingData + decoder.decode(arr, { stream: true });
const result = messageListFromString(text);
pendingData = result.leftoverData ?? '';
for (const msg of result.messages) {
const data = (msg.data || '').trim();
if (!data) continue;
if (data === '[DONE]') {
abort.abort('done');
break;
}
try {
const json = JSON.parse(data);
const chunk = typeof json?.response === 'string' ? json.response : '';
if (chunk) {
tts(chunk);
combinedText += chunk;
}
} catch {
// ignore non-JSON payloads
}
}
});

return combinedText;
}