From 7a0ed647ca9dbc73b7b4feab25cb618bf4869d9d Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Fri, 18 Jul 2014 00:00:25 +0300 Subject: [PATCH] Improve handling of nested objects given as params relates #223 (cherry picked from commit 921c34145e26572352cb1a16c1d307c32a89c9d9) --- .../AbstractCascadingLocalJsonSaveTest.java | 4 +- .../mr/AbstractMROldApiSaveTest.java | 30 +++++++- .../mr/AbstractMROldApiSearchTest.java | 13 ++++ .../elasticsearch/hadoop/mr/RestUtils.java | 11 +++ .../hadoop/serialization/ParsingUtils.java | 4 +- .../bulk/AbstractBulkFactory.java | 70 ++++++++++++++----- .../field/AbstractDefaultParamsExtractor.java | 41 ++++++++--- .../serialization/field/FieldExplainer.java | 2 +- .../serialization/field/FieldExtractor.java | 2 +- .../field/JsonFieldExtractors.java | 24 +++++-- .../serialization/field/WithoutQuotes.java | 24 +++++++ .../hadoop/util/BytesArrayPool.java | 70 +++++++++++++++++++ .../elasticsearch/hadoop/util/BytesRef.java | 6 ++ .../serialization/JsonValuePathTest.java | 6 +- 14 files changed, 268 insertions(+), 39 deletions(-) create mode 100644 mr/src/main/java/org/elasticsearch/hadoop/serialization/field/WithoutQuotes.java create mode 100644 mr/src/main/java/org/elasticsearch/hadoop/util/BytesArrayPool.java diff --git a/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingLocalJsonSaveTest.java b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingLocalJsonSaveTest.java index 65ddab3de0db6a..e9c62edc3eaf5b 100644 --- a/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingLocalJsonSaveTest.java +++ b/cascading/src/itest/java/org/elasticsearch/hadoop/integration/cascading/AbstractCascadingLocalJsonSaveTest.java @@ -127,7 +127,7 @@ public void testUpdateOnlyParamScript() throws Exception { properties.put(ConfigurationOptions.ES_UPDATE_RETRY_ON_CONFLICT, "3"); properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter = param1; anothercounter = param2"); properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "mvel"); - properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:id "); + properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:number "); properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes"); Tap in = sourceTap(); @@ -197,7 +197,7 @@ public void testUpsertParamScript() throws Exception { properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes"); properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter += param1; anothercounter += param2"); properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "mvel"); - properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:id "); + properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:number "); Tap in = sourceTap(); // use an existing id to allow the update to succeed diff --git a/mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMROldApiSaveTest.java b/mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMROldApiSaveTest.java index 065fc229c43fd6..39d81d39fc995d 100644 --- a/mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMROldApiSaveTest.java +++ b/mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMROldApiSaveTest.java @@ -26,6 +26,8 @@ import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -54,6 +56,8 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import static org.junit.Assume.*; + @FixMethodOrder(MethodSorters.NAME_ASCENDING) @RunWith(Parameterized.class) public class AbstractMROldApiSaveTest { @@ -88,6 +92,18 @@ public void map(Object key, Object value, OutputCollector output, Reporter repor output.collect(key, WritableUtils.toWritable(entry)); } } + + public static class ConstantMapper extends MapReduceBase implements Mapper { + + @Override + public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { + MapWritable map = new MapWritable(); + map.put(new Text("key"), new Text("value")); + output.collect(new LongWritable(), map); + } + } + + public static class SplittableTextInputFormat extends TextInputFormat { @Override @@ -136,6 +152,18 @@ public AbstractMROldApiSaveTest(JobConf config, String indexPrefix) { } + @Test + public void testNoInput() throws Exception { + JobConf conf = createJobConf(); + + // use only when dealing with constant input + assumeFalse(conf.get(ConfigurationOptions.ES_INPUT_JSON).equals("true")); + conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/constant"); + conf.setMapperClass(ConstantMapper.class); + + runJob(conf); + } + @Test public void testBasicIndex() throws Exception { JobConf conf = createJobConf(); @@ -348,7 +376,7 @@ public void testUpsertOnlyParamScriptWithArrayOnArrayField() throws Exception { conf.set(ConfigurationOptions.ES_MAPPING_ID, "<1>"); conf.set(ConfigurationOptions.ES_UPDATE_SCRIPT, "ctx._source.tags = update_tags"); conf.set(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "mvel"); - conf.set(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, "update_tags:tags"); + conf.set(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, (conf.get(ConfigurationOptions.ES_INPUT_JSON).equals("true") ? "update_tags:name" :"update_tags:list")); runJob(conf); } diff --git a/mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMROldApiSearchTest.java b/mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMROldApiSearchTest.java index b3f4cb906012d5..8591fe291dfe42 100644 --- a/mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMROldApiSearchTest.java +++ b/mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMROldApiSearchTest.java @@ -44,6 +44,10 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import static org.junit.Assert.*; + +import static org.hamcrest.Matchers.*; + @RunWith(Parameterized.class) public class AbstractMROldApiSearchTest { @@ -141,6 +145,15 @@ public void testDynamicPatternWithFormat() throws Exception { Assert.assertTrue(RestUtils.exists("mroldapi/pattern-format-2945-10-06")); } + @Test + public void testUpsertOnlyParamScriptWithArrayOnArrayField() throws Exception { + String target = "mroldapi/createwitharrayupsert/1"; + Assert.assertTrue(RestUtils.exists(target)); + String result = RestUtils.get(target); + System.out.println(result); + assertThat(result, not(containsString("ArrayWritable@"))); + } + //@Test public void testNested() throws Exception { JobConf conf = createJobConf(); diff --git a/mr/src/itest/java/org/elasticsearch/hadoop/mr/RestUtils.java b/mr/src/itest/java/org/elasticsearch/hadoop/mr/RestUtils.java index 91077aaa8c61cb..e99c7081713766 100644 --- a/mr/src/itest/java/org/elasticsearch/hadoop/mr/RestUtils.java +++ b/mr/src/itest/java/org/elasticsearch/hadoop/mr/RestUtils.java @@ -47,6 +47,10 @@ public Response execute(Request.Method method, String path, ByteSequence buffer) return super.execute(method, path, buffer); } + public String get(String index) throws IOException { + return IOUtils.asString(execute(Request.Method.GET, index)); + } + public String post(String index, byte[] buffer) throws IOException { return IOUtils.asString(execute(Request.Method.POST, index, new BytesArray(buffer)).body()); } @@ -74,6 +78,13 @@ public static Field getMapping(String index) throws Exception { return parseField; } + public static String get(String index) throws Exception { + ExtendedRestClient rc = new ExtendedRestClient(); + String str = rc.get(index); + rc.close(); + return str; + } + public static void putMapping(String index, String location) throws Exception { putMapping(index, TestUtils.fromInputStream(RestUtils.class.getClassLoader().getResourceAsStream(location))); } diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/ParsingUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/ParsingUtils.java index 8663d4407ccf10..7df1c8f3482f60 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/ParsingUtils.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/ParsingUtils.java @@ -29,6 +29,8 @@ public abstract class ParsingUtils { + public static final String NOT_FOUND = "(not found)"; + /** * Seeks the field with the given name in the stream and positions (and returns) the parser to the next available token (value or not). * Return null if no token is found. @@ -139,7 +141,7 @@ public static List values(Parser parser, String... paths) { List matches = new ArrayList(); for (Matcher matcher : matchers) { - matches.add(matcher.matched ? matcher.value.toString() : null); + matches.add(matcher.matched ? (matcher.value != null ? matcher.value.toString() : StringUtils.EMPTY) : NOT_FOUND); } return matches; diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java index 0eaa9ed2314caf..7bdd56ed33140b 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java @@ -32,8 +32,10 @@ import org.elasticsearch.hadoop.serialization.field.FieldExtractor; import org.elasticsearch.hadoop.serialization.field.IndexExtractor; import org.elasticsearch.hadoop.serialization.field.JsonFieldExtractors; +import org.elasticsearch.hadoop.serialization.field.WithoutQuotes; import org.elasticsearch.hadoop.serialization.json.JacksonJsonGenerator; import org.elasticsearch.hadoop.util.BytesArray; +import org.elasticsearch.hadoop.util.BytesArrayPool; import org.elasticsearch.hadoop.util.FastByteArrayOutputStream; import org.elasticsearch.hadoop.util.ObjectUtils; import org.elasticsearch.hadoop.util.StringUtils; @@ -53,45 +55,75 @@ abstract class AbstractBulkFactory implements BulkFactory { private FieldExtractor idExtractor, parentExtractor, routingExtractor, versionExtractor, ttlExtractor, timestampExtractor, paramsExtractor; + static final BytesArray QUOTE = new BytesArray("\""); + class FieldWriter { final FieldExtractor extractor; - final BytesArray pad; + final boolean addQuotesIfNecessary; + final BytesArrayPool pool = new BytesArrayPool(); FieldWriter(FieldExtractor extractor) { - this(extractor, new BytesArray(64)); - } - - FieldWriter(FieldExtractor extractor, BytesArray pad) { this.extractor = extractor; - this.pad = pad; + addQuotesIfNecessary = (extractor instanceof WithoutQuotes); } - BytesArray write(Object object) { - pad.reset(); + BytesArrayPool write(Object object) { + pool.reset(); Object value = extractor.field(object); if (value == FieldExtractor.NOT_FOUND) { String obj = (extractor instanceof FieldExplainer ? ((FieldExplainer) extractor).toString(object) : object.toString()); throw new EsHadoopIllegalArgumentException(String.format("[%s] cannot extract value from object [%s]", extractor, obj)); } + + if (value instanceof List) { + List list = (List) value; + for (int i = 0; i < list.size() - 1; i++) { + doWrite(list.get(i), false); + } + // + doWrite(list.get(list.size() - 1), true); + } + // weird if/else to save one collection/iterator instance + else { + doWrite(value, true); + } + + return pool; + } + + void doWrite(Object value, boolean lookForQuotes) { // common-case - constants - if (value instanceof String) { - pad.bytes(value.toString()); + if (value instanceof String || jsonInput) { + String val = value.toString(); + if (lookForQuotes && addQuotesIfNecessary) { + if (val.startsWith("[") || val.startsWith("{")) { + pool.get().bytes(val); + } + else { + pool.get().bytes(QUOTE); + pool.get().bytes(val); + pool.get().bytes(QUOTE); + } + } + else { + pool.get().bytes(val); + } } else { - JacksonJsonGenerator generator = new JacksonJsonGenerator(new FastByteArrayOutputStream(pad)); + BytesArray ba = pool.get(); + JacksonJsonGenerator generator = new JacksonJsonGenerator(new FastByteArrayOutputStream(ba)); valueWriter.write(value, generator); generator.flush(); generator.close(); + // jackson likely will add leading/trailing "" which are added down the pipeline so remove them // however that's not mandatory in case the source is a number (instead of a string) - if (pad.bytes()[pad.offset()] == '"') { - int size = pad.length(); - pad.size(Math.max(0, size - 2)); - pad.offset(1); + if ((lookForQuotes && !addQuotesIfNecessary) && ba.bytes()[ba.offset()] == '"') { + ba.size(Math.max(0, ba.length() - 2)); + ba.offset(1); } } - return pad; } } @@ -268,7 +300,7 @@ private List compact(List list) { stringAccumulator.setLength(0); lastString = null; } - compacted.add(new FieldWriter((FieldExtractor) object)); + compacted.add(createFieldWriter((FieldExtractor) object)); } else { String str = object.toString(); @@ -287,6 +319,10 @@ private List compact(List list) { return compacted; } + protected Object createFieldWriter(FieldExtractor extractor) { + return new FieldWriter(extractor); + } + protected void writeBeforeObject(List pieces) { startHeader(pieces); diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/AbstractDefaultParamsExtractor.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/AbstractDefaultParamsExtractor.java index d6a396d1ebfe3a..8400103decda68 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/AbstractDefaultParamsExtractor.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/AbstractDefaultParamsExtractor.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.hadoop.serialization.field; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -28,22 +29,35 @@ import org.elasticsearch.hadoop.util.Assert; import org.elasticsearch.hadoop.util.StringUtils; -public abstract class AbstractDefaultParamsExtractor implements FieldExtractor, SettingsAware { +public abstract class AbstractDefaultParamsExtractor implements FieldExtractor, SettingsAware, FieldExplainer, WithoutQuotes { private Map params = new LinkedHashMap(); protected Settings settings; + // field explainer saved in case of a failure for diagnostics + private FieldExtractor lastFailingFieldExtractor; @Override - public String field(Object target) { - StringBuilder sb = new StringBuilder(); + public Object field(Object target) { + List list = new ArrayList(params.size()); for (Entry entry : params.entrySet()) { - sb.append("\""); - sb.append(entry.getKey()); - sb.append("\":\""); - sb.append(entry.getValue().field(target)); - sb.append("\","); + list.add("\""); + list.add(entry.getKey()); + list.add("\":"); + Object field = entry.getValue().field(target); + if (field == FieldExtractor.NOT_FOUND) { + lastFailingFieldExtractor = entry.getValue(); + return FieldExtractor.NOT_FOUND; + } + list.add(field); + list.add(","); } - return sb.substring(0, sb.length() - 1); + list.remove(list.size() - 1); + return list; + } + + @Override + public String toString(Object target) { + return (lastFailingFieldExtractor instanceof FieldExplainer ? ((FieldExplainer) lastFailingFieldExtractor).toString(target) : target.toString()); } @Override @@ -60,5 +74,14 @@ public void setSettings(Settings settings) { } } + @Override + public String toString() { + if (lastFailingFieldExtractor != null) { + return lastFailingFieldExtractor.toString(); + } + + return String.format("%s for fields [%s]", getClass().getSimpleName(), params.keySet()); + } + protected abstract FieldExtractor createFieldExtractor(String fieldName); } diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/FieldExplainer.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/FieldExplainer.java index 5bbf386bb42191..f0c32d5ae41b2c 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/FieldExplainer.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/FieldExplainer.java @@ -20,5 +20,5 @@ public interface FieldExplainer { - public String toString(Object field); + String toString(Object field); } diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/FieldExtractor.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/FieldExtractor.java index 37762ea5f02114..783c2057ff32dd 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/FieldExtractor.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/FieldExtractor.java @@ -26,7 +26,7 @@ */ public interface FieldExtractor { - public String NOT_FOUND = "(not found)"; + public Object NOT_FOUND = new Object(); /** * Returns the associated JSON representation for the given target. diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/JsonFieldExtractors.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/JsonFieldExtractors.java index 73066dd085358c..201147982c3567 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/JsonFieldExtractors.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/JsonFieldExtractors.java @@ -48,14 +48,25 @@ public class JsonFieldExtractors { class PrecomputedFieldExtractor implements FieldExtractor { private final int slot; + private final String fieldName; - public PrecomputedFieldExtractor(int slot) { + public PrecomputedFieldExtractor(int slot, String fieldName) { this.slot = slot; + this.fieldName = fieldName; } @Override - public String field(Object target) { - return results.get(slot); + public Object field(Object target) { + String result = results.get(slot); + if (result == ParsingUtils.NOT_FOUND) { + return FieldExtractor.NOT_FOUND; + } + return result; + } + + @Override + public String toString() { + return String.format("JsonExtractor for field [%s]", fieldName); } } @@ -70,6 +81,11 @@ public FixedFieldExtractor(String value) { public String field(Object target) { return value; } + + @Override + public String toString() { + return "ConstantJsonExtractor"; + } } public JsonFieldExtractors(Settings settings) { @@ -124,7 +140,7 @@ private FieldExtractor init(String fieldName, List pathList) { private FieldExtractor createJsonFieldExtractor(String fieldName, List pathList) { pathList.add(fieldName); - return new PrecomputedFieldExtractor(pathList.size() - 1); + return new PrecomputedFieldExtractor(pathList.size() - 1, fieldName); } private String initConstant(String field) { diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/WithoutQuotes.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/WithoutQuotes.java new file mode 100644 index 00000000000000..0b23f45789a08d --- /dev/null +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/field/WithoutQuotes.java @@ -0,0 +1,24 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.hadoop.serialization.field; + +// Marker interface indicating that a field extractor does not handle quotes and thus these need to be added further down the pipe-line +public interface WithoutQuotes { + +} \ No newline at end of file diff --git a/mr/src/main/java/org/elasticsearch/hadoop/util/BytesArrayPool.java b/mr/src/main/java/org/elasticsearch/hadoop/util/BytesArrayPool.java new file mode 100644 index 00000000000000..90608c8f9031f3 --- /dev/null +++ b/mr/src/main/java/org/elasticsearch/hadoop/util/BytesArrayPool.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.hadoop.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Basic class acting as a pool of {@link BytesArray}. The goal here is to reuse the arrays even when dealing with a unknown/dynamic list of "bytes" and reuse them across calls. + */ +public class BytesArrayPool implements ByteSequence { + + private final List pool = new ArrayList(); + private int inUse = 0; + + @Override + public int length() { + int size = 0; + for (int i = 0; i < inUse; i++) { + size += pool.get(inUse).length(); + } + return size; + } + + @Override + public void writeTo(OutputStream out) throws IOException { + throw new UnsupportedOperationException(); + } + + public void reset() { + for (BytesArray ba : pool) { + ba.reset(); + } + inUse = 0; + } + + public BytesArray get() { + if (inUse < pool.size() - 1) { + return pool.get(inUse++); + } + else { + BytesArray ba = new BytesArray(64); + pool.add(ba); + inUse++; + return ba; + } + } + + public List inUse() { + return pool; + } +} diff --git a/mr/src/main/java/org/elasticsearch/hadoop/util/BytesRef.java b/mr/src/main/java/org/elasticsearch/hadoop/util/BytesRef.java index 55c7fd12e6aad6..7995af565831c0 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/util/BytesRef.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/util/BytesRef.java @@ -29,6 +29,12 @@ public class BytesRef { List list = null; private int size = 0; + public void add(BytesArrayPool baPool) { + for (BytesArray pool : baPool.inUse()) { + add(pool); + } + } + public void add(BytesArray bytes) { if (list == null) { list = new ArrayList(); diff --git a/mr/src/test/java/org/elasticsearch/hadoop/serialization/JsonValuePathTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/JsonValuePathTest.java index 7f3730df9ebc03..4eb3a2fd2ad228 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/serialization/JsonValuePathTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/serialization/JsonValuePathTest.java @@ -48,7 +48,7 @@ public void testFirstLevel() throws Exception { List vals = ParsingUtils.values(parser, "firstName", "foo", "age"); assertEquals(3, vals.size()); assertEquals("John", vals.get(0)); - assertNull(vals.get(1)); + assertSame(ParsingUtils.NOT_FOUND, vals.get(1)); assertEquals("25", vals.get(2)); } @@ -57,8 +57,8 @@ public void testSecondLevel() throws Exception { List vals = ParsingUtils.values(parser, "address.state", "address.foo", "address.building.floors", "address.building.bar"); assertEquals(4, vals.size()); assertEquals("NY", vals.get(0)); - assertNull(vals.get(1)); + assertSame(ParsingUtils.NOT_FOUND, vals.get(1)); assertEquals("10", vals.get(2)); - assertNull(vals.get(3)); + assertSame(ParsingUtils.NOT_FOUND, vals.get(3)); } } \ No newline at end of file