Skip to content
Permalink
Browse files

retrieve initial routes from mongodb

  • Loading branch information...
Filirom1 committed Nov 18, 2013
1 parent cd9658a commit 551c0b7f015c652f508d250e98ba30674228ece6
Showing with 89 additions and 12 deletions.
  1. +6 −1 bin/openshift-nginx-routing
  2. +1 −0 conf/openshift-nginx-routing.conf
  3. +79 −10 index.js
  4. +3 −1 package.json
@@ -31,4 +31,9 @@ conf.toString().replace(/\#.*/g, '').split('\n').forEach(function(kv){
options.regExp = new RegExp(options.regExp);

var nginxRouting = new NginxRouting(options);
nginxRouting.listenOnActiveMq();
nginxRouting.retrieveRoutes(function(error){
if(error){
console.error(error);
}
nginxRouting.listenOnActiveMq();
});
@@ -8,4 +8,5 @@ nginxConfigDir=/etc/nginx/conf.d/
nginxPidFile=/var/run/nginx.pid
nginxDefaultCertificateFile=/etc/ssl/certs/server.crt
nginxDefaultPrivateKeyFile=/etc/ssl/certs/server.key
mongodbUrl=mongodb://openshift:mongopass@broker.example.com:27017/openshift_broker
regExp=^.*$
@@ -5,7 +5,8 @@ var yaml = require('js-yaml');
var _ = require('underscore');
var async = require('async');
var stomp = require('stomp');

var mongodb = require('mongodb');
var uuid = require('uuid');

exports = module.exports = OORouter;

@@ -45,30 +46,31 @@ var doReloadNginx = _.throttle(function(){

// Listen on activeMQ, and parse message body from yaml
OORouter.prototype.listenOnActiveMq = function(){
this.client = new stomp.Stomp({
console.log("listen on activeMq", this.options.activemq_login + '@' + this.options.activemq_host + ':' + this.options.activemq_port)
this.stompClient = new stomp.Stomp({
port: this.options.activemq_port,
host: this.options.activemq_host,
login: this.options.activemq_login,
passcode: this.options.activemq_password,
});

this.client.connect();
this.stompClient.connect();
var self = this;
self.client.once('connected', function() {
self.stompClient.once('connected', function() {
console.log('Connected. Subscribe on', self.options.activemq_queue)
self.client.subscribe({
self.stompClient.subscribe({
destination: self.options.activemq_queue,
ack: 'client'
ack: 'stompClient'
}, function(){});
});

self.client.on('error', function(error_frame) {
self.stompClient.on('error', function(error_frame) {
console.error('stomp error', error_frame.body);
self.disconnect();
});

self.client.on('message', function(message) {
self.client.ack(message.headers['message-id']);
self.stompClient.on('message', function(message) {
self.stompClient.ack(message.headers['message-id']);
console.log(message.headers['message-id'] ,message.body.toString());
var body = yaml.safeLoad(message.body.toString());
self.dispatch(message.headers['message-id'], body, function(error){
@@ -86,7 +88,7 @@ OORouter.prototype.listenOnActiveMq = function(){
}

OORouter.prototype.disconnect = function(){
this.client.disconnect();
this.stompClient.disconnect();
}

// message coming from activemq_routing_plugin
@@ -291,3 +293,70 @@ OORouter.prototype.onRegExpChange = function(cb){
cb();
});
}

// Retrive routes from MongoDB and transform data into routing messages for the dispatch function.
OORouter.prototype.retrieveRoutes = function(cb){
console.log('Retrieve routes from MongoDB');
var mongoClient = mongodb.MongoClient;
var self = this;
mongoClient.connect(this.options.mongodbUrl, function(error, db){
if(error){
return cb(error);
}
var collection = db.collection('applications');
var stream = collection.find().stream();
stream.once('error', function(error){
cb(error)
});
stream.once('end', function(){
db.close();
self.reloadNginx(function(error){
if(error){
return cb(error);
}
cb();
});
});
stream.on('data', function(app){
if(!app.ha) return;
if(!app.group_instances) return;
app.group_instances.forEach(function(group){
if(!group.gears) return;
group.gears.forEach(function(gear){
if(!gear.port_interfaces) return;
gear.port_interfaces.forEach(function(portInterface){
var id = 'init-' + uuid.v4();
async.series([function(cb){
// only send :create_application if application doesn't exists
if(self.routes[app.name + '-' + app.domain_namespace]) return cb();
var message = {};
message[':action'] = ':create_application';
message[':app_name'] = app.name;
message[':namespace'] = app.domain_namespace;
console.log(id, message);
self.dispatch(id, message, cb);
}, function(cb){
var message = {};
message[':action'] = ':add_gear';
message[':app_name'] = app.name;
message[':namespace'] = app.domain_namespace;
message[':protocols'] = portInterface.protocols;
message[':types'] = portInterface.type;
message[':mappings'] = portInterface.mappings;
message[':public_port'] = portInterface.external_port;
message[':public_port_name'] = portInterface.cartridge_name;
message[':public_address'] = gear.server_identity;
console.log(id, message);
self.dispatch(id, message, cb);
}], function(error){
if(error){
console.error(error);
}
console.log(id, 'OK');
});
});
});
});
});
});
}
@@ -12,6 +12,8 @@
"async": "~0.2.9",
"stomp": "~0.1.1",
"rimraf": "~2.2.2",
"js-yaml": "~2.1.3"
"js-yaml": "~2.1.3",
"uuid": "~1.4.1",
"mongodb": "~1.3.19"
}
}

0 comments on commit 551c0b7

Please sign in to comment.
You can’t perform that action at this time.