Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(generator-helper): continuously handle generator errors #19385

Merged
merged 27 commits into from May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4b76d9a
style: add newlines between methods in GeneratorProcess
aqrln May 22, 2023
311c235
fix(generator-helper): continuously handle generator process errors
aqrln May 22, 2023
6bee07c
fix(cli): don't ignore unhandled exceptions and promise rejections
aqrln May 22, 2023
3d55765
Safer `stop()` method
aqrln May 23, 2023
6260c47
Make listeners and child private fields
aqrln May 23, 2023
e4602e8
Don't send data if child not started or stdin not writable
aqrln May 23, 2023
f7e8a3d
Reject future calls if an error happened before
aqrln May 23, 2023
c18b8a1
Add additional logging
aqrln May 23, 2023
de08a58
Add temporary log
aqrln May 23, 2023
120a3ae
Handle EPIPE error
aqrln May 23, 2023
eaf94c0
Use write operation callback
aqrln May 23, 2023
2ac5c88
Add missing return
aqrln May 23, 2023
96d3739
Remove log
aqrln May 23, 2023
9634461
Set up an error event handler
aqrln May 23, 2023
5d812da
Simplify logic
aqrln May 23, 2023
ca4f910
Remove unnecessary closure
aqrln May 23, 2023
d4c10e4
Make handleResponse typed
aqrln May 23, 2023
c6f81c8
Fix wrong path in .cmd file
aqrln May 23, 2023
79f2ec9
Fix typo
aqrln May 23, 2023
fe4962b
Handle failed startup on Windows
aqrln May 23, 2023
36001ba
Enable skipped tests on Windows
aqrln May 23, 2023
59808fc
Remove listeners after rejecting them
aqrln May 24, 2023
8bbb96c
Refactor listeners and rename to handlers
aqrln May 24, 2023
5d9e130
Add a TODO comment
aqrln May 24, 2023
8356314
Revert "Enable skipped tests on Windows"
aqrln May 24, 2023
3270c62
Update comments for skipped tests
aqrln May 24, 2023
d280fe7
Remove uncaughtException and unhandledRejection handlers
aqrln May 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 0 additions & 7 deletions packages/cli/src/bin.ts
Expand Up @@ -51,13 +51,6 @@ const commandArray = process.argv.slice(2)

process.removeAllListeners('warning')

const debug = Debug('prisma:cli')
process.on('uncaughtException', (e) => {
debug(e)
})
process.on('unhandledRejection', (e) => {
debug(e)
})
aqrln marked this conversation as resolved.
Show resolved Hide resolved
// Listen to Ctr + C and exit
process.once('SIGINT', () => {
process.exit(130)
Expand Down
294 changes: 159 additions & 135 deletions packages/generator-helper/src/GeneratorProcess.ts
Expand Up @@ -3,6 +3,7 @@ import type { ChildProcessByStdio } from 'child_process'
import { fork } from 'child_process'
import { spawn } from 'cross-spawn'
import { bold } from 'kleur/colors'
import { Readable, Writable } from 'stream'

import byline from './byline'
import type { GeneratorConfig, GeneratorManifest, GeneratorOptions, JsonRPC } from './types'
Expand All @@ -20,192 +21,215 @@ type GeneratorProcessOptions = {
}

export class GeneratorError extends Error {
public code: number
public data?: any
constructor(message: string, code: number, data?: any) {
name = 'GeneratorError'

constructor(message: string, public code?: number, public data?: any) {
super(message)
this.code = code
this.data = data
if (data?.stack) {
this.stack = data.stack
}
}
}

type ResultHandler<T = unknown> = {
resolve: (value: T) => void
reject: (error: Error) => void
}

export class GeneratorProcess {
child?: ChildProcessByStdio<any, any, any>
listeners: { [key: string]: (result: any, err?: Error) => void } = {}
private stderrLogs = ''
private child?: ChildProcessByStdio<Writable, null, Readable>
private handlers: Record<string, ResultHandler> = {}
private initPromise?: Promise<void>
private isNode: boolean
private initWaitTime: number
private currentGenerateDeferred?: {
resolve: (result: any) => void
reject: (error: Error) => void
}
constructor(private executablePath: string, { isNode = false, initWaitTime = 200 }: GeneratorProcessOptions = {}) {
private errorLogs = ''
private pendingError: Error | undefined

constructor(private pathOrCommand: string, { isNode = false }: GeneratorProcessOptions = {}) {
this.isNode = isNode
this.initWaitTime = initWaitTime
}

async init(): Promise<void> {
if (!this.initPromise) {
this.initPromise = this.initSingleton()
}
return this.initPromise
}

initSingleton(): Promise<void> {
return new Promise((resolve, reject) => {
try {
if (this.isNode) {
this.child = fork(this.executablePath, [], {
stdio: ['pipe', 'inherit', 'pipe', 'ipc'],
env: {
...process.env,
PRISMA_GENERATOR_INVOCATION: 'true',
},
execArgv: ['--max-old-space-size=8096'],
})
} else {
this.child = spawn(this.executablePath, {
stdio: ['pipe', 'inherit', 'pipe'],
env: {
...process.env,
PRISMA_GENERATOR_INVOCATION: 'true',
},
shell: true,
})
if (this.isNode) {
this.child = fork(this.pathOrCommand, [], {
stdio: ['pipe', 'inherit', 'pipe', 'ipc'],
aqrln marked this conversation as resolved.
Show resolved Hide resolved
env: {
...process.env,
PRISMA_GENERATOR_INVOCATION: 'true',
},
// TODO: this assumes the host has at least 8 GB of RAM which may not be the case.
execArgv: ['--max-old-space-size=8096'],
}) as ChildProcessByStdio<Writable, null, Readable>
} else {
this.child = spawn(this.pathOrCommand, {
stdio: ['pipe', 'inherit', 'pipe'],
env: {
...process.env,
PRISMA_GENERATOR_INVOCATION: 'true',
},
shell: true,
})
}

this.child.on('exit', (code, signal) => {
debug(`child exited with code ${code} on signal ${signal}`)
if (code) {
const error = new GeneratorError(
`Generator ${JSON.stringify(this.pathOrCommand)} failed:\n\n${this.errorLogs}`,
)
this.pendingError = error
this.rejectAllHandlers(error)
}
})

this.child.on('exit', (code) => {
if (code && code > 0) {
if (this.currentGenerateDeferred) {
// print last 5 lines of stderr
this.currentGenerateDeferred.reject(new Error(this.stderrLogs.split('\n').slice(-5).join('\n')))
} else {
reject(new Error(`Generator at ${this.executablePath} could not start:\n\n${this.stderrLogs}`))
}
}
})
// Set the error handler for stdin to prevent unhandled error events.
// We handle write errors explicitly in `sendMessage` method.
this.child.stdin.on('error', () => {})

this.child.on('error', (err) => {
if (err.message.includes('EACCES')) {
reject(
new Error(
`The executable at ${this.executablePath} lacks the right chmod. Please use ${bold(
`chmod +x ${this.executablePath}`,
)}`,
),
)
} else {
reject(err)
}
})
this.child.on('error', (error) => {
debug(error)
this.pendingError = error

byline(this.child.stderr).on('data', (line) => {
const response = String(line)
this.stderrLogs += response + '\n'
let data
try {
data = JSON.parse(response)
} catch (e) {
debug(response)
}
if (data) {
this.handleResponse(data)
}
})
// Handle startup errors: reject the `init` promise.
if ((error as NodeJS.ErrnoException).code === 'EACCES') {
reject(
new Error(
`The executable at ${this.pathOrCommand} lacks the right permissions. Please use ${bold(
`chmod +x ${this.pathOrCommand}`,
)}`,
),
)
} else {
reject(error)
}

this.child.on('spawn', () => {
// Wait initWaitTime for the binary to report an error and exit with non-zero exit code before considering it
// successfully started.
// TODO: this is not a reliable way to detect a startup error as the initialization could take longer than
// initWaitTime (200 ms by default), and this also hurts the generation performance since it always waits even
// if the generator succesfully initialized in less than initWaitTime. The proper solution would be to make
// the generator explicitly send a notification when it is ready, and we should wait until we get that
// notification. Requiring that would be a breaking change, however we could start by introducing an optional
// notification that would stop the waiting timer as a performance optimization.
setTimeout(resolve, this.initWaitTime)
})
} catch (e) {
reject(e)
}
// Reject any pending requests if the error event happened after spawning.
this.rejectAllHandlers(error)
})

byline(this.child.stderr).on('data', (line: Buffer) => {
const response = String(line)
let data: JsonRPC.Response | undefined
try {
data = JSON.parse(response)
} catch (e) {
this.errorLogs += response + '\n'
debug(response)
}
if (data) {
this.handleResponse(data)
}
})

this.child.on('spawn', resolve)
})
}
private handleResponse(data: any): void {

private rejectAllHandlers(error: Error) {
for (const id of Object.keys(this.handlers)) {
this.handlers[id].reject(error)
delete this.handlers[id]
}
}

private handleResponse(data: JsonRPC.Response): void {
if (data.jsonrpc && data.id) {
if (typeof data.id !== 'number') {
throw new Error(`message.id has to be a number. Found value ${data.id}`)
}
if (this.listeners[data.id]) {
if (data.error) {
if (this.handlers[data.id]) {
if (isErrorResponse(data)) {
const error = new GeneratorError(data.error.message, data.error.code, data.error.data)
this.listeners[data.id](null, error)
this.handlers[data.id].reject(error)
} else {
this.listeners[data.id](data.result)
this.handlers[data.id].resolve(data.result)
}
delete this.listeners[data.id]
delete this.handlers[data.id]
}
}
}
private registerListener(messageId: number, cb: (result: any, err?: Error) => void): void {
this.listeners[messageId] = cb
}
private sendMessage(message: JsonRPC.Request): void {
this.child!.stdin.write(JSON.stringify(message) + '\n')

private sendMessage(message: JsonRPC.Request, callback: (error?: Error) => void): void {
if (!this.child) {
callback(new GeneratorError('Generator process has not started yet'))
return
}

if (!this.child.stdin.writable) {
callback(new GeneratorError('Cannot send data to the generator process, process already exited'))
return
}

this.child.stdin.write(JSON.stringify(message) + '\n', (error) => {
if (!error) {
return callback()
}

if ((error as NodeJS.ErrnoException).code === 'EPIPE') {
// Child process already terminated but we didn't know about it yet on Node.js side, so the `exit` event hasn't
// been emitted yet, and the `child.stdin.writable` check also passed. We skip this error and let the `exit`
// event handler reject active requests (including this one).
return callback()
}

callback(error)
})
}

private getMessageId(): number {
return globalMessageId++
}

stop(): void {
if (!this.child!.killed) {
this.child!.kill()
if (!this.child?.killed) {
this.child?.kill()
}
}
getManifest(config: GeneratorConfig): Promise<GeneratorManifest | null> {
return new Promise((resolve, reject) => {
const messageId = this.getMessageId()

this.registerListener(messageId, (result, error) => {
if (error) {
return reject(error)
private rpcMethod<T, U>(method: string, mapResult: (x: unknown) => U = (x) => x as U): (arg: T) => Promise<U> {
return (params: T): Promise<U> =>
new Promise((resolve, reject) => {
if (this.pendingError) {
reject(this.pendingError)
return
}
if (result.manifest) {
resolve(result.manifest)
} else {
resolve(null)

const messageId = this.getMessageId()

this.handlers[messageId] = {
resolve: (result) => resolve(mapResult(result)),
reject,
}
})

this.sendMessage({
jsonrpc: '2.0',
method: 'getManifest',
params: config,
id: messageId,
this.sendMessage(
{
jsonrpc: '2.0',
method,
params,
id: messageId,
},
(error) => {
if (error) reject(error)
},
)
})
})
}
generate(options: GeneratorOptions): Promise<any> {
return new Promise((resolve, reject) => {
const messageId = this.getMessageId()

this.currentGenerateDeferred = { resolve, reject }
getManifest = this.rpcMethod<GeneratorConfig, GeneratorManifest | null>(
'getManifest',
(result) => (result as { manifest?: GeneratorManifest | null }).manifest ?? null,
)

this.registerListener(messageId, (result, error) => {
if (error) {
reject(error)
this.currentGenerateDeferred = undefined
return
}
resolve(result)
this.currentGenerateDeferred = undefined
})
generate = this.rpcMethod<GeneratorOptions, void>('generate')
}

this.sendMessage({
jsonrpc: '2.0',
method: 'generate',
params: options,
id: messageId,
})
})
}
function isErrorResponse(response: JsonRPC.Response): response is JsonRPC.ErrorResponse {
return (response as JsonRPC.ErrorResponse).error !== undefined
}
@@ -0,0 +1,5 @@
#!/usr/bin/env node

setTimeout(() => {
throw new Error('test')
}, 1000)
@@ -0,0 +1,2 @@
@ECHO off
node "%~dp0\failing-after-1s-executable" %*