Skip to content

Commit

Permalink
Various fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mixu committed Oct 30, 2015
1 parent 8418d42 commit 949db31
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 76 deletions.
71 changes: 34 additions & 37 deletions lib/lambda.js
Expand Up @@ -2,27 +2,8 @@ var url = require('url');
var Orchestrator = require('orchestrator'); var Orchestrator = require('orchestrator');
var identify = require('identify-github-event'); var identify = require('identify-github-event');
var Task = require('./task'); var Task = require('./task');

var parseTarget = require('./parse-target.js');
function parseTarget(target) { var xtend = require('xtend');
var result = target.match(/([^\/]+)\/([^# ]+)(.*)/);
if (!result) {
return { user: '', repo: '', branch: '' };
}
if (!result[3]) {
return {
user: result[1],
repo: result[2],
branch: 'master',
};
}
var rest = result[3].match(/#?([^ ]*)( -)? (.*)/);
return {
user: result[1],
repo: result[2],
branch: rest[1] || 'master',
name: rest[3],
};
}


var instances = []; var instances = [];


Expand All @@ -31,7 +12,6 @@ function Lambda() {
this._targetToTasks = {}; this._targetToTasks = {};
this._taskNames = {}; this._taskNames = {};
this._config = {}; this._config = {};
this._counter = 1;
// track all Lambdas to allow for the CLI to find them // track all Lambdas to allow for the CLI to find them
instances.push(this); instances.push(this);
} }
Expand All @@ -40,7 +20,7 @@ Lambda.prototype.config = function(key, config) {
if (arguments.length === 0) { if (arguments.length === 0) {
return this._config; return this._config;
} else if (arguments.length === 2) { } else if (arguments.length === 2) {
this._config[key] = config; this._config[key] = xtend(this._config[key] || {}, config);
} else { } else {
this._config = key; this._config = key;
} }
Expand All @@ -61,8 +41,18 @@ Lambda.prototype.task = function(names, deps, fn) {
(Array.isArray(names) ? names : [names]).forEach(function(str) { (Array.isArray(names) ? names : [names]).forEach(function(str) {
var target = parseTarget(str); var target = parseTarget(str);
var fullTarget = target.user + '/' + target.repo + '#' + target.branch; var fullTarget = target.user + '/' + target.repo + '#' + target.branch;
var name = target.name || 'task-' + self._counter++; var name;
self._taskNames[str] = true; // if the task has a task name, use the full string as the task name
if (target.name) {
name = str;
} else {
var counter = 1;
do {
name = str + ' - task-' + counter++;
} while(self._taskNames[name]);
}

self._taskNames[name] = true;


function noArgs() { function noArgs() {
return fn(new Task({ return fn(new Task({
Expand All @@ -85,23 +75,22 @@ Lambda.prototype.task = function(names, deps, fn) {
// if the task wants two params, convert it into a two-param callback that // if the task wants two params, convert it into a two-param callback that
// looks like a one-item callback to Orchestrator // looks like a one-item callback to Orchestrator
var runner = fn.length < 2 ? noArgs : oneArg; var runner = fn.length < 2 ? noArgs : oneArg;
self._orchestrator.add(str, deps, runner); self._orchestrator.add(name, deps, runner);
if (!self._targetToTasks[fullTarget]) { if (!self._targetToTasks[fullTarget]) {
self._targetToTasks[fullTarget] = [ str ]; self._targetToTasks[fullTarget] = [ name ];
} else { } else {
self._targetToTasks[fullTarget].push(str); self._targetToTasks[fullTarget].push(name);
} }
}); });
}; };


Lambda.prototype.getTasksByEvent = function(event) { Lambda.prototype.getTasksByEvent = function(event) {
var self = this, var user = event.user,
user = event.user,
repo = event.repo, repo = event.repo,
branch = event.branch; branch = event.branch;


var fullTarget = event.user + '/' + event.repo + '#' + event.branch; var fullTarget = event.user + '/' + event.repo + '#' + event.branch;
return this._targetToTasks[fullTarget]; return this._targetToTasks[fullTarget] || [];
}; };


Lambda.prototype.exec = function(event, onDone) { Lambda.prototype.exec = function(event, onDone) {
Expand All @@ -121,7 +110,7 @@ Lambda.prototype.exec = function(event, onDone) {
} else if (typeof event === 'object') { } else if (typeof event === 'object') {
var target = identify.target(event); var target = identify.target(event);
// find the tasks that match the event // find the tasks that match the event
tasks = this.getTasksByEvent(target); taskNames = this.getTasksByEvent(target);
} else { } else {
console.log('[markdown-styles-lambda] No target event or task was specified.'); console.log('[markdown-styles-lambda] No target event or task was specified.');
this.printKnownTasks(); this.printKnownTasks();
Expand Down Expand Up @@ -165,23 +154,31 @@ Lambda.prototype.printKnownTasks = function() {
}; };


Lambda.prototype.snsHandler = function(events) { Lambda.prototype.snsHandler = function(events) {
var self = this;
if (typeof events === 'string') { if (typeof events === 'string') {
events = [events]; events = [events];
} }
var matchAll = events.indexOf('*') !== -1 || arguments.length === 0; var matchAll = arguments.length === 0 || events.indexOf('*') !== -1 ;
return function(event, context) { return function(event, context) {
console.log('[markdown-styles-lambda] Received event:', event); var unwrappedEvent = {};
var eventType = lambda.identifyGithubEvent(event.Records[0].Sns.Message); console.log('[markdown-styles-lambda] Received event:', JSON.stringify(event, null, 2));
try {
unwrappedEvent = JSON.parse(event.Records[0].Sns.Message);
} catch (e) {
console.log('[markdown-styles-lambda] Could not parse SNS message payload as JSON.');
}
console.log('[markdown-styles-lambda] Unwrapped event:', JSON.stringify(unwrappedEvent));
var eventType = self.identifyGithubEvent(unwrappedEvent);
if (matchAll || events.indexOf(eventType) !== -1) { if (matchAll || events.indexOf(eventType) !== -1) {
lambda.exec(event.Records[0].Sns.Message, context); self.exec(unwrappedEvent, context);
} else { } else {
console.log('[markdown-styles-lambda] Did nothing with ' + eventType); console.log('[markdown-styles-lambda] Did nothing with ' + eventType);
context.success(); context.success();
} }
}; };
} }


Lambda.prototype.identifyGithubEvent = identify; Lambda.identifyGithubEvent = Lambda.prototype.identifyGithubEvent = identify;


Lambda.create = Lambda.prototype.create = function() { Lambda.create = Lambda.prototype.create = function() {
return new Lambda(); return new Lambda();
Expand Down
26 changes: 26 additions & 0 deletions lib/parse-target.js
@@ -0,0 +1,26 @@
module.exports = function parseTarget(target) {
var parts = target.match(/([^\/]+)\/([^# ]+)(.*)/);
if (!parts) {
return { user: '', repo: '', branch: 'master' };
}
var result = {
user: parts[1],
repo: parts[2],
}
if (!parts[3]) {
result.branch = 'master';
return result;
}
var branch = parts[3].match(/#([^ ]+)(.*)/);
if (branch) {
result.branch = branch[1];
result.name = branch[2].replace(/^( -)? /, '');
} else {
result.branch = 'master';
result.name = parts[3].replace(/^( -)? /, '');
}
if (!result.name) {
delete result.name;
}
return result;
};
25 changes: 20 additions & 5 deletions lib/stream/from-fs.js
@@ -1,15 +1,30 @@
var fs = require('fs'), var fs = require('fs'),
pi = require('pipe-iterators'); pi = require('pipe-iterators'),
xtend = require('xtend');


function read() { function read(opts) {
opts = xtend({
read: true,
buffer: true,
}, opts);
return pi.thru.obj(function(file, enc, onDone) { return pi.thru.obj(function(file, enc, onDone) {
var stat = fs.statSync(file); var stat = fs.statSync(file);
if (stat.isFile()) { if (stat.isFile()) {
this.push({ var result = {
path: file, path: file,
stat: stat, stat: stat,
contents: fs.readFileSync(file, 'utf8') contents: null,
}); }
if (!opts.read) {
this.push(result);
} else {
if (opts.buffer) {
result.contents = fs.readFileSync(file, 'utf8')
} else {
result.contents = fs.createReadStream(file);
}
this.push(result);
}
} }
onDone(); onDone();
}); });
Expand Down
8 changes: 4 additions & 4 deletions lib/stream/to-fs.js
Expand Up @@ -14,8 +14,9 @@ function dest(output) {
file.path = output + file.path; file.path = output + file.path;
var writeDir = path.dirname(file.path); var writeDir = path.dirname(file.path);


function log() { function done() {
console.log('[FS] Wrote ' + originalPath + ' -> ' + file.path); console.log('[FS] Wrote ' + originalPath + ' -> ' + file.path);
onDone();
} }


(seen[writeDir] ? function(a, onDone) { onDone(null); } : mkdirp)( (seen[writeDir] ? function(a, onDone) { onDone(null); } : mkdirp)(
Expand All @@ -25,11 +26,10 @@ function dest(output) {
} }
seen[writeDir] = true; seen[writeDir] = true;
if (!pi.isReadable(file.contents)) { if (!pi.isReadable(file.contents)) {
fs.writeFile(file.path, file.contents, log); fs.writeFile(file.path, file.contents, done);
} else { } else {
file.contents.pipe(fs.createWriteStream(file.path)).once('finish', log); file.contents.pipe(fs.createWriteStream(file.path)).once('finish', done);
} }
onDone();
} }
); );
}); });
Expand Down
59 changes: 33 additions & 26 deletions lib/task.js
Expand Up @@ -22,7 +22,11 @@ Task.prototype.config = function(key, config) {
if (arguments.length === 0) { if (arguments.length === 0) {
return this._config; return this._config;
} else if (arguments.length === 2) { } else if (arguments.length === 2) {
this._config[key] = config; if (typeof config === 'string' || typeof config === 'boolean' || typeof config === 'number') {
this._config[key] = config;
} else {
this._config[key] = xtend(this._config[key] || {}, config);
}
} else { } else {
this._config = key; this._config = key;
} }
Expand All @@ -42,21 +46,21 @@ Task.prototype.github = function(glob, opts) {
opts.base += '/'; opts.base += '/';
} }
var readable = pi.fromAsync(function(onDone) { var readable = pi.fromAsync(function(onDone) {
console.log('[Github API] Matching against Github Contents API with glob ' + glob); console.log('[Github API] Matching against Github Contents API with glob ' + glob);
gglob({ gglob({
authenticate: self._config.github, authenticate: self._config.github,
user: self.user, user: self.user,
repo: self.repo, repo: self.repo,
branch: self.branch, branch: self.branch,
glob: glob glob: glob
}, function(err, results, meta) { }, function(err, results, meta) {
console.log('[Github API] Glob match ' + glob + ' completed. API limit remaining: ' + meta.limit); if (err) {
if (err) { console.error('[Github API] Returned error:', err);
console.error('[Github API] Returned error:', err); }
} console.log('[Github API] Glob match ' + glob + ' completed. Matched ' + results.length + ' files. API limit remaining: ' + meta.limit);
onDone(null, results); onDone(null, results);
}); });
}).pipe( }).pipe(
pi.thru.obj(function(file, enc, done) { pi.thru.obj(function(file, enc, done) {
// path relative to the basepath of the glob // path relative to the basepath of the glob
var path = file.path; var path = file.path;
Expand Down Expand Up @@ -134,40 +138,43 @@ Task.prototype.s3 = function(target) {
var parts = url.parse(target); var parts = url.parse(target);
var bucket = parts.host; var bucket = parts.host;
// S3 can write to `/foo/bar`, but only paths like `foo/bar` show up in the S3 UI and work for static site hosting // S3 can write to `/foo/bar`, but only paths like `foo/bar` show up in the S3 UI and work for static site hosting
var key = (parts.path.charAt(0) === '/' ? parts.path.substr(1) : parts.path); var key = (parts.path && parts.path.charAt(0) === '/' ? parts.path.substr(1) : '');
var s3 = new AWS.S3(this._config.s3 ? this._config.s3 : {}); var s3 = new AWS.S3(this._config.s3 ? this._config.s3 : {});


return pi.writable.obj(function(file, enc, onDone) { return pi.writable.obj(function(file, enc, onDone) {
var stream = this; var stream = this;
var contentType = mime.lookup(file.path) || 'text/html'; var contentType = mime.lookup(file.path) || 'text/html';
console.log('[S3] Writing ' + file.path + ' -> s3://' + bucket + '/' + key + file.path + ' as ' + contentType); var fileKey = (key + file.path).replace(/^\/+/, '');
console.log('[S3] Writing ' + file.path + ' -> s3://' + bucket + '/' + fileKey + ' as ' + contentType);
s3.putObject({ s3.putObject({
Bucket: bucket, Bucket: bucket,
Key: key + file.path, Key: fileKey,
Body: file.contents, Body: file.contents,
ContentType: contentType ContentType: contentType
}, function(err, data) { }, function(err, data) {
console.log('[S3] Wrote ' + file.path + ' -> s3://' + bucket + '/' + key + file.path); console.log('[S3] Wrote ' + file.path + ' -> s3://' + bucket + '/' + fileKey );
onDone(err); onDone(err);
}); });
}); });
}; };


Task.prototype.fromFs = function(glob, basename) { Task.prototype.fromFs = function(glob, opts) {
var base = basename || parse.basename(glob); opts = xtend({
if (base.charAt(base.length - 1) !== '/') { base: parse.basename(glob)
base += '/'; }, opts);
if (opts.base.charAt(opts.base.length - 1) !== '/') {
opts.base += '/';
} }
return pi.pipeline([ return pi.pipeline([
wildglob.stream(glob), wildglob.stream(glob),
pi.filter(function(filename) { pi.filter(function(filename) {
var stat = fs.statSync(filename); var stat = fs.statSync(filename);
return stat.isFile(); return stat.isFile();
}), }),
fromFs(), fromFs(opts),
pi.mapKey('path', function(path) { pi.mapKey('path', function(path) {
// path relative to the basepath of the glob // path relative to the basepath of the glob
var stripPath = path.substr(0, base.length) === base ? path.substr(base.length) : path; var stripPath = path.substr(0, opts.base.length) === opts.base ? path.substr(opts.base.length) : path;
// ensure that the first character is a / // ensure that the first character is a /
if (stripPath.charAt(0) !== '/') { if (stripPath.charAt(0) !== '/') {
stripPath = '/' + stripPath; stripPath = '/' + stripPath;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -4,7 +4,7 @@
"description": "", "description": "",
"main": "./lib/lambda.js", "main": "./lib/lambda.js",
"scripts": { "scripts": {
"test": "echo \"Error: no test specified\" && exit 1" "test": "mocha --bail ./test/*.test.js"
}, },
"author": "Mikito Takada <mikito.takada@gmail.com> (http://mixu.net/)", "author": "Mikito Takada <mikito.takada@gmail.com> (http://mixu.net/)",
"license": "ISC", "license": "ISC",
Expand Down
4 changes: 2 additions & 2 deletions test/integration.test.js
Expand Up @@ -26,7 +26,7 @@ describe('integration tests', function() {
'abc/bar/baz.md': '# Hello' 'abc/bar/baz.md': '# Hello'
}); });
var task = new Task({}); var task = new Task({});
task.fromFs(tmpdir + '/**/*.md', tmpdir + '/abc') task.fromFs(tmpdir + '/**/*.md', { base: tmpdir + '/abc'})
.pipe(pi.map(function(item) { return item.path; })) .pipe(pi.map(function(item) { return item.path; }))
.pipe(pi.toArray(function(results) { .pipe(pi.toArray(function(results) {
assert.deepEqual(results, [ '/foo.md', '/bar/baz.md' ]); assert.deepEqual(results, [ '/foo.md', '/bar/baz.md' ]);
Expand All @@ -40,7 +40,7 @@ describe('integration tests', function() {
'abc/bar/baz.md': '# Hello' 'abc/bar/baz.md': '# Hello'
}); });
var task = new Task({}); var task = new Task({});
task.fromFs(tmpdir + '/**/*.md', tmpdir + '/abc/') task.fromFs(tmpdir + '/**/*.md', { base: tmpdir + '/abc/' })
.pipe(pi.map(function(item) { return item.path; })) .pipe(pi.map(function(item) { return item.path; }))
.pipe(pi.toArray(function(results) { .pipe(pi.toArray(function(results) {
assert.deepEqual(results, [ '/foo.md', '/bar/baz.md' ]); assert.deepEqual(results, [ '/foo.md', '/bar/baz.md' ]);
Expand Down

0 comments on commit 949db31

Please sign in to comment.