Skip to content

Commit

Permalink
[Test] ignore strict pushdown tests (#42)
Browse files Browse the repository at this point in the history
Ignoring wildcard query tests until upstream bugfix #5461 lands. Also refactors
esDataSource and EsPigInputFormat to OpenSearch name.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize committed Dec 7, 2022
1 parent 41f2c2a commit b15f47c
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@


@SuppressWarnings("rawtypes")
public class EsPigInputFormat extends EsInputFormat<String, Object> {
public class OpenSearchPigInputFormat extends EsInputFormat<String, Object> {

protected static abstract class AbstractPigEsInputRecordReader<V> extends EsInputRecordReader<String, V> {
public AbstractPigEsInputRecordReader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public String relativeToAbsolutePath(String location, Path curDir) throws IOExce
@SuppressWarnings("rawtypes")
@Override
public InputFormat getInputFormat() throws IOException {
return new EsPigInputFormat();
return new OpenSearchPigInputFormat();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,21 +488,13 @@ class AbstractScalaOpenSearchScalaSpark(prefix: String, readMetadata: jl.Boolean
| }
|}""".stripMargin)

// // Upsert a value that should only modify the first document. Modification will add an address entry.
// val lines = sc.makeRDD(List("""{"id":"1","address":{"zipcode":"12345","id":"1"}}"""))
// val up_params = "new_address:address"
// val up_script = {
// "ctx._source.address.add(params.new_address)"
// }
// lines.saveToEs(target, props + ("opensearch.update.script.params" -> up_params) + ("opensearch.update.script" -> up_script))

// // Upsert a value that should only modify the second document. Modification will update the "note" field.
// val notes = sc.makeRDD(List("""{"id":"2","note":"Second"}"""))
// val note_up_params = "new_note:note"
// val note_up_script = {
// "ctx._source.note = params.new_note"
// }
// notes.saveToEs(target, props + ("opensearch.update.script.params" -> note_up_params) + ("opensearch.update.script" -> note_up_script))
val index = wrapIndex("spark-test-stored")
val typename = "data"
val target = resource(index, typename, version)
val docPath = docEndpoint(index, typename, version)
RestUtils.touch(index)
RestUtils.putMapping(index, typename, mapping.getBytes())
RestUtils.put(s"$docPath/1", """{"id":"1", "counter":5}""".getBytes(StringUtils.UTF_8))

val scriptName = "increment"
val lang = "painless"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@ import org.opensearch.spark.sql.sqlContextFunctions
import org.hamcrest.Matchers.containsString
import org.hamcrest.Matchers.is
import org.hamcrest.Matchers.not
import org.junit.AfterClass
import org.junit.{AfterClass, BeforeClass, ClassRule, FixMethodOrder, Ignore, Test}
import org.junit.Assert._
import org.junit.Assume._
import org.junit.BeforeClass
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
Expand All @@ -78,7 +75,6 @@ import org.apache.spark.rdd.RDD

import javax.xml.bind.DatatypeConverter
import org.apache.spark.sql.types.DoubleType
import org.junit.ClassRule
import org.opensearch.hadoop.rest.RestUtils
import org.opensearch.hadoop.{OpenSearchAssume, OpenSearchHadoopIllegalArgumentException, OpenSearchHadoopIllegalStateException, TestData}
import org.opensearch.hadoop.util.{OpenSearchMajorVersion, StringUtils, TestSettings, TestUtils}
Expand Down Expand Up @@ -1014,6 +1010,8 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
}

@Test
@Ignore
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/opensearch-hadoop/issues/41")
def testDataSourcePushDown09StartsWith() {
val df = opensearchDataSource("pd_starts_with")
var filter = df.filter(df("airport").startsWith("O"))
Expand All @@ -1034,6 +1032,8 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
}

@Test
@Ignore
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/opensearch-hadoop/issues/41")
def testDataSourcePushDown10EndsWith() {
val df = opensearchDataSource("pd_ends_with")
var filter = df.filter(df("airport").endsWith("O"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,14 @@ import org.opensearch.spark.sql.sqlContextFunctions
import org.hamcrest.Matchers.containsString
import org.hamcrest.Matchers.is
import org.hamcrest.Matchers.not
import org.junit.AfterClass
import org.junit.{AfterClass, BeforeClass, ClassRule, FixMethodOrder, Ignore, Test}
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertThat
import org.junit.Assert.assertTrue
import org.junit.Assert.fail
import org.junit.Assume.assumeFalse
import org.junit.Assume.assumeTrue
import org.junit.BeforeClass
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.MethodSorters
import org.junit.runners.Parameterized
Expand All @@ -96,7 +93,6 @@ import org.apache.spark.rdd.RDD
import javax.xml.bind.DatatypeConverter
import org.apache.spark.sql.SparkSession
import org.junit.Assert._
import org.junit.ClassRule
import org.opensearch.hadoop.{OpenSearchAssume, OpenSearchHadoopIllegalArgumentException, OpenSearchHadoopIllegalStateException, TestData}
import org.opensearch.hadoop.cfg.ConfigurationOptions
import org.opensearch.hadoop.rest.RestUtils
Expand Down Expand Up @@ -915,7 +911,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
sc.makeRDD(Seq(trip1, trip2, trip3)).saveToEs(target)
}

private def esDataSource(table: String) = {
private def opensearchDataSource(table: String) = {
val index = wrapIndex("spark-test-scala-sql-varcols")
val (target, _) = makeTargets(index, "data")

Expand All @@ -932,7 +928,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown01EqualTo() {
val df = esDataSource("pd_equalto")
val df = opensearchDataSource("pd_equalto")
val filter = df.filter(df("airport").equalTo("OTP"))

filter.show
Expand All @@ -954,7 +950,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown015NullSafeEqualTo() {
val df = esDataSource("pd_nullsafeequalto")
val df = opensearchDataSource("pd_nullsafeequalto")
val filter = df.filter(df("airport").eqNullSafe("OTP"))

filter.show
Expand All @@ -976,23 +972,23 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown02GT() {
val df = esDataSource("pd_gt")
val df = opensearchDataSource("pd_gt")
val filter = df.filter(df("participants").gt(3))
assertEquals(1, filter.count())
assertEquals("feb", filter.select("tag").take(1)(0)(0))
}

@Test
def testDataSourcePushDown03GTE() {
val df = esDataSource("pd_gte")
val df = opensearchDataSource("pd_gte")
val filter = df.filter(df("participants").geq(3))
assertEquals(2, filter.count())
assertEquals("long", filter.select("tag").sort("tag").take(2)(1)(0))
}

@Test
def testDataSourcePushDown04LT() {
val df = esDataSource("pd_lt")
val df = opensearchDataSource("pd_lt")
df.printSchema()
val filter = df.filter(df("participants").lt(5))
assertEquals(1, filter.count())
Expand All @@ -1001,31 +997,31 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown05LTE() {
val df = esDataSource("pd_lte")
val df = opensearchDataSource("pd_lte")
val filter = df.filter(df("participants").leq(5))
assertEquals(2, filter.count())
assertEquals("long", filter.select("tag").sort("tag").take(2)(1)(0))
}

@Test
def testDataSourcePushDown06IsNull() {
val df = esDataSource("pd_is_null")
val df = opensearchDataSource("pd_is_null")
val filter = df.filter(df("participants").isNull)
assertEquals(1, filter.count())
assertEquals("jan", filter.select("tag").take(1)(0)(0))
}

@Test
def testDataSourcePushDown07IsNotNull() {
val df = esDataSource("pd_is_not_null")
val df = opensearchDataSource("pd_is_not_null")
val filter = df.filter(df("reason").isNotNull)
assertEquals(1, filter.count())
assertEquals("jan", filter.select("tag").take(1)(0)(0))
}

@Test
def testDataSourcePushDown08In() {
val df = esDataSource("pd_in")
val df = opensearchDataSource("pd_in")
var filter = df.filter("airport IN ('OTP', 'SFO', 'MUC')")

if (strictPushDown) {
Expand All @@ -1040,7 +1036,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown08InWithNumbersAsStrings() {
val df = esDataSource("pd_in_numbers_strings")
val df = opensearchDataSource("pd_in_numbers_strings")
var filter = df.filter("date IN ('2015-12-28', '2012-12-28')")

if (strictPushDown) {
Expand All @@ -1054,7 +1050,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown08InWithNumber() {
val df = esDataSource("pd_in_number")
val df = opensearchDataSource("pd_in_number")
var filter = df.filter("participants IN (1, 2, 3)")

assertEquals(1, filter.count())
Expand All @@ -1063,15 +1059,17 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown08InWithNumberAndStrings() {
val df = esDataSource("pd_in_number")
val df = opensearchDataSource("pd_in_number")
var filter = df.filter("participants IN (2, 'bar', 1, 'foo')")

assertEquals(0, filter.count())
}

@Test
@Ignore
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/opensearch-hadoop/issues/41")
def testDataSourcePushDown09StartsWith() {
val df = esDataSource("pd_starts_with")
val df = opensearchDataSource("pd_starts_with")
var filter = df.filter(df("airport").startsWith("O"))

if (!keepHandledFilters && !strictPushDown) {
Expand All @@ -1090,8 +1088,10 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
}

@Test
@Ignore
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/opensearch-hadoop/issues/41")
def testDataSourcePushDown10EndsWith() {
val df = esDataSource("pd_ends_with")
val df = opensearchDataSource("pd_ends_with")
var filter = df.filter(df("airport").endsWith("O"))

if (!keepHandledFilters && !strictPushDown) {
Expand All @@ -1111,15 +1111,15 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown11Contains() {
val df = esDataSource("pd_contains")
val df = opensearchDataSource("pd_contains")
val filter = df.filter(df("reason").contains("us"))
assertEquals(1, filter.count())
assertEquals("jan", filter.select("tag").take(1)(0)(0))
}

@Test
def testDataSourcePushDown12And() {
val df = esDataSource("pd_and")
val df = opensearchDataSource("pd_and")
var filter = df.filter(df("reason").isNotNull.and(df("tag").equalTo("jan")))

assertEquals(1, filter.count())
Expand All @@ -1128,7 +1128,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown13Not() {
val df = esDataSource("pd_not")
val df = opensearchDataSource("pd_not")
val filter = df.filter(!df("reason").isNull)

assertEquals(1, filter.count())
Expand All @@ -1137,7 +1137,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown14OR() {
val df = esDataSource("pd_or")
val df = opensearchDataSource("pd_or")
var filter = df.filter(df("reason").contains("us").or(df("airport").equalTo("OTP")))

if (strictPushDown) {
Expand All @@ -1159,7 +1159,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
@Test
def testEsSchemaFromDocsWithDifferentProperties() {
val table = wrapIndex("sqlvarcol")
esDataSource(table)
opensearchDataSource(table)

val index = wrapIndex("spark-test-scala-sql-varcols")
val (target, _) = makeTargets(index, "data")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class AbstractScalaOpenSearchSparkStructuredStreaming(prefix: String, something:
.getOrElse(throw new OpenSearchHadoopIllegalStateException("Spark not started..."))
val version: OpenSearchMajorVersion = TestUtils.getOpenSearchClusterInfo.getMajorVersion

import org.opensearch.spark.integration.Products._
import spark.implicits._

def wrapIndex(name: String): String = {
Expand Down Expand Up @@ -255,7 +256,7 @@ class AbstractScalaOpenSearchSparkStructuredStreaming(prefix: String, something:
.runTest {
test.stream
.writeStream
.option(SparkSqlStreamingConfigs.ES_SINK_LOG_ENABLE, "false")
.option(SparkSqlStreamingConfigs.OPENSEARCH_SINK_LOG_ENABLE, "false")
.option("checkpointLocation", checkpoint(target))
.format("opensearch")
.start(target)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,15 +423,17 @@ private[sql] case class OpenSearchRelation(parameters: Map[String, String], @tra
val x = f.productElement(1).toString()
if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
}
s"""{"wildcard":{"${f.productElement(0)}":"$arg*"}}"""
val strict = strictPushDown == false
s"""{"wildcard":{"${f.productElement(0)}":{"value":"$arg*","case_insensitive":$strict}}}"""
}

case f:Product if isClass(f, "org.apache.spark.sql.sources.StringEndsWith") => {
val arg = {
val x = f.productElement(1).toString()
if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
}
s"""{"wildcard":{"${f.productElement(0)}":"*$arg"}}"""
val strict = !strictPushDown
s"""{"wildcard":{"${f.productElement(0)}":{"value":"*$arg","case_insensitive":$strict}}}"""
}

case f:Product if isClass(f, "org.apache.spark.sql.sources.StringContains") => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class AbstractJavaOpenSearchSparkStreamingTest implements Serializable {
private static final transient SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("opensearchtest")
.setJars(SparkUtils.ES_SPARK_TESTING_JAR);
.setJars(SparkUtils.OPENSEARCH_SPARK_TESTING_JAR);

private static transient JavaSparkContext sc = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

public abstract class SparkUtils {

public static final String[] ES_SPARK_TESTING_JAR = new String[] {Provisioner.OPENSEARCHHADOOP_TESTING_JAR};
public static final String[] OPENSEARCH_SPARK_TESTING_JAR = new String[] {Provisioner.OPENSEARCHHADOOP_TESTING_JAR};

public static Kryo sparkSerializer(SparkConf conf) throws Exception {
// reflection galore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class AbstractJavaOpenSearchSparkStructuredStreamingTest {
private static final transient SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("opensearch-structured-streaming-test")
.setJars(SparkUtils.ES_SPARK_TESTING_JAR);
.setJars(SparkUtils.OPENSEARCH_SPARK_TESTING_JAR);

private static transient SparkSession spark = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object AbstractScalaOpenSearchScalaSparkStreaming {
.setMaster("local")
.setAppName("opensearchtest")
.set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=256m")
.setJars(SparkUtils.ES_SPARK_TESTING_JAR)
.setJars(SparkUtils.OPENSEARCH_SPARK_TESTING_JAR)
@transient var sc: SparkContext = null
@transient var ssc: StreamingContext = null

Expand Down

0 comments on commit b15f47c

Please sign in to comment.