-
Notifications
You must be signed in to change notification settings - Fork 23
/
worker.js
149 lines (127 loc) · 4.22 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
'use strict'
const { realImport, realRequire } = require('real-require')
const { workerData, parentPort } = require('worker_threads')
const { WRITE_INDEX, READ_INDEX } = require('./indexes')
const { waitDiff } = require('./wait')
// Values handed to this worker by the thread that spawned it.
const { stateBuf, dataBuf, filename } = workerData

// Int32 view over stateBuf; the READ_INDEX / WRITE_INDEX slots are accessed
// through Atomics (presumably stateBuf is shared with the parent - confirm).
const state = new Int32Array(stateBuf)

// Buffer over dataBuf; run() decodes slices of it as UTF-8.
const data = Buffer.from(dataBuf)

// Stream returned by the loaded module; assigned inside start().
let destination
// Loads the module identified by `filename`, calls the function it exports
// with `workerData.workerData`, stores the awaited result in `destination`,
// and registers 'error' and 'close' listeners on that destination.
//
// Rejects when the module cannot be loaded (and the `require` fallback does
// not apply) or when the resolved export cannot be called.
async function start () {
  let fn
  try {
    if (filename.endsWith('.ts') || filename.endsWith('.cts')) {
      // TODO: add support for the TSM modules loader ( https://github.com/lukeed/tsm ).
      if (!process[Symbol.for('ts-node.register.instance')]) {
        realRequire('ts-node/register')
      } else if (process.env.TS_NODE_DEV) {
        realRequire('ts-node-dev')
      }
      // TODO: Support ES imports once tsc, tap & ts-node provide better compatibility guarantees.
      // Remove extra forwardslash on Windows
      fn = realRequire(decodeURIComponent(filename.replace(process.platform === 'win32' ? 'file:///' : 'file://', '')))
    } else {
      fn = (await realImport(filename))
    }
  } catch (error) {
    // A yarn user that tries to start a ThreadStream for an external module
    // provides a filename pointing to a zip file.
    // eg. require.resolve('pino-elasticsearch') // returns /foo/pino-elasticsearch-npm-6.1.0-0c03079478-6915435172.zip/bar.js
    // The `import` will fail to try to load it.
    // This catch block executes the `require` fallback to load the module correctly.
    // In fact, yarn modifies the `require` function to manage the zipped path.
    // More details at https://github.com/pinojs/pino/pull/1113
    // The error codes may change based on the node.js version (ENOTDIR > 12, ERR_MODULE_NOT_FOUND <= 12 )
    if ((error.code === 'ENOTDIR' || error.code === 'ERR_MODULE_NOT_FOUND') &&
      filename.startsWith('file://')) {
      fn = realRequire(decodeURIComponent(filename.replace('file://', '')))
    } else {
      throw error
    }
  }

  // Depending on how the default export is performed, and on how the code is
  // transpiled, we may find cases of two nested "default" objects.
  // See https://github.com/pinojs/pino/issues/1243#issuecomment-982774762
  // This runs after the try/catch on purpose: a module obtained through the
  // `require` fallback in the catch block must be unwrapped too, otherwise an
  // export shaped like `{ default: fn }` would be called as if it were a function.
  if (typeof fn === 'object') fn = fn.default
  if (typeof fn === 'object') fn = fn.default

  destination = await fn(workerData.workerData)

  destination.on('error', function (err) {
    // Store -2 in both indexes and wake anything waiting on them, then hand
    // the error to the parent (presumably -2 is the parent's error marker).
    Atomics.store(state, WRITE_INDEX, -2)
    Atomics.notify(state, WRITE_INDEX)

    Atomics.store(state, READ_INDEX, -2)
    Atomics.notify(state, READ_INDEX)

    parentPort.postMessage({
      code: 'ERROR',
      err
    })
  })

  destination.on('close', function () {
    // process._rawDebug('worker close emitted')
    // Publish the current write position as the read position, wake any
    // waiter, and exit once the current event-loop turn has finished.
    const end = Atomics.load(state, WRITE_INDEX)
    Atomics.store(state, READ_INDEX, end)
    Atomics.notify(state, READ_INDEX)
    setImmediate(() => {
      process.exit(0)
    })
  })
}
// Deliberately no .catch() here: if start() rejects, the rejection is left
// to surface as an 'unhandledRejection' on the process.
// On success, tell the parent we are ready and schedule the first run().
start().then(() => {
  parentPort.postMessage({ code: 'READY' })
  process.nextTick(run)
})
// One step of the worker loop.
//
// Loads READ_INDEX and WRITE_INDEX from `state` and then:
//  - when they are equal, hands control to waitDiff and returns. The watched
//    slot is READ_INDEX if the position equals data.length, WRITE_INDEX
//    otherwise (presumably waitDiff calls run again once that slot changes);
//  - when WRITE_INDEX is -1, ends the destination;
//  - otherwise decodes data[read, write) as UTF-8, writes it to the
//    destination and stores the write position into READ_INDEX, either right
//    away (write returned true) or once the destination emits 'drain'.
function run () {
  const readPos = Atomics.load(state, READ_INDEX)
  const writePos = Atomics.load(state, WRITE_INDEX)

  if (writePos === readPos) {
    const watchedIndex = writePos === data.length ? READ_INDEX : WRITE_INDEX
    waitDiff(state, watchedIndex, writePos, Infinity, run)
    return
  }

  if (writePos === -1) {
    destination.end()
    return
  }

  // Records that everything up to writePos has been consumed and wakes
  // whoever is waiting on READ_INDEX.
  const publishRead = () => {
    Atomics.store(state, READ_INDEX, writePos)
    Atomics.notify(state, READ_INDEX)
  }

  const chunk = data.toString('utf8', readPos, writePos)
  const accepted = destination.write(chunk)

  if (!accepted) {
    // Hold back the read position until the destination has drained.
    destination.once('drain', () => {
      publishRead()
      run()
    })
    return
  }

  publishRead()
  setImmediate(run)
}
// Last-resort handling: for an unhandled promise rejection or an uncaught
// exception, forward the error to the parent and exit with status 1.
for (const fatalEvent of ['unhandledRejection', 'uncaughtException']) {
  process.on(fatalEvent, function (err) {
    parentPort.postMessage({ code: 'ERROR', err })
    process.exit(1)
  })
}