From 39c8173b8229658834995b35f5ca45def87032bd Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Wed, 12 Jul 2023 11:12:06 -0700 Subject: [PATCH 1/3] close producer during cleanup --- .../org/apache/spark/sql/pulsar/PulsarWriteTask.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala index 4f5ee2db..fc6eb69b 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala @@ -15,11 +15,8 @@ package org.apache.spark.sql.pulsar import java.{util => ju} import java.util.function.BiConsumer - import scala.collection.mutable - import org.apache.pulsar.client.api.{MessageId, Producer} - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} import org.apache.spark.sql.types._ @@ -209,6 +206,11 @@ private[pulsar] abstract class PulsarRowWriter( protected def producerClose(): Unit = { producerFlush() + if (singleProducer != null) { + singleProducer.close() + } else { + topic2Producer.foreach(_._2.close()) + } topic2Producer.clear() } } From a80001059b2926e8ad79054a689c64785c0e8e9f Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Wed, 12 Jul 2023 11:23:59 -0700 Subject: [PATCH 2/3] code style --- .../scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala index fc6eb69b..83173bc5 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala @@ -15,7 +15,9 @@ package org.apache.spark.sql.pulsar import java.{util => ju} import java.util.function.BiConsumer + import scala.collection.mutable + import org.apache.pulsar.client.api.{MessageId, Producer} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} From e88f7f680449df92f58f47749148886ce5681027 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Wed, 12 Jul 2023 11:29:38 -0700 Subject: [PATCH 3/3] code style --- src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala index 83173bc5..0df05296 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarWriteTask.scala @@ -19,6 +19,7 @@ import java.util.function.BiConsumer import scala.collection.mutable import org.apache.pulsar.client.api.{MessageId, Producer} + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} import org.apache.spark.sql.types._