
Commit d36b61e

Jackie Zhang authored and yhuang-db committed
[SPARK-51940][SS] Add interface for managing streaming checkpoint metadata
### What changes were proposed in this pull request?

Minor refactor to introduce an interface for accessing the metadata (e.g. offset / commit logs) in a streaming checkpoint.

### Why are the changes needed?

To standardize the access pattern.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This is a pure refactoring; existing tests should suffice.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50746 from jackierwzhang/spark-51940-checkpoint-metadata-interface.

Authored-by: Jackie Zhang <ruowang.zhang+data@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 5c0e61d commit d36b61e
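To illustrate the standardized access pattern, here is a minimal sketch based on the new `StreamingQueryCheckpointMetadata` class introduced in this diff. The SparkSession and checkpoint path below are placeholders, and the class lives in Spark's internal `execution.streaming` package, so this is not public API:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.StreamingQueryCheckpointMetadata

val spark = SparkSession.builder().getOrCreate()
val checkpointLocation = "/tmp/checkpoints/my-query"  // placeholder checkpoint root

// One object now hands out the offset log, commit log, and stream metadata,
// instead of each caller constructing OffsetSeqLog / CommitLog from hand-built paths.
val checkpointMetadata = new StreamingQueryCheckpointMetadata(spark, checkpointLocation)

// Latest batch id written to the offset write-ahead log, if any.
val lastOffsetBatch: Option[Long] = checkpointMetadata.offsetLog.getLatest().map(_._1)

// Latest batch id whose output was durably committed to the sink, if any.
val lastCommittedBatch: Option[Long] = checkpointMetadata.commitLog.getLatest().map(_._1)
```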

7 files changed (+157 -49 lines changed)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 4 additions & 6 deletions
@@ -33,8 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.{J
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues.JoinSideValues
 import org.apache.spark.sql.execution.datasources.v2.state.metadata.{StateMetadataPartitionReader, StateMetadataTableEntry}
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog, OffsetSeqMetadata, TimerStateUtils, TransformWithStateOperatorProperties, TransformWithStateVariableInfo}
-import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
+import org.apache.spark.sql.execution.streaming.{OffsetSeqMetadata, StreamingQueryCheckpointMetadata, TimerStateUtils, TransformWithStateOperatorProperties, TransformWithStateVariableInfo}
+import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_STATE
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
 import org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateSchemaCompatibilityChecker, StateSchemaMetadata, StateSchemaProvider, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
 import org.apache.spark.sql.sources.DataSourceRegister
@@ -122,8 +122,7 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
   override def supportsExternalMetadata(): Boolean = false

   private def buildStateStoreConf(checkpointLocation: String, batchId: Long): StateStoreConf = {
-    val offsetLog = new OffsetSeqLog(session,
-      new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+    val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog
     offsetLog.get(batchId) match {
       case Some(value) =>
         val metadata = value.metadata.getOrElse(
@@ -548,8 +547,7 @@ object StateSourceOptions extends DataSourceOptions {
   }

   private def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Long = {
-    val commitLog = new CommitLog(session,
-      new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
+    val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog
     commitLog.getLatest() match {
       case Some((lastId, _)) => lastId
       case None => throw StateDataSourceErrors.committedBatchUnavailable(checkpointLocation)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala

Lines changed: 15 additions & 10 deletions
@@ -83,16 +83,21 @@ class AsyncProgressTrackingMicroBatchExecution(
     }
   })

-  override val offsetLog = new AsyncOffsetSeqLog(
-    sparkSession,
-    checkpointFile("offsets"),
-    asyncWritesExecutorService,
-    asyncProgressTrackingCheckpointingIntervalMs,
-    clock = triggerClock
-  )
-
-  override val commitLog =
-    new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService)
+  /**
+   * Manages the metadata from this checkpoint location with async write operations.
+   */
+  private val asyncCheckpointMetadata =
+    new AsyncStreamingQueryCheckpointMetadata(
+      sparkSessionForStream,
+      resolvedCheckpointRoot,
+      asyncWritesExecutorService,
+      asyncProgressTrackingCheckpointingIntervalMs,
+      triggerClock
+    )
+
+  override lazy val offsetLog: AsyncOffsetSeqLog = asyncCheckpointMetadata.offsetLog
+
+  override lazy val commitLog: AsyncCommitLog = asyncCheckpointMetadata.commitLog

   // perform quick validation to fail faster
   validateAndGetTrigger()
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncStreamingQueryCheckpointMetadata.scala

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.ThreadPoolExecutor
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Clock
+
+/**
+ * A version of [[StreamingQueryCheckpointMetadata]] that supports async state checkpointing.
+ *
+ * @param sparkSession Spark session
+ * @param resolvedCheckpointRoot The resolved checkpoint root path
+ * @param asyncWritesExecutorService The executor service for async writes
+ * @param asyncProgressTrackingCheckpointingIntervalMs The interval for async progress
+ * @param triggerClock The clock to use for trigger time
+ */
+class AsyncStreamingQueryCheckpointMetadata(
+    sparkSession: SparkSession,
+    resolvedCheckpointRoot: String,
+    asyncWritesExecutorService: ThreadPoolExecutor,
+    asyncProgressTrackingCheckpointingIntervalMs: Long,
+    triggerClock: Clock)
+  extends StreamingQueryCheckpointMetadata(sparkSession, resolvedCheckpointRoot) {
+
+  override lazy val offsetLog = new AsyncOffsetSeqLog(
+    sparkSession,
+    checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS),
+    asyncWritesExecutorService,
+    asyncProgressTrackingCheckpointingIntervalMs,
+    clock = triggerClock
+  )
+
+  override lazy val commitLog = new AsyncCommitLog(
+    sparkSession,
+    checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS),
+    asyncWritesExecutorService
+  )
+
+}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 14 additions & 28 deletions
@@ -151,16 +151,20 @@ abstract class StreamExecution(
   */
  protected def sources: Seq[SparkDataStream]

-  /** Metadata associated with the whole query */
-  protected val streamMetadata: StreamMetadata = {
-    val metadataPath = new Path(checkpointFile("metadata"))
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
-      val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
-      StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
-      newMetadata
-    }
-  }
+  /** Isolated spark session to run the batches with. */
+  protected val sparkSessionForStream: SparkSession = sparkSession.cloneSession()
+
+  /**
+   * Manages the metadata from this checkpoint location.
+   */
+  protected val checkpointMetadata =
+    new StreamingQueryCheckpointMetadata(sparkSessionForStream, resolvedCheckpointRoot)
+
+  private val streamMetadata: StreamMetadata = checkpointMetadata.streamMetadata
+
+  lazy val offsetLog: OffsetSeqLog = checkpointMetadata.offsetLog
+
+  lazy val commitLog: CommitLog = checkpointMetadata.commitLog

  /**
   * A map of current watermarks, keyed by the position of the watermark operator in the
@@ -209,9 +213,6 @@ abstract class StreamExecution(
  lazy val streamMetrics = new MetricsReporter(
    this, s"spark.streaming.${Option(name).getOrElse(id)}")

-  /** Isolated spark session to run the batches with. */
-  protected val sparkSessionForStream = sparkSession.cloneSession()
-
  /**
   * The thread that runs the micro-batches of this stream. Note that this thread must be
   * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
@@ -227,21 +228,6 @@ abstract class StreamExecution(
    }
  }

-  /**
-   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
-   * that a given batch will always consist of the same data, we write to this log *before* any
-   * processing is done. Thus, the Nth record in this log indicated data that is currently being
-   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
-   */
-  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
-
-  /**
-   * A log that records the batch ids that have completed. This is used to check if a batch was
-   * fully processed, and its output was committed to the sink, hence no need to process it again.
-   * This is used (for instance) during restart, to help identify which batch to run next.
-   */
-  val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
-
  /** Whether all fields of the query have been initialized */
  private def isInitialized: Boolean = state.get != INITIALIZING

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala

Lines changed: 1 addition & 0 deletions
@@ -21,4 +21,5 @@ object StreamingCheckpointConstants {
   val DIR_NAME_COMMITS = "commits"
   val DIR_NAME_OFFSETS = "offsets"
   val DIR_NAME_STATE = "state"
+  val DIR_NAME_METADATA = "metadata"
 }
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryCheckpointMetadata.scala

Lines changed: 64 additions & 0 deletions

@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An interface for accessing the checkpoint metadata associated with a streaming query.
+ * @param sparkSession Spark session
+ * @param resolvedCheckpointRoot The resolved checkpoint root path
+ */
+class StreamingQueryCheckpointMetadata(sparkSession: SparkSession, resolvedCheckpointRoot: String) {
+
+  /**
+   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
+   * that a given batch will always consist of the same data, we write to this log *before* any
+   * processing is done. Thus, the Nth record in this log indicated data that is currently being
+   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
+   */
+  lazy val offsetLog =
+    new OffsetSeqLog(sparkSession, checkpointFile(StreamingCheckpointConstants.DIR_NAME_OFFSETS))
+
+  /**
+   * A log that records the batch ids that have completed. This is used to check if a batch was
+   * fully processed, and its output was committed to the sink, hence no need to process it again.
+   * This is used (for instance) during restart, to help identify which batch to run next.
+   */
+  lazy val commitLog =
+    new CommitLog(sparkSession, checkpointFile(StreamingCheckpointConstants.DIR_NAME_COMMITS))
+
+  /** Metadata associated with the whole query */
+  final lazy val streamMetadata: StreamMetadata = {
+    val metadataPath = new Path(checkpointFile(StreamingCheckpointConstants.DIR_NAME_METADATA))
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
+      val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
+      StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
+      newMetadata
+    }
+  }
+
+  /** Returns the path of a file with `name` in the checkpoint directory. */
+  final protected def checkpointFile(name: String): String =
+    new Path(new Path(resolvedCheckpointRoot), name).toString
+
+}
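As a further hedged sketch (not part of the diff), the same interface also exposes the query-level metadata; `streamMetadata` is created lazily, writing a new file with a random UUID under `<checkpoint>/metadata` if none exists yet:

```scala
// Hypothetical read of the persisted query id through the new interface;
// `spark` and `checkpointLocation` are the placeholders from the earlier sketch.
val queryId: String =
  new StreamingQueryCheckpointMetadata(spark, checkpointLocation).streamMetadata.id
```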

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala

Lines changed: 4 additions & 5 deletions
@@ -31,9 +31,9 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceErrors
-import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, CommitLog, MetadataVersionUtil, OffsetSeqLog, StateStoreWriter}
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, CommitLog, MetadataVersionUtil, StateStoreWriter, StreamingQueryCheckpointMetadata}
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
-import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS}
+import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_OFFSETS
 import org.apache.spark.sql.execution.streaming.state.OperatorStateMetadataUtils.{OperatorStateMetadataReader, OperatorStateMetadataWriter}

 /**
@@ -172,14 +172,13 @@ object OperatorStateMetadataUtils extends Logging {
   }

   def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
-    val offsetLog = new OffsetSeqLog(session,
-      new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+    val offsetLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).offsetLog
     offsetLog.getLatest().map(_._1).getOrElse(throw
       StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))
   }

   def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Option[Long] = {
-    val commitLog = new CommitLog(session, new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
+    val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog
     commitLog.getLatest().map(_._1)
   }
 }
