Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ai/rsc): Implement bulked AI actions #1108

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 75 additions & 6 deletions packages/core/rsc/provider.tsx
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
// This file provides the AI context to all AI Actions via AsyncLocalStorage.

import * as React from 'react';
import { InternalAIProvider } from './rsc-shared.mjs';
import {
withAIState,
getAIStateDeltaPromise,
sealMutableAIState,
} from './ai-state';
import * as jsondiffpatch from 'jsondiffpatch';

import type {
ServerWrappedActions,
AIAction,
Expand All @@ -17,6 +13,14 @@ import type {
OnGetUIState,
} from './types';

import { InternalAIProvider } from './rsc-shared.mjs';
import {
withAIState,
getAIStateDeltaPromise,
sealMutableAIState,
} from './ai-state';
import { createResolvablePromise } from './utils';

async function innerAction<T>(
{
action,
Expand Down Expand Up @@ -46,6 +50,70 @@ function wrapAction<T = unknown>(
return innerAction.bind(null, { action, options }) as AIAction<T>;
}

async function bulkActions<T, D, R>(
initialState: any,
wrappedActions: [AIAction<T, [D, R]>, args: T[]][],
): Promise<[Promise<D | undefined>, Promise<R>[]]> {
'use server';

let state = initialState;

let diffPromise = Promise.resolve<D | undefined>(undefined);
const results: Promise<R>[] = [];
const resolveResults: {
resolve: (value: R) => void;
reject: (error: Error) => void;
}[] = [];

// This makes it possible to return earlier results before waiting for next
// action to be executed.
for (let i = 0; i < wrappedActions.length; i++) {
const { resolve, reject, promise } = createResolvablePromise<R>();
resolveResults.push({ resolve, reject });
results.push(promise);
}

// Execute all actions in order.
for (let i = 0; i < wrappedActions.length; i++) {
const [action, args] = wrappedActions[i];

try {
const [diff, result] = await action(state, ...args);
resolveResults[i].resolve(result);

if (i === wrappedActions.length - 1) {
// Last action
diffPromise = Promise.resolve(diff);
} else {
// Not the last action, apply the diff and continue.
const delta = await diff;
if (delta) {
state = jsondiffpatch.patch(jsondiffpatch.clone(state), delta);
}
}
} catch (e) {
console.error(e);

// Reject all remaining results and return.
// It's fine to return directly because diffPromise should be undefined at this point
// so it won't affect the client state.
for (let j = i; j < wrappedActions.length; j++) {
resolveResults[j].reject(e as Error);
}
}
}

return [
diffPromise.then(delta => {
if (delta) {
state = jsondiffpatch.patch(jsondiffpatch.clone(state), delta);
}
return jsondiffpatch.diff(initialState, state) as D;
}),
results,
];
}

export function createAI<
AIState = any,
UIState = any,
Expand Down Expand Up @@ -103,6 +171,7 @@ export function createAI<
<InternalAIProvider
wrappedActions={wrappedActions}
wrappedSyncUIState={wrappedSyncUIState}
bulkActions={bulkActions}
initialUIState={uiState}
initialAIState={aiState}
initialAIStatePatch={aiStateDelta}
Expand Down
55 changes: 26 additions & 29 deletions packages/core/rsc/shared-client/context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
InferUIState,
} from '../types';
import { isFunction } from '../utils';
import { createActionQueue } from './utils';

const InternalUIStateProvider = React.createContext<null | any>(null);
const InternalAIStateProvider = React.createContext<undefined | any>(undefined);
Expand All @@ -23,13 +24,16 @@
initialUIState,
initialAIState,
initialAIStatePatch,
bulkActions,
wrappedActions,
wrappedSyncUIState,
}: InternalAIProviderProps) {
if (!('use' in React)) {
throw new Error('Unsupported React version.');
}

const q = React.useState(() => createActionQueue(bulkActions))[0];

const uiState = React.useState(initialUIState);
const setUIState = uiState[1];

Expand All @@ -52,7 +56,7 @@

React.useEffect(() => {
aiStateRef.current = aiState[0];
}, [aiState[0]]);

Check warning on line 59 in packages/core/rsc/shared-client/context.tsx

View workflow job for this annotation

GitHub Actions / ESLint

React Hook React.useEffect has a missing dependency: 'aiState'. Either include it or remove the dependency array

Check warning on line 59 in packages/core/rsc/shared-client/context.tsx

View workflow job for this annotation

GitHub Actions / ESLint

React Hook React.useEffect has a complex expression in the dependency array. Extract it to a separate variable so it can be statically checked

const clientWrappedActions = React.useMemo(
() =>
Expand All @@ -60,27 +64,23 @@
Object.entries(wrappedActions).map(([key, action]) => [
key,
async (...args: any) => {
const aiStateSnapshot = aiStateRef.current;
const [aiStateDelta, result] = await action(
aiStateSnapshot,
...args,
const result = await q(
() => aiStateRef.current,
args,
action,
(state, delta) => {
if (delta !== undefined) {
setAIState(
jsondiffpatch.patch(jsondiffpatch.clone(state), delta),
);
}
},
);
(async () => {
const delta = await aiStateDelta;
if (delta !== undefined) {
aiState[1](
jsondiffpatch.patch(
jsondiffpatch.clone(aiStateSnapshot),
delta,
),
);
}
})();
return result;
},
]),
),
[wrappedActions],
[wrappedActions, q],

Check warning on line 83 in packages/core/rsc/shared-client/context.tsx

View workflow job for this annotation

GitHub Actions / ESLint

React Hook React.useMemo has a missing dependency: 'setAIState'. Either include it or remove the dependency array
);

const clientWrappedSyncUIStateAction = React.useMemo(() => {
Expand All @@ -89,25 +89,22 @@
}

return async () => {
const aiStateSnapshot = aiStateRef.current;
const [aiStateDelta, uiState] = await wrappedSyncUIState!(
aiStateSnapshot,
const uiState = await q(
() => aiStateRef.current,
[],
wrappedSyncUIState!,
(state, delta) => {
if (delta !== undefined) {
setAIState(jsondiffpatch.patch(jsondiffpatch.clone(state), delta));
}
},
);

if (uiState !== undefined) {
setUIState(uiState);
}

const delta = await aiStateDelta;
if (delta !== undefined) {
const patchedAiState = jsondiffpatch.patch(
jsondiffpatch.clone(aiStateSnapshot),
delta,
);
setAIState(patchedAiState);
}
};
}, [wrappedSyncUIState]);
}, [wrappedSyncUIState, q]);

Check warning on line 107 in packages/core/rsc/shared-client/context.tsx

View workflow job for this annotation

GitHub Actions / ESLint

React Hook React.useMemo has missing dependencies: 'setAIState' and 'setUIState'. Either include them or remove the dependency array

return (
<InternalAIStateProvider.Provider value={aiState}>
Expand Down Expand Up @@ -182,7 +179,7 @@
}

const key = args[0];
const setter = React.useCallback(

Check warning on line 182 in packages/core/rsc/shared-client/context.tsx

View workflow job for this annotation

GitHub Actions / ESLint

React Hook useCallback received a function whose dependencies are unknown. Pass an inline function instead
typeof key === 'undefined'
? state[1]
: (newState: ValueOrUpdater<T>) => {
Expand Down
124 changes: 124 additions & 0 deletions packages/core/rsc/shared-client/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
'use client';

import type { BulkAction } from '../types';

export function createResolvablePromise<T = any>() {
let resolve: (value: T) => void, reject: (error: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return {
promise,
resolve: resolve!,
reject: reject!,
};
}

export function createActionQueue<T>(bulkActions: BulkAction<T>) {
let executing: Promise<any> | null = null;
let queueExecuting: Promise<any> | null = null;
const queued: any[] = [];

return async (
getState: () => any,
args: any[],
action: (...args: any[]) => Promise<[Promise<any>, unknown]>,
applyStateDiff: (state: any, delta: any) => void,
) => {
const index = queued.length;

// Add execution to the queue
queued.push([action, args]);

if (executing) {
await executing;
}

const resolvable = createResolvablePromise();
const state = getState();

try {
const bulkSize = queued.length;
if (bulkSize > 1) {
const queueResolvable = createResolvablePromise();

try {
// Bulk execute all queued actions. The first action needs to start
// the request and others should just wait.
executing = resolvable.promise;
queueExecuting = queueResolvable.promise;

// Remove executed actions from the queue
const actions = [...queued];
queued.length = 0;

const bulkResult = await bulkActions(state, actions);
executing = null;
queueExecuting = null;

const aiStateDelta = bulkResult[0];
const result = bulkResult[1][index];

(async () => {
try {
const delta = await aiStateDelta;
applyStateDiff(state, delta);
resolvable.resolve(result);
executing = null;
queueExecuting = null;
} catch (e) {
resolvable.reject(e);
executing = null;
queueExecuting = null;
throw e;
}
})();

queueResolvable.resolve(bulkResult);
return result;
} catch (e) {
queueResolvable.reject(e);
throw e;
}
} else if (queueExecuting) {
// Wait for bulk execution to finish. No need to apply any state changes.
const bulkResult = await queueExecuting;
const result = bulkResult[1][index];

resolvable.resolve(null);
return result;
} else {
// Single execution.
executing = resolvable.promise;

// Remove executed action from the queue
queued.shift();

const [aiStateDelta, result] = await action(state, ...args);

(async () => {
try {
const delta = await aiStateDelta;
applyStateDiff(state, delta);
resolvable.resolve(result);
executing = null;
queueExecuting = null;
} catch (e) {
resolvable.reject(e);
executing = null;
queueExecuting = null;
throw e;
}
})();

return result;
}
} catch (e) {
resolvable.reject(e);
executing = null;
queueExecuting = null;
throw e;
}
};
}
6 changes: 6 additions & 0 deletions packages/core/rsc/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ export type ServerWrappedActions<T = unknown> = Record<
ServerWrappedAction<T>
>;

export type BulkAction<T, D = any, R = any> = (
initialState: any,
wrappedActions: [AIAction<T, [D, R]>, args: T[]][],
) => Promise<[Promise<D | undefined>, Promise<R>[]]>;

export type InternalAIProviderProps<AIState = any, UIState = any> = {
children: React.ReactNode;
initialUIState: UIState;
initialAIState: AIState;
initialAIStatePatch: undefined | Promise<AIState>;
bulkActions: BulkAction<AIState>;
wrappedActions: ServerWrappedActions<AIState>;
wrappedSyncUIState?: ServerWrappedAction<AIState>;
};
Expand Down
Loading