[Refactor] spark-sql-30 from legacy to opensearch namespace #18

Merged (1 commit, Oct 27, 2022). The commit renames the sql-30 Gradle project to opensearch-spark-30 and moves its public classes from the legacy org.elasticsearch.spark namespace to org.opensearch.spark, updating the README, build scripts, tests, and sources to match.
README.md: 6 changes (3 additions & 3 deletions)

@@ -301,16 +301,16 @@ Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
-JavaEsSpark.saveToEs(javaRDD, "spark/docs");
+JavaOpenSearchSpark.saveToEs(javaRDD, "spark/docs");
```

#### Spark SQL

```java
-import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
+import org.opensearch.spark.sql.api.java.JavaOpenSearchSparkSQL;

DataFrame df = sqlContext.read.json("examples/people.json")
-JavaEsSparkSQL.saveToEs(df, "spark/docs")
+JavaOpenSearchSparkSQL.saveToEs(df, "spark/docs")
```

## [Apache Storm][]
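For consumers updating their own code, the renamed Java RDD entry point composes into the following complete program. This is a minimal sketch rather than part of the diff: the SparkConf setup, the local[*] master, and the localhost:9200 node address are illustrative assumptions.

```java
import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark;

public class OpenSearchSparkExample {
    public static void main(String[] args) {
        // Illustrative settings; point opensearch.nodes at your own cluster.
        SparkConf conf = new SparkConf()
                .setAppName("opensearch-spark-example")
                .setMaster("local[*]")
                .set("opensearch.nodes", "localhost:9200");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
        Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

        // Only the entry-point class name changes; the call shape is identical.
        JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
        JavaOpenSearchSpark.saveToEs(javaRDD, "spark/docs");

        jsc.stop();
    }
}
```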
qa/kerberos/build.gradle: 6 changes (3 additions & 3 deletions)

@@ -68,7 +68,7 @@ dependencies {

implementation("org.scala-lang:scala-library:${scala212Version}")
implementation("org.scala-lang:scala-reflect:${scala212Version}")
-implementation(project(":elasticsearch-spark-30")) {
+implementation(project(":opensearch-spark-30")) {
capabilities {
// Spark 3.x on Scala 2.12
requireCapability("org.elasticsearch.spark.sql.variant:spark30scala212:$project.version")
@@ -259,7 +259,7 @@ if (disableTests) {
def mrJar = project(':opensearch-hadoop-mr').tasks.getByName('jar') as Jar
def hiveJar = project(':opensearch-hadoop-hive').tasks.getByName('jar') as Jar
def pigJar = project(':opensearch-hadoop-pig').tasks.getByName('jar') as Jar
-def sparkJar = project(':elasticsearch-spark-30').tasks.getByName('jar') as Jar
+def sparkJar = project(':opensearch-spark-30').tasks.getByName('jar') as Jar

// Need these for SSL items, test data, and scripts
File resourceDir = project.sourceSets.main.resources.getSrcDirs().head()
@@ -270,7 +270,7 @@ if (disableTests) {
doLast {
project.javaexec {
executable = project.runtimeJavaHome.toString() + "/bin/java"
-mainClass = 'org.elasticsearch.hadoop.qa.kerberos.setup.SetupKerberosUsers'
+mainClass = 'org.opensearch.hadoop.qa.kerberos.setup.SetupKerberosUsers'
classpath = sourceSets.main.runtimeClasspath
systemProperty('opensearch.nodes', opensearchAddress)
systemProperty('es.net.http.auth.user', 'test_admin')
settings.gradle: 2 changes (1 addition & 1 deletion)

@@ -29,7 +29,7 @@ project(":sql-20").name = "opensearch-spark-20"

include 'sql-30'
project(":sql-30").projectDir = new File(settingsDir, "spark/sql-30")
-project(":sql-30").name = "elasticsearch-spark-30"
+project(":sql-30").name = "opensearch-spark-30"

include 'storm'
project(":storm").name = "elasticsearch-storm"

@@ -143,7 +143,7 @@ public void testEsSchemaRDD1WriteWithMappingExclude() throws Exception {
public void testEsdataFrame2Read() throws Exception {
String target = resource("sparksql-test-scala-basic-write", "data", version);

-// DataFrame dataFrame = JavaEsSparkSQL.esDF(sqc, target);
+// DataFrame dataFrame = JavaOpenSearchSparkSQL.esDF(sqc, target);
DataFrame dataFrame = sqc.read().format("es").load(target);
assertTrue(dataFrame.count() > 300);
String schema = dataFrame.schema().treeString();

@@ -45,7 +45,7 @@
import org.opensearch.hadoop.util.OpenSearchMajorVersion;
import org.elasticsearch.hadoop.util.TestSettings;
import org.elasticsearch.hadoop.util.TestUtils;
-import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
+import org.opensearch.spark.sql.api.java.JavaOpenSearchSparkSQL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -66,7 +66,7 @@
import static scala.collection.JavaConversions.*;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class AbstractJavaEsSparkSQLTest implements Serializable {
+public class AbstractJavaOpenSearchSparkSQLTest implements Serializable {

private static final transient SparkConf conf = new SparkConf()
.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS))
@@ -110,7 +110,7 @@ public void testEsDataset1Write() throws Exception {
Dataset<Row> dataset = artistsAsDataset();

String target = resource("sparksql-test-scala-basic-write", "data", version);
-JavaEsSparkSQL.saveToEs(dataset, target);
+JavaOpenSearchSparkSQL.saveToEs(dataset, target);
assertTrue(RestUtils.exists(target));
assertThat(RestUtils.get(target + "/_search?"), containsString("345"));
}
@@ -122,7 +122,7 @@ public void testEsDataset1WriteWithId() throws Exception {
String target = resource("sparksql-test-scala-basic-write-id-mapping", "data", version);
String docEndpoint = docEndpoint("sparksql-test-scala-basic-write-id-mapping", "data", version);

-JavaEsSparkSQL.saveToEs(dataset, target,
+JavaOpenSearchSparkSQL.saveToEs(dataset, target,
ImmutableMap.of(ES_MAPPING_ID, "id"));
assertTrue(RestUtils.exists(target));
assertThat(RestUtils.get(target + "/_search?"), containsString("345"));
@@ -134,7 +134,7 @@ public void testEsSchemaRDD1WriteWithMappingExclude() throws Exception {
Dataset<Row> dataset = artistsAsDataset();

String target = resource("sparksql-test-scala-basic-write-exclude-mapping", "data", version);
-JavaEsSparkSQL.saveToEs(dataset, target,
+JavaOpenSearchSparkSQL.saveToEs(dataset, target,
ImmutableMap.of(ES_MAPPING_EXCLUDE, "url"));
assertTrue(RestUtils.exists(target));
assertThat(RestUtils.get(target + "/_search?"), not(containsString("url")));
@@ -144,7 +144,7 @@ public void testEsSchemaRDD1WriteWithMappingExclude() throws Exception {
public void testEsDataset2Read() throws Exception {
String target = resource("sparksql-test-scala-basic-write", "data", version);

-// Dataset<Row> dataset = JavaEsSparkSQL.esDF(sqc, target);
+// Dataset<Row> dataset = JavaOpenSearchSparkSQL.esDF(sqc, target);
Dataset<Row> dataset = sqc.read().format("es").load(target);
assertTrue(dataset.count() > 300);
String schema = dataset.schema().treeString();
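A note on the read path these tests exercise: the hunks above rename classes only, so the data source short name used by format("es") and the esDF method name are unchanged. A hedged Java sketch of both read styles, with the SQLContext and the target index passed in as assumptions:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.opensearch.spark.sql.api.java.JavaOpenSearchSparkSQL;

public class ReadPaths {
    // Generic DataSource API; the short name "es" is kept by this commit.
    static Dataset<Row> viaFormat(SQLContext sqc, String target) {
        return sqc.read().format("es").load(target);
    }

    // Renamed static helper; the method keeps its legacy esDF name.
    static Dataset<Row> viaHelper(SQLContext sqc, String target) {
        return JavaOpenSearchSparkSQL.esDF(sqc, target);
    }
}
```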

@@ -82,7 +82,7 @@

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(Parameterized.class)
-public class AbstractJavaEsSparkStreamingTest implements Serializable {
+public class AbstractJavaOpenSearchSparkStreamingTest implements Serializable {

private static final transient SparkConf conf = new SparkConf()
.setMaster("local")
@@ -118,7 +118,7 @@ public static void clean() throws Exception {
private JavaStreamingContext ssc = null;
private OpenSearchMajorVersion version = TestUtils.getOpenSearchClusterInfo().getMajorVersion();

-public AbstractJavaEsSparkStreamingTest(String prefix, boolean readMetadata) {
+public AbstractJavaOpenSearchSparkStreamingTest(String prefix, boolean readMetadata) {
this.prefix = prefix;
this.cfg.put(ES_READ_METADATA, Boolean.toString(readMetadata));
}

@@ -25,7 +25,7 @@
import org.junit.runners.Suite;

@RunWith(Suite.class)
-@Suite.SuiteClasses({ AbstractJavaEsSparkSQLTest.class })
+@Suite.SuiteClasses({ AbstractJavaOpenSearchSparkSQLTest.class })
public class SparkSQLSuite {

@ClassRule

@@ -26,7 +26,7 @@
import org.junit.runners.Suite;

@RunWith(Suite.class)
-@Suite.SuiteClasses({ AbstractJavaEsSparkStreamingTest.class })
+@Suite.SuiteClasses({ AbstractJavaOpenSearchSparkStreamingTest.class })
public class SparkStreamingSuite {

@ClassRule

@@ -39,7 +39,7 @@
import org.opensearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.TestSettings;
import org.elasticsearch.hadoop.util.TestUtils;
-import org.opensearch.spark.rdd.api.java.JavaEsSpark;
+import org.opensearch.spark.rdd.api.java.JavaOpenSearchSpark;
import org.opensearch.spark.sql.streaming.api.java.JavaStreamingQueryTestHarness;
import org.junit.AfterClass;
import org.junit.BeforeClass;

@@ -56,14 +56,14 @@ import org.elasticsearch.hadoop.util.TestSettings
import org.elasticsearch.hadoop.util.TestUtils
import org.elasticsearch.hadoop.util.TestUtils.resource
import org.elasticsearch.hadoop.util.TestUtils.docEndpoint
-import org.elasticsearch.spark.cfg.SparkSettingsManager
-import org.elasticsearch.spark.sparkRDDFunctions
-import org.elasticsearch.spark.sparkStringJsonRDDFunctions
-import org.elasticsearch.spark.sql.EsSparkSQL
-import org.elasticsearch.spark.sql.ScalaEsRow
-import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL
-import org.elasticsearch.spark.sql.sparkDatasetFunctions
-import org.elasticsearch.spark.sql.sqlContextFunctions
+import org.opensearch.spark.cfg.SparkSettingsManager
+import org.opensearch.spark.sparkRDDFunctions
+import org.opensearch.spark.sparkStringJsonRDDFunctions
+import org.opensearch.spark.sql.OpenSearchSparkSQL
+import org.opensearch.spark.sql.ScalaOpenSearchRow
+import org.opensearch.spark.sql.api.java.JavaOpenSearchSparkSQL
+import org.opensearch.spark.sql.sparkDatasetFunctions
+import org.opensearch.spark.sql.sqlContextFunctions
import org.hamcrest.Matchers.containsString
import org.hamcrest.Matchers.is
import org.hamcrest.Matchers.not
@@ -715,10 +715,10 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val index = wrapIndex("sparksql-test-scala-basic-write")
val (target, _) = makeTargets(index, "data")

-val dfNoQuery = JavaEsSparkSQL.esDF(sqc, target, cfg.asJava)
+val dfNoQuery = JavaOpenSearchSparkSQL.esDF(sqc, target, cfg.asJava)
val query = s"""{ "query" : { "query_string" : { "query" : "name:me*" } } //, "fields" : ["name"]
}"""
-val dfWQuery = JavaEsSparkSQL.esDF(sqc, target, query, cfg.asJava)
+val dfWQuery = JavaOpenSearchSparkSQL.esDF(sqc, target, query, cfg.asJava)

println(dfNoQuery.head())
println(dfWQuery.head())
@@ -730,7 +730,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus

@Test
def testEsDataFrame3WriteWithRichMapping() {
-val path = Paths.get(AbstractScalaEsScalaSparkSQL.testData.sampleArtistsDatUri())
+val path = Paths.get(AbstractScalaOpenSearchScalaSparkSQL.testData.sampleArtistsDatUri())
// because Windows...
val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala

@@ -1442,8 +1442,8 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val (target, _) = makeTargets(index, "data")
val table = wrapIndex("save_mode_append")

-srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Append).save(target)
-val df = EsSparkSQL.esDF(sqc, target)
+srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Append).save(target)
+val df = OpenSearchSparkSQL.esDF(sqc, target)

assertEquals(3, df.count())
srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Append).save(target)
@@ -1457,8 +1457,8 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val (target, _) = makeTargets(index, "data")
val table = wrapIndex("save_mode_overwrite")

-srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).save(target)
-val df = EsSparkSQL.esDF(sqc, target)
+srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Overwrite).save(target)
+val df = OpenSearchSparkSQL.esDF(sqc, target)

assertEquals(3, df.count())
srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).save(target)
@@ -1471,11 +1471,11 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val index = wrapIndex("sparksql-test-savemode_overwrite_id")
val (target, _) = makeTargets(index, "data")

-srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(target)
-val df = EsSparkSQL.esDF(sqc, target)
+srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(target)
+val df = OpenSearchSparkSQL.esDF(sqc, target)

assertEquals(3, df.count())
-srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(target)
+srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Overwrite).option("es.mapping.id", "number").save(target)
assertEquals(3, df.count())
}

@@ -1486,8 +1486,8 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val (target, docPath) = makeTargets(index, "data")
val table = wrapIndex("save_mode_ignore")

-srcFrame.write.format("org.elasticsearch.spark.sql").mode(SaveMode.Ignore).save(target)
-val df = EsSparkSQL.esDF(sqc, target)
+srcFrame.write.format("org.opensearch.spark.sql").mode(SaveMode.Ignore).save(target)
+val df = OpenSearchSparkSQL.esDF(sqc, target)

assertEquals(3, df.count())
// should not go through
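The SaveMode tests above also pin down the renamed DataSource provider string, org.opensearch.spark.sql. As a hedged Java illustration of that write path (the `people` dataset and the "people/data" target are invented for the example; the es.mapping.id key and "number" field mirror the tests):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class SaveModeExample {
    // `people` and "people/data" are hypothetical stand-ins.
    static void overwriteById(Dataset<Row> people) {
        people.write()
              .format("org.opensearch.spark.sql") // provider renamed from org.elasticsearch.spark.sql
              .mode(SaveMode.Overwrite)
              .option("es.mapping.id", "number")  // option keys keep their legacy es.* prefix in these tests
              .save("people/data");
    }
}
```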

@@ -615,7 +615,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
}
rr.close()
}
-EsSparkSQL.saveToEs(data, parameters)
+OpenSearchSparkSQL.saveToEs(data, parameters)
}

def isEmpty(): Boolean = {

@@ -16,18 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.spark.sql
+package org.opensearch.spark.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
-import org.elasticsearch.spark.rdd.EsRDDWriter
+import org.opensearch.spark.rdd.OpenSearchRDDWriter
import org.opensearch.hadoop.serialization.{BytesConverter, JdkBytesConverter}
import org.opensearch.hadoop.serialization.builder.ValueWriter
import org.opensearch.hadoop.serialization.field.FieldExtractor

-private[spark] class EsDataFrameWriter
+private[spark] class OpenSearchDataFrameWriter
(schema: StructType, override val serializedSettings: String)
-extends EsRDDWriter[Row](serializedSettings:String) {
+extends OpenSearchRDDWriter[Row](serializedSettings:String) {

override protected def valueWriter: Class[_ <: ValueWriter[_]] = classOf[DataFrameValueWriter]
override protected def bytesConverter: Class[_ <: BytesConverter] = classOf[JdkBytesConverter]

@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.spark.sql
+package org.opensearch.spark.sql

import org.apache.commons.logging.LogFactory
import org.apache.spark.sql.DataFrame
@@ -26,7 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.opensearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
import org.opensearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
import org.opensearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
-import org.elasticsearch.spark.cfg.SparkSettingsManager
+import org.opensearch.spark.cfg.SparkSettingsManager
import org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException
import org.opensearch.hadoop.cfg.PropertiesSettings
import org.opensearch.hadoop.mr.security.HadoopUserProvider
@@ -37,11 +37,11 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import scala.collection.Map

-object EsSparkSQL {
+object OpenSearchSparkSQL {

-private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }
+private val init = { ObjectUtils.loadClass("org.opensearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }

-@transient private[this] val LOG = LogFactory.getLog(EsSparkSQL.getClass)
+@transient private[this] val LOG = LogFactory.getLog(OpenSearchSparkSQL.getClass)

//
// Read
@@ -100,7 +100,7 @@ object EsSparkSQL {
InitializationUtils.checkIdForOperation(esCfg)
InitializationUtils.checkIndexExistence(esCfg)

-sparkCtx.runJob(srdd.toDF().rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)
+sparkCtx.runJob(srdd.toDF().rdd, new OpenSearchDataFrameWriter(srdd.schema, esCfg.save()).write _)
}
}
}

@@ -51,7 +51,7 @@ private[sql] trait RowValueReader extends SettingsAware {
}
}

-def addToBuffer(esRow: ScalaEsRow, key: AnyRef, value: Any): Unit = {
+def addToBuffer(esRow: ScalaOpenSearchRow, key: AnyRef, value: Any): Unit = {
val pos = esRow.rowOrder.indexOf(key.toString())
if (pos < 0 || pos >= esRow.values.size) {
// geo types allow fields which are ignored - need to skip these if they are not part of the schema

@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row

-private[spark] class ScalaEsRow(private[spark] val rowOrder: Seq[String]) extends Row {
+private[spark] class ScalaOpenSearchRow(private[spark] val rowOrder: Seq[String]) extends Row {

lazy private[spark] val values: ArrayBuffer[Any] = ArrayBuffer.fill(rowOrder.size)(null)


@@ -66,6 +66,6 @@ private[spark] class ScalaEsRowRDDIterator(

override def createValue(value: Array[Object]): Row = {
// drop the ID
-value(1).asInstanceOf[ScalaEsRow]
+value(1).asInstanceOf[ScalaOpenSearchRow]
}
}

@@ -67,7 +67,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
}
else rowColumns(sparkRowField)

-new ScalaEsRow(rowOrd)
+new ScalaOpenSearchRow(rowOrd)
}
}

@@ -114,7 +114,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
override def addToMap(map: AnyRef, key: AnyRef, value: Any) = {
map match {
case m: Map[_, _] => super.addToMap(map, key, value)
-case r: ScalaEsRow => addToBuffer(r, key, value)
+case r: ScalaOpenSearchRow => addToBuffer(r, key, value)
}
}