Skip to content

Commit

Permalink
fix seek bug
Browse files Browse the repository at this point in the history
  • Loading branch information
chaoqin-li1123 committed Aug 8, 2023
1 parent 084c561 commit 63bcd10
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -441,25 +441,25 @@ private[pulsar] case class PulsarHelper(
topic: String,
time: Option[Long],
offset: Option[MessageId]): MessageIdImpl = {
val (subscriptionName, _) = extractSubscription(predefinedSubscription, topic)
val consumer =
CachedConsumer.getOrCreate(topic, subscriptionName, client.asInstanceOf[PulsarClient])

// seek to specified offset or time and get the actual corresponding message id
// we don't call consumer.acknowledge() here because the message is not processed yet
if (time.isDefined || offset.isDefined) {
// Call getLastMessageId to reconnect the consumer before seeking.
if (!consumer.isConnected) consumer.getLastMessageId
val reader = client
.newReader()
.subscriptionRolePrefix(driverGroupIdPrefix)
.topic(topic)
.startMessageId(offset.getOrElse(MessageId.earliest))
.startMessageIdInclusive()
.create()
(time, offset) match {
case (None, Some(o)) => consumer.seek(o)
case (Some(t), None) => consumer.seek(t)
case (None, Some(o)) =>
case (Some(t), None) => reader.seek(t)
case _ =>
throw new IllegalArgumentException(
s"one of time and offset should be set. time: $time, offset: $offset")
}
val message = consumer.receive()
// This must be wrapped in UserProvidedMessageId to prevent first message from being skipped
// because start offset is exclusive by default.
val message = reader.readNext()
reader.close()
if (message != null) UserProvidedMessageId(PulsarSourceUtils.mid2Impl(message.getMessageId))
else UserProvidedMessageId(MessageId.earliest)
} else {
Expand Down

0 comments on commit 63bcd10

Please sign in to comment.