diff --git a/notes/pigstorefunc.pig b/notes/pigstorefunc.pig new file mode 100644 index 0000000..e536a9a --- /dev/null +++ b/notes/pigstorefunc.pig @@ -0,0 +1,45 @@ +-- +-- Doesn't work at the moment; these are just notes on how the storefunc might look. +-- + + +-- +-- Right now the ElasticSearchOutputFormat gets all its options from the +-- Job object. We can use the call to setStoreLocation in the storefunc +-- to set the required parameters. Need to make sure the following are +-- set: +-- +-- wonderdog.index.name - should be set by the storefunc constructor +-- wonderdog.bulk.size - should be set by the storefunc constructor +-- wonderdog.field.names - should be set by the call to checkSchema +-- wonderdog.id.field - should be set by the storefunc constructor +-- wonderdog.object.type - should be set by the storefunc constructor +-- wonderdog.plugins.dir - should be set by the call to setStoreLocation +-- wonderdog.config - should be set by the call to setStoreLocation +-- +-- FIXME: options used in the ElasticSearchOutputFormat should NOT be +-- namespaced with 'wonderdog' + +%default INDEX 'es_index' +%default OBJ 'text_obj' + + +records = LOAD '$DATA' AS (text_field:chararray); +records_with_id = LOAD '$IDDATA' AS (id_field:int, text_field:chararray); + +-- Here we would use the elasticsearch index name and object type as the uri, pass in a +-- comma-separated list of field names as the first arg, the id field +-- as the second arg and the bulk size as the third. +-- +-- and so on. +STORE records INTO '$INDEX/$OBJ' USING ElasticSearchStorage('my_text_field', '-1', '1000'); + + +-- But it would be really nice to duplicate what's in WonderDog.java in that, +-- should a bulk request fail, the failed records are written to hdfs. The +-- user should have some control over this. Also, should it be possible to generate +-- the field names directly from the pig schema? (We'd have to be VERY explicit in the +-- docs about this as it would be a point of headscratching/swearing...) 
In this +-- case we might have something like: +named_records = FOREACH records GENERATE text_field AS text_field_name; +STORE named_records INTO '/path/to/failed_requests' USING ElasticSearchStorage('$INDEX/$OBJ', '-1', '1000'); diff --git a/src/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java b/src/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java index 94b2292..137d07d 100644 --- a/src/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java +++ b/src/java/com/infochimps/elasticsearch/ElasticSearchOutputFormat.java @@ -130,6 +130,7 @@ public RecordWriter getRecordWriter(final TaskAttempt return new ElasticSearchRecordWriter(); } + // FIXME: options used here should NOT be namespaced with 'wonderdog' public void setConf(Configuration conf) { this.indexName = conf.get("wonderdog.index.name"); this.bulkSize = Integer.parseInt(conf.get("wonderdog.bulk.size"));