Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 75 additions & 3 deletions packages/rivetkit/src/actor/protocol/serde.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,81 @@ export function encodeDataToString(message: OutputData): string {
}
}

function base64DecodeToUint8Array(base64: string): Uint8Array {
// Check if Buffer is available (Node.js)
if (typeof Buffer !== "undefined") {
return new Uint8Array(Buffer.from(base64, "base64"));
}

// Browser environment - use atob
const binary = atob(base64);
const len = binary.length;
const bytes = new Uint8Array(len);
for (let i = 0; i < len; i++) {
bytes[i] = binary.charCodeAt(i);
}
return bytes;
}

function base64DecodeToArrayBuffer(base64: string): ArrayBuffer {
return base64DecodeToUint8Array(base64).buffer as ArrayBuffer;
}

/** Stringifies with compat for values that BARE & CBOR supports. */
export function jsonStringifyCompat(input: any): string {
return JSON.stringify(input, (_key, value) =>
typeof value === "bigint" ? value.toString() : value,
);
return JSON.stringify(input, (_key, value) => {
if (typeof value === "bigint") {
return ["$BigInt", value.toString()];
} else if (value instanceof ArrayBuffer) {
return ["$ArrayBuffer", base64EncodeArrayBuffer(value)];
} else if (value instanceof Uint8Array) {
return ["$Uint8Array", base64EncodeUint8Array(value)];
}

// Escape user arrays that start with $ by prepending another $
if (
Array.isArray(value) &&
value.length === 2 &&
typeof value[0] === "string" &&
value[0].startsWith("$")
) {
return ["$" + value[0], value[1]];
}

return value;
});
}

/** Parses JSON with compat for values that BARE & CBOR supports. */
export function jsonParseCompat(input: string): any {
return JSON.parse(input, (_key, value) => {
// Handle arrays with $ prefix
if (
Array.isArray(value) &&
value.length === 2 &&
typeof value[0] === "string" &&
value[0].startsWith("$")
) {
// Known special types
if (value[0] === "$BigInt") {
return BigInt(value[1]);
} else if (value[0] === "$ArrayBuffer") {
return base64DecodeToArrayBuffer(value[1]);
} else if (value[0] === "$Uint8Array") {
return base64DecodeToUint8Array(value[1]);
}

// Unescape user arrays that started with $ ($$foo -> $foo)
if (value[0].startsWith("$$")) {
return [value[0].substring(1), value[1]];
}

// Unknown type starting with $ - this is an error
throw new Error(
`Unknown JSON encoding type: ${value[0]}. This may indicate corrupted data or a version mismatch.`,
);
}

return value;
});
}
93 changes: 54 additions & 39 deletions packages/rivetkit/src/driver-test-suite/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { createNodeWebSocket, type NodeWebSocket } from "@hono/node-ws";
import { bundleRequire } from "bundle-require";
import invariant from "invariant";
import { describe } from "vitest";
import type { Transport } from "@/client/mod";
import type { Encoding, Transport } from "@/client/mod";
import { configureInspectorAccessToken } from "@/inspector/utils";
import { createManagerRouter } from "@/manager/router";
import {
Expand Down Expand Up @@ -60,6 +60,8 @@ export interface DriverTestConfig {

transport?: Transport;

encoding?: Encoding;

clientType: ClientType;

cleanup?: () => Promise<void>;
Expand All @@ -83,68 +85,81 @@ export interface DriverDeployOutput {

/** Runs all Vitest tests against the provided drivers. */
export function runDriverTests(
driverTestConfigPartial: Omit<DriverTestConfig, "clientType" | "transport">,
driverTestConfigPartial: Omit<
DriverTestConfig,
"clientType" | "transport" | "encoding"
>,
) {
const clientTypes: ClientType[] = driverTestConfigPartial.skip?.inline
? ["http"]
: ["http", "inline"];
for (const clientType of clientTypes) {
const driverTestConfig: DriverTestConfig = {
...driverTestConfigPartial,
clientType,
};

describe(`client type (${clientType})`, () => {
runActorDriverTests(driverTestConfig);
runManagerDriverTests(driverTestConfig);
const encodings: Encoding[] = ["bare", "cbor", "json"];

const transports: Transport[] = driverTestConfig.skip?.sse
? ["websocket"]
: ["websocket", "sse"];
for (const transport of transports) {
describe(`transport (${transport})`, () => {
runActorConnTests({
...driverTestConfig,
transport,
});
for (const encoding of encodings) {
describe(`encoding (${encoding})`, () => {
const driverTestConfig: DriverTestConfig = {
...driverTestConfigPartial,
clientType,
encoding,
};

runActorConnStateTests({ ...driverTestConfig, transport });
runActorDriverTests(driverTestConfig);
runManagerDriverTests(driverTestConfig);

runActorReconnectTests({ ...driverTestConfig, transport });
const transports: Transport[] = driverTestConfig.skip?.sse
? ["websocket"]
: ["websocket", "sse"];
for (const transport of transports) {
describe(`transport (${transport})`, () => {
runActorConnTests({
...driverTestConfig,
transport,
});

runRequestAccessTests({ ...driverTestConfig, transport });
runActorConnStateTests({ ...driverTestConfig, transport });

runActorDriverTestsWithTransport({ ...driverTestConfig, transport });
});
}
runActorReconnectTests({ ...driverTestConfig, transport });

runActorHandleTests(driverTestConfig);
runRequestAccessTests({ ...driverTestConfig, transport });

runActionFeaturesTests(driverTestConfig);
runActorDriverTestsWithTransport({
...driverTestConfig,
transport,
});
});
}

runActorVarsTests(driverTestConfig);
runActorHandleTests(driverTestConfig);

runActorMetadataTests(driverTestConfig);
runActionFeaturesTests(driverTestConfig);

runActorOnStateChangeTests(driverTestConfig);
runActorVarsTests(driverTestConfig);

runActorErrorHandlingTests(driverTestConfig);
runActorMetadataTests(driverTestConfig);

runActorInlineClientTests(driverTestConfig);
runActorOnStateChangeTests(driverTestConfig);

runRawHttpTests(driverTestConfig);
runActorErrorHandlingTests(driverTestConfig);

runRawHttpRequestPropertiesTests(driverTestConfig);
runActorInlineClientTests(driverTestConfig);

runRawWebSocketTests(driverTestConfig);
runRawHttpTests(driverTestConfig);

// TODO: re-expose this once we can have actor queries on the gateway
// runRawHttpDirectRegistryTests(driverTestConfig);
runRawHttpRequestPropertiesTests(driverTestConfig);

// TODO: re-expose this once we can have actor queries on the gateway
// runRawWebSocketDirectRegistryTests(driverTestConfig);
runRawWebSocketTests(driverTestConfig);

runActorInspectorTests(driverTestConfig);
// TODO: re-expose this once we can have actor queries on the gateway
// runRawHttpDirectRegistryTests(driverTestConfig);

// TODO: re-expose this once we can have actor queries on the gateway
// runRawWebSocketDirectRegistryTests(driverTestConfig);

runActorInspectorTests(driverTestConfig);
});
}
});
}
}
Expand Down
5 changes: 4 additions & 1 deletion packages/rivetkit/src/driver-test-suite/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,20 @@ export async function setupDriverTest(
namespace,
runnerName,
transport: driverTestConfig.transport,
encoding: driverTestConfig.encoding,
});
} else if (driverTestConfig.clientType === "inline") {
// Use inline client from driver
const transport = driverTestConfig.transport ?? "websocket";
const encoding = driverTestConfig.encoding ?? "bare";
const managerDriver = createTestInlineClientDriver(
endpoint,
"bare",
encoding,
transport,
);
const runConfig = RunConfigSchema.parse({
transport: transport,
encoding: encoding,
});
client = createClientWithDriver(managerDriver, runConfig);
} else {
Expand Down
6 changes: 3 additions & 3 deletions packages/rivetkit/src/serde.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import invariant from "invariant";
import { assertUnreachable } from "@/common/utils";
import type { VersionedDataHandler } from "@/common/versioned-data";
import type { Encoding } from "@/mod";
import { jsonStringifyCompat } from "./actor/protocol/serde";
import { jsonParseCompat, jsonStringifyCompat } from "./actor/protocol/serde";

export function uint8ArrayToBase64(uint8Array: Uint8Array): string {
// Check if Buffer is available (Node.js)
Expand Down Expand Up @@ -78,11 +78,11 @@ export function deserializeWithEncoding<T>(
): T {
if (encoding === "json") {
if (typeof buffer === "string") {
return JSON.parse(buffer);
return jsonParseCompat(buffer);
} else {
const decoder = new TextDecoder("utf-8");
const jsonString = decoder.decode(buffer);
return JSON.parse(jsonString);
return jsonParseCompat(jsonString);
}
} else if (encoding === "cbor") {
invariant(
Expand Down
Loading
Loading