Bulk loading for elastic search



Wonderdog is a bulkloader for Elastic Search.


Hadoop cluster setup:

Wonderdog makes use of hadoop to do its bulk loading so you’ll need to have a fully functional hadoop cluster lying around. However, since wonderdog uses hadoop’s distributed cache to distribute configuration and other files, no additional configuration of the hadoop cluster is necessary.

ElasticSearch cluster setup:

Well, you’ll have to have an elasticsearch cluster set up somewhere. There is far better documentation for doing that elsewhere, namely the elasticsearch guide.


Native Hadoop TSV Loader

Note: the tsv loader is deprecated. Instead, use the ElasticSearchOutputFormat coupled with either of the Apache Pig store functions (ElasticSearchIndex or ElasticSearchJsonIndex).

Once you’ve got a working setup you should be ready to launch your bulk load process. The best way to explain is with an example. Say you’ve got a tsv file of user records (name, login, email, description) and you want to index all the fields. Assume you’re going to write to an index called users with objects of type user (elasticsearch will create this object type automatically the first time you upload one). The workflow is as follows:

  • Create the users index:

bin/estool --host=<elasticsearch_host> --index_name=users create_index
  • Upload the data

# Will only work if the hadoop elasticsearch processes can discover the running elasticsearch cluster
bin/wonderdog --rm --index_name=users --bulk_size=4096 --object_type=user --field_names=name,login,email,description --id_field=1 /hdfs/path/to/users.tsv /tmp/failed_records/users

Notice the output path. While the bulk indexing job runs, index requests can fail for various reasons (too much load, etc). Documents that fail are simply written to that path on the hdfs so they can be retried in a later job.

  • Refresh Index

After the bulk load is finished you’ll want to refresh the index so your documents will actually be searchable:

bin/estool --host=<elasticsearch_host> --index_name=users refresh_index
  • Snapshot Index

You’ll definitely want to do this after the bulk load finishes so you don’t lose any data in case of cluster failure:

bin/estool --host=<elasticsearch_host> --index_name=users snapshot_index
  • Bump the replicas for the index up to at least one.

bin/estool --host=<elasticsearch_host> --index_name=users --replicas=1 set_replicas

This will take a while to finish and the cluster health will show yellow until it does.

  • Optimize the index

bin/estool --host=<elasticsearch_host> --index_name=users optimize_index

This will also take a while to finish.
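The steps above can be strung together in one script. What follows is a minimal sketch, not something shipped with wonderdog: ES_HOST, INPUT, FAILED and DRY_RUN are made-up variable names, and retry_failed_users assumes that the failed records keep the same tsv layout as the input, which this README does not confirm. By default it only prints the commands so you can check them first.

```shell
#!/usr/bin/env bash
# Sketch only. ES_HOST, INPUT, FAILED and DRY_RUN are hypothetical variable
# names; the commands themselves are the ones from the steps above.
ES_HOST="${ES_HOST:-localhost}"
INPUT="${INPUT:-/hdfs/path/to/users.tsv}"
FAILED="${FAILED:-/tmp/failed_records/users}"
DRY_RUN="${DRY_RUN:-1}"   # 1 = only print each command, 0 = also execute it

# Print a command and, outside of dry-run mode, execute it.
run() {
  echo "+ $*"
  if [ "$DRY_RUN" = "0" ]; then
    "$@"
  fi
}

# The six steps in order; && stops the chain at the first failing step.
bulk_load_users() {
  run bin/estool --host="$ES_HOST" --index_name=users create_index &&
  run bin/wonderdog --rm --index_name=users --bulk_size=4096 --object_type=user \
    --field_names=name,login,email,description --id_field=1 "$INPUT" "$FAILED" &&
  run bin/estool --host="$ES_HOST" --index_name=users refresh_index &&
  run bin/estool --host="$ES_HOST" --index_name=users snapshot_index &&
  run bin/estool --host="$ES_HOST" --index_name=users --replicas=1 set_replicas &&
  run bin/estool --host="$ES_HOST" --index_name=users optimize_index
}

# Assumption: failed records keep the input's tsv layout, so they can be fed
# back in. --rm is left out and a fresh output path is used for the retry.
retry_failed_users() {
  run bin/wonderdog --index_name=users --bulk_size=4096 --object_type=user \
    --field_names=name,login,email,description --id_field=1 "$FAILED" "$FAILED.retry"
}

bulk_load_users   # with the default DRY_RUN=1 this only prints the commands
```

Set DRY_RUN=0 once the printed commands look right. Remember that set_replicas and optimize_index return long before the cluster has finished the work.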

TSV loader command-line options

  • index_name – Index to write data to. It does not have to exist ahead of time
  • object_type – Type of object to index. The mapping for this object does not have to exist ahead of time. Fields will be updated dynamically by elasticsearch.
  • field_names – A comma separated list of field names describing the tsv record input
  • id_field – Index of field to use as object id (counting from 0; default 1), use -1 if there is no id field
  • bulk_size – Number of records per bulk request sent to elasticsearch cluster
  • es_home – Path to elasticsearch installation, read from the ES_HOME environment variable if it’s set
  • es_config – Path to elasticsearch config file (elasticsearch.yml)
  • rm – Remove existing output? (true or leave blank)
  • hadoop_home – Path to hadoop installation, read from the HADOOP_HOME environment variable if it’s set
  • min_split_size – Min split size for maps
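Since id_field counts from 0, it is easy to be off by one. The helper below is an illustration of the convention only (show_id_field is a made-up name and this is not wonderdog's own code): it prints which of the field_names a given id_field points at.

```shell
#!/usr/bin/env bash
# Illustration only: show which column --id_field selects, counting from 0.
# This mimics the convention described above; it is not wonderdog's own code.
show_id_field() {
  local field_names="$1"  # e.g. name,login,email,description
  local id_field="$2"     # 0-based index, or -1 for "no id field"
  if [ "$id_field" -lt 0 ]; then
    echo "no id field"
    return 0
  fi
  # cut counts from 1, so shift the 0-based index by one.
  echo "$field_names" | cut -d, -f"$((id_field + 1))"
}

show_id_field name,login,email,description 1   # prints: login
show_id_field name,login,email,description 0   # prints: name
show_id_field name,login,email,description -1  # prints: no id field
```

So with --field_names=name,login,email,description and --id_field=1, the login column would be used as the object id, not name.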

Using the ElasticSearchIndex and ElasticSearchJsonIndex Pig Store Functions

The most up-to-date (and simplest) way to store data into elasticsearch with hadoop is to use the Pig Store Functions. Here you’ve got two options.


ElasticSearchIndex

This allows you to store tabular data (e.g. tsv, csv) into elasticsearch.

register target/wonderdog.jar;
register /usr/local/share/elasticsearch/lib/elasticsearch-0.14.2.jar;
register /usr/local/share/elasticsearch/lib/jline-0.9.94.jar;
register /usr/local/share/elasticsearch/lib/jna-3.2.7.jar;
register /usr/local/share/elasticsearch/lib/log4j-1.2.15.jar;
register /usr/local/share/elasticsearch/lib/lucene-analyzers-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-core-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-fast-vector-highlighter-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-highlighter-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-memory-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-queries-3.0.3.jar;

%default INDEX 'ufo_sightings'
%default OBJ   'ufo_sighting'        

ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv' AS (sighted_at:long, reported_at:long, location:chararray, shape:chararray, duration:chararray, description:chararray);
STORE ufo_sightings INTO 'es://$INDEX/$OBJ' USING com.infochimps.elasticsearch.pig.ElasticSearchIndex('-1', '1000');
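Because the script declares INDEX and OBJ with %default, both can be overridden when pig is launched, using pig's -param option. A small sketch follows, assuming the script above was saved as store_ufo.pig (a made-up file name). pig_store_command is a hypothetical helper that only prints the command line.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build a pig command line that overrides INDEX and OBJ.
# store_ufo.pig is an assumed file name for the script shown above.
pig_store_command() {
  local index="$1" obj="$2" script="${3:-store_ufo.pig}"
  printf 'pig -param INDEX=%s -param OBJ=%s %s\n' "$index" "$obj" "$script"
}

# Print the command for a different index/object type, then run it by hand.
pig_store_command ufo_sightings_v2 ufo_sighting
```

If no -param is given, the %default values in the script apply.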

Other constructors for the udf include:

  • ElasticSearchIndex()
  • ElasticSearchIndex(idField, bulkSize)
  • ElasticSearchIndex(idField, bulkSize, esConfig)
  • ElasticSearchIndex(idField, bulkSize, esConfig, esPlugins)


  • idField – Which field of the record to use as the record id. If none is passed in then the record is assumed to have no id.
  • bulkSize – Number of records for ElasticSearchOutputFormat to batch up before sending a bulk index request to Elastic Search. Default: 1000.
  • esConfig – Full path to local elasticsearch.yml. Default: /etc/elasticsearch/elasticsearch.yml
  • esPlugins – Full path to local elastic search plugins dir. Default: /usr/local/share/elasticsearch/plugins

Note that elasticsearch.yml and the plugins directory are distributed to every machine in the cluster automatically via hadoop’s distributed cache mechanism.


ElasticSearchJsonIndex

This allows you to store arbitrary json data into elasticsearch.

register target/wonderdog-1.0-SNAPSHOT.jar;
register /usr/local/share/elasticsearch/lib/elasticsearch-0.14.2.jar;
register /usr/local/share/elasticsearch/lib/jline-0.9.94.jar;
register /usr/local/share/elasticsearch/lib/jna-3.2.7.jar;
register /usr/local/share/elasticsearch/lib/log4j-1.2.15.jar;
register /usr/local/share/elasticsearch/lib/lucene-analyzers-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-core-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-fast-vector-highlighter-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-highlighter-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-memory-3.0.3.jar;
register /usr/local/share/elasticsearch/lib/lucene-queries-3.0.3.jar;        
%default INDEX 'foo_test'
%default OBJ   'foo'        

foo = LOAD 'test/foo.json' AS (data:chararray);
STORE foo INTO 'es://$INDEX/$OBJ' USING com.infochimps.elasticsearch.pig.ElasticSearchJsonIndex('-1', '10');
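The script above loads each input line as a single chararray field. Pig's default loader splits fields on tabs, so a tab inside a document would likely cut it short. The following is a rough pre-flight check, assuming the input holds one complete json object per line (this README does not spell out the expected layout). check_json_lines is a made-up name, and it is a sanity check, not a json parser.

```shell
#!/usr/bin/env bash
# Rough pre-flight check, not a JSON parser: assumes one JSON object per line.
# Flags lines that contain a tab (Pig's default loader splits fields on tabs)
# or that do not start with '{' and end with '}'.
check_json_lines() {
  awk '
    /\t/                     { printf "line %d: contains a tab\n", NR; bad = 1; next }
    !/^[ ]*[{].*[}][ ]*$/    { printf "line %d: not a single JSON object\n", NR; bad = 1 }
    END                      { exit bad }
  ' "$1"
}

# Usage (exit status is non-zero if any line looks wrong):
#   check_json_lines test/foo.json
```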


There are a number of convenience commands in bin/estool. Enumerating a few:

  • Print status of all indices as a json hash to the terminal

# See everything (tmi)
bin/estool --host=<elasticsearch_host> status

# Just see index level stuff on docs and replicas:
bin/estool --host=<elasticsearch_host> status | grep -C10 number_of_replicas
  • Check cluster health (red, green, yellow, relocated shards, etc)

bin/estool --host=<elasticsearch_host> health
  • Set replicas for an index

bin/estool --host=<elasticsearch_host> --index_name=<index_name> --replicas=<num_replicas> set_replicas
  • Optimize an index

bin/estool --host=<elasticsearch_host> --index_name=<index_name> optimize_index
  • Snapshot an index

bin/estool --host=<elasticsearch_host> --index_name=<index_name> snapshot_index
  • Delete an index

You’d better be sure.

bin/estool --host=<elasticsearch_host> --index_name=<index_name> delete_index

And so on. Most of the common rest api operations have been mapped into estool.
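Operations like set_replicas leave the cluster yellow for a while, so a script may want to wait for green before moving on. The sketch below talks to elasticsearch's cluster health endpoint (_cluster/health) directly with curl rather than going through estool, since the exact output format of bin/estool health is not described here. It assumes the rest api is reachable on port 9200. health_status and wait_for_green are made-up names.

```shell
#!/usr/bin/env bash
# Extract the "status" value from cluster-health JSON read on stdin.
health_status() {
  sed -n 's/.*"status"[ ]*:[ ]*"\([a-z]*\)".*/\1/p' | head -n 1
}

# Poll until the cluster reports green, or give up after max_tries attempts.
# Assumes the REST API listens on port 9200 of the given host.
wait_for_green() {
  local host="$1" max_tries="${2:-60}" delay="${3:-10}" i=0 status
  while [ "$i" -lt "$max_tries" ]; do
    status="$(curl -s "http://$host:9200/_cluster/health" | health_status)"
    [ "$status" = "green" ] && return 0
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Usage:
#   wait_for_green <elasticsearch_host> && echo "cluster is green"
```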


How to choose shards, replicas and cluster size: Rules of Thumb.

sh = shards
rf = replication factor. replicas = 0 implies rf = 1, or 1 replica of each shard.

pm = running data_esnode processes per machine
N = number of machines

n_cores = number of cpu cores per machine
n_disks = number of disks per machine

  • You must have at least as many data_esnodes as
    Mandatory: (sh * rf) < (pm * N)
Shards: shard size < 10GB

More shards = more parallel writes
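The rules above are simple enough to check with shell arithmetic. This is a sketch under stated assumptions: sizes are whole GB, data is spread evenly across shards, and the function names are made up. It uses the same symbols as above (sh, rf, pm, N).

```shell
#!/usr/bin/env bash
# rf = replicas + 1 (replicas = 0 means one copy of each shard).
replication_factor() { echo $(( $1 + 1 )); }

# Succeeds when (sh * rf) < (pm * N), the mandatory rule above.
layout_ok() {
  local sh="$1" rf="$2" pm="$3" n="$4"
  [ $(( sh * rf )) -lt $(( pm * n )) ]
}

# Smallest shard count that keeps every shard strictly under 10GB,
# assuming total_gb (a whole number of GB) is spread evenly across shards.
min_shards() {
  local total_gb="$1"
  echo $(( total_gb / 10 + 1 ))
}

# Example: 95GB of data, 1 replica, 2 data_esnodes on each of 10 machines.
sh_count="$(min_shards 95)"          # 10 shards of 9.5GB each
rf="$(replication_factor 1)"         # 2
if layout_ok "$sh_count" "$rf" 2 10; then
  echo "layout ok"
else
  echo "layout violates (sh * rf) < (pm * N)"
fi
```

In this example sh * rf is 20 and pm * N is also 20, so the strict inequality fails and the script reports a violation. One more data_esnode, or fewer shards, would fix it.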

curl -XPUT "`cat /etc/motd | grep IP | cuttab 2`:9200/tweet-2010q1/_settings" -d '{ "index": { "number_of_replicas": <num_replicas> } }'
