Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

First pass at bulk operations.

  • Loading branch information...
commit 612d897b862f1b7fae976f7b5edf0b3de71ee578 0 parents
@othiym23 authored
1  .gitignore
@@ -0,0 +1 @@
+node_modules
4 README.md
@@ -0,0 +1,4 @@
+## Testing
+
+This module has integration tests written using [node-tap](https://github.com/isaacs/node-tap).
+To run them, you'll need elasticsearch on your path.
130 index.js
@@ -0,0 +1,130 @@
+'use strict';
+
+var querystring = require('querystring')
+ , ElasticSearchClient = require('elasticsearchclient')
+ ;
+
+var Operation = function (operation, command, document) {
+ this.operation = operation;
+ this.command = command;
+ this.document = document;
+};
+
+Operation.prototype.marshal = function () {
+ var command = {};
+ command[this.operation] = this.command;
+ var marshalled = JSON.stringify(command) + '\n';
+
+ if (this.document) marshalled += JSON.stringify(this.document) + '\n';
+
+ return marshalled;
+};
+
+
+var CORE_OPERATIONS = [
+ 'multiget',
+ 'multisearch',
+ 'percolate',
+ 'percolator',
+ 'search'
+];
+
+var BulkAPI = function (queue) {
+ this.queue = queue;
+};
+
+BulkAPI.prototype.count = function (indexName, typeName, query) {
+ var command = {
+ _index : indexName,
+ _type : typeName,
+ _query : query
+ };
+
+ this.queue.push(new Operation('count', command));
+};
+
+BulkAPI.prototype.deleteByQuery = function (indexName, typeName, queryObj) {
+ var command = {
+ _index : indexName,
+ _type : typeName,
+ _query : queryObj
+ };
+
+ this.queue.push(new Operation('delete', command));
+};
+
+BulkAPI.prototype.deleteDocument = function (indexName, typeName, documentId) {
+ var command = {
+ _index : indexName,
+ _type : typeName,
+ _id : documentId
+ };
+
+ this.queue.push(new Operation('delete', command));
+};
+
+BulkAPI.prototype.get = function (indexName, typeName, documentId) {
+ var command = {
+ _index : indexName,
+ _type : typeName,
+ _id : documentId
+ };
+
+ this.queue.push(new Operation('get', command));
+};
+
+BulkAPI.prototype.index = function (indexName, typeName, document) {
+ var command = {
+ _index : indexName,
+ _type : typeName
+ };
+
+ if (document.id) {
+ command._id = document.id;
+ delete document.id;
+ }
+
+ this.queue.push(new Operation('index', command, document));
+};
+
+BulkAPI.prototype.moreLikeThis = function (indexName, typeName, documentId) {
+ var command = {
+ _index : indexName,
+ _type : typeName,
+ _id : documentId
+ };
+
+ this.queue.push(new Operation('mlt', command));
+};
+
+BulkAPI.prototype.multiget = function (indexName, typeName, documentArray) {
+ var command = {
+ _index : indexName,
+ _type : typeName
+ };
+
+ this.queue.push(new Operation('mget', command, documentArray));
+};
+
+
+// there is a bulk operation already defined on the prototype, but it's pretty raw
+ElasticSearchClient.prototype.bulk = function (callback, options) {
+ var path = '/_bulk'
+ , qs = ''
+ , queue = []
+ , commandBuffer = ''
+ ;
+
+ if (options) qs = querystring.stringify(options);
+ if (qs.length > 0) path += "?" + qs;
+
+ callback(new BulkAPI(queue));
+ queue.forEach(function (command) { commandBuffer += command.marshal(); });
+
+ return this.createCall({path : path,
+ method : 'POST',
+ data : commandBuffer},
+ this.clientOptions);
+};
+
+module.exports = ElasticSearchClient;
29 package.json
@@ -0,0 +1,29 @@
+{
+ "name": "elasticsearch-bulk",
+ "version": "0.0.0",
+ "description": "Extends elasticsearch-client to support the bulk API.",
+ "main": "index.js",
+ "scripts": {
+ "test": "tap test/*.tap.js"
+ },
+ "repository": {
+ "type": "git",
+ "url": "https://github.com/othiym23/node-elasticsearch-bulk.git"
+ },
+ "keywords": [
+ "elasticsearch",
+ "indexing",
+ "search",
+ "client"
+ ],
+ "author": "Forrest L Norvell <ogd@aoaioxxysz.net> (http://github.com/othiym23)",
+ "license": "BSD",
+ "dependencies": {
+ "elasticsearchclient": "http://github.com/othiym23/node-elasticsearch-client/tarball/master"
+ },
+ "devDependencies": {
+ "tap": "~0.3.3",
+ "carrier": "~0.1.7",
+ "longjohn": "0.0.4"
+ }
+}
58 test/bulk.tap.js
@@ -0,0 +1,58 @@
+'use strict';
+
+var spawn = require('child_process').spawn
+ , test = require('tap').test
+ , carrier = require('carrier')
+ , longjohn = require('longjohn')
+ , ElasticSearchClient = require('../index')
+ ;
+
+/*
+ * constants
+ */
+var INDEX_NAME = 'your_index_name'
+ , OBJECT_NAME = 'your_object_name'
+ ;
+
+test("the bulk command", function (t) {
+ // shutdown shuts down the nodes, so need to run the cluster ourselves
+ var server = spawn('elasticsearch', ['-f'], {stdio : 'pipe'});
+ server.on('close', function () {
+ console.log('# elasticsearch server shut down');
+ });
+
+ this.tearDown(function () {
+ server.kill();
+ });
+
+ carrier.carry(server.stdout, function (line) {
+ if (line.match('started')) {
+ process.nextTick(function () {
+ var client = new ElasticSearchClient({
+ host : 'localhost',
+ port : 9200
+ });
+
+ t.test("simple bulk command", function (t) {
+ t.plan(1);
+ var bulk = client.bulk(function (b) {
+ b.index(INDEX_NAME, OBJECT_NAME, {name : 'name', id : '1111'});
+ b.index(INDEX_NAME, OBJECT_NAME, {name : 'another', id : '2222'});
+ b.deleteDocument(INDEX_NAME, OBJECT_NAME, '2222');
+ });
+
+ bulk.on('data', function (data) {
+ var returned = JSON.parse(data);
+ console.dir(returned.items);
+ t.equals(returned.items.length, 3, "succeeded");
+ t.end();
+ });
+
+ bulk.exec();
+ });
+
+ t.end();
+ });
+ }
+ });
+});
Please sign in to comment.
Something went wrong with that request. Please try again.