Skip to content

Commit

Permalink
feat: add support for 1.0 structured cloud events (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance committed Jan 16, 2020
1 parent 0af91a4 commit b246948
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 12 deletions.
39 changes: 33 additions & 6 deletions lib/event-handler.js
Expand Up @@ -7,20 +7,36 @@ const v1 = require('cloudevents-sdk/v1');
const v02Unmarshaller = new v02.HTTPUnmarshaller();
const v03Unmarshaller = new v03.HTTPUnmarshaller();
const v1BinaryReceiver = new v1.BinaryHTTPReceiver();
const v1StructuredReceiver = new v1.StructuredHTTPReceiver();

function use(fastify, opts, done) {
fastify.addContentTypeParser('application/cloudevents+json',
{ parseAs: 'string' },
function(req, body, done) {
// unmarshallEvent() handles parsing
done(null, body);
});

function use(fastify) {
fastify.decorateRequest('isCloudEvent', function() {
return this.req.headers === undefined
? false
: 'ce-type' in this.req.headers;
if ('ce-type' in this.req.headers) {
return true;
} else {
const contentType = this.req.headers['content-type'];
if (contentType && contentType.match(/application\/cloudevents/)) {
return true;
}
}
return false;
});

// Any incoming requests for cloud events will only be
// processed if it's a cloud event spec version we know about
fastify.addHook('preHandler', async(request, reply) => {
if (request.isCloudEvent()) {
const version = request.headers['ce-specversion'];
if (!acceptsVersion(version)) {
// if there is no version in the headers, it is a
// structured event
if (version && !acceptsVersion(version)) {
reply.code(406);
const error = new Error(
`Unsupported cloud event version detected: ${version}`);
Expand All @@ -31,11 +47,22 @@ function use(fastify) {
}
}
});

done();
}

async function unmarshallEvent(request) {
const version = request.headers['ce-specversion'];
if (version === '0.2') {
if (!version) {
// it's a structured event and the version is in the body
// currently only v1 structured events are supported
try {
const event = v1StructuredReceiver.parse(request.body, request.headers);
request.context.cloudevent = event.format();
} catch (err) {
return Promise.reject(err);
}
} else if (version === '0.2') {
return v02Unmarshaller.unmarshall(request.body, request.headers)
.then(cloudevent => (request.context.cloudevent = cloudevent.format()));
} else if (version === '0.3') {
Expand Down
4 changes: 1 addition & 3 deletions lib/request-handler.js
Expand Up @@ -5,7 +5,6 @@ const invoker = require('./invoker');

module.exports = function(fastify, opts, done) {
const invokeFunction = invoker(opts.func);
eventHandler(fastify);

fastify.get('/', doGet);

Expand All @@ -20,14 +19,13 @@ module.exports = function(fastify, opts, done) {
sendReply(reply, await invokeFunction(request.context, reply.log));
}

done();
eventHandler(fastify, null, done);
};

function sendReply(reply, payload) {
if (payload.headers['Content-Type'].startsWith('text/plain')) {
payload.response = JSON.stringify(payload.response);
}

return reply
.code(payload.code)
.headers(payload.headers)
Expand Down
33 changes: 30 additions & 3 deletions test/test.js
Expand Up @@ -79,7 +79,7 @@ test('Accepts HTTP POST requests', t => {
}, { log: false });
});

test('Responds to 0.2 cloud events', t => {
test('Responds to 0.2 binary cloud events', t => {
const func = require(`${__dirname}/fixtures/cloud-event/`);
framework(func, server => {
request(server)
Expand All @@ -100,7 +100,7 @@ test('Responds to 0.2 cloud events', t => {
}, { log: false });
});

test('Responds to 0.3 cloud events', t => {
test('Responds to 0.3 binary cloud events', t => {
const func = require(`${__dirname}/fixtures/cloud-event/`);
framework(func, server => {
request(server)
Expand All @@ -121,7 +121,7 @@ test('Responds to 0.3 cloud events', t => {
}, { log: false });
});

test('Responds to 1.0 cloud events', t => {
test('Responds to 1.0 binary cloud events', t => {
const func = require(`${__dirname}/fixtures/cloud-event/`);
framework(func, server => {
request(server)
Expand All @@ -142,6 +142,33 @@ test('Responds to 1.0 cloud events', t => {
}, { log: false });
});

test('Responds to 1.0 structured cloud events', t => {
const func = require(`${__dirname}/fixtures/cloud-event/`);
framework(func, server => {
request(server)
.post('/')
.send({
id: '1',
source: 'integration-test',
type: 'com.github.pull.create',
specversion: '1.0',
datacontenttype: 'application/json',
data: {
message: 'hello'
}
})
.set('Content-type', 'application/cloudevents+json; charset=utf-8')
.expect(200)
.expect('Content-Type', /json/)
.end((err, res) => {
t.error(err, 'No error');
t.equal(res.body, 'hello');
t.end();
server.close();
});
}, { log: false });
});

test('Responds with 406 Not Acceptable to unknown cloud event versions', t => {
const func = require(`${__dirname}/fixtures/cloud-event/`);
framework(func, server => {
Expand Down

0 comments on commit b246948

Please sign in to comment.