Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions packages/actor-core-cli/src/workflow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ export interface Context {
wait: (ms: number) => Promise<undefined>;
task: <T extends UserFnReturnType>(
name: string,
taskFn: (toolbox: Context) => T,
taskFn: (ctx: Context) => T,
opts?: TaskOptions,
) => AsyncGenerator<
WorkflowAction.All,
Expand Down Expand Up @@ -218,6 +218,10 @@ interface TaskOptions {
success?: ReactNode;
}

interface RunnerToolbox {
processTask: (task: WorkflowAction.All) => void;
}

let TASK_ID = 0;

function getTaskId() {
Expand All @@ -226,15 +230,14 @@ function getTaskId() {

export function workflow(
title: string,
workflowFn: (
toolbox: Context,
) => AsyncGenerator<WorkflowAction.All | undefined>,
workflowFn: (ctx: Context) => AsyncGenerator<WorkflowAction.All | undefined>,
opts: TaskOptions = {},
) {
let renderUtils: ReturnType<typeof render> | null = null;

async function* runner<T extends UserFnReturnType>(
meta: TaskMetadata & { processTask: (task: WorkflowAction.All) => void },
meta: TaskMetadata,
toolbox: RunnerToolbox,
name: string,
taskFn: (ctx: Context) => T,
opts: TaskOptions = {},
Expand All @@ -243,7 +246,9 @@ export function workflow(
const p = WorkflowAction.progress.bind(null, { ...meta, id, name, opts });
yield p("running");
try {
const output = taskFn(createContext({ ...meta, id, name, opts, processTask: meta.processTask }));
const output = taskFn(
createContext({ ...meta, id, name, opts }, toolbox),
);
if (output instanceof Promise) {
const result = await output;
yield p("done", { result, ...opts });
Expand All @@ -259,17 +264,19 @@ export function workflow(
}
}

function createContext(
meta: TaskMetadata & { processTask: (task: WorkflowAction.All) => void },
): Context {
function createContext(meta: TaskMetadata, toolbox: RunnerToolbox): Context {
return {
wait: (ms: number) =>
new Promise<undefined>((resolve) => setTimeout(resolve, ms)),
task: runner.bind(null, {
...meta,
parent: meta.id,
name: "",
}) as Context["task"],
task: runner.bind(
null,
{
...meta,
parent: meta.id,
name: "",
},
toolbox,
) as Context["task"],
render(children: React.ReactNode) {
return WorkflowAction.hook("afterAll", ({ tasks, logs }) => {
renderUtils?.rerender(
Expand All @@ -288,7 +295,7 @@ export function workflow(
});
},
changeLabel: (label: string) => {
meta.processTask(
toolbox.processTask(
WorkflowAction.progress({ ...meta, name: label }, "running"),
);
},
Expand Down Expand Up @@ -349,9 +356,9 @@ export function workflow(
};
}

async function* workflowRunner(
processTask: (task: WorkflowAction.All) => void,
): AsyncGenerator<WorkflowAction.All, WorkflowResult> {
async function* workflowRunner({
processTask,
}: RunnerToolbox): AsyncGenerator<WorkflowAction.All, WorkflowResult> {
// task <> parent
const parentMap = new Map<string, string>();
const id = getTaskId();
Expand All @@ -361,7 +368,7 @@ export function workflow(
"running",
);
for await (const task of workflowFn(
createContext({ id, name: title, parent: id, processTask }),
createContext({ id, name: title, parent: id }, { processTask }),
)) {
if (!task || typeof task !== "object") {
continue;
Expand Down Expand Up @@ -457,7 +464,7 @@ export function workflow(
);
}

for await (const task of workflowRunner(processTask)) {
for await (const task of workflowRunner({ processTask })) {
processTask(task);
}

Expand Down
Loading