-
Notifications
You must be signed in to change notification settings - Fork 15
/
ChildProcessPool.class.js
194 lines (175 loc) · 6.3 KB
/
ChildProcessPool.class.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
const { app } = require('electron');
const { fork } = require('child_process');
const _path = require('path');
const { getRandomString } = require(_path.join(app.getAppPath(), 'app/utils/utils'));
// Module-wide counter for debugger ports: getForkedFromPool increments it before
// each fork and, when NODE_ENV is "development", passes it to the child as
// `--inspect=<value>`, so the first forked child gets 5859.
let inspectStartIndex = 5858;
/**
 * ChildProcessPool
 *
 * Keeps a bounded list of child processes forked from one script and routes
 * request/response messages between the caller and those children.
 *
 * - Every request carries a random `id`; the resolver of the returned Promise
 *   is stored in `callbacks[id]` and invoked when a child answers with that id.
 * - A caller-supplied id can be bound to one child (`pidMap`: id -> pid) so
 *   that later requests with the same id reach the same process.
 * - `sendToAll` gathers one answer per child in `collaborationMap` before
 *   resolving.
 */
class ChildProcessPool {
  /**
   * @param {Object} options
   * @param {String} options.path   script passed to child_process.fork()
   * @param {Number} [options.max=6] max count of child processes kept in the pool
   * @param {String} [options.cwd]  working directory of children, defaults to the directory of `path`
   * @param {Object} [options.env]  environment of children, defaults to process.env
   */
  constructor({ path, max=6, cwd, env }) {
    this.cwd = cwd || _path.dirname(path);
    this.env = env || process.env;
    this.callbacks = {};                // request id -> Promise resolver
    this.pidMap = new Map();            // caller-given id -> pid of the bound child
    this.collaborationMap = new Map();  // sendToAll request id -> collected answers
    this.forked = [];                   // live child processes
    this.forkedPath = path;
    this.forkIndex = 0;                 // round-robin cursor used once the pool is full
    this.maxInstance = max;
  }

  /* -------------- internal -------------- */

  /**
   * dataRespond [Received data from a child process]
   * Resolves the pending request with this id, if there is one, and forgets it.
   * @param {[Any]} data [response data]
   * @param {[String]} id [request id]
   */
  dataRespond = (data, id) => {
    if (!id) return;
    const callback = this.callbacks[id];
    if (!callback) return;
    callback(data);
    delete this.callbacks[id];
  }

  /**
   * dataRespondAll [Received data from multi child processes]
   * Appends the answer to the list collected for this id and resolves the
   * pending request once there is at least one answer per pooled child.
   * @param {[Any]} data [response data]
   * @param {[String]} id [request id]
   */
  dataRespondAll = (data, id) => {
    if (!id) return;
    const collected = [...(this.collaborationMap.get(id) || []), data];
    this.collaborationMap.set(id, collected);
    // `>=` instead of `===`: when a child leaves the pool while a request is
    // pending, the answer count can exceed the pool size, and a strict equality
    // check would then never resolve the request.
    if (collected.length >= this.forked.length) {
      const callback = this.callbacks[id];
      delete this.callbacks[id];
      this.collaborationMap.delete(id);
      if (typeof callback === 'function') callback(collected);
    }
  }

  /**
   * getForkedFromPool [Get a process instance from the pool]
   * For an id that is not bound yet: forks a new child while the pool is below
   * `maxInstance`, otherwise picks an existing child round-robin; any id other
   * than "default" is then bound to the chosen child. For a bound id the bound
   * child is returned.
   * @param {[String]} id [id bound to a process instance, "default" means unbound]
   * @return {[ChildProcess]} [the selected child process]
   * @throws {Error} when the id is bound to a pid that is no longer in the pool
   */
  getForkedFromPool(id="default") {
    let forked;

    if (!this.pidMap.get(id)) {
      if (this.forked.length < this.maxInstance) {
        // create new process
        inspectStartIndex ++;
        forked = fork(
          this.forkedPath,
          this.env.NODE_ENV === "development" ? [`--inspect=${inspectStartIndex}`] : [],
          { cwd: this.cwd, env: { ...this.env, id } }
        );
        this.forked.push(forked);
        forked.on('message', (data) => {
          // strip the routing fields so only the payload reaches the caller
          const messageId = data.id;
          delete data.id;
          delete data.action;
          this.onMessage({ data, id: messageId });
        });
        // 'exit' and 'close' may both fire for one child; onProcessDisconnect is idempotent.
        forked.on('exit', () => { this.onProcessDisconnect(forked.pid) });
        forked.on('close', () => { this.onProcessDisconnect(forked.pid) });
        forked.on('error', (err) => { this.onProcessError(err, forked.pid) });
      } else {
        // pool is full: reuse an existing process round-robin
        this.forkIndex = this.forkIndex % this.maxInstance;
        forked = this.forked[this.forkIndex];
      }

      if (id !== 'default') {
        this.pidMap.set(id, forked.pid);
        // Map exposes its entry count as `size`; warn once when the 1000th id is bound.
        if (this.pidMap.size === 1000) {
          console.warn('ChildProcessPool: The count of pidMap is over than 1000, suggest to use unique id!');
        }
      }
      this.forkIndex += 1;
    } else {
      // use existing processes
      forked = this.forked.find(f => f.pid === this.pidMap.get(id));
      if (!forked) throw new Error(`Get forked process from pool failed! the process pid: ${this.pidMap.get(id)}.`);
    }

    return forked;
  }

  /**
   * onProcessDisconnect [triggered when a process instance disconnect]
   * Removes the process from the pool and drops every id bound to it, so the
   * next request with such an id gets a fresh process instead of an error.
   * Calling it again for a pid that is already gone does nothing.
   * @param {[Number]} pid [process pid]
   */
  onProcessDisconnect(pid){
    const index = this.forked.findIndex(child => child.pid === pid);
    if (index !== -1) this.forked.splice(index, 1);
    for (const [boundId, boundPid] of this.pidMap) {
      if (boundPid === pid) this.pidMap.delete(boundId);
    }
  }

  /**
   * onProcessError [triggered when a process instance break]
   * Logs the error and removes the process from the pool.
   * @param {[Error]} err [error]
   * @param {[Number]} pid [process pid]
   */
  onProcessError(err, pid) {
    console.error("ChildProcessPool: ", err);
    this.onProcessDisconnect(pid);
  }

  /**
   * onMessage [Received data from a process]
   * Ids registered in collaborationMap belong to sendToAll requests; every
   * other id is treated as a single-process request.
   * @param {[Any]} data [response data]
   * @param {[String]} id [request id]
   */
  onMessage({ data, id }) {
    if (this.collaborationMap.get(id) !== undefined) {
      this.dataRespondAll(data, id)
    } else {
      this.dataRespond(data, id);
    }
  }

  /* -------------- caller -------------- */

  /**
   * send [Send request to a process]
   * @param {[String]} taskName [task name - necessary]
   * @param {[Any]} params [data passed to process - necessary]
   * @param {[String]} givenId [the unique id bound to a process instance - not necessary]
   * @return {[Promise]} [resolved with the data the process answers with]
   * @throws {Error} when givenId is the reserved value "default"
   */
  send(taskName, params, givenId) {
    if (givenId === 'default') throw new Error('ChildProcessPool: Prohibit the use of this id value: [default] !')
    const id = getRandomString();
    const forked = this.getForkedFromPool(givenId);
    return new Promise(resolve => {
      this.callbacks[id] = resolve;
      forked.send({action: taskName, params, id });
    });
  }

  /**
   * sendToAll [Send requests to all processes]
   * When the pool is empty, one process is obtained from getForkedFromPool and used.
   * @param {[String]} taskName [task name - necessary]
   * @param {[Any]} params [data passed to process - necessary]
   * @return {[Promise]} [resolved with an array holding the answers of the processes]
   */
  sendToAll(taskName, params) {
    const id = getRandomString();
    return new Promise(resolve => {
      this.callbacks[id] = resolve;
      // registering the id here is what makes onMessage route answers to dataRespondAll
      this.collaborationMap.set(id, []);
      if (this.forked.length) {
        // use process in pool
        this.forked.forEach((forked) => {
          forked.send({ action: taskName, params, id });
        })
      } else {
        // use default process
        this.getForkedFromPool().send({ action: taskName, params, id });
      }
    });
  }

  /**
   * disconnect [shutdown a sub process or all sub processes]
   * @param {[String]} id [id bound with a sub process. If none is given, all sub processes will be killed.]
   */
  disconnect(id) {
    if (id !== undefined) {
      const pid = this.pidMap.get(id);
      // named `child` so the imported fork() is not shadowed
      const child = this.forked.find(p => p.pid === pid);
      if (child) child.disconnect();
    } else {
      console.warn('ChildProcessPool: The all sub processes will be shutdown!');
      this.forked.forEach(child => {
        child.disconnect();
      });
    }
  }

  /**
   * setMaxInstanceLimit [set the max count of sub process instances created by pool]
   * Invalid values are rejected with a warning and leave the limit unchanged.
   * @param {[Number]} count [the max count instances]
   */
  setMaxInstanceLimit(count) {
    if (!Number.isInteger(count) || count <= 0)
      return console.warn('ChildProcessPool: setMaxInstanceLimit - the param count must be an positive integer!');
    if (count < this.maxInstance)
      console.warn(`ChildProcesspool: setMaxInstanceLimit - the param count is less than old value ${this.maxInstance} !`);
    this.maxInstance = count;
  }
}
// CommonJS export: the ChildProcessPool class itself is the value of this module.
module.exports = ChildProcessPool;