-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.js
150 lines (129 loc) · 3.94 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// This example makes a web site that shows log messages live over
// SockJS. We listen with a SUBscribe socket for messages published to
// RabbitMQ's "amq.topic" exchange under the "logs.#" pattern. Each
// message is stored in MongoDB and then broadcast to every browser
// connected over SockJS.
//
// You may ask "Why not just have programs send their log messages
// directly to this server instead of going through RabbitMQ?". Well,
// by using RabbitMQ, the programs producing the messages only need to
// publish to the exchange; they don't need to know about this server,
// and any number of them can publish at once.
var http = require('http');
var url = require('url');
var fs = require('fs');
var sockjs = require('sockjs');
var mongodb = require('mongodb');
// Resolve the broker URL once so the address we log is exactly the one
// we connect to (rabbitUrl() re-parses the environment on every call).
// NOTE(review): on hosted deployments this URL comes from the service
// credentials and may embed a password -- confirm it is OK to log it.
var amqpUrl = rabbitUrl();
console.log("Connecting to RabbitMQ at " + amqpUrl);
var context = require('rabbit.js').createContext(amqpUrl);
// Port for the HTTP/SockJS server: VCAP_APP_PORT when set, otherwise 8181.
var port = process.env.VCAP_APP_PORT || 8181;
// The web server that serves the demo page; SockJS attaches its own
// request handlers to the same server below.
var httpserver = http.createServer(handler);

// SockJS endpoint settings: where browsers fetch the client library
// from, with the websocket transport switched off.
var sockjs_opts = {
  sockjs_url: "http://cdn.sockjs.org/sockjs-0.2.min.js",
  websocket: false
};
var sjs = sockjs.createServer(sockjs_opts);
sjs.installHandlers(httpserver, {prefix: '[/]socks'});

// Remember every browser that connects so broadcast() can reach it.
sjs.on('connection', function(conn) {
  addConnection(conn);
});
// Handle to the MongoDB 'logs' collection. It stays undefined until the
// connection and collection lookup below have both succeeded.
var mongoCollection;
mongodb.connect(mongoUrl(), function(err, conn) {
  // On failure `conn` is not usable; report the error instead of
  // crashing on conn.collection(...).
  if (err) {
    console.log("Failed to connect to MongoDB: " + err);
    return;
  }
  conn.collection('logs', function(collErr, coll) {
    if (collErr) {
      console.log("Failed to open MongoDB collection 'logs': " + collErr);
      return;
    }
    mongoCollection = coll;
  });
});
// Once the RabbitMQ context is ready, subscribe to the log feed and
// only then start accepting web traffic.
context.on('ready', function() {
  // SUB socket bound to the topic exchange for everything under "logs.".
  var logFeed = context.socket('SUB');
  logFeed.setEncoding('utf8');
  logFeed.connect({exchange: "amq.topic", pattern: "logs.#"});

  logFeed.on('data', function(entry) {
    // Persist the message first ...
    storeLog(entry);
    // ... then push it out to every connected browser.
    broadcast(entry);
  });

  // The subscription is set up; start the web server.
  httpserver.listen(port);
});
// ==== boring details
// Store one log message in the MongoDB collection as { msg: <message> }.
//
// msg: the message text to store.
//
// Failures are logged rather than thrown so a storage problem never
// takes the server down.
function storeLog(msg) {
  // The collection handle is assigned asynchronously; messages that
  // arrive before it exists cannot be stored, so report and skip them
  // instead of throwing a TypeError.
  if (!mongoCollection) {
    console.log("Failed to add log message to MongoDB: not connected yet");
    return;
  }
  mongoCollection.insert({ 'msg': msg }, { safe: true }, function(err) {
    if (err) console.log("Failed to add log message to MongoDB: " + err);
  });
}
// Work out the AMQP URL of the RabbitMQ broker.
//
// When the VCAP_SERVICES environment variable is set it is parsed as
// JSON and the URL of the first 'rabbitmq-2.4' service entry is
// returned; otherwise a broker on localhost is assumed.
//
// Returns: the broker URL as a string.
function rabbitUrl() {
  if (process.env.VCAP_SERVICES) {
    // Declared with `var` so the parsed config stays local instead of
    // leaking out as a global named `conf`.
    var conf = JSON.parse(process.env.VCAP_SERVICES);
    return conf['rabbitmq-2.4'][0].credentials.url;
  }
  else {
    return "amqp://localhost:5672";
  }
}
// Build the MongoDB connection URL from the settings that mongoConf()
// returns. Missing hostname, port or db fall back to 'localhost', 27017
// and 'test'. The "user:password@" part is included only when both a
// username and a password are present.
//
// NOTE(review): the credentials are inserted verbatim; characters such
// as '@' or ':' in them would presumably confuse URL parsing -- confirm
// how the driver in use expects them to be escaped.
//
// Returns: the connection URL as a string.
function mongoUrl() {
  var settings = mongoConf();
  var host = settings.hostname || 'localhost';
  var portNumber = settings.port || 27017;
  var database = settings.db || 'test';

  var credentials = "";
  if (settings.username && settings.password) {
    credentials = settings.username + ":" + settings.password + "@";
  }

  return "mongodb://" + credentials + host + ":" + portNumber + "/" + database;
}
// Look up the MongoDB connection settings.
//
// Without a VCAP_SERVICES environment variable a set of local defaults
// is returned. Otherwise the variable is parsed as JSON and the
// credentials of the first 'mongodb-1.8' service entry are returned.
//
// Returns: an object of connection settings.
function mongoConf() {
  var rawServices = process.env.VCAP_SERVICES;

  // Not running with bound services: use a local MongoDB.
  if (!rawServices) {
    return {
      "hostname": "localhost",
      "port": 27017,
      "username": "",
      "password": "",
      "name": "",
      "db": "db"
    };
  }

  var services = JSON.parse(rawServices);
  return services['mongodb-1.8'][0]['credentials'];
}
// HTTP request handler for the demo site.
//
// Serves index.html (read from this script's directory) for '/' and
// '/index.html'; every other path, and any failure to read the file,
// gets a 404.
//
// req: the incoming request; only req.url is read.
// res: the response to write to.
function handler(req, res) {
  var pathname = url.parse(req.url).pathname;
  var wantsIndex = (pathname === '/' || pathname === '/index.html');

  if (!wantsIndex) {
    send404(res);
    return;
  }

  fs.readFile(__dirname + '/index.html', function(readErr, contents) {
    if (readErr) {
      return send404(res);
    }
    res.writeHead(200, {'Content-Type': 'text/html'});
    res.write(contents, 'utf8');
    res.end();
  });
}
// Reply with a bare 404: status code 404 and the text '404' as the body.
//
// res: the response to write to.
// Returns: whatever res.end() returns.
function send404(res) {
  var notFound = 404;
  res.writeHead(notFound);
  res.write(String(notFound));
  return res.end();
}
// Every SockJS connection that is currently open.
var sockets = [];

// Start tracking a connection, and stop tracking it again once it
// emits 'close'.
//
// connection: the connection to track; must provide on(event, fn).
function addConnection(connection) {
  sockets.push(connection);

  connection.on('close', function() {
    var position = sockets.indexOf(connection);
    // Already removed: nothing to do.
    if (position === -1) {
      return;
    }
    sockets.splice(position, 1);
  });
}
// Send a message to every tracked SockJS connection.
//
// msg: the message to write to each connection.
function broadcast(msg) {
  // Iterate over a snapshot: if a connection is removed from `sockets`
  // while we are writing (its 'close' handler splices the array), the
  // live array shrinks and indexing it with a cached length would hit
  // an undefined entry or skip a connection.
  var recipients = sockets.slice();
  for (var i = 0; i < recipients.length; i++) {
    recipients[i].write(msg);
  }
}