test: Switch to check posted event
Steve Vaughan Jr committed Mar 28, 2024
1 parent 23bb303 commit a07a4db
Showing 2 changed files with 29 additions and 45 deletions.
PartitionMetricsWriteInfo.java (org.apache.spark.sql.connector.write)
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connector.write;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
@@ -27,7 +28,7 @@
  * This is patterned after {@code org.apache.spark.util.AccumulatorV2}
  * </p>
  */
-public class PartitionMetricsWriteInfo {
+public class PartitionMetricsWriteInfo implements Serializable {
 
   private final Map<String, PartitionMetrics> metrics = new TreeMap<>();
 
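Review note: the Serializable marker is needed because PartitionMetricsWriteInfo instances are presumably carried by the SparkListenerSQLPartitionMetrics events posted on the listener bus, and anything posted there may be serialized, for example into the event log. A minimal round-trip sketch, assuming the class has a public no-arg constructor (the diff does not show its constructors):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    import org.apache.spark.sql.connector.write.PartitionMetricsWriteInfo

    // Before this change, writeObject would fail with NotSerializableException.
    val info = new PartitionMetricsWriteInfo()  // assumed no-arg constructor
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(info)
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    val copy = ois.readObject().asInstanceOf[PartitionMetricsWriteInfo]

Note that the nested PartitionMetrics values must themselves be serializable for a populated instance to round-trip.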
SQLMetricsTestUtils.scala
@@ -23,16 +23,14 @@ import scala.collection.mutable
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.TestUtils
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerTaskEnd}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanInfo}
-import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
-import org.apache.spark.sql.execution.datasources.V1WriteCommand
+import org.apache.spark.sql.connector.write.SparkListenerSQLPartitionMetrics
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanInfo}
 import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore}
 import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.util.QueryExecutionListener
 
 
 trait SQLMetricsTestUtils extends SQLTestUtils {
@@ -108,49 +106,37 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
     assert(totalNumBytes > 0)
   }
 
-  private class CaptureWriteCommand extends QueryExecutionListener {
+  private class CapturePartitionMetrics extends SparkListener {
 
-    val v1WriteCommands: mutable.Buffer[V1WriteCommand] = mutable.Buffer[V1WriteCommand]()
+    val events: mutable.Buffer[SparkListenerSQLPartitionMetrics] =
+      mutable.Buffer[SparkListenerSQLPartitionMetrics]()
 
-    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
-      if (qe.executedPlan.isInstanceOf[ExecutedCommandExec] ||
-        qe.executedPlan.isInstanceOf[DataWritingCommandExec]) {
-        qe.optimizedPlan match {
-          case _: V1WriteCommand =>
-            val executedPlanCmd = qe.executedPlan.asInstanceOf[DataWritingCommandExec].cmd
-            v1WriteCommands += executedPlanCmd.asInstanceOf[V1WriteCommand]
-
-          // All other commands
-          case _ =>
-            logDebug(f"Query execution data is not currently supported for query: " +
-              f"${qe.toString} with plan class ${qe.executedPlan.getClass.getName} " +
-              f" and executed plan : ${qe.executedPlan}")
-        }
+    override def onOtherEvent(event: SparkListenerEvent): Unit = {
+      event match {
+        case metrics: SparkListenerSQLPartitionMetrics => events += metrics
+        case _ =>
       }
     }
-
-    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
-
   }
 
-  protected def withQueryExecutionListener[L <: QueryExecutionListener]
+  protected def withSparkListener[L <: SparkListener]
      (spark: SparkSession, listener: L)
      (body: L => Unit): Unit = {
-    spark.listenerManager.register(listener)
+    spark.sparkContext.addSparkListener(listener)
    try {
      body(listener)
    }
    finally {
-      spark.listenerManager.unregister(listener)
+      spark.sparkContext.removeSparkListener(listener)
    }
  }
 
 
   protected def testMetricsNonDynamicPartition(
      dataFormat: String,
      tableName: String): Unit = {
-    val listener = new CaptureWriteCommand()
-    withQueryExecutionListener(spark, listener) { _ =>
+    val listener = new CapturePartitionMetrics()
+    withSparkListener(spark, listener) { _ =>
      withTable(tableName) {
        Seq((1, 2)).toDF("i", "j")
          .write.format(dataFormat).mode("overwrite").saveAsTable(tableName)
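Review note: Spark delivers built-in scheduler events through dedicated callbacks (onTaskEnd and friends); any other SparkListenerEvent, including the SQL partition-metrics event captured above, funnels through onOtherEvent, so the listener selects what it wants by pattern matching. The same shape works for any custom event; a self-contained sketch, with MyCustomEvent as an illustrative stand-in that is not part of Spark:

    import scala.collection.mutable

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

    // Illustrative custom event; real code would post it on the listener bus.
    case class MyCustomEvent(payload: String) extends SparkListenerEvent

    class CaptureCustomEvents extends SparkListener {
      val captured: mutable.Buffer[MyCustomEvent] = mutable.Buffer[MyCustomEvent]()

      // Everything without a dedicated callback arrives here.
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: MyCustomEvent => captured += e
        case _ =>
      }
    }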
@@ -167,19 +153,16 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
      }
    }
 
-    // Verify that there were 2 write commands for the entire write process. This test creates
-    // the table and performs a repartitioning.
-    assert(listener.v1WriteCommands.length == 2)
-    assert(listener.v1WriteCommands.forall(
-      v1WriteCommand => v1WriteCommand.partitionMetrics.isEmpty))
+    // Verify that there are no partition metrics for the entire write process.
+    assert(listener.events.isEmpty)
  }
 
   protected def testMetricsDynamicPartition(
      provider: String,
      dataFormat: String,
      tableName: String): Unit = {
-    val listener = new CaptureWriteCommand()
-    withQueryExecutionListener(spark, listener) { _ =>
+    val listener = new CapturePartitionMetrics()
+    withSparkListener(spark, listener) { _ =>
      withTable(tableName) {
        withTempPath { dir =>
          spark.sql(
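Review note: the swap above leans on the withSparkListener helper from the earlier hunk, a loan pattern that guarantees the listener is removed even if the body throws. A usage sketch; the table name is illustrative, and since listener-bus delivery is asynchronous, a real test should let the bus drain before asserting on captured events:

    val listener = new CapturePartitionMetrics()
    withSparkListener(spark, listener) { l =>
      // Any write that ends up posting SparkListenerSQLPartitionMetrics.
      // (toDF assumes the trait's implicits are in scope, as in these tests.)
      Seq((1, 2)).toDF("i", "j").write.saveAsTable("example_table")
    }
    // The listener is deregistered here, but its buffer remains readable,
    // e.g. listener.events.length for assertions like the ones below.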
@@ -208,18 +191,18 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
      }
    }
 
-    // Verify that there was a single write command for the entire write process
-    assert(listener.v1WriteCommands.length == 1)
-    val v1WriteCommand = listener.v1WriteCommands.head
+    // Verify that there is a single event for partition metrics for the entire write process.
+    // This test creates the table and performs a repartitioning, but only 1 action actually
+    // results in collecting partition metrics.
+    assert(listener.events.length == 1)
+    val event = listener.events.head
 
    // Verify the number of partitions
-    assert(v1WriteCommand.partitionMetrics.keySet.size == 40)
+    assert(event.metrics.keySet.size == 40)
    // Verify the number of files per partition
-    assert(v1WriteCommand.partitionMetrics.values.forall(
-      partitionStats => partitionStats.numFiles == 1))
+    event.metrics.values.forEach(partitionStats => assert(partitionStats.numFiles == 1))
    // Verify the number of rows per partition
-    assert(v1WriteCommand.partitionMetrics.values.forall(
-      partitionStats => partitionStats.numRows == 2))
+    event.metrics.values.forEach(partitionStats => assert(partitionStats.numRecords == 2))
  }
 
  /**
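Review note: the assertions above switch from Scala's forall to Java's forEach because event.metrics is evidently a java.util.Map (PartitionMetricsWriteInfo backs its metrics with a java.util.TreeMap), whose values collection has no Scala combinators. If one boolean assertion per property is preferred, an equivalent formulation through the converters (assuming Scala 2.13's scala.jdk.CollectionConverters) would be:

    import scala.jdk.CollectionConverters._

    // One assert per property instead of one per partition entry.
    assert(event.metrics.values.asScala.forall(_.numFiles == 1))
    assert(event.metrics.values.asScala.forall(_.numRecords == 2))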