[SPARK-46922][CORE][SQL] Do not wrap runtime user-facing errors
### What changes were proposed in this pull request?

It's not user-friendly to always wrap task runtime errors with `SparkException("Job aborted ...")`, as users have to scroll quite a bit to find the real error. This PR throws user-facing runtime errors (errors that define an error class and are not internal errors) directly instead of wrapping them.

This PR also fixes some error wrapping issues.
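
The core of the change is in `DAGScheduler`: when a stage failure aborts a job, the scheduler now checks whether the task error is user-facing before wrapping it. Below is a condensed sketch of that logic, paraphrased from the patch (the object and method names are illustrative only, and `SparkThrowableHelper` is `private[spark]`, so this compiles only inside the `org.apache.spark` package); see the `DAGScheduler.scala` hunk further down for the actual code.

```scala
package org.apache.spark

object JobFailureSketch {
  // Choose the exception used to fail the job. User-facing errors (they define
  // an error class and are not internal errors) are surfaced directly; anything
  // else keeps the generic "Job aborted" wrapper.
  def toJobFailure(reason: String, exception: Option[Throwable]): Exception = {
    exception.collect {
      case st: Exception with SparkThrowable
          if st.getErrorClass != null &&
            !SparkThrowableHelper.isInternalError(st.getErrorClass) =>
        st
    }.getOrElse {
      new SparkException(s"Job aborted due to stage failure: $reason", cause = exception.orNull)
    }
  }
}
```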

### Why are the changes needed?

Better error reporting: users see the actual failure instead of a generic wrapper exception.

### Does this PR introduce _any_ user-facing change?

Yes. Users now see the actual error directly instead of having to dig through the cause chain of the "Job aborted" error.
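
For example (a hypothetical repro, not taken from the patch; it assumes ANSI mode so the division fails inside a task at runtime), a query that previously surfaced as `SparkException: Job aborted due to stage failure: ...` with the real error buried in the cause chain now throws the user-facing error directly:

```scala
import org.apache.spark.SparkArithmeticException
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.ansi.enabled", "true") // make divide-by-zero a runtime error
  .getOrCreate()

try {
  // id = 5 triggers a division by zero inside a task.
  spark.range(10).selectExpr("10 / (id - 5)").collect()
} catch {
  // Before this change: SparkException("Job aborted due to stage failure: ...")
  // with the real error nested under getCause.
  // After this change: the user-facing error is thrown directly.
  case e: SparkArithmeticException =>
    assert(e.getErrorClass == "DIVIDE_BY_ZERO")
}
```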

### How was this patch tested?

New tests, plus updates to existing tests that asserted the old error-wrapping behavior.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#44953 from cloud-fan/user-error.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Feb 7, 2024
1 parent becc04a commit 5789316
Showing 60 changed files with 712 additions and 638 deletions.
11 changes: 6 additions & 5 deletions common/utils/src/main/resources/error/error-classes.json
@@ -1242,6 +1242,12 @@
],
"sqlState" : "22018"
},
"FAILED_READ_FILE" : {
"message" : [
"Encountered error while reading file <path>."
],
"sqlState" : "KD001"
},
"FAILED_REGISTER_CLASS_WITH_KRYO" : {
"message" : [
"Failed to register classes with Kryo."
@@ -5854,11 +5860,6 @@
"Parquet column cannot be converted in file <filePath>. Column: <column>, Expected: <logicalType>, Found: <physicalType>."
]
},
"_LEGACY_ERROR_TEMP_2064" : {
"message" : [
"Encountered error while reading file <path>. Details:"
]
},
"_LEGACY_ERROR_TEMP_2065" : {
"message" : [
"Cannot create columnar reader."
@@ -433,10 +433,12 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
dataFileWriter.flush()
dataFileWriter.close()

val ex = intercept[SparkException] {
spark.read.format("avro").load(s"$dir.avro").collect()
}
assert(ex.getErrorClass == "FAILED_READ_FILE")
checkError(
exception = intercept[SparkException] {
spark.read.format("avro").load(s"$dir.avro").collect()
}.getCause.getCause.asInstanceOf[SparkArithmeticException],
exception = ex.getCause.asInstanceOf[SparkArithmeticException],
errorClass = "NUMERIC_VALUE_OUT_OF_RANGE",
parameters = Map(
"value" -> "0",
@@ -32,7 +32,6 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.exception.ExceptionUtils

import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
import org.apache.spark.TestUtils.assertExceptionMsg
@@ -862,23 +861,19 @@ abstract class AvroSuite
sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
// With the flag disabled, we will throw an exception if there is a mismatch
withSQLConf(confKey -> "false") {
val e = intercept[SparkException] {
val ex = intercept[SparkException] {
spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
}
ExceptionUtils.getRootCause(e) match {
case ex: AnalysisException =>
checkError(
exception = ex,
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "decimal\\(12,10\\)",
"sqlType" -> "\"DECIMAL\\(4,3\\)\""),
matchPVals = true
)
case other =>
fail(s"Received unexpected exception", other)
}
assert(ex.getErrorClass == "FAILED_READ_FILE")
checkError(
exception = ex.getCause.asInstanceOf[AnalysisException],
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "decimal\\(12,10\\)",
"sqlType" -> "\"DECIMAL\\(4,3\\)\""),
matchPVals = true
)
}
// The following used to work, so it should still work with the flag enabled
checkAnswer(
@@ -911,24 +906,19 @@ abstract class AvroSuite

withSQLConf(confKey -> "false") {
Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
val e = intercept[SparkException] {
val ex = intercept[SparkException] {
spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
}

ExceptionUtils.getRootCause(e) match {
case ex: AnalysisException =>
checkError(
exception = ex,
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "interval day to second",
"sqlType" -> s""""$sqlType""""),
matchPVals = true
)
case other =>
fail(s"Received unexpected exception", other)
}
assert(ex.getErrorClass == "FAILED_READ_FILE")
checkError(
exception = ex.getCause.asInstanceOf[AnalysisException],
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "interval day to second",
"sqlType" -> s""""$sqlType""""),
matchPVals = true
)
}
}

@@ -953,24 +943,19 @@ abstract class AvroSuite

withSQLConf(confKey -> "false") {
Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
val e = intercept[SparkException] {
val ex = intercept[SparkException] {
spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
}

ExceptionUtils.getRootCause(e) match {
case ex: AnalysisException =>
checkError(
exception = ex,
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "interval year to month",
"sqlType" -> s""""$sqlType""""),
matchPVals = true
)
case other =>
fail(s"Received unexpected exception", other)
}
assert(ex.getErrorClass == "FAILED_READ_FILE")
checkError(
exception = ex.getCause.asInstanceOf[AnalysisException],
errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "interval year to month",
"sqlType" -> s""""$sqlType""""),
matchPVals = true
)
}
}

@@ -1465,13 +1450,15 @@ abstract class AvroSuite
df.write.format("avro").option("avroSchema", avroSchema).save(tempSaveDir)
checkAvroSchemaEquals(avroSchema, getAvroSchemaStringFromFiles(tempSaveDir))

val message = intercept[SparkException] {
val ex = intercept[SparkException] {
spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(2, null))), catalystSchema)
.write.format("avro").option("avroSchema", avroSchema)
.save(s"$tempDir/${UUID.randomUUID()}")
}.getMessage
assert(message.contains("Caused by: java.lang.NullPointerException: "))
assert(message.contains("null value for (non-nullable) string at test_schema.Name"))
}
assert(ex.getErrorClass == "TASK_WRITE_FAILED")
assert(ex.getCause.isInstanceOf[java.lang.NullPointerException])
assert(ex.getCause.getMessage.contains(
"null value for (non-nullable) string at test_schema.Name"))
}
}

@@ -2380,7 +2367,8 @@ abstract class AvroSuite
val e = intercept[SparkException] {
df.write.format("avro").option("avroSchema", avroSchema).save(path3_x)
}
assert(e.getCause.getCause.isInstanceOf[SparkUpgradeException])
assert(e.getErrorClass == "TASK_WRITE_FAILED")
assert(e.getCause.isInstanceOf[SparkUpgradeException])
}
checkDefaultLegacyRead(oldPath)

@@ -2405,10 +2393,6 @@ abstract class AvroSuite
}
}

def failInRead(path: String): Unit = {
val e = intercept[SparkException](spark.read.format("avro").load(path).collect())
assert(e.getCause.isInstanceOf[SparkUpgradeException])
}
def successInRead(path: String): Unit = spark.read.format("avro").load(path).collect()
Seq(
// By default we should not fail to read ancient datetime values when parquet files don't
@@ -2638,7 +2622,8 @@ abstract class AvroSuite
val e = intercept[SparkException] {
df.write.format("avro").option("avroSchema", avroSchema).save(dir.getCanonicalPath)
}
val errMsg = e.getCause.getCause.asInstanceOf[SparkUpgradeException].getMessage
assert(e.getErrorClass == "TASK_WRITE_FAILED")
val errMsg = e.getCause.asInstanceOf[SparkUpgradeException].getMessage
assert(errMsg.contains("You may get a different result due to the upgrading"))
}
}
@@ -2648,7 +2633,8 @@ abstract class AvroSuite
val e = intercept[SparkException] {
df.write.format("avro").save(dir.getCanonicalPath)
}
val errMsg = e.getCause.getCause.asInstanceOf[SparkUpgradeException].getMessage
assert(e.getErrorClass == "TASK_WRITE_FAILED")
val errMsg = e.getCause.asInstanceOf[SparkUpgradeException].getMessage
assert(errMsg.contains("You may get a different result due to the upgrading"))
}
}
@@ -2659,11 +2645,10 @@ abstract class AvroSuite
"before_1582_timestamp_micros_v2_4_5.avro",
"before_1582_timestamp_millis_v2_4_5.avro"
).foreach { fileName =>
val e = intercept[SparkException] {
val e = intercept[SparkUpgradeException] {
spark.read.format("avro").load(getResourceAvroFilePath(fileName)).collect()
}
val errMsg = e.getCause.asInstanceOf[SparkUpgradeException].getMessage
assert(errMsg.contains("You may get a different result due to the upgrading"))
assert(e.getMessage.contains("You may get a different result due to the upgrading"))
}
}
}
@@ -207,9 +207,8 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
assert(!exception.getMessageParameters().get("endOffset").isEmpty)
assert(exception.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException])
assert(
exception.getCause.getCause.getCause.getMessage
exception.getCause.getCause.getMessage
.contains("java.lang.RuntimeException: Number 2 encountered!"))
} finally {
spark.streams.resetTerminated()
@@ -248,9 +247,8 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
assert(!exception.getMessageParameters().get("endOffset").isEmpty)
assert(exception.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException])
assert(
exception.getCause.getCause.getCause.getMessage
exception.getCause.getCause.getMessage
.contains("java.lang.RuntimeException: Number 2 encountered!"))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -359,10 +359,9 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark

test("SPARK-20427/SPARK-20921: read table use custom schema by jdbc api") {
// default will throw IllegalArgumentException
val e = intercept[org.apache.spark.SparkException] {
val e = intercept[org.apache.spark.SparkArithmeticException] {
spark.read.jdbc(jdbcUrl, "tableWithCustomSchema", new Properties()).collect()
}
assert(e.getCause().isInstanceOf[ArithmeticException])
assert(e.getMessage.contains("Decimal precision 39 exceeds max precision 38"))

// custom schema can read data
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2831,10 +2831,16 @@ private[spark] class DAGScheduler(
failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
updateStageInfoForPushBasedShuffle(failedStage)
for (job <- dependentJobs) {
failJobAndIndependentStages(
job,
val finalException = exception.collect {
// If the error is user-facing (defines error class and is not internal error), we don't
// wrap it with "Job aborted" and expose this error to the end users directly.
case st: Exception with SparkThrowable if st.getErrorClass != null &&
!SparkThrowableHelper.isInternalError(st.getErrorClass) =>
st
}.getOrElse {
new SparkException(s"Job aborted due to stage failure: $reason", cause = exception.orNull)
)
}
failJobAndIndependentStages(job, finalException)
}
if (dependentJobs.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
@@ -2895,7 +2901,7 @@ private[spark] class DAGScheduler(
/** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
private def failJobAndIndependentStages(
job: ActiveJob,
error: SparkException): Unit = {
error: Exception): Unit = {
if (cancelRunningIndependentStages(job, error.getMessage)) {
// SPARK-15783 important to cleanup state first, just for tests where we have some asserts
// against the state. Otherwise we have a *little* bit of flakiness in the tests.
6 changes: 6 additions & 0 deletions docs/sql-error-conditions.md
@@ -726,6 +726,12 @@ For more details see [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.

Failed parsing struct: `<raw>`.

### FAILED_READ_FILE

SQLSTATE: KD001

Encountered error while reading file `<path>`.

### FAILED_REGISTER_CLASS_WITH_KRYO

SQLSTATE: KD000
@@ -24,7 +24,7 @@ import breeze.stats.distributions.{Multinomial => BrzMultinomial}
import breeze.stats.distributions.Rand.FixedSeed.randBasis
import org.scalatest.exceptions.TestFailedException

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.{SparkException, SparkFunSuite, SparkRuntimeException}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
@@ -253,23 +253,23 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
LabeledPoint(0.0, Vectors.dense(-1.0)),
LabeledPoint(1.0, Vectors.dense(1.0)),
LabeledPoint(1.0, Vectors.dense(0.0)))
intercept[SparkException] {
intercept[SparkRuntimeException] {
NaiveBayes.train(sc.makeRDD(dense, 2))
}
val sparse = Seq(
LabeledPoint(1.0, Vectors.sparse(1, Array(0), Array(1.0))),
LabeledPoint(0.0, Vectors.sparse(1, Array(0), Array(-1.0))),
LabeledPoint(1.0, Vectors.sparse(1, Array(0), Array(1.0))),
LabeledPoint(1.0, Vectors.sparse(1, Array.empty, Array.empty)))
intercept[SparkException] {
intercept[SparkRuntimeException] {
NaiveBayes.train(sc.makeRDD(sparse, 2))
}
val nan = Seq(
LabeledPoint(1.0, Vectors.sparse(1, Array(0), Array(1.0))),
LabeledPoint(0.0, Vectors.sparse(1, Array(0), Array(Double.NaN))),
LabeledPoint(1.0, Vectors.sparse(1, Array(0), Array(1.0))),
LabeledPoint(1.0, Vectors.sparse(1, Array.empty, Array.empty)))
intercept[SparkException] {
intercept[SparkRuntimeException] {
NaiveBayes.train(sc.makeRDD(nan, 2))
}
}
@@ -281,7 +281,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
LabeledPoint(1.0, Vectors.dense(1.0)),
LabeledPoint(1.0, Vectors.dense(0.0)))

intercept[SparkException] {
intercept[SparkRuntimeException] {
NaiveBayes.train(sc.makeRDD(badTrain, 2), 1.0, Bernoulli)
}

8 changes: 3 additions & 5 deletions python/pyspark/sql/tests/test_functions.py
@@ -24,9 +24,7 @@
import re
import unittest

from py4j.protocol import Py4JJavaError

from pyspark.errors import PySparkTypeError, PySparkValueError
from pyspark.errors import PySparkTypeError, PySparkValueError, SparkRuntimeException
from pyspark.sql import Row, Window, functions as F, types
from pyspark.sql.column import Column
from pyspark.testing.sqlutils import ReusedSQLTestCase, SQLTestUtils
@@ -1073,7 +1071,7 @@ def test_datetime_functions(self):
self.assertEqual(datetime.date(2017, 1, 22), parse_result["to_date(dateCol)"])

def test_assert_true(self):
self.check_assert_true(Py4JJavaError)
self.check_assert_true(SparkRuntimeException)

def check_assert_true(self, tpe):
df = self.spark.range(3)
@@ -1099,7 +1097,7 @@ def check_assert_true(self, tpe):
)

def test_raise_error(self):
self.check_raise_error(Py4JJavaError)
self.check_raise_error(SparkRuntimeException)

def check_raise_error(self, tpe):
df = self.spark.createDataFrame([Row(id="foobar")])
@@ -78,8 +78,8 @@ class FailureSafeParser[IN](
case StringAsDataTypeException(fieldName, fieldValue, dataType) =>
throw QueryExecutionErrors.cannotParseStringAsDataTypeError(e.record().toString,
fieldName, fieldValue, dataType)
case _ => throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
toResultRow(e.partialResults().headOption, e.record).toString, e)
case other => throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
toResultRow(e.partialResults().headOption, e.record).toString, other)
}
}
}