From 0469872a49679697db1bf655727ce2fd74a6c250 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 4 Mar 2024 02:41:58 -0800 Subject: [PATCH] feat: move tjs schema to worker --- src/build/mod.ts | 8 +++ src/build/schema.ts | 108 ++++----------------------------- src/build/schema.worker.ts | 86 ++++++++++++++++++++++++++ src/utils/worker_pool.ts | 120 +++++++++++++++++++++++++++++++++++++ 4 files changed, 227 insertions(+), 95 deletions(-) create mode 100644 src/build/schema.worker.ts create mode 100644 src/utils/worker_pool.ts diff --git a/src/build/mod.ts b/src/build/mod.ts index fb4de920..59ea230b 100644 --- a/src/build/mod.ts +++ b/src/build/mod.ts @@ -13,6 +13,7 @@ import { generateDenoConfig } from "./deno_config.ts"; import { inflateRuntimeArchive } from "./inflate_runtime_archive.ts"; import { Module, Script } from "../project/mod.ts"; import { assertExists, exists, join, tjs } from "../deps.ts"; +import { shutdownAllPools } from "../utils/worker_pool.ts"; import { migrateDev } from "../migrate/dev.ts"; /** @@ -94,6 +95,9 @@ interface BuildStepOpts { files?: string[]; } +// TODO: Convert this to a build flag +const FORCE_BUILD = false; + /** * Plans a build step. */ @@ -113,6 +117,7 @@ export function buildStep( // TODO: max parallel build steps // TODO: error handling if ( + FORCE_BUILD || opts.always || (opts.files && await compareHash(buildState.cache, opts.files)) ) { @@ -176,6 +181,8 @@ export async function build(project: Project) { await Deno.writeTextFile(buildCachePath, JSON.stringify(buildState.cache.newCache)); console.log("✅ Finished"); + + shutdownAllPools(); } async function buildSteps(buildState: BuildState, project: Project) { @@ -315,6 +322,7 @@ async function buildScript( assertExists(script.requestSchema); assertExists(script.responseSchema); + // Populate cache with response if (!buildState.cache.newCache.scriptSchemas[module.name]) buildState.cache.newCache.scriptSchemas[module.name] = {}; buildState.cache.newCache.scriptSchemas[module.name][script.name] = { request: script.requestSchema, diff --git a/src/build/schema.ts b/src/build/schema.ts index 21d26c37..288d2855 100644 --- a/src/build/schema.ts +++ b/src/build/schema.ts @@ -1,103 +1,21 @@ -import { tjs } from "../deps.ts"; import { Module, Project, Script } from "../project/mod.ts"; +import { runJob } from "../utils/worker_pool.ts"; +import { WorkerRequest, WorkerResponse } from "./schema.worker.ts"; +import { createWorkerPool } from "../utils/worker_pool.ts"; + +const WORKER_POOL = createWorkerPool({ + source: import.meta.resolve("./schema.worker.ts"), + // Leave 1 CPU core free + count: Math.max(1, navigator.hardwareConcurrency - 1), +}); // TODO: This function is sync export async function compileSchema( project: Project, module: Module, script: Script, -) { - // TODO: Dupe of project.ts - // https://docs.deno.com/runtime/manual/advanced/typescript/configuration#what-an-implied-tsconfigjson-looks-like - const DEFAULT_COMPILER_OPTIONS = { - "allowJs": true, - "esModuleInterop": true, - "experimentalDecorators": false, - "inlineSourceMap": true, - "isolatedModules": true, - "jsx": "react", - "module": "esnext", - "moduleDetection": "force", - "strict": true, - "target": "esnext", - "useDefineForClassFields": true, - - "lib": ["esnext", "dom", "dom.iterable"], - "allowImportingTsExtensions": true, - }; - - const validateConfig = { - topRef: true, - required: true, - strictNullChecks: true, - noExtraProps: true, - esModuleInterop: true, - - // TODO: Is this needed? - include: [script.path], - - // TODO: Figure out how to work without this? Maybe we manually validate the request type exists? - ignoreErrors: true, - }; - - const program = tjs.getProgramFromFiles( - [script.path], - DEFAULT_COMPILER_OPTIONS, - ); - - const requestSchema = tjs.generateSchema( - program, - "Request", - validateConfig, - [script.path], - ); - if (requestSchema === null) { - throw new Error("Failed to generate request schema for " + script.path); - } - // patchSchema(null, requestSchema); - script.requestSchema = requestSchema; - - const responseSchema = tjs.generateSchema( - program, - "Response", - validateConfig, - [script.path], - ); - if (responseSchema === null) { - throw new Error( - "Failed to generate response schema for " + script.path, - ); - } - // patchSchema(null, responseSchema); - script.responseSchema = responseSchema; +): Promise { + const res = await runJob(WORKER_POOL, { script }); + script.requestSchema = res.requestSchema; + script.responseSchema = res.responseSchema; } - -// function patchSchema(name: string | null, schema: tjs.DefinitionOrBoolean) { -// if (typeof schema === "boolean") return; - -// if (name && name.startsWith("Record<") && schema.type == "object") { -// console.log('Patching', name, schema); -// schema.type = "object"; -// schema.additionalProperties = {}; -// } - -// // Recursively patch schemas -// if (schema.definitions) { -// for (const key in schema.definitions) { -// patchSchema(key, schema.definitions[key]); -// } -// } else if (schema.properties) { -// for (const key in schema.properties) { -// patchSchema(key, schema.properties[key]); -// } -// } else if (schema.items) { -// if (typeof schema.items === "boolean") return; -// else if (Array.isArray(schema.items)) { -// for (const item of schema.items) { -// patchSchema(null, item); -// } -// } else { -// patchSchema(null, schema.items); -// } -// } -// } diff --git a/src/build/schema.worker.ts b/src/build/schema.worker.ts new file mode 100644 index 00000000..3ec3e0b1 --- /dev/null +++ b/src/build/schema.worker.ts @@ -0,0 +1,86 @@ +// Runs synchronise TypeScript code to derive the schema from a script in a +// background worker. + +/// +/// + +import { tjs } from "../deps.ts"; +import { Script } from "../project/mod.ts"; + +export interface WorkerRequest { + script: Script; +} + +export interface WorkerResponse { + requestSchema: tjs.Definition; + responseSchema: tjs.Definition; +} + +self.onmessage = async (ev) => { + const { script } = ev.data as WorkerRequest; + + // TODO: Dupe of project.ts + // https://docs.deno.com/runtime/manual/advanced/typescript/configuration#what-an-implied-tsconfigjson-looks-like + const DEFAULT_COMPILER_OPTIONS = { + "allowJs": true, + "esModuleInterop": true, + "experimentalDecorators": false, + "inlineSourceMap": true, + "isolatedModules": true, + "jsx": "react", + "module": "esnext", + "moduleDetection": "force", + "strict": true, + "target": "esnext", + "useDefineForClassFields": true, + + "lib": ["esnext", "dom", "dom.iterable"], + "allowImportingTsExtensions": true, + }; + + const validateConfig = { + topRef: true, + required: true, + strictNullChecks: true, + noExtraProps: true, + esModuleInterop: true, + + // TODO: Is this needed? + include: [script.path], + + // TODO: Figure out how to work without this? Maybe we manually validate the request type exists? + ignoreErrors: true, + }; + + const program = tjs.getProgramFromFiles( + [script.path], + DEFAULT_COMPILER_OPTIONS, + ); + + const requestSchema = tjs.generateSchema( + program, + "Request", + validateConfig, + [script.path], + ); + if (requestSchema === null) { + throw new Error("Failed to generate request schema for " + script.path); + } + + const responseSchema = tjs.generateSchema( + program, + "Response", + validateConfig, + [script.path], + ); + if (responseSchema === null) { + throw new Error( + "Failed to generate response schema for " + script.path, + ); + } + + self.postMessage({ + requestSchema, + responseSchema, + } as WorkerResponse); +}; diff --git a/src/utils/worker_pool.ts b/src/utils/worker_pool.ts new file mode 100644 index 00000000..f04dc32c --- /dev/null +++ b/src/utils/worker_pool.ts @@ -0,0 +1,120 @@ +/** + * A collection of workers used to run jobs with a max concurrency. + * + * Useful for easily implementing multi-threading by running jobs in the background. + */ +export interface WorkerPool { + source: string; + workers: WorkerInstance[]; + pendingJobs: PendingJob[]; + shutdown: boolean; +} + +interface WorkerInstance { + /** Web Worker running a job. Lazily initiated. */ + worker?: Worker; + /** If a job is running on this worker. */ + busy: boolean; +} + +interface PendingJob { + request: Req; + resolve: (res: Res) => void; + reject: (err: ErrorEvent) => void; +} + +export interface CreateWorkerOpts { + source: string; + count: number; +} + +export function createWorkerPool( + opts: CreateWorkerOpts, +): WorkerPool { + const pool = { + source: opts.source, + workers: Array.from( + { length: opts.count }, + () => ({ busy: false, worker: undefined }), + ), + pendingJobs: [], + shutdown: false, + }; + ALL_POOLS.add(pool); + return pool; +} + +export function runJob( + pool: WorkerPool, + request: Req, +): Promise { + return new Promise((resolve, reject) => { + pool.pendingJobs.push({ request, resolve, reject }); + tickPool(pool); + }); +} + +/** + * Runs all pending jobs on any available workers. + * + * Called any time a worker becomes available or a job is pushed to the pool. + */ +function tickPool(pool: WorkerPool) { + if (pool.shutdown) throw new Error("Pool is shut down"); + + while (true) { + // console.log(`Tick pool (workers: ${pool.workers.filter(w => !w.busy).length}/${pool.workers.length}, pendingJobs: ${pool.pendingJobs.length})`); + + // Find available worker + const availableWorker = pool.workers.find((worker) => !worker.busy); + if (!availableWorker) { + return; + } + + // Get next job + const nextJob = pool.pendingJobs.shift(); + if (!nextJob) { + return; + } + + // Create worker + if (!availableWorker.worker) { + availableWorker.worker = new Worker(pool.source, { type: "module" }); + } + + // Run job + availableWorker.busy = true; + availableWorker.worker.onmessage = (ev) => { + const res = ev.data as Res; + nextJob.resolve(res); + availableWorker.busy = false; + tickPool(pool); + }; + availableWorker.worker.onerror = (err) => { + nextJob.reject(err); + availableWorker.busy = false; + tickPool(pool); + }; + availableWorker.worker.postMessage(nextJob.request); + } +} + +export function shutdownPool(pool: WorkerPool) { + pool.shutdown = true; + for (const worker of pool.workers) { + if (worker.worker) { + worker.worker.terminate(); + worker.worker = undefined; + } + } + ALL_POOLS.delete(pool); +} + +/** Registry of all active pools. Used to shut down any workers still running. */ +const ALL_POOLS: Set> = new Set(); + +export function shutdownAllPools() { + for (const pool of ALL_POOLS) { + shutdownPool(pool); + } +}