To run queries against a "real" Elasticsearch instance, we first load Calcite, Elasticsearch, and the test JARs for Calcite's Elasticsearch adapter, which contain some useful code to get us started.

In [1]:
%%loadFromPOM
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-elasticsearch</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.0.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty4-client</artifactId>
    <version>7.0.1</version>
</dependency>
<dependency>
    <groupId>org.codelibs.elasticsearch.module</groupId>
    <artifactId>lang-painless</artifactId>
    <version>7.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-elasticsearch</artifactId>
    <version>1.20.0</version>
    <classifier>tests</classifier>
    <type>test-jar</type>
</dependency>
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.20.0</version>
    <classifier>tests</classifier>
    <type>test-jar</type>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.11.0</version>
</dependency>

Next we open a Calcite JDBC connection and register an Elasticsearch schema named `elastic` in its root schema.
The connection uses `lex=JAVA`, so identifiers are case-sensitive and quoted with backticks, as in the queries below.
In practice, you would point Calcite at a running Elasticsearch cluster, usually through a JSON model file. Here we instead borrow the embedded Elasticsearch node from `ElasticSearchAdapterTest`, using reflection to reach the node, its REST client, and its Jackson `ObjectMapper`.

In [2]:
import org.apache.calcite.adapter.elasticsearch.ElasticSearchAdapterTest;
import org.apache.calcite.adapter.elasticsearch.ElasticsearchSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.client.RestClient;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.DriverManager;

// Normally we would use a JSON model file, but there are a bunch of
// hacks here so we can use an embedded Elasticsearch instance instead
// See the docs below for more information:
// https://calcite.apache.org/docs/elasticsearch_adapter.html
Connection connection = DriverManager.getConnection("jdbc:calcite:lex=JAVA");
SchemaPlus root = connection.unwrap(CalciteConnection.class).getRootSchema();

// Since the class we need is not exposed by tests,
// we need some reflection hackery to get it
// This is *not* how the adapter should be used,
// but it is useful for demonstration
Object policy = ElasticSearchAdapterTest.NODE;
Field field = policy.getClass().getDeclaredField("node");
field.setAccessible(true);
Object node = field.get(policy);

// Start the embedded node
Method method = node.getClass().getMethod("start");
method.setAccessible(true);
method.invoke(node);

// Test setup: create the zips index and load the sample documents
ElasticSearchAdapterTest.setupInstance();

// Get the REST client
Method restClientMethod = policy.getClass().getDeclaredMethod("restClient");
restClientMethod.setAccessible(true);
RestClient restClient = (RestClient) restClientMethod.invoke(policy);

// Get the object mapper
Method mapperMethod = policy.getClass().getDeclaredMethod("mapper");
mapperMethod.setAccessible(true);
ObjectMapper mapper = (ObjectMapper) mapperMethod.invoke(policy);

// Create a new schema
root.add("elastic", new ElasticsearchSchema(restClient, mapper, "zips"));
System.out.println("Loaded schema");

Loaded schema
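The reflection steps in the cell above follow a general JDK pattern: look up a non-public member with `getDeclaredField` or `getDeclaredMethod`, call `setAccessible(true)`, then read or invoke it. The sketch below shows that pattern in isolation. `Policy`, `Node`, `restClient` and `ReflectionDemo` are hypothetical stand-ins chosen to resemble the test classes; they are not Calcite's API.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Hypothetical stand-in for a test rule that keeps its node private (not a Calcite class)
class Policy {
    private final Node node = new Node();

    // Private accessor, reachable from other classes only through reflection
    private String restClient() {
        return "client-for-" + node.name;
    }
}

// Hypothetical stand-in for an embedded server node
class Node {
    final String name = "embedded";
    boolean started = false;

    public void start() {
        started = true;
    }
}

class ReflectionDemo {
    // Reads a private field, starts the node it holds, then calls a private method
    static String startAndGetClient(Object policy) throws ReflectiveOperationException {
        Field field = policy.getClass().getDeclaredField("node");
        field.setAccessible(true);      // lift the private-access check on the field
        Object node = field.get(policy);

        Method start = node.getClass().getMethod("start");
        start.setAccessible(true);      // needed when the declaring class is not accessible to the caller
        start.invoke(node);

        Method restClient = policy.getClass().getDeclaredMethod("restClient");
        restClient.setAccessible(true); // private method
        return (String) restClient.invoke(policy);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(startAndGetClient(new Policy())); // prints client-for-embedded
    }
}
```

As in the notebook, this is convenient for a demonstration but couples the code to private implementation details that can change between releases.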


We can now query Elasticsearch with SQL.
As the output below shows, Calcite does not attempt any schema inference.
The table Calcite exposes has a single column named `_MAP`, of the SQL `MAP` type, which holds the entire document.
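To see what working with a single untyped column means, here is a plain-Java analogue. It assumes each row's `_MAP` value behaves like a `java.util.Map<String, Object>` holding a document shaped like the one returned below (the value types in `sampleDoc` are assumptions). Every lookup yields an `Object`, so the caller has to know the real type and cast; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UntypedDocument {
    // Hypothetical sample shaped like the zips document for id 83704
    static Map<String, Object> sampleDoc() {
        Map<String, Object> doc = new HashMap<>();
        doc.put("id", "83704");
        doc.put("city", "BOISE");
        doc.put("loc", Arrays.asList(-116.295099, 43.633001));
        doc.put("pop", 40912);
        doc.put("state", "ID");
        return doc;
    }

    // Like _MAP['pop']: the lookup is typed Object, so the caller must cast
    static int population(Map<String, Object> doc) {
        return ((Number) doc.get("pop")).intValue();
    }

    // Reading the first coordinate of loc needs two casts: to List, then to Number
    static double longitude(Map<String, Object> doc) {
        List<?> loc = (List<?>) doc.get("loc");
        return ((Number) loc.get(0)).doubleValue();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = sampleDoc();
        System.out.println(population(doc)); // prints 40912
        System.out.println(longitude(doc));
    }
}
```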

In [3]:
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT * FROM `elastic`.`zips` WHERE _MAP['id']=83704");

ResultSetMetaData rsmd = resultSet.getMetaData();
int columnsNumber = rsmd.getColumnCount();
while (resultSet.next()) {
    for (int i = 1; i <= columnsNumber; i++) {
        if (i > 1) System.out.print(",  ");
        String columnValue = resultSet.getString(i);
        System.out.print(rsmd.getColumnName(i) + "=" + columnValue);
    }
    System.out.println("");
}

_MAP={id=83704, city=BOISE, loc=[-116.295099, 43.633001], pop=40912, state=ID}


While we can write queries in this form, reaching fields through the map (for example `_MAP['city']`), this is not very convenient.
Calcite also does not know the types of the values it retrieves this way.
A view makes the schema much easier to work with: it gives each field a column name and casts it to an explicit SQL type.
Here we add the view programmatically, but views can also be [defined in the model JSON](https://calcite.apache.org/docs/model.html#view).
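The view's `CAST` expressions amount to a fixed mapping from the untyped document to a row with declared column types. A minimal plain-Java sketch of that mapping follows, using a hypothetical `ZipRow` class whose fields mirror the view's columns. It assumes the document holds strings, numbers and a two-element `loc` list, and it is simplified: lengths such as `VARCHAR(20)` are not enforced.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical typed row whose fields mirror the columns the view declares
final class ZipRow {
    final String city;
    final double longitude;
    final double latitude;
    final int pop;
    final String state;
    final String id;

    ZipRow(String city, double longitude, double latitude, int pop, String state, String id) {
        this.city = city;
        this.longitude = longitude;
        this.latitude = latitude;
        this.pop = pop;
        this.state = state;
        this.id = id;
    }

    // Simplified analogue of the view's CASTs: one conversion per column
    static ZipRow fromDocument(Map<String, ?> doc) {
        List<?> loc = (List<?>) doc.get("loc");
        return new ZipRow(
            String.valueOf(doc.get("city")),
            ((Number) loc.get(0)).doubleValue(),
            ((Number) loc.get(1)).doubleValue(),
            ((Number) doc.get("pop")).intValue(),
            String.valueOf(doc.get("state")),
            String.valueOf(doc.get("id")));
    }
}

class ZipRowDemo {
    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("id", "83704");
        doc.put("city", "BOISE");
        doc.put("loc", Arrays.asList(-116.295099, 43.633001));
        doc.put("pop", 40912);
        doc.put("state", "ID");

        ZipRow row = ZipRow.fromDocument(doc);
        // Fields now have static types, so no casts are needed downstream
        System.out.println(row.city + ", " + row.state + ": " + row.pop); // prints BOISE, ID: 40912
    }
}
```

Defining the view in SQL keeps this mapping in one place, so every query against it sees named, typed columns instead of repeating the casts.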

In [4]:
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;

import java.util.Arrays;
import java.util.Collections;

// Each CAST gives one field of the untyped _MAP column a name and a SQL type
final String viewSql = "select cast(_MAP['city'] AS varchar(20)) AS \"city\",\n"
  + "cast(_MAP['loc'][0] AS float) AS \"longitude\",\n"
  + "cast(_MAP['loc'][1] AS float) AS \"latitude\",\n"
  + "cast(_MAP['pop'] AS integer) AS \"pop\",\n"
  + "cast(_MAP['state'] AS varchar(2)) AS \"state\",\n"
  + "cast(_MAP['id'] AS varchar(5)) AS \"id\"\n"
  + "from \"elastic\".\"zips\"";

// Resolve names in viewSql against the elastic schema; the view is not modifiable (false)
ViewTableMacro macro = ViewTable.viewMacro(root, viewSql,
  Collections.singletonList("elastic"), Arrays.asList("elastic", "view"), false);
// Register the view under the name zips in the root schema
root.add("zips", macro);

Now we can query the view and see every column it defines.
Although the output does not show it, each column also has the SQL type given by its `CAST`.
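One way to check the column types is to read them from the `ResultSetMetaData` that the cell below already obtains. The helper sketched here, with the hypothetical name `ColumnTypes.describe`, formats each column's name and the driver-reported type name. In the notebook you could call it as `ColumnTypes.describe(resultSet.getMetaData()).forEach(System.out::println);`. The exact type names Calcite reports are not reproduced here.

```java
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class ColumnTypes {
    // Returns one "name: TYPE" entry per column; JDBC column indexes start at 1
    static List<String> describe(ResultSetMetaData md) throws SQLException {
        List<String> columns = new ArrayList<>();
        for (int i = 1; i <= md.getColumnCount(); i++) {
            columns.add(md.getColumnName(i) + ": " + md.getColumnTypeName(i));
        }
        return columns;
    }
}
```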

In [5]:
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT * FROM `zips` ORDER BY `city`, `state`");

ResultSetMetaData rsmd = resultSet.getMetaData();
int columnsNumber = rsmd.getColumnCount();
while (resultSet.next()) {
    for (int i = 1; i <= columnsNumber; i++) {
        if (i > 1) System.out.print(",  ");
        String columnValue = resultSet.getString(i);
        System.out.print(rsmd.getColumnName(i) + "=" + columnValue);
    }
    System.out.println("");
}

city=ABERDEEN,  longitude=-98.485642,  latitude=45.466109,  pop=28786,  state=SD,  id=57401
city=AIKEN,  longitude=-81.719429,  latitude=33.553024,  pop=51233,  state=SC,  id=29801
city=ALTON,  longitude=-98.342647,  latitude=26.24153,  pop=67604,  state=TX,  id=78572
city=AMES,  longitude=-93.639398,  latitude=42.029859,  pop=52105,  state=IA,  id=50010
city=ANCHORAGE,  longitude=-149.74467,  latitude=61.203696,  pop=32383,  state=AK,  id=99504
city=BALTIMORE,  longitude=-76.679397,  latitude=39.344572,  pop=74402,  state=MD,  id=21215
city=BANGOR,  longitude=-68.791839,  latitude=44.824199,  pop=40434,  state=ME,  id=04401
city=BAVARIA,  longitude=-97.608787,  latitude=38.823802,  pop=45208,  state=KS,  id=67401
city=BAYONNE,  longitude=-74.119169,  latitude=40.666399,  pop=61444,  state=NJ,  id=07002
city=BEAVERTON,  longitude=-122.805395,  latitude=45.475035,  pop=46660,  state=OR,  id=97005
city=BECKLEY,  longitude=-81.206084,  latitude=37.793214,  pop=45196,  state=WV,  id=25801
