Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions src/core/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ const INVOCATION_TIMEOUT_MS = 60000;
* Wrapper to explicitly mark data for transfer (zero-copy)
*/
export class Transfer<T> {
constructor(public readonly value: T, public readonly transferables: any[]) {}
constructor(public readonly value: T, public readonly transferables: Transferable[]) {}
}

/**
* Dispatcher function type for sending messages.
*/
type MessageDispatcher = (envelope: ProtocolEnvelope, transfer?: any[]) => void;
type MessageDispatcher = (envelope: ProtocolEnvelope, transfer?: Transferable[]) => void;

/**
* Build a proxy object that forwards method calls to a remote context.
Expand All @@ -115,7 +115,7 @@ export function buildMethodProxy<T extends object>(
* Recursively extract Transfer wrappers from a value.
* Returns the unwrapped value and collects transferables into the provided array.
*/
const extractTransferables = (value: unknown, transferList: any[]): unknown => {
const extractTransferables = (value: unknown, transferList: Transferable[]): unknown => {
// Handle Transfer wrapper
if (value instanceof Transfer) {
if (value.transferables) {
Expand Down Expand Up @@ -145,7 +145,7 @@ export function buildMethodProxy<T extends object>(
const correlationId = generateCorrelationId();

// Handle Transfer wrappers (including nested ones)
const transferList: any[] = [];
const transferList: Transferable[] = [];
const cleanParameters = parameters.map(p => extractTransferables(p, transferList));

// Set up expiration timer
Expand Down Expand Up @@ -189,7 +189,7 @@ type MethodImplementations = Record<string, (...args: any[]) => unknown>;
/**
* Response dispatcher type.
*/
type ResponseDispatcher = (response: ResponseEnvelope, transfer?: any[]) => void;
type ResponseDispatcher = (response: ResponseEnvelope, transfer?: Transferable[]) => void;

/**
* Process an incoming protocol message.
Expand Down Expand Up @@ -246,7 +246,7 @@ export function processProtocolMessage(
.then(result => {
// Handle zero-copy Transfer wrapper in return value
let payload = result;
let transferables: any[] | undefined;
let transferables: Transferable[] | undefined;

if (result instanceof Transfer) {
payload = result.value;
Expand Down Expand Up @@ -300,7 +300,7 @@ export function processProtocolMessage(
* Worker-like interface for message passing.
*/
interface WorkerPort {
postMessage(data: unknown): void;
postMessage(data: unknown, transfer?: Transferable[]): void;
on(event: 'message', handler: (data: unknown) => void): void;
}

Expand All @@ -320,7 +320,6 @@ export function connectWorkerPort<T extends object>(
if (transfer && transfer.length > 0 && typeof port.postMessage === 'function') {
// Try to pass transfer list
try {
// @ts-ignore - Handle mixed signatures of postMessage
port.postMessage(envelope, transfer);
} catch (e) {
// Fallback if transfer fails (e.g. not supported in this env)
Expand Down
12 changes: 9 additions & 3 deletions src/workerFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,13 @@ async function createWasmDatabaseConnection(
// Browser Workers use addEventListener, Node.js Workers use .on()
const workerProxy = connectWorkerPort<WorkerMethods>(
{
postMessage: (data: unknown) => workerThread.postMessage(data),
postMessage: (data: unknown, transfer?: Transferable[]) => {
if (transfer) {
workerThread.postMessage(data, transfer);
} else {
workerThread.postMessage(data);
}
},
on: (event: 'message', handler: (data: unknown) => void) => {
if (import.meta.env.VSCODE_BROWSER_EXT) {
// Browser: Web Worker uses addEventListener with MessageEvent wrapper
Expand Down Expand Up @@ -279,7 +285,7 @@ async function createWasmDatabaseConnection(

// Initialize database in worker
// Use Transfer wrapper to zero-copy transfer the array buffers
const transferables: any[] = [];
const transferables: Transferable[] = [];
if (initConfig.content && initConfig.content.buffer) {
transferables.push(initConfig.content.buffer);
}
Expand All @@ -292,7 +298,7 @@ async function createWasmDatabaseConnection(

const result = await workerProxy.initializeDatabase(
displayName,
new Transfer(initConfig, transferables) as any // Cast to satisfy type signature
new Transfer(initConfig, transferables) as unknown as DatabaseInitConfig
);

// Create operations facade that routes to worker
Expand Down
65 changes: 64 additions & 1 deletion tests/unit/rpc.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it } from 'node:test';
import assert from 'node:assert';
import { processProtocolMessage, buildMethodProxy } from '../../src/core/rpc';
import { processProtocolMessage, buildMethodProxy, Transfer } from '../../src/core/rpc';

describe('RPC', () => {
describe('processProtocolMessage', () => {
Expand Down Expand Up @@ -55,5 +55,68 @@ describe('RPC', () => {
errorText: 'Unknown method: unknown'
});
});

it('should handle Transfer wrappers in return value', () => {
const buffer = new ArrayBuffer(8);
const methods = {
getData: () => new Transfer({ data: buffer }, [buffer])
};

let response: any = null;
let transfer: Transferable[] | undefined = undefined;
const sendResponse = (msg: any, t?: Transferable[]) => {
response = msg;
transfer = t;
};

processProtocolMessage({
kind: 'invoke',
correlationId: '123',
methodName: 'getData',
parameters: []
}, methods, sendResponse);

setTimeout(() => {
assert.deepStrictEqual(response, {
kind: 'result',
correlationId: '123',
payload: { data: buffer }
});
assert.strictEqual(transfer?.length, 1);
assert.strictEqual(transfer?.[0], buffer);
}, 0);
});
});

describe('buildMethodProxy', () => {
it('should extract transferables from arguments', () => {
let dispatchedEnvelope: any = null;
let dispatchedTransfer: Transferable[] | undefined = undefined;

const dispatcher = (envelope: any, transfer?: Transferable[]) => {
dispatchedEnvelope = envelope;
dispatchedTransfer = transfer;
};

const proxy = buildMethodProxy<{ sendData: (data: any) => Promise<void> }>(
dispatcher,
['sendData']
);

const buffer = new ArrayBuffer(8);
const data = new Transfer({ buf: buffer }, [buffer]);

// Promise executor runs synchronously so dispatcher should be called immediately
proxy.sendData(data).catch(() => {}); // Prevent unhandled rejection

assert.ok(dispatchedEnvelope);
assert.strictEqual(dispatchedEnvelope.kind, 'invoke');
assert.strictEqual(dispatchedEnvelope.methodName, 'sendData');
assert.deepStrictEqual(dispatchedEnvelope.parameters, [{ buf: buffer }]);

assert.ok(dispatchedTransfer);
assert.strictEqual(dispatchedTransfer.length, 1);
assert.strictEqual(dispatchedTransfer[0], buffer);
});
});
});