Skip to content

Commit

Permalink
Accept SSL certificates for connection
Browse files Browse the repository at this point in the history
Allow authenticating and encrypting the connection via SSL.
  • Loading branch information
jferris committed Jan 29, 2019
1 parent e039dec commit 170eb3c
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 37 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Expand Up @@ -2,6 +2,7 @@ import ReleaseTransformations._

val catsEffectVersion = "[1.0.0,1.1.0)"
val catsVersion = "[1.4.0,1.5.0)"
val envKeyStoreVersion = "[1.1.0,1.2.0)"
val fs2Version = "[1.0.0,1.1.0)"
val kafkaVersion = "[2.1.0,2.2.0)"
val log4CatsVersion = "[0.2.0,0.3.0)"
Expand Down Expand Up @@ -74,6 +75,7 @@ lazy val fable = (project in file("."))
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"com.github.pureconfig" %% "pureconfig" % pureConfigVersion,
"com.heroku.sdk" % "env-keystore" % envKeyStoreVersion,
"io.chrisdavenport" %% "log4cats-slf4j" % log4CatsVersion,
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
Expand Down
100 changes: 84 additions & 16 deletions src/main/scala/fable/Config.scala
@@ -1,10 +1,13 @@
package fable

import cats.effect.Sync
import cats.implicits._
import com.heroku.sdk.BasicKeyStore
import java.net.URI
import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SslConfigs
import pureconfig.ConfigReader
import pureconfig.generic.semiauto.deriveReader
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -54,7 +57,9 @@ object Config {
*
* @constructor
* @param autoCommit whether to automatically commit the previous offset
* @param clientCertificate SSL certificate used to authenticate the client
* @param clientId identifier for tracking which client is making requests
* @param clientKey SSL private key used to authenticate the client
* @param groupId identifier for the consumer group this consumer will join
* each time [[Consumer.poll]] is invoked.
* @param maxPollRecords the maximum number of records to return each time
Expand All @@ -63,40 +68,103 @@ object Config {
* [[Consumer.poll]] is invoked.
* @param sessionTimeout how long to wait before assuming a failure has
* occurred when using a consumer group
* @param sslIdentificationAlgorithm the algorithm used to identify the
* server hostname
* @param trustedCertificate SSL certificate used to authenticate the server
* @param uris bootstrap URIs used to connect to a Kafka cluster.
*/
case class Consumer(
autoCommit: Boolean,
clientCertificate: Option[String],
clientId: String,
clientKey: Option[String],
groupId: Option[GroupId],
maxPollRecords: Int,
pollingTimeout: FiniteDuration,
sessionTimeout: FiniteDuration,
sslIdentificationAlgorithm: Option[String],
trustedCertificate: Option[String],
uris: URIList
) {
def properties[K: Deserializer, V: Deserializer]: Properties = {
def properties[F[_]: Sync, K: Deserializer, V: Deserializer]
: F[Properties] = {
val result = new Properties()
result.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
addCommonProperties(result)
addConsumerProperties(result)
addDeserializationProperties[K, V](result)

addTrustStoreProperties[F](result) *>
addClientStoreProperties[F](result) *>
Sync[F].pure(result)
}

private def addCommonProperties(target: Properties): Unit = {
target.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
uris.bootstrapServers)
result.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, uris.scheme)
result.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
result.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
result.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString)
result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer[K].instance.getClass.getName)
result.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
target.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, uris.scheme)
target.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
sslIdentificationAlgorithm.getOrElse(""))
}

private def addConsumerProperties[F[_]: Sync](target: Properties): Unit = {
target.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
target.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
target.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString)
target.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
new Integer(maxPollRecords))
result.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
target.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
sessionTimeout.toMillis.toString)
result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer[V].instance.getClass.getName)
groupId
.map(_.name)
.foreach(
result.put(ConsumerConfig.GROUP_ID_CONFIG, _)
target.put(ConsumerConfig.GROUP_ID_CONFIG, _)
)
result
}

private def addDeserializationProperties[K: Deserializer, V: Deserializer](
target: Properties) = {
target.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer[K].instance.getClass.getName)
target.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer[V].instance.getClass.getName)
}

private def addTrustStoreProperties[F[_]: Sync](
target: Properties): F[Unit] =
trustedCertificate.traverse { certificate =>
for {
password <- Sync[F].delay(
new java.math.BigInteger(130, new java.security.SecureRandom)
.toString(32))
memoryStore = new BasicKeyStore(certificate, password)
fileStore <- Sync[F].delay { memoryStore.storeTemp }
} yield {
target.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, memoryStore.`type`)
target.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
fileStore.getAbsolutePath)
target.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
memoryStore.password)
}
}.void

private def addClientStoreProperties[F[_]: Sync](
target: Properties): F[Unit] =
(clientCertificate, clientKey).tupled.traverse {
case (certificate, key) =>
for {
password <- Sync[F].delay(
new java.math.BigInteger(130, new java.security.SecureRandom)
.toString(32))
memoryStore = new BasicKeyStore(key, certificate, password)
fileStore <- Sync[F].delay { memoryStore.storeTemp }
} yield {
target.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, memoryStore.`type`)
target.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
fileStore.getAbsolutePath)
target.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
memoryStore.password)
}
}.void
}

object Consumer {
Expand All @@ -113,12 +181,12 @@ object Config {
private final val KAFKA_SSL_SCHEME: String = "kafka+ssl"

def scheme: String =
if (needKeys)
if (usesSSL)
"SSL"
else
"PLAINTEXT"

def needKeys: Boolean =
private def usesSSL: Boolean =
uris.headOption.map(_.getScheme).getOrElse("") == KAFKA_SSL_SCHEME

def bootstrapServers: String =
Expand Down
21 changes: 16 additions & 5 deletions src/main/scala/fable/Consumer.scala
Expand Up @@ -84,7 +84,7 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] (
/**
* Disconnect the network client.
*
* If a consumer is acquired by using [[Kafka#consumer]], the consumer is
* If a consumer is acquired by using [[Consumer$.resource]], the consumer is
* closed automatically once the resource is released.
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-- KafkaConsumer.close]]
Expand Down Expand Up @@ -152,13 +152,24 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] (
}

object Consumer {

/**
* Construct a consumer using the given key type, value type, and
* configuration as a [[cats.effect.Resource]] which will be closed when the
* resource is released.
*
* @tparam K keys will be deserialized as this type
* @tparam V values will be deserialized as this type
* @see [[Deserializer]] for information on deserializing keys and values
*/
def resource[F[_]: ContextShift: Monad: Sync,
K: Deserializer,
V: Deserializer](
config: Config.Consumer): Resource[F, Consumer[F, K, V]] =
Resource.make(
Monad[F].pure(
new Consumer[F, K, V](
config,
new KafkaConsumer[K, V](config.properties[K, V]))))(_.close)
config
.properties[F, K, V]
.map(properties =>
new Consumer[F, K, V](config, new KafkaConsumer[K, V](properties))))(
_.close)
}
2 changes: 0 additions & 2 deletions src/main/scala/fable/GroupId.scala
Expand Up @@ -5,8 +5,6 @@ import pureconfig.generic.semiauto.deriveReader

/**
* Value class for consumer group IDs.
*
* @see [[Kafka.groupId]]
*/
case class GroupId private (name: String) extends AnyVal

Expand Down
2 changes: 0 additions & 2 deletions src/main/scala/fable/Topic.scala
Expand Up @@ -2,8 +2,6 @@ package fable

/**
* Value class for topic names.
*
* @see [[Kafka.topic]]
*/
case class Topic private (name: String) extends AnyVal

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/fable/package.scala
@@ -1,7 +1,8 @@
/**
* Functional API for Kafka using Cats, Cats Effect, and fs2.
*
* @see [[Kafka]] to get started.
* @see [[Consumer]] for more information on building Kafka consumers with
* Fable.
*/
package object fable {
type ConsumerRecord[K, V] =
Expand Down
31 changes: 20 additions & 11 deletions src/test/scala/fable/ConsumerSpec.scala
Expand Up @@ -2,6 +2,8 @@ package fable

import cats.Eval
import cats.effect.IO
import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.apache.kafka.clients.producer.{
KafkaProducer,
Expand All @@ -17,7 +19,7 @@ class ConsumerSpec extends AsyncFunSuite {

test("poll") {
val topic = Topic("fable-test-example")
val consumer = Consumer.resource[IO, String, String](consumerConfig)
val consumer = Consumer.resource[IO, String, String](config)

(for {
_ <- createTopic(topic.name)
Expand All @@ -30,16 +32,15 @@ class ConsumerSpec extends AsyncFunSuite {
}
} yield {
assert(
records.map(record => (record.key, record.value)) === Seq(
"one" -> "1",
"two" -> "2"))
records.map(record => (record.key, record.value)) === Seq("one" -> "1",
"two" -> "2"))
}).unsafeToFuture
}

test("commit") {
val topic = Topic("fable-test-example")
val consumer = Eval.always {
Consumer.resource[IO, String, String](consumerConfig)
Consumer.resource[IO, String, String](config)
}

(for {
Expand Down Expand Up @@ -70,7 +71,7 @@ class ConsumerSpec extends AsyncFunSuite {
test("records") {
val topic = Topic("fable-test-example")
val consumer =
Consumer.resource[IO, String, String](consumerConfig.copy(maxPollRecords = 2))
Consumer.resource[IO, String, String](config.copy(maxPollRecords = 2))

(for {
_ <- createTopic(topic.name)
Expand All @@ -93,7 +94,7 @@ class ConsumerSpec extends AsyncFunSuite {

test("partitionsFor") {
val topic = Topic("fable-test-example")
val consumer = Consumer.resource[IO, String, String](consumerConfig)
val consumer = Consumer.resource[IO, String, String](config)

(for {
_ <- createTopic(topic.name)
Expand All @@ -106,7 +107,7 @@ class ConsumerSpec extends AsyncFunSuite {
test("assign") {
val first = Topic("fable-test-one")
val second = Topic("fable-test-two")
val consumer = Consumer.resource[IO, String, String](consumerConfig)
val consumer = Consumer.resource[IO, String, String](config)

(for {
_ <- createTopic(first.name)
Expand All @@ -127,7 +128,11 @@ class ConsumerSpec extends AsyncFunSuite {

private def createTopic(topic: String): IO[Unit] =
IO.delay {
val properties = consumerConfig.properties
val properties = new Properties
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
config.uris.bootstrapServers)
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
config.uris.scheme)
val adminClient = AdminClient.create(properties)
val newTopic = new NewTopic(topic, 1, 1)

Expand All @@ -139,7 +144,11 @@ class ConsumerSpec extends AsyncFunSuite {

private def sendRecords(topic: String, records: (String, String)*): IO[Unit] =
IO.delay {
val properties = consumerConfig.properties
val properties = new Properties
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
config.uris.bootstrapServers)
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
config.uris.scheme)
properties.put(ProducerConfig.ACKS_CONFIG, "all")
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "fable-test")
properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
Expand All @@ -164,5 +173,5 @@ class ConsumerSpec extends AsyncFunSuite {
producer.close
}

val consumerConfig = TestConfig.consumer
val config = TestConfig.consumer
}

0 comments on commit 170eb3c

Please sign in to comment.