diff --git a/packages/cre-sdk/src/sdk/impl/runtime-impl.ts b/packages/cre-sdk/src/sdk/impl/runtime-impl.ts index 8d0038c..4be5c09 100644 --- a/packages/cre-sdk/src/sdk/impl/runtime-impl.ts +++ b/packages/cre-sdk/src/sdk/impl/runtime-impl.ts @@ -1,4 +1,5 @@ import { create, type Message } from '@bufbuild/protobuf' +import type { GenMessage } from '@bufbuild/protobuf/codegenv2' import { type Any, anyPack, anyUnpack } from '@bufbuild/protobuf/wkt' import { type AwaitCapabilitiesRequest, @@ -38,8 +39,20 @@ import { import { CapabilityError } from '@cre/sdk/utils/capabilities/capability-error' import { DonModeError, NodeModeError, SecretsError } from '../errors' +/** + * Base implementation shared by DON and Node runtimes. + * + * Call ID Management: + * - DON mode: IDs increment (1, 2, 3...) + * - Node mode: IDs decrement (-1, -2, -3...) + * This prevents collisions when both modes are active. + */ export class BaseRuntimeImpl implements BaseRuntime { - // modeError must only be set from within NodeRuntimeImpl + /** + * When set, prevents operations that aren't allowed in current mode. + * - Set in DON mode when code tries to use NodeRuntime + * - Set in Node mode when code tries to use Runtime + */ public modeError?: Error constructor( @@ -50,6 +63,10 @@ export class BaseRuntimeImpl implements BaseRuntime { private mode: Mode, ) {} + /** + * Calls a capability and returns a lazy result. + * The actual call happens immediately, but result retrieval is deferred. + */ callCapability({ capabilityId, method, @@ -57,6 +74,7 @@ export class BaseRuntimeImpl implements BaseRuntime { inputSchema, outputSchema, }: CallCapabilityParams): { result: () => O } { + // Enforce mode restrictions if (this.modeError) { return { result: () => { @@ -65,29 +83,18 @@ export class BaseRuntimeImpl implements BaseRuntime { } } - // nextCallId tracks the unique id for a request to the WASM host. - // to avoid collisions of the ID in different modes, it is - // incremented in DON mode and decremented in Node mode. - // eg. - first call don mode: nextCallId = 1 - // - second call: nextCallId = 2 - // - first call node mode: nextCallId = -1 - // - second call node mode: nextCallId = -2 - // - etc... + // Allocate unique callback ID for this request + const callbackId = this.allocateCallbackId() + // Send request to WASM host const anyPayload = anyPack(inputSchema, payload) - const callbackId = this.nextCallId - if (this.mode === Mode.DON) { - this.nextCallId++ - } else { - this.nextCallId-- - } - const req = create(CapabilityRequestSchema, { id: capabilityId, method, payload: anyPayload, callbackId, }) + if (!this.helpers.call(req)) { return { result: () => { @@ -100,49 +107,75 @@ export class BaseRuntimeImpl implements BaseRuntime { } } + // Return lazy result - await and unwrap when .result() is called return { - result: () => { - const awaitRequest = create(AwaitCapabilitiesRequestSchema, { - ids: [callbackId], - }) - const awaitResponse = this.helpers.await(awaitRequest, this.maxResponseSize) - const capabilityResponse = awaitResponse.responses[callbackId] + result: () => + this.awaitAndUnwrapCapabilityResponse(callbackId, capabilityId, method, outputSchema), + } + } + + /** + * Allocates a unique callback ID for a capability request. + * DON mode increments, Node mode decrements (prevents collisions). + */ + private allocateCallbackId(): number { + const callbackId = this.nextCallId + if (this.mode === Mode.DON) { + this.nextCallId++ + } else { + this.nextCallId-- + } + return callbackId + } - if (!capabilityResponse) { - throw new CapabilityError(`No response found for callback ID ${callbackId}`, { + /** + * Awaits capability response and unwraps the result or throws error. + */ + private awaitAndUnwrapCapabilityResponse( + callbackId: number, + capabilityId: string, + method: string, + outputSchema: GenMessage, + ): O { + const awaitRequest = create(AwaitCapabilitiesRequestSchema, { + ids: [callbackId], + }) + const awaitResponse = this.helpers.await(awaitRequest, this.maxResponseSize) + const capabilityResponse = awaitResponse.responses[callbackId] + + if (!capabilityResponse) { + throw new CapabilityError(`No response found for callback ID ${callbackId}`, { + capabilityId, + method, + callbackId, + }) + } + + const response = capabilityResponse.response + switch (response.case) { + case 'payload': { + try { + return anyUnpack(response.value as Any, outputSchema) as O + } catch { + throw new CapabilityError(`Error cannot unwrap payload`, { capabilityId, method, callbackId, }) } - - const response = capabilityResponse.response - switch (response.case) { - case 'payload': { - try { - return anyUnpack(response.value as Any, outputSchema) as O - } catch { - throw new CapabilityError(`Error cannot unwrap payload`, { - capabilityId, - method, - callbackId, - }) - } - } - case 'error': - throw new CapabilityError(`Error ${response.value}`, { - capabilityId, - method, - callbackId, - }) - default: - throw new CapabilityError(`Error cannot unwrap ${response.case}`, { - capabilityId, - method, - callbackId, - }) - } - }, + } + case 'error': + throw new CapabilityError(`Error ${response.value}`, { + capabilityId, + method, + callbackId, + }) + default: + throw new CapabilityError(`Error cannot unwrap ${response.case}`, { + capabilityId, + method, + callbackId, + }) } } @@ -160,14 +193,27 @@ export class BaseRuntimeImpl implements BaseRuntime { } } +/** + * It is used when a BFT guarantee cannot be provided automatically (e.g. calling a standard API). + * You tell each node to perform a task on its own. + * Each node returns its own individual answer, and you are responsible for telling the SDK how to combine them into a single, trusted result by providing an aggregation algorithm. + * + * Useful in situation where you already expect non-determinism (e.g., inherently variable HTTP responses). + * Switching from Node Mode back to DON mode requires workflow authors to handle consensus themselves. + */ export class NodeRuntimeImpl extends BaseRuntimeImpl implements NodeRuntime { _isNodeRuntime: true = true + constructor(config: C, nextCallId: number, helpers: RuntimeHelpers, maxResponseSize: bigint) { helpers.switchModes(Mode.NODE) super(config, nextCallId, helpers, maxResponseSize, Mode.NODE) } } +/** + * It is used for operations that are guaranteed to be Byzantine Fault Tolerant (BFT). + * You ask the network to execute something, and CRE handles the underlying complexity to ensure you get back one final, secure, and trustworthy result. + */ export class RuntimeImpl extends BaseRuntimeImpl implements Runtime { private nextNodeCallId: number = -1 @@ -176,12 +222,24 @@ export class RuntimeImpl extends BaseRuntimeImpl implements Runtime { super(config, nextCallId, helpers, maxResponseSize, Mode.DON) } + /** + * Executes a function in Node mode on each node, then aggregates via consensus. + * + * Flow: + * 1. Switches to Node mode, preventing DON operations + * 2. Executes fn() on each node independently + * 3. Captures result or error as "observation" + * 4. Switches back to DON mode + * 5. Runs consensus to aggregate observations + * 6. Returns aggregated result + */ runInNodeMode( fn: (nodeRuntime: NodeRuntime, ...args: TArgs) => TOutput, - consensusAggretation: ConsensusAggregation, + consensusAggregation: ConsensusAggregation, unwrapOptions?: TOutput extends PrimitiveTypes ? never : UnwrapOptions, ): (...args: TArgs) => { result: () => TOutput } { return (...args: TArgs): { result: () => TOutput } => { + // Step 1: Create node runtime and prevent DON operations this.modeError = new DonModeError() const nodeRuntime = new NodeRuntimeImpl( this.config, @@ -190,54 +248,87 @@ export class RuntimeImpl extends BaseRuntimeImpl implements Runtime { this.maxResponseSize, ) - const consensusInput = create(SimpleConsensusInputsSchema, { - descriptors: consensusAggretation.descriptor, - }) - if (consensusAggretation.defaultValue) { - // This cast is safe, since ConsensusAggregation can only have true its second argument if T extends CreSerializable - consensusInput.default = Value.from( - consensusAggretation.defaultValue as CreSerializable, - ).proto() - } + // Step 2: Prepare consensus input with config + const consensusInput = this.prepareConsensusInput(consensusAggregation) + // Step 3: Execute node function and capture result/error try { const observation = fn(nodeRuntime, ...args) - // This cast is safe, since ConsensusAggregation can only have true its second argument if T extends CreSerializable - consensusInput.observation = { - case: 'value', - value: Value.from(observation as CreSerializable).proto(), - } + this.captureObservation(consensusInput, observation) } catch (e: unknown) { - consensusInput.observation = { - case: 'error', - value: (e instanceof Error && e.message) || String(e), - } + this.captureError(consensusInput, e) } finally { - // Always restore DON mode before invoking consensus - this.modeError = undefined - this.nextNodeCallId = nodeRuntime.nextCallId - nodeRuntime.modeError = new NodeModeError() - this.helpers.switchModes(Mode.DON) + // Step 4: Always restore DON mode + this.restoreDonMode(nodeRuntime) } - const consensus = new ConsensusCapability() - const call = consensus.simple(this, consensusInput) - return { - result: () => { - const result = call.result() - const wrappedValue = Value.wrap(result) + // Step 5: Run consensus and return lazy result + return this.runConsensusAndWrap(consensusInput, unwrapOptions) + } + } - return unwrapOptions - ? wrappedValue.unwrapToType(unwrapOptions) - : (wrappedValue.unwrap() as TOutput) - }, - } + private prepareConsensusInput( + consensusAggregation: ConsensusAggregation, + ) { + const consensusInput = create(SimpleConsensusInputsSchema, { + descriptors: consensusAggregation.descriptor, + }) + + if (consensusAggregation.defaultValue) { + // Safe cast: ConsensusAggregation implies T extends CreSerializable + consensusInput.default = Value.from( + consensusAggregation.defaultValue as CreSerializable, + ).proto() + } + + return consensusInput + } + + private captureObservation(consensusInput: any, observation: TOutput) { + // Safe cast: ConsensusAggregation implies T extends CreSerializable + consensusInput.observation = { + case: 'value', + value: Value.from(observation as CreSerializable).proto(), + } + } + + private captureError(consensusInput: any, e: unknown) { + consensusInput.observation = { + case: 'error', + value: (e instanceof Error && e.message) || String(e), + } + } + + private restoreDonMode(nodeRuntime: NodeRuntimeImpl) { + this.modeError = undefined + this.nextNodeCallId = nodeRuntime.nextCallId + nodeRuntime.modeError = new NodeModeError() + this.helpers.switchModes(Mode.DON) + } + + private runConsensusAndWrap( + consensusInput: any, + unwrapOptions?: TOutput extends PrimitiveTypes ? never : UnwrapOptions, + ): { result: () => TOutput } { + const consensus = new ConsensusCapability() + const call = consensus.simple(this, consensusInput) + + return { + result: () => { + const result = call.result() + const wrappedValue = Value.wrap(result) + + return unwrapOptions + ? wrappedValue.unwrapToType(unwrapOptions) + : (wrappedValue.unwrap() as TOutput) + }, } } getSecret(request: SecretRequest | SecretRequestJson): { result: () => Secret } { + // Enforce mode restrictions if (this.modeError) { return { result: () => { @@ -246,15 +337,19 @@ export class RuntimeImpl extends BaseRuntimeImpl implements Runtime { } } + // Normalize request (accept both protobuf and JSON formats) const secretRequest = (request as unknown as { $typeName?: string }).$typeName ? (request as SecretRequest) : create(SecretRequestSchema, request) + + // Allocate callback ID and send request const id = this.nextCallId this.nextCallId++ const secretsReq = create(GetSecretsRequestSchema, { callbackId: id, requests: [secretRequest], }) + if (!this.helpers.getSecrets(secretsReq, this.maxResponseSize)) { return { result: () => { @@ -263,55 +358,72 @@ export class RuntimeImpl extends BaseRuntimeImpl implements Runtime { } } + // Return lazy result return { - result: () => { - const awaitRequest = create(AwaitSecretsRequestSchema, { ids: [id] }) - const awaitResponse = this.helpers.awaitSecrets(awaitRequest, this.maxResponseSize) - const secretsResponse = awaitResponse.responses[id] + result: () => this.awaitAndUnwrapSecret(id, secretRequest), + } + } - if (!secretsResponse) { - throw new SecretsError(secretRequest, 'no response') - } + private awaitAndUnwrapSecret(id: number, secretRequest: SecretRequest): Secret { + const awaitRequest = create(AwaitSecretsRequestSchema, { ids: [id] }) + const awaitResponse = this.helpers.awaitSecrets(awaitRequest, this.maxResponseSize) + const secretsResponse = awaitResponse.responses[id] - const responses = secretsResponse.responses - if (responses.length !== 1) { - throw new SecretsError(secretRequest, 'invalid value returned from host') - } + if (!secretsResponse) { + throw new SecretsError(secretRequest, 'no response') + } - const response = responses[0].response - switch (response.case) { - case 'secret': - return response.value - case 'error': - throw new SecretsError(secretRequest, response.value.error) - default: - throw new SecretsError(secretRequest, 'cannot unmarshal returned value from host') - } - }, + const responses = secretsResponse.responses + if (responses.length !== 1) { + throw new SecretsError(secretRequest, 'invalid value returned from host') + } + + const response = responses[0].response + switch (response.case) { + case 'secret': + return response.value + case 'error': + throw new SecretsError(secretRequest, response.value.error) + default: + throw new SecretsError(secretRequest, 'cannot unmarshal returned value from host') } } + /** + * Generates a report via consensus mechanism. + */ report(input: ReportRequest | ReportRequestJson): { result: () => Report } { const consensus = new ConsensusCapability() const call = consensus.report(this, input) return { - result: () => { - return call.result() - }, + result: () => call.result(), } } } +/** + * Interface to the WASM host environment. + * Provides low-level access to capabilities, secrets, and utilities. + */ export interface RuntimeHelpers { + /** Initiates a capability call. Returns false if capability not found. */ call(request: CapabilityRequest): boolean + + /** Awaits capability responses. Blocks until responses are ready. */ await(request: AwaitCapabilitiesRequest, maxResponseSize: bigint): AwaitCapabilitiesResponse + /** Requests secrets from host. Returns false if host rejects request. */ getSecrets(request: GetSecretsRequest, maxResponseSize: bigint): boolean + + /** Awaits secret responses. Blocks until secrets are ready. */ awaitSecrets(request: AwaitSecretsRequest, maxResponseSize: bigint): AwaitSecretsResponse + /** Switches execution mode (DON vs Node). Affects available operations. */ switchModes(mode: Mode): void + /** Returns current time in nanoseconds. */ now(): number + /** Logs a message to the host environment. */ log(message: string): void } diff --git a/packages/cre-sdk/src/sdk/runtime.ts b/packages/cre-sdk/src/sdk/runtime.ts index eacce6d..fd1c3be 100644 --- a/packages/cre-sdk/src/sdk/runtime.ts +++ b/packages/cre-sdk/src/sdk/runtime.ts @@ -1,6 +1,7 @@ import type { Message } from '@bufbuild/protobuf' import type { GenMessage } from '@bufbuild/protobuf/codegenv2' import type { ReportRequest, ReportRequestJson } from '@cre/generated/sdk/v1alpha/sdk_pb' +import type { Report } from '@cre/sdk/report' import type { ConsensusAggregation, PrimitiveTypes, UnwrapOptions } from '@cre/sdk/utils' import type { SecretsProvider } from '.' @@ -14,9 +15,11 @@ export type CallCapabilityParams = { outputSchema: GenMessage } -import type { Report } from '@cre/sdk/report' - -export type BaseRuntime = { +/** + * Base runtime available in both DON and Node execution modes. + * Provides core functionality for calling capabilities and logging. + */ +export interface BaseRuntime { config: C // callCapability is meant to be called by generated code only. @@ -29,16 +32,30 @@ export type BaseRuntime = { log(message: string): void } -export type Runtime = BaseRuntime & - SecretsProvider & { - runInNodeMode( - fn: (nodeRuntime: NodeRuntime, ...args: TArgs) => TOutput, - consensusAggregation: ConsensusAggregation, - unwrapOptions?: TOutput extends PrimitiveTypes ? never : UnwrapOptions, - ): (...args: TArgs) => { result: () => TOutput } - report(input: ReportRequest | ReportRequestJson): { result: () => Report } - } - -export type NodeRuntime = BaseRuntime & { +/** + * Runtime for Node mode execution. + */ +export interface NodeRuntime extends BaseRuntime { readonly _isNodeRuntime: true } + +/** + * Runtime for DON mode execution. + */ +export interface Runtime extends BaseRuntime, SecretsProvider { + /** + * Executes a function in Node mode and aggregates results via consensus. + * + * @param fn - Function to execute in each node (receives NodeRuntime) + * @param consensusAggregation - How to aggregate results across nodes + * @param unwrapOptions - Optional unwrapping config for complex return types + * @returns Wrapped function that returns aggregated result + */ + runInNodeMode( + fn: (nodeRuntime: NodeRuntime, ...args: TArgs) => TOutput, + consensusAggregation: ConsensusAggregation, + unwrapOptions?: TOutput extends PrimitiveTypes ? never : UnwrapOptions, + ): (...args: TArgs) => { result: () => TOutput } + + report(input: ReportRequest | ReportRequestJson): { result: () => Report } +}