Kafka Improvements #3189

Closed
wants to merge 8 commits into from
@@ -21,7 +21,8 @@ collector {
# The collector runs as a web service specified on the following
# interface and port.
interface = "0.0.0.0"
port = {{collectorPort}}
port = 8080
port = ${?COLLECTOR_PORT}

# Production mode disables additional services helpful for configuring and
# initializing the collector, such as a path '/dump' to view all
@@ -38,18 +39,19 @@ collector {
# with the following domain and expiration.
cookie {
enabled = true
expiration = {{collectorCookieExpiration}} # e.g. "365 days"
expiration = ${COLLECTOR_COOKIE_EXPIRATION} # e.g. "365 days"
# Network cookie name
name = {{collectorCookieName}}
name = ${COLLECTOR_COOKIE_NAME}
# The domain is optional and will make the cookie accessible to other
# applications on the domain. Comment out this line to tie cookies to
# the collector's full domain
domain = "{{collectorCookieDomain}}"
domain = ${COLLECTOR_COOKIE_DOMAIN}
}

# The collector has a configurable sink for storing data in
# different formats for the enrichment process.
sink {

# Sinks currently supported are:
# 'kinesis' for writing Thrift-serialized records to a Kinesis stream
# 'kafka' for writing Thrift-serialized records to kafka
@@ -63,7 +65,7 @@ collector {
enabled = "kinesis"

kinesis {
thread-pool-size: 10 # Thread pool size for Kinesis API requests
thread-pool-size = 10 # Thread pool size for Kinesis API requests

# The following are used to authenticate for the Amazon Kinesis sink.
#
@@ -74,31 +76,40 @@ collector {
#
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
access-key: "iam"
secret-key: "iam"
access-key = "iam"
secret-key = "iam"
}

# Data will be stored in the following stream.
stream {
region: "{{collectorSinkKinesisStreamRegion}}"
good: "{{collectorKinesisStreamGoodName}}"
bad: "{{collectorKinesisStreamBadName}}"
region = ${COLLECTOR_SINK_KINESIS_STREAM_REGION}
good = ${COLLECTOR_SINK_KINESIS_STREAM_GOOD}
bad = ${COLLECTOR_SINK_KINESIS_STREAM_BAD}
}

# Minimum and maximum backoff periods
backoffPolicy: {
minBackoff: {{collectorSinkKinesisMinBackoffMillis}}
maxBackoff: {{collectorSinkKinesisMaxBackoffMillis}}
backoffPolicy {
minBackoff = 3000
maxBackoff = 600000
minBackoff = ${?COLLECTOR_SINK_KINESIS_BACKOFFPOLICY_MIN}
maxBackoff = ${?COLLECTOR_SINK_KINESIS_BACKOFFPOLICY_MAX}
}
}

kafka {
brokers: "{{collectorKafkaBrokers}}"

# Data will be stored in the following topics
topic {
good: "{{collectorKafkaTopicGoodName}}"
bad: "{{collectorKafkaTopicBadName}}"
good = "snowplow-collector-good"
bad = "snowplow-collector-bad"
good = ${?COLLECTOR_SINK_KAFKA_TOPIC_GOOD}
bad = ${?COLLECTOR_SINK_KAFKA_TOPIC_BAD}
}

producer {
bootstrap.servers = "localhost:9092"
# store any additional Kafka native configuration here
# example:
# security.protocol = SSL
# timeout.ms = 60000
}
}

@@ -108,9 +119,9 @@ collector {
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byte-limit: {{collectorSinkBufferByteThreshold}}
record-limit: {{collectorSinkBufferRecordThreshold}} # Not supported by Kafka; will be ignored
time-limit: {{collectorSinkBufferTimeThreshold}}
byte-limit = 4000000
record-limit = 500
time-limit = 60000
Contributor:

could you leave the config as is?

}
}
}
@@ -33,7 +33,7 @@ object Dependencies {
val awsSdk = "1.6.10"
val yodaTime = "2.1"
val yodaConvert = "1.2"
val kafka = "0.10.1.0"
val kafka = "0.10.2.0"
Contributor:

this will be handled in a separate ticket

Author:

@BenFradet no, not really. If you don't support this upgrade, you can't support security by passing the JAAS conf as a property from producerProps (sasl.jaas.config was added in 0.10.2).

Contributor:

What I mean is that there is a ticket dedicated to the upgrade and it's separate from the tickets you're trying to solve (e.g. #3325).

Adding the security feature is a separate concern from updating the library.
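For context on that point, with Kafka clients 0.10.2 and later the JAAS configuration can be supplied as an ordinary producer property instead of an external JAAS file. A minimal sketch follows; the broker address, mechanism, and credentials are placeholders, not part of this PR:

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

val props = new Properties()
props.put("bootstrap.servers", "broker1:9093")   // placeholder broker
props.put("security.protocol", "SASL_SSL")
props.put("sasl.mechanism", "PLAIN")
// sasl.jaas.config is only recognised by clients >= 0.10.2 (KIP-85)
props.put("sasl.jaas.config",
  "org.apache.kafka.common.security.plain.PlainLoginModule required " +
  "username=\"collector\" password=\"changeme\";")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = new KafkaProducer[String, Array[Byte]](props)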

// Scala
val snowplowCommonEnrich = "0.22.0"
val igluClient = "0.3.2"
@@ -17,9 +17,12 @@ package collectors
package scalastream

// Akka and Spray
import java.util.Properties

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.io.IO
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import spray.can.Http

// Scala Futures
@@ -153,6 +156,7 @@ case class CookieConfig(name: String, expiration: Long, domain: Option[String])
// the collector process starts rather than later.
class CollectorConfig(config: Config) {
import Helper.RichConfig
import CollectorConfig._

private val collector = config.getConfig("collector")
val interface = collector.getString("interface")
@@ -200,11 +204,42 @@ class CollectorConfig(config: Config) {
val minBackoff = backoffPolicy.getLong("minBackoff")
val maxBackoff = backoffPolicy.getLong("maxBackoff")

private val kafka = sink.getConfig("kafka")
val kafkaBrokers = kafka.getString("brokers")
private val kafkaTopic = kafka.getConfig("topic")
val kafkaTopicGoodName = kafkaTopic.getString("good")
val kafkaTopicBadName = kafkaTopic.getString("bad")
object Kafka {
private val kafkaConfig = sink.getConfig("kafka")

object Topic {
private val topicConfig = kafkaConfig.getConfig("topic")
val topicGoodName: String = topicConfig.getString("good")
val topicBadName: String = topicConfig.getString("bad")
}

// get producer related configurations
object Producer {
private val producerConfig = kafkaConfig.getConfig("producer")
assert(producerConfig.hasPath("bootstrap.servers"), "bootstrap.servers is required")

private val defaultProperties: Properties = {
val props = new Properties()
props.put("acks", "all")
props.put("buffer.memory", byteLimit.toString)
props.put("batch.size", recordLimit.toString)
Contributor:

this doesn't translate, same as in the other PR

Author:

Ah true, that's bytes. byteLimit.toString then, right? And buffer.memory removed?

Contributor:

I don't think any current buffer setting applies to batch.size, and it doesn't make sense to have a batch.size equal to buffer.memory.
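// Note on the review point above (Kafka stock defaults, shown for reference only):
// buffer.memory – total producer memory for unsent records, default 33554432 bytes (32 MB)
// batch.size   – per-partition batch cap in bytes, default 16384 bytes (16 KB)
// Neither the collector's byteLimit nor its recordLimit maps cleanly onto batch.size.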

props.put("linger.ms", timeLimit.toString)
props.put("key.serializer", classOf[StringSerializer])
props.put("value.serializer", classOf[ByteArraySerializer])
props.put("retries", "3")
props
}

val getProps: Properties = {
val producerProps = propsFromConfig(producerConfig, blacklist = Set("key.serializer", "value.serializer"))
val merged = new Properties()
merged.putAll(defaultProperties)
merged.putAll(producerProps)
merged
}
}
}
Contributor:

this will be overhauled in a separate ticket

Author:

it's important that this is there if you intend to allow dynamic settings and therefore support security
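A sketch of that flow, using the propsFromConfig helper added further down in this PR (the broker address and security setting are placeholder values):

import com.typesafe.config.ConfigFactory

// Any native producer key placed in the HOCON `producer` block ends up in the
// Properties object handed to the KafkaProducer, with no collector code changes.
val producerConfig = ConfigFactory.parseString(
  """
    |bootstrap.servers = "localhost:9092"
    |security.protocol = "SASL_SSL"
  """.stripMargin)

val props = CollectorConfig.propsFromConfig(producerConfig, blacklist = Set.empty[String])
// props.getProperty("security.protocol") == "SASL_SSL"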



private val buffer = sink.getConfig("buffer")
val byteLimit = buffer.getInt("byte-limit")
@@ -217,3 +252,19 @@ class CollectorConfig(config: Config) {
def cookieDomain = cookieConfig.flatMap(_.domain)
def cookieExpiration = cookieConfig.map(_.expiration)
}

object CollectorConfig {
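/** Converts a Typesafe Config block into java.util.Properties, dropping any keys listed in the blacklist. */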
def propsFromConfig(config: Config, blacklist: Set[String]): Properties = {
import scala.collection.JavaConversions._

val props = new Properties()

val map: Map[String, String] = config.entrySet().map({ entry =>
entry.getKey.trim -> entry.getValue.unwrapped().toString.trim
})(collection.breakOut)

// apply blacklist
props.putAll(map.filterNot(elem => blacklist.contains(elem._1)))
props
}
}
@@ -17,6 +17,8 @@ package sinks
// Java
import java.util.Properties

import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

// Kafka
import org.apache.kafka.clients.producer._;

@@ -37,8 +39,8 @@ class KafkaSink(config: CollectorConfig, inputType: InputType.InputType) extends
val MaxBytes = 1000000L

private val topicName = inputType match {
case InputType.Good => config.kafkaTopicGoodName
case InputType.Bad => config.kafkaTopicBadName
case InputType.Good => config.Kafka.Topic.topicGoodName
case InputType.Bad => config.Kafka.Topic.topicBadName
}

private var kafkaProducer = createProducer
@@ -51,20 +53,9 @@ class KafkaSink(config: CollectorConfig, inputType: InputType.InputType) extends
*/
private def createProducer: KafkaProducer[String, Array[Byte]] = {

info(s"Create Kafka Producer to brokers: ${config.kafkaBrokers}")

val props = new Properties()
props.put("bootstrap.servers", config.kafkaBrokers)
props.put("acks", "all")
props.put("retries", "0")
props.put("batch.size", config.byteLimit.toString)
props.put("linger.ms", config.timeLimit.toString)
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer")
Contributor:

please keep your changes to a minimum and modify the configuration where it originally was, i.e. here


new KafkaProducer[String, Array[Byte]](props)
val kafkaProps = config.Kafka.Producer.getProps
info(s"Create Kafka Producer to brokers: ${kafkaProps.getProperty("bootstrap.servers")}")
new KafkaProducer[String, Array[Byte]](kafkaProps)
}

/**
@@ -77,14 +68,13 @@ class KafkaSink(config: CollectorConfig, inputType: InputType.InputType) extends
debug(s"Writing ${events.size} Thrift records to Kafka topic ${topicName} at key ${key}")
events foreach {
event => {
try {
kafkaProducer.send(new ProducerRecord(topicName, key, event))
} catch {
case e: Exception => {
error(s"Unable to send event, see kafka log for more details: ${e.getMessage}")
e.printStackTrace()
kafkaProducer.send(new ProducerRecord(topicName, key, event), new Callback() {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    // the callback runs once the send is acknowledged or fails
    if (exception != null) {
      error(s"Unable to send event, see kafka log for more details: ${exception.getMessage}")
      exception.printStackTrace()
    }
  }
})
Contributor:

this was done in #3317, sorry I wasn't aware of this PR

}
}
Nil
@@ -0,0 +1,59 @@
collector {
interface = "0.0.0.0"
port = 8080

production = true

p3p {
policyref = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}

cookie {
enabled = true
expiration = 365 days
name = sp
domain = "test-domain.com"
}

sink {
enabled = "test"

kinesis {
aws {
access-key: "cpf"
secret-key: "cpf"
}
stream {
region: "us-east-1"
good: "snowplow_collector_example"
bad: "snowplow_collector_example"
}
backoffPolicy {
minBackoff: 3000 # 3 seconds
maxBackoff: 600000 # 5 minutes
}
}

kafka {
topic {
good: "good-topic"
bad: "bad-topic"
}

producer {
bootstrap.servers = "localhost:9092"
# override test - default is all
acks = 1
# additional property test
timeout.ms = 60000
}
}

buffer {
byte-limit: 4000000 # 4MB
record-limit: 500 # 500 records
time-limit: 60000 # 1 minute
}
}
}
@@ -0,0 +1,64 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory}
import org.specs2.matcher.AnyMatchers
import org.specs2.mutable.Specification
import spray.testkit.Specs2RouteTest

class CollectorConfigSpec extends Specification with Specs2RouteTest with
AnyMatchers {

val testConf: Config = ConfigFactory.load("application.test.conf")
val collectorConfig = new CollectorConfig(testConf)
val props = collectorConfig.Kafka.Producer.getProps


"Snowplow's Collector Configuration" should {
"correctly parse Kafka configs" in {
props.getProperty("bootstrap.servers") must not be null
// default override
props.getProperty("acks") must beEqualTo ("1")
// timeout.ms is an additional property
props.getProperty("timeout.ms") must beEqualTo ("60000")
// default assertion
props.getProperty("retries") must beEqualTo ("3")
}

"correctly convert a typesafe config to a java properties object" in {
val config: Config = ConfigFactory.parseString(
"""
|object {
| string = "hello"
| boolean = false
| integer = 1
|}
""".stripMargin).getConfig("object")

val properties = new Properties()
properties.put("string", "hello")
properties.put("boolean", "false")
properties.put("integer", "1")

CollectorConfig.propsFromConfig(config, Set()) must beEqualTo(properties)
}

"correctly blacklist" in {
val config: Config = ConfigFactory.parseString(
"""
|object {
| string = "hello"
| boolean = false
| integer = 1
|}
""".stripMargin).getConfig("object")

val properties = new Properties()
properties.put("integer", "1")

CollectorConfig.propsFromConfig(config, Set("string", "boolean")) must beEqualTo(properties)
}
}

}
Contributor:

same