Skip to content

Use BroadcastChannel to support sub workers #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build/bin/runBrowserTests.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ async function runTests(location) {
});
server.listen(8080, '127.0.0.1', async () => {
let failCount = 0;
const browser = await playwright['chromium'].launch({ headless: true, devtools: false });
const browser = await playwright['chromium'].launch({ headless: true, devtools: true });
const context = await browser.newContext();
const page = await context.newPage();
page.on('console', msg => console.log('LOG FROM INSIDE PAGE: ', msg))
const emitter = new events.EventEmitter();
emitter.on('fail', () => {
failCount++;
Expand Down
94 changes: 71 additions & 23 deletions sync-api-common/src/browser/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,90 @@
* ------------------------------------------------------------------------------------------ */

import RAL from '../common/ral';
import { BaseServiceConnection, BaseClientConnection, Message, RequestType } from '../common/connection';
import { BaseServiceConnection, BaseClientConnection, Message, RequestType, KnownConnectionIds, BroadcastChannelName } from '../common/connection';

export class ClientConnection<Requests extends RequestType | undefined = undefined> extends BaseClientConnection<Requests> {

private readonly port: MessagePort | Worker | DedicatedWorkerGlobalScope;
private readonly broadcastChannel: BroadcastChannel;
private readonly messageChannel: MessageChannel;
private readonly sendPort: MessagePort;

constructor(port: MessagePort | Worker | DedicatedWorkerGlobalScope) {
super();
this.port = port;
this.port.onmessage = ((event: MessageEvent<Message>) => {
this.handleMessage(event.data);
});
constructor(channelName: string = BroadcastChannelName) {
super(self.location.pathname);
console.log(`Creating client with name ${channelName} with origin ${origin}`);
this.broadcastChannel = new BroadcastChannel(channelName);
this.messageChannel = new MessageChannel();
this.sendPort = this.messageChannel.port1;
this.sendPort.addEventListener('message', this._handleMessage.bind(this));

// Need to send the port as transfer item, but official api doesn't support that.
const postMessageFunc = this.broadcastChannel.postMessage.bind(this.broadcastChannel) as any;
postMessageFunc(this.createPortBroadcastMessage(this.messageChannel.port2), [this.messageChannel.port2]);
}

dispose() {
this.sendPort.removeEventListener('message', this._handleMessage.bind(this));
this.messageChannel.port1.close();
this.messageChannel.port2.close();
this.broadcastChannel.close();
}

protected postMessage(sharedArrayBuffer: SharedArrayBuffer) {
this.port.postMessage(sharedArrayBuffer);
protected override postMessage(sharedArrayBuffer: SharedArrayBuffer): void {
this.sendPort.postMessage(sharedArrayBuffer);
}

private _handleMessage(message: any) {
try {
if (message.data?.dest === this.connectionId || message.data?.dest === KnownConnectionIds.All) {
this.handleMessage(message.data);
}
} catch (error) {
RAL().console.error(error);
}

}
}

export class ServiceConnection<RequestHandlers extends RequestType | undefined = undefined> extends BaseServiceConnection<RequestHandlers> {

private readonly port: MessagePort | Worker | DedicatedWorkerGlobalScope;
private readonly broadcastChannel: BroadcastChannel;
private readonly clientPorts: Map<string, MessagePort>;

constructor(port: MessagePort | Worker | DedicatedWorkerGlobalScope) {
super();
this.port = port;
this.port.onmessage = (async (event: MessageEvent<SharedArrayBuffer>) => {
try {
await this.handleMessage(event.data);
} catch (error) {
RAL().console.error(error);
}
});
constructor(channelName: string = BroadcastChannelName) {
super(KnownConnectionIds.Main);
console.log(`Creating server with name ${channelName} with origin ${origin}`);
this.broadcastChannel = new BroadcastChannel(channelName);
this.clientPorts = new Map<string, MessagePort>();
this.broadcastChannel.addEventListener('message', this.handleBroadcastMessage.bind(this));
}

dispose() {
this.clientPorts.clear();
this.broadcastChannel.removeEventListener('message', this.handleBroadcastMessage.bind(this));
this.broadcastChannel.close();
}

protected postMessage(message: Message) {
if (message.dest === KnownConnectionIds.All) {
const clientPorts = [...this.clientPorts.values()];
clientPorts.forEach(c => c.postMessage(message));
} else {
const clientPort = this.clientPorts.get(message.dest);
clientPort?.postMessage(message);
}
}

protected onBroadcastPort(message: Message): void {
if (message.params && message.src && message.params.port) {
const messagePort = message.params.port as MessagePort;
messagePort.addEventListener('message', this._handleClientMessage.bind(this));
this.clientPorts.set(message.src, message.params.port as MessagePort);
}
}

protected postMessage(message: Message): void {
this.port.postMessage(message);
private _handleClientMessage(ev: MessageEvent) {
if (ev.data?.byteLength) {
this.handleMessage(ev.data);
}
}
}
6 changes: 4 additions & 2 deletions sync-api-common/src/browser/ril.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ class TestServiceConnection<RequestHandlers extends RequestType | undefined = un
constructor(script: string, testCase?: string) {
const url = testCase !== undefined ? `${script}?toRun=${testCase}` : script;
const worker = new Worker(url);
super(worker);
super();
this.worker = worker;
}
public terminate(): Promise<number> {
console.log('terminating service connection');
this.worker.terminate();
this.dispose();
return Promise.resolve(0);
}
}
Expand Down Expand Up @@ -60,7 +62,7 @@ const _ril: RIL = Object.freeze<RIL>({
$testing: Object.freeze({
ClientConnection: Object.freeze({
create<Requests extends RequestType | undefined = undefined>() {
return new ClientConnection<Requests>(self);
return new ClientConnection<Requests>();
}
}),
ServiceConnection: Object.freeze({
Expand Down
56 changes: 48 additions & 8 deletions sync-api-common/src/common/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export type u64 = number;
export type size = u32;

export type Message = {
dest: string;
src: string;
method: string;
params?: Params;
};
Expand Down Expand Up @@ -53,6 +55,8 @@ export type Params = {

export type Request = {
id: number;
src: string;
dest: string;
} & Message;

export namespace Request {
Expand Down Expand Up @@ -85,6 +89,13 @@ export type RequestType = MessageType & ({
result?: TypedArray | object | null;
});

export const BroadcastChannelName = `@vscode/sync-api/default`;

export enum KnownConnectionIds {
Main = 'main',
All = 'all'
}

class NoResult {
public static readonly kind = 0 as const;
constructor() {
Expand Down Expand Up @@ -572,6 +583,7 @@ export class RPCError extends Error {
export interface ClientConnection<Requests extends RequestType | undefined = undefined> {
readonly sendRequest: SendRequestSignatures<Requests>;
serviceReady(): Promise<void>;
dispose(): void;
}

export abstract class BaseClientConnection<Requests extends RequestType | undefined = undefined> implements ClientConnection<Requests> {
Expand All @@ -581,8 +593,7 @@ export abstract class BaseClientConnection<Requests extends RequestType | undefi
private readonly textDecoder: RAL.TextDecoder;
private readonly readyPromise: Promise<void>;
private readyCallbacks: PromiseCallbacks | undefined;

constructor() {
constructor(protected connectionId: string) {
this.id = 1;
this.textEncoder = RAL().TextEncoder.create();
this.textDecoder = RAL().TextDecoder.create();
Expand All @@ -592,14 +603,19 @@ export abstract class BaseClientConnection<Requests extends RequestType | undefi
}

public serviceReady(): Promise<void> {
this._sendRequest('$/checkready');
return this.readyPromise;
}

protected createPortBroadcastMessage(receivePort: object) {
return { method: '$/broadcastport', src: this.connectionId, dest: KnownConnectionIds.Main, params: {port: receivePort}};
}

public readonly sendRequest: SendRequestSignatures<Requests> = this._sendRequest as SendRequestSignatures<Requests>;

private _sendRequest(method: string, arg1?: Params | ResultType | number, arg2?: ResultType | number, arg3?: number): { errno: 0; data: any } | { errno: RPCErrno } {
const id = this.id++;
const request: Request = { id: id, method };
const request: Request = { id: id, dest: 'main', src: this.connectionId, method };
let params: Params | undefined = undefined;
let resultType: ResultType = new NoResult();
let timeout: number | undefined = undefined;
Expand Down Expand Up @@ -658,10 +674,11 @@ export abstract class BaseClientConnection<Requests extends RequestType | undefi
raw.set(binaryData, binaryOffset);
}

// Send the shard array buffer to the other worker
// Send the shared array buffer to the other worker
const sync = new Int32Array(sharedArrayBuffer, 0, 1);
Atomics.store(sync, 0, 0);
// Send the shared array buffer to the extension host worker
console.log(`Client sending sharedArrayBuffer with length ${sharedArrayBuffer.byteLength}`);
this.postMessage(sharedArrayBuffer);

// Wait for the answer
Expand Down Expand Up @@ -719,7 +736,9 @@ export abstract class BaseClientConnection<Requests extends RequestType | undefi
}
}

protected abstract postMessage(sharedArrayBuffer: SharedArrayBuffer): any;
protected abstract postMessage(sharedArrayBuffer: SharedArrayBuffer): void;

abstract dispose(): void;

protected handleMessage(message: Message): void {
if (message.method === '$/ready') {
Expand Down Expand Up @@ -763,6 +782,7 @@ type RequestHandler = {
export interface ServiceConnection<RequestHandlers extends RequestType | undefined = undefined> {
readonly onRequest: HandleRequestSignatures<RequestHandlers>;
signalReady(): void;
dispose(): void;
}

export abstract class BaseServiceConnection<RequestHandlers extends RequestType | undefined = undefined> implements ServiceConnection<RequestHandlers> {
Expand All @@ -771,8 +791,9 @@ export abstract class BaseServiceConnection<RequestHandlers extends RequestType
private readonly textEncoder: RAL.TextEncoder;
private readonly requestHandlers: Map<string, RequestHandler>;
private readonly requestResults: Map<number, TypedArray>;
private sentReady = false;

constructor() {
constructor(protected readonly connectionId: string) {
this.textDecoder = RAL().TextDecoder.create();
this.textEncoder = RAL().TextEncoder.create();
this.requestHandlers = new Map();
Expand All @@ -788,14 +809,23 @@ export abstract class BaseServiceConnection<RequestHandlers extends RequestType
};
}

protected handleBroadcastMessage(ev: any) {
const message = ev.data as Notification;
if (message?.method === '$/broadcastport' && message?.params && message?.dest === KnownConnectionIds.Main) {
this.onBroadcastPort(message);
}
}

protected async handleMessage(sharedArrayBuffer: SharedArrayBuffer): Promise<void> {
const header = new Uint32Array(sharedArrayBuffer, SyncSize.total, HeaderSize.total / 4);
const requestOffset = header[HeaderIndex.messageOffset];
const requestLength = header[HeaderIndex.messageByteLength];

try {
// See above why we need to slice the Uint8Array.
const message = JSON.parse(this.textDecoder.decode(new Uint8Array(sharedArrayBuffer, requestOffset, requestLength).slice()));
const data = this.textDecoder.decode(new Uint8Array(sharedArrayBuffer, requestOffset, requestLength).slice());
console.log(`Handling message ${header.length} ${data}`);
const message = JSON.parse(data);
if (Request.is(message)) {
if (message.method === '$/fetchResult') {
const resultId: number = message.params!.resultId as number;
Expand All @@ -809,6 +839,11 @@ export abstract class BaseServiceConnection<RequestHandlers extends RequestType
} else {
header[HeaderIndex.errno] = RPCErrno.LazyResultFailed;
}
} else if (message.method === '$/checkready') {
// Client may not have been active when ready signal was sent. Send it again
if (this.sentReady) {
this.signalReady();
}
} else {
if (message.params?.binary === null) {
const binaryParamsLength = header[HeaderIndex.binaryParamByteLength];
Expand Down Expand Up @@ -873,9 +908,14 @@ export abstract class BaseServiceConnection<RequestHandlers extends RequestType
}

public signalReady(): void {
const notification: Notification = { method: '$/ready' };
this.sentReady = true;
const notification: Notification = { method: '$/ready', src: this.connectionId, dest: KnownConnectionIds.All };
this.postMessage(notification);
}

protected abstract postMessage(message: Message): void;

protected abstract onBroadcastPort(message: Notification): void;

abstract dispose(): void;
}
1 change: 1 addition & 0 deletions sync-api-common/src/common/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type AssertionErrorData = {
operator: string;
generatedMessage: boolean;
code: string;
stack?: string;
};

export type ErrorData = {
Expand Down
2 changes: 1 addition & 1 deletion sync-api-common/src/common/test/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"tsBuildInfoFile":"../../../lib/common/test/compile.tsbuildInfo",
"rootDir": ".",
"outDir": "../../../lib/common/test",
"lib": [ "es2020"],
"lib": [ "es2020", "DOM"],
"types": [
"mocha"
]
Expand Down
6 changes: 4 additions & 2 deletions sync-api-common/src/common/test/workers/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ export function assertResult(result: { errno: 0; data: TypedArray } | { errno: R

export async function runSingle(test: (connection: ClientConnection<TestRequests>) => void): Promise<void> {
const connection = RAL().$testing.ClientConnection.create<TestRequests>()!;
await connection.serviceReady();
try {
await connection.serviceReady();
test(connection);
} catch (error) {
if (error instanceof assert.AssertionError) {
Expand All @@ -38,7 +38,8 @@ export async function runSingle(test: (connection: ClientConnection<TestRequests
expected: error.expected,
operator: error.operator,
generatedMessage: error.generatedMessage,
code: error.code
code: error.code,
stack: error.stack
});
} else if (error instanceof Error) {
connection.sendRequest('testing/error', {
Expand All @@ -51,5 +52,6 @@ export async function runSingle(test: (connection: ClientConnection<TestRequests
}
} finally {
connection.sendRequest('testing/done');
connection.dispose();
}
}
Loading