From b24694827ff0de4ebbbf3977cd7ab9fbf6e14391 Mon Sep 17 00:00:00 2001 From: Lance Ball Date: Thu, 16 Jan 2020 15:03:02 -0700 Subject: [PATCH] feat: add support for 1.0 structured cloud events (#24) --- lib/event-handler.js | 39 +++++++++++++++++++++++++++++++++------ lib/request-handler.js | 4 +--- test/test.js | 33 ++++++++++++++++++++++++++++++--- 3 files changed, 64 insertions(+), 12 deletions(-) diff --git a/lib/event-handler.js b/lib/event-handler.js index 6604857..e776da4 100644 --- a/lib/event-handler.js +++ b/lib/event-handler.js @@ -7,12 +7,26 @@ const v1 = require('cloudevents-sdk/v1'); const v02Unmarshaller = new v02.HTTPUnmarshaller(); const v03Unmarshaller = new v03.HTTPUnmarshaller(); const v1BinaryReceiver = new v1.BinaryHTTPReceiver(); +const v1StructuredReceiver = new v1.StructuredHTTPReceiver(); + +function use(fastify, opts, done) { + fastify.addContentTypeParser('application/cloudevents+json', + { parseAs: 'string' }, + function(req, body, done) { + // unmarshallEvent() handles parsing + done(null, body); + }); -function use(fastify) { fastify.decorateRequest('isCloudEvent', function() { - return this.req.headers === undefined - ? false - : 'ce-type' in this.req.headers; + if ('ce-type' in this.req.headers) { + return true; + } else { + const contentType = this.req.headers['content-type']; + if (contentType && contentType.match(/application\/cloudevents/)) { + return true; + } + } + return false; }); // Any incoming requests for cloud events will only be @@ -20,7 +34,9 @@ function use(fastify) { fastify.addHook('preHandler', async(request, reply) => { if (request.isCloudEvent()) { const version = request.headers['ce-specversion']; - if (!acceptsVersion(version)) { + // if there is no version in the headers, it is a + // structured event + if (version && !acceptsVersion(version)) { reply.code(406); const error = new Error( `Unsupported cloud event version detected: ${version}`); @@ -31,11 +47,22 @@ function use(fastify) { } } }); + + done(); } async function unmarshallEvent(request) { const version = request.headers['ce-specversion']; - if (version === '0.2') { + if (!version) { + // it's a structured event and the version is in the body + // currently only v1 structured events are supported + try { + const event = v1StructuredReceiver.parse(request.body, request.headers); + request.context.cloudevent = event.format(); + } catch (err) { + return Promise.reject(err); + } + } else if (version === '0.2') { return v02Unmarshaller.unmarshall(request.body, request.headers) .then(cloudevent => (request.context.cloudevent = cloudevent.format())); } else if (version === '0.3') { diff --git a/lib/request-handler.js b/lib/request-handler.js index 6a2ca49..4bc8aa1 100644 --- a/lib/request-handler.js +++ b/lib/request-handler.js @@ -5,7 +5,6 @@ const invoker = require('./invoker'); module.exports = function(fastify, opts, done) { const invokeFunction = invoker(opts.func); - eventHandler(fastify); fastify.get('/', doGet); @@ -20,14 +19,13 @@ module.exports = function(fastify, opts, done) { sendReply(reply, await invokeFunction(request.context, reply.log)); } - done(); + eventHandler(fastify, null, done); }; function sendReply(reply, payload) { if (payload.headers['Content-Type'].startsWith('text/plain')) { payload.response = JSON.stringify(payload.response); } - return reply .code(payload.code) .headers(payload.headers) diff --git a/test/test.js b/test/test.js index 9e6f3a7..ccd49f4 100644 --- a/test/test.js +++ b/test/test.js @@ -79,7 +79,7 @@ test('Accepts HTTP POST requests', t => { }, { log: false }); }); -test('Responds to 0.2 cloud events', t => { +test('Responds to 0.2 binary cloud events', t => { const func = require(`${__dirname}/fixtures/cloud-event/`); framework(func, server => { request(server) @@ -100,7 +100,7 @@ test('Responds to 0.2 cloud events', t => { }, { log: false }); }); -test('Responds to 0.3 cloud events', t => { +test('Responds to 0.3 binary cloud events', t => { const func = require(`${__dirname}/fixtures/cloud-event/`); framework(func, server => { request(server) @@ -121,7 +121,7 @@ test('Responds to 0.3 cloud events', t => { }, { log: false }); }); -test('Responds to 1.0 cloud events', t => { +test('Responds to 1.0 binary cloud events', t => { const func = require(`${__dirname}/fixtures/cloud-event/`); framework(func, server => { request(server) @@ -142,6 +142,33 @@ test('Responds to 1.0 cloud events', t => { }, { log: false }); }); +test('Responds to 1.0 structured cloud events', t => { + const func = require(`${__dirname}/fixtures/cloud-event/`); + framework(func, server => { + request(server) + .post('/') + .send({ + id: '1', + source: 'integration-test', + type: 'com.github.pull.create', + specversion: '1.0', + datacontenttype: 'application/json', + data: { + message: 'hello' + } + }) + .set('Content-type', 'application/cloudevents+json; charset=utf-8') + .expect(200) + .expect('Content-Type', /json/) + .end((err, res) => { + t.error(err, 'No error'); + t.equal(res.body, 'hello'); + t.end(); + server.close(); + }); + }, { log: false }); +}); + test('Responds with 406 Not Acceptable to unknown cloud event versions', t => { const func = require(`${__dirname}/fixtures/cloud-event/`); framework(func, server => {