Skip to content

Commit

Permalink
Adding support for schedule event.
Browse files Browse the repository at this point in the history
This automatically sets up the rules and triggers to bind functions
to the alarm trigger feed.
  • Loading branch information
James Thomas committed Jan 25, 2017
1 parent 1aa0402 commit 64df98a
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 17 deletions.
55 changes: 55 additions & 0 deletions compile/schedule/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Compile Triggers

This plugins compiles the schedule events in `serverless.yaml` to corresponding [OpenWhisk Alarm Trigger Feeds](https://github.com/openwhisk/openwhisk/blob/master/docs/actions.md)
definitions.

## How it works

`Compile Schedule` hooks into the [`deploy:compileEvents`](/lib/plugins/deploy) lifecycle.

It loops over all schedule event which are defined in `serverless.yaml`.

### Implicit Schedule Definition

Alarm triggers referenced in the function event configuration don't have to
explicitly defined the triggers and rules in the `resources` section. The plugin
will set up these resources for creation without any further configuration.

```yaml
# serverless.yaml
functions:
index:
handler: users.main
events:
- schedule: cron(* * * * *) // fires every minute
```
The plugin will create a trigger called `${serviceName}_${fnName}_alarmTrigger`
and a rule called `${serviceName}_${fnName}_alarmRule` to bind the function to
the cron events.

### Explicit Schedule Definition

Adding extra properties for the alarm event can be handled by defining an object
as the `schedule` value rather than the cron string.

```yaml
# serverless.yaml
functions:
index:
handler: users.main
events:
- schedule:
rate: cron(* * * * *)
enabled: false
trigger: custom_trigger_name
rule: custom_rule_name
max: 10000 // maximum event fires, defaults to 1000
params:
hello: world
```

`rate` is the only mandatory property.

All OpenWhisk Schedule events are merged inside the `serverless.service.triggers` and `serverless.service.rules` section.
82 changes: 82 additions & 0 deletions compile/schedule/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
'use strict';

const BbPromise = require('bluebird');

class OpenWhiskCompileSchedules {
constructor(serverless, options) {
this.serverless = serverless;
this.options = options;
this.provider = this.serverless.getProvider('openwhisk');
this.feed = '/whisk.system/alarms/alarm'

this.hooks = {
'before:deploy:compileEvents': () => BbPromise.bind(this)
.then(this.setup)
.then(this.processScheduleEvents)
};
}

setup() {
if (!this.serverless.service.resources) {
this.serverless.service.resources = {};
}

if (!this.serverless.service.resources.triggers) {
this.serverless.service.resources.triggers = {};
}

if (!this.serverless.service.resources.rules) {
this.serverless.service.resources.rules = {};
}
}

// correct rate syntax is: cron(* * * * *)
parseScheduleRate (rate) {
const cron = rate.match(/cron\((.*)\)/)
if (!cron || !cron[1] || cron[1].split(' ').length !== 5) {
throw new this.serverless.classes.Error(
[`Schedule event rate property value is invalid: ${rate}`,
'The correct syntax should be "cron(_ _ _ _ _)"'].join('\n')
)
}

return cron[1]
}

compileScheduleTrigger (fnName, schedule) {
const name = schedule.trigger || `${this.serverless.service.service}_${fnName}_schedule_trigger`
const cron = this.parseScheduleRate(schedule.rate || schedule)
const trigger_payload = JSON.stringify(schedule.params || {})

const feed_parameters = { cron, trigger_payload }

if (schedule.max) {
feed_parameters.maxTriggers = schedule.max
}

return { name, content: { feed: this.feed, feed_parameters } }
}

defaultScheduleRuleName (triggerName, fnName) {
return `${this.serverless.service.service}_${fnName}_schedule_rule`
}

processScheduleEvent (fnName, schedule) {
const fnObj = this.serverless.service.getFunction(fnName)
const trigger = this.compileScheduleTrigger(fnName, schedule)
const rule = schedule.rule || this.defaultScheduleRuleName(trigger.name, fnName)

fnObj.events.push({ trigger: { name: trigger.name, rule } })
this.serverless.service.resources.triggers[trigger.name] = trigger.content
}

processScheduleEvents () {
this.serverless.service.getAllFunctions().forEach(name => {
const fn = this.serverless.service.getFunction(name)
const events = (fn.events || []).filter(e => e.schedule)
events.forEach(e => this.processScheduleEvent(name, e.schedule))
})
}
}

module.exports = OpenWhiskCompileSchedules;
178 changes: 178 additions & 0 deletions compile/schedule/tests/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
'use strict';

const expect = require('chai').expect;
const chaiAsPromised = require('chai-as-promised');

require('chai').use(chaiAsPromised);

const sinon = require('sinon');
const OpenWhiskCompileSchedules = require('../index');

describe('OpenWhiskCompileSchedules', () => {
let serverless;
let sandbox;
let openwhiskCompileSchedules;

beforeEach(() => {
sandbox = sinon.sandbox.create();
serverless = {classes: {Error}, service: {provider: {}, defaults: {namespace: ''}, resources: {}, getAllFunctions: () => []}, getProvider: sandbox.spy()};
const options = {
stage: 'dev',
region: 'us-east-1',
};
openwhiskCompileSchedules = new OpenWhiskCompileSchedules(serverless, options);
serverless.service.service = 'serviceName';
serverless.service.defaults = {
namespace: 'testing',
apihost: '',
auth: '',
};

serverless.cli = { log: () => {} };
openwhiskCompileSchedules.setup()

});

afterEach(() => {
sandbox.restore();
});

describe('#processScheduleEvents()', () => {
it('should update create schedule trigger and update manifest resources.', () => {
const fnObj = { events: [{schedule: "cron(* * * * *)"}] }
serverless.service.getFunction = () => fnObj
openwhiskCompileSchedules.compileScheduleTrigger = () => ({name: 'serviceName_fnName_schedule_trigger', content: { a: 1 }})
openwhiskCompileSchedules.processScheduleEvent("fnName", fnObj.events[0].schedule)
expect(fnObj.events[1]).to.be.deep.equal({
trigger: { name: 'serviceName_fnName_schedule_trigger', rule: 'serviceName_fnName_schedule_rule' }
})

expect(serverless.service.resources.triggers).to.be.deep.equal({serviceName_fnName_schedule_trigger: {a: 1}})
})
})

describe('#processScheduleEvents()', () => {
it('should call processEventSchedule for each schedule event.', () => {
const service = openwhiskCompileSchedules.serverless.service;
const fns = {
first: {
events: [{}, {schedule: "cron(* * * * *)"}, {trigger: true}]
},
second: {
events: [{schedule: "cron(* * * * *)"}]
},
third: {}
}

service.getAllFunctions = () => Object.keys(fns)
service.getFunction = name => fns[name];

const spy = openwhiskCompileSchedules.processScheduleEvent = sinon.spy()
openwhiskCompileSchedules.processScheduleEvents()
expect(spy.calledTwice).to.be.equal(true)
expect(spy.withArgs("first", "cron(* * * * *)").calledOnce).to.be.equal(true)
expect(spy.withArgs("second", "cron(* * * * *)").calledOnce).to.be.equal(true)
})
})


describe('#compileScheduleTrigger()', () => {
it('should throw errors for incorrect rate definition.', () => {
let name = 'my_fn'
expect(() => openwhiskCompileSchedules.compileScheduleTrigger(name, 'ron(* * * * *)'))
.to.throw(Error, /Schedule event rate property value is invalid/);
expect(() => openwhiskCompileSchedules.compileScheduleTrigger(name, '* * * * *'))
.to.throw(Error, /Schedule event rate property value is invalid/);
expect(() => openwhiskCompileSchedules.compileScheduleTrigger(name, 'cron(* * * *)'))
.to.throw(Error, /Schedule event rate property value is invalid/);
expect(() => openwhiskCompileSchedules.compileScheduleTrigger(name, 'cron(* * * * * *)'))
.to.throw(Error, /Schedule event rate property value is invalid/);
})

it('should return default trigger for simple schedule event.', () => {
let name = 'my_fn', rate = 'cron(* * * * *)'
const trigger = openwhiskCompileSchedules.compileScheduleTrigger(name, rate)
expect(trigger).to.be.deep.equal({
name: `${serverless.service.service}_${name}_schedule_trigger`,
content: {
feed: '/whisk.system/alarms/alarm',
feed_parameters: {
cron: '* * * * *',
trigger_payload: "{}"
}
}
})
})

it('should return trigger for object schedule event.', () => {
const name = 'my_fn'
const rate = {
rate: 'cron(* * * * *)',
trigger: 'trigger_name',
max: 500,
params: { hello: 'world' }
}
const trigger = openwhiskCompileSchedules.compileScheduleTrigger(name, rate)
expect(trigger).to.be.deep.equal({
name: `trigger_name`,
content: {
feed: '/whisk.system/alarms/alarm',
feed_parameters: {
cron: '* * * * *',
trigger_payload: JSON.stringify(rate.params),
maxTriggers: 500
}
}
})
})
})

/*
// should throw for incorrect format...
describe('#getEventSchedules()', () => {
it('should return all names for simple triggers registered on functions', () => {
const service = openwhiskCompileSchedules.serverless.service;
service.getAllFunctions = () => ["first", "second", "third"];
const handler = name => ({events: [{schedule: "cron(* * * * *)"}, {schedule: "cron(* * * * *)"}]})
service.getFunction = handler;
expect(openwhiskCompileSchedules.getEventSchedules()).to.deep.equal(["blah", "foo"])
})
/*
it('should return all names for complex triggers registered on functions', () => {
const service = openwhiskCompileSchedules.serverless.service;
service.getAllFunctions = () => ["first", "second", "third"];
const handler = name => ({events: [{trigger: {name: "blah"}}, {trigger: {name: "foo"}}]})
service.getFunction = handler;
expect(openwhiskCompileSchedules.getEventSchedules()).to.deep.equal(["blah", "foo"])
})
})
*/
/*
describe('#mergeEventSchedules()', () => {
it('should set up non-existant triggers', () => {
openwhiskCompileSchedules.serverless.service.resources = {};
const output = {first: {}, second: {}, third: {}};
sandbox.stub(openwhiskCompileSchedules, 'getEventSchedules', () => ["first", "second", "third"]);
openwhiskCompileSchedules.mergeEventSchedules();
expect(openwhiskCompileSchedules.serverless.service.resources.triggers).to.deep.equal(output)
});
it('should ignore existing triggers', () => {
const triggers = {first: 1, second: 2, third: 3};
openwhiskCompileSchedules.serverless.service.resources = { triggers };
sandbox.stub(openwhiskCompileSchedules, 'getEventSchedules', () => ["first", "second", "third"]);
openwhiskCompileSchedules.mergeEventSchedules();
expect(openwhiskCompileSchedules.serverless.service.resources.triggers).to.deep.equal(triggers)
})
})
*/

});
2 changes: 2 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const CompileFunctions = require('./compile/functions/index.js');
const CompileTriggers = require('./compile/triggers/index.js');
const CompileRules = require('./compile/rules/index.js');
const CompileHttpEvents = require('./compile/apigw/index.js');
const CompileSchedule = require('./compile/schedule/index.js');
const Deploy = require('./deploy/index.js');
const Invoke = require('./invoke/index.js');
const InvokeLocal = require('./invokeLocal/index.js');
Expand All @@ -30,6 +31,7 @@ class Index {
this.serverless.pluginManager.addPlugin(CompileHttpEvents);
this.serverless.pluginManager.addPlugin(CompileRules);
this.serverless.pluginManager.addPlugin(CompileTriggers);
this.serverless.pluginManager.addPlugin(CompileSchedule);
this.serverless.pluginManager.addPlugin(Remove);
this.serverless.pluginManager.addPlugin(Invoke);
this.serverless.pluginManager.addPlugin(InvokeLocal);
Expand Down
Loading

0 comments on commit 64df98a

Please sign in to comment.