From bcb7a1bb17856aa1d61f94352bfeed5e13fb369d Mon Sep 17 00:00:00 2001 From: Joe Ferris Date: Mon, 28 Jan 2019 16:38:29 -0500 Subject: [PATCH] Add Scaladoc --- build.sbt | 9 +++ src/main/scala/fable/Config.scala | 88 ++++++++++++++++++++-- src/main/scala/fable/Consumer.scala | 62 ++++++++++++++- src/main/scala/fable/ConsumerRecords.scala | 7 ++ src/main/scala/fable/Deserializer.scala | 10 +++ src/main/scala/fable/Group.scala | 3 - src/main/scala/fable/GroupId.scala | 15 ++++ src/main/scala/fable/Kafka.scala | 55 +++++++++++++- src/main/scala/fable/Partition.scala | 7 ++ src/main/scala/fable/Topic.scala | 11 ++- src/main/scala/fable/package.scala | 5 ++ src/test/scala/fable/KafkaSpec.scala | 2 +- src/test/scala/fable/TestConfig.scala | 2 - 13 files changed, 258 insertions(+), 18 deletions(-) delete mode 100644 src/main/scala/fable/Group.scala create mode 100644 src/main/scala/fable/GroupId.scala diff --git a/build.sbt b/build.sbt index f6693b8..68aec34 100644 --- a/build.sbt +++ b/build.sbt @@ -8,6 +8,9 @@ val log4CatsVersion = "[0.2.0,0.3.0)" val pureConfigVersion = "[0.10.0,0.11.0)" val scalaTestVersion = "[3.0.5, 3.1.0)" +def findJar(classPath: Seq[Attributed[File]], name: String): File = + classPath.find(_.data.toString.containsSlice(name)).get.data + lazy val fable = (project in file(".")) .settings( inThisBuild(List(scalaVersion := "2.12.8")), @@ -21,6 +24,12 @@ lazy val fable = (project in file(".")) "-Ywarn-unused-import" ), autoAPIMappings := true, + apiMappings ++= Map( + findJar((fullClasspath in Compile).value, "cats-effect") -> url( + "https://typelevel.org/cats-effect/api/cats/effect/"), + findJar((fullClasspath in Compile).value, "kafka-clients") -> url( + "https://kafka.apache.org/21/javadoc/") + ), developers := List( Developer( id = "jferris", diff --git a/src/main/scala/fable/Config.scala b/src/main/scala/fable/Config.scala index 78e2478..7fafedd 100644 --- a/src/main/scala/fable/Config.scala +++ b/src/main/scala/fable/Config.scala @@ -4,13 +4,56 
@@ import cats.implicits._ import java.net.URI import java.util.Properties import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig +import pureconfig.ConfigReader +import pureconfig.generic.semiauto.deriveReader import scala.concurrent.duration.FiniteDuration import scala.util.Try +/** + * Configuration objects for Fable. These objects can be constructed manually, + * but they're designed to be read from a + * [[https://github.com/lightbend/config/blob/master/HOCON.md HOCON]] + * configuration file. [[https://pureconfig.github.io/ PureConfig]] readers are + * provided to make this easy. + * + * @example {{{ + * + * // application.conf + * kafka { + * uris = "PLAINTEXT://localhost:9092" + * uris = \${?KAFKA_URL} + * + * test-consumer { + * auto-commit = false + * batch-size = 1024 + * client-id = "fable-test" + * group-id = "fable-test" + * max-poll-records = 1024 + * polling-timeout = 1 second + * session-timeout = 30 seconds + * } + * } + * + * // Main.scala + * val kafkaConfig: Config.Kafka = + * pureconfig.loadConfigOrThrow[Config.Kafka]("kafka") + * + * val consumerConfig: Config.Consumer = + * pureconfig.loadConfigOrThrow[Config.Consumer]("kafka.test-consumer") + * }}} + */ object Config { + /** + * General configuration options for [[Kafka]] instances. + * + * @constructor + * @param prefix optional prefix to apply to topic names and consumer group + * IDs. + * @param uris bootstrap URIs used to connect to a Kafka cluster.
+ */ case class Kafka(prefix: Option[String], uris: URIList) { - def properties: Properties = { + private[fable] def properties: Properties = { val result = new Properties() result.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, uris.bootstrapServers) @@ -22,13 +65,36 @@ object Config { f(s"${prefix.getOrElse("")}$name") } + object Kafka { + implicit val kafkaConfigReader: ConfigReader[Kafka] = deriveReader + } + + /** + * Configuration options for constructing a Kafka [[Consumer]]. + * + * @see + * [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html + * ConsumerConfig]] for details on consumer configuration. + * + * @constructor + * @param autoCommit whether to automatically commit the previous offset + * each time [[Consumer.poll]] is invoked. + * @param clientId identifier for tracking which client is making requests + * @param groupId identifier for the consumer group this consumer will join + * @param maxPollRecords the maximum number of records to return each time + * [[Consumer.poll]] is invoked. + * @param pollingTimeout how long to wait before giving up when + * [[Consumer.poll]] is invoked. + * @param sessionTimeout how long to wait before assuming a failure has + * occurred when using a consumer group + */ case class Consumer( autoCommit: Boolean, - groupId: Option[Group], + clientId: String, + groupId: Option[GroupId], maxPollRecords: Int, pollingTimeout: FiniteDuration, - sessionTimeout: FiniteDuration, - clientId: String + sessionTimeout: FiniteDuration ) { def properties[K: Deserializer, V: Deserializer]( kafka: Kafka): Properties = { @@ -53,6 +119,16 @@ object Config { } } + object Consumer { + implicit val consumerConfigReader: ConfigReader[Consumer] = deriveReader + } + + /** + * Bootstrap URIs used to connect to a Kafka cluster. + * + * The included PureConfig reader will parse a comma-separated list of URIs + * from a String and infer whether or not SSL is being used.
+ */ case class URIList(uris: List[URI]) { private final val KAFKA_SSL_SCHEME: String = "kafka+ssl" @@ -72,7 +148,7 @@ object Config { } object URIList { - implicit val uriListReader: pureconfig.ConfigReader[URIList] = + implicit val uriListReader: ConfigReader[URIList] = pureconfig.ConfigReader[String].emap { string => string .split(",") diff --git a/src/main/scala/fable/Consumer.scala b/src/main/scala/fable/Consumer.scala index 69f9037..2dd20e8 100644 --- a/src/main/scala/fable/Consumer.scala +++ b/src/main/scala/fable/Consumer.scala @@ -9,12 +9,39 @@ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.TopicPartition import scala.collection.JavaConverters._ -class Consumer[F[_]: ContextShift: Monad: Sync, K, V]( +/** + * Typesafe, functional API for using Kafka consumers. + * + * Wraps a native KafkaConsumer. + * + * Because KafkaConsumer isn't threadsafe, each consumer builds its own + * [[scala.concurrent.ExecutionContext]] with a dedicated, single-thread pool. + * Methods invoked on this class will perform their IO on that thread. + * + * @see [[Kafka]] for allocating consumers + * @see [[Config.Consumer]] for configuration options for consumers + * @see [[Deserializer]] for details on deserializing keys and values + * @see [[org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer]] for + * details about Kafka's consumers + */ +class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] ( config: Config.Consumer, kafkaConsumer: KafkaConsumer[K, V]) { + + /** + * Continuously [[poll]] Kafka for new records. + */ def records: Stream[F, ConsumerRecords[K, V]] = Stream.eval(poll).repeat + /** + * Fetch the next batch of records from Kafka. + * + * Polling behavior, including timeouts, batch sizes, and auto commit can be + * configured via [[Config.Consumer]].
+ * + * @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration- KafkaConsumer.poll]] + */ def poll: F[ConsumerRecords[K, V]] = for { records <- execute( @@ -25,20 +52,45 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V]( ConsumerRecords(records) } + /** + * Commit the offset for the subscribed partitions for this consumer group. + * + * @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-- KafkaConsumer.commitSync]] + */ def commit: F[Unit] = execute(kafkaConsumer.commitSync) *> - logger.info(s"committed offset") + logger.info(s"Committed offset") + /** + * Disconnect the network client. + * + * If a consumer is acquired by using [[Kafka#consumer]], the consumer is + * closed automatically once the resource is released. + * + * @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-- KafkaConsumer.close]] + */ def close: F[Unit] = execute(kafkaConsumer.close) *> logger.info("Disconnected") + /** + * Subscribe to one or more topics. This will use the consumer groups + * feature. Partitions are automatically assigned to consumers within a + * group. + * + * @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe-java.util.Collection- KafkaConsumer.subscribe]] + */ def subscribe(topics: Topic*): F[Unit] = execute(kafkaConsumer.subscribe(topics.map(_.name).asJava)) *> topics.toList .traverse(topic => logger.info(s"Subscribed to ${topic.name}")) .void + /** + * Fetch information about partitions for a specific topic.
+ * + * @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor-java.lang.String- KafkaConsumer.partitionsFor]] + */ def partitionsFor(topic: Topic): F[Seq[Partition]] = for { infos <- execute(kafkaConsumer.partitionsFor(topic.name)) @@ -46,6 +98,12 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V]( infos.asScala.map(info => Partition(Topic(info.topic), info.partition)) } + /** + * Explicitly assign partitions to this consumer. This doesn't use consumer + * groups. + * + * @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#assign-java.util.Collection- KafkaConsumer.assign]] + */ def assign(partitions: Seq[Partition]): F[Unit] = execute( kafkaConsumer.assign( diff --git a/src/main/scala/fable/ConsumerRecords.scala b/src/main/scala/fable/ConsumerRecords.scala index 8f459f9..5a28ee2 100644 --- a/src/main/scala/fable/ConsumerRecords.scala +++ b/src/main/scala/fable/ConsumerRecords.scala @@ -2,12 +2,19 @@ package fable import scala.collection.JavaConverters._ +/** + * Wrapper for [[org.apache.kafka.clients.consumer.ConsumerRecords]] using the + * Scala collections API. + */ case class ConsumerRecords[K, V]( records: org.apache.kafka.clients.consumer.ConsumerRecords[K, V]) extends AnyVal { def isEmpty: Boolean = records.isEmpty + def count: Int = + records.count + def toSeq: Seq[ConsumerRecord[K, V]] = records.iterator.asScala.toSeq } diff --git a/src/main/scala/fable/Deserializer.scala b/src/main/scala/fable/Deserializer.scala index 1161157..044e0a6 100644 --- a/src/main/scala/fable/Deserializer.scala +++ b/src/main/scala/fable/Deserializer.scala @@ -2,6 +2,16 @@ package fable import org.apache.kafka.common.serialization +/** + * Wrapper for [[org.apache.kafka.common.serialization.Deserializer]] using + * implicits to locate Deserializers. + * + * The official client expects a class name to be written into the Consumer's + * properties. 
Fable finds a Deserializer for key and value types using + * implicits and writes the class name into Consumer properties automatically. + * + * @see [[Deserializer$]] for included instances. + */ trait Deserializer[A] { def instance: serialization.Deserializer[A] } diff --git a/src/main/scala/fable/Group.scala b/src/main/scala/fable/Group.scala deleted file mode 100644 index 98b4a6c..0000000 --- a/src/main/scala/fable/Group.scala +++ /dev/null @@ -1,3 +0,0 @@ -package fable - -case class Group(name: String) extends AnyVal diff --git a/src/main/scala/fable/GroupId.scala b/src/main/scala/fable/GroupId.scala new file mode 100644 index 0000000..f61ff1f --- /dev/null +++ b/src/main/scala/fable/GroupId.scala @@ -0,0 +1,15 @@ +package fable + +import pureconfig.ConfigReader +import pureconfig.generic.semiauto.deriveReader + +/** + * Value class for consumer group IDs. + * + * @see [[Kafka.groupId]] + */ +case class GroupId private (name: String) extends AnyVal + +object GroupId { + implicit val groupIdConfigReader: ConfigReader[GroupId] = deriveReader +} diff --git a/src/main/scala/fable/Kafka.scala b/src/main/scala/fable/Kafka.scala index 03dfa35..a13728d 100644 --- a/src/main/scala/fable/Kafka.scala +++ b/src/main/scala/fable/Kafka.scala @@ -5,13 +5,62 @@ import cats.Monad import fs2.Sink import org.apache.kafka.clients.consumer.KafkaConsumer +/** + * Entry point for constructing Kafka consumers. 
+ * + * @example {{{ + * import cats.implicits._ + * import cats.effect._ + * import fable._ + * import pureconfig.generic.auto._ + * + * object Main extends IOApp { + * def run(args: List[String]): IO[ExitCode] = { + * val kafkaConfig: Config.Kafka = + * pureconfig.loadConfigOrThrow[Config.Kafka]("kafka") + * val consumerConfig: Config.Consumer = + * pureconfig.loadConfigOrThrow[Config.Consumer]("kafka.my-consumer") + * val kafka: Kafka[IO] = new Kafka[IO](kafkaConfig) + * kafka.consumer[String, String](consumerConfig).use { consumer => + * for { + * _ <- consumer.subscribe(kafka.topic("my-topic")) + * records <- consumer.poll + * _ <- IO.delay(println(s"Consumed \${records.count} records")) + * } yield ExitCode.Success + * } + * } + * } + * }}} + * + * @see [[Consumer]] for the consumer API + * @see [[Config.Kafka]] for settings for Kafka connections + * @see [[Config.Consumer]] for settings for consumers + */ class Kafka[F[_]: ContextShift: Monad: Sync](config: Config.Kafka) { + + /** + * Construct a [[Topic]] from the given name. Applies the prefix from + * configuration if one is present. + */ def topic(name: String): Topic = - config.prefix(name)(Topic) + config.prefix(name)(Topic.apply) - def group(name: String): Group = - config.prefix(name)(Group) + /** + * Construct a [[GroupId]] from the given name. Applies the prefix from + * configuration if one is present. + */ + def groupId(name: String): GroupId = + config.prefix(name)(GroupId.apply) + /** + * Allocate a [[Consumer]] as a [[cats.effect.Resource]]. The consumer will + * be closed when the resource is released.
+ * + * @tparam K the type to deserialize keys into + * @tparam V the type to deserialize values into + * @see [[Consumer]] for details on using consumers + * @see [[Deserializer]] for details on deserializing keys and values + */ def consumer[K: Deserializer, V: Deserializer]( consumerConfig: Config.Consumer): Resource[F, Consumer[F, K, V]] = Resource.make( diff --git a/src/main/scala/fable/Partition.scala b/src/main/scala/fable/Partition.scala index ade1dc6..9c06a87 100644 --- a/src/main/scala/fable/Partition.scala +++ b/src/main/scala/fable/Partition.scala @@ -1,3 +1,10 @@ package fable +/** + * A particular partition in a Kafka topic. Used for inspecting and assigning + * partitions. + * + * @see [[Consumer.assign]] + * @see [[Consumer.partitionsFor]] + */ case class Partition(topic: Topic, number: Int) diff --git a/src/main/scala/fable/Topic.scala b/src/main/scala/fable/Topic.scala index 045d75a..515d87d 100644 --- a/src/main/scala/fable/Topic.scala +++ b/src/main/scala/fable/Topic.scala @@ -1,3 +1,12 @@ package fable -case class Topic(name: String) extends AnyVal +/** + * Value class for topic names. + * + * @see [[Kafka.topic]] + */ +case class Topic private (name: String) extends AnyVal + +private[fable] object Topic { + private[fable] def apply(name: String): Topic = new Topic(name) +} diff --git a/src/main/scala/fable/package.scala b/src/main/scala/fable/package.scala index b66fd7a..8624178 100644 --- a/src/main/scala/fable/package.scala +++ b/src/main/scala/fable/package.scala @@ -1,3 +1,8 @@ +/** + * Functional API for Kafka using Cats, Cats Effect, and fs2. + * + * @see [[Kafka]] to get started. 
+ */ package object fable { type ConsumerRecord[K, V] = org.apache.kafka.clients.consumer.ConsumerRecord[K, V] diff --git a/src/test/scala/fable/KafkaSpec.scala b/src/test/scala/fable/KafkaSpec.scala index df621a4..82ecb25 100644 --- a/src/test/scala/fable/KafkaSpec.scala +++ b/src/test/scala/fable/KafkaSpec.scala @@ -9,7 +9,7 @@ class KafkaSpec extends AsyncFunSuite { implicit val contextShift = IO.contextShift(implicitly[ExecutionContext]) val config = TestConfig.kafka.copy(prefix = Some("example.")) val kafka = new Kafka[IO](config) - val group = kafka.group("test") + val group = kafka.groupId("test") assert(group.name === "example.test") } diff --git a/src/test/scala/fable/TestConfig.scala b/src/test/scala/fable/TestConfig.scala index 086f699..1a5f286 100644 --- a/src/test/scala/fable/TestConfig.scala +++ b/src/test/scala/fable/TestConfig.scala @@ -1,7 +1,5 @@ package fable -import pureconfig.generic.auto._ - object TestConfig { def kafka: Config.Kafka = pureconfig.loadConfigOrThrow[Config.Kafka]("kafka")