Skip to content

Commit

Permalink
feat(events): create server event emitter
Browse files Browse the repository at this point in the history
This commit adds the class `Topic`, a redis-based event emitter
permitting controllers to subscribe/unsubscribe from channels and react
to events on those channels.  Redis is already an installation
requirement for the application, so this adds minimal dependency
overhead.

The basic usage looks like this:
```js
// import Topic
const Topic = require('topic');

Topic.subscribe('channelName', (data) => console.log(data));
Topic.publish('channelName, { key : 'value' });
Topic.unsubscribe('channelName');
```

Closes #390.
  • Loading branch information
jniles committed May 23, 2016
1 parent acce2df commit 662c7fb
Show file tree
Hide file tree
Showing 8 changed files with 505 additions and 128 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"express-handlebars": "^3.0.0",
"express-session": "^1.11.3",
"helmet": "^2.0.0",
"ioredis": "^1.15.1",
"lodash": "^4.2.1",
"mkdirp": "^0.5.1",
"morgan": "^1.6.1",
Expand Down
16 changes: 9 additions & 7 deletions server/config/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*
* Initialise link between server paths and controller logic
*
* @TODO Pass authenticate and authorize middleware down through
* @todo Pass authenticate and authorize middleware down through
* controllers, allowing for modules to subscribe to different
* levels of authority
*/
Expand Down Expand Up @@ -63,19 +63,23 @@ var referenceGroup = require('../controllers/finance/referenceGroup');
var sectionResultats = require('../controllers/finance/sectionResultat');
var sectionBilans = require('../controllers/finance/sectionBilan');
var creditors = require('../controllers/finance/creditors.js');
const events = require('../controllers/events');

const upload = require('../lib/uploader');

// expose routes to the server.
exports.configure = function (app) {
exports.configure = function configure(app) {
winston.debug('Configuring routes');

// exposed to the outside without authentication
app.get('/languages', users.getLanguages);
app.get('/projects', projects.list);

app.get('/units', units.list);

// event architecture
app.get('/events', events.list);
app.get('/stream', events.stream);

app.post('/login', auth.login);
app.get('/logout', auth.logout);

Expand Down Expand Up @@ -130,23 +134,21 @@ exports.configure = function (app) {
app.put('/cost_centers/:id', costCenter.update);
app.delete('/cost_centers/:id', costCenter.remove);

//API for service routes

// API for service routes
app.post('/services', services.create);
app.get('/services', services.list);
app.get('/services/:id', services.detail);
app.put('/services/:id', services.update);
app.delete('/services/:id', services.remove);

//API for profit_center routes crud
// API for profit_center routes crud
app.get('/profit_centers', profitCenter.list);
app.get('/profit_centers/:id', profitCenter.detail);
app.get('/profit_centers/:id/profit', profitCenter.getProfitValue);
app.post('/profit_centers', profitCenter.create);
app.put('/profit_centers/:id', profitCenter.update);
app.delete('/profit_centers/:id', profitCenter.remove);


//API for reference routes crud
app.get('/references', reference.list);
app.get('/references/:id', reference.detail);
Expand Down
77 changes: 56 additions & 21 deletions server/controllers/auth.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/**
* @overview
* Authentication Controller
*
* This controller is responsible for managing user authentication and
Expand All @@ -8,35 +9,47 @@
* user's ability to selected routes.
*
* @requires lib/db
* @requires lib/topic
* @requires lib/errors/Unauthorized
* @requires lib/errors/Forbidden
* @requires lib/errors/InternalServerError
*/
'use strict';

const db = require('../lib/db');
const Unauthorized = require('../lib/errors/Unauthorized');
const Forbidden = require('../lib/errors/Forbidden');
const InternalServerError = require('../lib/errors/InternalServerError');
const topic = require('../lib/topic');

// POST /login
// This route will accept a login request with the
// username, password and project id for user. Upon
// successful login, it creates a user session with
// all enterprise, project, and user data for easy access.
exports.login = function login(req, res, next) {
'use strict';
exports.login = login;

// GET /logout
exports.logout = logout;

/**
* @method login
*
* @description
* Logs a client into the server. The /login route accepts a POST request with
* a username, password, and project id. It checks if the username and password
* exist in the database, then verifies that the user has permission to access
* the database all enterprise, project, and user data for easy access.
*/
function login(req, res, next) {
let username = req.body.username;
let password = req.body.password;
let projectId = req.body.project;

const session = {};

let sql =
`SELECT user.id, user.username, user.first, user.last, user.email, project.enterprise_id , project.id AS project_id
let sql = `
SELECT user.id, user.username, user.first, user.last, user.email, project.enterprise_id , project.id AS project_id
FROM user JOIN project_permission JOIN project ON
user.id = project_permission.user_id AND project.id = project_permission.project_id
WHERE user.username = ? AND user.password = PASSWORD(?) AND project_permission.project_id = ?;`;
WHERE user.username = ? AND user.password = PASSWORD(?) AND project_permission.project_id = ?;
`;

db.exec(sql, [username, password, projectId])
.then(function (rows) {
Expand All @@ -63,7 +76,8 @@ exports.login = function login(req, res, next) {
}

// update the database for when the user logged in
sql = 'UPDATE user SET user.active = 1, user.last_login = ? WHERE user.id = ?;';
sql =
'UPDATE user SET user.active = 1, user.last_login = ? WHERE user.id = ?;';

return db.exec(sql, [new Date(), session.user.id]);
})
Expand All @@ -82,6 +96,7 @@ exports.login = function login(req, res, next) {
return db.exec(sql, [session.user.enterprise_id]);
})
.then(function (rows) {

if (rows.length === 0) {
throw new InternalServerError('There are no enterprises registered in the database!');
}
Expand All @@ -103,29 +118,49 @@ exports.login = function login(req, res, next) {
session.project = rows[0];

// bind the session variables
req.session.project = session.project;
req.session.user = session.user;
req.session.enterprise = session.enterprise;
req.session.project = session.project;

// broadcast LOGIN event
topic.publish(topic.channels.APP, {
event: topic.events.LOGIN,
entity: topic.entities.USER,
user_id : req.session.user.id,
id: session.user.id
});

// send the session data back to the client
res.status(200).json(session);
})
.catch(next)
.done();
};

// GET /logout
// Destroys a user's session
exports.logout = function logout(req, res, next) {
}

var sql =
'UPDATE user SET user.active = 0 WHERE user.id = ?';
/**
* @method logout
*
* Destroys the server side session and sets the user as inactive.
*/
function logout(req, res, next) {
let sql =
'UPDATE user SET user.active = 0 WHERE user.id = ?;';

db.exec(sql, [req.session.user.id])
.then(function () {
.then(() => {

// broadcast LOGOUT event
topic.publish(topic.channels.APP, {
event: topic.events.LOGOUT,
entity: topic.entities.USER,
user_id : req.session.user.id,
id: req.session.user.id,
});

// destroy the session
req.session.destroy();
res.status(200).send();
res.sendStatus(200);
})
.catch(next)
.done();
};
}
55 changes: 55 additions & 0 deletions server/controllers/events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* @overview
* This controller uses the topic library to broadcast events to the client
* along a server-sent events channel. It includes two channels:
* 1. `/stream` for real-time event broadcasts.
* 2. `/events` for listing all events in the last day
*
* @requires lib/db
* @requires lib/topic
*/

'use strict';

const db = require('../lib/db');
const topic = require('../lib/topic');

// GET /stream
exports.stream = stream;

// GET /events
exports.list = list;

// event stream to be set to the client
function stream(req, res) {

// ensure the socket hangs open forever
res.set('Content-Type', 'text/event-stream');
res.set('Content-Control', 'no-cache');

// this listener publishes events to the client as server-sent events
function listener(data) {
res.write(`data: ${JSON.stringify(data)}\n\n`).flush();
}

// listen for server events and echo them to the client
topic.subscribe(topic.channels.ALL, listener);

// remove listener on when the client closes the connection
res.on('close', () => topic.unsubscribe(topic.channels.ALL, listener));
}

// list the events in the database
function list(req, res, next) {
let sql = `
SELECT event.data FROM event LIMIT 500;
`;

db.exec(sql)
.then(rows => {
let events = rows.map(row => JSON.parse(row.data));
res.status(200).json(events);
})
.catch(next)
.done();
}
45 changes: 39 additions & 6 deletions server/controllers/medical/patients/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
* @module medical/patient
*
* @description
* The /patient HTTP API endpoint
*
* @description
* This module is responsible for handling all crud operations relatives to patients
* and define all patient API functions.
*
*
* @requires lodash
* @requires lib/db
* @requires lib/topic
* @requires lib/node-uuid
* @requires lib/errors/BadRequest
* @requires lib/errors/NotFound
Expand All @@ -21,6 +26,7 @@

const _ = require('lodash');
const db = require('../../../lib/db');
const topic = require('../../lib/topic');
const uuid = require('node-uuid');
const BadRequest = require('../../../lib/errors/BadRequest');
const NotFound = require('../../../lib/errors/NotFound');
Expand Down Expand Up @@ -119,6 +125,14 @@ function create(req, res, next) {
res.status(201).json({
uuid : uuid.unparse(medical.uuid)
});

// publish a CREATE event on the medical channel
topic.publish(topic.channels.MEDICAL, {
event: topic.events.CREATE,
entity: topic.entities.PATIENT,
user_id: req.session.user.id,
uuid: uuid.unparse(medical.uuid)
});
})
.catch(next)
.done();
Expand All @@ -131,7 +145,15 @@ function generatePatientText(patient) {
}

/**
* Returns details associated to a patient directly and indirectly
* @method detail
*
* @description
* Returns details associated to a patient directly and indirectly.
*
* @example
* var patient = require('medical/patient');
* patient.detail(req, res, next);
*
* @todo review if this many details should be returned under a patient end point
*/
function detail(req, res, next) {
Expand Down Expand Up @@ -164,6 +186,14 @@ function update(req, res, next) {
})
.then(function (updatedPatient) {
res.status(200).json(updatedPatient);

// publish an UPDATE event on the medical channel
topic.publish(topic.channels.MEDICAL, {
event: topic.events.UPDATE,
entity: topic.entities.PATIENT,
user_id: req.session.user.id,
uuid: patientUuid
});
})
.catch(next)
.done();
Expand All @@ -172,7 +202,7 @@ function update(req, res, next) {
function handleFetchPatient(patientUuid) {

// convert uuid to database usable binary uuid
var buid = db.bid(patientUuid);
let buid = db.bid(patientUuid);

var patientDetailQuery =
`SELECT BUID(p.uuid) as uuid, p.project_id, BUID(p.debtor_uuid) AS debtor_uuid, p.first_name,
Expand Down Expand Up @@ -340,11 +370,14 @@ function visit(req, res, next) {
.done();
}

/**
* @function logVisit
*/
function logVisit(patientData, userId) {
var visitId = db.bid(uuid.v4());
var sql =
'INSERT INTO `patient_visit` (`uuid`, `patient_uuid`, `registered_by`) VALUES (?, ?, ?)';
return db.exec(sql, [visitId, db.bid(patientData.uuid), userId]);
let visitId = db.bid(uuid.v4());
let sql =
'INSERT INTO patient_visit (uuid, patient_uuid, registered_by) VALUES (?);';
return db.exec(sql, [[visitId, db.bid(patientData.uuid), userId]]);
}

/**
Expand Down
Loading

0 comments on commit 662c7fb

Please sign in to comment.