diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala index 840d80ffed13f..9854437739439 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala @@ -24,7 +24,7 @@ import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers._ -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema} import org.apache.spark.sql.types._ @@ -45,10 +45,10 @@ class RowTest extends AnyFunSpec with Matchers { describe("Row (without schema)") { it("throws an exception when accessing by fieldName") { - intercept[UnsupportedOperationException] { + intercept[SparkUnsupportedOperationException] { noSchemaRow.fieldIndex("col1") } - intercept[UnsupportedOperationException] { + intercept[SparkUnsupportedOperationException] { noSchemaRow.getAs("col1") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index bbb62acd02508..daa8d12613f2c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp} import scala.reflect.ClassTag import scala.reflect.runtime.universe.{typeTag, TypeTag} -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.FooEnum.FooEnum import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue @@ -490,17 +490,22 @@ class ScalaReflectionSuite extends SparkFunSuite { } test("SPARK-29026: schemaFor for trait without companion object throws exception ") { - val e = intercept[UnsupportedOperationException] { - schemaFor[TraitProductWithoutCompanion] - } - assert(e.getMessage.contains("Unable to find constructor")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + schemaFor[TraitProductWithoutCompanion] + }, + errorClass = "_LEGACY_ERROR_TEMP_2144", + parameters = Map("tpe" -> "org.apache.spark.sql.catalyst.TraitProductWithoutCompanion")) } test("SPARK-29026: schemaFor for trait with no-constructor companion throws exception ") { - val e = intercept[UnsupportedOperationException] { - schemaFor[TraitProductWithNoConstructorCompanion] - } - assert(e.getMessage.contains("Unable to find constructor")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + schemaFor[TraitProductWithNoConstructorCompanion] + }, + errorClass = "_LEGACY_ERROR_TEMP_2144", + parameters = Map("tpe" -> + "org.apache.spark.sql.catalyst.TraitProductWithNoConstructorCompanion")) } test("SPARK-27625: annotated data types") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala index 2d61f9fbc071d..e852b474aa18c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala @@ -40,15 +40,15 @@ class EncoderErrorMessageSuite extends SparkFunSuite { // That is done in Java because Scala cannot create truly private classes. test("primitive types in encoders using Kryo serialization") { - intercept[UnsupportedOperationException] { Encoders.kryo[Int] } - intercept[UnsupportedOperationException] { Encoders.kryo[Long] } - intercept[UnsupportedOperationException] { Encoders.kryo[Char] } + intercept[SparkUnsupportedOperationException] { Encoders.kryo[Int] } + intercept[SparkUnsupportedOperationException] { Encoders.kryo[Long] } + intercept[SparkUnsupportedOperationException] { Encoders.kryo[Char] } } test("primitive types in encoders using Java serialization") { - intercept[UnsupportedOperationException] { Encoders.javaSerialization[Int] } - intercept[UnsupportedOperationException] { Encoders.javaSerialization[Long] } - intercept[UnsupportedOperationException] { Encoders.javaSerialization[Char] } + intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Int] } + intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Long] } + intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Char] } } test("nice error message for missing encoder") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index 35d8327b93086..7c3857ecfc022 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -584,10 +584,12 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes test("throw exception for tuples with more than 22 elements") { val encoders = (0 to 22).map(_ => Encoders.scalaInt.asInstanceOf[ExpressionEncoder[_]]) - val e = intercept[UnsupportedOperationException] { - ExpressionEncoder.tuple(encoders) - } - assert(e.getMessage.contains("tuple with more than 22 elements are not supported")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + ExpressionEncoder.tuple(encoders) + }, + errorClass = "_LEGACY_ERROR_TEMP_2150", + parameters = Map.empty) } test("throw exception for unexpected serializer") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala index 4b8693cf7fd53..dc9a5816a335d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.json import java.io.CharArrayWriter -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.types._ @@ -122,7 +122,7 @@ class JacksonGeneratorSuite extends SparkFunSuite { val input = ArrayBasedMapData(Map("a" -> 1)) val writer = new CharArrayWriter() val gen = new JacksonGenerator(dataType, writer, option) - intercept[UnsupportedOperationException] { + intercept[SparkUnsupportedOperationException] { gen.write(input) } } @@ -132,7 +132,7 @@ class JacksonGeneratorSuite extends SparkFunSuite { val input = InternalRow(1) val writer = new CharArrayWriter() val gen = new JacksonGenerator(dataType, writer, option) - intercept[UnsupportedOperationException] { + intercept[SparkUnsupportedOperationException] { gen.write(input) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index e79fff7479b9e..42756d91b39e0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -22,7 +22,7 @@ import java.util.Collections import scala.jdk.CollectionConverters._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser @@ -659,7 +659,7 @@ class CatalogSuite extends SparkFunSuite { test("purgeTable") { val catalog = newCatalog() - intercept[UnsupportedOperationException](catalog.purgeTable(testIdent)) + intercept[SparkUnsupportedOperationException](catalog.purgeTable(testIdent)) } test("renameTable") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala index dfed284bc2b93..98c2a3d1e2726 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala @@ -21,20 +21,20 @@ import java.util import scala.jdk.CollectionConverters._ -import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException} +import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkUnsupportedOperationException} class CaseInsensitiveStringMapSuite extends SparkFunSuite { test("put and get") { val options = CaseInsensitiveStringMap.empty() - intercept[UnsupportedOperationException] { + intercept[SparkUnsupportedOperationException] { options.put("kEy", "valUE") } } test("clear") { val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava) - intercept[UnsupportedOperationException] { + intercept[SparkUnsupportedOperationException] { options.clear() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 5779fe1c62ffe..fe295b0cfa26a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1883,21 +1883,24 @@ class DatasetSuite extends QueryTest } test("SPARK-19896: cannot have circular references in case class") { - val errMsg1 = intercept[UnsupportedOperationException] { - Seq(CircularReferenceClassA(null)).toDS() - } - assert(errMsg1.getMessage.startsWith("cannot have circular references in class, but got the " + - "circular reference of class")) - val errMsg2 = intercept[UnsupportedOperationException] { - Seq(CircularReferenceClassC(null)).toDS() - } - assert(errMsg2.getMessage.startsWith("cannot have circular references in class, but got the " + - "circular reference of class")) - val errMsg3 = intercept[UnsupportedOperationException] { - Seq(CircularReferenceClassD(null)).toDS() - } - assert(errMsg3.getMessage.startsWith("cannot have circular references in class, but got the " + - "circular reference of class")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + Seq(CircularReferenceClassA(null)).toDS() + }, + errorClass = "_LEGACY_ERROR_TEMP_2139", + parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassA")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + Seq(CircularReferenceClassC(null)).toDS() + }, + errorClass = "_LEGACY_ERROR_TEMP_2139", + parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassC")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + Seq(CircularReferenceClassD(null)).toDS() + }, + errorClass = "_LEGACY_ERROR_TEMP_2139", + parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassD")) } test("SPARK-20125: option of map") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala index 66c6fbeabbf55..01b9fdec9be3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.sql.{Date, Timestamp} -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException} import org.apache.spark.sql.test.SharedSparkSession case class ReflectData( @@ -159,18 +159,25 @@ class ScalaReflectionRelationSuite extends SparkFunSuite with SharedSparkSession } test("better error message when use java reserved keyword as field name") { - val e = intercept[UnsupportedOperationException] { - Seq(InvalidInJava(1)).toDS() - } - assert(e.getMessage.contains( - "`abstract` is not a valid identifier of Java and cannot be used as field name")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + Seq(InvalidInJava(1)).toDS() + }, + errorClass = "_LEGACY_ERROR_TEMP_2140", + parameters = Map( + "fieldName" -> "abstract", + "walkedTypePath" -> "- root class: \"org.apache.spark.sql.InvalidInJava\"")) } test("better error message when use invalid java identifier as field name") { - val e1 = intercept[UnsupportedOperationException] { - Seq(InvalidInJava2(1)).toDS() - } - assert(e1.getMessage.contains( - "`0` is not a valid identifier of Java and cannot be used as field name")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + Seq(InvalidInJava2(1)).toDS() + }, + errorClass = "_LEGACY_ERROR_TEMP_2140", + parameters = Map( + "fieldName" -> "0", + "walkedTypePath" -> + "- root class: \"org.apache.spark.sql.ScalaReflectionRelationSuite.InvalidInJava2\"")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala index 141581e75884a..6481a3f3a8910 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala @@ -322,7 +322,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { test("scalar function: bad magic method") { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenBadMagic)) - intercept[UnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect()) + intercept[SparkUnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect()) } test("scalar function: bad magic method with default impl") { @@ -334,7 +334,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { test("scalar function: no implementation found") { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenNoImpl)) - intercept[UnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect()) + intercept[SparkUnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect()) } test("scalar function: invalid parameter type or length") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala index 12f2205cb1db4..c5cf94f8747cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala @@ -27,7 +27,7 @@ import com.google.common.io.{ByteStreams, Closeables} import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path} import org.mockito.Mockito.{mock, when} -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -162,12 +162,14 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession { test("binary file data source do not support write operation") { val df = spark.read.format(BINARY_FILE).load(testDir) withTempDir { tmpDir => - val thrown = intercept[UnsupportedOperationException] { - df.write - .format(BINARY_FILE) - .save(s"$tmpDir/test_save") - } - assert(thrown.getMessage.contains("Write is not supported for binary file data source")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + df.write + .format(BINARY_FILE) + .save(s"$tmpDir/test_save") + }, + errorClass = "_LEGACY_ERROR_TEMP_2075", + parameters = Map.empty) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index e5473222d4292..2195768e3b08c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfter +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser @@ -1164,10 +1165,12 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.createNamespace(testNs, emptyProps) CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p => - val exc = intercept[UnsupportedOperationException] { - catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p)) - } - assert(exc.getMessage.contains(s"Cannot remove reserved property: $p")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p)) + }, + errorClass = "_LEGACY_ERROR_TEMP_2069", + parameters = Map("property" -> p)) } catalog.dropNamespace(testNs, cascade = false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala index c7655af98160d..42eb9fa17a210 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.io._ import java.nio.charset.StandardCharsets._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.test.SharedSparkSession @@ -242,10 +243,12 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession { compactibleLog.add(1, Array("some_path_1")) compactibleLog.add(2, Array("some_path_2")) - val exc = intercept[UnsupportedOperationException] { - compactibleLog.purge(2) - } - assert(exc.getMessage.contains("Cannot purge as it might break internal state")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + compactibleLog.purge(2) + }, + errorClass = "_LEGACY_ERROR_TEMP_2260", + parameters = Map.empty) // Below line would fail with IllegalStateException if we don't prevent purge: // - purge(2) would delete batch 0 and 1 which batch 1 is compaction batch diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala index 31fbf9323140a..128b59b26b823 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.sources import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.functions.spark_partition_id import org.apache.spark.sql.streaming.{StreamTest, Trigger} @@ -193,14 +194,15 @@ class RatePerMicroBatchProviderSuite extends StreamTest { } test("user-specified schema given") { - val exception = intercept[UnsupportedOperationException] { - spark.readStream - .format("rate-micro-batch") - .option("rowsPerBatch", "10") - .schema(spark.range(1).schema) - .load() - } - assert(exception.getMessage.contains( - "RatePerMicroBatchProvider source does not support user-specified schema")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + spark.readStream + .format("rate-micro-batch") + .option("rowsPerBatch", "10") + .schema(spark.range(1).schema) + .load() + }, + errorClass = "_LEGACY_ERROR_TEMP_2242", + parameters = Map("provider" -> "RatePerMicroBatchProvider")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index 69dc8c291c0b4..0732e126a0131 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ -import org.apache.spark.{SparkException, SparkRuntimeException} +import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream} @@ -345,14 +345,15 @@ class RateStreamProviderSuite extends StreamTest { } test("user-specified schema given") { - val exception = intercept[UnsupportedOperationException] { - spark.readStream - .format("rate") - .schema(spark.range(1).schema) - .load() - } - assert(exception.getMessage.contains( - "RateStreamProvider source does not support user-specified schema")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + spark.readStream + .format("rate") + .schema(spark.range(1).schema) + .load() + }, + errorClass = "_LEGACY_ERROR_TEMP_2242", + parameters = Map("provider" -> "RateStreamProvider")) } test("continuous data") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala index bfeca58511024..87e34601dc098 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit._ import scala.jdk.CollectionConverters._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream} @@ -193,11 +194,12 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession { StructField("name", StringType) :: StructField("area", StringType) :: Nil) val params = Map("host" -> "localhost", "port" -> "1234") - val exception = intercept[UnsupportedOperationException] { - spark.readStream.schema(userSpecifiedSchema).format("socket").options(params).load() - } - assert(exception.getMessage.contains( - "TextSocketSourceProvider source does not support user-specified schema")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + spark.readStream.schema(userSpecifiedSchema).format("socket").options(params).load() + }, + errorClass = "_LEGACY_ERROR_TEMP_2242", + parameters = Map("provider" -> "TextSocketSourceProvider")) } test("input row metrics") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala index d795e7a3d94c6..050c1a2d7d978 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming import java.sql.Date -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException} import org.apache.spark.api.java.Optional import org.apache.spark.sql.execution.streaming.GroupStateImpl import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP @@ -115,12 +115,12 @@ class GroupStateSuite extends SparkFunSuite { ) for (state <- states) { // for streaming queries - testTimeoutDurationNotAllowed[UnsupportedOperationException](state) - testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) + testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state) + testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state) // for batch queries - testTimeoutDurationNotAllowed[UnsupportedOperationException](state) - testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) + testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state) + testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state) } } } @@ -135,7 +135,7 @@ class GroupStateSuite extends SparkFunSuite { assert(state.getTimeoutTimestampMs.get() === 2000) state.setTimeoutDuration(500) assert(state.getTimeoutTimestampMs.get() === 1500) // can be set without initializing state - testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) + testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state) state.update(5) assert(state.getTimeoutTimestampMs.isPresent()) @@ -144,37 +144,37 @@ class GroupStateSuite extends SparkFunSuite { assert(state.getTimeoutTimestampMs.get() === 2000) state.setTimeoutDuration("2 second") assert(state.getTimeoutTimestampMs.get() === 3000) - testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) + testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state) state.remove() assert(state.getTimeoutTimestampMs.isPresent()) assert(state.getTimeoutTimestampMs.get() === 3000) // does not change state.setTimeoutDuration(500) // can still be set assert(state.getTimeoutTimestampMs.get() === 1500) - testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) + testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state) // for batch queries state = GroupStateImpl.createForBatch( ProcessingTimeTimeout, watermarkPresent = false).asInstanceOf[GroupStateImpl[Int]] assert(!state.getTimeoutTimestampMs.isPresent()) state.setTimeoutDuration(500) - testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) + testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state) state.update(5) state.setTimeoutDuration(1000) state.setTimeoutDuration("2 second") - testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) + testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state) state.remove() state.setTimeoutDuration(500) - testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) + testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state) } test("GroupState - setTimeout - with EventTimeTimeout") { var state = TestGroupState.create[Int]( Optional.empty[Int], EventTimeTimeout, 1000, Optional.of(1000), hasTimedOut = false) assert(!state.getTimeoutTimestampMs.isPresent()) - testTimeoutDurationNotAllowed[UnsupportedOperationException](state) + testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state) state.setTimeoutTimestamp(5000) assert(state.getTimeoutTimestampMs.get() === 5000) // can be set without initializing state @@ -184,29 +184,29 @@ class GroupStateSuite extends SparkFunSuite { assert(state.getTimeoutTimestampMs.get() === 10000) state.setTimeoutTimestamp(new Date(20000)) assert(state.getTimeoutTimestampMs.get() === 20000) - testTimeoutDurationNotAllowed[UnsupportedOperationException](state) + testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state) state.remove() assert(state.getTimeoutTimestampMs.get() === 20000) state.setTimeoutTimestamp(5000) assert(state.getTimeoutTimestampMs.get() === 5000) // can be set after removing state - testTimeoutDurationNotAllowed[UnsupportedOperationException](state) + testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state) // for batch queries state = GroupStateImpl.createForBatch( EventTimeTimeout, watermarkPresent = false).asInstanceOf[GroupStateImpl[Int]] assert(!state.getTimeoutTimestampMs.isPresent()) - testTimeoutDurationNotAllowed[UnsupportedOperationException](state) + testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state) state.setTimeoutTimestamp(5000) state.update(5) state.setTimeoutTimestamp(10000) state.setTimeoutTimestamp(new Date(20000)) - testTimeoutDurationNotAllowed[UnsupportedOperationException](state) + testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state) state.remove() state.setTimeoutTimestamp(5000) - testTimeoutDurationNotAllowed[UnsupportedOperationException](state) + testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state) } test("GroupState - illegal params to setTimeout") { @@ -297,20 +297,19 @@ class GroupStateSuite extends SparkFunSuite { assert(illegalArgument.getMessage.contains("batchProcessingTimeMs must be 0 or positive")) // hasTimedOut cannot be true if there's no timeout configured - var unsupportedOperation = intercept[UnsupportedOperationException] { - TestGroupState.create[Int]( - Optional.of(5), NoTimeout, 100L, Optional.empty[Long], hasTimedOut = true) - } - assert( - unsupportedOperation - .getMessage.contains("hasTimedOut is true however there's no timeout configured")) - unsupportedOperation = intercept[UnsupportedOperationException] { - GroupStateImpl.createForStreaming[Int]( - Some(5), 100L, NO_TIMESTAMP, NoTimeout, true, false) - } - assert( - unsupportedOperation - .getMessage.contains("hasTimedOut is true however there's no timeout configured")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + TestGroupState.create[Int]( + Optional.of(5), NoTimeout, 100L, Optional.empty[Long], hasTimedOut = true) + }, + errorClass = "_LEGACY_ERROR_TEMP_3168", + parameters = Map.empty) + checkError( + exception = intercept[SparkUnsupportedOperationException] { + GroupStateImpl.createForStreaming[Int](Some(5), 100L, NO_TIMESTAMP, NoTimeout, true, false) + }, + errorClass = "_LEGACY_ERROR_TEMP_3168", + parameters = Map.empty) } test("GroupState - hasTimedOut") { @@ -348,9 +347,10 @@ class GroupStateSuite extends SparkFunSuite { } def assertWrongTimeoutError(test: => Unit): Unit = { - val e = intercept[UnsupportedOperationException] { test } - assert(e.getMessage.contains( - "Cannot get event time watermark timestamp without setting watermark")) + checkError( + exception = intercept[SparkUnsupportedOperationException] { test }, + errorClass = "_LEGACY_ERROR_TEMP_2204", + parameters = Map.empty) } for (timeoutConf <- Seq(NoTimeout, EventTimeTimeout, ProcessingTimeTimeout)) {