Skip to content

Commit

Permalink
Use msgpack to encode & decode data for channel transfer (#204)
Browse files Browse the repository at this point in the history
* Use msgpack to encode data for channel transfer

* Treat ArrayBuffer as an Array in toWebRData

Improves performance when working with `ArrayBuffer`s by avoiding
the incorrect creation of a very large array of names containing
stringified indexes.
  • Loading branch information
georgestagg committed May 3, 2023
1 parent 57a9f09 commit 407cfba
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 38 deletions.
18 changes: 16 additions & 2 deletions src/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/package.json
Expand Up @@ -72,6 +72,7 @@
"node": ">=14.0.0"
},
"dependencies": {
"@msgpack/msgpack": "^2.8.0",
"@types/emscripten": "^1.39.6",
"jquery": "^3.6.0",
"jstree": "^3.3.12",
Expand Down
7 changes: 3 additions & 4 deletions src/webR/chan/channel-service.ts
Expand Up @@ -5,9 +5,8 @@ import {
Response,
Request,
newResponse,
encodeData,
decodeData,
} from './message';
import { encode, decode } from '@msgpack/msgpack';
import { Endpoint } from './task-common';
import { ChannelMain, ChannelWorker } from './channel';
import { ChannelType } from './channel-common';
Expand Down Expand Up @@ -249,8 +248,8 @@ export class ServiceWorkerChannelWorker implements ChannelWorker {
clientId: this.#mainThreadId,
uuid: request.data.uuid,
};
xhr.send(encodeData(fetchReqBody));
return decodeData(new Uint8Array(xhr.response as ArrayBuffer)) as Response;
xhr.send(encode(fetchReqBody));
return decode(xhr.response as ArrayBuffer) as Response;
} catch (e: any) {
if (e instanceof DOMException && retryCount++ < 1000) {
console.log('Service worker request failed - resending request');
Expand Down
24 changes: 0 additions & 24 deletions src/webR/chan/message.ts
Expand Up @@ -93,27 +93,3 @@ export function newSyncRequest(msg: Message, data: SyncRequestData): SyncRequest
data: { msg, reqData: data },
};
}

const encoder = new TextEncoder();
const decoder = new TextDecoder('utf-8');

/**
* Encode data for transfering from worker thread to main thread.
* @param {any} data The message data to be serialised and encoded.
* @return {Uint8Array} The encoded data.
* @internal
* */
export function encodeData(data: any): Uint8Array {
// TODO: Pass a `replacer` function
return encoder.encode(JSON.stringify(data));
}

/**
* Decode data that has been transferred from worker thread to main thread.
* @param {any} data The message data to be decoded.
* @return {unknown} The data after decoding.
* @internal
* */
export function decodeData(data: Uint8Array): unknown {
return JSON.parse(decoder.decode(data)) as unknown;
}
6 changes: 3 additions & 3 deletions src/webR/chan/serviceworker.ts
@@ -1,5 +1,5 @@
import { promiseHandles } from '../utils';
import { decodeData, encodeData } from './message';
import { encode, decode } from '@msgpack/msgpack';
import { ServiceWorkerHandlers } from './channel';

declare let self: ServiceWorkerGlobalScope;
Expand Down Expand Up @@ -35,7 +35,7 @@ async function sendRequest(clientId: string, uuid: string): Promise<Response> {

const response = await requests[uuid].promise;
const headers = { 'Cross-Origin-Embedder-Policy': 'require-corp' };
return new Response(encodeData(response), { headers });
return new Response(encode(response), { headers });
}

export function handleFetch(event: FetchEvent) {
Expand All @@ -46,7 +46,7 @@ export function handleFetch(event: FetchEvent) {
}
const requestBody = event.request.arrayBuffer();
const requestReponse = requestBody.then(async (body) => {
const data = decodeData(new Uint8Array(body)) as { clientId: string; uuid: string };
const data = decode(body) as { clientId: string; uuid: string };
return await sendRequest(data.clientId, data.uuid);
});
event.waitUntil(requestReponse);
Expand Down
5 changes: 3 additions & 2 deletions src/webR/chan/task-main.ts
Expand Up @@ -3,7 +3,8 @@
import { Endpoint, SZ_BUF_FITS_IDX, SZ_BUF_SIZE_IDX, generateUUID } from './task-common';

import { sleep } from '../utils';
import { SyncRequestData, encodeData } from './message';
import { SyncRequestData } from './message';
import { encode } from '@msgpack/msgpack';

import { IN_NODE } from '../compat';
import type { Worker as NodeWorker } from 'worker_threads';
Expand All @@ -28,7 +29,7 @@ export async function syncResponse(endpoint: Endpoint, data: SyncRequestData, re
let { taskId, sizeBuffer, dataBuffer, signalBuffer } = data;
// console.warn(msg);

const bytes = encodeData(response);
const bytes = encode(response);
const fits = bytes.length <= dataBuffer.length;

Atomics.store(sizeBuffer, SZ_BUF_SIZE_IDX, bytes.length);
Expand Down
5 changes: 3 additions & 2 deletions src/webR/chan/task-worker.ts
Expand Up @@ -8,7 +8,8 @@ import {
UUID_LENGTH,
} from './task-common';

import { newSyncRequest, Message, decodeData } from './message';
import { newSyncRequest, Message } from './message';
import { decode } from '@msgpack/msgpack';

const decoder = new TextDecoder('utf-8');

Expand Down Expand Up @@ -99,7 +100,7 @@ export class SyncTask {

const size = Atomics.load(sizeBuffer, SZ_BUF_SIZE_IDX);
// console.log("===completing", taskId);
return decodeData(dataBuffer.slice(0, size));
return decode(dataBuffer.slice(0, size));
}

get result() {
Expand Down
2 changes: 1 addition & 1 deletion src/webR/robj-worker.ts
Expand Up @@ -1105,7 +1105,7 @@ function toWebRData(jsObj: WebRData): WebRData;
function toWebRData(jsObj: WebRData): WebRData {
if (isWebRDataJs(jsObj)) {
return jsObj;
} else if (Array.isArray(jsObj)) {
} else if (Array.isArray(jsObj) || ArrayBuffer.isView(jsObj)) {
return { names: null, values: jsObj };
} else if (jsObj && typeof jsObj === 'object' && !isComplex(jsObj)) {
return {
Expand Down

0 comments on commit 407cfba

Please sign in to comment.