More ES refactoring #32

Merged 4 commits on Dec 2, 2022
40 changes: 20 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ JobClient.runJob(conf);
```java
Configuration conf = new Configuration();
conf.set("opensearch.resource", "radio/artists");
-conf.set("es.query", "?q=me*"); // replace this with the relevant query
+conf.set("opensearch.query", "?q=me*"); // replace this with the relevant query
Job job = new Job(conf)
job.setInputFormatClass(EsInputFormat.class);
...
@@ -115,9 +115,9 @@ job.waitForCompletion(true);
## [Apache Hive][]
ES-Hadoop provides a Hive storage handler for Elasticsearch, meaning one can define an [external table][] on top of ES.

-Add es-hadoop-<version>.jar to `hive.aux.jars.path` or register it manually in your Hive script (recommended):
+Add opensearch-hadoop-<version>.jar to `hive.aux.jars.path` or register it manually in your Hive script (recommended):
```
-ADD JAR /path_to_jar/es-hadoop-<version>.jar;
+ADD JAR /path_to_jar/opensearch-hadoop-<version>.jar;
```
### Reading
To read data from ES, define a table backed by the desired index:
Expand All @@ -127,7 +127,7 @@ CREATE EXTERNAL TABLE artists (
name STRING,
links STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.opensearch.hive.hadoop.EsStorageHandler'
-TBLPROPERTIES('opensearch.resource' = 'radio/artists', 'es.query' = '?q=me*');
+TBLPROPERTIES('opensearch.resource' = 'radio/artists', 'opensearch.query' = '?q=me*');
```
The fields defined in the table are mapped to the JSON when communicating with Elasticsearch. Notice the use of `TBLPROPERTIES` to define the location, that is the query used for reading from this table.
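The hunks in this file all apply the same mechanical rename of configuration keys from the legacy `es.` prefix to `opensearch.`. As an illustrative sketch (the class and helper below are hypothetical, not part of opensearch-hadoop), the rule the PR applies is a plain prefix swap:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper illustrating the es.* -> opensearch.* key rename
// applied throughout this PR; not shipped by opensearch-hadoop itself.
class PropertyMigration {
    static String migrateKey(String key) {
        // "es.query" -> "opensearch.query", "es.mapping.names" -> "opensearch.mapping.names"
        return key.startsWith("es.") ? "opensearch." + key.substring(3) : key;
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new LinkedHashMap<>();
        legacy.put("es.query", "?q=me*");
        legacy.put("es.mapping.names", "alias:&c");
        legacy.put("opensearch.resource", "radio/artists"); // already migrated, left as-is

        Map<String, String> migrated = new LinkedHashMap<>();
        legacy.forEach((k, v) -> migrated.put(migrateKey(k), v));
        System.out.println(migrated);
    }
}
```

Keys already carrying the new prefix pass through unchanged, which matches the mixed state visible in this hunk (`opensearch.resource` was renamed earlier, `es.query` is renamed here).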

@@ -160,7 +160,7 @@ ES-Hadoop provides both read and write functions for Pig so you can access Elast

Register ES-Hadoop jar into your script or add it to your Pig classpath:
```
-REGISTER /path_to_jar/es-hadoop-<version>.jar;
+REGISTER /path_to_jar/opensearch-hadoop-<version>.jar;
```
Additionally one can define an alias to save some chars:
```
@@ -171,7 +171,7 @@ and use `$ESSTORAGE` for storage definition.
### Reading
To read data from ES, use `OpenSearchStorage` and specify the query through the `LOAD` function:
```SQL
-A = LOAD 'radio/artists' USING org.opensearch.pig.hadoop.EsStorage('es.query=?q=me*');
+A = LOAD 'radio/artists' USING org.opensearch.pig.hadoop.EsStorage('opensearch.query=?q=me*');
DUMP A;
```

@@ -188,10 +188,10 @@ ES-Hadoop provides native (Java and Scala) integration with Spark: for reading a
### Scala

### Reading
-To read data from ES, create a dedicated `RDD` and specify the query as an argument:
+To read data from OpenSearch, create a dedicated `RDD` and specify the query as an argument:

```scala
-import org.elasticsearch.spark._
+import org.opensearch.spark._

..
val conf = ...
@@ -201,20 +201,20 @@ sc.esRDD("radio/artists", "?q=me*")

#### Spark SQL
```scala
-import org.elasticsearch.spark.sql._
+import org.opensearch.spark.sql._

// DataFrame schema automatically inferred
-val df = sqlContext.read.format("es").load("buckethead/albums")
+val df = sqlContext.read.format("opensearch").load("buckethead/albums")

-// operations get pushed down and translated at runtime to Elasticsearch QueryDSL
+// operations get pushed down and translated at runtime to OpenSearch QueryDSL
val playlist = df.filter(df("category").equalTo("pikes").and(df("year").geq(2016)))
```

### Writing
-Import the `org.elasticsearch.spark._` package to gain `savetoEs` methods on your `RDD`s:
+Import the `org.opensearch.spark._` package to gain `savetoEs` methods on your `RDD`s:

```scala
-import org.elasticsearch.spark._
+import org.opensearch.spark._

val conf = ...
val sc = new SparkContext(conf)
@@ -228,22 +228,22 @@ sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
#### Spark SQL

```scala
-import org.elasticsearch.spark.sql._
+import org.opensearch.spark.sql._

val df = sqlContext.read.json("examples/people.json")
df.saveToEs("spark/people")
```

### Java

-In a Java environment, use the `org.elasticsearch.spark.rdd.java.api` package, in particular the `JavaOpenSearchSpark` class.
+In a Java environment, use the `org.opensearch.spark.rdd.java.api` package, in particular the `JavaOpenSearchSpark` class.

### Reading
To read data from ES, create a dedicated `RDD` and specify the query as an argument.

```java
import org.apache.spark.api.java.JavaSparkContext;
-import org.elasticsearch.spark.rdd.api.java.JavaOpenSearchSpark;
+import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);
@@ -255,15 +255,15 @@ JavaPairRDD<String, Map<String, Object>> esRDD = JavaOpenSearchSpark.esRDD(jsc,

```java
SQLContext sql = new SQLContext(sc);
-DataFrame df = sql.read().format("es").load("buckethead/albums");
+DataFrame df = sql.read().format("opensearch").load("buckethead/albums");
DataFrame playlist = df.filter(df.col("category").equalTo("pikes").and(df.col("year").geq(2016)))
```
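The Spark hunks swap both the import packages (`org.elasticsearch.spark` → `org.opensearch.spark`) and the short data-source name passed to `format(...)` (`"es"` → `"opensearch"`). A minimal hypothetical check, not from this PR, for catching leftover legacy keys in a job configuration map:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: after migration, no configuration key should still
// carry the legacy "es." prefix. Not part of opensearch-hadoop.
class PrefixCheck {
    static boolean hasLegacyKeys(Map<String, String> conf) {
        return conf.keySet().stream().anyMatch(k -> k.startsWith("es."));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("opensearch.resource", "buckethead/albums");
        conf.put("opensearch.query", "?q=me*");
        System.out.println(hasLegacyKeys(conf)); // false
    }
}
```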

### Writing

Use `JavaOpenSearchSpark` to index any `RDD` to Elasticsearch:
```java
-import org.elasticsearch.spark.rdd.api.java.JavaOpenSearchSpark;
+import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);
@@ -294,7 +294,7 @@ import org.opensearch.storm.OpenSearchSpout;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("opensearch-spout", new OpenSearchSpout("storm/docs", "?q=me*"), 5);
-builder.setBolt("bolt", new PrinterBolt()).shuffleGrouping("es-spout");
+builder.setBolt("bolt", new PrinterBolt()).shuffleGrouping("opensearch-spout");
```

### Writing
@@ -305,7 +305,7 @@ import org.opensearch.storm.OpenSearchBolt;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 10);
-builder.setBolt("es-bolt", new EsBolt("storm/docs"), 5).shuffleGrouping("spout");
+builder.setBolt("opensearch-bolt", new EsBolt("storm/docs"), 5).shuffleGrouping("spout");
```

## Building the source
2 changes: 1 addition & 1 deletion build.gradle
@@ -50,5 +50,5 @@ if (project.hasProperty("find-artifact")) {
task generateDependenciesReport(type: ConcatFilesTask) {
files = fileTree(dir: project.rootDir, include: '**/dependencies.csv' )
headerLine = "name,version,url,license"
-target = new File(System.getProperty('csv')?: "${project.buildDir}/reports/dependencies/es-hadoop-dependencies.csv")
+target = new File(System.getProperty('csv')?: "${project.buildDir}/reports/dependencies/opensearch-hadoop-dependencies.csv")
}
@@ -99,13 +99,7 @@ class OpenSearchFixturePlugin implements Plugin<Project> {
File scriptsDir = new File(project.buildDir, 'scripts')
scriptsDir.mkdirs()
File script = null
-if (majorVersion <= 2) {
-scriptsDir.mkdirs()
-script = new File(scriptsDir, "increment.groovy").setText("ctx._source.counter+=1", 'UTF-8')
-} else if (majorVersion == 5) {
-scriptsDir.mkdirs()
-script = new File(scriptsDir, "increment.painless").setText("ctx._source.counter = ctx._source.getOrDefault('counter', 0) + 1", 'UTF-8')
-}
+script = new File(scriptsDir, "increment.painless").setText("ctx._source.counter = ctx._source.getOrDefault('counter', 0) + 1", 'UTF-8')
if (script != null) {
integTestCluster.extraConfigFile("script", script)
}
@@ -272,7 +272,7 @@ private String findJavaHome(String version) {
if (versionedJavaHome == null) {
final String exceptionMessage = String.format(
Locale.ROOT,
-"$%s must be set to build Elasticsearch. "
+"$%s must be set to build OpenSearch. "
+ "Note that if the variable was just set you "
+ "might have to run `./gradlew --stop` for "
+ "it to be picked up. See https://github.com/elastic/elasticsearch/issues/31399 details.",
@@ -28,7 +28,7 @@
*/

/**
-* All classes in org.elasticsearch.hadoop.gradle.buildtools are originally
+* All classes in org.opensearch.hadoop.gradle.buildtools are originally
* copied from the elasticsearch build-tools project. These classes are
* considered internal in build tools and will not be supported by the build tools
* team and also not being published in upcoming build tool releases.
2 changes: 1 addition & 1 deletion dist/build.gradle
@@ -187,7 +187,7 @@ task generateDependenciesReport(type: ConcatFilesTask) {
dependsOn rootProject.allprojects.collect { it.tasks.withType(DependenciesInfoTask) }
files = fileTree(dir: project.rootDir, include: '**/dependencies.csv' )
headerLine = "name,version,url,license"
-target = new File(System.getProperty('csv')?: "${project.buildDir}/reports/dependencies/es-hadoop-dependencies.csv")
+target = new File(System.getProperty('csv')?: "${project.buildDir}/reports/dependencies/opensearch-hadoop-dependencies.csv")
}

project.tasks.named('dependencyLicenses', DependencyLicensesTask) {
@@ -76,7 +76,7 @@ public void testQuery() throws Exception {
+ "price BIGINT,"
+ "sold TIMESTAMP, "
+ "alias STRING) "
-+ HiveSuite.tableProps(resource, null, "'es.mapping.names'='alias:&c'");
++ HiveSuite.tableProps(resource, null, "'opensearch.mapping.names'='alias:&c'");

String query = "SELECT * from cars2";
String count = "SELECT count(*) from cars2";
@@ -109,7 +109,7 @@ public void testDate() throws Exception {
String create = "CREATE EXTERNAL TABLE nixtime ("
+ "type BIGINT,"
+ "dte TIMESTAMP)"
-+ HiveSuite.tableProps("hive-date-as-long", null, "'es.mapping.names'='dte:&t'");
++ HiveSuite.tableProps("hive-date-as-long", null, "'opensearch.mapping.names'='dte:&t'");

String query = "SELECT * from nixtime WHERE type = 1";

@@ -32,7 +32,6 @@
import org.opensearch.hadoop.HdpBootstrap;
import org.opensearch.hadoop.QueryTestParams;
import org.opensearch.hadoop.cfg.ConfigurationOptions;
-import org.opensearch.hadoop.OpenSearchAssume;
import org.opensearch.hadoop.rest.RestUtils;
import org.opensearch.hadoop.util.OpenSearchMajorVersion;
import org.opensearch.hadoop.util.TestUtils;
@@ -96,7 +95,7 @@ public void after() throws Exception {
public void basicLoad() throws Exception {

String create = "CREATE EXTERNAL TABLE jsonartistsread" + testInstance + " (data INT, garbage INT, garbage2 STRING) "
-+ tableProps(resource("json-hive-artists", "data", targetVersion), "'opensearch.output.json' = 'true'", "'es.mapping.names'='garbage2:refuse'");
++ tableProps(resource("json-hive-artists", "data", targetVersion), "'opensearch.output.json' = 'true'", "'opensearch.mapping.names'='garbage2:refuse'");

String select = "SELECT * FROM jsonartistsread" + testInstance;

@@ -112,7 +111,7 @@ public void basicLoadWithNameMappings() throws Exception {
public void basicLoadWithNameMappings() throws Exception {

String create = "CREATE EXTERNAL TABLE jsonartistsread" + testInstance + " (refuse INT, garbage INT, data STRING) "
-+ tableProps(resource("json-hive-artists", "data", targetVersion), "'opensearch.output.json' = 'true'", "'es.mapping.names'='data:boomSomethingYouWerentExpecting'");
++ tableProps(resource("json-hive-artists", "data", targetVersion), "'opensearch.output.json' = 'true'", "'opensearch.mapping.names'='data:boomSomethingYouWerentExpecting'");

String select = "SELECT * FROM jsonartistsread" + testInstance;

@@ -141,7 +140,7 @@ public void basicLoadWithNoGoodCandidateField() throws Exception {
@Test
public void testMissingIndex() throws Exception {
String create = "CREATE EXTERNAL TABLE jsonmissingread" + testInstance + " (data STRING) "
-+ tableProps(resource("foobar", "missing", targetVersion), "'es.index.read.missing.as.empty' = 'true'", "'opensearch.output.json' = 'true'");
++ tableProps(resource("foobar", "missing", targetVersion), "'opensearch.index.read.missing.as.empty' = 'true'", "'opensearch.output.json' = 'true'");

String select = "SELECT * FROM jsonmissingread" + testInstance;

@@ -157,7 +156,7 @@ public void testNoSourceFilterCollisions() throws Exception {
+ tableProps(
resource("json-hive-artists", "data", targetVersion),
"'opensearch.output.json' = 'true'",
-"'es.read.source.filter'='name'"
+"'opensearch.read.source.filter'='name'"
);

String select = "SELECT * FROM jsonartistscollisionread" + testInstance;
@@ -193,7 +192,7 @@ private static void assertContains(List<String> str, String content) {

private String tableProps(String resource, String... params) {
List<String> copy = new ArrayList(Arrays.asList(params));
-copy.add("'" + ConfigurationOptions.ES_READ_METADATA + "'='" + readMetadata + "'");
+copy.add("'" + ConfigurationOptions.OPENSEARCH_READ_METADATA + "'='" + readMetadata + "'");
return HiveSuite.tableProps(resource, query, copy.toArray(new String[copy.size()]));
}
}
@@ -34,7 +34,6 @@
import java.util.List;

import org.opensearch.hadoop.cfg.ConfigurationOptions;
-import org.opensearch.hadoop.OpenSearchAssume;
import org.opensearch.hadoop.rest.RestUtils;
import org.opensearch.hadoop.util.OpenSearchMajorVersion;
import org.opensearch.hadoop.util.StringUtils;
@@ -160,8 +159,8 @@ public void testCreate() throws Exception {
"CREATE EXTERNAL TABLE jsoncreatesave ("
+ "json STRING) "
+ tableProps(resource("json-hive-createsave", "data", targetVersion),
-"'" + ConfigurationOptions.ES_MAPPING_ID + "'='number'",
-"'" + ConfigurationOptions.ES_WRITE_OPERATION + "'='create'");
+"'" + ConfigurationOptions.OPENSEARCH_MAPPING_ID + "'='number'",
+"'" + ConfigurationOptions.OPENSEARCH_WRITE_OPERATION + "'='create'");

// transfer data
String insert =
@@ -188,8 +187,8 @@ public void testCreateWithDuplicates() throws Exception {
"CREATE EXTERNAL TABLE jsoncreatesaveduplicate ("
+ "json STRING) "
+ tableProps(resource("json-hive-createsave", "data", targetVersion),
-"'" + ConfigurationOptions.ES_MAPPING_ID + "'='number'",
-"'" + ConfigurationOptions.ES_WRITE_OPERATION + "'='create'");
+"'" + ConfigurationOptions.OPENSEARCH_MAPPING_ID + "'='number'",
+"'" + ConfigurationOptions.OPENSEARCH_WRITE_OPERATION + "'='create'");

String selectTest = "SELECT s.json FROM jsoncreatesourceduplicate s";

@@ -219,8 +218,8 @@ public void testUpdateWithId() throws Exception {
"CREATE EXTERNAL TABLE jsonupdatesave ("
+ "json STRING) "
+ tableProps(resource("json-hive-updatesave", "data", targetVersion),
-"'" + ConfigurationOptions.ES_MAPPING_ID + "'='number'",
-"'" + ConfigurationOptions.ES_WRITE_OPERATION + "'='upsert'");
+"'" + ConfigurationOptions.OPENSEARCH_MAPPING_ID + "'='number'",
+"'" + ConfigurationOptions.OPENSEARCH_WRITE_OPERATION + "'='upsert'");

String selectTest = "SELECT s.json FROM jsonupdatesource s";

@@ -251,8 +250,8 @@ public void testUpdateWithoutUpsert() throws Exception {
"CREATE EXTERNAL TABLE jsonupdatewoupsertsave ("
+ "json STRING) "
+ tableProps(resource("json-hive-updatewoupsertsave", "data", targetVersion),
-"'" + ConfigurationOptions.ES_MAPPING_ID + "'='number'",
-"'" + ConfigurationOptions.ES_WRITE_OPERATION + "'='update'");
+"'" + ConfigurationOptions.OPENSEARCH_MAPPING_ID + "'='number'",
+"'" + ConfigurationOptions.OPENSEARCH_WRITE_OPERATION + "'='update'");

String selectTest = "SELECT s.json FROM jsonupdatewoupsertsource s";

@@ -343,7 +342,7 @@ private String loadData(String tableName) {

private static String tableProps(String resource, String... params) {
List<String> parms = new ArrayList<String>();
-parms.add("'es.input.json'='true'");
+parms.add("'opensearch.input.json'='true'");
if (params != null) {
Collections.addAll(parms, params);
}