Skip to content

Commit

Permalink
feat(ai/rsc): Patchable string values (#1190)
Browse files Browse the repository at this point in the history
  • Loading branch information
shuding committed Mar 20, 2024
1 parent fac9a6e commit 20007b9
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .changeset/lemon-beans-bathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'ai': patch
---

feat(ai/rsc): support string diff and patch in streamable value
28 changes: 26 additions & 2 deletions packages/core/rsc/shared-client/streamable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export function readStreamableValue<T = unknown>(
streamableValue;
let curr = row.curr;
let done = false;
let initial = true;

return {
async next() {
Expand All @@ -67,8 +68,22 @@ export function readStreamableValue<T = unknown>(
if (typeof row.error !== 'undefined') {
throw row.error;
}
if ('curr' in row) {
curr = row.curr;
if ('curr' in row || row.diff) {
if (row.diff) {
switch (row.diff[0]) {
case 0:
if (typeof curr !== 'string') {
throw new Error(
'Invalid patch: can only append to string types. This is a bug in the AI SDK.',
);
} else {
(curr as string) = curr + row.diff[1];
}
break;
}
} else {
curr = row.curr;
}

// The last emitted { done: true } won't be used as the value
// by the for await...of syntax.
Expand All @@ -89,6 +104,15 @@ export function readStreamableValue<T = unknown>(
}

row = row.next;
if (initial) {
initial = false;
if (typeof curr === 'undefined') {
// This is the initial chunk and there isn't an initial value yet.
// Let's skip this one.
return this.next();
}
}

return {
value: curr,
done: false,
Expand Down
69 changes: 69 additions & 0 deletions packages/core/rsc/shared-client/streamable.ui.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ function nextTick() {
return Promise.resolve();
}

async function getRawChunks(s: any) {
const { next, ...otherFields } = s;
const chunks = [otherFields];
if (next) {
chunks.push(...(await getRawChunks(await next)));
}
return chunks;
}

describe('rsc - readStreamableValue()', () => {
it('should return an async iterable', () => {
const streamable = createStreamableValue();
Expand Down Expand Up @@ -160,4 +169,64 @@ describe('rsc - readStreamableValue()', () => {
]
`);
});

describe('patch', () => {
it('should be able to append strings as patch', async () => {
const streamable = createStreamableValue();
const value = streamable.value;

streamable.update('hello');
streamable.update('hello world');
streamable.update('hello world!');
streamable.update('new string');
streamable.done('new string with patch!');

expect(await getRawChunks(value)).toMatchInlineSnapshot(`
[
{
"curr": undefined,
"type": Symbol(ui.streamable.value),
},
{
"curr": "hello",
},
{
"diff": [
0,
" world",
],
},
{
"diff": [
0,
"!",
],
},
{
"curr": "new string",
},
{
"diff": [
0,
" with patch!",
],
},
]
`);

const values = [];
for await (const v of readStreamableValue(value)) {
values.push(v);
}
expect(values).toMatchInlineSnapshot(`
[
"hello",
"hello world",
"hello world!",
"new string",
"new string with patch!",
]
`);
});
});
});
81 changes: 71 additions & 10 deletions packages/core/rsc/streamable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
createSuspensedChunk,
consumeStream,
} from './utils';
import type { StreamableValue } from './types';
import type { StreamablePatch, StreamableValue } from './types';

/**
* Create a piece of changable UI that can be streamed to the client.
Expand Down Expand Up @@ -48,7 +48,13 @@ export function createStreamableUI(initialValue?: React.ReactNode) {
warnUnclosedStream();

return {
/**
* The value of the streamable UI. This can be returned from a Server Action and received by the client.
*/
value: row,
/**
* This method updates the current UI node. It takes a new UI node and replaces the old one.
*/
update(value: React.ReactNode) {
assertStream('.update()');

Expand All @@ -67,6 +73,22 @@ export function createStreamableUI(initialValue?: React.ReactNode) {

warnUnclosedStream();
},
/**
* This method is used to append a new UI node to the end of the old one.
* Once appended a new UI node, the previous UI node cannot be updated anymore.
*
* @example
* ```jsx
* const ui = createStreamableUI(<div>hello</div>)
* ui.append(<div>world</div>)
*
* // The UI node will be:
* // <>
* // <div>hello</div>
* // <div>world</div>
* // </>
* ```
*/
append(value: React.ReactNode) {
assertStream('.append()');

Expand All @@ -79,6 +101,10 @@ export function createStreamableUI(initialValue?: React.ReactNode) {

warnUnclosedStream();
},
/**
* This method is used to signal that there is an error in the UI stream.
* It will be thrown on the client side and caught by the nearest error boundary component.
*/
error(error: any) {
assertStream('.error()');

Expand All @@ -88,6 +114,12 @@ export function createStreamableUI(initialValue?: React.ReactNode) {
closed = true;
reject(error);
},
/**
* This method marks the UI node as finalized. You can either call it without any parameters or with a new UI node as the final state.
* Once called, the UI node cannot be updated or appended anymore.
*
* This method is always **required** to be called, otherwise the response will be stuck in a loading state.
*/
done(...args: [] | [React.ReactNode]) {
assertStream('.done()');

Expand Down Expand Up @@ -116,6 +148,7 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
let currentError: E | undefined;
let currentPromise: typeof resolvable.promise | undefined =
resolvable.promise;
let currentPatchValue: StreamablePatch;

function assertStream(method: string) {
if (closed) {
Expand All @@ -138,35 +171,63 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
}
warnUnclosedStream();

function createWrapped(withType?: boolean): StreamableValue<T, E> {
function createWrapped(initialChunk?: boolean): StreamableValue<T, E> {
// This makes the payload much smaller if there're mutative updates before the first read.
const init: Partial<StreamableValue<T, E>> =
currentError === undefined
? { curr: currentValue }
: { error: currentError };
let init: Partial<StreamableValue<T, E>>;

if (currentError !== undefined) {
init = { error: currentError };
} else {
if (currentPatchValue && !initialChunk) {
init = { diff: currentPatchValue };
} else {
init = { curr: currentValue };
}
}

if (currentPromise) {
init.next = currentPromise;
}

if (withType) {
if (initialChunk) {
init.type = STREAMABLE_VALUE_TYPE;
}

return init;
}

// Update the internal `currentValue` and `currentPatchValue` if needed.
function updateValueStates(value: T) {
// If we can only send a patch over the wire, it's better to do so.
currentPatchValue = undefined;
if (typeof value === 'string') {
if (typeof currentValue === 'string') {
if (value.startsWith(currentValue)) {
currentPatchValue = [0, value.slice(currentValue.length)];
}
}
}

currentValue = value;
}

return {
/**
* The value of the streamable. This can be returned from a Server Action and received by the client.
*/
get value() {
return createWrapped(true);
},
/**
* This method updates the current value with a new one.
*/
update(value: T) {
assertStream('.update()');

const resolvePrevious = resolvable.resolve;
resolvable = createResolvablePromise();

currentValue = value;
updateValueStates(value);
currentPromise = resolvable.promise;
resolvePrevious(createWrapped());

Expand Down Expand Up @@ -194,8 +255,8 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
currentPromise = undefined;

if (args.length) {
currentValue = args[0];
resolvable.resolve({ curr: args[0] });
updateValueStates(args[0]);
resolvable.resolve(createWrapped());
return;
}

Expand Down
3 changes: 3 additions & 0 deletions packages/core/rsc/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,12 @@ export type MutableAIState<AIState> = {
done: ((newState: AIState) => void) | (() => void);
};

export type StreamablePatch = undefined | [0, string]; // Append string.

export type StreamableValue<T = any, E = any> = {
type?: typeof STREAMABLE_VALUE_TYPE;
curr?: T;
error?: E;
diff?: StreamablePatch;
next?: Promise<StreamableValue<T, E>>;
};

0 comments on commit 20007b9

Please sign in to comment.