Skip to content

Commit

Permalink
fix(node): add workaround for handling sub processes in node:node exe…
Browse files Browse the repository at this point in the history
…cutor

In Nx 15, we plan to clean up runExecutor and implement proper process handling in runExecutor
upstream. This is a workaround that fix some racing-condition in node:node exexecutor downstream

ISSUES CLOSED: #9305
  • Loading branch information
nartc committed May 17, 2022
1 parent 4db6c2d commit 4b34b2b
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions packages/node/src/executors/node/node.impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import {
} from '@nrwl/devkit';
import { calculateProjectDependencies } from '@nrwl/workspace/src/utilities/buildable-libs-utils';
import { ChildProcess, fork } from 'child_process';
import { HashingImpl } from 'nx/src/hasher/hashing-impl';
import * as treeKill from 'tree-kill';
import { promisify } from 'util';
import { InspectType, NodeExecutorOptions } from './schema';

let subProcess: ChildProcess = null;
const hasher = new HashingImpl();
const processMap = new Map<string, ChildProcess>();
const hashedMap = new Map<string[], string>();

export interface ExecutorEvent {
outfile: string;
Expand All @@ -24,15 +27,15 @@ export async function* nodeExecutor(
context: ExecutorContext
) {
process.on('SIGTERM', async () => {
await killCurrentProcess();
await killCurrentProcess(options, 'SIGTERM');
process.exit(128 + 15);
});
process.on('SIGINT', async () => {
await killCurrentProcess();
await killCurrentProcess(options, 'SIGINT');
process.exit(128 + 2);
});
process.on('SIGHUP', async () => {
await killCurrentProcess();
await killCurrentProcess(options, 'SIGHUP');
process.exit(128 + 1);
});

Expand Down Expand Up @@ -84,11 +87,16 @@ async function runProcess(
options: NodeExecutorOptions,
mappings: { [project: string]: string }
) {
subProcess = fork(
const execArgv = getExecArgv(options);

const hashed = hasher.hashArray(execArgv.concat(options.args));
hashedMap.set(options.args, hashed);

const subProcess = fork(
joinPathFragments(__dirname, 'node-with-require-overrides'),
options.args,
{
execArgv: getExecArgv(options),
execArgv,
stdio: 'inherit',
env: {
...process.env,
Expand All @@ -98,6 +106,8 @@ async function runProcess(
}
);

processMap.set(hashed, subProcess);

if (!options.watch) {
return new Promise<void>((resolve, reject) => {
subProcess.on('exit', (code) => {
Expand Down Expand Up @@ -136,7 +146,7 @@ async function handleBuildEvent(
) {
// Don't kill previous run unless new build is successful.
if (options.watch && event.success) {
await killCurrentProcess();
await killCurrentProcess(options);
}

if (event.success) {
Expand All @@ -147,11 +157,24 @@ async function handleBuildEvent(
const promisifiedTreeKill: (pid: number, signal: string) => Promise<void> =
promisify(treeKill);

async function killCurrentProcess() {
if (!subProcess) return;
async function killCurrentProcess(
options: NodeExecutorOptions,
signal: string = 'SIGTERM'
) {
const currentProcessKey = hashedMap.get(options.args);
if (!currentProcessKey) return;

const currentProcess = processMap.get(currentProcessKey);
if (!currentProcess) return;

try {
await promisifiedTreeKill(subProcess.pid, 'SIGTERM');
await promisifiedTreeKill(currentProcess.pid, signal);

// if the currentProcess.killed is false, invoke kill()
// to properly send the signal to the process
if (!currentProcess.killed) {
currentProcess.kill(signal as NodeJS.Signals);
}
} catch (err) {
if (Array.isArray(err) && err[0] && err[2]) {
const errorMessage = err[2];
Expand All @@ -160,7 +183,8 @@ async function killCurrentProcess() {
logger.error(err.message);
}
} finally {
subProcess = null;
processMap.delete(currentProcessKey);
hashedMap.delete(options.args);
}
}

Expand Down

0 comments on commit 4b34b2b

Please sign in to comment.