[Refactor] spark-sql-30 from legacy to opensearch namespace (#18)
Refactors the classes in the spark-sql-30 module from the legacy org.elasticsearch namespace to the org.opensearch namespace, renames the Gradle project from elasticsearch-spark-30 to opensearch-spark-30, and updates the build scripts, tests, and README accordingly.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
nknize committed Oct 27, 2022
1 parent 2281bed commit 4a93821
Showing 84 changed files with 341 additions and 341 deletions.
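Editorial note, not part of the commit: the renames in this diff follow one pattern. Packages move from `org.elasticsearch.spark.*` to `org.opensearch.spark.*`, `Es*` class prefixes become `OpenSearch*`, and the Gradle project `:elasticsearch-spark-30` becomes `:opensearch-spark-30`. A minimal before/after sketch, assuming a reachable cluster, the connector on the classpath, and that the Scala `OpenSearchSparkSQL.saveToEs(dataset, resource)` overload mirrors the `JavaOpenSearchSparkSQL` calls exercised in the tests below:

```scala
import org.apache.spark.sql.SparkSession
// Was: import org.elasticsearch.spark.sql.EsSparkSQL
import org.opensearch.spark.sql.OpenSearchSparkSQL

object NamespaceMigrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("namespace-migration").getOrCreate()
    // Sample file name taken from the README example in this diff.
    val people = spark.read.json("examples/people.json")

    // Was: EsSparkSQL.saveToEs(people, "spark/docs")
    OpenSearchSparkSQL.saveToEs(people, "spark/docs")

    spark.stop()
  }
}
```

Judging by the one-to-one additions and deletions above, the call patterns themselves are unchanged, so migrating callers is largely a mechanical rename.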
6 changes: 3 additions & 3 deletions README.md
@@ -301,16 +301,16 @@ Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
-JavaEsSpark.saveToEs(javaRDD, "spark/docs");
+JavaOpenSearchSpark.saveToEs(javaRDD, "spark/docs");
```

#### Spark SQL

```java
-import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
+import org.opensearch.spark.sql.api.java.JavaOpenSearchSparkSQL;

DataFrame df = sqlContext.read.json("examples/people.json")
-JavaEsSparkSQL.saveToEs(df, "spark/docs")
+JavaOpenSearchSparkSQL.saveToEs(df, "spark/docs")
```
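As a companion to the Java snippet above (an editorial sketch, not part of the README change): reading the same documents back through the DataSource API, using the `"es"` format alias that the tests elsewhere in this diff still rely on. Connection settings are left at the connector defaults and the resource name matches the write above.

```scala
import org.apache.spark.sql.SparkSession

object ReadBackSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("read-back").getOrCreate()

    // "spark/docs" is the resource written by the README example above.
    val docs = spark.read.format("es").load("spark/docs")

    docs.printSchema()
    println(s"document count: ${docs.count()}")
    spark.stop()
  }
}
```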

## [Apache Storm][]
6 changes: 3 additions & 3 deletions qa/kerberos/build.gradle
@@ -68,7 +68,7 @@ dependencies {

implementation("org.scala-lang:scala-library:${scala212Version}")
implementation("org.scala-lang:scala-reflect:${scala212Version}")
implementation(project(":elasticsearch-spark-30")) {
implementation(project(":opensearch-spark-30")) {
capabilities {
// Spark 3.x on Scala 2.12
requireCapability("org.elasticsearch.spark.sql.variant:spark30scala212:$project.version")
@@ -259,7 +259,7 @@ if (disableTests) {
def mrJar = project(':opensearch-hadoop-mr').tasks.getByName('jar') as Jar
def hiveJar = project(':opensearch-hadoop-hive').tasks.getByName('jar') as Jar
def pigJar = project(':opensearch-hadoop-pig').tasks.getByName('jar') as Jar
-def sparkJar = project(':elasticsearch-spark-30').tasks.getByName('jar') as Jar
+def sparkJar = project(':opensearch-spark-30').tasks.getByName('jar') as Jar

// Need these for SSL items, test data, and scripts
File resourceDir = project.sourceSets.main.resources.getSrcDirs().head()
@@ -270,7 +270,7 @@ if (disableTests) {
doLast {
project.javaexec {
executable = project.runtimeJavaHome.toString() + "/bin/java"
-mainClass = 'org.elasticsearch.hadoop.qa.kerberos.setup.SetupKerberosUsers'
+mainClass = 'org.opensearch.hadoop.qa.kerberos.setup.SetupKerberosUsers'
classpath = sourceSets.main.runtimeClasspath
systemProperty('opensearch.nodes', opensearchAddress)
systemProperty('es.net.http.auth.user', 'test_admin')
2 changes: 1 addition & 1 deletion settings.gradle
@@ -29,7 +29,7 @@ project(":sql-20").name = "opensearch-spark-20"

include 'sql-30'
project(":sql-30").projectDir = new File(settingsDir, "spark/sql-30")
project(":sql-30").name = "elasticsearch-spark-30"
project(":sql-30").name = "opensearch-spark-30"

include 'storm'
project(":storm").name = "elasticsearch-storm"
@@ -143,7 +143,7 @@ public void testEsSchemaRDD1WriteWithMappingExclude() throws Exception {
public void testEsdataFrame2Read() throws Exception {
String target = resource("sparksql-test-scala-basic-write", "data", version);

-// DataFrame dataFrame = JavaEsSparkSQL.esDF(sqc, target);
+// DataFrame dataFrame = JavaOpenSearchSparkSQL.esDF(sqc, target);
DataFrame dataFrame = sqc.read().format("es").load(target);
assertTrue(dataFrame.count() > 300);
String schema = dataFrame.schema().treeString();
@@ -45,7 +45,7 @@
import org.opensearch.hadoop.util.OpenSearchMajorVersion;
import org.elasticsearch.hadoop.util.TestSettings;
import org.elasticsearch.hadoop.util.TestUtils;
-import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
+import org.opensearch.spark.sql.api.java.JavaOpenSearchSparkSQL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -66,7 +66,7 @@
import static scala.collection.JavaConversions.*;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class AbstractJavaEsSparkSQLTest implements Serializable {
+public class AbstractJavaOpenSearchSparkSQLTest implements Serializable {

private static final transient SparkConf conf = new SparkConf()
.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS))
@@ -110,7 +110,7 @@ public void testEsDataset1Write() throws Exception {
Dataset<Row> dataset = artistsAsDataset();

String target = resource("sparksql-test-scala-basic-write", "data", version);
-JavaEsSparkSQL.saveToEs(dataset, target);
+JavaOpenSearchSparkSQL.saveToEs(dataset, target);
assertTrue(RestUtils.exists(target));
assertThat(RestUtils.get(target + "/_search?"), containsString("345"));
}
@@ -122,7 +122,7 @@ public void testEsDataset1WriteWithId() throws Exception {
String target = resource("sparksql-test-scala-basic-write-id-mapping", "data", version);
String docEndpoint = docEndpoint("sparksql-test-scala-basic-write-id-mapping", "data", version);

-JavaEsSparkSQL.saveToEs(dataset, target,
+JavaOpenSearchSparkSQL.saveToEs(dataset, target,
ImmutableMap.of(ES_MAPPING_ID, "id"));
assertTrue(RestUtils.exists(target));
assertThat(RestUtils.get(target + "/_search?"), containsString("345"));
@@ -134,7 +134,7 @@ public void testEsSchemaRDD1WriteWithMappingExclude() throws Exception {
Dataset<Row> dataset = artistsAsDataset();

String target = resource("sparksql-test-scala-basic-write-exclude-mapping", "data", version);
-JavaEsSparkSQL.saveToEs(dataset, target,
+JavaOpenSearchSparkSQL.saveToEs(dataset, target,
ImmutableMap.of(ES_MAPPING_EXCLUDE, "url"));
assertTrue(RestUtils.exists(target));
assertThat(RestUtils.get(target + "/_search?"), not(containsString("url")));
@@ -144,7 +144,7 @@ public void testEsSchemaRDD1WriteWithMappingExclude() throws Exception {
public void testEsDataset2Read() throws Exception {
String target = resource("sparksql-test-scala-basic-write", "data", version);

-// Dataset<Row> dataset = JavaEsSparkSQL.esDF(sqc, target);
+// Dataset<Row> dataset = JavaOpenSearchSparkSQL.esDF(sqc, target);
Dataset<Row> dataset = sqc.read().format("es").load(target);
assertTrue(dataset.count() > 300);
String schema = dataset.schema().treeString();
@@ -82,7 +82,7 @@

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(Parameterized.class)
-public class AbstractJavaEsSparkStreamingTest implements Serializable {
+public class AbstractJavaOpenSearchSparkStreamingTest implements Serializable {

private static final transient SparkConf conf = new SparkConf()
.setMaster("local")
@@ -118,7 +118,7 @@ public static void clean() throws Exception {
private JavaStreamingContext ssc = null;
private OpenSearchMajorVersion version = TestUtils.getOpenSearchClusterInfo().getMajorVersion();

-public AbstractJavaEsSparkStreamingTest(String prefix, boolean readMetadata) {
+public AbstractJavaOpenSearchSparkStreamingTest(String prefix, boolean readMetadata) {
this.prefix = prefix;
this.cfg.put(ES_READ_METADATA, Boolean.toString(readMetadata));
}
@@ -25,7 +25,7 @@
import org.junit.runners.Suite;

@RunWith(Suite.class)
-@Suite.SuiteClasses({ AbstractJavaEsSparkSQLTest.class })
+@Suite.SuiteClasses({ AbstractJavaOpenSearchSparkSQLTest.class })
public class SparkSQLSuite {

@ClassRule
@@ -26,7 +26,7 @@
import org.junit.runners.Suite;

@RunWith(Suite.class)
-@Suite.SuiteClasses({ AbstractJavaEsSparkStreamingTest.class })
+@Suite.SuiteClasses({ AbstractJavaOpenSearchSparkStreamingTest.class })
public class SparkStreamingSuite {

@ClassRule
@@ -39,7 +39,7 @@
import org.opensearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.TestSettings;
import org.elasticsearch.hadoop.util.TestUtils;
-import org.opensearch.spark.rdd.api.java.JavaEsSpark;
+import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark;
import org.opensearch.spark.sql.streaming.api.java.JavaStreamingQueryTestHarness;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -56,14 +56,14 @@ import org.elasticsearch.hadoop.util.TestSettings
import org.elasticsearch.hadoop.util.TestUtils
import org.elasticsearch.hadoop.util.TestUtils.resource
import org.elasticsearch.hadoop.util.TestUtils.docEndpoint
-import org.elasticsearch.spark.cfg.SparkSettingsManager
-import org.elasticsearch.spark.sparkRDDFunctions
-import org.elasticsearch.spark.sparkStringJsonRDDFunctions
-import org.elasticsearch.spark.sql.EsSparkSQL
-import org.elasticsearch.spark.sql.ScalaEsRow
-import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL
-import org.elasticsearch.spark.sql.sparkDatasetFunctions
-import org.elasticsearch.spark.sql.sqlContextFunctions
+import org.opensearch.spark.cfg.SparkSettingsManager
+import org.opensearch.spark.sparkRDDFunctions
+import org.opensearch.spark.sparkStringJsonRDDFunctions
+import org.opensearch.spark.sql.OpenSearchSparkSQL
+import org.opensearch.spark.sql.ScalaOpenSearchRow
+import org.opensearch.spark.sql.api.java.JavaOpenSearchSparkSQL
+import org.opensearch.spark.sql.sparkDatasetFunctions
+import org.opensearch.spark.sql.sqlContextFunctions
import org.hamcrest.Matchers.containsString
import org.hamcrest.Matchers.is
import org.hamcrest.Matchers.not
@@ -715,10 +715,10 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val index = wrapIndex("sparksql-test-scala-basic-write")
val (target, _) = makeTargets(index, "data")

-val dfNoQuery = JavaEsSparkSQL.esDF(sqc, target, cfg.asJava)
+val dfNoQuery = JavaOpenSearchSparkSQL.esDF(sqc, target, cfg.asJava)
val query = s"""{ "query" : { "query_string" : { "query" : "name:me*" } } //, "fields" : ["name"]
}"""
-val dfWQuery = JavaEsSparkSQL.esDF(sqc, target, query, cfg.asJava)
+val dfWQuery = JavaOpenSearchSparkSQL.esDF(sqc, target, query, cfg.asJava)

println(dfNoQuery.head())
println(dfWQuery.head())
@@ -730,7 +730,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus

@Test
def testEsDataFrame3WriteWithRichMapping() {
-val path = Paths.get(AbstractScalaEsScalaSparkSQL.testData.sampleArtistsDatUri())
+val path = Paths.get(AbstractScalaOpenSearchScalaSparkSQL.testData.sampleArtistsDatUri())
// because Windows...
val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala

@@ -1442,8 +1442,8 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val (target, _) = makeTargets(index, "data")
val table = wrapIndex("save_mode_append")

srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Append).save(target)
val df = EsSparkSQL.esDF(sqc, target)
srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Append).save(target)
val df = OpenSearchSparkSQL.esDF(sqc, target)

assertEquals(3, df.count())
srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Append).save(target)
@@ -1457,8 +1457,8 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val (target, _) = makeTargets(index, "data")
val table = wrapIndex("save_mode_overwrite")

srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).save(target)
val df = EsSparkSQL.esDF(sqc, target)
srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Overwrite).save(target)
val df = OpenSearchSparkSQL.esDF(sqc, target)

assertEquals(3, df.count())
srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).save(target)
@@ -1471,11 +1471,11 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val index = wrapIndex("sparksql-test-savemode_overwrite_id")
val (target, _) = makeTargets(index, "data")

srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(target)
val df = EsSparkSQL.esDF(sqc, target)
srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(target)
val df = OpenSearchSparkSQL.esDF(sqc, target)

assertEquals(3, df.count())
srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(target)
srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(target)
assertEquals(3, df.count())
}

@@ -1486,8 +1486,8 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val (target, docPath) = makeTargets(index, "data")
val table = wrapIndex("save_mode_ignore")

srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Ignore).save(target)
val df = EsSparkSQL.esDF(sqc, target)
srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Ignore).save(target)
val df = OpenSearchSparkSQL.esDF(sqc, target)

assertEquals(3, df.count())
// should not go through
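The SaveMode tests above drive the DataFrame writer through the renamed source. Pulled out of the test harness, the same write and read-back looks like this; an illustrative sketch only, with a made-up resource name and sample rows, while the format string and the `es.mapping.id` option are taken from the tests above.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.opensearch.spark.sql.OpenSearchSparkSQL

object SaveModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("save-mode-sketch").getOrCreate()
    import spark.implicits._

    val src = Seq((1, "one"), (2, "two"), (3, "three")).toDF("number", "name")
    val target = "sparksql-test-savemode-sketch/data" // illustrative resource

    // Overwrite, keyed on "number" so re-running replaces the same documents.
    src.write
      .format("org.opensearch.spark.sql")
      .mode(SaveMode.Overwrite)
      .option("es.mapping.id", "number")
      .save(target)

    // Read back through the renamed Scala entry point.
    val df = OpenSearchSparkSQL.esDF(spark.sqlContext, target)
    println(df.count()) // expect 3
    spark.stop()
  }
}
```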
@@ -615,7 +615,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
}
rr.close()
}
-EsSparkSQL.saveToEs(data, parameters)
+OpenSearchSparkSQL.saveToEs(data, parameters)
}

def isEmpty(): Boolean = {
@@ -16,18 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.spark.sql
+package org.opensearch.spark.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
-import org.elasticsearch.spark.rdd.EsRDDWriter
+import org.opensearch.spark.rdd.OpenSearchRDDWriter
import org.opensearch.hadoop.serialization.{BytesConverter, JdkBytesConverter}
import org.opensearch.hadoop.serialization.builder.ValueWriter
import org.opensearch.hadoop.serialization.field.FieldExtractor

-private[spark] class EsDataFrameWriter
+private[spark] class OpenSearchDataFrameWriter
(schema: StructType, override val serializedSettings: String)
-extends EsRDDWriter[Row](serializedSettings:String) {
+extends OpenSearchRDDWriter[Row](serializedSettings:String) {

override protected def valueWriter: Class[_ <: ValueWriter[_]] = classOf[DataFrameValueWriter]
override protected def bytesConverter: Class[_ <: BytesConverter] = classOf[JdkBytesConverter]
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.spark.sql
+package org.opensearch.spark.sql

import org.apache.commons.logging.LogFactory
import org.apache.spark.sql.DataFrame
@@ -26,7 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.opensearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
import org.opensearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
import org.opensearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
-import org.elasticsearch.spark.cfg.SparkSettingsManager
+import org.opensearch.spark.cfg.SparkSettingsManager
import org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException
import org.opensearch.hadoop.cfg.PropertiesSettings
import org.opensearch.hadoop.mr.security.HadoopUserProvider
@@ -37,11 +37,11 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import scala.collection.Map

-object EsSparkSQL {
+object OpenSearchSparkSQL {

private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }
private val init = { ObjectUtils.loadClass("org.opensearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }

-@transient private[this] val LOG = LogFactory.getLog(EsSparkSQL.getClass)
+@transient private[this] val LOG = LogFactory.getLog(OpenSearchSparkSQL.getClass)

//
// Read
@@ -100,7 +100,7 @@ object EsSparkSQL {
InitializationUtils.checkIdForOperation(esCfg)
InitializationUtils.checkIndexExistence(esCfg)

-sparkCtx.runJob(srdd.toDF().rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)
+sparkCtx.runJob(srdd.toDF().rdd, new OpenSearchDataFrameWriter(srdd.schema, esCfg.save()).write _)
}
}
}
@@ -51,7 +51,7 @@ private[sql] trait RowValueReader extends SettingsAware {
}
}

-def addToBuffer(esRow: ScalaEsRow, key: AnyRef, value: Any): Unit = {
+def addToBuffer(esRow: ScalaOpenSearchRow, key: AnyRef, value: Any): Unit = {
val pos = esRow.rowOrder.indexOf(key.toString())
if (pos < 0 || pos >= esRow.values.size) {
// geo types allow fields which are ignored - need to skip these if they are not part of the schema
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row

-private[spark] class ScalaEsRow(private[spark] val rowOrder: Seq[String]) extends Row {
+private[spark] class ScalaOpenSearchRow(private[spark] val rowOrder: Seq[String]) extends Row {

lazy private[spark] val values: ArrayBuffer[Any] = ArrayBuffer.fill(rowOrder.size)(null)

@@ -66,6 +66,6 @@ private[spark] class ScalaEsRowRDDIterator(

override def createValue(value: Array[Object]): Row = {
// drop the ID
-value(1).asInstanceOf[ScalaEsRow]
+value(1).asInstanceOf[ScalaOpenSearchRow]
}
}
@@ -67,7 +67,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
}
else rowColumns(sparkRowField)

-new ScalaEsRow(rowOrd)
+new ScalaOpenSearchRow(rowOrd)
}
}

@@ -114,7 +114,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
override def addToMap(map: AnyRef, key: AnyRef, value: Any) = {
map match {
case m: Map[_, _] => super.addToMap(map, key, value)
-case r: ScalaEsRow => addToBuffer(r, key, value)
+case r: ScalaOpenSearchRow => addToBuffer(r, key, value)
}
}


