-
Notifications
You must be signed in to change notification settings - Fork 43
/
WolkeStream.js
94 lines (92 loc) · 2.95 KB
/
WolkeStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
* Created by Julian on 30.04.2017.
*/
const Stream = require('stream');
const https = require('https');
const http = require('http');
const URL = require('url');
class WolkeStream {
constructor(url, options) {
this.output = new Stream.PassThrough({highWaterMark: 32768});
this.total = 0;
this.done = 0;
this.url = url;
this.request(url, 0);
return this.output;
}
request(url, length) {
const options = typeof url === 'string' ? URL.parse(url) : url;
if (!options.headers) options.headers = {};
if (length > 0) {
options.headers.Range = `bytes=${length}-`;
}
if (options.protocol === 'https:') {
let req = https.get(options, (res) => {
this.processRes(req, res);
});
} else {
let req = http.get(options, (res) => {
this.processRes(req, res);
});
}
}
processRes(req, res) {
if (this.done === 0) this.total = Number(res.headers['content-length']);
req.on('error', (err) => {
if (!err) return;
console.error(err);
if (err.message === 'read ECONNRESET') {
this.output.pause();
res.unpipe();
res.removeAllListeners();
req.removeAllListeners();
return this.request(this.url, this.done);
}
});
res.on('data', (chunk) => {
// if (!chunk) {
// if (this.done === this.total) {
// return this.output.write(null);
// }
// }
this.done += Buffer.byteLength(chunk);
// this.output.write(chunk);
});
res.on('aborted', (err) => {
if (!err) return;
console.error(err);
this.output.pause();
res.unpipe();
res.removeAllListeners();
req.removeAllListeners();
return this.request(this.url, this.done);
});
res.on('error', (err) => {
if (!err) return;
console.error(err);
if (err.message === 'read ECONNRESET') {
this.output.pause();
res.unpipe();
res.removeAllListeners();
req.removeAllListeners();
return this.request(this.url, this.done);
}
});
res.on('end', () => {
// console.log('res end');
if (this.done < this.total) {
res.unpipe();
res.removeAllListeners();
req.removeAllListeners();
return this.request(this.url, this.done);
} else {
res.unpipe();
res.removeAllListeners();
req.removeAllListeners();
this.output.end();
}
});
res.pipe(this.output, {end: false});
}
}
module.exports = WolkeStream;