add source metrics
chaoqin-li1123 committed Oct 30, 2023
1 parent ed90ada commit 29e1e9e
Showing 3 changed files with 57 additions and 14 deletions.
26 changes: 21 additions & 5 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
@@ -15,8 +15,9 @@ package org.apache.spark.sql.pulsar


import java.{util => ju}
import java.util.Optional

import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.pulsar.client.admin.PulsarAdmin
@@ -30,11 +31,12 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl}
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, ReportsSourceMetrics, SupportsAdmissionControl}
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset, Source}
import org.apache.spark.sql.pulsar.PulsarOptions.ServiceUrlOptionKey
import org.apache.spark.sql.pulsar.SpecificPulsarOffset.getTopicOffsets
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.LongAccumulator


private[pulsar] class PulsarSource(
@@ -51,7 +53,8 @@ private[pulsar] class PulsarSource(
jsonOptions: JSONOptionsInRead)
extends Source
with Logging
with SupportsAdmissionControl {
with SupportsAdmissionControl
with ReportsSourceMetrics {

import PulsarSourceUtils._

@@ -67,6 +70,11 @@ private[pulsar] class PulsarSource(

private var currentTopicOffsets: Option[Map[String, MessageId]] = None

private val rowsBytesAccumulator: LongAccumulator = {
val accumulator = new LongAccumulator
sqlContext.sparkContext.register(accumulator, "pulsarLatestMicroBatchRowsBytesCounter")
accumulator
}

private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema

@@ -179,7 +187,8 @@ private[pulsar] class PulsarSource(
pollTimeoutMs,
failOnDataLoss,
subscriptionNamePrefix,
jsonOptions)
jsonOptions,
rowsBytesAccumulator)

logInfo(
"GetBatch generating RDD of offset range: " +
@@ -193,13 +202,20 @@ private[pulsar] class PulsarSource(
pulsarHelper.commitCursorToOffset(off)
}

override def metrics(optional: Optional[streaming.Offset]): ju.Map[String, String] = {
// This is called during query progress reporting after a batch finishes.
val currBatchMetrics = Seq("numInputRows" -> rowsBytesAccumulator.count.toString,
"numInputBytes" -> rowsBytesAccumulator.value.toString).toMap.asJava
rowsBytesAccumulator.reset()
currBatchMetrics
}

override def stop(): Unit = synchronized {
if (!stopped) {
pulsarHelper.removeCursor()
pulsarHelper.close()
stopped = true
}

}
}

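For context, a minimal usage sketch (not part of this commit): with ReportsSourceMetrics implemented, the map returned by PulsarSource.metrics() surfaces as the per-source metrics of each StreamingQueryProgress. The service URL, topic, and console sink below are assumptions, a Spark version that supports ReportsSourceMetrics is required, and depending on the connector version an admin.url option may also be needed.

import org.apache.spark.sql.SparkSession

object SourceMetricsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("metrics-sketch").getOrCreate()

    // Hypothetical broker and topic; any Pulsar stream readable by this connector works.
    val stream = spark.readStream
      .format("pulsar")
      .option("service.url", "pulsar://localhost:6650")
      .option("topic", "persistent://public/default/demo")
      .load()

    val query = stream.writeStream.format("console").start()
    query.processAllAvailable()

    // After a batch completes, the source metrics reported by metrics() appear here.
    Option(query.lastProgress).foreach { p =>
      val m = p.sources.head.metrics // java.util.Map[String, String]
      println(s"numInputRows=${m.get("numInputRows")}, numInputBytes=${m.get("numInputBytes")}")
    }

    query.stop()
    spark.stop()
  }
}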
11 changes: 7 additions & 4 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
import org.apache.spark.sql.pulsar.PulsarSourceUtils._
import org.apache.spark.util.{NextIterator, Utils}
import org.apache.spark.util.{LongAccumulator, NextIterator, Utils}

private[pulsar] case class PulsarSourceRDDPartition(index: Int, offsetRange: PulsarOffsetRange)
extends Partition
@@ -53,7 +53,8 @@ private[pulsar] abstract class PulsarSourceRDDBase(
topic: String,
startOffset: MessageId,
endOffset: MessageId,
context: TaskContext): Iterator[InternalRow] = {
context: TaskContext,
rowsBytesAccumulator: Option[LongAccumulator] = None): Iterator[InternalRow] = {

val deserializer = new PulsarDeserializer(schemaInfo.si, jsonOptions)
val schema: Schema[_] = SchemaUtils.getPSchema(schemaInfo.si)
@@ -136,6 +137,7 @@ private[pulsar] abstract class PulsarSourceRDDBase(
return null
}

rowsBytesAccumulator.foreach(_.add(currentMessage.size()))
currentId = currentMessage.getMessageId

finished = false
@@ -170,7 +172,8 @@ private[pulsar] class PulsarSourceRDD(
pollTimeoutMs: Int,
failOnDataLoss: Boolean,
subscriptionNamePrefix: String,
jsonOptions: JSONOptionsInRead)
jsonOptions: JSONOptionsInRead,
rowsBytesAccumulator: LongAccumulator)
extends PulsarSourceRDDBase(
sc,
schemaInfo,
@@ -198,7 +201,7 @@ private[pulsar] class PulsarSourceRDD(
return Iterator.empty
}

computeInner(tp, start, end, context)
computeInner(tp, start, end, context, Some(rowsBytesAccumulator))
}
}

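Why a single accumulator can back both numbers (a standalone sketch, not part of the commit; the local master and toy payloads are assumptions): LongAccumulator.add increments the element count by one and the running sum by its argument, so count maps to numInputRows while value maps to numInputBytes.

import org.apache.spark.sql.SparkSession

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("accumulator-sketch").getOrCreate()
    val acc = spark.sparkContext.longAccumulator("rowsBytes")

    // Treat each string as a message payload and record its size in bytes,
    // mirroring what the RDD iterator does with currentMessage.size().
    spark.sparkContext.parallelize(Seq("a", "bb", "ccc")).foreach { payload =>
      acc.add(payload.getBytes("UTF-8").length)
    }

    println(s"numInputRows  = ${acc.count}") // 3 messages
    println(s"numInputBytes = ${acc.value}") // 6 bytes
    spark.stop()
  }
}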
34 changes: 29 additions & 5 deletions (third changed file: the Pulsar micro-batch source test suite)
@@ -15,15 +15,19 @@ package org.apache.spark.sql.pulsar

import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.spark.SparkException
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingExecutionRelation}
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.pulsar.PulsarOptions.{ServiceUrlOptionKey, TopicPattern}
import org.apache.spark.sql.streaming.StreamingQueryProgress
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import org.apache.spark.util.Utils


class PulsarMicroBatchV1SourceSuite extends PulsarMicroBatchSourceSuiteBase {
test("V1 Source is used by default") {
val topic = newTopic()
@@ -63,8 +67,8 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {

test("input row metrics") {
val topic = newTopic()
createTopic(topic, 12)
sendMessages(topic, Array("-1"))
require(getLatestOffsets(Set(topic)).size === 1)

val pulsar = spark.readStream
.format("pulsar")
@@ -74,16 +78,36 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

def checkSourceMetrics(
progresses: Array[StreamingQueryProgress],
numInputRows: Long): Boolean = {
val sourceMetrics = progresses.map(_.sources.head.metrics)
sourceMetrics.map(_.get("numInputRows").toLong).sum == numInputRows &&
sourceMetrics.map(_.get("numInputBytes").toLong).sum >= numInputRows &&
progresses.map(_.numInputRows).sum == numInputRows
}

val mapped = pulsar.map(kv => kv._2.toInt + 1)
testStream(mapped)(
StartStream(trigger = ProcessingTime(1)),
makeSureGetOffsetCalled,
AddPulsarData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
AssertOnQuery { query =>
val recordsRead = query.recentProgress.map(_.numInputRows).sum
recordsRead == 3
}
checkSourceMetrics(query.recentProgress, 3)
},
AddPulsarData(Set(topic), 4, 5),
CheckAnswer(2, 3, 4, 5, 6),
AssertOnQuery { query =>
checkSourceMetrics(query.recentProgress, 5)
},
StopStream,
StartStream(trigger = ProcessingTime(1)),
AddPulsarData(Set(topic), 6),
CheckAnswer(2, 3, 4, 5, 6, 7),
AssertOnQuery { query =>
checkSourceMetrics(query.recentProgress, 1)
},
)
}

