Skip to content

Commit

Permalink
Replace pump with built-in pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
blakeembrey committed Sep 30, 2020
1 parent affea8b commit bcaed90
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
6 changes: 5 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions package.json
Expand Up @@ -91,7 +91,6 @@
"typescript": "^3.9.5"
},
"dependencies": {
"make-error-cause": "^2.2.0",
"pump": "^3.0.0"
"make-error-cause": "^2.2.0"
}
}
33 changes: 19 additions & 14 deletions src/index.ts
@@ -1,4 +1,3 @@
import pump = require("pump");
import { URL } from "url";
import { request as httpRequest, IncomingMessage } from "http";
import { request as httpsRequest, RequestOptions } from "https";
Expand All @@ -21,7 +20,7 @@ import {
constants as h2constants,
ClientHttp2Session,
} from "http2";
import { PassThrough, Readable, Writable } from "stream";
import { pipeline, PassThrough, Writable } from "stream";
import {
Request,
Response,
Expand Down Expand Up @@ -223,7 +222,7 @@ function pumpBody(
return stream.end(body);
}

return pump(body, stream, (err) => {
return pipeline(body, stream, (err) => {
if (err) return onError(err);
});
}
Expand Down Expand Up @@ -275,10 +274,10 @@ function execHttp1(
ref(socket);

const rawRequest = request(arg);
const requestStream = new PassThrough();

// Handle abort events correctly.
const onAbort = () => {
req.signal.off("abort", onAbort);
socket.emit("agentRemove"); // `abort` destroys the connection with no event.
rawRequest.abort();
};
Expand All @@ -291,11 +290,11 @@ function execHttp1(

// Trigger unavailable error when node.js errors before response.
const onRequestError = (err: Error) => {
unref(socket);
req.signal.off("abort", onAbort);

rawRequest.removeListener("response", onResponse);

unref(socket);

return reject(
new ConnectionError(req, `Unable to connect to ${url.host}`, err)
);
Expand All @@ -322,27 +321,32 @@ function execHttp1(
port: remotePort,
} = rawResponse.connection.address() as AddressInfo;

const responseStream = new PassThrough();
let bytesTransferred = 0;
req.signal.emit("responseStarted");

const onData = (chunk: Buffer) => {
req.signal.emit("responseBytes", (bytesTransferred += chunk.length));
};

const onAborted = () => responseStream.push(null);

req.signal.emit("responseStarted");
rawResponse.on("data", onData);
rawResponse.on("aborted", onAborted);

const res = new HttpResponse(
pump(rawResponse, new PassThrough(), (err) => {
pipeline(rawResponse, responseStream, (err) => {
unref(socket);
req.signal.off("abort", onAbort);
if (err) req.signal.emit("error", err);

rawResponse.removeListener("data", onData);
rawResponse.removeListener("aborted", onAborted);

resolveTrailers(rawResponse.trailers);

req.signal.emit("responseEnded");
}) as Readable,
}),
{
status: rawResponse.statusCode,
statusText: rawResponse.statusMessage,
Expand All @@ -364,19 +368,20 @@ function execHttp1(
return resolve(res);
};

const requestStream = new PassThrough();
let bytesTransferred = 0;
req.signal.emit("requestStarted");

const onData = (chunk: Buffer) => {
req.signal.emit("requestBytes", (bytesTransferred += chunk.length));
};

req.signal.emit("requestStarted");
req.signal.on("abort", onAbort);
rawRequest.once("error", onRequestError);
rawRequest.once("response", onResponse);
requestStream.on("data", onData);

pump(requestStream, rawRequest, () => {
pipeline(requestStream, rawRequest, () => {
requestStream.removeListener("data", onData);

req.signal.emit("requestEnded");
Expand Down Expand Up @@ -466,7 +471,7 @@ function execHttp2(
http2Stream.once("trailers", onTrailers);

const res = new Http2Response(
pump(http2Stream, new PassThrough(), (err) => {
pipeline(http2Stream, new PassThrough(), (err) => {
unref(client.socket);
req.signal.off("abort", onAbort);
if (err) req.signal.emit("error", err);
Expand All @@ -477,7 +482,7 @@ function execHttp2(
resolveTrailers({}); // Resolve in case "trailers" wasn't emitted.

req.signal.emit("responseEnded");
}) as Readable,
}),
{
status: Number(headers[h2constants.HTTP2_HEADER_STATUS]),
statusText: "",
Expand Down Expand Up @@ -513,7 +518,7 @@ function execHttp2(
http2Stream.once("response", onResponse);
requestStream.on("data", onData);

pump(requestStream, http2Stream, () => {
pipeline(requestStream, http2Stream, () => {
requestStream.removeListener("data", onData);

req.signal.emit("requestEnded");
Expand Down

0 comments on commit bcaed90

Please sign in to comment.