Skip to content

Commit

Permalink
ai/rsc: Make RSC streamable utils chainable (#1479)
Browse files Browse the repository at this point in the history
  • Loading branch information
shuding committed May 12, 2024
1 parent 3719622 commit 8439884
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .changeset/plenty-onions-invite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'ai': patch
---

ai/rsc: make RSC streamable utils chainable
30 changes: 25 additions & 5 deletions packages/core/rsc/streamable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ function createStreamableUI(initialValue?: React.ReactNode) {
}
warnUnclosedStream();

return {
const streamable = {
/**
* The value of the streamable UI. This can be returned from a Server Action and received by the client.
*/
Expand All @@ -61,7 +61,7 @@ function createStreamableUI(initialValue?: React.ReactNode) {
// There is no need to update the value if it's referentially equal.
if (value === currentValue) {
warnUnclosedStream();
return;
return streamable;
}

const resolvable = createResolvablePromise();
Expand All @@ -72,6 +72,8 @@ function createStreamableUI(initialValue?: React.ReactNode) {
reject = resolvable.reject;

warnUnclosedStream();

return streamable;
},
/**
* This method is used to append a new UI node to the end of the old one.
Expand Down Expand Up @@ -100,6 +102,8 @@ function createStreamableUI(initialValue?: React.ReactNode) {
reject = resolvable.reject;

warnUnclosedStream();

return streamable;
},
/**
* This method is used to signal that there is an error in the UI stream.
Expand All @@ -113,6 +117,8 @@ function createStreamableUI(initialValue?: React.ReactNode) {
}
closed = true;
reject(error);

return streamable;
},
/**
* This method marks the UI node as finalized. You can either call it without any parameters or with a new UI node as the final state.
Expand All @@ -129,11 +135,15 @@ function createStreamableUI(initialValue?: React.ReactNode) {
closed = true;
if (args.length) {
resolve({ value: args[0], done: true });
return;
return streamable;
}
resolve({ value: currentValue, done: true });

return streamable;
},
};

return streamable;
}

const STREAMABLE_VALUE_INTERNAL_LOCK = Symbol('streamable.value.lock');
Expand Down Expand Up @@ -276,7 +286,7 @@ function createStreamableValueImpl<T = any, E = any>(initialValue?: T) {
currentValue = value;
}

return {
const streamable = {
/**
* @internal This is an internal lock to prevent the value from being
* updated by the user.
Expand Down Expand Up @@ -306,6 +316,8 @@ function createStreamableValueImpl<T = any, E = any>(initialValue?: T) {
resolvePrevious(createWrapped());

warnUnclosedStream();

return streamable;
},
/**
* This method is used to append a delta string to the current value. It
Expand Down Expand Up @@ -351,6 +363,8 @@ function createStreamableValueImpl<T = any, E = any>(initialValue?: T) {
resolvePrevious(createWrapped());

warnUnclosedStream();

return streamable;
},
/**
* This method is used to signal that there is an error in the value stream.
Expand All @@ -368,6 +382,8 @@ function createStreamableValueImpl<T = any, E = any>(initialValue?: T) {
currentPromise = undefined;

resolvable.resolve({ error });

return streamable;
},
/**
* This method marks the value as finalized. You can either call it without
Expand All @@ -389,12 +405,16 @@ function createStreamableValueImpl<T = any, E = any>(initialValue?: T) {
if (args.length) {
updateValueStates(args[0]);
resolvable.resolve(createWrapped());
return;
return streamable;
}

resolvable.resolve({});

return streamable;
},
};

return streamable;
}

export { createStreamableUI, createStreamableValue };
Expand Down
37 changes: 36 additions & 1 deletion packages/core/rsc/streamable.ui.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ import {
openaiFunctionCallChunks,
} from '../tests/snapshots/openai-chat';
import { DEFAULT_TEST_URL, createMockServer } from '../tests/utils/mock-server';
import { createStreamableUI, render } from './streamable';
import {
createStreamableUI,
createStreamableValue,
render,
} from './streamable';
import { z } from 'zod';

const FUNCTION_CALL_TEST_URL = DEFAULT_TEST_URL + 'mock-func-call';
Expand Down Expand Up @@ -445,4 +449,35 @@ describe('rsc - createStreamableUI()', () => {
"
`);
});

it('should return self', async () => {
const ui = createStreamableUI(<div>1</div>)
.update(<div>2</div>)
.update(<div>3</div>)
.done(<div>4</div>);

expect(await flightRender(ui.value)).toMatchInlineSnapshot(`
"1:\\"$Sreact.suspense\\"
2:D{\\"name\\":\\"\\",\\"env\\":\\"Server\\"}
0:[\\"$\\",\\"$1\\",null,{\\"fallback\\":[\\"$\\",\\"div\\",null,{\\"children\\":\\"1\\"}],\\"children\\":\\"$L2\\"}]
3:D{\\"name\\":\\"\\",\\"env\\":\\"Server\\"}
2:[\\"$\\",\\"$1\\",null,{\\"fallback\\":[\\"$\\",\\"div\\",null,{\\"children\\":\\"2\\"}],\\"children\\":\\"$L3\\"}]
4:D{\\"name\\":\\"\\",\\"env\\":\\"Server\\"}
3:[\\"$\\",\\"$1\\",null,{\\"fallback\\":[\\"$\\",\\"div\\",null,{\\"children\\":\\"3\\"}],\\"children\\":\\"$L4\\"}]
4:[\\"$\\",\\"div\\",null,{\\"children\\":\\"4\\"}]
"
`);
});
});

describe('rsc - createStreamableValue()', () => {
it('should return self', async () => {
const value = createStreamableValue(1).update(2).update(3).done(4);
expect(value.value).toMatchInlineSnapshot(`
{
"curr": 4,
"type": Symbol(ui.streamable.value),
}
`);
});
});

0 comments on commit 8439884

Please sign in to comment.