[SPARK-46864][SS] Onboard Arbitrary StateV2 onto New Error Class Framework

### What changes were proposed in this pull request?

This PR proposes to apply the error class framework to the new State API V2.

### Why are the changes needed?

The error class framework is the standard way to represent all exceptions in Spark.
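As a quick illustration of the pattern (the error class and message template below are the ones added in this PR; the throw site is a simplified sketch rather than code from the diff):

```scala
import org.apache.spark.SparkUnsupportedOperationException

// Entry added to common/utils/src/main/resources/error/error-classes.json:
//   "STATE_STORE_UNSUPPORTED_OPERATION" : {
//     "message" : [ "<operationType> operation not supported with <entity>" ],
//     "sqlState" : "XXKST"
//   }

// Callers raise the error by class name plus named parameters; the framework
// substitutes the parameters into the message template above, so the message
// text lives in one place instead of being hand-built at every throw site.
throw new SparkUnsupportedOperationException(
  errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
  messageParameters = Map("operationType" -> "Put", "entity" -> "changelog writer v1"))
```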

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Refactored the unit tests to check that the right error class is thrown in the relevant situations.
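Concretely, the refactored suites assert on the error class and parameters via the `checkError` helper from Spark's test utilities rather than matching raw message strings. A trimmed sketch (mirroring the RocksDBSuite change below, with `changelogWriter` set up as in that suite):

```scala
import org.apache.spark.SparkUnsupportedOperationException

val ex = intercept[SparkUnsupportedOperationException] {
  // v1 changelog writers do not accept a column family argument
  changelogWriter.put("a", "1", "testColFamily")
}
// checkError asserts on the error class and parameter map, not raw message text.
checkError(
  ex,
  errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
  parameters = Map("operationType" -> "Put", "entity" -> "changelog writer v1"))
```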

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#44883 from ericm-db/state-v2-error-class.

Lead-authored-by: Eric Marnadi <eric.marnadi@databricks.com>
Co-authored-by: ericm-db <132308037+ericm-db@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2 people authored and HeartSaVioR committed Feb 1, 2024
1 parent 704e9a0 commit 00e63d6
Showing 10 changed files with 199 additions and 20 deletions.
29 changes: 29 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -1656,6 +1656,12 @@
],
"sqlState" : "XX000"
},
"INTERNAL_ERROR_TWS" : {
"message" : [
"<message>"
],
"sqlState" : "XX000"
},
"INTERVAL_ARITHMETIC_OVERFLOW" : {
"message" : [
"<message>.<alternative>"
@@ -3235,6 +3241,18 @@
],
"sqlState" : "0A000"
},
"STATE_STORE_MULTIPLE_VALUES_PER_KEY" : {
"message" : [
"Store does not support multiple values per key"
],
"sqlState" : "42802"
},
"STATE_STORE_UNSUPPORTED_OPERATION" : {
"message" : [
"<operationType> operation not supported with <entity>"
],
"sqlState" : "XXKST"
},
"STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : {
"message" : [
"Static partition column <staticName> is also specified in the column list."
@@ -3388,6 +3406,12 @@
],
"sqlState" : "428EK"
},
"TWS_VALUE_SHOULD_NOT_BE_NULL" : {
"message" : [
"New value should be non-null for <typeOfState>"
],
"sqlState" : "22004"
},
"UDTF_ALIAS_NUMBER_MISMATCH" : {
"message" : [
"The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF.",
@@ -3921,6 +3945,11 @@
"<variableName> is a VARIABLE and cannot be updated using the SET statement. Use SET VARIABLE <variableName> = ... instead."
]
},
"STATE_STORE_MULTIPLE_COLUMN_FAMILIES" : {
"message" : [
"Creating multiple column families with <stateStoreProvider> is not supported."
]
},
"TABLE_OPERATION" : {
"message" : [
"Table <tableName> does not support <operation>. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by \"spark.sql.catalog\"."
4 changes: 4 additions & 0 deletions docs/sql-error-conditions-unsupported-feature-error-class.md
@@ -190,6 +190,10 @@ set PROPERTIES and DBPROPERTIES at the same time.

`<variableName>` is a VARIABLE and cannot be updated using the SET statement. Use SET VARIABLE `<variableName>` = ... instead.

## STATE_STORE_MULTIPLE_COLUMN_FAMILIES

Creating multiple column families with `<stateStoreProvider>` is not supported.

## TABLE_OPERATION

Table `<tableName>` does not support `<operation>`. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".
24 changes: 24 additions & 0 deletions docs/sql-error-conditions.md
@@ -998,6 +998,12 @@ For more details see [INTERNAL_ERROR_METADATA_CATALOG](sql-error-conditions-inte

`<message>`

### INTERNAL_ERROR_TWS

[SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error)

`<message>`

### INTERVAL_ARITHMETIC_OVERFLOW

[SQLSTATE: 22015](sql-error-conditions-sqlstates.html#class-22-data-exception)
@@ -2019,6 +2025,18 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists

Star (*) is not allowed in a select list when GROUP BY an ordinal position is used.

### STATE_STORE_MULTIPLE_VALUES_PER_KEY

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Store does not support multiple values per key

### STATE_STORE_UNSUPPORTED_OPERATION

[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)

`<operationType>` operation not supported with `<entity>`

### STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST

[SQLSTATE: 42713](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -2157,6 +2175,12 @@ Choose a different name, drop or replace the existing view, or add the IF NOT E

CREATE TEMPORARY VIEW or the corresponding Dataset APIs only accept single-part view names, but got: `<actualName>`.

### TWS_VALUE_SHOULD_NOT_BE_NULL

[SQLSTATE: 22004](sql-error-conditions-sqlstates.html#class-22-data-exception)

New value should be non-null for `<typeOfState>`

### UDTF_ALIAS_NUMBER_MISMATCH

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors}
import org.apache.spark.sql.streaming.ValueState
import org.apache.spark.sql.types._

@@ -47,8 +47,7 @@ class ValueStateImpl[S](
private def encodeKey(): UnsafeRow = {
val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
if (!keyOption.isDefined) {
throw new UnsupportedOperationException("Implicit key not found for operation on" +
s"stateName=$stateName")
throw StateStoreErrors.implicitKeyNotFound(stateName)
}

val toRow = keyExprEnc.createSerializer()
@@ -256,8 +256,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

// TODO: add support for multiple col families with HDFSBackedStateStoreProvider
if (useColumnFamilies) {
throw new UnsupportedOperationException("Multiple column families are not supported with " +
s"HDFSBackedStateStoreProvider")
throw StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider")
}

require((keySchema.length == 0 && numColsPrefixKey == 0) ||
@@ -137,8 +137,8 @@ class StateStoreChangelogWriterV1(
}

override def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = {
throw new UnsupportedOperationException("Operation not supported with state " +
"changelog writer v1")
throw StateStoreErrors.unsupportedOperationException(
operationName = "Put", entity = "changelog writer v1")
}

override def delete(key: Array[Byte]): Unit = {
@@ -151,8 +151,8 @@
}

override def delete(key: Array[Byte], colFamilyName: String): Unit = {
throw new UnsupportedOperationException("Operation not supported with state " +
"changelog writer v1")
throw StateStoreErrors.unsupportedOperationException(
operationName = "Delete", entity = "changelog writer v1")
}

override def commit(): Unit = {
@@ -189,8 +189,8 @@
extends StateStoreChangelogWriter(fm, file, compressionCodec) {

override def put(key: Array[Byte], value: Array[Byte]): Unit = {
throw new UnsupportedOperationException("Operation not supported with state " +
"changelog writer v2")
throw StateStoreErrors.unsupportedOperationException(
operationName = "Put", entity = "changelog writer v2")
}

override def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = {
@@ -206,8 +206,8 @@
}

override def delete(key: Array[Byte]): Unit = {
throw new UnsupportedOperationException("Operation not supported with state " +
"changelog writer v2")
throw StateStoreErrors.unsupportedOperationException(
operationName = "Delete", entity = "changelog writer v2")
}

override def delete(key: Array[Byte], colFamilyName: String): Unit = {
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.state

import org.apache.spark.{SparkException, SparkUnsupportedOperationException}

/**
* Object for grouping error messages from (most) exceptions thrown from State API V2
*
* ERROR_CLASS has a prefix of "STATE_STORE_" to indicate where the error is from
*/
object StateStoreErrors {
def implicitKeyNotFound(stateName: String): SparkException = {
SparkException.internalError(
msg = s"Implicit key not found in state store for stateName=$stateName",
category = "TWS"
)
}

def multipleColumnFamiliesNotSupported(stateStoreProvider: String):
StateStoreMultipleColumnFamiliesNotSupportedException = {
new StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider)
}

def unsupportedOperationException(operationName: String, entity: String):
StateStoreUnsupportedOperationException = {
new StateStoreUnsupportedOperationException(operationName, entity)
}
}

class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String)
extends SparkUnsupportedOperationException(
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
messageParameters = Map("stateStoreProvider" -> stateStoreProvider)
)

class StateStoreUnsupportedOperationException(operationType: String, entity: String)
extends SparkUnsupportedOperationException(
errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
messageParameters = Map("operationType" -> operationType, "entity" -> entity)
)
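To show how these helpers surface to callers, a hedged usage sketch (helper names are from this PR; the `getErrorClass`/`getSqlState` accessors are assumed from Spark's `SparkThrowable` interface):

```scala
import org.apache.spark.sql.execution.streaming.state.StateStoreErrors

// internalError(msg, category = "TWS") resolves to the INTERNAL_ERROR_TWS
// entry added to error-classes.json above.
val internal = StateStoreErrors.implicitKeyNotFound("testState")
assert(internal.getErrorClass == "INTERNAL_ERROR_TWS")

// The dedicated subclasses carry their error class, SQLSTATE, and parameter map.
val unsupported = StateStoreErrors.unsupportedOperationException(
  operationName = "Put", entity = "changelog writer v1")
assert(unsupported.getErrorClass == "STATE_STORE_UNSUPPORTED_OPERATION")
assert(unsupported.getSqlState == "XXKST")
```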
@@ -30,7 +30,7 @@ class MemoryStateStore extends StateStore() {
}

override def createColFamilyIfAbsent(colFamilyName: String): Unit = {
throw new UnsupportedOperationException("Creating multiple column families is not supported")
throw StateStoreErrors.multipleColumnFamiliesNotSupported("MemoryStateStoreProvider")
}

override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = map.get(key)
@@ -29,7 +29,7 @@ import org.rocksdb.CompressionType
import org.scalactic.source.Position
import org.scalatest.Tag

-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager}
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
@@ -689,6 +689,41 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}

testWithChangelogCheckpointingEnabled("RocksDB: Unsupported Operations" +
" with Changelog Checkpointing") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
val fileManager = new RocksDBFileManager(
dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
val changelogWriter = fileManager.getChangeLogWriter(1)

val ex1 = intercept[SparkUnsupportedOperationException] {
changelogWriter.put("a", "1", "testColFamily")
}

checkError(
ex1,
errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
parameters = Map(
"operationType" -> "Put",
"entity" -> "changelog writer v1"
),
matchPVals = true
)
val ex2 = intercept[SparkUnsupportedOperationException] {
changelogWriter.delete("a", "testColFamily")
}

checkError(
ex2,
errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
parameters = Map(
"operationType" -> "Delete",
"entity" -> "changelog writer v1"
),
matchPVals = true
)
}

testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
val fileManager = new RocksDBFileManager(
@@ -24,6 +24,7 @@ import scala.util.Random
import org.apache.hadoop.conf.Configuration
import org.scalatest.BeforeAndAfter

+import org.apache.spark.SparkException
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl}
@@ -91,14 +92,22 @@ class ValueStateSuite extends SharedSparkSession
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])

val stateName = "testState"
val testState: ValueState[Long] = handle.getValueState[Long]("testState")
assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isEmpty)
val ex = intercept[Exception] {
testState.update(123)
}

-assert(ex.isInstanceOf[UnsupportedOperationException])
-assert(ex.getMessage.contains("Implicit key not found"))
+assert(ex.isInstanceOf[SparkException])
+checkError(
+  ex.asInstanceOf[SparkException],
+  errorClass = "INTERNAL_ERROR_TWS",
+  parameters = Map(
+    "message" -> s"Implicit key not found in state store for stateName=$stateName"
+  ),
+  matchPVals = true
+)
ImplicitGroupingKeyTracker.setImplicitKey("test_key")
assert(ImplicitGroupingKeyTracker.getImplicitKeyOption.isDefined)
testState.update(123)
@@ -110,9 +119,14 @@
val ex1 = intercept[Exception] {
testState.update(123)
}

-assert(ex1.isInstanceOf[UnsupportedOperationException])
-assert(ex1.getMessage.contains("Implicit key not found"))
+checkError(
+  ex1.asInstanceOf[SparkException],
+  errorClass = "INTERNAL_ERROR_TWS",
+  parameters = Map(
+    "message" -> s"Implicit key not found in state store for stateName=$stateName"
+  ),
+  matchPVals = true
+)
}
}

@@ -184,4 +198,23 @@
assert(testState2.get() === null)
}
}

test("colFamily with HDFSBackedStateStoreProvider should fail") {
val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
val provider = new HDFSBackedStateStoreProvider()
val storeConf = new StateStoreConf(new SQLConf())
val ex = intercept[StateStoreMultipleColumnFamiliesNotSupportedException] {
provider.init(
storeId, keySchema, valueSchema, 0, useColumnFamilies = true,
storeConf, new Configuration)
}
checkError(
ex,
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
parameters = Map(
"stateStoreProvider" -> "HDFSStateStoreProvider"
),
matchPVals = true
)
}
}
