Skip to content

Commit

Permalink
improve .vacuumProcs to always wait for all ending batchprocess children
Browse files Browse the repository at this point in the history
  • Loading branch information
mceachen committed Mar 1, 2022
1 parent efe6572 commit ed8f72f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 27 deletions.
8 changes: 7 additions & 1 deletion src/BatchCluster.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,13 @@ describe("BatchCluster", function () {
results.length / opts.maxTasksPerProcess,
results.length * (isWin ? 9 : 5) // because flaky
)


// So, at this point, we should have at least _asked_ the
// initial child processes to end because they're "worn".

// Running vacuumProcs will return a promise that will only
// resolve when those procs have shut down.

await bc.vacuumProcs()

// Expect no prior pids to remain, as long as there were before-pids:
Expand Down
4 changes: 2 additions & 2 deletions src/BatchCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ export class BatchCluster {
// NOT ASYNC: updates internal state. only exported for tests.
vacuumProcs() {
this.#maybeCheckPids()
const endPromises: (undefined | Promise<void>)[] = []
const endPromises: Promise<void>[] = []
let pidsToReap = Math.max(0, this.#procs.length - this.options.maxProcs)
filterInPlace(this.#procs, (proc) => {
// Only check `.idle` (not `.ready`) procs. We don't want to reap busy
Expand All @@ -434,7 +434,7 @@ export class BatchCluster {
}
return true
})
return Promise.all(endPromises.filter((ea) => ea != null))
return Promise.all(endPromises)
}

/**
Expand Down
39 changes: 15 additions & 24 deletions src/BatchProcess.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import child_process from "child_process"
import { until } from "./Async"
import { Deferred } from "./Deferred"
import { cleanError, tryEach } from "./Error"
import { InternalBatchProcessOptions } from "./InternalBatchProcessOptions"
import { Logger } from "./Logger"
Expand Down Expand Up @@ -51,12 +52,6 @@ export class BatchProcess {
// Only set to true when `proc.pid` is no longer in the process table.
#starting = true

// .#end() started:
#ending = false

// .#end() finished:
#ended = false

// pidExists() returned false
#exited = false

Expand All @@ -72,6 +67,8 @@ export class BatchProcess {
#currentTask: Task | undefined
#currentTaskTimeout: NodeJS.Timer | undefined

#endPromise: undefined | Deferred<void>

/**
* @param onIdle to be called when internal state changes (like the current
* task is resolved, or the process exits)
Expand Down Expand Up @@ -142,7 +139,7 @@ export class BatchProcess {
* child process exiting)
*/
get ending(): boolean {
return this.#ending
return this.#endPromise != null
}

/**
Expand All @@ -152,7 +149,7 @@ export class BatchProcess {
* (but expensive!) answer.
*/
get ended(): boolean {
return this.#ended
return true === this.#endPromise?.settled
}

/**
Expand All @@ -173,9 +170,9 @@ export class BatchProcess {
*/
get whyNotHealthy(): WhyNotHealthy | null {
if (this.#whyNotHealthy != null) return this.#whyNotHealthy
if (this.#ended) {
if (this.ended) {
return "ended"
} else if (this.#ending) {
} else if (this.ending) {
return "ending"
} else if (this.#healthCheckFailures > 0) {
return "unhealthy"
Expand Down Expand Up @@ -296,7 +293,7 @@ export class BatchProcess {
}

#execTask(task: Task): boolean {
if (this.#ending) return false
if (this.ending) return false

this.#taskCount++
this.#currentTask = task
Expand Down Expand Up @@ -377,15 +374,10 @@ export class BatchProcess {
* endPromise.
*/
// NOT ASYNC! needs to change state immediately.
end(gracefully = true, reason: WhyNotHealthy): Promise<void> | undefined {
this.#whyNotHealthy ??= reason

if (this.#ending) {
return undefined
} else {
this.#ending = true
return this.#end(gracefully, reason)
}
end(gracefully = true, reason: WhyNotHealthy): Promise<void> {
return (this.#endPromise ??= new Deferred<void>().observe(
this.#end(gracefully, (this.#whyNotHealthy ??= reason))
)).promise
}

// NOTE: Must only be invoked by this.end(), and only expected to be invoked
Expand Down Expand Up @@ -458,7 +450,6 @@ export class BatchProcess {
)
await kill(this.proc.pid, true)
}
this.#ended = true
this.opts.observer.emit("childEnd", this, reason)
}

Expand All @@ -474,7 +465,7 @@ export class BatchProcess {
}

#onError(reason: WhyNotHealthy, error: Error, task?: Task) {
if (this.#ending) {
if (this.ending) {
// We're ending already, so don't propagate the error.
// This is expected due to race conditions stdin EPIPE and process shutdown.
this.#logger().debug(
Expand Down Expand Up @@ -533,7 +524,7 @@ export class BatchProcess {
const task = this.#currentTask
if (task != null && task.pending) {
task.onStderr(data)
} else if (!this.#ending) {
} else if (!this.ending) {
// If we're ending and there isn't a task, don't worry about it.
this.opts.observer.emit("noTaskData", null, data, this)
void this.end(false, "stderr")
Expand All @@ -546,7 +537,7 @@ export class BatchProcess {
if (task != null && task.pending) {
this.opts.observer.emit("taskData", data, task, this)
task.onStdout(data)
} else if (this.#ending) {
} else if (this.ending) {
// don't care if we're already being shut down.
} else if (!blank(data)) {
this.opts.observer.emit("noTaskData", data, null, this)
Expand Down

0 comments on commit ed8f72f

Please sign in to comment.