
[SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`

### What changes were proposed in this pull request?
In the PR, I propose to use `checkError()` in tests of `sql` to check `SparkUnsupportedOperationException`, and its fields.

### Why are the changes needed?
Checking for `SparkUnsupportedOperationException` and its fields, such as the error class and message parameters, prevents `SparkUnsupportedOperationException` from being accidentally replaced back with `UnsupportedOperationException`.
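
For illustration, here is the migration pattern as a minimal before/after sketch (the error class and parameters are copied from the `ScalaReflectionSuite` hunk below; other suites follow the same shape with their own error classes):

```scala
// Before: only the exception type and a message fragment were asserted,
// so the error class could silently regress.
val e = intercept[UnsupportedOperationException] {
  schemaFor[TraitProductWithoutCompanion]
}
assert(e.getMessage.contains("Unable to find constructor"))

// After: checkError() pins the exception type, the error class,
// and the message parameters.
checkError(
  exception = intercept[SparkUnsupportedOperationException] {
    schemaFor[TraitProductWithoutCompanion]
  },
  errorClass = "_LEGACY_ERROR_TEMP_2144",
  parameters = Map("tpe" -> "org.apache.spark.sql.catalyst.TraitProductWithoutCompanion"))
```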

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suites.
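For example, an individual suite can be run locally with something like `build/sbt "catalyst/testOnly *ScalaReflectionSuite"` (the module and suite names here are illustrative, not part of the patch).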

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#45082 from MaxGekk/intercept-UnsupportedOperationException-tests.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
MaxGekk committed Feb 13, 2024
1 parent 33a153a commit 49bcde6
Showing 17 changed files with 160 additions and 130 deletions.
@@ -24,7 +24,7 @@ import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._

import org.apache.spark.SparkException
import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
import org.apache.spark.sql.types._
@@ -45,10 +45,10 @@ class RowTest extends AnyFunSpec with Matchers {

describe("Row (without schema)") {
it("throws an exception when accessing by fieldName") {
intercept[UnsupportedOperationException] {
intercept[SparkUnsupportedOperationException] {
noSchemaRow.fieldIndex("col1")
}
intercept[UnsupportedOperationException] {
intercept[SparkUnsupportedOperationException] {
noSchemaRow.getAs("col1")
}
}
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{typeTag, TypeTag}

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.FooEnum.FooEnum
import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
@@ -490,17 +490,22 @@ class ScalaReflectionSuite extends SparkFunSuite {
}

test("SPARK-29026: schemaFor for trait without companion object throws exception ") {
val e = intercept[UnsupportedOperationException] {
schemaFor[TraitProductWithoutCompanion]
}
assert(e.getMessage.contains("Unable to find constructor"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
schemaFor[TraitProductWithoutCompanion]
},
errorClass = "_LEGACY_ERROR_TEMP_2144",
parameters = Map("tpe" -> "org.apache.spark.sql.catalyst.TraitProductWithoutCompanion"))
}

test("SPARK-29026: schemaFor for trait with no-constructor companion throws exception ") {
val e = intercept[UnsupportedOperationException] {
schemaFor[TraitProductWithNoConstructorCompanion]
}
assert(e.getMessage.contains("Unable to find constructor"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
schemaFor[TraitProductWithNoConstructorCompanion]
},
errorClass = "_LEGACY_ERROR_TEMP_2144",
parameters = Map("tpe" ->
"org.apache.spark.sql.catalyst.TraitProductWithNoConstructorCompanion"))
}

test("SPARK-27625: annotated data types") {
@@ -40,15 +40,15 @@ class EncoderErrorMessageSuite extends SparkFunSuite {
// That is done in Java because Scala cannot create truly private classes.

test("primitive types in encoders using Kryo serialization") {
intercept[UnsupportedOperationException] { Encoders.kryo[Int] }
intercept[UnsupportedOperationException] { Encoders.kryo[Long] }
intercept[UnsupportedOperationException] { Encoders.kryo[Char] }
intercept[SparkUnsupportedOperationException] { Encoders.kryo[Int] }
intercept[SparkUnsupportedOperationException] { Encoders.kryo[Long] }
intercept[SparkUnsupportedOperationException] { Encoders.kryo[Char] }
}

test("primitive types in encoders using Java serialization") {
intercept[UnsupportedOperationException] { Encoders.javaSerialization[Int] }
intercept[UnsupportedOperationException] { Encoders.javaSerialization[Long] }
intercept[UnsupportedOperationException] { Encoders.javaSerialization[Char] }
intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Int] }
intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Long] }
intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Char] }
}

test("nice error message for missing encoder") {
@@ -584,10 +584,12 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
test("throw exception for tuples with more than 22 elements") {
val encoders = (0 to 22).map(_ => Encoders.scalaInt.asInstanceOf[ExpressionEncoder[_]])

val e = intercept[UnsupportedOperationException] {
ExpressionEncoder.tuple(encoders)
}
assert(e.getMessage.contains("tuple with more than 22 elements are not supported"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
ExpressionEncoder.tuple(encoders)
},
errorClass = "_LEGACY_ERROR_TEMP_2150",
parameters = Map.empty)
}

test("throw exception for unexpected serializer") {
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.json

import java.io.CharArrayWriter

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types._
@@ -122,7 +122,7 @@ class JacksonGeneratorSuite extends SparkFunSuite {
val input = ArrayBasedMapData(Map("a" -> 1))
val writer = new CharArrayWriter()
val gen = new JacksonGenerator(dataType, writer, option)
intercept[UnsupportedOperationException] {
intercept[SparkUnsupportedOperationException] {
gen.write(input)
}
}
@@ -132,7 +132,7 @@ class JacksonGeneratorSuite extends SparkFunSuite {
val input = InternalRow(1)
val writer = new CharArrayWriter()
val gen = new JacksonGenerator(dataType, writer, option)
intercept[UnsupportedOperationException] {
intercept[SparkUnsupportedOperationException] {
gen.write(input)
}
}
@@ -22,7 +22,7 @@ import java.util.Collections

import scala.jdk.CollectionConverters._

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -659,7 +659,7 @@ class CatalogSuite extends SparkFunSuite {

test("purgeTable") {
val catalog = newCatalog()
intercept[UnsupportedOperationException](catalog.purgeTable(testIdent))
intercept[SparkUnsupportedOperationException](catalog.purgeTable(testIdent))
}

test("renameTable") {
@@ -21,20 +21,20 @@ import java.util

import scala.jdk.CollectionConverters._

import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkUnsupportedOperationException}

class CaseInsensitiveStringMapSuite extends SparkFunSuite {

test("put and get") {
val options = CaseInsensitiveStringMap.empty()
intercept[UnsupportedOperationException] {
intercept[SparkUnsupportedOperationException] {
options.put("kEy", "valUE")
}
}

test("clear") {
val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
intercept[UnsupportedOperationException] {
intercept[SparkUnsupportedOperationException] {
options.clear()
}
}
33 changes: 18 additions & 15 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1883,21 +1883,24 @@ class DatasetSuite extends QueryTest
}

test("SPARK-19896: cannot have circular references in case class") {
val errMsg1 = intercept[UnsupportedOperationException] {
Seq(CircularReferenceClassA(null)).toDS()
}
assert(errMsg1.getMessage.startsWith("cannot have circular references in class, but got the " +
"circular reference of class"))
val errMsg2 = intercept[UnsupportedOperationException] {
Seq(CircularReferenceClassC(null)).toDS()
}
assert(errMsg2.getMessage.startsWith("cannot have circular references in class, but got the " +
"circular reference of class"))
val errMsg3 = intercept[UnsupportedOperationException] {
Seq(CircularReferenceClassD(null)).toDS()
}
assert(errMsg3.getMessage.startsWith("cannot have circular references in class, but got the " +
"circular reference of class"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
Seq(CircularReferenceClassA(null)).toDS()
},
errorClass = "_LEGACY_ERROR_TEMP_2139",
parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassA"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
Seq(CircularReferenceClassC(null)).toDS()
},
errorClass = "_LEGACY_ERROR_TEMP_2139",
parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassC"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
Seq(CircularReferenceClassD(null)).toDS()
},
errorClass = "_LEGACY_ERROR_TEMP_2139",
parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassD"))
}

test("SPARK-20125: option of map") {
@@ -19,7 +19,7 @@ package org.apache.spark.sql

import java.sql.{Date, Timestamp}

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.test.SharedSparkSession

case class ReflectData(
@@ -159,18 +159,25 @@ class ScalaReflectionRelationSuite extends SparkFunSuite with SharedSparkSession
}

test("better error message when use java reserved keyword as field name") {
val e = intercept[UnsupportedOperationException] {
Seq(InvalidInJava(1)).toDS()
}
assert(e.getMessage.contains(
"`abstract` is not a valid identifier of Java and cannot be used as field name"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
Seq(InvalidInJava(1)).toDS()
},
errorClass = "_LEGACY_ERROR_TEMP_2140",
parameters = Map(
"fieldName" -> "abstract",
"walkedTypePath" -> "- root class: \"org.apache.spark.sql.InvalidInJava\""))
}

test("better error message when use invalid java identifier as field name") {
val e1 = intercept[UnsupportedOperationException] {
Seq(InvalidInJava2(1)).toDS()
}
assert(e1.getMessage.contains(
"`0` is not a valid identifier of Java and cannot be used as field name"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
Seq(InvalidInJava2(1)).toDS()
},
errorClass = "_LEGACY_ERROR_TEMP_2140",
parameters = Map(
"fieldName" -> "0",
"walkedTypePath" ->
"- root class: \"org.apache.spark.sql.ScalaReflectionRelationSuite.InvalidInJava2\""))
}
}
@@ -322,7 +322,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
test("scalar function: bad magic method") {
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenBadMagic))
intercept[UnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
intercept[SparkUnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
}

test("scalar function: bad magic method with default impl") {
@@ -334,7 +334,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
test("scalar function: no implementation found") {
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenNoImpl))
intercept[UnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
intercept[SparkUnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
}

test("scalar function: invalid parameter type or length") {
@@ -27,7 +27,7 @@ import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
import org.mockito.Mockito.{mock, when}

import org.apache.spark.SparkException
import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -162,12 +162,14 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
test("binary file data source do not support write operation") {
val df = spark.read.format(BINARY_FILE).load(testDir)
withTempDir { tmpDir =>
val thrown = intercept[UnsupportedOperationException] {
df.write
.format(BINARY_FILE)
.save(s"$tmpDir/test_save")
}
assert(thrown.getMessage.contains("Write is not supported for binary file data source"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
df.write
.format(BINARY_FILE)
.save(s"$tmpDir/test_save")
},
errorClass = "_LEGACY_ERROR_TEMP_2075",
parameters = Map.empty)
}
}

@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -1164,10 +1165,12 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
catalog.createNamespace(testNs, emptyProps)

CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p =>
val exc = intercept[UnsupportedOperationException] {
catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p))
}
assert(exc.getMessage.contains(s"Cannot remove reserved property: $p"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p))
},
errorClass = "_LEGACY_ERROR_TEMP_2069",
parameters = Map("property" -> p))

}
catalog.dropNamespace(testNs, cascade = false)
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.io._
import java.nio.charset.StandardCharsets._

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.test.SharedSparkSession

@@ -242,10 +243,12 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession {
compactibleLog.add(1, Array("some_path_1"))
compactibleLog.add(2, Array("some_path_2"))

val exc = intercept[UnsupportedOperationException] {
compactibleLog.purge(2)
}
assert(exc.getMessage.contains("Cannot purge as it might break internal state"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
compactibleLog.purge(2)
},
errorClass = "_LEGACY_ERROR_TEMP_2260",
parameters = Map.empty)

// Below line would fail with IllegalStateException if we don't prevent purge:
// - purge(2) would delete batch 0 and 1 which batch 1 is compaction batch
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.sources

import org.scalatest.concurrent.PatienceConfiguration.Timeout

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.functions.spark_partition_id
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
@@ -193,14 +194,15 @@ class RatePerMicroBatchProviderSuite extends StreamTest {
}

test("user-specified schema given") {
val exception = intercept[UnsupportedOperationException] {
spark.readStream
.format("rate-micro-batch")
.option("rowsPerBatch", "10")
.schema(spark.range(1).schema)
.load()
}
assert(exception.getMessage.contains(
"RatePerMicroBatchProvider source does not support user-specified schema"))
checkError(
exception = intercept[SparkUnsupportedOperationException] {
spark.readStream
.format("rate-micro-batch")
.option("rowsPerBatch", "10")
.schema(spark.range(1).schema)
.load()
},
errorClass = "_LEGACY_ERROR_TEMP_2242",
parameters = Map("provider" -> "RatePerMicroBatchProvider"))
}
}