Skip to content

Commit

Permalink
Merge fdc53ff into edd1e32
Browse files Browse the repository at this point in the history
  • Loading branch information
szmarczak committed Jul 28, 2019
2 parents edd1e32 + fdc53ff commit 6cace4b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 19 deletions.
6 changes: 4 additions & 2 deletions README.md
Expand Up @@ -240,9 +240,11 @@ Max free sessions per origin.

Returns a `string` containing a proper name for sessions created with these options.

#### agent.getSession(authority, options)
#### agent.getSession(authority, options, name)

Returns a Promise giving free `Http2Session`. If no free sessions are found, a new one is created.
Returns a Promise giving free `Http2Session` under the provided name. If no free sessions are found, a new one is created.

If the `name` argument is `undefined`, it defaults to `agent.getName(authority, options)`.

##### authority

Expand Down
41 changes: 29 additions & 12 deletions source/agent.js
Expand Up @@ -110,35 +110,34 @@ class Agent extends EventEmitter {
}
}

async getSession(authority, options) {
async getSession(authority, options, name) {
return new Promise((resolve, reject) => {
const name = this.getName(authority, options);
const detached = {resolve, reject};

name = name || this.getName(authority, options);

if (Reflect.has(this.freeSessions, name)) {
resolve(this.freeSessions[name][0]);

return;
}

if (Reflect.has(this.queue, name)) {
// TODO: limit the maximum amount of listeners
this.queue[name].listeners.push(detached);

return;
}

const listeners = [detached];

const free = () => {
// If our entry is replaced,`completed` will be `false`.
// Or the entry will be `undefined` if all seats are taken.
if (Reflect.has(this.queue, name) && this.queue[name].completed) {
const removeFromQueue = () => {
// Our entry can be replaced. We cannot remove the new one.
if (this.queue[name] === entry) {
delete this.queue[name];
}
};

this.queue[name] = () => {
const entry = () => {
try {
let receivedSettings = false;

Expand Down Expand Up @@ -168,14 +167,30 @@ class Agent extends EventEmitter {
}
}

free();
removeFromQueue();

removeSession(this.freeSessions, name, session);
this._processQueue(name);
});

session.once('remoteSettings', () => {
free();
removeFromQueue();

const movedListeners = listeners.splice(session.remoteSettings.maxConcurrentStreams);

if (movedListeners.length !== 0) {
while (Reflect.has(this.freeSessions, name) && movedListeners.length !== 0) {
movedListeners.shift().resolve(this.freeSessions[name][0]);
}

if (movedListeners.length !== 0) {
this.getSession(authority, options, name);

// Replace listeners with the new ones
this.queue[name].listeners.length = 0;
this.queue[name].listeners.push(...movedListeners);
}
}

if (Reflect.has(this.freeSessions, name)) {
this.freeSessions[name].push(session);
Expand Down Expand Up @@ -239,8 +254,10 @@ class Agent extends EventEmitter {
}
};

this.queue[name].listeners = listeners;
this.queue[name].completed = false;
entry.listeners = listeners;
entry.completed = false;

this.queue[name] = entry;
this._processQueue(name);
});
}
Expand Down
4 changes: 2 additions & 2 deletions source/client-request.js
Expand Up @@ -53,7 +53,7 @@ class ClientRequest extends Writable {
this[kSession] = options.session;
} else if (options.agent === false) {
this.agent = new Agent({maxFreeSessions: 0});
} else if (options.agent === null || typeof options.agent === 'undefined') {
} else if (typeof options.agent === 'undefined' || options.agent === null) {
if (typeof options.createConnection === 'function') {
// This is a workaround - we don't have to create the session on our own.
this.agent = new Agent({maxFreeSessions: 0});
Expand Down Expand Up @@ -103,7 +103,7 @@ class ClientRequest extends Writable {
this[kOptions] = options;
this[kAuthority] = options.authority || new URL(`https://${options.hostname || options.host}:${options.port}`);

if (this.agent && (options.preconnect || typeof options.preconnect === 'undefined')) {
if (this.agent && (typeof options.preconnect === 'undefined' || options.preconnect)) {
this.agent.getSession(this[kAuthority], options).catch(() => {});
}

Expand Down
19 changes: 16 additions & 3 deletions test/agent.js
Expand Up @@ -92,7 +92,7 @@ if (isCompatible) {
t.is(first, second);
});

test('gives the queued session if exists', singleRequestWrapper, async (t, server) => {
test('gives the queued session if exists', wrapper, async (t, server) => {
server.get('/infinite', () => {});

const agent = new Agent({
Expand Down Expand Up @@ -313,6 +313,8 @@ if (isCompatible) {
});

test('appends to freeSessions after the stream has ended', singleRequestWrapper, async (t, server) => {
t.plan(1);

server.get('/', (request, response) => {
setTimeout(() => {
response.end();
Expand All @@ -331,16 +333,27 @@ if (isCompatible) {
secondStream.end();
await pEvent(secondStream, 'close');

t.is(agent.freeSessions[agent.getName(server.url)].length, 2);
setImmediate(() => {
t.is(agent.freeSessions[agent.getName(server.url)].length, 2);
});
});

test('sessions can be overloaded', singleRequestWrapper, async (t, server) => {
test('prevents overloading sessions', singleRequestWrapper, async (t, server) => {
const agent = new Agent();

agent.getSession(server.url);
const requestPromises = Promise.all([agent.request(server.url), agent.request(server.url)]);

const requests = await requestPromises;
t.not(requests[0].session, requests[1].session);
});

test('sessions can be manually overloaded', singleRequestWrapper, async (t, server) => {
const agent = new Agent();

const session = await agent.getSession(server.url);
const requests = [session.request(), session.request()];

t.is(requests[0].session, requests[1].session);
});

Expand Down

0 comments on commit 6cace4b

Please sign in to comment.