Skip to content

Commit

Permalink
feat: move tjs schema to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Mar 6, 2024
1 parent 479c0ef commit 0469872
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 95 deletions.
8 changes: 8 additions & 0 deletions src/build/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { generateDenoConfig } from "./deno_config.ts";
import { inflateRuntimeArchive } from "./inflate_runtime_archive.ts";
import { Module, Script } from "../project/mod.ts";
import { assertExists, exists, join, tjs } from "../deps.ts";
import { shutdownAllPools } from "../utils/worker_pool.ts";
import { migrateDev } from "../migrate/dev.ts";

/**
Expand Down Expand Up @@ -94,6 +95,9 @@ interface BuildStepOpts {
files?: string[];
}

// TODO: Convert this to a build flag
const FORCE_BUILD = false;

/**
* Plans a build step.
*/
Expand All @@ -113,6 +117,7 @@ export function buildStep(
// TODO: max parallel build steps
// TODO: error handling
if (
FORCE_BUILD ||
opts.always ||
(opts.files && await compareHash(buildState.cache, opts.files))
) {
Expand Down Expand Up @@ -176,6 +181,8 @@ export async function build(project: Project) {
await Deno.writeTextFile(buildCachePath, JSON.stringify(buildState.cache.newCache));

console.log("✅ Finished");

shutdownAllPools();
}

async function buildSteps(buildState: BuildState, project: Project) {
Expand Down Expand Up @@ -315,6 +322,7 @@ async function buildScript(
assertExists(script.requestSchema);
assertExists(script.responseSchema);

// Populate cache with response
if (!buildState.cache.newCache.scriptSchemas[module.name]) buildState.cache.newCache.scriptSchemas[module.name] = {};
buildState.cache.newCache.scriptSchemas[module.name][script.name] = {
request: script.requestSchema,
Expand Down
108 changes: 13 additions & 95 deletions src/build/schema.ts
Original file line number Diff line number Diff line change
@@ -1,103 +1,21 @@
import { tjs } from "../deps.ts";
import { Module, Project, Script } from "../project/mod.ts";
import { runJob } from "../utils/worker_pool.ts";
import { WorkerRequest, WorkerResponse } from "./schema.worker.ts";
import { createWorkerPool } from "../utils/worker_pool.ts";

const WORKER_POOL = createWorkerPool<WorkerRequest, WorkerResponse>({
source: import.meta.resolve("./schema.worker.ts"),
// Leave 1 CPU core free
count: Math.max(1, navigator.hardwareConcurrency - 1),
});

// TODO: This function is sync
export async function compileSchema(
project: Project,
module: Module,
script: Script,
) {
// TODO: Dupe of project.ts
// https://docs.deno.com/runtime/manual/advanced/typescript/configuration#what-an-implied-tsconfigjson-looks-like
const DEFAULT_COMPILER_OPTIONS = {
"allowJs": true,
"esModuleInterop": true,
"experimentalDecorators": false,
"inlineSourceMap": true,
"isolatedModules": true,
"jsx": "react",
"module": "esnext",
"moduleDetection": "force",
"strict": true,
"target": "esnext",
"useDefineForClassFields": true,

"lib": ["esnext", "dom", "dom.iterable"],
"allowImportingTsExtensions": true,
};

const validateConfig = {
topRef: true,
required: true,
strictNullChecks: true,
noExtraProps: true,
esModuleInterop: true,

// TODO: Is this needed?
include: [script.path],

// TODO: Figure out how to work without this? Maybe we manually validate the request type exists?
ignoreErrors: true,
};

const program = tjs.getProgramFromFiles(
[script.path],
DEFAULT_COMPILER_OPTIONS,
);

const requestSchema = tjs.generateSchema(
program,
"Request",
validateConfig,
[script.path],
);
if (requestSchema === null) {
throw new Error("Failed to generate request schema for " + script.path);
}
// patchSchema(null, requestSchema);
script.requestSchema = requestSchema;

const responseSchema = tjs.generateSchema(
program,
"Response",
validateConfig,
[script.path],
);
if (responseSchema === null) {
throw new Error(
"Failed to generate response schema for " + script.path,
);
}
// patchSchema(null, responseSchema);
script.responseSchema = responseSchema;
): Promise<void> {
const res = await runJob(WORKER_POOL, { script });
script.requestSchema = res.requestSchema;
script.responseSchema = res.responseSchema;
}

// function patchSchema(name: string | null, schema: tjs.DefinitionOrBoolean) {
// if (typeof schema === "boolean") return;

// if (name && name.startsWith("Record<") && schema.type == "object") {
// console.log('Patching', name, schema);
// schema.type = "object";
// schema.additionalProperties = {};
// }

// // Recursively patch schemas
// if (schema.definitions) {
// for (const key in schema.definitions) {
// patchSchema(key, schema.definitions[key]);
// }
// } else if (schema.properties) {
// for (const key in schema.properties) {
// patchSchema(key, schema.properties[key]);
// }
// } else if (schema.items) {
// if (typeof schema.items === "boolean") return;
// else if (Array.isArray(schema.items)) {
// for (const item of schema.items) {
// patchSchema(null, item);
// }
// } else {
// patchSchema(null, schema.items);
// }
// }
// }
86 changes: 86 additions & 0 deletions src/build/schema.worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Runs synchronise TypeScript code to derive the schema from a script in a
// background worker.

/// <reference no-default-lib="true" />
/// <reference lib="deno.worker" />

import { tjs } from "../deps.ts";
import { Script } from "../project/mod.ts";

export interface WorkerRequest {
script: Script;
}

export interface WorkerResponse {
requestSchema: tjs.Definition;
responseSchema: tjs.Definition;
}

self.onmessage = async (ev) => {
const { script } = ev.data as WorkerRequest;

// TODO: Dupe of project.ts
// https://docs.deno.com/runtime/manual/advanced/typescript/configuration#what-an-implied-tsconfigjson-looks-like
const DEFAULT_COMPILER_OPTIONS = {
"allowJs": true,
"esModuleInterop": true,
"experimentalDecorators": false,
"inlineSourceMap": true,
"isolatedModules": true,
"jsx": "react",
"module": "esnext",
"moduleDetection": "force",
"strict": true,
"target": "esnext",
"useDefineForClassFields": true,

"lib": ["esnext", "dom", "dom.iterable"],
"allowImportingTsExtensions": true,
};

const validateConfig = {
topRef: true,
required: true,
strictNullChecks: true,
noExtraProps: true,
esModuleInterop: true,

// TODO: Is this needed?
include: [script.path],

// TODO: Figure out how to work without this? Maybe we manually validate the request type exists?
ignoreErrors: true,
};

const program = tjs.getProgramFromFiles(
[script.path],
DEFAULT_COMPILER_OPTIONS,
);

const requestSchema = tjs.generateSchema(
program,
"Request",
validateConfig,
[script.path],
);
if (requestSchema === null) {
throw new Error("Failed to generate request schema for " + script.path);
}

const responseSchema = tjs.generateSchema(
program,
"Response",
validateConfig,
[script.path],
);
if (responseSchema === null) {
throw new Error(
"Failed to generate response schema for " + script.path,
);
}

self.postMessage({
requestSchema,
responseSchema,
} as WorkerResponse);
};
120 changes: 120 additions & 0 deletions src/utils/worker_pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* A collection of workers used to run jobs with a max concurrency.
*
* Useful for easily implementing multi-threading by running jobs in the background.
*/
export interface WorkerPool<Req, Res> {
source: string;
workers: WorkerInstance[];
pendingJobs: PendingJob<Req, Res>[];
shutdown: boolean;
}

interface WorkerInstance {
/** Web Worker running a job. Lazily initiated. */
worker?: Worker;
/** If a job is running on this worker. */
busy: boolean;
}

interface PendingJob<Req, Res> {
request: Req;
resolve: (res: Res) => void;
reject: (err: ErrorEvent) => void;
}

export interface CreateWorkerOpts {
source: string;
count: number;
}

export function createWorkerPool<Req, Res>(
opts: CreateWorkerOpts,
): WorkerPool<Req, Res> {
const pool = {
source: opts.source,
workers: Array.from(
{ length: opts.count },
() => ({ busy: false, worker: undefined }),
),
pendingJobs: [],
shutdown: false,
};
ALL_POOLS.add(pool);
return pool;
}

export function runJob<Req, Res>(
pool: WorkerPool<Req, Res>,
request: Req,
): Promise<Res> {
return new Promise<Res>((resolve, reject) => {
pool.pendingJobs.push({ request, resolve, reject });
tickPool(pool);
});
}

/**
* Runs all pending jobs on any available workers.
*
* Called any time a worker becomes available or a job is pushed to the pool.
*/
function tickPool<Req, Res>(pool: WorkerPool<Req, Res>) {
if (pool.shutdown) throw new Error("Pool is shut down");

while (true) {
// console.log(`Tick pool (workers: ${pool.workers.filter(w => !w.busy).length}/${pool.workers.length}, pendingJobs: ${pool.pendingJobs.length})`);

// Find available worker
const availableWorker = pool.workers.find((worker) => !worker.busy);
if (!availableWorker) {
return;
}

// Get next job
const nextJob = pool.pendingJobs.shift();
if (!nextJob) {
return;
}

// Create worker
if (!availableWorker.worker) {
availableWorker.worker = new Worker(pool.source, { type: "module" });
}

// Run job
availableWorker.busy = true;
availableWorker.worker.onmessage = (ev) => {
const res = ev.data as Res;
nextJob.resolve(res);
availableWorker.busy = false;
tickPool(pool);
};
availableWorker.worker.onerror = (err) => {
nextJob.reject(err);
availableWorker.busy = false;
tickPool(pool);
};
availableWorker.worker.postMessage(nextJob.request);
}
}

export function shutdownPool(pool: WorkerPool<unknown, unknown>) {
pool.shutdown = true;
for (const worker of pool.workers) {
if (worker.worker) {
worker.worker.terminate();
worker.worker = undefined;
}
}
ALL_POOLS.delete(pool);
}

/** Registry of all active pools. Used to shut down any workers still running. */
const ALL_POOLS: Set<WorkerPool<unknown, unknown>> = new Set();

export function shutdownAllPools() {
for (const pool of ALL_POOLS) {
shutdownPool(pool);
}
}

0 comments on commit 0469872

Please sign in to comment.