Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Test] ignore strict pushdown tests #42

Merged
merged 1 commit into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@


@SuppressWarnings("rawtypes")
public class EsPigInputFormat extends EsInputFormat<String, Object> {
public class OpenSearchPigInputFormat extends EsInputFormat<String, Object> {

protected static abstract class AbstractPigEsInputRecordReader<V> extends EsInputRecordReader<String, V> {
public AbstractPigEsInputRecordReader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public String relativeToAbsolutePath(String location, Path curDir) throws IOExce
@SuppressWarnings("rawtypes")
@Override
public InputFormat getInputFormat() throws IOException {
return new EsPigInputFormat();
return new OpenSearchPigInputFormat();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,21 +488,13 @@ class AbstractScalaOpenSearchScalaSpark(prefix: String, readMetadata: jl.Boolean
| }
|}""".stripMargin)

// // Upsert a value that should only modify the first document. Modification will add an address entry.
// val lines = sc.makeRDD(List("""{"id":"1","address":{"zipcode":"12345","id":"1"}}"""))
// val up_params = "new_address:address"
// val up_script = {
// "ctx._source.address.add(params.new_address)"
// }
// lines.saveToEs(target, props + ("opensearch.update.script.params" -> up_params) + ("opensearch.update.script" -> up_script))

// // Upsert a value that should only modify the second document. Modification will update the "note" field.
// val notes = sc.makeRDD(List("""{"id":"2","note":"Second"}"""))
// val note_up_params = "new_note:note"
// val note_up_script = {
// "ctx._source.note = params.new_note"
// }
// notes.saveToEs(target, props + ("opensearch.update.script.params" -> note_up_params) + ("opensearch.update.script" -> note_up_script))
val index = wrapIndex("spark-test-stored")
val typename = "data"
val target = resource(index, typename, version)
val docPath = docEndpoint(index, typename, version)
RestUtils.touch(index)
RestUtils.putMapping(index, typename, mapping.getBytes())
RestUtils.put(s"$docPath/1", """{"id":"1", "counter":5}""".getBytes(StringUtils.UTF_8))

val scriptName = "increment"
val lang = "painless"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@ import org.opensearch.spark.sql.sqlContextFunctions
import org.hamcrest.Matchers.containsString
import org.hamcrest.Matchers.is
import org.hamcrest.Matchers.not
import org.junit.AfterClass
import org.junit.{AfterClass, BeforeClass, ClassRule, FixMethodOrder, Ignore, Test}
import org.junit.Assert._
import org.junit.Assume._
import org.junit.BeforeClass
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
Expand All @@ -78,7 +75,6 @@ import org.apache.spark.rdd.RDD

import javax.xml.bind.DatatypeConverter
import org.apache.spark.sql.types.DoubleType
import org.junit.ClassRule
import org.opensearch.hadoop.rest.RestUtils
import org.opensearch.hadoop.{OpenSearchAssume, OpenSearchHadoopIllegalArgumentException, OpenSearchHadoopIllegalStateException, TestData}
import org.opensearch.hadoop.util.{OpenSearchMajorVersion, StringUtils, TestSettings, TestUtils}
Expand Down Expand Up @@ -1014,6 +1010,8 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
}

@Test
@Ignore
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/opensearch-hadoop/issues/41")
def testDataSourcePushDown09StartsWith() {
val df = opensearchDataSource("pd_starts_with")
var filter = df.filter(df("airport").startsWith("O"))
Expand All @@ -1034,6 +1032,8 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
}

@Test
@Ignore
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/opensearch-hadoop/issues/41")
def testDataSourcePushDown10EndsWith() {
val df = opensearchDataSource("pd_ends_with")
var filter = df.filter(df("airport").endsWith("O"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,14 @@ import org.opensearch.spark.sql.sqlContextFunctions
import org.hamcrest.Matchers.containsString
import org.hamcrest.Matchers.is
import org.hamcrest.Matchers.not
import org.junit.AfterClass
import org.junit.{AfterClass, BeforeClass, ClassRule, FixMethodOrder, Ignore, Test}
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertThat
import org.junit.Assert.assertTrue
import org.junit.Assert.fail
import org.junit.Assume.assumeFalse
import org.junit.Assume.assumeTrue
import org.junit.BeforeClass
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.MethodSorters
import org.junit.runners.Parameterized
Expand All @@ -96,7 +93,6 @@ import org.apache.spark.rdd.RDD
import javax.xml.bind.DatatypeConverter
import org.apache.spark.sql.SparkSession
import org.junit.Assert._
import org.junit.ClassRule
import org.opensearch.hadoop.{OpenSearchAssume, OpenSearchHadoopIllegalArgumentException, OpenSearchHadoopIllegalStateException, TestData}
import org.opensearch.hadoop.cfg.ConfigurationOptions
import org.opensearch.hadoop.rest.RestUtils
Expand Down Expand Up @@ -915,7 +911,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
sc.makeRDD(Seq(trip1, trip2, trip3)).saveToEs(target)
}

private def esDataSource(table: String) = {
private def opensearchDataSource(table: String) = {
val index = wrapIndex("spark-test-scala-sql-varcols")
val (target, _) = makeTargets(index, "data")

Expand All @@ -932,7 +928,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown01EqualTo() {
val df = esDataSource("pd_equalto")
val df = opensearchDataSource("pd_equalto")
val filter = df.filter(df("airport").equalTo("OTP"))

filter.show
Expand All @@ -954,7 +950,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown015NullSafeEqualTo() {
val df = esDataSource("pd_nullsafeequalto")
val df = opensearchDataSource("pd_nullsafeequalto")
val filter = df.filter(df("airport").eqNullSafe("OTP"))

filter.show
Expand All @@ -976,23 +972,23 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown02GT() {
val df = esDataSource("pd_gt")
val df = opensearchDataSource("pd_gt")
val filter = df.filter(df("participants").gt(3))
assertEquals(1, filter.count())
assertEquals("feb", filter.select("tag").take(1)(0)(0))
}

@Test
def testDataSourcePushDown03GTE() {
val df = esDataSource("pd_gte")
val df = opensearchDataSource("pd_gte")
val filter = df.filter(df("participants").geq(3))
assertEquals(2, filter.count())
assertEquals("long", filter.select("tag").sort("tag").take(2)(1)(0))
}

@Test
def testDataSourcePushDown04LT() {
val df = esDataSource("pd_lt")
val df = opensearchDataSource("pd_lt")
df.printSchema()
val filter = df.filter(df("participants").lt(5))
assertEquals(1, filter.count())
Expand All @@ -1001,31 +997,31 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown05LTE() {
val df = esDataSource("pd_lte")
val df = opensearchDataSource("pd_lte")
val filter = df.filter(df("participants").leq(5))
assertEquals(2, filter.count())
assertEquals("long", filter.select("tag").sort("tag").take(2)(1)(0))
}

@Test
def testDataSourcePushDown06IsNull() {
val df = esDataSource("pd_is_null")
val df = opensearchDataSource("pd_is_null")
val filter = df.filter(df("participants").isNull)
assertEquals(1, filter.count())
assertEquals("jan", filter.select("tag").take(1)(0)(0))
}

@Test
def testDataSourcePushDown07IsNotNull() {
val df = esDataSource("pd_is_not_null")
val df = opensearchDataSource("pd_is_not_null")
val filter = df.filter(df("reason").isNotNull)
assertEquals(1, filter.count())
assertEquals("jan", filter.select("tag").take(1)(0)(0))
}

@Test
def testDataSourcePushDown08In() {
val df = esDataSource("pd_in")
val df = opensearchDataSource("pd_in")
var filter = df.filter("airport IN ('OTP', 'SFO', 'MUC')")

if (strictPushDown) {
Expand All @@ -1040,7 +1036,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown08InWithNumbersAsStrings() {
val df = esDataSource("pd_in_numbers_strings")
val df = opensearchDataSource("pd_in_numbers_strings")
var filter = df.filter("date IN ('2015-12-28', '2012-12-28')")

if (strictPushDown) {
Expand All @@ -1054,7 +1050,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown08InWithNumber() {
val df = esDataSource("pd_in_number")
val df = opensearchDataSource("pd_in_number")
var filter = df.filter("participants IN (1, 2, 3)")

assertEquals(1, filter.count())
Expand All @@ -1063,15 +1059,17 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown08InWithNumberAndStrings() {
val df = esDataSource("pd_in_number")
val df = opensearchDataSource("pd_in_number")
var filter = df.filter("participants IN (2, 'bar', 1, 'foo')")

assertEquals(0, filter.count())
}

@Test
@Ignore
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/opensearch-hadoop/issues/41")
def testDataSourcePushDown09StartsWith() {
val df = esDataSource("pd_starts_with")
val df = opensearchDataSource("pd_starts_with")
var filter = df.filter(df("airport").startsWith("O"))

if (!keepHandledFilters && !strictPushDown) {
Expand All @@ -1090,8 +1088,10 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
}

@Test
@Ignore
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/opensearch-hadoop/issues/41")
def testDataSourcePushDown10EndsWith() {
val df = esDataSource("pd_ends_with")
val df = opensearchDataSource("pd_ends_with")
var filter = df.filter(df("airport").endsWith("O"))

if (!keepHandledFilters && !strictPushDown) {
Expand All @@ -1111,15 +1111,15 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown11Contains() {
val df = esDataSource("pd_contains")
val df = opensearchDataSource("pd_contains")
val filter = df.filter(df("reason").contains("us"))
assertEquals(1, filter.count())
assertEquals("jan", filter.select("tag").take(1)(0)(0))
}

@Test
def testDataSourcePushDown12And() {
val df = esDataSource("pd_and")
val df = opensearchDataSource("pd_and")
var filter = df.filter(df("reason").isNotNull.and(df("tag").equalTo("jan")))

assertEquals(1, filter.count())
Expand All @@ -1128,7 +1128,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown13Not() {
val df = esDataSource("pd_not")
val df = opensearchDataSource("pd_not")
val filter = df.filter(!df("reason").isNull)

assertEquals(1, filter.count())
Expand All @@ -1137,7 +1137,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool

@Test
def testDataSourcePushDown14OR() {
val df = esDataSource("pd_or")
val df = opensearchDataSource("pd_or")
var filter = df.filter(df("reason").contains("us").or(df("airport").equalTo("OTP")))

if (strictPushDown) {
Expand All @@ -1159,7 +1159,7 @@ class AbstractScalaOpenSearchScalaSparkSQL(prefix: String, readMetadata: jl.Bool
@Test
def testEsSchemaFromDocsWithDifferentProperties() {
val table = wrapIndex("sqlvarcol")
esDataSource(table)
opensearchDataSource(table)

val index = wrapIndex("spark-test-scala-sql-varcols")
val (target, _) = makeTargets(index, "data")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class AbstractScalaOpenSearchSparkStructuredStreaming(prefix: String, something:
.getOrElse(throw new OpenSearchHadoopIllegalStateException("Spark not started..."))
val version: OpenSearchMajorVersion = TestUtils.getOpenSearchClusterInfo.getMajorVersion

import org.opensearch.spark.integration.Products._
import spark.implicits._

def wrapIndex(name: String): String = {
Expand Down Expand Up @@ -255,7 +256,7 @@ class AbstractScalaOpenSearchSparkStructuredStreaming(prefix: String, something:
.runTest {
test.stream
.writeStream
.option(SparkSqlStreamingConfigs.ES_SINK_LOG_ENABLE, "false")
.option(SparkSqlStreamingConfigs.OPENSEARCH_SINK_LOG_ENABLE, "false")
.option("checkpointLocation", checkpoint(target))
.format("opensearch")
.start(target)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,15 +423,17 @@ private[sql] case class OpenSearchRelation(parameters: Map[String, String], @tra
val x = f.productElement(1).toString()
if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
}
s"""{"wildcard":{"${f.productElement(0)}":"$arg*"}}"""
val strict = strictPushDown == false
s"""{"wildcard":{"${f.productElement(0)}":{"value":"$arg*","case_insensitive":$strict}}}"""
}

case f:Product if isClass(f, "org.apache.spark.sql.sources.StringEndsWith") => {
val arg = {
val x = f.productElement(1).toString()
if (!strictPushDown) x.toLowerCase(Locale.ROOT) else x
}
s"""{"wildcard":{"${f.productElement(0)}":"*$arg"}}"""
val strict = !strictPushDown
s"""{"wildcard":{"${f.productElement(0)}":{"value":"*$arg","case_insensitive":$strict}}}"""
}

case f:Product if isClass(f, "org.apache.spark.sql.sources.StringContains") => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class AbstractJavaOpenSearchSparkStreamingTest implements Serializable {
private static final transient SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("opensearchtest")
.setJars(SparkUtils.ES_SPARK_TESTING_JAR);
.setJars(SparkUtils.OPENSEARCH_SPARK_TESTING_JAR);

private static transient JavaSparkContext sc = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

public abstract class SparkUtils {

public static final String[] ES_SPARK_TESTING_JAR = new String[] {Provisioner.OPENSEARCHHADOOP_TESTING_JAR};
public static final String[] OPENSEARCH_SPARK_TESTING_JAR = new String[] {Provisioner.OPENSEARCHHADOOP_TESTING_JAR};

public static Kryo sparkSerializer(SparkConf conf) throws Exception {
// reflection galore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class AbstractJavaOpenSearchSparkStructuredStreamingTest {
private static final transient SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("opensearch-structured-streaming-test")
.setJars(SparkUtils.ES_SPARK_TESTING_JAR);
.setJars(SparkUtils.OPENSEARCH_SPARK_TESTING_JAR);

private static transient SparkSession spark = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object AbstractScalaOpenSearchScalaSparkStreaming {
.setMaster("local")
.setAppName("opensearchtest")
.set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=256m")
.setJars(SparkUtils.ES_SPARK_TESTING_JAR)
.setJars(SparkUtils.OPENSEARCH_SPARK_TESTING_JAR)
@transient var sc: SparkContext = null
@transient var ssc: StreamingContext = null

Expand Down