Skip to content

Commit

Permalink
Allow the usage of push streams (#19)
Browse files Browse the repository at this point in the history
Push streams are disabled by default, but they can be enabled from now on.
  • Loading branch information
szmarczak committed Jul 29, 2019
1 parent 5aa5799 commit d2c4d2d
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ Destroys **all** sessions.
- [WebSockets over HTTP2 is not supported yet](https://github.com/nodejs/node/issues/15230), although there is [a proposal](https://tools.ietf.org/html/rfc8441) already.
- [HTTP2 sockets cannot be malformed](https://github.com/nodejs/node/blob/cc8250fab86486632fdeb63892be735d7628cd13/lib/internal/http2/core.js#L725), therefore modifying the socket will have no effect.
- HTTP2 is a binary protocol. Headers are sent without any validation.
- You can make [a custom Agent](examples/push-stream/index.js) to support push streams.

## Benchmarks

Expand Down
2 changes: 1 addition & 1 deletion agent-example.js → examples/agent.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
const http2 = require('.'); // Note: using local version
const http2 = require('../source'); // Note: using the local version

class MyAgent extends http2.Agent {
createConnection(authority, options) {
Expand Down
74 changes: 74 additions & 0 deletions examples/push-stream/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
'use strict';
const ManyKeysMap = require('many-keys-map');
const {extend: gotExtend} = require('got');
const http2 = require('../../source'); // Note: using the local version

class PushAgent extends http2.Agent {
constructor(options) {
super(options);
this.settings.enablePush = true;

this.on('session', session => {
const pushCache = new ManyKeysMap();
session.pushCache = pushCache;

session.on('stream', (stream, requestHeaders) => {
const parsedPushHeaders = PushAgent._parsePushHeaders(requestHeaders);

if (pushCache.has(parsedPushHeaders)) {
stream.close(http2.constants.NGHTTP2_REFUSED_STREAM);
return;
}

stream.once('push', pushHeaders => {
pushCache.set(parsedPushHeaders, {stream, pushHeaders});
});
});
});
}

async request(authority, options, headers) {
const session = await this.getSession(authority, options);

const parsedPushHeaders = PushAgent._parsePushHeaders(headers);
const cache = session.pushCache.get(parsedPushHeaders);
if (cache) {
const {stream, pushHeaders} = cache;
delete session.pushCache.delete(parsedPushHeaders);

setImmediate(() => {
stream.emit('response', pushHeaders);
});

return stream;
}

const stream = session.request(headers);

return stream;
}

static _parsePushHeaders(headers) {
// TODO: headers[':authority'] needs to be verified properly.

return [
headers[':path'] || '/',
headers[':method'] || 'GET'
];
}
}

(async () => {
const got = gotExtend({
baseUrl: 'https://localhost:3000',
request: http2.request,
rejectUnauthorized: false,
agent: new PushAgent()
});

const response = await got('');
console.log('/', response.body, response.headers);

const pushResponse = await got('push');
console.log('/push', pushResponse.body, pushResponse.headers);
})();
33 changes: 33 additions & 0 deletions examples/push-stream/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict';
const util = require('util');
const http2 = require('http2');
const createCert = require('create-cert');

const PORT = 3000;

(async () => {
const {cert, key} = await createCert();

const server = http2.createSecureServer({cert, key});

server.on('stream', stream => {
stream.respond({':status': 200});
stream.pushStream({':path': '/push'}, (error, pushStream) => {
if (error) {
throw error;
}

pushStream.respond({':status': 200});
pushStream.end('some pushed data');
});

stream.end('some data');
});

server.listen = util.promisify(server.listen);
server.close = util.promisify(server.close);

await server.listen(PORT);

console.log(`Server is listening on port ${PORT}`);
})();
2 changes: 1 addition & 1 deletion example.js → examples/request.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
const http2 = require('.'); // Note: using local version
const http2 = require('../source'); // Note: using the local version

const options = {
hostname: 'nghttp2.org',
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
"coveralls": "^3.0.5",
"create-cert": "^1.0.6",
"get-stream": "^5.1.0",
"got": "^9.6.0",
"many-keys-map": "^1.0.2",
"nyc": "^14.1.1",
"p-event": "^4.1.0",
"tempy": "^0.3.0",
Expand Down
9 changes: 8 additions & 1 deletion source/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class Agent extends EventEmitter {
this.timeout = timeout;
this.maxSessions = maxSessions;
this.maxFreeSessions = maxFreeSessions;

this.settings = {
enablePush: false
};
}

getName(authority, options = {}) {
Expand Down Expand Up @@ -143,6 +147,7 @@ class Agent extends EventEmitter {

const session = http2.connect(authority, {
createConnection: this.createConnection,
settings: this.settings,
...options
});
session[kCurrentStreamsCount] = 0;
Expand Down Expand Up @@ -173,7 +178,7 @@ class Agent extends EventEmitter {
this._processQueue(name);
});

session.once('remoteSettings', () => {
session.once('localSettings', () => {
removeFromQueue();

const movedListeners = listeners.splice(session.remoteSettings.maxConcurrentStreams);
Expand Down Expand Up @@ -245,6 +250,8 @@ class Agent extends EventEmitter {

return stream;
};

this.emit('session', session);
} catch (error) {
for (const listener of listeners) {
listener.reject(error);
Expand Down
20 changes: 20 additions & 0 deletions test/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,26 @@ if (isCompatible) {
t.is(requests[0].session, requests[1].session);
});

test('emits `session` event when a new session is created', wrapper, async (t, server) => {
t.plan(1);

const agent = new Agent();

agent.once('session', session => {
t.truthy(session);
});

await agent.getSession(server.url);
});

test('`.settings` property', wrapper, async (t, server) => {
const agent = new Agent();
agent.settings.maxHeaderListSize = 100;

const session = await agent.getSession(server.url);
t.is(session.localSettings.maxHeaderListSize, 100);
});

// eslint-disable-next-line ava/no-skip-test
test.skip('throws on invalid usage', wrapper, async (t, server) => {
const agent = new Agent();
Expand Down

0 comments on commit d2c4d2d

Please sign in to comment.