Manual commits support #64

Merged (12 commits) on Feb 19, 2019
5 changes: 4 additions & 1 deletion AUTHORS
@@ -14,4 +14,7 @@ Kacper Gunia
https://github.com/cakper

Sherwin Chiu
https://github.com/sherwinschiu
https://github.com/sherwinschiu

Mikhail Chugunkov
https://github.com/poslegm
29 changes: 27 additions & 2 deletions README.md
@@ -140,12 +140,15 @@ Then start Kafka:
bin/kafka-server-start.sh config/server.properties
```

Create the topic we need for our tests:
Create the topics we need for our tests:

```bash
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic monix-kafka-tests
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic monix-kafka-manual-commit-tests
```

And run the tests:
@@ -215,18 +218,40 @@ observable

### Consumer

For consuming from Apache Kafka (Version 0.11.x and above):
There are several ways to consume from Apache Kafka (Version 0.11.x and above):

Consumer which commits offsets itself:
```scala
import monix.kafka._

val consumerCfg = KafkaConsumerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092"),
groupId = "kafka-tests"
  // you can use this setting for At-Most-Once semantics:
// observableCommitOrder = ObservableCommitOrder.BeforeAck
)

val observable =
KafkaConsumerObservable[String,String](consumerCfg, List("my-topic"))
.take(10000)
.map(_.value())
```

Consumer which allows you to commit offsets manually:
```scala
import monix.kafka._
import scala.concurrent.duration._

val consumerCfg = KafkaConsumerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092"),
groupId = "kafka-tests"
)

val observable =
KafkaConsumerObservable.manualCommit[String,String](consumerCfg, List("my-topic"))
.map(message => message.record.value() -> message.committableOffset)
.mapTask { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
.bufferTimedAndCounted(1.second, 1000)
.mapTask(offsets => CommittableOffsetBatch(offsets).commitSync())
```
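Offsets can also be aggregated incrementally instead of going through `CommittableOffsetBatch(offsets)`. A minimal sketch, folding a buffered sequence of offsets into a single batch before committing (the `commitAll` helper name is made up here):

```scala
import monix.eval.Task
import monix.kafka._

// Fold the buffered offsets into one batch; for each topic/partition
// only the last offset in the sequence is kept, then commit once.
def commitAll(offsets: Seq[CommittableOffset]): Task[Unit] =
  offsets
    .foldLeft(CommittableOffsetBatch.empty)(_ updated _)
    .commitSync()
```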

Enjoy!
41 changes: 41 additions & 0 deletions kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2014-2019 by The Monix Project Developers.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.kafka

import monix.eval.Task
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.OffsetCommitCallback

/**
* Callback for batched commits, realized as a closure
* in the [[KafkaConsumerObservable]] context. This decision was made for
* thread-safety reasons.
* */
trait Commit {
def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit]
def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit]
final def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = commitBatchAsync(batch, null)
}

private[kafka] object Commit {

val empty: Commit = new Commit {
override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task.unit
}
}
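Because `Commit` is a plain trait, an in-memory implementation can stand in for a broker in tests. A hedged sketch (the `RecordingCommit` name and its use are illustrative, not part of this PR):

```scala
import monix.eval.Task
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition

// Hypothetical test helper: records committed batches instead of
// talking to a real Kafka broker.
final class RecordingCommit extends Commit {
  @volatile private var state = Map.empty[TopicPartition, Long]

  def committed: Map[TopicPartition, Long] = state

  override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] =
    Task { state = state ++ batch }

  // The callback is ignored in this stub; it simply delegates to the sync path.
  override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
    commitBatchSync(batch)
}
```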
24 changes: 24 additions & 0 deletions kafka-0.10.x/src/main/scala/monix/kafka/CommittableMessage.scala
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2014-2019 by The Monix Project Developers.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord

/**
* Represents data consumed from Kafka and the [[CommittableOffset]] built from it
* */
final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset)
65 changes: 65 additions & 0 deletions kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2014-2019 by The Monix Project Developers.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.kafka

import monix.eval.Task
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition

/** Represents an offset for a specified topic and partition that can be
* committed synchronously via the [[commitSync]] method or asynchronously
* via one of the commitAsync methods. For good performance it is
* recommended to use batched commits with the [[CommittableOffsetBatch]] class.
*
* @param topicPartition is the topic and partition identifier
*
* @param offset is the offset to be committed
*
* @param commitCallback is the callback for batched commits, realized as a closure
* in the [[KafkaConsumerObservable]] context. This decision was made for
* thread-safety reasons.
*/
final class CommittableOffset private[kafka] (
val topicPartition: TopicPartition,
val offset: Long,
private[kafka] val commitCallback: Commit) {

/**
* Synchronously commits the [[offset]] for the [[topicPartition]] to Kafka. It is
* recommended to use batched commits with the [[CommittableOffsetBatch]] class.
* */
def commitSync(): Task[Unit] = commitCallback.commitBatchSync(Map(topicPartition -> offset))

/**
* Asynchronously commits the [[offset]] to Kafka. It is recommended
* to use batched commits with the [[CommittableOffsetBatch]] class.
* */
def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset))

/**
* Asynchronously commits the [[offset]] to Kafka. It is recommended
* to use batched commits with the [[CommittableOffsetBatch]] class.
* */
def commitAsync(callback: OffsetCommitCallback): Task[Unit] =
commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback)
}

object CommittableOffset {

private[kafka] def apply(topicPartition: TopicPartition, offset: Long, commitCallback: Commit): CommittableOffset =
new CommittableOffset(topicPartition, offset, commitCallback)
}
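A usage sketch for per-record commits (the `processRecord` function is hypothetical, and the consumer config mirrors the README example); batching via `CommittableOffsetBatch` remains the recommended approach:

```scala
import monix.eval.Task
import monix.kafka._
import org.apache.kafka.clients.consumer.ConsumerRecord

val consumerCfg = KafkaConsumerConfig.default.copy(
  bootstrapServers = List("127.0.0.1:9092"),
  groupId = "kafka-tests"
)

// Hypothetical processing step, standing in for real business logic
def processRecord(record: ConsumerRecord[String, String]): Task[Unit] = Task.unit

// Commit each record's offset right after it is processed.
// Simple, but issues one commit request per record.
val stream =
  KafkaConsumerObservable.manualCommit[String, String](consumerCfg, List("my-topic"))
    .mapTask { message =>
      processRecord(message.record).flatMap(_ => message.committableOffset.commitAsync())
    }
```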
kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2014-2019 by The Monix Project Developers.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.kafka

import monix.eval.Task
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition

/** Batch of Kafka offsets that can be committed together.
* Can be built from a sequence of offsets with the [[CommittableOffsetBatch#apply]] method.
* Alternatively, you can use [[CommittableOffsetBatch#empty]] to create an empty batch
* and add offsets to it via the [[updated]] method.
*
* Note that the order of offsets matters: only the last offset added for a given
* topic and partition will be committed to Kafka.
*
* @param offsets is the batch of offsets for several topics and partitions.
* Make sure that each of them was received from the same [[KafkaConsumerObservable]].
*
* @param commitCallback is the callback for batched commits, realized as a closure
* in the [[KafkaConsumerObservable]] context. This decision was made for
* thread-safety reasons. This parameter is obtained from the last
* [[CommittableOffset]] added to the batch.
*/
final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartition, Long], commitCallback: Commit) {

/**
* Synchronously commits [[offsets]] to Kafka
* */
def commitSync(): Task[Unit] = commitCallback.commitBatchSync(offsets)

/**
* Asynchronously commits [[offsets]] to Kafka
* */
def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets)

/**
* Asynchronously commits [[offsets]] to Kafka
* */
def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(offsets, callback)

/**
* Adds a new [[CommittableOffset]] to the batch. The added offset replaces
* any previous one for the same topic and partition.
* */
def updated(committableOffset: CommittableOffset): CommittableOffsetBatch =
new CommittableOffsetBatch(
offsets.updated(committableOffset.topicPartition, committableOffset.offset),
committableOffset.commitCallback
)
}

object CommittableOffsetBatch {

/**
* Creates an empty [[CommittableOffsetBatch]]. Can be used as the neutral element in a fold:
* {{{
* offsets.foldLeft(CommittableOffsetBatch.empty)(_ updated _)
* }}}
* */
val empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, Commit.empty)

/**
* Builds a [[CommittableOffsetBatch]] from a sequence of offsets. Be careful with
* the sequence order: if there is more than one offset for a given topic and
* partition, only the last one will remain.
* */
def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch =
if (offsets.nonEmpty) {
val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) =>
acc.updated(o.topicPartition, o.offset)
}
new CommittableOffsetBatch(aggregatedOffsets, offsets.head.commitCallback)
} else {
empty
}
}
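The asynchronous variant accepts Kafka's `OffsetCommitCallback`. A sketch of committing a batch with failure logging (the logging body and helper name are illustrative):

```scala
import java.util.{Map => JMap}
import monix.eval.Task
import monix.kafka._
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Log failures reported by the broker; a successful commit
// invokes onComplete with a null exception.
val loggingCallback = new OffsetCommitCallback {
  override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
    if (exception != null) println(s"Commit failed: ${exception.getMessage}")
}

def commitBatchAsync(offsets: Seq[CommittableOffset]): Task[Unit] =
  CommittableOffsetBatch(offsets).commitAsync(loggingCallback)
```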