From 60fff2d04cded217bdb8b6c820e999e8439c7238 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Tue, 9 Sep 2014 23:32:49 +0300 Subject: [PATCH] Improve escaping of projected fields Fix #264 (cherry picked from commit c759218fee9c02e20b7ecc17bbec21f5aa8020a6) --- .../hadoop/cascading/EsHadoopScheme.java | 2 +- .../hadoop/cascading/EsLocalTap.java | 2 +- .../hive/AbstractHiveExtraTests.java | 6 +- .../hadoop/integration/hive/HiveSuite.java | 4 +- hive/src/itest/resources/cars-bulk.txt | 12 ++-- .../hadoop/hive/EsHiveInputFormat.java | 2 +- .../hadoop/rest/QueryBuilder.java | 9 ++- .../commonshttp/CommonsHttpTransport.java | 2 +- .../dto/mapping/MappingUtils.java | 4 +- .../hadoop/util/StringUtils.java | 57 ++++++++++++++++++- .../elasticsearch/hadoop/rest/EscapeTest.java | 44 ++++++++++++++ .../hadoop/rest/ResourceTest.java | 6 +- .../elasticsearch/hadoop/pig/PigUtils.java | 4 +- 13 files changed, 127 insertions(+), 27 deletions(-) create mode 100644 mr/src/test/java/org/elasticsearch/hadoop/rest/EscapeTest.java diff --git a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java index 7201d59af0899c..ba4798f5370606 100644 --- a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java +++ b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsHadoopScheme.java @@ -126,7 +126,7 @@ public void sourceConfInit(FlowProcess flowProcess, Tap fields = CascadingUtils.fieldToAlias(set, getSourceFields()); // load only the necessary fields - conf.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(fields, ",")); + conf.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenateAndUriEncode(fields, ",")); if (log.isTraceEnabled()) { log.trace("Initialized (source) configuration " + HadoopCfgUtils.asProperties(conf)); diff --git a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsLocalTap.java b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsLocalTap.java index f3f1ff366f1969..3ad5fc9cc20c60 100644 --- a/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsLocalTap.java +++ b/cascading/src/main/java/org/elasticsearch/hadoop/cascading/EsLocalTap.java @@ -83,7 +83,7 @@ public TupleEntryIterator openForRead(FlowProcess flowProcess, Scrol MappingUtils.validateMapping(fields, mapping, validation, log); } - input = QueryBuilder.query(settings).fields(StringUtils.concatenate(fields, ",")).build(client, new ScrollReader(new JdkValueReader(), mapping)); + input = QueryBuilder.query(settings).fields(StringUtils.concatenateAndUriEncode(fields, ",")).build(client, new ScrollReader(new JdkValueReader(), mapping)); } return new TupleEntrySchemeIterator(flowProcess, getScheme(), input, getIdentifier()); } diff --git a/hive/src/itest/java/org/elasticsearch/hadoop/integration/hive/AbstractHiveExtraTests.java b/hive/src/itest/java/org/elasticsearch/hadoop/integration/hive/AbstractHiveExtraTests.java index 5233ca512ea000..91d57d4741f558 100644 --- a/hive/src/itest/java/org/elasticsearch/hadoop/integration/hive/AbstractHiveExtraTests.java +++ b/hive/src/itest/java/org/elasticsearch/hadoop/integration/hive/AbstractHiveExtraTests.java @@ -54,8 +54,9 @@ public void testQuery() throws Exception { String create = "CREATE EXTERNAL TABLE cars2 (" + "color STRING," + "price BIGINT," - + "sold TIMESTAMP) " - + HiveSuite.tableProps("cars/transactions", null, (String[]) null); + + "sold TIMESTAMP, " + + "alias STRING) " + + HiveSuite.tableProps("cars/transactions", null, "'es.mapping.names'='alias:&c'"); String query = "SELECT * from cars2"; String count = "SELECT count(1) from cars2"; @@ -64,6 +65,7 @@ public void testQuery() throws Exception { server.execute(create); List result = server.execute(query); assertEquals(6, result.size()); + assertTrue(result.get(0).contains("foobar")); result = server.execute(count); assertEquals("6", result.get(0)); } diff --git a/hive/src/itest/java/org/elasticsearch/hadoop/integration/hive/HiveSuite.java b/hive/src/itest/java/org/elasticsearch/hadoop/integration/hive/HiveSuite.java index 2652ab078d8f14..844a2a7b343570 100644 --- a/hive/src/itest/java/org/elasticsearch/hadoop/integration/hive/HiveSuite.java +++ b/hive/src/itest/java/org/elasticsearch/hadoop/integration/hive/HiveSuite.java @@ -37,9 +37,9 @@ import org.junit.runners.Suite; @RunWith(Suite.class) -@Suite.SuiteClasses({ AbstractHiveSaveTest.class, AbstractHiveSaveJsonTest.class, AbstractHiveSearchTest.class, AbstractHiveSearchJsonTest.class, AbstractHiveExtraTests.class, AbstractHiveExtraTests.class }) +//@Suite.SuiteClasses({ AbstractHiveSaveTest.class, AbstractHiveSaveJsonTest.class, AbstractHiveSearchTest.class, AbstractHiveSearchJsonTest.class, AbstractHiveExtraTests.class, AbstractHiveExtraTests.class }) //@Suite.SuiteClasses({ AbstractHiveSaveJsonTest.class, AbstractHiveSearchJsonTest.class }) -//@Suite.SuiteClasses({ AbstractHiveExtraTests.class }) +@Suite.SuiteClasses({ AbstractHiveExtraTests.class }) public class HiveSuite { static HiveInstance server; diff --git a/hive/src/itest/resources/cars-bulk.txt b/hive/src/itest/resources/cars-bulk.txt index 9aca4f61d766b5..ccada22be7598c 100644 --- a/hive/src/itest/resources/cars-bulk.txt +++ b/hive/src/itest/resources/cars-bulk.txt @@ -1,12 +1,12 @@ { "index": {}} -{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18" } +{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18", "&c" : "foobar" } { "index": {}} -{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02" } +{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02", "&c" : "foobar" } { "index": {}} -{ "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19" } +{ "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19", "&c" : "foobar" } { "index": {}} -{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" } +{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05", "&c" : "foobar" } { "index": {}} -{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01" } +{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01", "&c" : "foobar" } { "index": {}} -{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12" } +{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12", "&c" : "foobar" } diff --git a/hive/src/main/java/org/elasticsearch/hadoop/hive/EsHiveInputFormat.java b/hive/src/main/java/org/elasticsearch/hadoop/hive/EsHiveInputFormat.java index ea821adb5fb01a..ec2bdb4affff98 100644 --- a/hive/src/main/java/org/elasticsearch/hadoop/hive/EsHiveInputFormat.java +++ b/hive/src/main/java/org/elasticsearch/hadoop/hive/EsHiveInputFormat.java @@ -103,7 +103,7 @@ public FileSplit[] getSplits(JobConf job, int numSplits) throws IOException { Log log = LogFactory.getLog(getClass()); // move on to initialization InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log); - settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(HiveUtils.columnToAlias(settings), ",")); + settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenateAndUriEncode(HiveUtils.columnToAlias(settings), ",")); // set read resource settings.setResourceRead(settings.getResourceRead()); HiveUtils.init(settings, log); diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/QueryBuilder.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/QueryBuilder.java index 0f9be0f6274d89..655c2a7c28934d 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/QueryBuilder.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/QueryBuilder.java @@ -141,7 +141,10 @@ public QueryBuilder fields(String fieldsCSV) { } private String assemble() { - StringBuilder sb = new StringBuilder(resource.indexAndType()); + StringBuilder sb = new StringBuilder(); + sb.append(StringUtils.encodePath(resource.index())); + sb.append("/"); + sb.append(StringUtils.encodePath(resource.type())); sb.append("/_search?"); // override infrastructure params @@ -153,7 +156,7 @@ private String assemble() { if (StringUtils.hasText(fields)) { if (IS_ES_10) { uriQuery.put("_source", fields); - uriQuery.remove("escapedFields"); + uriQuery.remove("fields"); } else { uriQuery.put("fields", fields); @@ -195,7 +198,7 @@ private String assemble() { } public ScrollQuery build(RestRepository client, ScrollReader reader) { - String scrollUri = StringUtils.escapeUri(assemble()); + String scrollUri = assemble(); return client.scan(scrollUri, bodyQuery, reader); } diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/commonshttp/CommonsHttpTransport.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/commonshttp/CommonsHttpTransport.java index 85deedc6ee9417..dd6a1d22b4dc6f 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/commonshttp/CommonsHttpTransport.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/commonshttp/CommonsHttpTransport.java @@ -379,7 +379,7 @@ public void close() { private static String escapeUri(String uri) { // escape the uri right away - String escaped = StringUtils.escapeUri(uri); + String escaped = StringUtils.encodeUri(uri); return escaped.contains("://") ? escaped : "http://" + escaped; } diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/dto/mapping/MappingUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/dto/mapping/MappingUtils.java index b52ffe6f0a09da..86e283ed2e01b3 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/dto/mapping/MappingUtils.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/dto/mapping/MappingUtils.java @@ -45,7 +45,7 @@ public class MappingUtils { public static void validateMapping(String fields, Field mapping, FieldPresenceValidation validation, Log log) { if (StringUtils.hasText(fields)) { - validateMapping(StringUtils.tokenize(fields), mapping, validation, log); + validateMapping(StringUtils.tokenizeAndUriDecode(fields, ","), mapping, validation, log); } } @@ -60,7 +60,7 @@ public static void validateMapping(Collection fields, Field mapping, Fie return; } - String message = String.format("Field(s) [%s] not found in the Elasticsearch mapping specified; did you mean [%s]?", + String message = String.format("Field(s) [%s] not found in the Elasticsearch mapping specified; did you mean [%s]?", removeDoubleBrackets(results[0]), removeDoubleBrackets(results[1])); if (validation == FieldPresenceValidation.WARN) { log.warn(message); diff --git a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java index f81a4441584521..a4ea2151e5649f 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java @@ -18,6 +18,9 @@ */ package org.elasticsearch.hadoop.util; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; @@ -64,6 +67,17 @@ public static List tokenize(String string, String delimiters) { return tokenize(string, delimiters, true, true); } + public static List tokenizeAndUriDecode(String string, String delimiters) { + List tokenize = tokenize(string, delimiters, true, true); + List decoded = new ArrayList(tokenize.size()); + + for (String token : tokenize) { + decoded.add(StringUtils.decodeQuery(token)); + } + + return decoded; + } + public static List tokenize(String string, String delimiters, boolean trimTokens, boolean ignoreEmptyTokens) { if (string == null) { return Collections.emptyList(); @@ -100,6 +114,17 @@ public static String concatenate(Collection list, String delimiter) { return sb.toString(); } + public static String concatenateAndUriEncode(Collection list, String delimiter) { + Collection escaped = new ArrayList(); + + if (list != null) { + for (Object object : list) { + escaped.add(encodeQuery(object.toString())); + } + } + return concatenate(escaped, delimiter); + } + public static String concatenate(Object[] array, String delimiter) { if (array == null || array.length == 0) { return EMPTY; @@ -155,7 +180,8 @@ public static int levenshteinDistance(CharSequence one, CharSequence another, in // if one string is empty, the edit distance is necessarily the length of the other if (n == 0) { return m <= threshold ? m : -1; - } else if (m == 0) { + } + else if (m == 0) { return n <= threshold ? n : -1; } @@ -206,7 +232,8 @@ public static int levenshteinDistance(CharSequence one, CharSequence another, in if (one.charAt(i - 1) == t_j) { // diagonally left and up d[i] = p[i - 1]; - } else { + } + else { // 1 + minimum of cell to the left, to the top, diagonally left and up d[i] = 1 + Math.min(Math.min(d[i - 1], p[i]), p[i - 1]); } @@ -260,11 +287,35 @@ public static String sanitizeResource(String resource) { return res; } - public static String escapeUri(String uri) { + public static String encodeUri(String uri) { try { return URIUtil.encodePathQuery(uri); } catch (URIException ex) { throw new EsHadoopIllegalArgumentException("Cannot escape uri" + uri); } } + + public static String encodePath(String path) { + try { + return URIUtil.encodePath(path, "UTF-8"); + } catch (URIException ex) { + throw new EsHadoopIllegalArgumentException("Cannot encode path" + path, ex); + } + } + + public static String encodeQuery(String query) { + try { + return URLEncoder.encode(query, "UTF-8"); + } catch (UnsupportedEncodingException ex) { + throw new EsHadoopIllegalArgumentException("Cannot encode path" + query, ex); + } + } + + public static String decodeQuery(String query) { + try { + return URLDecoder.decode(query, "UTF-8"); + } catch (UnsupportedEncodingException ex) { + throw new EsHadoopIllegalArgumentException("Cannot encode path" + query, ex); + } + } } \ No newline at end of file diff --git a/mr/src/test/java/org/elasticsearch/hadoop/rest/EscapeTest.java b/mr/src/test/java/org/elasticsearch/hadoop/rest/EscapeTest.java new file mode 100644 index 00000000000000..09a31671307565 --- /dev/null +++ b/mr/src/test/java/org/elasticsearch/hadoop/rest/EscapeTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.hadoop.rest; + +import java.util.Arrays; + +import org.elasticsearch.hadoop.util.StringUtils; +import org.junit.Test; + +import static org.junit.Assert.*; + +import static org.hamcrest.CoreMatchers.*; + +public class EscapeTest { + + @Test + public void testSingleAmpersandEscape() { + String uri = StringUtils.encodeQuery("&c"); + assertThat(uri, is("%26c")); + } + + @Test + public void testMultiAmpersandEscapeSimple() { + String uri = StringUtils.concatenateAndUriEncode(Arrays.asList("&a", "$b", "#c", "!d", "/e", ":f"), ","); + assertThat(uri, is("%26a,%24b,%23c,%21d,%2Fe,%3Af")); + } + +} diff --git a/mr/src/test/java/org/elasticsearch/hadoop/rest/ResourceTest.java b/mr/src/test/java/org/elasticsearch/hadoop/rest/ResourceTest.java index a8224df138d379..d9ce276a2ce1ce 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/rest/ResourceTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/rest/ResourceTest.java @@ -73,8 +73,8 @@ private Resource createResource(String target) { @Test public void testURiEscaping() throws Exception { - assertEquals("http://localhost:9200/index/type%7Cfoo?q=foo%7Cbar:bar%7Cfoo", StringUtils.escapeUri("http://localhost:9200/index/type|foo?q=foo|bar:bar|foo")); - assertEquals("foo%7Cbar", StringUtils.escapeUri("foo|bar")); - System.out.println(StringUtils.escapeUri("foo|bar,abc,xyz|rpt")); + assertEquals("http://localhost:9200/index/type%7Cfoo?q=foo%7Cbar:bar%7Cfoo", StringUtils.encodeUri("http://localhost:9200/index/type|foo?q=foo|bar:bar|foo")); + assertEquals("foo%7Cbar", StringUtils.encodeUri("foo|bar")); + System.out.println(StringUtils.encodeUri("foo|bar,abc,xyz|rpt")); } } diff --git a/pig/src/main/java/org/elasticsearch/hadoop/pig/PigUtils.java b/pig/src/main/java/org/elasticsearch/hadoop/pig/PigUtils.java index c24b8b270fbec7..ab03d3f878dc6e 100644 --- a/pig/src/main/java/org/elasticsearch/hadoop/pig/PigUtils.java +++ b/pig/src/main/java/org/elasticsearch/hadoop/pig/PigUtils.java @@ -102,7 +102,7 @@ static String asProjection(Schema schema, Properties props) { List fields = new ArrayList(); addField(schema, fields, alias(new PropertiesSettings(props)), null); - return StringUtils.concatenate(fields.toArray(new String[fields.size()]), ","); + return StringUtils.concatenate(fields, ","); } private static void addField(Schema schema, List fields, FieldAlias fa, String currentNode) { @@ -139,7 +139,7 @@ static String asProjection(RequiredFieldList list, Properties props) { addField(field, fields, alias, ""); } - return StringUtils.concatenate(fields.toArray(new String[fields.size()]), ","); + return StringUtils.concatenateAndUriEncode(fields, ","); } private static void addField(RequiredField field, List fields, FieldAlias fa, String currentNode) {