
Commit

Stream Enrich: ensure a one-to-one relationship between sink and record processor (closes #3745)
BenFradet committed Apr 30, 2018
1 parent 877c7a2 commit c9db048
Showing 9 changed files with 104 additions and 98 deletions.
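
At a glance, the commit removes the goodSink/badSink ThreadLocal constructor parameters from Source and replaces them with abstract threadLocalGoodSink/threadLocalBadSink members that every concrete source overrides, building the sink inside initialValue. Because initialValue runs once per calling thread, each processing thread that calls .get ends up owning exactly one sink. A minimal sketch of the pattern, with a simplified Sink trait and a hypothetical ExampleSource standing in for the Kinesis/Kafka/NSQ/PubSub sources:

trait Sink {
  def storeEnrichedEvents(events: List[(String, String)]): Boolean
  def flush(): Unit
}

abstract class Source {
  // declared abstract here; each concrete source decides how to build its sinks
  val threadLocalGoodSink: ThreadLocal[Sink]
  val threadLocalBadSink: ThreadLocal[Sink]

  def storeAndMaybeFlush(events: List[(String, String)]): Boolean = {
    // .get resolves to the sink owned by the current thread
    val flushed = threadLocalGoodSink.get.storeEnrichedEvents(events)
    if (flushed) threadLocalGoodSink.get.flush()
    flushed
  }
}

class ExampleSource(makeSink: () => Sink) extends Source {
  override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
    override def initialValue: Sink = makeSink() // fresh sink for each new thread
  }
  override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
    override def initialValue: Sink = makeSink()
  }
}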
@@ -85,8 +85,6 @@ object Source {

/** Abstract base for the different sources we support. */
abstract class Source(
goodSink: ThreadLocal[Sink],
badSink: ThreadLocal[Sink],
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker],
@@ -102,6 +100,9 @@ abstract class Source(

implicit val resolver: Resolver = igluResolver

val threadLocalGoodSink: ThreadLocal[Sink]
val threadLocalBadSink: ThreadLocal[Sink]

// Iterate through an enriched EnrichedEvent object and tab separate
// the fields to a string.
def tabSeparateEnrichedEvent(output: EnrichedEvent): String = {
@@ -176,12 +177,12 @@ abstract class Source(
m <- MaxRecordSize
} yield Source.oversizedSuccessToFailure(value, m) -> key

val successesTriggeredFlush = goodSink.get.storeEnrichedEvents(smallEnoughSuccesses)
val failuresTriggeredFlush = badSink.get.storeEnrichedEvents(failures ++ sizeBasedFailures)
val successesTriggeredFlush = threadLocalGoodSink.get.storeEnrichedEvents(smallEnoughSuccesses)
val failuresTriggeredFlush = threadLocalBadSink.get.storeEnrichedEvents(failures ++ sizeBasedFailures)
if (successesTriggeredFlush == true || failuresTriggeredFlush == true) {
// Block until the records have been sent to Kinesis
goodSink.get.flush
badSink.get.flush
threadLocalGoodSink.get.flush
threadLocalBadSink.get.flush
true
} else {
false
@@ -26,6 +26,7 @@ import iglu.client.Resolver
import common.enrichments.EnrichmentRegistry
import model.EnrichConfig
import scalatracker.Tracker
import sinks.Sink

/**
* Source to allow the testing framework to enrich events
@@ -37,10 +38,17 @@ class TestSource(
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker]
) extends Source(null, null, igluResolver, enrichmentRegistry, tracker, "") {
) extends Source(igluResolver, enrichmentRegistry, tracker, "") {

override val MaxRecordSize = None

override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink = null
}
override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink = null
}

override def run(): Unit =
throw new RuntimeException("run() should not be called on TestSource")
}
@@ -48,33 +48,31 @@ object KafkaSource {
case c: Kafka => c.success
case _ => "Configured source/sink is not Kafka".failure
}
goodSink = new ThreadLocal[Sink] {
override def initialValue =
new KafkaSink(kafkaConfig, config.buffer, config.out.enriched, tracker)
}
badSink = new ThreadLocal[Sink] {
override def initialValue = new KafkaSink(kafkaConfig, config.buffer, config.out.bad, tracker)
}
} yield new KafkaSource(goodSink, badSink, igluResolver, enrichmentRegistry, tracker, config,
kafkaConfig.brokers)
} yield new KafkaSource(igluResolver, enrichmentRegistry, tracker, config, kafkaConfig)
}
/** Source to read events from a Kafka topic */
class KafkaSource private (
goodSink: ThreadLocal[Sink],
badSink: ThreadLocal[Sink],
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker],
config: StreamsConfig,
brokers: String
) extends Source(
goodSink, badSink, igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {
kafkaConfig: Kafka
) extends Source(igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {

override val MaxRecordSize = None

override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink =
new KafkaSink(kafkaConfig, config.buffer, config.out.enriched, tracker)
}
override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink =
new KafkaSink(kafkaConfig, config.buffer, config.out.bad, tracker)
}

/** Never-ending processing loop over source stream. */
override def run(): Unit = {
val consumer = createConsumer(brokers, config.appName)
val consumer = createConsumer(kafkaConfig.brokers, config.appName)

log.info(s"Running Kafka consumer group: ${config.appName}.")
log.info(s"Processing raw input Kafka topic: ${config.in.raw}")
@@ -41,12 +41,7 @@ import scalatracker.Tracker

/** KinesisSink companion object with factory method */
object KinesisSink {
def createAndInitialize(
kinesisConfig: Kinesis,
bufferConfig: BufferConfig,
streamName: String,
tracker: Option[Tracker]
): \/[String, KinesisSink] = for {
def validate(kinesisConfig: Kinesis, streamName: String): \/[String, Boolean] = for {
provider <- KinesisEnrich.getProvider(kinesisConfig.aws)
endpointConfiguration =
new EndpointConfiguration(kinesisConfig.streamEndpoint, kinesisConfig.region)
@@ -55,12 +50,12 @@
.withCredentials(provider)
.withEndpointConfiguration(endpointConfiguration)
.build()
_ <- streamExists(client, streamName).leftMap(_.getMessage)
exists <- streamExists(client, streamName).leftMap(_.getMessage)
.flatMap { b =>
if (b) b.right
else s"Kinesis stream $streamName doesn't exist".left
}
} yield new KinesisSink(client, kinesisConfig.backoffPolicy, bufferConfig, streamName, tracker)
} yield exists

/**
* Check whether a Kinesis stream exists
@@ -78,7 +73,7 @@ object KinesisSink {
}

/** Kinesis Sink for Scala enrichment */
class KinesisSink private (
class KinesisSink(
client: AmazonKinesis,
backoffPolicy: KinesisBackoffPolicyConfig,
buffer: BufferConfig,
@@ -30,6 +30,8 @@ import scala.collection.JavaConversions._
import scala.util.control.NonFatal

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.clientlibrary.interfaces._
import com.amazonaws.services.kinesis.clientlibrary.exceptions._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker._
@@ -56,37 +58,43 @@ object KinesisSource {
case c: Kinesis => c.success
case _ => "Configured source/sink is not Kinesis".failure
}
goodSink <-
KinesisSink.createAndInitialize(kinesisConfig, config.buffer, config.out.enriched, tracker)
.validation
threadLocalGoodSink = new ThreadLocal[Sink] {
override def initialValue = goodSink
}
badSink <-
KinesisSink.createAndInitialize(kinesisConfig, config.buffer, config.out.bad, tracker)
.validation
threadLocalBadSink = new ThreadLocal[Sink] {
override def initialValue = badSink
}
_ <- KinesisSink.validate(kinesisConfig, config.out.enriched).validation
_ <- KinesisSink.validate(kinesisConfig, config.out.bad).validation
provider <- KinesisEnrich.getProvider(kinesisConfig.aws).validation
} yield new KinesisSource(threadLocalGoodSink, threadLocalBadSink,
igluResolver, enrichmentRegistry, tracker, config, kinesisConfig, provider)
} yield new KinesisSource(igluResolver, enrichmentRegistry, tracker, config, kinesisConfig, provider)
}

/** Source to read events from a Kinesis stream */
class KinesisSource private (
goodSink: ThreadLocal[Sink],
badSink: ThreadLocal[Sink],
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker],
config: StreamsConfig,
kinesisConfig: Kinesis,
provider: AWSCredentialsProvider
) extends Source(goodSink, badSink, igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {
) extends Source(igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {

override val MaxRecordSize = Some(1000000L)

private val client = {
val endpointConfiguration =
new EndpointConfiguration(kinesisConfig.streamEndpoint, kinesisConfig.region)
AmazonKinesisClientBuilder
.standard()
.withCredentials(provider)
.withEndpointConfiguration(endpointConfiguration)
.build()
}

override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink =
new KinesisSink(client, kinesisConfig.backoffPolicy, config.buffer, config.out.enriched, tracker)
}
override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink =
new KinesisSink(client, kinesisConfig.backoffPolicy, config.buffer, config.out.bad, tracker)
}

/** Never-ending processing loop over source stream. */
override def run(): Unit = {
val workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID()
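
The Kinesis source shows the point of the refactor most directly: a single AmazonKinesis client is built once per source and reused, while each KCL record-processor thread lazily gets its own KinesisSink the first time it touches threadLocalGoodSink.get or threadLocalBadSink.get. A self-contained sketch (hypothetical CountingSink, not part of the codebase) of why a ThreadLocal with initialValue yields that one-to-one relationship:

object OneSinkPerThreadDemo extends App {
  final class CountingSink {
    val id: Int = CountingSink.counter.incrementAndGet()
  }
  object CountingSink {
    val counter = new java.util.concurrent.atomic.AtomicInteger(0)
  }

  // mirrors the threadLocalGoodSink pattern: initialValue runs once per thread
  val threadLocalSink: ThreadLocal[CountingSink] = new ThreadLocal[CountingSink] {
    override def initialValue: CountingSink = new CountingSink
  }

  val processors = (1 to 2).map { i =>
    new Thread(new Runnable {
      def run(): Unit = println(s"record processor $i uses sink #${threadLocalSink.get.id}")
    })
  }
  processors.foreach(_.start())
  processors.foreach(_.join())
  // prints sink #1 and sink #2: two processors, two distinct sink instances
}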
@@ -51,30 +51,27 @@ object NsqSource {
case c: Nsq => c.success
case _ => new IllegalArgumentException("Configured source/sink is not Nsq").failure
}
goodSink = new ThreadLocal[Sink] {
override def initialValue = new NsqSink(nsqConfig, config.out.enriched)
}
badSink = new ThreadLocal[Sink] {
override def initialValue = new NsqSink(nsqConfig, config.out.bad)
}
} yield new NsqSource(goodSink, badSink, igluResolver, enrichmentRegistry, tracker, nsqConfig,
config.in.raw, config.out.partitionKey)
} yield new NsqSource(igluResolver, enrichmentRegistry, tracker, config, nsqConfig)
}

/** Source to read raw events from NSQ. */
class NsqSource private (
goodSink: ThreadLocal[sinks.Sink],
badSink: ThreadLocal[sinks.Sink],
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker],
nsqConfig: Nsq,
topicName: String,
partitionKey: String
) extends Source(goodSink, badSink, igluResolver, enrichmentRegistry, tracker, partitionKey) {
config: StreamsConfig,
nsqConfig: Nsq
) extends Source(igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {

override val MaxRecordSize = None

override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink = new NsqSink(nsqConfig, config.out.enriched)
}
override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink = new NsqSink(nsqConfig, config.out.bad)
}

/** Consumer will be started to wait new message. */
override def run(): Unit = {

@@ -90,14 +87,14 @@ class NsqSource private (

val errorCallback = new NSQErrorCallback {
override def error(e: NSQException): Unit =
log.error(s"Exception while consuming topic $topicName", e)
log.error(s"Exception while consuming topic $${config.in.raw}", e)
}

// use NSQLookupd
val lookup = new DefaultNSQLookup
lookup.addLookupAddress(nsqConfig.lookupHost, nsqConfig.lookupPort)
val consumer = new NSQConsumer(
lookup, topicName, nsqConfig.rawChannel, nsqCallback, new NSQConfig(), errorCallback)
lookup, config.in.raw, nsqConfig.rawChannel, nsqCallback, new NSQConfig(), errorCallback)
consumer.start()
}
}
@@ -41,11 +41,11 @@ import utils._

/** GooglePubSubSink companion object with factory method */
object GooglePubSubSink {
def createAndInitialize(
def validateAndCreatePublisher(
googlePubSubConfig: GooglePubSub,
bufferConfig: BufferConfig,
topicName: String
): \/[Throwable, GooglePubSubSink] = for {
): \/[Throwable, Publisher] = for {
batching <- batchingSettings(bufferConfig).right
retry = retrySettings(googlePubSubConfig.backoffPolicy)
publisher <- toEither(
@@ -55,7 +50,7 @@
if (b) b.right
else new IllegalArgumentException(s"Google PubSub topic $topicName doesn't exist").left
}
} yield new GooglePubSubSink(publisher, topicName)
} yield publisher

/**
* Instantiates a Publisher on an existing topic with the given configuration options.
@@ -106,7 +106,7 @@ object GooglePubSubSink {
/**
* Google PubSub Sink for the Scala enrichment process
*/
class GooglePubSubSink private (publisher: Publisher, topicName: String) extends Sink {
class GooglePubSubSink(publisher: Publisher, topicName: String) extends Sink {

/**
* Convert event bytes to a PubsubMessage to be published
@@ -55,23 +55,17 @@ object GooglePubSubSource {
case c: GooglePubSub => c.success
case _ => new IllegalArgumentException("Configured source/sink is not Google PubSub").failure
}
goodSink <- GooglePubSubSink
.createAndInitialize(googlePubSubConfig, config.buffer, config.out.enriched)
goodPublisher <- GooglePubSubSink
.validateAndCreatePublisher(googlePubSubConfig, config.buffer, config.out.enriched)
.validation
threadLocalGoodSink = new ThreadLocal[Sink] {
override def initialValue = goodSink
}
badSink <- GooglePubSubSink
.createAndInitialize(googlePubSubConfig, config.buffer, config.out.bad)
badPublisher <- GooglePubSubSink
.validateAndCreatePublisher(googlePubSubConfig, config.buffer, config.out.bad)
.validation
threadLocalBadSink = new ThreadLocal[Sink] {
override def initialValue = badSink
}
topic = ProjectTopicName.of(googlePubSubConfig.googleProjectId, config.in.raw)
subName = ProjectSubscriptionName.of(googlePubSubConfig.googleProjectId, config.appName)
_ <- toEither(createSubscriptionIfNotExist(subName, topic)).validation
} yield new GooglePubSubSource(threadLocalGoodSink, threadLocalBadSink, igluResolver,
enrichmentRegistry, tracker, subName, googlePubSubConfig.threadPoolSize, config.out.partitionKey)
} yield new GooglePubSubSource(goodPublisher, badPublisher, igluResolver,
enrichmentRegistry, tracker, config, googlePubSubConfig, subName)

private def createSubscriptionIfNotExist(
sub: ProjectSubscriptionName,
@@ -93,19 +87,26 @@

/** Source to read events from a GCP Pub/Sub topic */
class GooglePubSubSource private (
goodSink: ThreadLocal[Sink],
badSink: ThreadLocal[Sink],
goodPublisher: Publisher,
badPublisher: Publisher,
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker],
subName: ProjectSubscriptionName,
threadPoolSize: Int,
partitionKey: String
) extends Source(goodSink, badSink, igluResolver, enrichmentRegistry, tracker, partitionKey) {
config: StreamsConfig,
pubsubConfig: GooglePubSub,
subName: ProjectSubscriptionName
) extends Source(igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {

override val MaxRecordSize = Some(10000000L)

private val subscriber = createSubscriber(subName, threadPoolSize)
private val subscriber = createSubscriber(subName, pubsubConfig.threadPoolSize)

override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink = new GooglePubSubSink(goodPublisher, config.out.enriched)
}
override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
override def initialValue: Sink = new GooglePubSubSink(badPublisher, config.out.bad)
}

/** Never-ending processing loop over source stream. */
override def run(): Unit = subscriber.startAsync().awaitRunning()
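
The Pub/Sub source follows the same split as Kinesis: the comparatively heavyweight Publisher is validated and created once in the companion and shared across threads, while the lightweight GooglePubSubSink wrapper around it is still instantiated per thread via initialValue. A small sketch of that shared-client / per-thread-wrapper shape, using hypothetical SharedClient and PerThreadSink names:

// expensive, shareable client built once (stand-in for Publisher / AmazonKinesis)
final class SharedClient {
  def publish(topic: String, payload: Array[Byte]): Unit = ()
}

// cheap wrapper that each thread owns exclusively (stand-in for GooglePubSubSink)
final class PerThreadSink(client: SharedClient, topic: String) {
  def store(payload: Array[Byte]): Unit = client.publish(topic, payload)
}

class ExamplePubSubSource(client: SharedClient, topic: String) {
  val threadLocalSink: ThreadLocal[PerThreadSink] = new ThreadLocal[PerThreadSink] {
    override def initialValue: PerThreadSink = new PerThreadSink(client, topic)
  }
}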
