Skip to content

Commit

Permalink
Add Scaladoc
Browse files Browse the repository at this point in the history
  • Loading branch information
jferris committed Jan 28, 2019
1 parent c64f13d commit bcb7a1b
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 18 deletions.
9 changes: 9 additions & 0 deletions build.sbt
Expand Up @@ -8,6 +8,9 @@ val log4CatsVersion = "[0.2.0,0.3.0)"
val pureConfigVersion = "[0.10.0,0.11.0)"
val scalaTestVersion = "[3.0.5, 3.1.0)"

def findJar(classPath: Seq[Attributed[File]], name: String): File =
classPath.find(_.data.toString.containsSlice(name)).get.data

lazy val fable = (project in file("."))
.settings(
inThisBuild(List(scalaVersion := "2.12.8")),
Expand All @@ -21,6 +24,12 @@ lazy val fable = (project in file("."))
"-Ywarn-unused-import"
),
autoAPIMappings := true,
apiMappings ++= Map(
findJar((fullClasspath in Compile).value, "cats-effect") -> url(
"https://typelevel.org/cats-effect/api/cats/effect/"),
findJar((fullClasspath in Compile).value, "kafka-clients") -> url(
"https://kafka.apache.org/21/javadoc/")
),
developers := List(
Developer(
id = "jferris",
Expand Down
88 changes: 82 additions & 6 deletions src/main/scala/fable/Config.scala
Expand Up @@ -4,13 +4,56 @@ import cats.implicits._
import java.net.URI
import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig
import pureconfig.ConfigReader
import pureconfig.generic.semiauto.deriveReader
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

/**
* Configuration objects for Fable. These objects can be constructed manually,
* but they're designed to be read from a
* [[https://github.com/lightbend/config/blob/master/HOCON.md HOCON]]
* configuration file. [[https://pureconfig.github.io/ PureConfig]] readers are
* provided to make this easy.
*
* @example {{{
*
* // application.conf
* kafka {
* uris = "PLAINTEXT://localhost:9092"
* uris = \${?KAFKA_URL}
*
* test-consumer {
* auto-commit = false
* batch-size = 1024
* client-id = "fable-test"
* group-id = "fable-test"
* max-poll-records = 1024
* polling-timeout = 1 second
* session-timeout = 30 seconds
* }
* }
*
* // Main.scala
* val kafkaConfig: Kafka.Config =
* pureconfig.loadConfigOrThrow[Config.Kafka]("kafka")
*
* val consumerConfig: Kafka.Config =
* pureconfig.loadConfigOrThrow[Config.Consumer]("kafka.test-consumer")
* }}}
*/
object Config {
/**
* General configuration options for [[Kafka]] instances.
*
* @constructor
* @param prefix optional prefix to apply to topic names and consumer group
* IDs.
* @param uris bootstrap URIs used to connect to a Kafka cluster.
*/
case class Kafka(prefix: Option[String], uris: URIList) {
def properties: Properties = {
private[fable] def properties: Properties = {
val result = new Properties()
result.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
uris.bootstrapServers)
Expand All @@ -22,13 +65,36 @@ object Config {
f(s"${prefix.getOrElse("")}$name")
}

object Kafka {
implicit val kafkaConfigReader: ConfigReader[Kafka] = deriveReader
}

/**
* Configuration options for constructor a Kafka [[Consumer]].
*
* @see
* [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html
* ConsumerConfig]] for details on consumer configuration.
*
* @constructor
* @param autoCommit whether to automatically commit the previous offset
* @param clientId identifier for tracking which client is making requests
* @param groupId identifier for the consumer group this consumer will join
* each time [[Consumer.poll]] is invoked.
* @param maxPollRecords the maximum number of records to return each time
* [[Consumer.poll]] is invoked.
* @param pollingTimeout how long to wait before giving up when
* [[Consumer.poll]] is invoked.
* @param sessionTimeout how long to wait before assuming a failure has
* occurred when using a consumer group
*/
case class Consumer(
autoCommit: Boolean,
groupId: Option[Group],
clientId: String,
groupId: Option[GroupId],
maxPollRecords: Int,
pollingTimeout: FiniteDuration,
sessionTimeout: FiniteDuration,
clientId: String
sessionTimeout: FiniteDuration
) {
def properties[K: Deserializer, V: Deserializer](
kafka: Kafka): Properties = {
Expand All @@ -53,6 +119,16 @@ object Config {
}
}

object Consumer {
implicit val consumerConfigReader: ConfigReader[Consumer] = deriveReader
}

/**
* Bootstrap URIs used to connect to a Kafka cluster.
*
* The included PureConfig reader will parse a comma-separated list of URIs
* from a String and infer whether or not SSL is being used.
*/
case class URIList(uris: List[URI]) {
private final val KAFKA_SSL_SCHEME: String = "kafka+ssl"

Expand All @@ -72,7 +148,7 @@ object Config {
}

object URIList {
implicit val uriListReader: pureconfig.ConfigReader[URIList] =
implicit val uriListReader: ConfigReader[URIList] =
pureconfig.ConfigReader[String].emap { string =>
string
.split(",")
Expand Down
62 changes: 60 additions & 2 deletions src/main/scala/fable/Consumer.scala
Expand Up @@ -9,12 +9,39 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

class Consumer[F[_]: ContextShift: Monad: Sync, K, V](
/**
* Typesafe, functional API for using Kafka consumers.
*
* Wraps a native KafkaConsumer.
*
* Because KafkaConsumer isn't threadsafe, each consumer bulds its own
* [[scala.concurrent.ExecutionContext]] with a dedicated, single-thread pool.
* Methods invoked on this class will perform their IO on that thread.
*
* @see [[Kafka]] for allocating consumers
* @see [[Config.Consumer]] for configuration options for consumers
* @see [[Deserializer]] for details on deserializing keys and values
* @see [[org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer]] for
* details about Kafka's consumers
*/
class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] (
config: Config.Consumer,
kafkaConsumer: KafkaConsumer[K, V]) {

/**
* Continuously [[poll]] Kafka for new records.
*/
def records: Stream[F, ConsumerRecords[K, V]] =
Stream.eval(poll).repeat

/**
* Fetch the next batch of records from Kafka.
*
* Polling behavior, including timeouts, batch sizes, and auto commit can be
* configured via [[Config.Consumer]].
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration- KafkaConsumer.poll]]
*/
def poll: F[ConsumerRecords[K, V]] =
for {
records <- execute(
Expand All @@ -25,27 +52,58 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V](
ConsumerRecords(records)
}

/**
* Commit the offset for the subscribed partitions for this consumer group.
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-- KafkaConsumer.commitSync]]
*/
def commit: F[Unit] =
execute(kafkaConsumer.commitSync) *>
logger.info(s"committed offset")
logger.info(s"Committed offset")

/**
* Disconnect the network client.
*
* If a consumer is acquired by using [[Kafka#consumer]], the consumer is
* closed automatically once the resource is released.
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-- KafkaConsumer.close]]
*/
def close: F[Unit] =
execute(kafkaConsumer.close) *>
logger.info("Disconnected")

/**
* Subscribe to one or more topics. This will use consumer groups
* feature. Partitions are automatically assigned to consumers within a
* group.
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe-java.util.Collection- KafkaConsumer.subscribe]]
*/
def subscribe(topics: Topic*): F[Unit] =
execute(kafkaConsumer.subscribe(topics.map(_.name).asJava)) *>
topics.toList
.traverse(topic => logger.info(s"Subscribed to ${topic.name}"))
.void

/**
* Fetch information about partitions for a specific topic.
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor-java.lang.String- KafkaConsumer.partitionsFor]]
*/
def partitionsFor(topic: Topic): F[Seq[Partition]] =
for {
infos <- execute(kafkaConsumer.partitionsFor(topic.name))
} yield {
infos.asScala.map(info => Partition(Topic(info.topic), info.partition))
}

/**
* Explicitly assign partitions to this consumer. This doesn't use consumer
* groups.
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#assign-java.util.Collection- KafkaConsumer.assign]]
*/
def assign(partitions: Seq[Partition]): F[Unit] =
execute(
kafkaConsumer.assign(
Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/fable/ConsumerRecords.scala
Expand Up @@ -2,12 +2,19 @@ package fable

import scala.collection.JavaConverters._

/**
* Wrapper for [[org.apache.kafka.clients.consumer.ConsumerRecords]] using the
* Scala collections API.
*/
case class ConsumerRecords[K, V](
records: org.apache.kafka.clients.consumer.ConsumerRecords[K, V])
extends AnyVal {
def isEmpty: Boolean =
records.isEmpty

def count: Int =
records.count

def toSeq: Seq[ConsumerRecord[K, V]] =
records.iterator.asScala.toSeq
}
10 changes: 10 additions & 0 deletions src/main/scala/fable/Deserializer.scala
Expand Up @@ -2,6 +2,16 @@ package fable

import org.apache.kafka.common.serialization

/**
* Wrapper for [[org.apache.kafka.common.serialization.Deserializer]] using
* implicits to locate Deserializers.
*
* The official client expects a class name to be written into the Consumer's
* properties. Fable finds a Deserializer for key and value types using
* implicits and writes the class name into Consumer properties automatically.
*
* @see [[Deserializer$]] for included instances.
*/
trait Deserializer[A] {
def instance: serialization.Deserializer[A]
}
Expand Down
3 changes: 0 additions & 3 deletions src/main/scala/fable/Group.scala

This file was deleted.

15 changes: 15 additions & 0 deletions src/main/scala/fable/GroupId.scala
@@ -0,0 +1,15 @@
package fable

import pureconfig.ConfigReader
import pureconfig.generic.semiauto.deriveReader

/**
* Value class for consumer group IDs.
*
* @see [[Kafka.groupId]]
*/
case class GroupId private (name: String) extends AnyVal

object GroupId {
implicit val groupIdConfigReader: ConfigReader[GroupId] = deriveReader
}
55 changes: 52 additions & 3 deletions src/main/scala/fable/Kafka.scala
Expand Up @@ -5,13 +5,62 @@ import cats.Monad
import fs2.Sink
import org.apache.kafka.clients.consumer.KafkaConsumer

/**
* Entry point for constructing Kafka consumers.
*
* @example {{{
* import cats.implicits._
* import cats.effect._
* import fable._
* import pureconfig.generic.auto._
*
* object Main extends IOApp {
* def run(args: List[String]): IO[ExitCode] = {
* val kafkaConfig: Kafka.Config =
* pureconfig.loadConfigOrThrow[Config.Kafka]("kafka")
* val consumerConfig: Kafka.Config =
* pureconfig.loadConfigOrThrow[Config.Consumer]("kafka.my-consumer")
* val kafka: Kafka = Kafka(kafkaConfig)
* kafka.consumer[String, String](consumerConfig).use { consumer =>
* for {
* _ <- consumer.subscribe(kafka.topic("my-topic"))
* records <- consumer.poll
* _ <- IO.delay(println(s"Consumed \${records.count} records"))
* } yield ExitCode.Success
* }
* }
* }
* }}}
*
* @see [[Consumer]] for the consumer API
* @see [[Config.Kafka]] for settings for Kafka connections
* @see [[Config.Consumer]] for settings for consumers
*/
class Kafka[F[_]: ContextShift: Monad: Sync](config: Config.Kafka) {

/**
* Construct a [[Topic]] from the given name. Applies the prefix from
* configuration if one is present.
*/
def topic(name: String): Topic =
config.prefix(name)(Topic)
config.prefix(name)(Topic.apply)

def group(name: String): Group =
config.prefix(name)(Group)
/**
* Construct a [[GroupId]] from the given name. Applies the prefix from
* configuration if one is present.
*/
def groupId(name: String): GroupId =
config.prefix(name)(GroupId.apply)

/**
* Allocate a [[Consumer]] as a [[cats.effect.Resource]]. The consumer will
* be closed when the resource is released.
*
* @tparam K the type to deserialize keys into
* @tparam V the type to deserialize values into
* @see [[Consumer]] for details on using consumers
* @see [[Deserializer]] for details on deserializing keys and values
*/
def consumer[K: Deserializer, V: Deserializer](
consumerConfig: Config.Consumer): Resource[F, Consumer[F, K, V]] =
Resource.make(
Expand Down
7 changes: 7 additions & 0 deletions src/main/scala/fable/Partition.scala
@@ -1,3 +1,10 @@
package fable

/**
* A particular partition in a Kafka topic. Used for inspecting and assigning
* partitions.
*
* @see [[Consumer.assign]]
* @see [[Consumer.partitionsFor]]
*/
case class Partition(topic: Topic, number: Int)
11 changes: 10 additions & 1 deletion src/main/scala/fable/Topic.scala
@@ -1,3 +1,12 @@
package fable

case class Topic(name: String) extends AnyVal
/**
* Value class for topic names.
*
* @see [[Kafka.topic]]
*/
case class Topic private (name: String) extends AnyVal

private[fable] object Topic {
private[fable] def apply(name: String): Topic = new Topic(name)
}

0 comments on commit bcb7a1b

Please sign in to comment.