Skip to content

Commit

Permalink
Make Agent's maxSessions option work
Browse files Browse the repository at this point in the history
  • Loading branch information
szmarczak committed May 11, 2019
1 parent 04079a7 commit 5ae0051
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 106 deletions.
8 changes: 6 additions & 2 deletions README.md
Expand Up @@ -229,19 +229,23 @@ Returns a `string` containing a proper name for sessions created with these opti

#### agent.getSession(authority, options)

Returns a free `Http2Session`. If no free sessions are found, a new one is created.
Returns a Promise giving free `Http2Session`. If no free sessions are found, a new one is created.

##### authority

Type: `string`

Authority used to create a new session.

##### options

Type: `Object`

Options used to create a new session.

#### agent.request(authority, options, headers)

Returns `Http2Stream`.
Returns a Promise giving `Http2Stream`.

#### agent.createConnection(authority, options)

Expand Down
110 changes: 74 additions & 36 deletions source/agent.js
Expand Up @@ -17,8 +17,9 @@ class Agent extends EventEmitter {
constructor({timeout = 30000, maxSessions = Infinity, maxFreeSessions = 1} = {}) {
super();

this.busySessions = [];
this.busySessions = {};
this.freeSessions = {};
this.queue = {};

this.timeout = timeout;
this.maxSessions = maxSessions;
Expand All @@ -39,46 +40,75 @@ class Agent extends EventEmitter {
return name;
}

_processQueue(name) {
const busyLength = this.busySessions[name] ? this.busySessions[name].length : 0;

if (busyLength < this.maxSessions && this.queue[name]) {
this.queue[name].shift()();

if (this.queue[name].length === 0) {
delete this.queue[name];
}
}
}

getSession(authority, options) {
const name = this.getName(authority, options);
return new Promise((resolve, reject) => {
const name = this.getName(authority, options);

if (this.freeSessions[name]) {
resolve(this.freeSessions[name][0]);
} else {
if (!this.queue[name]) {
this.queue[name] = [];
}

if (!this.freeSessions[name]) {
const session = http2.connect(authority, {
createConnection: this.createConnection,
...options
});
session[kCurrentStreamsCount] = 0;
this.queue[name].push(() => {
try {
const session = http2.connect(authority, {
createConnection: this.createConnection,
...options
});
session[kCurrentStreamsCount] = 0;

session.setTimeout(this.timeout, () => {
session.close();
});
session.setTimeout(this.timeout, () => {
session.close();
});

session.once('error', () => {
session.destroy();
});
session.once('error', () => {
session.destroy();
});

session.once('close', () => {
const freeSessions = this.freeSessions[name] || [];
const index = freeSessions.indexOf(session);
session.once('close', () => {
const freeSessions = this.freeSessions[name] || [];
const index = freeSessions.indexOf(session);

if (index !== -1) {
freeSessions.splice(index, 1);
if (index !== -1) {
freeSessions.splice(index, 1);

if (freeSessions.length === 0) {
delete this.freeSessions[name];
}
}
});
if (freeSessions.length === 0) {
delete this.freeSessions[name];
}
}

this.freeSessions[name] = [session];
}
this._processQueue(name);
});

this.freeSessions[name] = [session];
resolve(session);
} catch (error) {
reject(error);
}
});

return this.freeSessions[name][0];
this._processQueue(name);
}
});
}

request(authority, options, headers) {
async request(authority, options, headers) {
const name = this.getName(authority, options);
const session = this.getSession(authority, options);
const session = await this.getSession(authority, options);

const stream = session.request(headers, {
endStream: false
Expand All @@ -90,7 +120,11 @@ class Agent extends EventEmitter {
if (++session[kCurrentStreamsCount] >= session.remoteSettings.maxConcurrentStreams) {
const freeSessions = this.freeSessions[name];
freeSessions.splice(freeSessions.indexOf(session), 1);
this.busySessions.push(session);

if (!this.busySessions[name]) {
this.busySessions[name] = [];
}
this.busySessions[name].push(session);

if (freeSessions.length === 0) {
delete this.freeSessions[name];
Expand All @@ -101,18 +135,22 @@ class Agent extends EventEmitter {
if (--session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams) {
session.unref();

const busySessionsIndex = this.busySessions.indexOf(session);
const busySessionsIndex = (this.busySessions[name] || []).indexOf(session);
if (busySessionsIndex !== -1) {
this.busySessions.splice(busySessionsIndex, 1);
this.busySessions[name].splice(busySessionsIndex, 1);

if (this.busySessions[name].length === 0) {
delete this.busySessions[name];
}

if (this.freeSessions[name]) {
if (this.freeSessions[name].length < this.maxFreeSessions) {
if ((this.freeSessions[name].length || []) < this.maxFreeSessions) {
if (this.freeSessions[name]) {
this.freeSessions[name].push(session);
} else {
session.close();
this.freeSessions[name] = [session];
}
} else {
this.freeSessions[name] = [session];
session.close();
}
}
}
Expand Down
140 changes: 73 additions & 67 deletions source/client-request.js
Expand Up @@ -168,83 +168,89 @@ class ClientRequest extends Writable {
headers[HTTP2_HEADER_PATH] = this.path;
}

// Makes a HTTP2 request
try {
if (this[kOptions].session) {
this._request = this[kOptions].session.request(headers, {
endStream: false
});
} else {
this._request = this[kAgent].request(this[kAuthority], this[kOptions], headers);
}
} catch (error) {
this.emit('error', error);
return;
}

// The real magic is here
if (!this.destroyed && !this.aborted) {
// Forwards `timeout`, `continue`, `close` and `error` events to this instance.
if (!isConnectMethod) {
proxyEvents(this._request, this, ['timeout', 'continue', 'close', 'error']);
}

// This event tells we are ready to listen for the data.
this._request.once('response', (headers, flags, rawHeaders) => {
this.res = new HTTP2IncomingMessage(this.socket);
this.res.req = this;
this.res.statusCode = headers[HTTP2_HEADER_STATUS];
this.res.headers = {...headers};
this.res.rawHeaders = [...rawHeaders];

this.res.once('end', () => {
if (this.aborted) {
this.res.aborted = true;
this.res.emit('aborted');
} else {
this.res.complete = true;
}
});
const onStream = stream => {
this._request = stream;

if (isConnectMethod) {
this.res.upgrade = true;
if (!this.destroyed && !this.aborted) {
// Forwards `timeout`, `continue`, `close` and `error` events to this instance.
if (!isConnectMethod) {
proxyEvents(this._request, this, ['timeout', 'continue', 'close', 'error']);
}

// The HTTP1 API says the socket is detached here,
// but we can't do that so we pass the original HTTP2 request.
if (this.emit('connect', this.res, this._request, Buffer.alloc(0))) {
this.emit('close');
// This event tells we are ready to listen for the data.
this._request.once('response', (headers, flags, rawHeaders) => {
this.res = new HTTP2IncomingMessage(this.socket);
this.res.req = this;
this.res.statusCode = headers[HTTP2_HEADER_STATUS];
this.res.headers = {...headers};
this.res.rawHeaders = [...rawHeaders];

this.res.once('end', () => {
if (this.aborted) {
this.res.aborted = true;
this.res.emit('aborted');
} else {
this.res.complete = true;
}
});

if (isConnectMethod) {
this.res.upgrade = true;

// The HTTP1 API says the socket is detached here,
// but we can't do that so we pass the original HTTP2 request.
if (this.emit('connect', this.res, this._request, Buffer.alloc(0))) {
this.emit('close');
} else {
// No listeners attached, destroy the original request.
this._request.destroy();
}
} else {
// No listeners attached, destroy the original request.
this._request.destroy();
}
} else {
// Forwards data
this._request.pipe(this.res);
// Forwards data
this._request.pipe(this.res);

if (!this.emit('response', this.res)) {
// No listeners attached, dump the response.
this.res._dump();
if (!this.emit('response', this.res)) {
// No listeners attached, dump the response.
this.res._dump();
}
}
}
});
});

// Emits `information` event
this._request.once('headers', headers => this.emit('information', {statusCode: headers[HTTP2_HEADER_STATUS]}));
// Emits `information` event
this._request.once('headers', headers => this.emit('information', {statusCode: headers[HTTP2_HEADER_STATUS]}));

this._request.once('trailers', (trailers, flags, rawTrailers) => {
// Assigns trailers to the response object.
this.res.trailers = {...trailers};
this.res.rawTrailers = [...rawTrailers];
});
this._request.once('trailers', (trailers, flags, rawTrailers) => {
// Assigns trailers to the response object.
this.res.trailers = {...trailers};
this.res.rawTrailers = [...rawTrailers];
});

this.socket = this._request.session.socket;
this.connection = this._request.session.socket;
this.socket = this._request.session.socket;
this.connection = this._request.session.socket;

process.nextTick(() => {
this.emit('socket', this._request.session.socket);
});
process.nextTick(() => {
this.emit('socket', this._request.session.socket);
});
} else {
this._request.close(NGHTTP2_CANCEL);
}
};

// Makes a HTTP2 request
if (this[kOptions].session) {
try {
onStream(this[kOptions].session.request(headers, {
endStream: false
}));
} catch (error) {
this.emit('error', error);
}
} else {
this._request.close(NGHTTP2_CANCEL);
// eslint-disable-next-line promise/prefer-await-to-then
this[kAgent].request(this[kAuthority], this[kOptions], headers).then(onStream, error => {
this.emit('error', error);
});
}
}

Expand All @@ -253,7 +259,7 @@ class ClientRequest extends Writable {
}

get headersSent() {
return Boolean(this._request);
return this[kFlushedHeaders];
}

removeHeader(name) {
Expand Down
3 changes: 2 additions & 1 deletion test/agent.js
Expand Up @@ -21,7 +21,8 @@ test('timeout works', async t => {
timeout: 100
});

agent.request(s.url, s.options).end().resume();
const request = await agent.request(s.url, s.options);
request.end().resume();

t.is(Object.values(agent.freeSessions).length, 1);

Expand Down

0 comments on commit 5ae0051

Please sign in to comment.