
Feature: Updated README

Updated README from Dhruv Bansal

Squashed commit of the following:

commit a25383b
Author: Dhruv Bansal <dhruv@ph.utexas.edu>
Date:   Fri Mar 21 14:27:09 2014 -0500

    updated README with more context for reading from ElasticSearch

commit e24104f
Author: Dhruv Bansal <dhruv@ph.utexas.edu>
Date:   Fri Mar 21 10:56:13 2014 -0500

    updated README to be more accurate based on all the work we've done recently

Change-Id: Iba3017bf9f3476409bb3ee71f5a52ff543e16e27
1 parent c069fbf commit eaba41a90db74900f1e83fbcf4b72244b0ecf5cf Travis Dempsey committed Mar 21, 2014
Showing with 261 additions and 200 deletions.
  1. +261 −200 README.md
@@ -1,16 +1,257 @@
# Wonderdog
-Wonderdog is a Hadoop interface to Elastic Search. While it is specifically intended for use with Apache Pig, it does include all the necessary Hadoop input and output formats for Elastic Search. That is, it's possible to skip Pig entirely and write custom Hadoop jobs if you prefer.
+Wonderdog makes ElasticSearch easier to connect with Hadoop. It
+provides a few kinds of functionality:
+
+* <a href="#hadoop">Java InputFormat and OutputFormat classes</a> that you can use in your own Hadoop MapReduce jobs
+* A <a href="#wukong">Wukong plugin</a> which makes these InputFormat and OutputFormat classes easy to use from [Wukong](http://github.com/infochimps-labs/wukong)
+* <a href="#pig">Java functions for Pig</a> that let you `LOAD` from and `STORE` into ElasticSearch
+* Some <a href="#utilities">command-line utilities</a> for interacting with ElasticSearch
+
+<a name="hadoop">
+# Hadoop MapReduce
+
+Wonderdog provides InputFormat and OutputFormat classes that can be
+used in your own custom Hadoop MapReduce jobs.
+
+* `com.infochimps.elasticsearch.ElasticSearchInputFormat`
+* `com.infochimps.elasticsearch.ElasticSearchOutputFormat`
+* `com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat`
+* `com.infochimps.elasticsearch.ElasticSearchStreamingOutputFormat`
+
+These classes come in streaming (for the old `mapred` API) and
+non-streaming (for the new `mapreduce` API) flavors.
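+
+As a rough sketch (not taken from Wonderdog's own documentation), the
+streaming flavors can be handed to a stock Hadoop Streaming job via
+the standard `-inputformat`/`-outputformat` flags. The jar name,
+script names, and paths below are placeholders, and the ElasticSearch
+index, type, and host settings (passed as Hadoop configuration
+properties) are omitted because their names vary by Wonderdog version:
+
+```
+$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
+    -libjars path/to/wonderdog.jar \
+    -outputformat com.infochimps.elasticsearch.ElasticSearchStreamingOutputFormat \
+    -mapper  my_mapper.rb \
+    -reducer my_reducer.rb \
+    -input   /some/hdfs/input/path \
+    -output  /some/hdfs/output/path
+```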
+
+## Installing Wonderdog
+
+To use these classes, you'll need to declare a dependency on Wonderdog
+in your project's `pom.xml`:
+
+```xml
+<project>
+ ...
+ <dependencies>
+ <dependency>
+ <groupId>com.infochimps</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ ...
+ </dependencies>
+ ...
+</project>
+```
+
+Now when you build your code, it will include the Wonderdog
+InputFormat and OutputFormat classes you need.
+
+TBD:
+* examples of using these classes in your own MapReduce jobs
+* examples of launching such a job from the command-line
+
+<a name="wukong">
+# Wukong
+
+Wonderdog also provides a
+[Wukong](http://github.com/infochimps-labs/wukong) plugin to make it
+easy to use the InputFormat and OutputFormat classes.
+
+## Installing Wonderdog
+
+Ensure that Wonderdog is in your project's Gemfile:
+
+```ruby
+# in Gemfile
+gem 'wonderdog', git: 'https://github.com/infochimps-labs/wonderdog'
+```
+
+You'll have to require Wonderdog at the top of your job:
+```ruby
+# in my_elasticsearch_job.rb
+
+require 'wukong'
+require 'wonderdog'
+
+Wukong.dataflow(:mapper) do
+ ...
+end
+
+Wukong.dataflow(:reducer) do
+ ...
+end
+```
+
+If you are running a
+[deploy pack](http://github.com/infochimps-labs/wukong-deploy) then
+you may want to require Wonderdog at the top-level of your deploy
+pack by creating an initializer:
+
+```ruby
+# in config/initializers/plugins.rb
+require 'wonderdog'
+```
-## Requirements
+## Using Wonderdog From Wukong
-## Usage
+Wukong uses
+[Wukong-Hadoop](http://github.com/infochimps-labs/wukong-hadoop) to
+provide the basic functionality of connecting Wukong to Hadoop.
+Wonderdog modifies this connection by adjusting the command line
+passed to the `hadoop` program so that the correct input and output
+formats are used.
-### Using ElasticSearchStorage for Apache Pig
+A "normal" Hadoop streaming job launched by Wukong-Hadoop might look
+like this:
-The most up-to-date (and simplest) way to store data into elasticsearch with hadoop is to use the Pig Store Function. You can write both delimited and json data to elasticsearch as well as read data from elasticsearch.
+```
+$ wu hadoop my_job.rb --input=/some/hdfs/input/path --output=/some/hdfs/output/path
+```
+
+Assuming you've correctly installed Wonderdog into your job or deploy
+pack, you should be able to invoke Wonderdog's core classes by
+changing the URI passed to `--input` or `--output` to use a scheme of
+`es`. The "host" of the URI names the ElasticSearch index and the
+"path" names the type: for example, `es://twitter/tweet` refers to
+the index `twitter` and the type `tweet`.
+
+### Embedded vs. Transport Nodes
+
+Wonderdog provides two different ways of connecting to ElasticSearch
+from within a Hadoop task.
+
+* By default, each map task will spin up a
+  [transport client](http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/current/client.html)
+  which will attempt to connect to an ElasticSearch webnode, looking
+  for it on the same machine the task itself is running on. This is
+  convenient in the common case when each Hadoop tasktracker is also
+  an ElasticSearch webnode (datanodes may, of course, live elsewhere).
+
+* Each map task can also be configured to spin up its own
+ [embedded ElasticSearch node](http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/current/client.html#node-client)
+ which directly connects to an ElasticSearch cluster.
+
+The following options control this behavior:
+
+* `--es_transport` -- Use a transport client instead of an embedded node. True by default.
+* `--es_transport_host` -- When using a transport client, the host of
+ the ElasticSearch webnode to connect to. Defaults to `localhost`.
+* `--es_transport_port` -- When using a transport client, the port of
+ the ElasticSearch webnode to connect to. Defaults to `9300`.
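+
+For example, if your ElasticSearch webnodes do not run on the Hadoop
+machines themselves, you can point the transport client at one
+explicitly (the hostname below is just a placeholder):
+
+```
+$ wu hadoop my_job.rb --input=/some/hdfs/input/path --output=es://twitter/tweet \
+    --es_transport --es_transport_host=es-webnode.example.com --es_transport_port=9300
+```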
+
+### Writing data to ElasticSearch
+
+Here's an example which would write all its output data to the index
+`twitter` in the type `tweet`:
+
+```
+$ wu hadoop my_job.rb --input=/some/hdfs/input/path --output=es://twitter/tweet
+```
-#### Storing tabular data:
+#### Data Format & Routing
+
+The output of the reducer is always assumed to be newline-delimited,
+JSON-formatted data. Most fields in each record are passed through to
+ElasticSearch unmodified. A few fields, however, are treated
+specially:
+
+* `_id` - if this field is present then it will be used as the
+ document ID of the record created in ElasticSearch. This is the
+ right way to ensure that a write updates an existing document
+ instead of creating a new document. The name of this field (`_id`)
+ can be modified with the `--es_id_field` option.
+
+* `_mapping` - if this field is present then it will be used as the
+ type the document is written to, no matter what was passed on the
+ command-line as the `--output`. This is the right way to allow
+ writing to multiple types depending on the document. The name of
+ this field (`_mapping`) can be modified with the
+ `--es_mapping_field` option. And, yes, this field probably should
+ have been called `_type`...
+
+* `_index` - if this field is present then it will be used as the
+  index the document is written to, no matter what was passed on the
+  command-line as the `--output`. This is the right way to allow
+  writing to multiple indices depending on the document. The name of
+  this field (`_index`) can be modified with the `--es_index_field`
+  option.
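+
+For instance, a single reducer output record that routes itself might
+look like the following (a made-up example; the `text` and `user`
+fields are simply indexed as-is):
+
+```
+{"_id": "23", "_index": "twitter", "_mapping": "tweet", "text": "I love Chicago", "user": "alice"}
+```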
+
+#### Optimization
+
+It's not unusual to prepare an ElasticSearch index for bulk writing
+before executing a Hadoop job that writes to it. The following
+settings should be applied for best performance:
+
+* turn `index.number_of_replicas` down to 0 so that as few copies of
+  the data as possible need to be updated on each write
+
+* set `index.refresh_interval` to -1 so that ElasticSearch doesn't
+  spend resources refreshing data for search while it is busy
+  indexing.
+
+It's also a good idea to have created all mappings up front, before
+loading.
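+
+As a sketch (assuming an ElasticSearch node reachable on
+`localhost:9200` and an index named `twitter`), both settings can be
+applied through the index settings API before the job starts and then
+reverted afterwards:
+
+```
+# before the bulk load
+$ curl -XPUT 'http://localhost:9200/twitter/_settings' -d '{
+    "index": { "number_of_replicas": 0, "refresh_interval": -1 } }'
+
+# after the bulk load, restore replication and refreshes
+$ curl -XPUT 'http://localhost:9200/twitter/_settings' -d '{
+    "index": { "number_of_replicas": 1, "refresh_interval": "60s" } }'
+```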
+
+Wonderdog provides the `--es_bulk_size` option, which sets the number
+of records per bulk request sent to ElasticSearch (default: 1000).
+Increasing this number can lead to higher throughput in some
+situations.
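+
+For example (the value here is purely illustrative):
+
+```
+$ wu hadoop my_job.rb --input=/some/hdfs/input/path --output=es://twitter/tweet --es_bulk_size=5000
+```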
+
+### Reading data from ElasticSearch
+
+Here's an example which would read all its input data from the index
+`twitter` in the type `tweet`:
+
+```
+$ wu hadoop my_job.rb --input=es://twitter/tweet --output=/some/hdfs/output/path
+```
+
+This would read in every single `tweet` record. This can be
+customized using the full power of ElasticSearch by providing an
+arbitrary ElasticSearch JSON query via the `--es_query` option.
+Wonderdog will run the query at Hadoop job submission time and use the
+result-set as the input data.
+
+The result-set will be presented to Hadoop as newline-delimited,
+JSON-formatted data.
+
+Here's an example which would capture only tweets mentioning Chicago:
+
+```
+$ wu hadoop my_job.rb --input=es://twitter/tweet --output=/some/hdfs/output/path --es_query='{"query": {"match":{"text": "Chicago"}}}'
+```
+
+#### Optimization
+
+Wonderdog uses ElasticSearch's
+[scroll API](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html)
+to fetch data from ElasticSearch. There are several options which can
+be used to tune the usage of this API for better performance:
+
+* `--es_input_splits` -- the number of input splits to create
+* `--es_request_size` -- the number of documents to request at a time (defaults to 50)
+* `--es_scroll_timeout` -- how long each scroll remains valid, which effectively bounds the longest-running map task (defaults to 5 minutes)
+
+The larger the dataset and the fewer the input splits, the more data
+each task must process (and hence scroll through), and so the longer
+the scroll timeout should be. Essentially, no task should take longer
+to complete than the scroll timeout.
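+
+For example, for a large index you might use fewer, larger requests
+and a more generous timeout. The values below are purely illustrative,
+and the duration format accepted by `--es_scroll_timeout` is an
+assumption, so check it against your Wonderdog version:
+
+```
+$ wu hadoop my_job.rb --input=es://twitter/tweet --output=/some/hdfs/output/path \
+    --es_input_splits=20 --es_request_size=500 --es_scroll_timeout=10m
+```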
+
+It's recommended to read data out of ElasticSearch into a temporary
+copy in HDFS which can then be used for more intensive processing.
+
+<a name="pig">
+# Pig
+
+The simplest way to store data into ElasticSearch from Hadoop is to
+use the Pig store function, `ElasticSearchStorage`. You can write
+both delimited and JSON data to ElasticSearch, as well as read data
+back out of it.
+
+## Storing Data
+
+### Storing tabular data
This allows you to store tabular data (eg. tsv, csv) into elasticsearch.
@@ -28,7 +269,7 @@ STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=false&size=1000' USING com.infoc
Here the fields that you set in Pig (eg. 'sighted_at') are used as the field names when creating json records for elasticsearch.
-#### Storing json data:
+### Storing json data
You can store json data just as easily.
@@ -37,7 +278,7 @@ ufo_sightings = LOAD '/data/domestic/aliens/ufo_awesome.tsv.json' AS (json_recor
STORE ufo_sightings INTO 'es://$INDEX/$OBJ?json=true&size=1000' USING com.infochimps.elasticsearch.pig.ElasticSearchStorage();
```
-#### Reading data:
+## Loading Data
Easy too.
@@ -47,14 +288,14 @@ alien_sightings = LOAD 'es://ufo_sightings/ufo_sightings?q=alien' USING com.info
DUMP alien_sightings;
```
-#### ElasticSearchStorage Constructor
+## ElasticSearchStorage Constructor
The constructor to the UDF can take two arguments (in the following order):
* ```esConfig``` - The full path to where elasticsearch.yml lives on the machine launching the hadoop job
* ```esPlugins``` - The full path to where the elasticsearch plugins directory lives on the machine launching the hadoop job
-#### Query Parameters
+### Query Parameters
There are a few query parameters available:
@@ -66,224 +307,44 @@ There are a few query paramaters available:
Note that elasticsearch.yml and the plugins directory are distributed to every machine in the cluster automatically via hadoop's distributed cache mechanism.
-### Native Hadoop TSV Loader
-
-**Note**: the tsv loader is deprecated. Instead, use the ElasticSearchOutputFormat coupled with either Apache Pig storefunc (ElasticSearchIndex or ElasticSearchJsonIndex).
-
-Once you've got a working set up you should be ready to launch your bulkload process. The best way to explain is with an example. Say you've got a tsv file of user records (name,login,email,description) and you want to index all the fields. Assuming you're going to write to an index called ```users``` with objects of type ```user``` (elasticsearch will create this object automatically the first time you upload one). The workflow is as follows:
-
-* Create the ```users``` index:
-
-```
-bin/estool create --index users
-```
-
-* Upload the data
-
-```
-# Will only work if the hadoop elasticsearch processes can discover the running elasticsearch cluster
-bin/wonderdog --rm --index_name=users --bulk_size=4096 --object_type=user --field_names=name,login,email,description --id_field=1 /hdfs/path/to/users.tsv /tmp/failed_records/users
-```
-
-Notice the output path. When the bulk indexing job runs it is possible for index requests to fail for various reasons (too much load, etc). In this case the documents that failed are simply written to the hdfs so they can be retried in a later job.
-
-* Refresh Index
-
-After the bulk load is finished you'll want to refresh the index so your documents will actually be searchable:
-
-```
-bin/estool refresh --index users
-```
-
-* Snapshot Index
-
-You'll definitely want to do this after the bulk load finishes so you don't lose any data in case of cluster failure:
-
-```
-bin/estool snapshot --index users
-```
-
-* Bump the replicas for the index up to at least one.
-
-```
-bin/estool set_replication --index users --replicas=1
-```
-
-This will take a while to finish and the cluster health will show yellow until it does.
-
-* Optimize the index
-
-```
-bin/estool optimize --index users -s 3
-```
+<a name="utilities">
+# Command-Line Utilities
-This will also take a while to finish.
-
-#### TSV loader command-line options
-
-* ```index_name``` - Index to write data to. It does not have to exist ahead of time
-* ```object_type``` - Type of object to index. The mapping for this object does not have to exist ahead of time. Fields will be updated dynamically by elasticsearch.
-* ```field_names``` - A comma separated list of field names describing the tsv record input
-* ```id_field``` - Index of field to use as object id (counting from 0; default 1), use -1 if there is no id field
-* ```bulk_size``` - Number of records per bulk request sent to elasticsearch cluster
-* ```es_home``` - Path to elasticsearch installation, read from the ES_HOME environment variable if it's set
-* ```es_config``` - Path to elasticsearch config file (@elasticsearch.yml@)
-* ```rm``` - Remove existing output? (true or leave blank)
-* ```hadoop_home``` - Path to hadoop installation, read from the HADOOP_HOME environment variable if it's set
-* ```min_split_size``` - Min split size for maps
-
-## Admin
-
-There are a number of convenience commands in ```bin/estool```. Most of the common rest api operations have be mapped. Enumerating a few:
+There are a number of convenience commands in ```estool```. Most of
+the common REST API operations have been mapped. A few examples:
* Print status of all indices as a json hash to the terminal
```
-# See everything (tmi)
-bin/estool -c <elasticsearch_host> status
+$ estool -c <elasticsearch_host> status
```
* Check cluster health (red,green,yellow,relocated shards, etc)
```
-bin/estool -c <elasticsearch_host> health
+$ estool -c <elasticsearch_host> health
```
* Set replicas for an index
```
-bin/estool set_replication -c <elasticsearch_host> --index <index_name> --replicas <num_replicas>
+$ estool set_replication -c <elasticsearch_host> --index <index_name> --replicas <num_replicas>
```
* Optimize an index
```
-bin/estool optimize -c <elasticsearch_host> --index <index_name>
+$ estool optimize -c <elasticsearch_host> --index <index_name>
```
* Snapshot an index
```
-bin/estool snapshot -c <elasticsearch_host> --index <index_name>
+$ estool snapshot -c <elasticsearch_host> --index <index_name>
```
* Delete an index
```
-bin/estool delete -c <elasticsearch_host> --index <index_name>
+$ estool delete -c <elasticsearch_host> --index <index_name>
```
-
-
-## Bulk Loading Tips for the Risk-seeking Dangermouse
-
-The file examples/bulkload_pageviews.pig shows an example of bulk loading elasticsearch, including preparing the index.
-
-### Elasticsearch Setup
-
-Some tips for an industrial-strength cluster, assuming exclusive use of machines and no read load during the job:
-
-* use multiple machines with a fair bit of ram (7+GB). Heap doesn't help too much for loading though, so you don't have to go nuts: we do fine with amazon m1.large's.
-* Allocate a sizeable heap, setting min and max equal, and
- - turn `bootstrap.mlockall` on, and run `ulimit -l unlimited`.
- - For example, for a 3GB heap: `-Xmx3000m -Xms3000m -Delasticsearch.bootstrap.mlockall=true`
- - Never use a heap above 12GB or so, it's dangerous (STW compaction timeouts).
- - You've succeeded if the full heap size is resident on startup: that is, in htop both the VMEM and RSS are 3000 MB or so.
-* temporarily increase the `index_buffer_size`, to say 40%.
-
-### Further reading
-
-* [Elasticsearch JVM Settings, explained](http://jprante.github.com/2012/11/28/Elasticsearch-Java-Virtual-Machine-settings-explained.html)
-
-### Example of creating an index and mapping
-
-Index:
-
- curl -XPUT ''http://localhost:9200/pageviews' -d '{"settings": {
- "index": { "number_of_shards": 12, "store.compress": { "stored": true, "tv": true } } }}'
-
- $ curl -XPUT 'http://localhost:9200/ufo_sightings/_settings?pretty=true' -d '{"settings": {
- "index": { "number_of_shards": 12, "store.compress": { "stored": true, "tv": true } } }}'
-
-Mapping (elasticsearch "type"):
-
- # Wikipedia Pageviews
- curl -XPUT ''http://localhost:9200/pageviews/pagehour/_mapping' -d '{
- "pagehour": { "_source": { "enabled" : true }, "properties" : {
- "page_id" : { "type": "long", "store": "yes" },
- "namespace": { "type": "integer", "store": "yes" },
- "title": { "type": "string", "store": "yes" },
- "num_visitors": { "type": "long", "store": "yes" },
- "date": { "type": "integer", "store": "yes" },
- "time": { "type": "long", "store": "yes" },
- "ts": { "type": "date", "store": "yes" },
- "day_of_week": { "type": "integer", "store": "yes" } } }}'
-
- $ curl -XPUT 'http://localhost:9200/ufo_sightings/sighting/_mapping' -d '{ "sighting": {
- "_source": { "enabled" : true },
- "properties" : {
- "sighted_at": { "type": "date", "store": "yes" },
- "reported_at": { "type": "date", "store": "yes" },
- "shape": { "type": "string", "store": "yes" },
- "duration": { "type": "string", "store": "yes" },
- "description": { "type": "string", "store": "yes" },
- "coordinates": { "type": "geo_point", "store": "yes" },
- "location_str": { "type": "string", "store": "no" },
- "location": { "type": "object", "dynamic": false, "properties": {
- "place_id": { "type": "string", "store": "yes" },
- "place_type": { "type": "string", "store": "yes" },
- "city": { "type": "string", "store": "yes" },
- "county": { "type": "string", "store": "yes" },
- "state": { "type": "string", "store": "yes" },
- "country": { "type": "string", "store": "yes" } } }
- } } }'
-
-
-### Temporary Bulk-load settings for an index
-
-To prepare a database for bulk loading, the following settings may help. They are
-*EXTREMELY* aggressive, and include knocking the replication factor back to 1 (zero replicas). One
-false step and you've destroyed Tokyo.
-
-Actually, you know what? Never mind. Don't apply these, they're too crazy.
-
- curl -XPUT 'http://localhost:9200/pageviews/_settings?pretty=true' -d '{"index": {
- "number_of_replicas": 0, "refresh_interval": -1, "gateway.snapshot_interval": -1,
- "translog": { "flush_threshold_ops": 50000, "flush_threshold_size": "200mb", "flush_threshold_period": "300s" },
- "merge.policy": { "max_merge_at_once": 30, "segments_per_tier": 30, "floor_segment": "10mb" },
- "store.compress": { "stored": true, "tv": true } } }'
-
-To restore your settings, in case you didn't destroy Tokyo:
-
- curl -XPUT 'http://localhost:9200/pageviews/_settings?pretty=true' -d ' {"index": {
- "number_of_replicas": 2, "refresh_interval": "60s", "gateway.snapshot_interval": "3600s",
- "translog": { "flush_threshold_ops": 5000, "flush_threshold_size": "200mb", "flush_threshold_period": "300s" },
- "merge.policy": { "max_merge_at_once": 10, "segments_per_tier": 10, "floor_segment": "10mb" },
- "store.compress": { "stored": true, "tv": true } } }'
-
-If you did destroy your database, please send your resume to jobs@infochimps.com as you begin your
-job hunt. It's the reformed sinner that makes the best missionary.
-
-
-### Post-bulkrun maintenance
-
- es_index=pageviews ; ( for foo in _flush _refresh '_optimize?max_num_segments=6&refresh=true&flush=true&wait_for_merge=true' '_gateway/snapshot' ; do echo "======= $foo" ; time curl -XPOST "http://localhost:9200/$es_index/$foo" ; done ) &
-
-### Full dump of cluster health
-
- es_index=pageviews ; es_node="projectes-elasticsearch-4"
- curl -XGET "http://localhost:9200/$es_index/_status?pretty=true"
- curl -XGET "http://localhost:9200/_cluster/state?pretty=true"
- curl -XGET "http://localhost:9200/$es_index/_stats?pretty=true&merge=true&refresh=true&flush=true&warmer=true"
- curl -XGET "http://localhost:9200/_cluster/nodes/$es_node/stats?pretty=true&all=true"
- curl -XGET "http://localhost:9200/_cluster/nodes/$es_node?pretty=true&all=true"
- curl -XGET "http://localhost:9200/_cluster/health?pretty=true"
- curl -XGET "http://localhost:9200/$es_index/_search?pretty=true&limit=3"
- curl -XGET "http://localhost:9200/$es_index/_segments?pretty=true" | head -n 200
-
-### Decommission nodes
-
-Run this, excluding the decommissionable nodes from the list:
-
- curl -XPUT http://localhost:9200/pageviews/_settings -d '{
- "index.routing.allocation.include.ironfan_name" :
- "projectes-elasticsearch-0,projectes-elasticsearch-1,projectes-elasticsearch-2" }'
