Skip to content

Commit 67ed526

Browse files
ronagBridgeAR
authored andcommitted
stream: error state cleanup
Clean up end simplify errored state. - errorEmitted should be set in the same tick as 'error' is emitted. - errored should be set as soon as an error occurs. - errored should exist on Readable as well. - refactor destroy logic and make it easier to follow. PR-URL: #30851 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent 2d13896 commit 67ed526

File tree

4 files changed

+140
-50
lines changed

4 files changed

+140
-50
lines changed

lib/_stream_readable.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ function ReadableState(options, stream, isDuplex) {
140140
// Has it been destroyed
141141
this.destroyed = false;
142142

143+
// Indicates whether the stream has errored.
144+
this.errored = false;
145+
143146
// Crypto is kind of old and crusty. Historically, its default string
144147
// encoding is 'binary' so we have to make this configurable.
145148
// Everything else in the universe uses 'utf8', though.

lib/internal/streams/destroy.js

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,25 @@
11
'use strict';
22

3-
function needError(stream, err) {
4-
if (!err) {
5-
return false;
6-
}
7-
8-
const r = stream._readableState;
9-
const w = stream._writableState;
10-
11-
if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
12-
return false;
13-
}
14-
15-
if (w) {
16-
w.errorEmitted = true;
17-
}
18-
if (r) {
19-
r.errorEmitted = true;
20-
}
21-
22-
return true;
23-
}
24-
253
// Undocumented cb() API, needed for core, not for public API.
264
// The cb() will be invoked synchronously if _destroy is synchronous.
5+
// If cb is passed no 'error' event will be emitted.
276
function destroy(err, cb) {
287
const r = this._readableState;
298
const w = this._writableState;
309

31-
if (w && err) {
32-
w.errored = true;
10+
if (err) {
11+
if (w) {
12+
w.errored = true;
13+
}
14+
if (r) {
15+
r.errored = true;
16+
}
3317
}
3418

3519
if ((w && w.destroyed) || (r && r.destroyed)) {
3620
if (cb) {
3721
cb(err);
38-
} else if (needError(this, err)) {
22+
} else if (err) {
3923
process.nextTick(emitErrorNT, this, err);
4024
}
4125

@@ -53,17 +37,24 @@ function destroy(err, cb) {
5337
}
5438

5539
this._destroy(err || null, (err) => {
56-
const emitClose = (w && w.emitClose) || (r && r.emitClose);
40+
if (err) {
41+
if (w) {
42+
w.errored = true;
43+
}
44+
if (r) {
45+
r.errored = true;
46+
}
47+
}
48+
5749
if (cb) {
5850
// Invoke callback before scheduling emitClose so that callback
5951
// can schedule before.
6052
cb(err);
61-
if (emitClose) {
62-
process.nextTick(emitCloseNT, this);
63-
}
64-
} else if (needError(this, err)) {
65-
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
66-
} else if (emitClose) {
53+
// Don't emit 'error' if passed a callback.
54+
process.nextTick(emitCloseNT, this);
55+
} else if (err) {
56+
process.nextTick(emitErrorCloseNT, this, err);
57+
} else {
6758
process.nextTick(emitCloseNT, this);
6859
}
6960
});
@@ -72,15 +63,34 @@ function destroy(err, cb) {
7263
}
7364

7465
function emitErrorCloseNT(self, err) {
75-
self.emit('error', err);
76-
self.emit('close');
66+
emitErrorNT(self, err);
67+
emitCloseNT(self);
7768
}
7869

7970
function emitCloseNT(self) {
80-
self.emit('close');
71+
const r = self._readableState;
72+
const w = self._writableState;
73+
74+
if ((w && w.emitClose) || (r && r.emitClose)) {
75+
self.emit('close');
76+
}
8177
}
8278

8379
function emitErrorNT(self, err) {
80+
const r = self._readableState;
81+
const w = self._writableState;
82+
83+
if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
84+
return;
85+
}
86+
87+
if (w) {
88+
w.errorEmitted = true;
89+
}
90+
if (r) {
91+
r.errorEmitted = true;
92+
}
93+
8494
self.emit('error', err);
8595
}
8696

@@ -90,6 +100,7 @@ function undestroy() {
90100

91101
if (r) {
92102
r.destroyed = false;
103+
r.errored = false;
93104
r.reading = false;
94105
r.ended = false;
95106
r.endEmitted = false;
@@ -118,14 +129,17 @@ function errorOrDestroy(stream, err) {
118129
const r = stream._readableState;
119130
const w = stream._writableState;
120131

121-
if (w & err) {
122-
w.errored = true;
123-
}
124-
125132
if ((r && r.autoDestroy) || (w && w.autoDestroy))
126133
stream.destroy(err);
127-
else if (needError(stream, err))
128-
stream.emit('error', err);
134+
else if (err) {
135+
if (w) {
136+
w.errored = true;
137+
}
138+
if (r) {
139+
r.errored = true;
140+
}
141+
emitErrorNT(stream, err);
142+
}
129143
}
130144

131145

test/parallel/test-stream-readable-destroy.js

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,20 @@ const assert = require('assert');
129129
cb(expected);
130130
});
131131

132+
let ticked = false;
132133
read.on('end', common.mustNotCall('no end event'));
133134
read.on('error', common.mustCall((err) => {
135+
assert.strictEqual(ticked, true);
136+
assert.strictEqual(read._readableState.errorEmitted, true);
137+
assert.strictEqual(read._readableState.errored, true);
134138
assert.strictEqual(err, expected);
135139
}));
136140

137141
read.destroy();
142+
assert.strictEqual(read._readableState.errorEmitted, false);
143+
assert.strictEqual(read._readableState.errored, true);
138144
assert.strictEqual(read.destroyed, true);
145+
ticked = true;
139146
}
140147

141148
{
@@ -174,10 +181,58 @@ const assert = require('assert');
174181

175182
const expected = new Error('kaboom');
176183

177-
read.on('close', common.mustCall());
184+
let ticked = false;
185+
read.on('close', common.mustCall(() => {
186+
assert.strictEqual(read._readableState.errorEmitted, false);
187+
assert.strictEqual(ticked, true);
188+
}));
189+
// 'error' should not be emitted since a callback is passed to
190+
// destroy(err, callback);
191+
read.on('error', common.mustNotCall());
192+
193+
assert.strictEqual(read._readableState.errored, false);
194+
assert.strictEqual(read._readableState.errorEmitted, false);
195+
178196
read.destroy(expected, common.mustCall(function(err) {
197+
assert.strictEqual(read._readableState.errored, true);
179198
assert.strictEqual(err, expected);
180199
}));
200+
assert.strictEqual(read._readableState.errorEmitted, false);
201+
assert.strictEqual(read._readableState.errored, true);
202+
ticked = true;
203+
}
204+
205+
{
206+
const readable = new Readable({
207+
destroy: common.mustCall(function(err, cb) {
208+
process.nextTick(cb, new Error('kaboom 1'));
209+
}),
210+
read() {}
211+
});
212+
213+
let ticked = false;
214+
readable.on('close', common.mustCall(() => {
215+
assert.strictEqual(ticked, true);
216+
assert.strictEqual(readable._readableState.errorEmitted, true);
217+
}));
218+
readable.on('error', common.mustCall((err) => {
219+
assert.strictEqual(ticked, true);
220+
assert.strictEqual(err.message, 'kaboom 2');
221+
assert.strictEqual(readable._readableState.errorEmitted, true);
222+
}));
223+
224+
readable.destroy();
225+
assert.strictEqual(readable.destroyed, true);
226+
assert.strictEqual(readable._readableState.errored, false);
227+
assert.strictEqual(readable._readableState.errorEmitted, false);
228+
229+
// Test case where `readable.destroy()` is called again with an error before
230+
// the `_destroy()` callback is called.
231+
readable.destroy(new Error('kaboom 2'));
232+
assert.strictEqual(readable._readableState.errorEmitted, false);
233+
assert.strictEqual(readable._readableState.errored, true);
234+
235+
ticked = true;
181236
}
182237

183238
{

test/parallel/test-stream-writable-destroy.js

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,22 @@ const assert = require('assert');
157157
write(chunk, enc, cb) { cb(); }
158158
});
159159

160-
write.on('close', common.mustCall());
161-
write.on('error', common.mustCall());
160+
let ticked = false;
161+
write.on('close', common.mustCall(() => {
162+
assert.strictEqual(ticked, true);
163+
}));
164+
write.on('error', common.mustCall((err) => {
165+
assert.strictEqual(ticked, true);
166+
assert.strictEqual(err.message, 'kaboom 1');
167+
assert.strictEqual(write._writableState.errorEmitted, true);
168+
}));
162169

163170
write.destroy(new Error('kaboom 1'));
164171
write.destroy(new Error('kaboom 2'));
165-
assert.strictEqual(write._writableState.errorEmitted, true);
172+
assert.strictEqual(write._writableState.errored, true);
173+
assert.strictEqual(write._writableState.errorEmitted, false);
166174
assert.strictEqual(write.destroyed, true);
175+
ticked = true;
167176
}
168177

169178
{
@@ -176,20 +185,29 @@ const assert = require('assert');
176185
}
177186
});
178187

179-
writable.on('close', common.mustCall());
180-
writable.on('error', common.expectsError({
181-
type: Error,
182-
message: 'kaboom 2'
188+
let ticked = false;
189+
writable.on('close', common.mustCall(() => {
190+
assert.strictEqual(ticked, true);
191+
assert.strictEqual(writable._writableState.errorEmitted, true);
192+
}));
193+
writable.on('error', common.mustCall((err) => {
194+
assert.strictEqual(ticked, true);
195+
assert.strictEqual(err.message, 'kaboom 2');
196+
assert.strictEqual(writable._writableState.errorEmitted, true);
183197
}));
184198

185199
writable.destroy();
186200
assert.strictEqual(writable.destroyed, true);
201+
assert.strictEqual(writable._writableState.errored, false);
187202
assert.strictEqual(writable._writableState.errorEmitted, false);
188203

189204
// Test case where `writable.destroy()` is called again with an error before
190205
// the `_destroy()` callback is called.
191206
writable.destroy(new Error('kaboom 2'));
192-
assert.strictEqual(writable._writableState.errorEmitted, true);
207+
assert.strictEqual(writable._writableState.errorEmitted, false);
208+
assert.strictEqual(writable._writableState.errored, true);
209+
210+
ticked = true;
193211
}
194212

195213
{

0 commit comments

Comments
 (0)