Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use msgpack to encode & decode data for channel transfer #204

Merged
merged 2 commits into from May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/package.json
Expand Up @@ -72,6 +72,7 @@
"node": ">=14.0.0"
},
"dependencies": {
"@msgpack/msgpack": "^2.8.0",
"@types/emscripten": "^1.39.6",
"jquery": "^3.6.0",
"jstree": "^3.3.12",
Expand Down
7 changes: 3 additions & 4 deletions src/webR/chan/channel-service.ts
Expand Up @@ -5,9 +5,8 @@ import {
Response,
Request,
newResponse,
encodeData,
decodeData,
} from './message';
import { encode, decode } from '@msgpack/msgpack';
import { Endpoint } from './task-common';
import { ChannelMain, ChannelWorker } from './channel';
import { ChannelType } from './channel-common';
Expand Down Expand Up @@ -249,8 +248,8 @@ export class ServiceWorkerChannelWorker implements ChannelWorker {
clientId: this.#mainThreadId,
uuid: request.data.uuid,
};
xhr.send(encodeData(fetchReqBody));
return decodeData(new Uint8Array(xhr.response as ArrayBuffer)) as Response;
xhr.send(encode(fetchReqBody));
return decode(xhr.response as ArrayBuffer) as Response;
} catch (e: any) {
if (e instanceof DOMException && retryCount++ < 1000) {
console.log('Service worker request failed - resending request');
Expand Down
24 changes: 0 additions & 24 deletions src/webR/chan/message.ts
Expand Up @@ -93,27 +93,3 @@ export function newSyncRequest(msg: Message, data: SyncRequestData): SyncRequest
data: { msg, reqData: data },
};
}

const encoder = new TextEncoder();
const decoder = new TextDecoder('utf-8');

/**
* Encode data for transfering from worker thread to main thread.
* @param {any} data The message data to be serialised and encoded.
* @return {Uint8Array} The encoded data.
* @internal
* */
export function encodeData(data: any): Uint8Array {
// TODO: Pass a `replacer` function
return encoder.encode(JSON.stringify(data));
}

/**
* Decode data that has been transferred from worker thread to main thread.
* @param {any} data The message data to be decoded.
* @return {unknown} The data after decoding.
* @internal
* */
export function decodeData(data: Uint8Array): unknown {
return JSON.parse(decoder.decode(data)) as unknown;
}
6 changes: 3 additions & 3 deletions src/webR/chan/serviceworker.ts
@@ -1,5 +1,5 @@
import { promiseHandles } from '../utils';
import { decodeData, encodeData } from './message';
import { encode, decode } from '@msgpack/msgpack';
import { ServiceWorkerHandlers } from './channel';

declare let self: ServiceWorkerGlobalScope;
Expand Down Expand Up @@ -35,7 +35,7 @@ async function sendRequest(clientId: string, uuid: string): Promise<Response> {

const response = await requests[uuid].promise;
const headers = { 'Cross-Origin-Embedder-Policy': 'require-corp' };
return new Response(encodeData(response), { headers });
return new Response(encode(response), { headers });
}

export function handleFetch(event: FetchEvent) {
Expand All @@ -46,7 +46,7 @@ export function handleFetch(event: FetchEvent) {
}
const requestBody = event.request.arrayBuffer();
const requestReponse = requestBody.then(async (body) => {
const data = decodeData(new Uint8Array(body)) as { clientId: string; uuid: string };
const data = decode(body) as { clientId: string; uuid: string };
return await sendRequest(data.clientId, data.uuid);
});
event.waitUntil(requestReponse);
Expand Down
5 changes: 3 additions & 2 deletions src/webR/chan/task-main.ts
Expand Up @@ -3,7 +3,8 @@
import { Endpoint, SZ_BUF_FITS_IDX, SZ_BUF_SIZE_IDX, generateUUID } from './task-common';

import { sleep } from '../utils';
import { SyncRequestData, encodeData } from './message';
import { SyncRequestData } from './message';
import { encode } from '@msgpack/msgpack';

import { IN_NODE } from '../compat';
import type { Worker as NodeWorker } from 'worker_threads';
Expand All @@ -28,7 +29,7 @@ export async function syncResponse(endpoint: Endpoint, data: SyncRequestData, re
let { taskId, sizeBuffer, dataBuffer, signalBuffer } = data;
// console.warn(msg);

const bytes = encodeData(response);
const bytes = encode(response);
const fits = bytes.length <= dataBuffer.length;

Atomics.store(sizeBuffer, SZ_BUF_SIZE_IDX, bytes.length);
Expand Down
5 changes: 3 additions & 2 deletions src/webR/chan/task-worker.ts
Expand Up @@ -8,7 +8,8 @@ import {
UUID_LENGTH,
} from './task-common';

import { newSyncRequest, Message, decodeData } from './message';
import { newSyncRequest, Message } from './message';
import { decode } from '@msgpack/msgpack';

const decoder = new TextDecoder('utf-8');

Expand Down Expand Up @@ -99,7 +100,7 @@ export class SyncTask {

const size = Atomics.load(sizeBuffer, SZ_BUF_SIZE_IDX);
// console.log("===completing", taskId);
return decodeData(dataBuffer.slice(0, size));
return decode(dataBuffer.slice(0, size));
}

get result() {
Expand Down
2 changes: 1 addition & 1 deletion src/webR/robj-worker.ts
Expand Up @@ -1105,7 +1105,7 @@ function toWebRData(jsObj: WebRData): WebRData;
function toWebRData(jsObj: WebRData): WebRData {
if (isWebRDataJs(jsObj)) {
return jsObj;
} else if (Array.isArray(jsObj)) {
} else if (Array.isArray(jsObj) || ArrayBuffer.isView(jsObj)) {
return { names: null, values: jsObj };
} else if (jsObj && typeof jsObj === 'object' && !isComplex(jsObj)) {
return {
Expand Down