Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {

/**
* Client Capability
*
*
* Capability ID: http-actions@1.0.0-alpha
* Default Mode: Mode.NODE
* Capability Name: http-actions
Expand All @@ -24,25 +24,22 @@ import {
export class ClientCapability {
/** The capability ID for this service */
static readonly CAPABILITY_ID = "http-actions@1.0.0-alpha";

/** The default execution mode for this capability */
static readonly DEFAULT_MODE = Mode.NODE;

static readonly CAPABILITY_NAME = "http-actions";
static readonly CAPABILITY_VERSION = "1.0.0-alpha";


constructor(
private readonly mode: Mode = ClientCapability.DEFAULT_MODE
) {}
constructor(private readonly mode: Mode = ClientCapability.DEFAULT_MODE) {}

async sendRequest(input: RequestJson): Promise<Response> {
const payload = {
typeUrl: getTypeUrl(RequestSchema),
value: toBinary(RequestSchema, fromJson(RequestSchema, input)),
};
const capabilityId = ClientCapability.CAPABILITY_ID;

return callCapability({
capabilityId,
method: "SendRequest",
Expand All @@ -65,7 +62,10 @@ export class ClientCapability {
});
}

return fromBinary(ResponseSchema, capabilityResponse.response.value.value);
return fromBinary(
ResponseSchema,
capabilityResponse.response.value.value
);
});
}
}
39 changes: 29 additions & 10 deletions src/sdk/engine/execute.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { describe, test, expect, mock, beforeEach } from "bun:test";
import { handleExecuteRequest } from "@cre/sdk/engine/execute";
import { describe, test, expect, mock } from "bun:test";
import { handler } from "@cre/sdk/workflow";
import { create, toBinary, fromBinary } from "@bufbuild/protobuf";
import {
Expand All @@ -15,20 +14,40 @@ import { OutputsSchema as BasicTriggerOutputsSchema } from "@cre/generated/capab
import { getTypeUrl } from "@cre/sdk/utils/typeurl";
import { emptyConfig, basicRuntime } from "@cre/sdk/testhelpers/mocks";

// Mock the hostBindings module before importing handleExecuteRequest
const mockSendResponse = mock((_response: string) => 0);
const mockHostBindings = {
sendResponse: mockSendResponse,
switchModes: mock((_mode: 0 | 1 | 2) => {}),
log: mock((_message: string) => {}),
callCapability: mock((_request: string) => 1),
awaitCapabilities: mock((_awaitRequest: string, _maxResponseLen: number) =>
btoa("mock_await_capabilities_response")
),
getSecrets: mock((_request: string, _maxResponseLen: number) => 1),
awaitSecrets: mock((_awaitRequest: string, _maxResponseLen: number) =>
btoa("mock_await_secrets_response")
),
versionV2: mock(() => {}),
randomSeed: mock((_mode: 1 | 2) => Math.random()),
getWasiArgs: mock(() => '["mock.wasm", ""]'),
};

// Mock the module
mock.module("@cre/sdk/runtime/host-bindings", () => ({
hostBindings: mockHostBindings,
}));

import { handleExecuteRequest } from "@cre/sdk/engine/execute";

const decodeExecutionResult = (b64: string) =>
fromBinary(ExecutionResultSchema, Buffer.from(b64, "base64"));

describe("engine/execute", () => {
let originalSendResponse: typeof globalThis.sendResponse;

beforeEach(() => {
originalSendResponse = globalThis.sendResponse;
});

test("subscribe returns TriggerSubscriptionRequest wrapped in ExecutionResult", async () => {
const subs: string[] = [];
// mock sendResponse to capture base64 payload
globalThis.sendResponse = mock((resp: string) => {
mockSendResponse.mockImplementation((resp: string) => {
const exec = decodeExecutionResult(resp);
const ts =
exec.result.case === "triggerSubscriptions"
Expand Down Expand Up @@ -71,7 +90,7 @@ describe("engine/execute", () => {
"basic-test-trigger@1.0.0:Trigger:type.googleapis.com/capabilities.internal.basictrigger.v1.Config"
);

globalThis.sendResponse = originalSendResponse;
mockSendResponse.mockRestore();
});

test("trigger routes by id and decodes payload for correct handler", async () => {
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/engine/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type {
CapabilityResponse,
} from "@cre/generated/sdk/v1alpha/sdk_pb";
import type { Workflow } from "@cre/sdk/workflow";
import type { Runtime } from "@cre/sdk/runtime";
import type { Runtime } from "@cre/sdk/runtime/runtime";
import { handleSubscribePhase } from "./handleSubscribePhase";
import { handleExecutionPhase } from "./handleExecutionPhase";

Expand Down
2 changes: 1 addition & 1 deletion src/sdk/engine/handleExecutionPhase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type {
import { fromBinary } from "@bufbuild/protobuf";
import type { Workflow } from "@cre/sdk/workflow";
import { getTypeUrl } from "@cre/sdk/utils/typeurl";
import type { Runtime } from "@cre/sdk/runtime";
import type { Runtime } from "@cre/sdk/runtime/runtime";

export const handleExecutionPhase = async <TConfig>(
req: ExecuteRequest,
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/engine/handleSubscribePhase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
ExecutionResultSchema,
} from "@cre/generated/sdk/v1alpha/sdk_pb";
import type { Workflow } from "@cre/sdk/workflow";
import { host } from "@cre/sdk/utils/host";
import { hostBindings } from "@cre/sdk/runtime/host-bindings";

export const handleSubscribePhase = <TConfig>(
req: ExecuteRequest,
Expand All @@ -35,5 +35,5 @@ export const handleSubscribePhase = <TConfig>(
});

const encoded = toBinary(ExecutionResultSchema, execResult);
host.sendResponse(Buffer.from(encoded).toString("base64"));
hostBindings.sendResponse(Buffer.from(encoded).toString("base64"));
};
12 changes: 0 additions & 12 deletions src/sdk/runtime.ts

This file was deleted.

51 changes: 51 additions & 0 deletions src/sdk/runtime/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Mode } from "@cre/generated/sdk/v1alpha/sdk_pb";

export class DonModeError extends Error {
public name: string;
public capabilityId?: string;
public method?: string;
public mode?: Mode;

constructor(
message = "cannot use DON Runtime inside Node mode",
options?: {
capabilityId?: string;
method?: string;
mode?: Mode;
}
) {
super(message);
this.name = "DonModeError";

if (options) {
this.capabilityId = options.capabilityId;
this.method = options.method;
this.mode = options.mode;
}
}
}

export class NodeModeError extends Error {
public name: string;
public capabilityId?: string;
public method?: string;
public mode?: Mode;

constructor(
message = "cannot use Node Runtime inside DON mode",
options?: {
capabilityId?: string;
method?: string;
mode?: Mode;
}
) {
super(message);
this.name = "NodeModeError";

if (options) {
this.capabilityId = options.capabilityId;
this.method = options.method;
this.mode = options.mode;
}
}
}
48 changes: 48 additions & 0 deletions src/sdk/runtime/host-bindings.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { Mode } from "@cre/generated/sdk/v1alpha/sdk_pb";
import { z } from "zod";

// Zod schema for validating global host functions
const globalHostBindingsSchema = z.object({
switchModes: z.function().args(z.nativeEnum(Mode)).returns(z.void()),
log: z.function().args(z.string()).returns(z.void()),
sendResponse: z.function().args(z.string()).returns(z.number()),
randomSeed: z
.function()
.args(z.union([z.literal(Mode.DON), z.literal(Mode.NODE)]))
.returns(z.number()),
versionV2: z.function().args().returns(z.void()),
callCapability: z.function().args(z.string()).returns(z.number()),
awaitCapabilities: z
.function()
.args(z.string(), z.number())
.returns(z.string()),
getSecrets: z.function().args(z.string(), z.number()).returns(z.number()),
awaitSecrets: z.function().args(z.string(), z.number()).returns(z.string()),
getWasiArgs: z.function().args().returns(z.string()),
});

type GlobalHostBindingsMap = z.infer<typeof globalHostBindingsSchema>;

// Validate global host functions at runtime
const validateGlobalHostBindings = (): GlobalHostBindingsMap => {
const globalFunctions =
globalThis as unknown as Partial<GlobalHostBindingsMap>;

try {
return globalHostBindingsSchema.parse(globalFunctions);
} catch (error) {
const missingFunctions = Object.keys(globalHostBindingsSchema.shape).filter(
(key) => !(key in globalFunctions)
);

throw new Error(
`Missing required global host functions: ${missingFunctions.join(
", "
)}. ` +
`This indicates the runtime environment is not properly configured.`
);
}
};

// Initialize validated global functions
export const hostBindings = validateGlobalHostBindings();
45 changes: 34 additions & 11 deletions src/sdk/runtime/run-in-node-mode.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,39 @@
import { describe, test, expect, mock } from "bun:test";
import { runInNodeMode } from "@cre/sdk/runtime/run-in-node-mode";
import { create } from "@bufbuild/protobuf";
import {
SimpleConsensusInputsSchema,
type SimpleConsensusInputsJson,
} from "@cre/generated/sdk/v1alpha/sdk_pb";
import { ConsensusCapability } from "@cre/generated-sdk/capabilities/internal/consensus/v1alpha/consensus_sdk_gen";
import { host } from "@cre/sdk/utils/host";
import { type NodeRuntime } from "@cre/sdk/runtime/runtime";

// Mock hostBindings before importing runInNodeMode
const calls: string[] = [];
const mockHostBindings = {
sendResponse: mock((_response: string) => 0),
switchModes: mock((mode: 0 | 1 | 2) => {
calls.push(mode === 2 ? "NODE" : mode === 1 ? "DON" : "UNSPECIFIED");
}),
log: mock((_message: string) => {}),
callCapability: mock((_request: string) => 1),
awaitCapabilities: mock((_awaitRequest: string, _maxResponseLen: number) =>
btoa("mock_await_capabilities_response")
),
getSecrets: mock((_request: string, _maxResponseLen: number) => 1),
awaitSecrets: mock((_awaitRequest: string, _maxResponseLen: number) =>
btoa("mock_await_secrets_response")
),
versionV2: mock(() => {}),
randomSeed: mock((_mode: 1 | 2) => Math.random()),
getWasiArgs: mock(() => '["mock.wasm", ""]'),
};

// Mock the module
mock.module("@cre/sdk/runtime/host-bindings", () => ({
hostBindings: mockHostBindings,
}));

import { runInNodeMode } from "@cre/sdk/runtime/run-in-node-mode";

describe("runInNodeMode", () => {
test("accepts message input and returns Value", async () => {
Expand Down Expand Up @@ -39,11 +66,8 @@ describe("runInNodeMode", () => {
});

test("restores DON mode before calling consensus", async () => {
const calls: string[] = [];
const origSwitch = (globalThis as any).switchModes;
(globalThis as any).switchModes = (mode: 0 | 1 | 2) => {
calls.push(mode === 2 ? "NODE" : mode === 1 ? "DON" : "UNSPECIFIED");
};
// Clear the calls array for this test
calls.length = 0;

const origSimple = ConsensusCapability.prototype.simple;
ConsensusCapability.prototype.simple = mock(
Expand All @@ -59,11 +83,10 @@ describe("runInNodeMode", () => {

// restore
ConsensusCapability.prototype.simple = origSimple;
(globalThis as any).switchModes = origSwitch;
});

test("guards DON calls while in node mode", async () => {
// Simulate host.switchModes by touching global function used by host
// Simulate switchModes by touching global function used by host
const origSwitch = (globalThis as any).switchModes;
(globalThis as any).switchModes = (_m: 0 | 1 | 2) => {};

Expand All @@ -77,9 +100,9 @@ describe("runInNodeMode", () => {

let threw = false;
try {
await runInNodeMode(async () => {
await runInNodeMode(async (nodeRuntime: NodeRuntime) => {
// During builder, we are in NODE mode, performing a DON call should throw
expect(() => host.log(""));
expect(() => nodeRuntime.logger.log(""));
return create(SimpleConsensusInputsSchema);
});
} catch (_e) {
Expand Down
10 changes: 5 additions & 5 deletions src/sdk/runtime/run-in-node-mode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import {
type SimpleConsensusInputsJson,
} from "@cre/generated/sdk/v1alpha/sdk_pb";
import { ConsensusCapability } from "@cre/generated-sdk/capabilities/internal/consensus/v1alpha/consensus_sdk_gen";
import { host } from "@cre/sdk/utils/host";
import type { Value } from "@cre/generated/values/v1/values_pb";
import { runtime, type NodeRuntime } from "@cre/sdk/runtime/runtime";

type Inputs = SimpleConsensusInputs | SimpleConsensusInputsJson;

Expand Down Expand Up @@ -49,16 +49,16 @@ const toInputsJson = (input: Inputs): SimpleConsensusInputsJson => {
* Ensures mode is switched back to DON even if errors occur.
*/
export const runInNodeMode = async (
buildConsensusInputs: () => Promise<Inputs> | Inputs
buildConsensusInputs: (nodeRuntime: NodeRuntime) => Promise<Inputs> | Inputs
): Promise<Value> => {
host.switchModes(Mode.NODE);
const nodeRuntime: NodeRuntime = runtime.switchModes(Mode.NODE);
let consensusInputJson: SimpleConsensusInputsJson;
try {
const consensusInput = await buildConsensusInputs();
const consensusInput = await buildConsensusInputs(nodeRuntime);
consensusInputJson = toInputsJson(consensusInput);
} finally {
// Always restore DON mode before invoking consensus
host.switchModes(Mode.DON);
runtime.switchModes(Mode.DON);
}

const consensus = new ConsensusCapability();
Expand Down
Loading
Loading