Skip to content
Bulk loading for elastic search
Java Ruby Shell
Find file
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Failed to load latest commit information.



Wonderdog is a bulkloader for Elastic Search.


Hadoop cluster setup:

Wonderdog makes use of hadoop to do its bulk loading so you’ll need to have a fully functional hadoop cluster lying around. Additionally, each machine in the hadoop cluster will need to have elasticsearch installed in the same location on each machine. Defaults for most everything will want elasticsearch itself to live in /usr/local/share/elasticsearch and the configuration elasticsearch/config/* to actually live in /etc/elasticsearch. For simplicity you might set ES_HOME and ES_CONF to point to the right places. As an example setup process (with elasticsearch 0.14.2):

$: wget
$: unzip
$: sudo mv elasticsearch-0.14.2 /usr/local/share/
$: sudo ln -s /usr/local/share/elasticsearch-0.14.2 /usr/local/share/elasticsearch
$: sudo cp config/* /etc/elasticsearch/

Then, make all your changes to the configuration files in /etc/elasticsearch. See config/* for a set of example configuration files.

ElasticSearch cluster setup:

Follow the same procedure as that for the hadoop cluster setup. Once elasticsearch is installed on all the machines you’ll want to start the daemons. It’s generally a good idea to have created an elasticsearch user and run the daemons as this user. Then:

sudo -u elasticsearch $ES_HOME/bin/elasticsearch -Des.config=/etc/elasticsearch/elasticsearch.yml

should get you started. Do this for every machine in you elasticsearch cluster. Finally, both

java/bin/estool --host=<elasticsearch_host> status and
java/bin/estool --host=<elasticsearch_host> health

should indicate a happy elasticsearch cluster.

In addition to the setup for each cluster you’ll need to ensure they can talk to each other. This is addressed in the documentation for elasticsearch itself.


Once you’ve got a working set up you should be ready to launch your bulkload process. Right now the bulkload only accepts tsv though json and avro support should be possible with straightforward changes (contributions?).

The best way to explain is with an example. Say you’ve got a tsv file of user records (name,login,email,description) and you want to index all the fields. Assuming you’re going to write to an index called users with objects of type user (elasticsearch will create this object automatically the first time you upload one). The workflow is as follows:

  • Create the users index:

bin/estool --host=<elasticsearch_host> --index_name=users create_index
  • Upload the data

# Will only work if the hadoop elasticsearch processes can discover the running elasticsearch cluster
bin/wonderdog --rm --index_name=users --bulk_size=4096 --object_type=user --field_names=name,login,email,description --id_field=1 /hdfs/path/to/users.tsv /tmp/failed_records/users

Notice the output path. When the bulk indexing job runs it is possible for index requests to fail for various reasons (too much load, etc). In this case the documents that failed are simply written to the hdfs so they can be retried in a later job.

  • Refresh Index

After the bulk load is finished you’ll want to refresh the index so your documents will actually be searchable:

bin/estool --host=<elasticsearch_host> --index_name=users refresh_index
  • Snapshot Index

You’ll definitely want to do this after the bulk load finishes so you don’t lose any data in case of cluster failure:

bin/estool --host=<elasticsearch_host> --index_name=users snapshot_index
  • Bump the replicas for the index up to at least one.

bin/estool --host=<elasticsearch_host> --index_name=users --replicas=1 set_replicas

This will take a while to finish and the cluster health will show yellow until it does.

  • Optimize the index

bin/estool --host=<elasticsearch_host> --index_name=users optimize_index

This will also take a while to finish.

Wonderdog Command-line options

  • index_name – Index to write data to. It does not have to exist ahead of time
  • object_type – Type of object to index. The mapping for this object does not have to exist ahead of time. Fields will be updated dynamically by elasticsearch.
  • field_names – A comma separated list of field names describing the tsv record input
  • id_field – Index of field to use as object id (counting from 0; default 1), use -1 if there is no id field
  • bulk_size – Number of records per bulk request sent to elasticsearch cluster
  • es_home – Path to elasticsearch installation, read from the ES_HOME environment variable if it’s set
  • es_config – Path to elasticsearch config file (elasticsearch.yml)
  • rm – Remove existing output? (true or leave blank)
  • hadoop_home – Path to hadoop installation, read from the HADOOP_HOME environment variable if it’s set
  • min_split_size – Min split size for maps


There are a number of convenience commands in bin/estool. Enumerating a few:

  • Print status of all indices as a json hash to the terminal

# See everything (tmi)
bin/estool --host=<elasticsearch_host> status

# Just see index level stuff on docs and replicas:
bin/estool --host=<elasticsearch_host> status | grep -C10 number_of_replicas
  • Check cluster health (red,green,yellow,relocated shards, etc)

bin/estool --host=<elasticsearch_host> health
  • Set replicas for an index

bin/estool --host=<elasticsearch_host> --index_name=<index_name> --replicas=<num_replicas> set_replicas
  • Optimize an index

bin/estool --host=<elasticsearch_host> --index_name=<index_name> optimize_index
  • Snapshot an index

bin/estool --host=<elasticsearch_host> --index_name=<index_name> snapshot_index
  • Delete an index

You’d better be sure.

bin/estool --host=<elasticsearch_host> --index_name=<index_name> delete_index

And so on. Most of the common rest api operations have be mapped into estool.

Using the ElasticSearchIndex storefunc

The wonderdog repo comes with a nifty storage udf for Apache Pig. Here’s a simple example of how to use it:

register build/wonderdog.jar
register /usr/local/share/elasticsearch/lib/elasticsearch-0.14.2.jar
register /usr/local/share/elasticsearch/lib/jline-0.9.94.jar
register /usr/local/share/elasticsearch/lib/jna-3.2.7.jar
register /usr/local/share/elasticsearch/lib/log4j-1.2.15.jar
register /usr/local/share/elasticsearch/lib/lucene-analyzers-3.0.3.jar
register /usr/local/share/elasticsearch/lib/lucene-core-3.0.3.jar
register /usr/local/share/elasticsearch/lib/lucene-fast-vector-highlighter-3.0.3.jar
register /usr/local/share/elasticsearch/lib/lucene-highlighter-3.0.3.jar
register /usr/local/share/elasticsearch/lib/lucene-memory-3.0.3.jar
register /usr/local/share/elasticsearch/lib/lucene-queries-3.0.3.jar

%default INDEX 'ufo_sightings'
%default OBJ   'ufo_sighting'        

ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv' AS (sighted_at:long, reported_at:long, location:chararray, shape:chararray, duration:chararray, description:chararray);
STORE ufo_sightings INTO 'es://$INDEX/$OBJ' USING com.infochimps.elasticsearch.pig.ElasticSearchIndex('-1', '1000');

Other constructors for the udf include:

  • ElasticSearchIndex()
  • ElasticSearchIndex(idField, bulkSize)
  • ElasticSearchIndex(idField, bulkSize, esConfig)
  • ElasticSearchIndex(idField, bulkSize, esConfig, esPlugins)


idField = Which field of the record to use as the record id. If none is passed in
then the record is assumed to have no id.
bulkSize = Number of records for ElasticSearchOutputFormat to batch up before sending
a bulk index request to Elastic Search. Default: 1000.
esConfig = Full path to elasticsearch.yml. Default: /etc/elasticsearch/elasticsearch.yml
esPlugins = Full path to elastic search plugins dir. Default: /usr/local/share/elasticsearch/plugins


How to choose shards, replicas and cluster size: Rules of Thumb.

sh = shards
rf = replication factor. replicas = 0 implies rf = 1, or 1 replica of each shard.

pm = running data_esnode processes per machine
N = number of machines

n_cores = number of cpu cores per machine
n_disks = number of disks per machine

  • You must have at least as many data_esnodes as
    Mandatory: (sh * rf) < (pm * N)
Shards: shard size < 10GB

More shards = more parallel writes

curl XPUT “`cat /etc/motd | grep IP | cuttab 2`:9200/tweet-2010q1/_settings” -d ‘{ “index”:{ number_of_replicas } }’

Something went wrong with that request. Please try again.