Skip to content

Commit

Permalink
Lots of fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeal committed Feb 27, 2012
1 parent 29dc4a4 commit ba2a54b
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 100 deletions.
260 changes: 162 additions & 98 deletions index.js
Expand Up @@ -13,61 +13,85 @@ var pushover = require('pushover')
, bouncy = require('bouncy')
, dnode = require('dnode')
, uuid = require('./uuid')
, net = require('net')
;

var portrange = 45032
, httpportrange = 50032
;

function getPort (cb) {
var port = portrange
portrange += 1

var server = net.createServer()
server.listen(port, function (err) {
server.once('close', function () {
cb(port)
})
server.close()
})
server.on('error', function (err) {
getPort(cb)
})
}

function Child (proc, rev, cb) {
var self = this
self.process = proc
self.rev = rev
self.workdir = path.join(self.process.branch.deployment.workdir, uuid())
self.port = portrange + 1
portrange += 1

function spawn (cb) {
self.child = childprocess.spawn('node', [path.resolve(self.workdir, self.process.name), '--deployCtrlPort='+self.port])
self.child.stdout.pipe(process.stdout)
self.child.stderr.pipe(process.stderr)
self.child.on('exit', function () {
// TODO: Add handling for possible infinite restart loops
if (!self.closed) {
console.error("processes exited, restarting!")
spawn(function () {})
}
})
cb()
}

fs.mkdir(self.workdir, 0755, function (e) {
if (e) return cb(e)
procstreams('git archive --format=tar '+rev, {cwd:self.process.branch.deployment.repo})
.pipe('tar -x', {cwd:self.workdir})
.on('exit', function (status) {
spawn(function () {
setTimeout(function () {
console.log('moar!')
self.up = upnode.connect(self.port, function (remote) {
self.remote = remote
cb(null)
})
}, 1 * 1000)
getPort(function (port) {
self.port = port

function spawn (cb) {
self.child = childprocess.spawn('node', [path.resolve(self.workdir, self.process.name), '--deployCtrlPort='+self.port])
self.child.stdout.pipe(process.stdout)
self.child.stderr.pipe(process.stderr)
self.child.on('exit', function () {
// TODO: Add handling for possible infinite restart loops
if (!self.closed) {
self.process.branch.deployment.logger.error("["+self.process.name+"] processes exited, restarting!")
spawn(function () {})
} else {
self.exited = true
}
})
cb()
}

fs.mkdir(self.workdir, 0755, function (e) {
if (e) return cb(e)
procstreams('git archive --format=tar '+rev, {cwd:self.process.branch.deployment.repo})
.pipe('tar -x', {cwd:self.workdir})
.on('exit', function (status) {
spawn(function () {
setTimeout(function () {
self.up = upnode.connect(self.port, function (remote) {
self.remote = remote
cb(null)
})
}, 1 * 1000)
})
})
})

})
}
Child.prototype.close = function () {
var self = this
, child = self.child
;
self.closed = true
self.remote.close(function (err) {
if (err) return console.error('child reports close error', err.message)
console.log("child reports close.")
if (err) return self.process.branch.deployment.logger.error('['+self.process.name+'] child reports close error at rev '+self.rev, err.message)
self.process.branch.deployment.logger.log("["+self.process.name+"] child reports close at "+self.rev)
})
setTimeout(function () {
self.child.kill('SIGKILL')
}, 60 * 1000) // one minute hard timeout on all child processes
if (!self.exited) {
self.process.branch.deployment.logger.log("["+self.process.name+"] child requires hard kill at "+self.rev)
child.kill('SIGKILL')
}
}, 10 * 1000) // one minute hard timeout on all child processes
}
// util.inherits(Child, events.EventEmitter)

Expand All @@ -87,49 +111,42 @@ Process.prototype.roll = function (rev, info) {
// ;

var oldchild = self.child
console.log('creating child')
self.child = new Child(self, rev, function (e) {

var child = new Child(self, rev, function (e) {
if (e) throw e
if (info.domains) {
self.child.httpPort = httpportrange + 1
httpportrange += 1
// set info and rev, there is a bug, this will be HEAD on startup
self.child.remote.setInfo({process:info, rev:rev})
self.child.remote.startHttpServer(self.child.httpPort, function (err) {
if (err) throw err
if (oldchild) oldchild.close()
console.log('info', info)
if (info.domains) {
info.domains.forEach(function (domain) {
if (!self.branch.deployment.routing[domain]) self.branch.deployment.routing[domain] = {}
self.branch.deployment.routing[domain][self.branch.name] = [self.child.httpPort]
})
self.currentInfo = info
self.rolling = false
if (self.nextRoll) self.nextRoll()
}

getPort(function (port) {
child.httpPort = port
// set info and rev, there is a bug, this will be HEAD on startup
child.remote.setInfo({process:info, rev:child.rev})
child.remote.startHttpServer(child.httpPort, function (err) {
self.branch.deployment.logger.error('http server running', err)
if (err) throw err
if (oldchild) oldchild.close()
self.child = child
self.branch.deployment.logger.log('info', info)
if (info.domains) {
self.branch.deployment.logger.error('domains')
self.branch.deployment.logger.error(info.domains)
info.domains.forEach(function (domain) {
if (!self.branch.deployment.routing[domain]) self.branch.deployment.routing[domain] = {}
self.branch.deployment.routing[domain][self.branch.name] = [child.httpPort]
})
self.currentInfo = info
self.rolling = false
if (self.nextRoll) self.nextRoll()
}
})
})
} else {
if (oldchild) oldchild.close()
self.child = child
self.rolling = false
if (self.nextRoll) self.nextRoll()
}
})

// function newproc (cb) {
// num = num - 1
// var child = new Child(self, rev)
//
// }
//
// newproc(function (e) {
// if (e) throw e
// while (num !== 0) {
// newproc()
// }
// })


self.branch.deployment.logger.log('['+self.name+'] creating child at rev '+child.rev)

}
if (!this.rolling) self.nextRoll()
Expand All @@ -146,25 +163,26 @@ Branch.prototype.process = function (name) {
}
Branch.prototype.update = function (update) {
var self = this
console.log('update')
self.deployment.logger.log('update')
fs.readFile(path.join(self.deployment.repo, 'package.json'), function (err, data) {
if (err) throw err
console.log('got package')
self.deployment.logger.log('got package')
var pkg = JSON.parse(data.toString())
console.log(self.name)
console.log(pkg.deploy[self.name])
self.deployment.logger.log('[branch] name ='+self.name)
self.deployment.logger.log('[branch] pkg.deply = '+pkg.deploy[self.name])
_.each(pkg.deploy[self.name], function (pinfo, process) {
console.log('rolling')
self.deployment.logger.log('[branch] rolling, '+self.name)
self.process(process).roll(update.arguments[2], pinfo)
})
})
}

function Deployment (repo) {
function Deployment (repo, logger) {
var self = this
self.repo = repo
self.routing = {}
self.branches = {}
self.logger = logger || console
self.workdir = process.cwd()
self.pusher = pushover(path.dirname(repo), {checkout:true})
self.pusher.autoCreate = false
Expand All @@ -174,9 +192,15 @@ function Deployment (repo) {
})
fs.readFile(path.join(self.repo, 'package.json'), function (err, data) {
var pkg = JSON.parse(data.toString())
_.each(pkg.deploy, function (binfo, branch) {
_.each(binfo, function (pinfo, process) {
self.branch(branch).process(process).roll('HEAD', pinfo)

procstreams('git rev-parse HEAD', {cwd:self.repo})
.data(function(stdout, stderr) {
var rev = stdout.slice(0, stdout.length - 1)
self.logger.error('starting with rev: '+JSON.stringify(rev))
_.each(pkg.deploy, function (binfo, branch) {
_.each(binfo, function (pinfo, process) {
self.branch(branch).process(process).roll(rev, pinfo)
})
})
})
})
Expand All @@ -190,12 +214,18 @@ Deployment.prototype.branch = function (name) {
Deployment.prototype.onUpdate = function (update) {
var self = this
if (update.arguments[0].slice(0, 'refs/heads/'.length) !== 'refs/heads/') {
console.log("Not a head push, skipping")
self.logger.log("Not a head push, skipping")
update.accept()
return
}
update.accept()
self.branch(update.arguments[0].slice('refs/heads/'.length)).update(update)
setTimeout(function () {
// This needs to get updated to something that makes sure the change applied
procstreams('git reset --hard', {cwd:self.repo})
.data(function(stdout, stderr) {
self.branch(update.arguments[0].slice('refs/heads/'.length)).update(update)
})
}, 1000)
}
Deployment.prototype.listen = function () {
this.port = arguments[0]
Expand All @@ -212,57 +242,91 @@ Deployment.prototype.balance = function (port) {
var self = this
bouncy(function (req, bounce) {
var host = req.headers.host
if (!host) return console.error('Request has no host header.')
self.logger.log('Request: '+req.url+' '+host)
if (!host) return self.logger.error('Request has no host header. '+req.url)
if (self.routing[host]) return bounce.apply(bounce, self.routing[host].master)
if (!host.indexOf('.')) return noroute(bounce)

var subdomain = host.slice(0, host.indexOf('.'))
var hostdomain = host.slice(host.indexOf('.')+1)

if (!self.routing[hostdomain]) return noroute(bounce)
if (subdomain === 'www') return bounce.apply(bounce, self.routing[host].master)
if (subdomain === 'www') return bounce.apply(bounce, self.routing[hostdomain].master)
if (!self.routing[hostdomain][subdomain]) return noroute(bounce)
bounce.apply(bounce, self.routing[host][subdomain])
bounce.apply(bounce, self.routing[hostdomain][subdomain])

}).listen(port);
}

module.exports = function (repo) {
return new Deployment(repo)
module.exports = function (repo, logger) {
return new Deployment(repo, logger)
}

var service = new events.EventEmitter()

// HACK!
for (i in service) {
(function (i) {
if (typeof service[i] !== 'function') return
module.exports[i] = function () {
service[i].apply(service, arguments)
}
})(i)
}

var logger = console

module.exports.logger = function (l) {
logger = l
}

function control (port) {
var service = {}
function control (port) {

process.on('uncaughtException', function(err) {
logger.error('uncaughtException', err.stack, err)
service.close(function () {})
})

service.startHttpServer = function (port, cb) {
if (!module.exports.http) return cb(new Error('deploy.http is not defined.'))

if (!module.exports.http && !module.exports.httpServer) return cb(new Error('deploy.http is not defined.'))
module.exports.httpPort = port
module.exports.httpServer = http.createServer(module.exports.http)
if (!module.exports.httpServer) {
module.exports.httpServer = http.createServer(module.exports.http)
}
logger.info('process http port', port)
module.exports.httpServer.listen(port, cb)
service.emit('startHttpServer')
service.once('shutdown', function () {
module.exports.httpServer.close()
})
}
service.info = function (cb) {
cb(null, {port:port, httpPort:module.exports.httpPort, env:process.env, info:module.exports.info})
}
service.setInfo = function (info, cb) {
module.exports.info = info
service.emit('setInfo', info)
if (cb) cb()
}
service.close = function (cb) {
if (module.exports.httpServer) {
module.exports.httpServer.close()
if (module.exports.close) return module.exports.close(cb)
} else if (module.exports.close) {
return module.exports.close(cb)
} else {
cb()
process.exit()
}
cb()
service.emit('close')
service.emit('shutdown')
}

var server = dnode(service)
server.use(upnode.ping)
logger.info('process control port', port)
server.listen(port, function () {
console.log("Deploy Control is up on port "+port+'.')
})
service.once('shutdown', function () {
server.close()
if (module.exports.close) {
module.exports.close()
}
})
}

process.argv.forEach(function (arg) {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Expand Up @@ -2,7 +2,7 @@
"author": "Mikeal Rogers <mikeal.rogers@gmail.com>",
"name": "deployed",
"description": "Deployment system.",
"version": "0.0.5",
"version": "0.0.7",
"repository": {
"url": "git://github.com/mikeal/deployed.git"
},
Expand All @@ -12,7 +12,7 @@
"bouncy":"*",
"dnode":"*",
"git-emit":"*",
"procstreams":"*",
"procstreams":">= 0.2.0",
"pushover":"*",
"underscore":"*",
"upnode":"*"
Expand Down

0 comments on commit ba2a54b

Please sign in to comment.