Skip to content

Commit

Permalink
Add progress events (#322)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vadim Demedes authored and sindresorhus committed Jul 23, 2017
1 parent a4eb37b commit be7b802
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 8 deletions.
171 changes: 165 additions & 6 deletions index.js
Expand Up @@ -3,27 +3,64 @@ const EventEmitter = require('events');
const http = require('http');
const https = require('https');
const PassThrough = require('stream').PassThrough;
const Transform = require('stream').Transform;
const urlLib = require('url');
const fs = require('fs');
const querystring = require('querystring');
const duplexer3 = require('duplexer3');
const intoStream = require('into-stream');
const isStream = require('is-stream');
const getStream = require('get-stream');
const timedOut = require('timed-out');
const urlParseLax = require('url-parse-lax');
const urlToOptions = require('url-to-options');
const lowercaseKeys = require('lowercase-keys');
const decompressResponse = require('decompress-response');
const mimicResponse = require('mimic-response');
const isRetryAllowed = require('is-retry-allowed');
const Buffer = require('safe-buffer').Buffer;
const isURL = require('isurl');
const isPlainObj = require('is-plain-obj');
const PCancelable = require('p-cancelable');
const pTimeout = require('p-timeout');
const pify = require('pify');
const pkg = require('./package');

const getMethodRedirectCodes = new Set([300, 301, 302, 303, 304, 305, 307, 308]);
const allMethodRedirectCodes = new Set([300, 303, 307, 308]);

const isFormData = body => isStream(body) && typeof body.getBoundary === 'function';

const getBodySize = opts => {
const body = opts.body;

if (opts.headers['content-length']) {
return Number(opts.headers['content-length']);
}

if (!body && !opts.stream) {
return 0;
}

if (typeof body === 'string') {
return Buffer.byteLength(body);
}

if (isFormData(body)) {
return pify(body.getLength.bind(body))();
}

if (body instanceof fs.ReadStream) {
return pify(fs.stat)(body.path).then(stat => stat.size);
}

if (isStream(body) && Buffer.isBuffer(body._buffer)) {
return body._buffer.length;
}

return null;
};

function requestAsEventEmitter(opts) {
opts = opts || {};

Expand All @@ -32,6 +69,8 @@ function requestAsEventEmitter(opts) {
const redirects = [];
let retryCount = 0;
let redirectUrl;
let uploadBodySize;
let uploaded = 0;

const get = opts => {
if (opts.protocol !== 'http:' && opts.protocol !== 'https:') {
Expand All @@ -46,7 +85,17 @@ function requestAsEventEmitter(opts) {
fn = electron.net || electron.remote.net;
}

let progressInterval;

const req = fn.request(opts, res => {
clearInterval(progressInterval);

ee.emit('uploadProgress', {
percent: 1,
transferred: uploaded,
total: uploadBodySize
});

const statusCode = res.statusCode;

res.url = redirectUrl || requestUrl;
Expand Down Expand Up @@ -85,22 +134,65 @@ function requestAsEventEmitter(opts) {
return;
}

const downloadBodySize = Number(res.headers['content-length']) || null;
let downloaded = 0;

setImmediate(() => {
const progressStream = new Transform({
transform(chunk, encoding, callback) {
downloaded += chunk.length;

const percent = downloadBodySize ? downloaded / downloadBodySize : 0;

// Let flush() be responsible for emitting the last event
if (percent < 1) {
ee.emit('downloadProgress', {
percent,
transferred: downloaded,
total: downloadBodySize
});
}

callback(null, chunk);
},

flush(callback) {
ee.emit('downloadProgress', {
percent: 1,
transferred: downloaded,
total: downloadBodySize
});

callback();
}
});

mimicResponse(res, progressStream);
progressStream.redirectUrls = redirects;

const response = opts.decompress === true &&
typeof decompressResponse === 'function' &&
req.method !== 'HEAD' ? decompressResponse(res) : res;
req.method !== 'HEAD' ? decompressResponse(progressStream) : progressStream;

if (!opts.decompress && ['gzip', 'deflate'].indexOf(res.headers['content-encoding']) !== -1) {
opts.encoding = null;
}

response.redirectUrls = redirects;

ee.emit('response', response);

ee.emit('downloadProgress', {
percent: 0,
transferred: 0,
total: downloadBodySize
});

res.pipe(progressStream);
});
});

req.once('error', err => {
clearInterval(progressInterval);

const backoff = opts.retries(++retryCount, err);

if (backoff) {
Expand All @@ -111,7 +203,44 @@ function requestAsEventEmitter(opts) {
ee.emit('error', new got.RequestError(err, opts));
});

ee.on('request', req => {
ee.emit('uploadProgress', {
percent: 0,
transferred: 0,
total: uploadBodySize
});

req.connection.on('connect', () => {
const uploadEventFrequency = 150;

progressInterval = setInterval(() => {
const lastUploaded = uploaded;
const headersSize = Buffer.byteLength(req._header);
uploaded = req.connection.bytesWritten - headersSize;

// Prevent the known issue of `bytesWritten` being larger than body size
if (uploadBodySize && uploaded > uploadBodySize) {
uploaded = uploadBodySize;
}

// Don't emit events with unchanged progress and
// prevent last event from being emitted, because
// it's emitted when `response` is emitted
if (uploaded === lastUploaded || uploaded === uploadBodySize) {
return;
}

ee.emit('uploadProgress', {
percent: uploadBodySize ? uploaded / uploadBodySize : 0,
transferred: uploaded,
total: uploadBodySize
});
}, uploadEventFrequency);
});
});

if (opts.gotTimeout) {
clearInterval(progressInterval);
timedOut(req, opts.gotTimeout);
}

Expand All @@ -121,8 +250,16 @@ function requestAsEventEmitter(opts) {
};

setImmediate(() => {
get(opts);
Promise.resolve(getBodySize(opts))
.then(size => {
uploadBodySize = size;
get(opts);
})
.catch(err => {
ee.emit('error', err);
});
});

return ee;
}

Expand All @@ -131,7 +268,9 @@ function asPromise(opts) {
pTimeout(requestPromise, opts.gotTimeout.request, new got.RequestError({message: 'Request timed out', code: 'ETIMEDOUT'}, opts)) :
requestPromise;

return timeoutFn(new PCancelable((onCancel, resolve, reject) => {
const proxy = new EventEmitter();

const promise = timeoutFn(new PCancelable((onCancel, resolve, reject) => {
const ee = requestAsEventEmitter(opts);
let cancelOnRequest = false;

Expand Down Expand Up @@ -191,10 +330,21 @@ function asPromise(opts) {
});

ee.on('error', reject);
ee.on('uploadProgress', proxy.emit.bind(proxy, 'uploadProgress'));
ee.on('downloadProgress', proxy.emit.bind(proxy, 'downloadProgress'));
}));

promise.on = (name, fn) => {
proxy.on(name, fn);
return promise;
};

return promise;
}

function asStream(opts) {
opts.stream = true;

const input = new PassThrough();
const output = new PassThrough();
const proxy = duplexer3(input, output);
Expand Down Expand Up @@ -256,6 +406,8 @@ function asStream(opts) {

ee.on('redirect', proxy.emit.bind(proxy, 'redirect'));
ee.on('error', proxy.emit.bind(proxy, 'error'));
ee.on('uploadProgress', proxy.emit.bind(proxy, 'uploadProgress'));
ee.on('downloadProgress', proxy.emit.bind(proxy, 'downloadProgress'));

return proxy;
}
Expand Down Expand Up @@ -320,7 +472,7 @@ function normalizeArguments(url, opts) {
throw new TypeError('options.body must be a plain Object or Array when options.form or options.json is used');
}

if (isStream(body) && typeof body.getBoundary === 'function') {
if (isFormData(body)) {
// Special case for https://github.com/form-data/form-data
headers['content-type'] = headers['content-type'] || `multipart/form-data; boundary=${body.getBoundary()}`;
} else if (opts.form && canBodyBeStringified) {
Expand All @@ -336,6 +488,13 @@ function normalizeArguments(url, opts) {
headers['content-length'] = length;
}

// Convert buffer to stream to receive upload progress events
// see https://github.com/sindresorhus/got/pull/322
if (Buffer.isBuffer(body)) {
opts.body = intoStream(body);
opts.body._buffer = body;
}

opts.method = (opts.method || 'POST').toUpperCase();
} else {
opts.method = (opts.method || 'GET').toUpperCase();
Expand Down
6 changes: 4 additions & 2 deletions package.json
Expand Up @@ -53,13 +53,16 @@
"decompress-response": "^3.2.0",
"duplexer3": "^0.1.4",
"get-stream": "^3.0.0",
"into-stream": "^3.1.0",
"is-plain-obj": "^1.1.0",
"is-retry-allowed": "^1.0.0",
"is-stream": "^1.0.0",
"isurl": "^1.0.0-alpha5",
"lowercase-keys": "^1.0.0",
"mimic-response": "^1.0.0",
"p-cancelable": "^0.3.0",
"p-timeout": "^1.1.1",
"pify": "^3.0.0",
"safe-buffer": "^5.0.1",
"timed-out": "^4.0.0",
"url-parse-lax": "^1.0.0",
Expand All @@ -70,10 +73,9 @@
"coveralls": "^2.11.4",
"form-data": "^2.1.1",
"get-port": "^3.0.0",
"into-stream": "^3.0.0",
"nyc": "^11.0.2",
"pem": "^1.4.4",
"pify": "^3.0.0",
"slow-stream": "0.0.4",
"tempfile": "^2.0.0",
"tempy": "^0.1.0",
"universal-url": "1.0.0-alpha",
Expand Down
31 changes: 31 additions & 0 deletions readme.md
Expand Up @@ -21,6 +21,7 @@ Created because [`request`](https://github.com/request/request) is bloated *(sev
- [Request cancelation](#aborting-the-request)
- [Follows redirects](#followredirect)
- [Retries on network failure](#retries)
- [Progress events](#onuploadprogress-progress)
- [Handles gzip/deflate](#decompress)
- [Timeout handling](#timeout)
- [Errors with metadata](#errors)
Expand Down Expand Up @@ -202,6 +203,36 @@ got.stream('github.com')

`redirect` event to get the response object of a redirect. The second argument is options for the next request to the redirect location.

##### .on('uploadProgress', progress)
##### .on('downloadProgress', progress)

Progress events for uploading (sending request) and downloading (receiving response). The `progress` argument is an object like:

```js
{
percent: 0.1,
transferred: 1024,
total: 10240
}
```

If it's not possible to retrieve the body size (can happen when streaming), `total` will be `null`.

**Note**: Progress events can also be used with promises.

```js
got('todomvc.com')
.on('downloadProgress', progress => {
// Report download progress
})
.on('uploadProgress', progress => {
// Report upload progress
})
.then(response => {
// Done
});
```

##### .on('error', error, body, response)

`error` event emitted in case of protocol error (like `ENOTFOUND` etc.) or status error (4xx or 5xx). The second argument is the body of the server response in case of status error. The third argument is response object.
Expand Down

0 comments on commit be7b802

Please sign in to comment.