diff --git a/.travis.yml b/.travis.yml index 8decc815..35c0dbed 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ sudo: false services: - mongodb - redis-server + - elasticsearch # - couchdb language: node_js diff --git a/README.md b/README.md index fbc569e3..d626e9b2 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ The project goal is to provide an eventstore implementation for node.js: - load and store events via EventStream object - event dispatching to your publisher (optional) -- supported Dbs (inmemory, mongodb, redis, tingodb, azuretable) +- supported Dbs (inmemory, mongodb, redis, tingodb, elasticsearch, azuretable) - snapshot support - query your events @@ -81,6 +81,18 @@ example with tingodb: timeout: 10000 // optional }); +example with elasticsearch: + + var es = require('eventstore')({ + type: 'elasticsearch', + host: 'localhost:9200', // optional + indexName: 'eventstore', // optional + eventsTypeName: 'events', // optional + snapshotsTypeName: 'snapshots', // optional + log: 'warning', // optional + maxSearchResults: 10000 // optional + }); + example with azuretable: var es = require('eventstore')({ diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js new file mode 100644 index 00000000..77acd1ba --- /dev/null +++ b/lib/databases/elasticsearch.js @@ -0,0 +1,263 @@ +'use strict'; + +var util = require('util'), + Store = require('../base'), + _ = require('lodash'), + uuid = require('uuid'), + elasticsearch = Store.use('elasticsearch'), + elasticsearchVersion = Store.use('elasticsearch/package.json').version, + debug = require('debug')('eventstore:store:elasticsearch'); + +function Elastic(options) { + options = options || {}; + + Store.call(this, options); + + var defaults = { + host: 'localhost:9200', + indexName: 'eventstore', + eventsTypeName: 'events', + snapshotsTypeName: 'snapshots', + log: 'warning', + maxSearchResults: 10000 + }; + + _.defaults(options, defaults); + + var defaultOpt = { + auto_reconnect: false, + ssl: false + }; + + options.options = options.options || {}; + + _.defaults(options.options, defaultOpt); + + this.options = options; +} + +util.inherits(Elastic, Store); + +_.extend(Elastic.prototype, { + + connect: function (callback) { + var options = this.options; + this.client = new elasticsearch.Client({host: options.host, log: options.log}); + this.emit('connect'); + if (callback) callback(null); + }, + + disconnect: function (callback) { + this.client = null; + this.emit('disconnect'); + if (callback) callback(null); + }, + + clear: function (callback) { + var self = this; + var options = this.options; + this.client.indices.exists({index: options.indexName}, function (err, result) { + if (result){ + self.client.indices.delete({index: options.indexName}, function (err) { + if (callback) callback(err); + }); + } else { + if (callback) callback(err); + } + }); + }, + + getNewId: function(callback) { + callback(null, uuid.v4()); + }, + + addEvents: function (events, callback) { + var options = this.options; + + if (events.length === 0) { + if (callback) callback(null); + return; + } + + var noAggId = false + var bulkMap = []; + + _.forEach(events, function (evt) { + if (!evt.aggregateId) { + noAggId = true; + } + evt.dispatched = false; + bulkMap.push({create: {_index: options.indexName, _type: options.eventsTypeName, _id: evt.id}}); + bulkMap.push(evt); + }); + + if (noAggId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + this.client.bulk({body: bulkMap, refresh: true}, function(error, response){ + if (callback) callback(error); + }); + }, + + _search: function (type, find, sort, skip, limit, callback) { + var options = this.options; + var searchOptions = { + index: this.options.indexName, + type: type, + defaultOperator: 'AND', + from: (!skip ? 0 : skip), + size: (!limit || limit === -1 ? options.maxSearchResults : limit) + }; + if (find && find.length) searchOptions.q = find.join(' AND '); + if (sort && sort.length) searchOptions.sort = sort; + + this.client.search(searchOptions, function (error, response) { + var dataList = []; + if (response && response.hits && response.hits.hits && response.hits.hits.length) { + if (response.hits.hits.length >= options.maxSearchResults){ + var errMsg = 'reached maximum of ' + options.maxSearchResults + ' search results!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + dataList = response.hits.hits.map(function (data) { + data._source.commitStamp = new Date(data._source.commitStamp); + return data._source; + }); + } + callback(null, dataList); + }); + }, + + _searchEvents: function(find, skip, limit, callback) { + this._search(this.options.eventsTypeName, find, ['commitStamp:asc', 'streamRevision:asc', 'commitSequence:asc'], skip, limit, callback); + }, + + _searchSnapshots: function(find, skip, limit, callback) { + this._search(this.options.snapshotsTypeName, find, ['revision:desc', 'version:desc', 'commitStamp:desc'], skip, limit, callback); + }, + + getEvents: function (query, skip, limit, callback) { + var findStatement = []; + if (query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query.context) findStatement.push('context:' + query.context); + if (query.aggregateId) findStatement.push('aggregateId:' + query.aggregateId); + + this._searchEvents(findStatement, skip, limit, callback); + }, + + getLastEvent: function (query, callback) { + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var findStatement = [ 'aggregateId:' + query.aggregateId ]; + if (query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query.context) findStatement.push('context:' + query.context); + + this._search(this.options.eventsTypeName, findStatement, ['commitStamp:desc', 'streamRevision:desc', 'commitSequence:desc'], 0, 1, function(error, response){ + var event = response && response.length ? response[0] : null; + if (callback) callback(null, event); + }); + }, + + getEventsSince: function (date, skip, limit, callback) { + var findStatement = ['commitStamp:[' + date.toJSON() + ' TO *]']; + + this._searchEvents(findStatement, skip, limit, callback); + }, + + getEventsByRevision: function (query, revMin, revMax, callback) { + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var findStatement = []; + if (revMax === -1) { + findStatement.push('streamRevision:[' + revMin + ' TO *]'); + } else { + findStatement.push('streamRevision:[' + revMin + ' TO ' + revMax + '}'); + } + findStatement.push('aggregateId:' + query.aggregateId); + if (query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query.context) findStatement.push('context:' + query.context); + + this._searchEvents(findStatement, null, null, callback); + }, + + getUndispatchedEvents: function (query, callback) { + var findStatement = ['dispatched:false']; + if (query && query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query && query.context) findStatement.push('context:' + query.context); + if (query && query.aggregateId) findStatement.push('aggregateId:' + query.aggregateId); + + this._searchEvents(findStatement, null, null, callback); + }, + + setEventToDispatched: function (id, callback) { + this.client.update({ + index: this.options.indexName, + type: this.options.eventsTypeName, + id: id, + body: { + doc: { + dispatched: true + } + }, + refresh: true + }, function (error, response) { + if (callback) callback(error); + }); + }, + + addSnapshot: function(snap, callback) { + if (!snap.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + this.client.create({ + index: this.options.indexName, + type: this.options.snapshotsTypeName, + id: snap.id, + body: snap, + refresh: true + }, function (error, response) { + if (callback) callback(error); + }); + }, + + getSnapshot: function (query, revMax, callback) { + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var findStatement = ['aggregateId:' + query.aggregateId]; + + if (query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query.context) findStatement.push('context:' + query.context); + if (revMax > -1) findStatement.push('revision:[* TO ' + revMax + ']'); + + this._searchSnapshots(findStatement, 0, 1, function(error, response){ + var snap = response && response.length ? response[0] : null; + if (callback) callback(null, snap); + }); + } + +}); + +module.exports = Elastic; diff --git a/package.json b/package.json index 6b317e36..68d84514 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,8 @@ "tingodb", "azure", "azuretable", - "inmemory" + "inmemory", + "elasticsearch" ], "main": "./index.js", "directories": { @@ -46,11 +47,13 @@ "lodash": "3.10.1", "node-uuid": "1.4.7", "parent-require": "1.0.0", - "tolerance": "1.0.0" + "tolerance": "1.0.0", + "uuid": "^2.0.1" }, "devDependencies": { "azure-storage": ">= 0.3.0", "cradle": ">=0.6.7", + "elasticsearch": ">= 10.0.0", "eslint": ">=1.0.0", "expect.js": ">= 0.1.2", "mocha": ">= 1.0.1", diff --git a/test/eventstoreTest.js b/test/eventstoreTest.js index 3027635d..03b8f0ec 100644 --- a/test/eventstoreTest.js +++ b/test/eventstoreTest.js @@ -800,7 +800,7 @@ describe('eventstore', function () { describe('with options containing a type property with the value of', function () { - var types = ['inmemory', 'mongodb', 'tingodb', 'redis'/*, 'azuretable'*/]; + var types = ['inmemory', 'mongodb', 'tingodb', 'redis', 'elasticsearch'/*, 'azuretable'*/]; types.forEach(function (type) { diff --git a/test/storeTest.js b/test/storeTest.js index 3ef93886..d64d67cd 100644 --- a/test/storeTest.js +++ b/test/storeTest.js @@ -3,7 +3,7 @@ var expect = require('expect.js'), async = require('async'), _ = require('lodash'); -var types = ['inmemory', 'mongodb', 'tingodb', 'redis'/*, 'azuretable'*/]; +var types = ['inmemory', 'mongodb', 'tingodb', 'redis', 'elasticsearch'/*, 'azuretable'*/]; types.forEach(function (type) {