Skip to content

Commit

Permalink
fix seek bug
Browse files Browse the repository at this point in the history
  • Loading branch information
chaoqin-li1123 committed Aug 8, 2023
1 parent 154afba commit 3348d5d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
19 changes: 11 additions & 8 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -442,22 +442,25 @@ private[pulsar] case class PulsarHelper(
topic: String,
time: Option[Long],
offset: Option[MessageId]): MessageIdImpl = {
val (subscriptionName, _) = extractSubscription(predefinedSubscription, topic)
val consumer =
CachedConsumer.getOrCreate(topic, subscriptionName, client.asInstanceOf[PulsarClient])

// seek to specified offset or time and get the actual corresponding message id
// we don't call consumer.acknowledge() here because the message is not processed yet
if (time.isDefined || offset.isDefined) {
if (!consumer.isConnected) consumer.getLastMessageId
val reader = client
.newReader()
.subscriptionRolePrefix(driverGroupIdPrefix)
.topic(topic)
.startMessageId(offset.getOrElse(MessageId.earliest))
.startMessageIdInclusive()
.create()
(time, offset) match {
case (None, Some(o)) => consumer.seek(o)
case (Some(t), None) => consumer.seek(t)
case (None, Some(o)) =>
case (Some(t), None) => reader.seek(t)
case _ =>
throw new IllegalArgumentException(
s"one of time and offset should be set. time: $time, offset: $offset")
}
val message = consumer.receive()
val message = reader.readNext()
reader.close()
if (message != null) UserProvidedMessageId(PulsarSourceUtils.mid2Impl(message.getMessageId))
else UserProvidedMessageId(MessageId.earliest)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ abstract class PulsarSourceSuiteBase extends PulsarSourceTest {
}

test(s"assign from time (failOnDataLoss: $failOnDataLoss)") {
sparkContext.setLogLevel("INFO")
val topic = newTopic()
testFromTime(
topic,
Expand Down Expand Up @@ -450,8 +451,8 @@ abstract class PulsarSourceSuiteBase extends PulsarSourceTest {
failOnDataLoss: Boolean,
options: (String, String)*): Unit = {

val time0 = System.currentTimeMillis()
Thread.sleep(5000)
val time0 = System.currentTimeMillis() - 10000

sendMessages(topic, (1 to 3).map { _.toString }.toArray)
require(getLatestOffsets(Set(topic)).size === 1)

Expand Down

0 comments on commit 3348d5d

Please sign in to comment.