Skip to content

Commit

Permalink
Fix the issue of getting initial offset with user-provided start (#149)
Browse files Browse the repository at this point in the history
* fix initial offset

* add comment

* add comment

* remove unused import

* fix seek bug
  • Loading branch information
chaoqin-li1123 committed Aug 14, 2023
1 parent 0081e16 commit 1840fc1
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import scala.language.postfixOps
import scala.util.control.NonFatal

import org.apache.pulsar.client.api.{MessageId, PulsarClient}
import org.apache.pulsar.client.api.schema.GenericRecord
import org.apache.pulsar.client.impl.{ConsumerImpl, MessageIdImpl, PulsarClientImpl}
import org.apache.pulsar.client.impl.{MessageIdImpl, PulsarClientImpl}
import org.apache.pulsar.client.impl.schema.BytesSchema
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace
import org.apache.pulsar.common.naming.TopicName
Expand Down Expand Up @@ -442,22 +441,27 @@ private[pulsar] case class PulsarHelper(
topic: String,
time: Option[Long],
offset: Option[MessageId]): MessageIdImpl = {
val (subscriptionName, _) = extractSubscription(predefinedSubscription, topic)
val consumer =
CachedConsumer.getOrCreate(topic, subscriptionName, client.asInstanceOf[PulsarClient])

// seek to specified offset or time and get the actual corresponding message id
// we don't call consumer.acknowledge() here because the message is not processed yet
if (consumer.asInstanceOf[ConsumerImpl[GenericRecord]].hasMessageAvailable) {
if (time.isDefined || offset.isDefined) {
val reader = client
.newReader()
.subscriptionRolePrefix(driverGroupIdPrefix)
.topic(topic)
.startMessageId(offset.getOrElse(MessageId.earliest))
.startMessageIdInclusive()
.create()
(time, offset) match {
case (None, Some(o)) => consumer.seek(o)
case (Some(t), None) => consumer.seek(t)
case (None, Some(o)) =>
case (Some(t), None) => reader.seek(t)
case _ =>
throw new IllegalArgumentException(
s"one of time and offset should be set. time: $time, offset: $offset")
}

PulsarSourceUtils.mid2Impl(consumer.receive().getMessageId)
val message = reader.readNext()
reader.close()
if (message != null) UserProvidedMessageId(PulsarSourceUtils.mid2Impl(message.getMessageId))
else UserProvidedMessageId(MessageId.earliest)
} else {
UserProvidedMessageId(MessageId.earliest)
}
Expand Down

0 comments on commit 1840fc1

Please sign in to comment.