Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-circular-dependency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Fix circular dependency between runtime.ts and runtime/start.ts that caused issues with Bun's module resolution
2 changes: 1 addition & 1 deletion .changeset/pre.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"@workflow/vite": "4.0.0-beta.1",
"@workflow/docs-typecheck": "0.0.0",
"@workflow/serde": "4.0.0",
"@workflow/nest": "0.0.0-development",
"@workflow/nest": "4.0.0",
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes nest version starting at 0.0.0. No actual change otherwise.

"@workflow/example-nest": "0.0.0"
},
"changesets": [
Expand Down
195 changes: 7 additions & 188 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
import {
WorkflowRunCancelledError,
WorkflowRunFailedError,
WorkflowRunNotCompletedError,
WorkflowRuntimeError,
} from '@workflow/errors';
import { WorkflowRuntimeError } from '@workflow/errors';
import { parseWorkflowName } from '@workflow/utils/parse-name';
import {
type Event,
SPEC_VERSION_CURRENT,
WorkflowInvokePayloadSchema,
type WorkflowRun,
type WorkflowRunStatus,
type World,
} from '@workflow/world';
import { WorkflowSuspension } from './global.js';
import { runtimeLogger } from './logger.js';
Expand All @@ -24,18 +17,11 @@ import {
} from './runtime/helpers.js';
import { handleSuspension } from './runtime/suspension-handler.js';
import { getWorld, getWorldHandlers } from './runtime/world.js';
import {
getExternalRevivers,
hydrateWorkflowReturnValue,
} from './serialization.js';
import { remapErrorStack } from './source-map.js';
import * as Attribute from './telemetry/semantic-conventions.js';
import { linkToCurrentContext, trace, withTraceContext } from './telemetry.js';
import { getErrorName, getErrorStack } from './types.js';
import {
buildWorkflowSuspensionMessage,
getWorkflowRunStreamId,
} from './util.js';
import { buildWorkflowSuspensionMessage } from './util.js';
import { runWorkflow } from './workflow.js';

export type { Event, WorkflowRun };
Expand All @@ -59,178 +45,11 @@ export {
getWorldHandlers,
setWorld,
} from './runtime/world.js';

/**
* Options for configuring a workflow's readable stream.
*/
export interface WorkflowReadableStreamOptions {
/**
* An optional namespace to distinguish between multiple streams associated
* with the same workflow run.
*/
namespace?: string;
/**
* The index number of the starting chunk to begin reading the stream from.
*/
startIndex?: number;
/**
* Any asynchronous operations that need to be performed before the execution
* environment is paused / terminated
* (i.e. using [`waitUntil()`](https://developer.mozilla.org/docs/Web/API/ExtendableEvent/waitUntil) or similar).
*/
ops?: Promise<any>[];
/**
* The global object to use for hydrating types from the global scope.
*
* Defaults to {@link [`globalThis`](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/globalThis)}.
*/
global?: Record<string, any>;
}

/**
* A handler class for a workflow run.
*/
export class Run<TResult> {
/**
* The ID of the workflow run.
*/
runId: string;

/**
* The world object.
* @internal
*/
private world: World;

constructor(runId: string) {
this.runId = runId;
this.world = getWorld();
}

/**
* Cancels the workflow run.
*/
async cancel(): Promise<void> {
await this.world.events.create(this.runId, {
eventType: 'run_cancelled',
specVersion: SPEC_VERSION_CURRENT,
});
}

/**
* The status of the workflow run.
*/
get status(): Promise<WorkflowRunStatus> {
return this.world.runs.get(this.runId).then((run) => run.status);
}

/**
* The return value of the workflow run.
* Polls the workflow return value until it is completed.
*/
get returnValue(): Promise<TResult> {
return this.pollReturnValue();
}

/**
* The name of the workflow.
*/
get workflowName(): Promise<string> {
return this.world.runs.get(this.runId).then((run) => run.workflowName);
}

/**
* The timestamp when the workflow run was created.
*/
get createdAt(): Promise<Date> {
return this.world.runs.get(this.runId).then((run) => run.createdAt);
}

/**
* The timestamp when the workflow run started execution.
* Returns undefined if the workflow has not started yet.
*/
get startedAt(): Promise<Date | undefined> {
return this.world.runs.get(this.runId).then((run) => run.startedAt);
}

/**
* The timestamp when the workflow run completed.
* Returns undefined if the workflow has not completed yet.
*/
get completedAt(): Promise<Date | undefined> {
return this.world.runs.get(this.runId).then((run) => run.completedAt);
}

/**
* The readable stream of the workflow run.
*/
get readable(): ReadableStream {
return this.getReadable();
}

/**
* Retrieves the workflow run's default readable stream, which reads chunks
* written to the corresponding writable stream {@link getWritable}.
*
* @param options - The options for the readable stream.
* @returns The `ReadableStream` for the workflow run.
*/
getReadable<R = any>(
options: WorkflowReadableStreamOptions = {}
): ReadableStream<R> {
const { ops = [], global = globalThis, startIndex, namespace } = options;
const name = getWorkflowRunStreamId(this.runId, namespace);
return getExternalRevivers(global, ops, this.runId).ReadableStream({
name,
startIndex,
}) as ReadableStream<R>;
}

/**
* Polls the workflow return value every 1 second until it is completed.
* @internal
* @returns The workflow return value.
*/
private async pollReturnValue(): Promise<TResult> {
while (true) {
try {
const run = await this.world.runs.get(this.runId);

if (run.status === 'completed') {
return hydrateWorkflowReturnValue(run.output, [], this.runId);
}

if (run.status === 'cancelled') {
throw new WorkflowRunCancelledError(this.runId);
}

if (run.status === 'failed') {
throw new WorkflowRunFailedError(this.runId, run.error);
}

throw new WorkflowRunNotCompletedError(this.runId, run.status);
} catch (error) {
if (WorkflowRunNotCompletedError.is(error)) {
await new Promise((resolve) => setTimeout(resolve, 1_000));
continue;
}
throw error;
}
}
}
}

/**
* Retrieves a `Run` object for a given run ID.
*
* @param runId - The workflow run ID obtained from {@link start}.
* @returns A `Run` object.
* @throws WorkflowRunNotFoundError if the run ID is not found.
*/
export function getRun<TResult>(runId: string): Run<TResult> {
return new Run(runId);
}
export {
getRun,
Run,
type WorkflowReadableStreamOptions,
} from './runtime/run.js';

/**
* Function that creates a single route which handles any workflow execution
Expand Down
Loading
Loading