Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revive worker-threads. #61

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ lib/
node_modules/

bundle/*.js
package-lock.json
package-lock.json
pnpm-lock.yaml
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "tgrid",
"version": "0.9.7",
"version": "0.9.8",
"main": "lib/index.js",
"typings": "lib/index.d.ts",
"description": "Grid Computing Framework for TypeScript",
Expand All @@ -23,7 +23,6 @@
},
"dependencies": {
"import2": "^1.0.3",
"serialize-error": "^4.1.0",
"tstl": "^2.5.16",
"ws": "^7.5.3"
},
Expand All @@ -41,6 +40,7 @@
"mv": "^2.1.1",
"prettier": "^3.2.5",
"puppeteer": "^22.4.1",
"rimraf": "^5.0.5",
"source-map-support": "^0.5.21",
"ts-node": "^10.9.2",
"ts-patch": "^3.1.2",
Expand Down
21 changes: 8 additions & 13 deletions src/components/Communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import { Pair } from "tstl/utility/Pair";
import { HashMap } from "tstl/container/HashMap";
import { ConditionVariable } from "tstl/thread/ConditionVariable";

import { Exception } from "tstl/exception/Exception";
import { DomainError } from "tstl/exception/DomainError";
import { RuntimeError } from "tstl/exception/RuntimeError";

import serializeError from "serialize-error";
import { serializeError } from "../utils/internal/serializeError";

/**
* The basic communicator.
Expand Down Expand Up @@ -371,21 +369,18 @@ export abstract class Communicator<Provider> {
*/
private async _Send_return(
uid: number,
flag: boolean,
val: any,
success: boolean,
value: any,
): Promise<void> {
// SPECIAL LOGIC FOR ERROR -> FOR CLEAR JSON ENCODING
if (flag === false && val instanceof Error) {
if (typeof (val as Exception).toJSON === "function")
val = (val as Exception).toJSON();
else val = serializeError(val);
}
if (success === false && value instanceof Error)
value = serializeError(value);

// RETURNS
const ret: Invoke.IReturn = {
uid: uid,
success: flag,
value: val,
uid,
success,
value,
};
await this.sendData(ret);
}
Expand Down
13 changes: 10 additions & 3 deletions src/protocols/workers/WorkerConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ export class WorkerConnector<Header, Provider extends object | null>
extends ConnectorBase<Header, Provider>
implements IWorkerSystem
{
/**
* @hidden
*/
private readonly compiler_: Singleton<Promise<IWorkerCompiler>>;

/**
Expand All @@ -55,18 +58,22 @@ export class WorkerConnector<Header, Provider extends object | null>
/**
* Initializer Constructor.
*
* For reference, you're planning to run a bundled JavaScript file,
* and you're using the NodeJS environment, you can't use the `"thread"`
* mode. You've to use the `"process"` mode instead.
*
* @param header An object containing initialization data like activation.
* @param provider An object providing features for remote system.
* @param type You can specify the worker mode when NodeJS. Default is "thread".
* @param type You can specify the worker mode when NodeJS. Default is "process".
*/
public constructor(
header: Header,
provider: Provider,
type: "thread" | "process" = "thread",
type?: "thread" | "process",
) {
super(header, provider);
this.compiler_ = new Singleton(() =>
is_node() ? NodeWorkerCompiler(type) : WebWorkerCompiler(),
is_node() ? NodeWorkerCompiler(type ?? "process") : WebWorkerCompiler(),
);
}

Expand Down
10 changes: 4 additions & 6 deletions src/protocols/workers/WorkerServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ export class WorkerServer<Header, Provider extends object | null>
*/
public constructor() {
super(undefined);

this.channel_ = new Singleton(async () => {
// BROWSER CASE
if (is_node() === false) return (<any>self) as IFeature;

const threadPort = await ThreadPort();
return threadPort.is_worker_server()
? ((<any>threadPort) as IFeature)
return (await ThreadPort.isWorkerThread())
? ((await ThreadPort()) as IFeature)
: (ProcessChannel as IFeature);
});
this.state_ = WorkerServer.State.NONE;
Expand All @@ -88,7 +87,6 @@ export class WorkerServer<Header, Provider extends object | null>

const data: string = await this._Handshake("getHeader");
const wrapper: IHeaderWrapper<Header> = JSON.parse(data);

return wrapper.header;
});
}
Expand Down Expand Up @@ -130,7 +128,7 @@ export class WorkerServer<Header, Provider extends object | null>

// SUCCESS
const channel = await this.channel_.get();
channel.onmessage = this._Handle_message.bind(this);
channel.onmessage = (evt) => this._Handle_message(evt);
channel.postMessage(WorkerServer.State.OPEN);

this.state_ = WorkerServer.State.OPEN;
Expand Down
1 change: 0 additions & 1 deletion src/protocols/workers/internal/NodeWorkerCompiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ export const NodeWorkerCompiler = async (
break;
}
}

await FileSystem.write(path, content);
return path;
},
Expand Down
41 changes: 29 additions & 12 deletions src/protocols/workers/internal/threads/ThreadPort.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,36 @@ import { NodeModule } from "../../../../utils/internal/NodeModule";
/**
* @hidden
*/

export async function ThreadPort() {
const { parentPort } = await NodeModule.thread.get();
if (!parentPort) throw new Error("This is not a worker thread.");

const process = NodeModule.process();
return {
postMessage: (message: any) => parentPort!.postMessage(message),
close: () => process.exit(0),
onmessage: (listener: (event: MessageEvent) => void) =>
parentPort!.on("message", (message) =>
listener({ data: message } as MessageEvent),
),
document: parentPort
? (null! as Document) // NOT WORKER
: undefined,
is_worker_server: () => !!parentPort,
};
class ThreadPort {
public static postMessage(message: any): void {
parentPort!.postMessage(message);
}
public static close(): void {
process.exit(0);
}
public static set onmessage(listener: (event: MessageEvent) => void) {
parentPort!.on("message", (msg) => {
listener({ data: msg } as MessageEvent);
});
}
public static get document(): Document {
return null!;
}
public static is_worker_server(): boolean {
return true;
}
}
return ThreadPort;
}
export namespace ThreadPort {
export async function isWorkerThread(): Promise<boolean> {
const { parentPort } = await NodeModule.thread.get();
return !!parentPort;
}
}
16 changes: 16 additions & 0 deletions src/utils/internal/serializeError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const serializeError = (error: any) => {
if (
typeof error === "object" &&
error !== null &&
typeof error.toJSON === "function"
)
return error.toJSON();
else if (error instanceof Error)
return {
...error,
name: error.name,
stack: error.stack,
message: error.message,
};
return error;
};
15 changes: 8 additions & 7 deletions test/node/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@ interface IModule {
}

async function iterate(path: string): Promise<void> {
const file_list: string[] = fs.readdirSync(path);
for (const file of file_list) {
const current_path: string = path + "/" + file;
const stat: fs.Stats = fs.lstatSync(current_path);
for (const file of await fs.promises.readdir(path)) {
const location: string = path + "/" + file;
const stat: fs.Stats = await fs.promises.lstat(location);

if (file === "utils") console.log(location, stat.isDirectory());

if (stat.isDirectory() === true && file !== "internal") {
await iterate(current_path);
await iterate(location);
continue;
} else if (
file.substr(-3) !== ".js" ||
current_path === __dirname + "/index.js"
location === __dirname + "/index.js"
)
continue;

const external: IModule = await import(
current_path.substr(0, current_path.length - 3)
location.substr(0, location.length - 3)
);
for (const key in external)
if (key.substr(0, 5) === "test_") {
Expand Down
1 change: 1 addition & 0 deletions test/node/protocols/workers/internal/calculator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async function get<Controller extends object>(
const connector: WorkerConnector<null, null> = new WorkerConnector(
null,
null,
"process",
);
await connector.connect(path);

Expand Down
3 changes: 1 addition & 2 deletions test/node/protocols/workers/internal/join.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ async function main(): Promise<void> {
const server: WorkerServer<null, null> = new WorkerServer();
await server.open(null);
await server.join();

await fs.promises.writeFile(FILE_PATH, "WorkerServer.join()", "utf8");
fs.writeFileSync(FILE_PATH, "WorkerServer.join()", "utf8");
}
main().catch((exp) => {
console.log(exp);
Expand Down
1 change: 1 addition & 0 deletions test/node/protocols/workers/test_hierarchical_workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export async function test_hierarchical_workers(): Promise<void> {
const connector: WorkerConnector<null, null> = new WorkerConnector(
null,
null,
"process",
);
for (let i: number = 0; i < 5; ++i) {
// DO CONNECT
Expand Down
16 changes: 4 additions & 12 deletions test/node/protocols/workers/test_worker_compiler.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
import fs from "fs";
import cp from "child_process";

import { ICalculator } from "../../../controllers/ICalculator";
import { WorkerConnector } from "tgrid";
import { TestBundler } from "../../../browser/TestBundler";

export function test_worker_connect(): Promise<void> {
return _Test_worker(
(worker) =>
worker.connect(__dirname + "/../../../browser/worker-server.js"),
"process",
);
}

export async function test_worker_compile(): Promise<void> {
export async function test_worker_compiler(): Promise<void> {
const PATH = __dirname + "/../../../../../bundle/worker-server.js";
if (fs.existsSync(PATH) === false) cp.execSync("npm run bundle");
if (fs.existsSync(PATH) === false) await TestBundler.execute();

await _Test_worker(
(worker) => worker.compile(fs.readFileSync(PATH, "utf8")),
"thread",
"process",
);
}

Expand Down
11 changes: 6 additions & 5 deletions test/node/protocols/workers/test_worker_join.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import fs from "fs";
import { WorkerConnector } from "tgrid";
import { FileSystem } from "tgrid/lib/protocols/workers/internal/FileSystem";
import { sleep_for } from "tstl/thread";

const FILE_PATH = __dirname + "/log.dat";

export async function test_worker_join(): Promise<void> {
await FileSystem.write(FILE_PATH, "NOT YET");
await fs.promises.writeFile(FILE_PATH, "NOT YET", "utf8");

const connector: WorkerConnector<null, null> = new WorkerConnector(
null,
null,
);
await connector.connect(__dirname + "/internal/join.js");

sleep_for(100)
sleep_for(1_000)
.then(() => connector.close())
.catch(() => {});
await connector.join();

const content: string = await FileSystem.read(FILE_PATH, "utf8");
await FileSystem.unlink(FILE_PATH);
await sleep_for(50);
const content: string = await fs.promises.readFile(FILE_PATH, "utf8");
await fs.promises.unlink(FILE_PATH);

if (content !== "WorkerServer.join()")
throw new Error("Error on WorkerServer.join()");
Expand Down
10 changes: 10 additions & 0 deletions test/node/utils/test_util_serialize_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { serializeError } from "tgrid/lib/utils/internal/serializeError";

export async function test_util_serialize_error(): Promise<void> {
const e: TypeError = new TypeError("something wrong");
const p: object = JSON.parse(JSON.stringify(serializeError(e)));
const keys: string[] = Object.keys(p);
for (const expected of ["name", "message", "stack"])
if (keys.indexOf(expected) === -1)
throw new Error(`Error object does not have ${expected}`);
}