From 29e1e9e7c111276e7edcf583e51d3023b4fe6602 Mon Sep 17 00:00:00 2001
From: Chaoqin Li
Date: Mon, 30 Oct 2023 10:08:42 -0700
Subject: [PATCH] add source metrics

---
 .../spark/sql/pulsar/PulsarSource.scala            | 26 +++++++++++---
 .../spark/sql/pulsar/PulsarSourceRDD.scala         | 11 +++---
 .../pulsar/PulsarMicroBatchSourceSuite.scala       | 34 ++++++++++++++++---
 3 files changed, 57 insertions(+), 14 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
index 8405e65..5379bea 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala
@@ -15,8 +15,9 @@ package org.apache.spark.sql.pulsar
 
 import java.{util => ju}
+import java.util.Optional
 
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.pulsar.client.admin.PulsarAdmin
@@ -30,11 +31,12 @@
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
 import org.apache.spark.sql.connector.read.streaming
-import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl}
+import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, ReportsSourceMetrics, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset, Source}
 import org.apache.spark.sql.pulsar.PulsarOptions.ServiceUrlOptionKey
 import org.apache.spark.sql.pulsar.SpecificPulsarOffset.getTopicOffsets
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.LongAccumulator
 
 private[pulsar] class PulsarSource(
@@ -51,7 +53,8 @@ private[pulsar] class PulsarSource(
     jsonOptions: JSONOptionsInRead)
     extends Source
     with Logging
-    with SupportsAdmissionControl {
+    with SupportsAdmissionControl
+    with ReportsSourceMetrics {
 
   import PulsarSourceUtils._
 
@@ -67,6 +70,11 @@ private[pulsar] class PulsarSource(
 
   private var currentTopicOffsets: Option[Map[String, MessageId]] = None
 
+  private val rowsBytesAccumulator: LongAccumulator = {
+    val accumulator = new LongAccumulator
+    sqlContext.sparkContext.register(accumulator, "pulsarLatestMicroBatchRowsBytesCounter")
+    accumulator
+  }
 
   private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema
 
@@ -179,7 +187,8 @@ private[pulsar] class PulsarSource(
       pollTimeoutMs,
       failOnDataLoss,
       subscriptionNamePrefix,
-      jsonOptions)
+      jsonOptions,
+      rowsBytesAccumulator)
 
     logInfo(
       "GetBatch generating RDD of offset range: " +
@@ -193,13 +202,20 @@
     pulsarHelper.commitCursorToOffset(off)
   }
 
+  override def metrics(optional: Optional[streaming.Offset]): ju.Map[String, String] = {
+    // This is called during query progress reporting after a batch finishes.
+    val currBatchMetrics = Seq("numInputRows" -> rowsBytesAccumulator.count.toString,
+      "numInputBytes" -> rowsBytesAccumulator.value.toString).toMap.asJava
+    rowsBytesAccumulator.reset()
+    currBatchMetrics
+  }
+
   override def stop(): Unit = synchronized {
     if (!stopped) {
       pulsarHelper.removeCursor()
       pulsarHelper.close()
       stopped = true
     }
-
   }
 }

diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala
index 0e28754..36ced90 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala
@@ -24,7 +24,7 @@
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
 import org.apache.spark.sql.pulsar.PulsarSourceUtils._
-import org.apache.spark.util.{NextIterator, Utils}
+import org.apache.spark.util.{LongAccumulator, NextIterator, Utils}
 
 private[pulsar] case class PulsarSourceRDDPartition(index: Int, offsetRange: PulsarOffsetRange)
     extends Partition
@@ -53,7 +53,8 @@ private[pulsar] abstract class PulsarSourceRDDBase(
       topic: String,
       startOffset: MessageId,
       endOffset: MessageId,
-      context: TaskContext): Iterator[InternalRow] = {
+      context: TaskContext,
+      rowsBytesAccumulator: Option[LongAccumulator] = None): Iterator[InternalRow] = {
 
     val deserializer = new PulsarDeserializer(schemaInfo.si, jsonOptions)
     val schema: Schema[_] = SchemaUtils.getPSchema(schemaInfo.si)
@@ -136,6 +137,7 @@ private[pulsar] abstract class PulsarSourceRDDBase(
             return null
           }
 
+          rowsBytesAccumulator.foreach(_.add(currentMessage.size()))
           currentId = currentMessage.getMessageId
           finished = false
@@ -170,7 +172,8 @@ private[pulsar] class PulsarSourceRDD(
     pollTimeoutMs: Int,
     failOnDataLoss: Boolean,
     subscriptionNamePrefix: String,
-    jsonOptions: JSONOptionsInRead)
+    jsonOptions: JSONOptionsInRead,
+    rowsBytesAccumulator: LongAccumulator)
     extends PulsarSourceRDDBase(
       sc,
       schemaInfo,
@@ -198,7 +201,7 @@ private[pulsar] class PulsarSourceRDD(
       return Iterator.empty
     }
 
-    computeInner(tp, start, end, context)
+    computeInner(tp, start, end, context, Some(rowsBytesAccumulator))
   }
 }

diff --git a/src/test/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchSourceSuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchSourceSuite.scala
index 7b0b51a..9e2a7dd 100644
--- a/src/test/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchSourceSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchSourceSuite.scala
@@ -15,15 +15,19 @@ package org.apache.spark.sql.pulsar
 
 import java.util.concurrent.ConcurrentLinkedQueue
 
+import scala.collection.JavaConverters._
+
 import org.apache.pulsar.client.admin.PulsarAdmin
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
-import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
+import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingExecutionRelation}
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.pulsar.PulsarOptions.{ServiceUrlOptionKey, TopicPattern}
+import org.apache.spark.sql.streaming.StreamingQueryProgress
 import org.apache.spark.sql.streaming.Trigger.ProcessingTime
 import org.apache.spark.util.Utils
 
+
 class PulsarMicroBatchV1SourceSuite extends PulsarMicroBatchSourceSuiteBase {
   test("V1 Source is used by default") {
     val topic = newTopic()
@@ -63,8 +67,8 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
   test("input row metrics") {
     val topic = newTopic()
+    createTopic(topic, 12)
     sendMessages(topic, Array("-1"))
-    require(getLatestOffsets(Set(topic)).size === 1)
 
     val pulsar = spark.readStream
       .format("pulsar")
@@ -74,6 +78,15 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
       .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
       .as[(String, String)]
 
+    def checkSourceMetrics(
+        progresses: Array[StreamingQueryProgress],
+        numInputRows: Long): Boolean = {
+      val sourceMetrics = progresses.map(_.sources.head.metrics)
+      sourceMetrics.map(_.get("numInputRows").toLong).sum == numInputRows &&
+      sourceMetrics.map(_.get("numInputBytes").toLong).sum >= numInputRows &&
+      progresses.map(_.numInputRows).sum == numInputRows
+    }
+
     val mapped = pulsar.map(kv => kv._2.toInt + 1)
     testStream(mapped)(
       StartStream(trigger = ProcessingTime(1)),
@@ -81,9 +94,20 @@ abstract class PulsarMicroBatchSourceSuiteBase extends PulsarSourceSuiteBase {
       AddPulsarData(Set(topic), 1, 2, 3),
       CheckAnswer(2, 3, 4),
       AssertOnQuery { query =>
-        val recordsRead = query.recentProgress.map(_.numInputRows).sum
-        recordsRead == 3
-      }
+        checkSourceMetrics(query.recentProgress, 3)
+      },
+      AddPulsarData(Set(topic), 4, 5),
+      CheckAnswer(2, 3, 4, 5, 6),
+      AssertOnQuery { query =>
+        checkSourceMetrics(query.recentProgress, 5)
+      },
+      StopStream,
+      StartStream(trigger = ProcessingTime(1)),
+      AddPulsarData(Set(topic), 6),
+      CheckAnswer(2, 3, 4, 5, 6, 7),
+      AssertOnQuery { query =>
+        checkSourceMetrics(query.recentProgress, 1)
+      },
     )
   }
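
Note (not part of the patch): once the source mixes in ReportsSourceMetrics as above, the per-batch values are exposed through each source's metrics map in StreamingQueryProgress. A minimal monitoring sketch, assuming a running StreamingQuery named query that reads from this Pulsar source; the metric keys are the ones emitted by PulsarSource.metrics, everything else is illustrative:

    // query is an assumed org.apache.spark.sql.streaming.StreamingQuery;
    // lastProgress can be null before the first micro-batch completes.
    val progress = query.lastProgress
    if (progress != null) {
      // SourceProgress.metrics is a java.util.Map[String, String]
      val sourceMetrics = progress.sources.head.metrics
      val rows = sourceMetrics.get("numInputRows")
      val bytes = sourceMetrics.get("numInputBytes")
      println(s"Pulsar source: $rows rows, $bytes bytes in the last micro-batch")
    }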