Skip to content

Commit

Permalink
fix(core): process hangs when plugin exits unexpectedly
Browse files Browse the repository at this point in the history
  • Loading branch information
AgentEnder committed Feb 27, 2024
1 parent f43d3e9 commit 1a06066
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 119 deletions.
7 changes: 2 additions & 5 deletions packages/nx/src/project-graph/build-project-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ import {
} from './nx-deps-cache';
import { applyImplicitDependencies } from './utils/implicit-project-dependencies';
import { normalizeProjectNodes } from './utils/normalize-project-nodes';
import {
isNxPluginV1,
isNxPluginV2,
loadNxPlugins,
} from './plugins/internal-api';
import { loadNxPlugins } from './plugins/internal-api';
import { isNxPluginV1, isNxPluginV2 } from './plugins/utils';
import { CreateDependenciesContext } from './plugins';
import { getRootTsConfigPath } from '../plugins/js/utils/typescript';
import {
Expand Down
36 changes: 0 additions & 36 deletions packages/nx/src/project-graph/plugins/internal-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,42 +88,6 @@ export async function loadNxPlugin(
return loadedPlugin;
}

export function isNxPluginV2(plugin: NxPlugin): plugin is NxPluginV2 {
return 'createNodes' in plugin || 'createDependencies' in plugin;
}

export function isNxPluginV1(
plugin: NxPlugin | RemotePlugin
): plugin is NxPluginV1 {
return 'processProjectGraph' in plugin || 'projectFilePatterns' in plugin;
}

export function normalizeNxPlugin(plugin: NxPlugin): NormalizedPlugin {
if (isNxPluginV2(plugin)) {
return plugin;
}
if (isNxPluginV1(plugin) && plugin.projectFilePatterns) {
return {
...plugin,
createNodes: [
`*/**/${combineGlobPatterns(plugin.projectFilePatterns)}`,
(configFilePath) => {
const root = dirname(configFilePath);
return {
projects: {
[root]: {
name: toProjectName(configFilePath),
targets: plugin.registerProjectTargets?.(configFilePath),
},
},
};
},
],
};
}
return plugin;
}

export async function getDefaultPlugins(root: string) {
return [
join(__dirname, '../../plugins/js'),
Expand Down
51 changes: 33 additions & 18 deletions packages/nx/src/project-graph/plugins/plugin-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { logger } from '../../utils/logger';
import { RemotePlugin, nxPluginCache } from './internal-api';
import { PluginWorkerResult, consumeMessage, createMessage } from './messaging';

const pool: ChildProcess[] = [];
const pool: Set<ChildProcess> = new Set();

const pidMap = new Map<number, { name: string; pending: Set<string> }>();

Expand Down Expand Up @@ -47,13 +47,13 @@ export function loadRemoteNxPlugin(plugin: PluginConfiguration, root: string) {
],
});
worker.send(createMessage({ type: 'load', payload: { plugin, root } }));
pool.push(worker);
pool.add(worker);

logger.verbose(`[plugin-worker] started worker: ${worker.pid}`);

return new Promise<RemotePlugin>((res, rej) => {
worker.on('message', createWorkerHandler(worker, res, rej));
worker.on('exit', () => workerOnExitHandler(worker));
worker.on('exit', createWorkerExitHandler(worker));
});
}

Expand All @@ -66,22 +66,24 @@ export async function shutdownPluginWorkers() {
// Marks the workers as shutdown so that we don't report unexpected exits
pluginWorkersShutdown = true;

logger.verbose(`[plugin-pool] shutting down workers`);
logger.verbose(`[plugin-pool] starting worker shutdown`);

const pending = pool
.map(({ pid }) => Array.from(pidMap.get(pid)?.pending))
.flat();
const pending = getPendingPromises(pool, pidMap);

if (pending.length > 0) {
logger.verbose(
`[plugin-pool] waiting for ${pending.length} pending operations to complete`
);
await Promise.all(pending.map((tx) => promiseBank.get(tx)?.promise));
await Promise.all(pending);
}

logger.verbose(`[plugin-pool] all pending operations completed`);

for (const p of pool) {
p.kill('SIGINT');
}

logger.verbose(`[plugin-pool] all workers killed`);
}

/**
Expand Down Expand Up @@ -215,27 +217,40 @@ function createWorkerHandler(
};
}

function workerOnExitHandler(worker: ChildProcess) {
function createWorkerExitHandler(worker: ChildProcess) {
return () => {
if (!pluginWorkersShutdown) {
pidMap.get(worker.pid)?.pending.forEach((tx) => {
const { rejecter } = promiseBank.get(tx);
rejecter(
`[Nx] plugin worker ${
pidMap.get(worker.pid) ?? worker.pid
} exited unexpectedly`
new Error(
`Plugin worker ${
pidMap.get(worker.pid).name ?? worker.pid
} exited unexpectedly with code ${worker.exitCode}`
)
);
});
shutdownPluginWorkers();
throw new Error(
`[Nx] plugin worker ${
pidMap.get(worker.pid) ?? worker.pid
} exited unexpectedly`
);
}
};
}

process.on('exit', () => {
shutdownPluginWorkers();
if (pool.size) {
shutdownPluginWorkers();
}
});

function getPendingPromises(
pool: Set<ChildProcess>,
pidMap: Map<number, { name: string; pending: Set<string> }>
) {
const pendingTxs: Array<Promise<unknown>> = [];
for (const p of pool) {
const { pending } = pidMap.get(p.pid) ?? { pending: new Set() };
for (const tx of pending) {
pendingTxs.push(promiseBank.get(tx)?.promise);
}
}
return pendingTxs;
}
53 changes: 24 additions & 29 deletions packages/nx/src/project-graph/plugins/plugin-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { PluginWorkerMessage, consumeMessage } from './messaging';
import { PluginConfiguration } from '../../config/nx-json';
import { ProjectConfiguration } from '../../config/workspace-json-project-json';
import { retrieveProjectConfigurationsWithoutPluginInference } from '../utils/retrieve-workspace-files';
import { CreateNodesResultWithContext, NormalizedPlugin } from './internal-api';
import type { CreateNodesResultWithContext, NormalizedPlugin } from './internal-api';
import { CreateNodesContext } from './public-api';
import { CreateNodesError } from './utils';

global.NX_GRAPH_CREATION = true;

Expand Down Expand Up @@ -116,34 +117,28 @@ function runCreateNodesInParallel(
CreateNodesResultWithContext | Promise<CreateNodesResultWithContext>
> = configFiles.map((file) => {
performance.mark(`${plugin.name}:createNodes:${file} - start`);
const value = plugin.createNodes[1](file, pluginOptions, context);
if (value instanceof Promise) {
return value
.catch((e) => {
performance.mark(`${plugin.name}:createNodes:${file} - end`);
throw new Error(
`Unable to create nodes for ${file} using plugin ${plugin.name}.`,
e
);
})
.then((r) => {
performance.mark(`${plugin.name}:createNodes:${file} - end`);
performance.measure(
`${plugin.name}:createNodes:${file}`,
`${plugin.name}:createNodes:${file} - start`,
`${plugin.name}:createNodes:${file} - end`
);
return { ...r, pluginName: plugin.name, file };
});
} else {
performance.mark(`${plugin.name}:createNodes:${file} - end`);
performance.measure(
`${plugin.name}:createNodes:${file}`,
`${plugin.name}:createNodes:${file} - start`,
`${plugin.name}:createNodes:${file} - end`
);
return { ...value, pluginName: plugin.name, file };
}
// Result is either static or a promise, using Promise.resolve lets us
// handle both cases with same logic
const value = Promise.resolve(
plugin.createNodes[1](file, pluginOptions, context)
);
return value
.catch((e) => {
performance.mark(`${plugin.name}:createNodes:${file} - end`);
throw new CreateNodesError(
`Unable to create nodes for ${file} using plugin ${plugin.name}.`,
e
);
})
.then((r) => {
performance.mark(`${plugin.name}:createNodes:${file} - end`);
performance.measure(
`${plugin.name}:createNodes:${file}`,
`${plugin.name}:createNodes:${file} - start`,
`${plugin.name}:createNodes:${file} - end`
);
return { ...r, pluginName: plugin.name, file };
});
});
return Promise.all(promises);
}
61 changes: 61 additions & 0 deletions packages/nx/src/project-graph/plugins/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { dirname } from 'node:path';

import { toProjectName } from '../../config/workspaces';
import { combineGlobPatterns } from '../../utils/globs';

import type { NxPluginV1 } from '../../utils/nx-plugin.deprecated';
import type { NormalizedPlugin, RemotePlugin } from './internal-api';
import type { NxPlugin, NxPluginV2 } from './public-api';

export function isNxPluginV2(plugin: NxPlugin): plugin is NxPluginV2 {
return 'createNodes' in plugin || 'createDependencies' in plugin;
}

export function isNxPluginV1(
plugin: NxPlugin | RemotePlugin
): plugin is NxPluginV1 {
return 'processProjectGraph' in plugin || 'projectFilePatterns' in plugin;
}

export function normalizeNxPlugin(plugin: NxPlugin): NormalizedPlugin {
if (isNxPluginV2(plugin)) {
return plugin;
}
if (isNxPluginV1(plugin) && plugin.projectFilePatterns) {
return {
...plugin,
createNodes: [
`*/**/${combineGlobPatterns(plugin.projectFilePatterns)}`,
(configFilePath) => {
const root = dirname(configFilePath);
return {
projects: {
[root]: {
name: toProjectName(configFilePath),
targets: plugin.registerProjectTargets?.(configFilePath),
},
},
};
},
],
};
}
return plugin;
}

export class CreateNodesError extends Error {
constructor(msg, cause: Error | unknown) {
const message = `${msg} ${
!cause
? ''
: cause instanceof Error
? `\n\n\t Inner Error: ${cause.stack}`
: cause
}`;
// These errors are thrown during a JS callback which is invoked via rust.
// The errors messaging gets lost in the rust -> js -> rust transition, but
// logging the error here will ensure that it is visible in the console.
console.error(message);
super(message, { cause });
}
}
2 changes: 1 addition & 1 deletion packages/nx/src/project-graph/plugins/worker-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import { logger } from '../../utils/logger';

import type * as ts from 'typescript';
import { extname } from 'node:path';
import { NormalizedPlugin, normalizeNxPlugin } from './internal-api';
import { normalizeNxPlugin } from './utils';
import { NxPlugin } from './public-api';

export type LoadedNxPlugin = {
Expand Down
41 changes: 11 additions & 30 deletions packages/nx/src/project-graph/utils/project-configuration-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import { ONLY_MODIFIES_EXISTING_TARGET } from '../../plugins/target-defaults/tar

import { minimatch } from 'minimatch';
import { join } from 'path';
import { CreateNodesResult } from '../plugins';
import { CreateNodesError } from '../plugins/utils';
import {
CreateNodesResultWithContext,
RemotePlugin,
} from '../plugins/internal-api';
import { shutdownPluginWorkers } from '../plugins/plugin-pool';

export type SourceInformation = [file: string, plugin: string];
export type ConfigurationSourceMaps = Record<
Expand Down Expand Up @@ -225,19 +226,16 @@ export function buildProjectsConfigurationsFromProjectPathsAndPlugins(
matchedFiles.push(file);
}
}
try {
let r = createNodes(matchedFiles, {
nxJsonConfiguration: nxJson,
workspaceRoot: root,
});
let r = createNodes(matchedFiles, {
nxJsonConfiguration: nxJson,
workspaceRoot: root,
}).catch((e) =>
shutdownPluginWorkers().then(() => {
throw e;
})
);

results.push(r);
} catch (e) {
throw new CreateNodesError(
`Unable to create nodes using plugin ${plugin.name}.`,
e
);
}
results.push(r);
}

return Promise.all(results).then((results) => {
Expand Down Expand Up @@ -341,23 +339,6 @@ export function readProjectConfigurationsFromRootMap(
return projects;
}

class CreateNodesError extends Error {
constructor(msg, cause: Error | unknown) {
const message = `${msg} ${
!cause
? ''
: cause instanceof Error
? `\n\n\t Inner Error: ${cause.stack}`
: cause
}`;
// These errors are thrown during a JS callback which is invoked via rust.
// The errors messaging gets lost in the rust -> js -> rust transition, but
// logging the error here will ensure that it is visible in the console.
console.error(message);
super(message, { cause });
}
}

/**
* Merges two targets.
*
Expand Down

0 comments on commit 1a06066

Please sign in to comment.