Skip to content

Commit d57d8bf

Browse files
authored
fix(pool): terminate workers on CTRL+c forceful exits (#9140)
1 parent fa34701 commit d57d8bf

File tree

5 files changed

+100
-9
lines changed

5 files changed

+100
-9
lines changed

packages/vitest/src/node/pools/pool.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ interface QueuedTask {
2323
}
2424

2525
interface ActiveTask extends QueuedTask {
26-
cancelTask: () => Promise<void>
26+
cancelTask: (options?: { force: boolean }) => Promise<void>
2727
}
2828

2929
export class Pool {
@@ -80,7 +80,11 @@ export class Pool {
8080
this.activeTasks.push(activeTask)
8181

8282
// active tasks receive cancel signal and shut down gracefully
83-
async function cancelTask() {
83+
async function cancelTask(options?: { force: boolean }) {
84+
if (options?.force) {
85+
await runner.stop({ force: true })
86+
}
87+
8488
await runner.waitForTerminated()
8589
resolver.reject(new Error('Cancelled'))
8690
}
@@ -171,6 +175,10 @@ export class Pool {
171175
}
172176

173177
async cancel(): Promise<void> {
178+
// Force exit if previous cancel is still on-going
179+
// for example when user does 'CTRL+c' twice in row
180+
const force = this._isCancelling
181+
174182
// Set flag to prevent new tasks from being queued
175183
this._isCancelling = true
176184

@@ -181,13 +189,14 @@ export class Pool {
181189
pendingTasks.forEach(task => task.resolver.reject(error))
182190
}
183191

184-
const activeTasks = this.activeTasks.splice(0)
185-
await Promise.all(activeTasks.map(task => task.cancelTask()))
192+
await Promise.all(this.activeTasks.map(task => task.cancelTask({ force })))
193+
this.activeTasks = []
186194

187-
const sharedRunners = this.sharedRunners.splice(0)
188-
await Promise.all(sharedRunners.map(runner => runner.stop()))
195+
await Promise.all(this.sharedRunners.map(runner => runner.stop()))
196+
this.sharedRunners = []
189197

190-
await Promise.all(this.exitPromises.splice(0))
198+
await Promise.all(this.exitPromises)
199+
this.exitPromises = []
191200

192201
this.workerIds.forEach((_, id) => this.freeWorkerId(id))
193202

packages/vitest/src/node/pools/poolRunner.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,21 @@ enum RunnerState {
1919
STOPPED = 'stopped',
2020
}
2121

22+
interface StopOptions {
23+
/**
24+
* **Do not use unless you have good reason to.**
25+
*
26+
* Indicates whether to skip waiting for worker's response for `{ type: 'stop' }` message or not.
27+
* By default `.stop()` terminates the workers gracefully by sending them stop-message
28+
* and waiting for workers response, so that workers can do proper teardown.
29+
*
30+
* Force exit is used when user presses `CTRL+c` twice in row and intentionally does
31+
* non-graceful exit. For example in cases where worker is stuck on synchronous thread
32+
* blocking function call and it won't response to `{ type: 'stop' }` messages.
33+
*/
34+
force: boolean
35+
}
36+
2237
const START_TIMEOUT = 60_000
2338
const STOP_TIMEOUT = 60_000
2439

@@ -218,7 +233,7 @@ export class PoolRunner {
218233
}
219234
}
220235

221-
async stop(): Promise<void> {
236+
async stop(options?: StopOptions): Promise<void> {
222237
// Wait for any ongoing operation to complete
223238
if (this._operationLock) {
224239
await this._operationLock
@@ -263,6 +278,11 @@ export class PoolRunner {
263278
}
264279
}
265280

281+
// Don't wait for graceful exit's response when force exiting
282+
if (options?.force) {
283+
return onStop({ type: 'stopped', __vitest_worker_response__: true })
284+
}
285+
266286
this.on('message', onStop)
267287
this.postMessage({
268288
type: 'stop',

packages/vitest/src/runtime/workers/init-forks.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ if (isProfiling) {
2626
processOn('SIGTERM', () => processExit())
2727
}
2828

29+
processOn('error', onError)
30+
2931
export default function workerInit(options: {
3032
runTests: (method: 'run' | 'collect', state: WorkerGlobalState, traces: Traces) => Promise<void>
3133
setup?: (context: WorkerSetupContext) => Promise<() => Promise<unknown>>
@@ -36,7 +38,10 @@ export default function workerInit(options: {
3638
post: v => processSend(v),
3739
on: cb => processOn('message', cb),
3840
off: cb => processOff('message', cb),
39-
teardown: () => processRemoveAllListeners('message'),
41+
teardown: () => {
42+
processRemoveAllListeners('message')
43+
processOff('error', onError)
44+
},
4045
runTests: (state, traces) => executeTests('run', state, traces),
4146
collectTests: (state, traces) => executeTests('collect', state, traces),
4247
setup: options.setup,
@@ -51,3 +56,11 @@ export default function workerInit(options: {
5156
}
5257
}
5358
}
59+
60+
// Prevent leaving worker in loops where it tries to send message to closed main
61+
// thread, errors, and tries to send the error.
62+
function onError(error: any) {
63+
if (error?.code === 'ERR_IPC_CHANNEL_CLOSED' || error?.code === 'EPIPE') {
64+
processExit(1)
65+
}
66+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { test } from 'vitest'
2+
3+
test('slow timeouting test', { timeout: 30_000 }, async () => {
4+
console.log("Running slow timeouting test")
5+
await new Promise(resolve => setTimeout(resolve, 40_000))
6+
})

test/cli/test/cancel-run.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { Readable, Writable } from 'node:stream'
2+
import { stripVTControlCharacters } from 'node:util'
3+
import { createDefer } from '@vitest/utils/helpers'
4+
import { expect, test } from 'vitest'
5+
import { createVitest, registerConsoleShortcuts } from 'vitest/node'
6+
7+
test('can force cancel a run', async () => {
8+
const onExit = vi.fn<never>()
9+
const exit = process.exit
10+
onTestFinished(() => {
11+
process.exit = exit
12+
})
13+
process.exit = onExit
14+
15+
const onTestCaseReady = createDefer<void>()
16+
const vitest = await createVitest('test', {
17+
root: 'fixtures/cancel-run',
18+
reporters: [{ onTestCaseReady: () => onTestCaseReady.resolve() }],
19+
})
20+
onTestFinished(() => vitest.close())
21+
22+
const stdin = new Readable({ read: () => '' }) as NodeJS.ReadStream
23+
stdin.isTTY = true
24+
stdin.setRawMode = () => stdin
25+
registerConsoleShortcuts(vitest, stdin, new Writable())
26+
27+
const onLog = vi.spyOn(vitest.logger, 'log').mockImplementation(() => {})
28+
const promise = vitest.start()
29+
30+
await onTestCaseReady
31+
32+
// First CTRL+c should log warning about graceful exit
33+
stdin.emit('data', '\x03')
34+
35+
const logs = onLog.mock.calls.map(log => stripVTControlCharacters(log[0] || '').trim())
36+
expect(logs).toContain('Cancelling test run. Press CTRL+c again to exit forcefully.')
37+
38+
// Second CTRL+c should stop run
39+
stdin.emit('data', '\x03')
40+
await promise
41+
42+
expect(onExit).toHaveBeenCalled()
43+
})

0 commit comments

Comments
 (0)