Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Elasticsearch query string syntax #1662

Merged
merged 1 commit into from
Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions presto-docs/src/main/sphinx/connector/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,15 @@ _id The Elasticsearch document ID
_score The document score returned by the Elasticsearch query
_source The source of the original document
======= =======================================================

Full Text Queries
-----------------

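Presto SQL queries can be combined with Elasticsearch queries by appending a `full text query`_
to the table name, separated from it by a colon. Because the combined name contains characters
that are not valid in a plain identifier, it must be enclosed in double quotes. For example: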

.. code-block:: sql

    SELECT * FROM "tweets: +presto SQL^2"

.. _full text query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@
import io.prestosql.elasticsearch.client.SearchShardsResponse;
import io.prestosql.elasticsearch.client.Shard;
import io.prestosql.spi.PrestoException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
Expand Down Expand Up @@ -72,6 +75,7 @@
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.prestosql.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_CONNECTION_ERROR;
import static io.prestosql.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_INVALID_RESPONSE;
import static io.prestosql.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_QUERY_FAILURE;
import static io.prestosql.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_SSL_INITIALIZATION_FAILURE;
import static java.lang.StrictMath.toIntExact;
import static java.lang.String.format;
Expand Down Expand Up @@ -417,6 +421,30 @@ public SearchResponse beginSearch(String index, int shard, QueryBuilder query, O
catch (IOException e) {
throw new PrestoException(ELASTICSEARCH_CONNECTION_ERROR, e);
}
catch (ElasticsearchStatusException e) {
Throwable[] suppressed = e.getSuppressed();
if (suppressed.length > 0) {
Throwable cause = suppressed[0];
if (cause instanceof ResponseException) {
HttpEntity entity = ((ResponseException) cause).getResponse().getEntity();
try {
JsonNode reason = OBJECT_MAPPER.readTree(entity.getContent()).path("error")
.path("root_cause")
.path(0)
.path("reason");

if (!reason.isMissingNode()) {
throw new PrestoException(ELASTICSEARCH_QUERY_FAILURE, reason.asText(), e);
}
}
catch (IOException ex) {
e.addSuppressed(ex);
}
}
}

throw new PrestoException(ELASTICSEARCH_CONNECTION_ERROR, e);
}
}

public SearchResponse nextPage(String scrollId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import io.prestosql.spi.ErrorType;

import static io.prestosql.spi.ErrorType.EXTERNAL;
import static io.prestosql.spi.ErrorType.USER_ERROR;

public enum ElasticsearchErrorCode
implements ErrorCodeSupplier
{
ELASTICSEARCH_CONNECTION_ERROR(0, EXTERNAL),
ELASTICSEARCH_INVALID_RESPONSE(1, EXTERNAL),
ELASTICSEARCH_SSL_INITIALIZATION_FAILURE(2, EXTERNAL);
ELASTICSEARCH_SSL_INITIALIZATION_FAILURE(2, EXTERNAL),
ELASTICSEARCH_QUERY_FAILURE(3, USER_ERROR);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,15 @@ public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaT
requireNonNull(tableName, "tableName is null");

if (tableName.getSchemaName().equals(schemaName)) {
if (listTables(session, Optional.of(schemaName)).contains(tableName)) {
return new ElasticsearchTableHandle(schemaName, tableName.getTableName());
String[] parts = tableName.getTableName().split(":", 2);
String table = parts[0];
Optional<String> query = Optional.empty();
if (parts.length == 2) {
query = Optional.of(parts[1]);
}

if (listTables(session, Optional.of(schemaName)).contains(new SchemaTableName(schemaName, table))) {
return new ElasticsearchTableHandle(schemaName, table, query);
}
}

Expand Down Expand Up @@ -245,7 +252,8 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
handle = new ElasticsearchTableHandle(
handle.getSchema(),
handle.getIndex(),
handle.getConstraint());
handle.getConstraint(),
handle.getQuery());

return Optional.of(new ConstraintApplicationResult<>(handle, constraint.getSummary()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public ElasticsearchPageSource(
SearchResponse searchResponse = client.beginSearch(
table.getIndex(),
split.getShard(),
buildSearchQuery(table.getConstraint(), columns),
buildSearchQuery(table.getConstraint(), columns, table.getQuery()),
needAllFields ? Optional.empty() : Optional.of(requiredFields),
documentFields);
readTimeNanos += System.nanoTime() - start;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -43,7 +45,7 @@ public class ElasticsearchQueryBuilder
{
private ElasticsearchQueryBuilder() {}

public static QueryBuilder buildSearchQuery(TupleDomain<ColumnHandle> constraint, List<ElasticsearchColumnHandle> columns)
public static QueryBuilder buildSearchQuery(TupleDomain<ColumnHandle> constraint, List<ElasticsearchColumnHandle> columns, Optional<String> query)
{
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
for (ElasticsearchColumnHandle column : columns) {
Expand All @@ -57,6 +59,10 @@ public static QueryBuilder buildSearchQuery(TupleDomain<ColumnHandle> constraint
}
boolQueryBuilder.must(columnQueryBuilder);
}

query.map(QueryStringQueryBuilder::new)
.ifPresent(boolQueryBuilder::must);

if (boolQueryBuilder.hasClauses()) {
return boolQueryBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.prestosql.spi.predicate.TupleDomain;

import java.util.Objects;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -29,11 +30,13 @@ public final class ElasticsearchTableHandle
private final String schema;
private final String index;
private final TupleDomain<ColumnHandle> constraint;
private final Optional<String> query;

public ElasticsearchTableHandle(String schema, String index)
public ElasticsearchTableHandle(String schema, String index, Optional<String> query)
{
this.schema = requireNonNull(schema, "schema is null");
this.index = requireNonNull(index, "index is null");
this.query = requireNonNull(query, "query is null");

constraint = TupleDomain.all();
}
Expand All @@ -42,11 +45,13 @@ public ElasticsearchTableHandle(String schema, String index)
public ElasticsearchTableHandle(
@JsonProperty("schema") String schema,
@JsonProperty("index") String index,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint,
@JsonProperty("query") Optional<String> query)
{
this.schema = requireNonNull(schema, "schema is null");
this.index = requireNonNull(index, "index is null");
this.constraint = requireNonNull(constraint, "constraint is null");
this.query = requireNonNull(query, "query is null");
}

@JsonProperty
Expand All @@ -67,6 +72,12 @@ public TupleDomain<ColumnHandle> getConstraint()
return constraint;
}

/**
 * Returns the optional query string carried by this table handle.
 *
 * @return the query string, or an empty {@code Optional} when the handle was created without one
 */
@JsonProperty
public Optional<String> getQuery()
{
return query;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -79,12 +90,13 @@ public boolean equals(Object o)
ElasticsearchTableHandle that = (ElasticsearchTableHandle) o;
return schema.equals(that.schema) &&
index.equals(that.index) &&
constraint.equals(that.constraint);
constraint.equals(that.constraint) &&
query.equals(that.query);
}

@Override
public int hashCode()
{
return Objects.hash(schema, index, constraint);
return Objects.hash(schema, index, constraint, query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.prestosql.elasticsearch;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
import io.airlift.tpch.TpchTable;
Expand All @@ -31,6 +32,7 @@

import static io.prestosql.elasticsearch.ElasticsearchQueryRunner.createElasticsearchQueryRunner;
import static io.prestosql.elasticsearch.EmbeddedElasticsearchNode.createEmbeddedElasticsearchNode;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static io.prestosql.testing.MaterializedResult.resultBuilder;
import static io.prestosql.testing.assertions.Assert.assertEquals;
Expand Down Expand Up @@ -308,6 +310,24 @@ public void testDataTypesNested()
assertEquals(rows.getMaterializedRows(), expected.getMaterializedRows());
}

@Test
public void testQueryString()
{
    // The text after the colon inside the quoted table name is the query string
    // that gets combined with the SQL query against the "orders" index.
    String sql = "SELECT count(*) FROM \"orders: +packages -slyly\"";
    MaterializedResult result = computeActual(sql);

    // A single BIGINT row holding the expected number of matching documents.
    MaterializedResult singleCountRow = resultBuilder(getSession(), ImmutableList.of(BIGINT))
            .row(1639L)
            .build();

    assertEquals(result, singleCountRow);
}

@Test
public void testQueryStringError()
{
    // A malformed query string must fail the query, and the failure message
    // must match the quoted parse-error pattern below.
    String malformedQuery = "SELECT count(*) FROM \"orders: ++foo AND\"";
    String expectedMessagePattern = "\\QFailed to parse query [ ++foo and]\\E";

    assertQueryFails(malformedQuery, expectedMessagePattern);
}

private void index(String indexName, Map<String, Object> document)
{
embeddedElasticsearchNode.getClient()
Expand Down