List state ttl #2

Closed · wants to merge 16 commits
11 changes: 11 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -3530,6 +3530,12 @@
],
"sqlState" : "0A000"
},
"STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE" : {
"message" : [
"State store operation=<operationType> on state=<stateName> does not support TTL in NoTTL() mode."
],
"sqlState" : "42802"
},
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE" : {
"message" : [
"Failed to perform stateful processor operation=<operationType> with invalid handle state=<handleState>."
@@ -4336,6 +4342,11 @@
"Removing column families with <stateStoreProvider> is not supported."
]
},
"STATE_STORE_TTL" : {
"message" : [
"State TTL with <stateStoreProvider> is not supported. Please use RocksDBStateStoreProvider."
]
},
"TABLE_OPERATION" : {
"message" : [
"Table <tableName> does not support <operation>. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by \"spark.sql.catalog\"."
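The new `STATE_STORE_TTL` condition means state TTL is only supported with the RocksDB provider. A minimal sketch of the opt-in, assuming an active `SparkSession` named `spark` (the config key is Spark's standard state-store provider setting):

```scala
// State TTL requires RocksDBStateStoreProvider per the error class above;
// `spark` is assumed to be an active SparkSession.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
```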
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.ProductEncoder
import org.apache.spark.sql.connect.common.UdfUtils
import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StatefulProcessor, TimeoutMode}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StatefulProcessor, TimeoutMode, TTLMode}

/**
* A [[Dataset]] has been logically grouped by a user specified grouping key. Users should not
@@ -830,12 +830,15 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable {
* Instance of statefulProcessor whose functions will be invoked by the operator.
* @param timeoutMode
* The timeout mode of the stateful processor.
* @param ttlMode
* The TTL mode used to evict user state on TTL expiration.
* @param outputMode
* The output mode of the stateful processor. Defaults to APPEND mode.
*/
def transformWithState[U: Encoder](
statefulProcessor: StatefulProcessor[K, V, U],
timeoutMode: TimeoutMode,
ttlMode: TTLMode,
outputMode: OutputMode = OutputMode.Append()): Dataset[U] = {
throw new UnsupportedOperationException
}
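For context, a hedged sketch of the new call shape: `events` (a `Dataset[Event]`) and `CountProcessor` (a `StatefulProcessor[String, Event, Long]`) are hypothetical, and `TimeoutMode.NoTimeouts()` is assumed to exist alongside the TTL modes; only the argument order follows the signature above.

```scala
import org.apache.spark.sql.streaming.{OutputMode, TimeoutMode, TTLMode}

// Hypothetical: events is a Dataset[Event]; CountProcessor extends
// StatefulProcessor[String, Event, Long] and is defined elsewhere.
val counts = events
  .groupByKey(_.userId)
  .transformWithState(
    new CountProcessor(),
    TimeoutMode.NoTimeouts(),
    TTLMode.ProcessingTimeTTL(), // evict user state by processing time
    OutputMode.Update())
```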
2 changes: 2 additions & 0 deletions dev/checkstyle-suppressions.xml
@@ -60,6 +60,8 @@
files="sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java"/>
<suppress checks="MethodName"
files="sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
<suppress checks="MethodName"
files="sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java"/>
<suppress checks="LineLength"
files="src/main/java/org/apache/spark/sql/api/java/*"/>
<suppress checks="IllegalImport"
4 changes: 4 additions & 0 deletions docs/sql-error-conditions-unsupported-feature-error-class.md
@@ -202,6 +202,10 @@ Creating multiple column families with `<stateStoreProvider>` is not supported.

Removing column families with `<stateStoreProvider>` is not supported.

## STATE_STORE_TTL

State TTL with `<stateStoreProvider>` is not supported. Please use RocksDBStateStoreProvider.

## TABLE_OPERATION

Table `<tableName>` does not support `<operation>`. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".
6 changes: 6 additions & 0 deletions docs/sql-error-conditions.md
@@ -2162,6 +2162,12 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists

Star (*) is not allowed in a select list when GROUP BY an ordinal position is used.

### STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

State store operation=`<operationType>` on state=`<stateName>` does not support TTL in NoTTL() mode.

### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
49 changes: 49 additions & 0 deletions sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.plans.logical.*;

/**
 * Represents the TTL modes possible for the Dataset operation
 * {@code transformWithState}.
 */
@Experimental
@Evolving
public class TTLMode {

  /**
   * Specifies that there is no TTL for the user state. User state would not
   * be cleaned up by Spark automatically.
   */
  public static final TTLMode NoTTL() {
    return NoTTL$.MODULE$;
  }

  /**
   * Specifies that all TTL durations for user state are in processing time.
   */
  public static final TTLMode ProcessingTimeTTL() {
    return ProcessingTimeTTL$.MODULE$;
  }

  /**
   * Specifies that all TTL durations for user state are in event time.
   */
  public static final TTLMode EventTimeTTL() {
    return EventTimeTTL$.MODULE$;
  }
}
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.streaming.TTLMode

/** TTL types used in the transformWithState operator */
case object NoTTL extends TTLMode

case object ProcessingTimeTTL extends TTLMode

case object EventTimeTTL extends TTLMode
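These case objects are what the Java-side static factories return (for example `NoTTL$.MODULE$`), mirroring the existing `TimeoutMode` bridge. A hedged sketch of how planner code might branch on them (the helper is hypothetical):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTTL, NoTTL, ProcessingTimeTTL}
import org.apache.spark.sql.streaming.TTLMode

// Hypothetical helper: branch on the three TTL modes defined above.
def ttlEnabled(mode: TTLMode): Boolean = mode match {
  case NoTTL => false
  case ProcessingTimeTTL | EventTimeTTL => true
}
```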
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.streaming

import java.time.Duration

import org.apache.spark.annotation.{Evolving, Experimental}

@Experimental
@@ -33,13 +35,13 @@ private[sql] trait ListState[S] extends Serializable {
def get(): Iterator[S]

/** Update the value of the list. */
def put(newState: Array[S]): Unit
def put(newState: Array[S], ttlDuration: Duration = Duration.ZERO): Unit

/** Append an entry to the list */
def appendValue(newState: S): Unit
def appendValue(newState: S, ttlDuration: Duration = Duration.ZERO): Unit

/** Append an entire list to the existing value */
def appendList(newState: Array[S]): Unit
def appendList(newState: Array[S], ttlDuration: Duration = Duration.ZERO): Unit

/** Removes this state for the given grouping key. */
def clear(): Unit
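A hedged sketch of a processor body using the new parameter; the helper is hypothetical, and `Duration.ZERO` (the default) keeps the existing no-TTL behavior:

```scala
import java.time.Duration

import org.apache.spark.sql.streaming.ListState

// Hypothetical helper: entries appended here expire 30 minutes past the
// batch processing time or watermark, depending on the query's TTLMode.
def recordClicks(state: ListState[String], clicks: Array[String]): Unit = {
  state.appendList(clicks, Duration.ofMinutes(30))
}
```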
@@ -30,10 +30,11 @@ import org.apache.spark.sql.Encoder
private[sql] trait StatefulProcessorHandle extends Serializable {

/**
* Function to create new or return existing single value state variable of given type
* Function to create new or return existing single value state variable of given type.
* The user must ensure to call this function only within the `init()` method of the
* StatefulProcessor.
* @param stateName - name of the state variable
*
* @param stateName - name of the state variable
* @param valEncoder - SQL encoder for state variable
* @tparam T - type of state variable
* @return - instance of ValueState of type T that can be used to store state persistently
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming

import java.io.Serializable
import java.time.Duration

import org.apache.spark.annotation.{Evolving, Experimental}

@@ -42,8 +43,13 @@ private[sql] trait ValueState[S] extends Serializable {
/** Get the state if it exists as an option and None otherwise */
def getOption(): Option[S]

/** Update the value of the state. */
def update(newState: S): Unit
/**
* Update the value of the state.
*
* @param newState the new value
* @param ttlDuration the duration after which the state expires, measured from the
* current batch processing time (in processing-time TTL mode) or the current
* watermark (in event-time TTL mode)
*/
def update(newState: S, ttlDuration: Duration = Duration.ZERO): Unit

/** Remove this state. */
def clear(): Unit
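A hedged sketch of the same idea for single values, using only the methods shown above (the helper itself is hypothetical):

```scala
import java.time.Duration

import org.apache.spark.sql.streaming.ValueState

// Hypothetical helper: the updated count expires one hour past the batch
// processing time or watermark, per the query's TTLMode.
def bumpCount(state: ValueState[Long]): Long = {
  val next = state.getOption().getOrElse(0L) + 1
  state.update(next, Duration.ofHours(1))
  next
}
```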
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, StatefulProcessor, TimeoutMode}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, StatefulProcessor, TimeoutMode, TTLMode}
import org.apache.spark.sql.types._

object CatalystSerde {
@@ -574,6 +574,7 @@ object TransformWithState {
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
statefulProcessor: StatefulProcessor[K, V, U],
ttlMode: TTLMode,
timeoutMode: TimeoutMode,
outputMode: OutputMode,
child: LogicalPlan): LogicalPlan = {
@@ -584,6 +585,7 @@
groupingAttributes,
dataAttributes,
statefulProcessor.asInstanceOf[StatefulProcessor[Any, Any, Any]],
ttlMode,
timeoutMode,
outputMode,
keyEncoder.asInstanceOf[ExpressionEncoder[Any]],
@@ -600,6 +602,7 @@ case class TransformWithState(
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
statefulProcessor: StatefulProcessor[Any, Any, Any],
ttlMode: TTLMode,
timeoutMode: TimeoutMode,
outputMode: OutputMode,
keyEncoder: ExpressionEncoder[Any],
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.expressions.ReduceAggregator
import org.apache.spark.sql.internal.TypedAggUtils
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StatefulProcessor, TimeoutMode}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StatefulProcessor, TimeoutMode, TTLMode}

/**
* A [[Dataset]] has been logically grouped by a user specified grouping key. Users should not
@@ -656,19 +656,22 @@ class KeyValueGroupedDataset[K, V] private[sql](
* @param statefulProcessor Instance of statefulProcessor whose functions will be invoked by the
* operator.
* @param timeoutMode The timeout mode of the stateful processor.
* @param ttlMode The TTL mode used to evict user state on TTL expiration
* @param outputMode The output mode of the stateful processor. Defaults to APPEND mode.
*
*/
private[sql] def transformWithState[U: Encoder](
statefulProcessor: StatefulProcessor[K, V, U],
timeoutMode: TimeoutMode,
ttlMode: TTLMode,
outputMode: OutputMode = OutputMode.Append()): Dataset[U] = {
Dataset[U](
sparkSession,
TransformWithState[K, V, U](
groupingAttributes,
dataAttributes,
statefulProcessor,
ttlMode,
timeoutMode,
outputMode,
child = logicalPlan
@@ -751,14 +751,15 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case TransformWithState(
keyDeserializer, valueDeserializer, groupingAttributes,
dataAttributes, statefulProcessor, timeoutMode, outputMode,
dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode,
keyEncoder, outputAttr, child) =>
val execPlan = TransformWithStateExec(
keyDeserializer,
valueDeserializer,
groupingAttributes,
dataAttributes,
statefulProcessor,
ttlMode,
timeoutMode,
outputMode,
keyEncoder,
@@ -917,10 +918,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
hasInitialState, planLater(initialState), planLater(child)
) :: Nil
case logical.TransformWithState(keyDeserializer, valueDeserializer, groupingAttributes,
dataAttributes, statefulProcessor, timeoutMode, outputMode, keyEncoder,
dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode, keyEncoder,
outputObjAttr, child) =>
TransformWithStateExec.generateSparkPlanForBatchQueries(keyDeserializer, valueDeserializer,
groupingAttributes, dataAttributes, statefulProcessor, timeoutMode, outputMode,
groupingAttributes, dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode,
keyEncoder, outputObjAttr, planLater(child)) :: Nil

case _: FlatMapGroupsInPandasWithState =>