Merge branch release/r105-pompeii
BenFradet committed May 7, 2018
2 parents d0f7117 + bc53b39 commit 9b67ff4
Showing 15 changed files with 122 additions and 108 deletions.
@@ -65,7 +65,7 @@ class WeatherEnrichmentSpec extends Specification {
object validEvent {
var lat: JFloat = 20.713052f
var lon: JFloat = 70.98224f
- var time: DateTime = new DateTime("2017-05-01T23:56:01.003+00:00")
+ var time: DateTime = new DateTime("2018-04-30T23:56:01.003+00:00")
}

def e1 = {
@@ -94,7 +94,7 @@ class WeatherEnrichmentSpec extends Specification {
stamp.toEither must beRight.like {
case weather: JValue => {
val temp = weather.findField { case JField("humidity", _) => true; case _ => false }
- temp must beSome(("humidity", JDouble(97.0)))
+ temp must beSome(("humidity", JDouble(92.0)))
}
}
}
@@ -133,7 +133,7 @@ class WeatherEnrichmentSpec extends Specification {
case weather: JValue => {
val e = (weather \ "data").extractOpt[TransformedWeather]
e.map(_.dt) must beSome.like { // successful transformation
- case dt => dt must equalTo("2017-05-02T00:00:00.000Z") // closest stamp storing on server
+ case dt => dt must equalTo("2018-05-01T00:00:00.000Z") // closest stamp storing on server
}
}
}
5 changes: 3 additions & 2 deletions 3-enrich/stream-enrich/build.sbt
@@ -35,7 +35,7 @@ lazy val commonDependencies = Seq(
lazy val buildSettings = Seq(
organization := "com.snowplowanalytics",
name := "snowplow-stream-enrich",
version := "0.16.0",
version := "0.16.1",
description := "The streaming Snowplow Enrichment process",
scalaVersion := "2.11.11",
scalacOptions := BuildSettings.compilerOptions,
@@ -72,7 +72,8 @@ lazy val kinesis = project
Dependencies.Libraries.kinesisSdk,
Dependencies.Libraries.s3Sdk,
Dependencies.Libraries.dynamodbSdk,
- Dependencies.Libraries.jacksonCbor
+ Dependencies.Libraries.jacksonCbor,
+ Dependencies.Libraries.jacksonDatabind
))
.dependsOn(core)

@@ -85,8 +85,6 @@ object Source {

/** Abstract base for the different sources we support. */
abstract class Source(
- goodSink: ThreadLocal[Sink],
- badSink: ThreadLocal[Sink],
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker],
@@ -102,6 +100,9 @@ abstract class Source(

implicit val resolver: Resolver = igluResolver

+ val threadLocalGoodSink: ThreadLocal[Sink]
+ val threadLocalBadSink: ThreadLocal[Sink]

// Iterate through an enriched EnrichedEvent object and tab-separate
// the fields into a string.
def tabSeparateEnrichedEvent(output: EnrichedEvent): String = {
@@ -176,12 +177,12 @@ abstract class Source(
m <- MaxRecordSize
} yield Source.oversizedSuccessToFailure(value, m) -> key

- val successesTriggeredFlush = goodSink.get.storeEnrichedEvents(smallEnoughSuccesses)
- val failuresTriggeredFlush = badSink.get.storeEnrichedEvents(failures ++ sizeBasedFailures)
+ val successesTriggeredFlush = threadLocalGoodSink.get.storeEnrichedEvents(smallEnoughSuccesses)
+ val failuresTriggeredFlush = threadLocalBadSink.get.storeEnrichedEvents(failures ++ sizeBasedFailures)
if (successesTriggeredFlush == true || failuresTriggeredFlush == true) {
// Block until the records have been sent to Kinesis
- goodSink.get.flush
- badSink.get.flush
+ threadLocalGoodSink.get.flush
+ threadLocalBadSink.get.flush
true
} else {
false
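The change above is the heart of this refactor: the good and bad sinks move from constructor parameters to abstract ThreadLocal members, so each concrete source decides how to build its sinks and each worker thread lazily creates its own instance on first get(). A minimal, self-contained sketch of the pattern (the Sink trait and ConsoleSource below are hypothetical stand-ins, not the Snowplow classes):

    trait Sink {
      def store(record: String): Unit
    }

    abstract class Source {
      // Subclasses supply the sink factory; ThreadLocal.initialValue runs at
      // most once per thread, on that thread's first call to get().
      val threadLocalGoodSink: ThreadLocal[Sink]

      def process(record: String): Unit =
        threadLocalGoodSink.get.store(record)
    }

    class ConsoleSource extends Source {
      override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
        override def initialValue: Sink = new Sink {
          def store(record: String): Unit =
            println(s"[${Thread.currentThread.getName}] $record")
        }
      }
    }

This gives every consumer thread a private sink without locking, at the cost of one sink (and its buffer) per thread.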
@@ -26,6 +26,7 @@ import iglu.client.Resolver
import common.enrichments.EnrichmentRegistry
import model.EnrichConfig
import scalatracker.Tracker
+ import sinks.Sink

/**
* Source to allow the testing framework to enrich events
@@ -37,10 +38,17 @@ class TestSource(
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker]
- ) extends Source(null, null, igluResolver, enrichmentRegistry, tracker, "") {
+ ) extends Source(igluResolver, enrichmentRegistry, tracker, "") {

override val MaxRecordSize = None

+ override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
+ override def initialValue: Sink = null
+ }
+ override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
+ override def initialValue: Sink = null
+ }

override def run(): Unit =
throw new RuntimeException("run() should not be called on TestSource")
}
@@ -48,33 +48,31 @@ object KafkaSource {
case c: Kafka => c.success
case _ => "Configured source/sink is not Kafka".failure
}
- goodSink = new ThreadLocal[Sink] {
- override def initialValue =
- new KafkaSink(kafkaConfig, config.buffer, config.out.enriched, tracker)
- }
- badSink = new ThreadLocal[Sink] {
- override def initialValue = new KafkaSink(kafkaConfig, config.buffer, config.out.bad, tracker)
- }
- } yield new KafkaSource(goodSink, badSink, igluResolver, enrichmentRegistry, tracker, config,
- kafkaConfig.brokers)
+ } yield new KafkaSource(igluResolver, enrichmentRegistry, tracker, config, kafkaConfig)
}
/** Source to read events from a Kafka topic */
class KafkaSource private (
- goodSink: ThreadLocal[Sink],
- badSink: ThreadLocal[Sink],
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker],
config: StreamsConfig,
- brokers: String
- ) extends Source(
- goodSink, badSink, igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {
+ kafkaConfig: Kafka
+ ) extends Source(igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {

override val MaxRecordSize = None

+ override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
+ override def initialValue: Sink =
+ new KafkaSink(kafkaConfig, config.buffer, config.out.enriched, tracker)
+ }
+ override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
+ override def initialValue: Sink =
+ new KafkaSink(kafkaConfig, config.buffer, config.out.bad, tracker)
+ }

/** Never-ending processing loop over source stream. */
override def run(): Unit = {
- val consumer = createConsumer(brokers, config.appName)
+ val consumer = createConsumer(kafkaConfig.brokers, config.appName)

log.info(s"Running Kafka consumer group: ${config.appName}.")
log.info(s"Processing raw input Kafka topic: ${config.in.raw}")
@@ -41,12 +41,7 @@ import scalatracker.Tracker

/** KinesisSink companion object with factory method */
object KinesisSink {
- def createAndInitialize(
- kinesisConfig: Kinesis,
- bufferConfig: BufferConfig,
- streamName: String,
- tracker: Option[Tracker]
- ): \/[String, KinesisSink] = for {
+ def validate(kinesisConfig: Kinesis, streamName: String): \/[String, Unit] = for {
provider <- KinesisEnrich.getProvider(kinesisConfig.aws)
endpointConfiguration =
new EndpointConfiguration(kinesisConfig.streamEndpoint, kinesisConfig.region)
@@ -56,11 +51,8 @@ object KinesisSink {
.withEndpointConfiguration(endpointConfiguration)
.build()
_ <- streamExists(client, streamName).leftMap(_.getMessage)
- .flatMap { b =>
- if (b) b.right
- else s"Kinesis stream $streamName doesn't exist".left
- }
- } yield new KinesisSink(client, kinesisConfig.backoffPolicy, bufferConfig, streamName, tracker)
+ .ensure(s"Kinesis stream $streamName doesn't exist")(_ == true)
+ } yield ()

/**
* Check whether a Kinesis stream exists
@@ -78,7 +70,7 @@ object KinesisSink {
}

/** Kinesis Sink for Scala enrichment */
- class KinesisSink private (
+ class KinesisSink(
client: AmazonKinesis,
backoffPolicy: KinesisBackoffPolicyConfig,
buffer: BufferConfig,
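The validate factory above also collapses a three-line flatMap into scalaz's \/#ensure, which keeps the right value only while the predicate holds and otherwise substitutes the supplied left. A small sketch of the equivalence, assuming scalaz 7 syntax:

    import scalaz._, Scalaz._

    val exists: String \/ Boolean = true.right[String]

    // ensure is equivalent to the flatMap it replaces: a failed predicate
    // turns the right value into the given left error message.
    val checked: String \/ Boolean =
      exists.ensure("Kinesis stream raw doesn't exist")(_ == true)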
@@ -30,6 +30,8 @@ import scala.collection.JavaConversions._
import scala.util.control.NonFatal

import com.amazonaws.auth.AWSCredentialsProvider
+ import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
+ import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.clientlibrary.interfaces._
import com.amazonaws.services.kinesis.clientlibrary.exceptions._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker._
@@ -56,37 +58,45 @@ object KinesisSource {
case c: Kinesis => c.success
case _ => "Configured source/sink is not Kinesis".failure
}
- goodSink <-
- KinesisSink.createAndInitialize(kinesisConfig, config.buffer, config.out.enriched, tracker)
- .validation
- threadLocalGoodSink = new ThreadLocal[Sink] {
- override def initialValue = goodSink
- }
- badSink <-
- KinesisSink.createAndInitialize(kinesisConfig, config.buffer, config.out.bad, tracker)
- .validation
- threadLocalBadSink = new ThreadLocal[Sink] {
- override def initialValue = badSink
- }
+ _ <- (KinesisSink.validate(kinesisConfig, config.out.enriched).validation.leftMap(_.wrapNel) |@|
+ KinesisSink.validate(kinesisConfig, config.out.bad).validation.leftMap(_.wrapNel)) {
+ (_, _) => ()
+ }.leftMap(_.toList.mkString("\n"))
provider <- KinesisEnrich.getProvider(kinesisConfig.aws).validation
- } yield new KinesisSource(threadLocalGoodSink, threadLocalBadSink,
- igluResolver, enrichmentRegistry, tracker, config, kinesisConfig, provider)
+ } yield new KinesisSource(igluResolver, enrichmentRegistry, tracker, config, kinesisConfig, provider)
}

/** Source to read events from a Kinesis stream */
class KinesisSource private (
- goodSink: ThreadLocal[Sink],
- badSink: ThreadLocal[Sink],
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker],
config: StreamsConfig,
kinesisConfig: Kinesis,
provider: AWSCredentialsProvider
- ) extends Source(goodSink, badSink, igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {
+ ) extends Source(igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {

override val MaxRecordSize = Some(1000000L)

+ private val client = {
+ val endpointConfiguration =
+ new EndpointConfiguration(kinesisConfig.streamEndpoint, kinesisConfig.region)
+ AmazonKinesisClientBuilder
+ .standard()
+ .withCredentials(provider)
+ .withEndpointConfiguration(endpointConfiguration)
+ .build()
+ }
+
+ override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
+ override def initialValue: Sink =
+ new KinesisSink(client, kinesisConfig.backoffPolicy, config.buffer, config.out.enriched, tracker)
+ }
+ override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
+ override def initialValue: Sink =
+ new KinesisSink(client, kinesisConfig.backoffPolicy, config.buffer, config.out.bad, tracker)
+ }

/** Never-ending processing loop over source stream. */
override def run(): Unit = {
val workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID()
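In the factory above, the two stream checks are combined with scalaz's applicative builder |@| instead of being sequenced in the for-comprehension, so a missing enriched stream and a missing bad stream are both reported rather than only the first. A self-contained sketch of the technique (streamCheck is a hypothetical stand-in for KinesisSink.validate):

    import scalaz._, Scalaz._

    def streamCheck(name: String): ValidationNel[String, Unit] =
      if (name.nonEmpty) ().successNel
      else s"stream '$name' doesn't exist".failureNel

    // |@| accumulates failures in a NonEmptyList instead of short-circuiting;
    // leftMap then collapses them into one newline-separated message.
    val combined: Validation[String, Unit] =
      (streamCheck("enriched") |@| streamCheck("bad")) { (_, _) => () }
        .leftMap(_.toList.mkString("\n"))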
@@ -51,30 +51,27 @@ object NsqSource {
case c: Nsq => c.success
case _ => new IllegalArgumentException("Configured source/sink is not Nsq").failure
}
- goodSink = new ThreadLocal[Sink] {
- override def initialValue = new NsqSink(nsqConfig, config.out.enriched)
- }
- badSink = new ThreadLocal[Sink] {
- override def initialValue = new NsqSink(nsqConfig, config.out.bad)
- }
- } yield new NsqSource(goodSink, badSink, igluResolver, enrichmentRegistry, tracker, nsqConfig,
- config.in.raw, config.out.partitionKey)
+ } yield new NsqSource(igluResolver, enrichmentRegistry, tracker, config, nsqConfig)
}

/** Source to read raw events from NSQ. */
class NsqSource private (
- goodSink: ThreadLocal[sinks.Sink],
- badSink: ThreadLocal[sinks.Sink],
igluResolver: Resolver,
enrichmentRegistry: EnrichmentRegistry,
tracker: Option[Tracker],
- nsqConfig: Nsq,
- topicName: String,
- partitionKey: String
- ) extends Source(goodSink, badSink, igluResolver, enrichmentRegistry, tracker, partitionKey) {
+ config: StreamsConfig,
+ nsqConfig: Nsq
+ ) extends Source(igluResolver, enrichmentRegistry, tracker, config.out.partitionKey) {

override val MaxRecordSize = None

+ override val threadLocalGoodSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
+ override def initialValue: Sink = new NsqSink(nsqConfig, config.out.enriched)
+ }
+ override val threadLocalBadSink: ThreadLocal[Sink] = new ThreadLocal[Sink] {
+ override def initialValue: Sink = new NsqSink(nsqConfig, config.out.bad)
+ }

/** Starts a consumer that waits for new messages. */
override def run(): Unit = {

@@ -90,14 +87,14 @@ class NsqSource private (

val errorCallback = new NSQErrorCallback {
override def error(e: NSQException): Unit =
log.error(s"Exception while consuming topic $topicName", e)
log.error(s"Exception while consuming topic ${config.in.raw}", e)
}

// use NSQLookupd
val lookup = new DefaultNSQLookup
lookup.addLookupAddress(nsqConfig.lookupHost, nsqConfig.lookupPort)
val consumer = new NSQConsumer(
- lookup, topicName, nsqConfig.rawChannel, nsqCallback, new NSQConfig(), errorCallback)
+ lookup, config.in.raw, nsqConfig.rawChannel, nsqCallback, new NSQConfig(), errorCallback)
consumer.start()
}
}
1 change: 1 addition & 0 deletions 3-enrich/stream-enrich/project/Dependencies.scala
@@ -61,6 +61,7 @@ object Dependencies {
val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafkaClients
val nsqClient = "com.snowplowanalytics" % "nsq-java-client" % V.nsqClient
val jacksonCbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.jackson
+ val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson
val commonsCodec = "commons-codec" % "commons-codec" % V.commonsCodec
val config = "com.typesafe" % "config" % V.config
val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j
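Declaring jackson-databind explicitly at V.jackson (the same version property as jackson-dataformat-cbor) is the usual way to make sbt evict an older copy pulled in transitively by the AWS SDKs, keeping all Jackson artifacts on one version. A hedged sbt sketch of the effect; the version number is illustrative, not the actual value of V.jackson:

    // An explicit top-level dependency wins over the older jackson-databind
    // that arrives transitively, keeping the Jackson modules in lockstep.
    val jacksonVersion = "2.9.5" // illustrative stand-in for V.jackson

    libraryDependencies ++= Seq(
      "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % jacksonVersion,
      "com.fasterxml.jackson.core"       % "jackson-databind"        % jacksonVersion
    )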
@@ -41,11 +41,11 @@ import utils._

/** GooglePubSubSink companion object with factory method */
object GooglePubSubSink {
- def createAndInitialize(
+ def validateAndCreatePublisher(
googlePubSubConfig: GooglePubSub,
bufferConfig: BufferConfig,
topicName: String
- ): \/[Throwable, GooglePubSubSink] = for {
+ ): \/[Throwable, Publisher] = for {
batching <- batchingSettings(bufferConfig).right
retry = retrySettings(googlePubSubConfig.backoffPolicy)
publisher <- toEither(
@@ -55,7 +55,7 @@ object GooglePubSubSink {
if (b) b.right
else new IllegalArgumentException(s"Google PubSub topic $topicName doesn't exist").left
}
- } yield new GooglePubSubSink(publisher, topicName)
+ } yield publisher

/**
* Instantiates a Publisher on an existing topic with the given configuration options.
@@ -106,7 +106,7 @@ object GooglePubSubSink {
/**
* Google PubSub Sink for the Scala enrichment process
*/
- class GooglePubSubSink private (publisher: Publisher, topicName: String) extends Sink {
+ class GooglePubSubSink(publisher: Publisher, topicName: String) extends Sink {

/**
* Convert event bytes to a PubsubMessage to be published
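With the rename, the factory now returns the bare Publisher and the sink constructor is public, so topic validation and sink construction compose at the call site. A self-contained sketch of the split, with simplified stand-in types (not the real Snowplow or Pub/Sub classes):

    import scalaz._, Scalaz._

    final case class Publisher(topic: String)
    final class PubSubSink(publisher: Publisher, topic: String)

    // The factory only validates and yields the Publisher; wrapping it in a
    // sink is now the caller's job, mirroring validateAndCreatePublisher.
    def validateAndCreatePublisher(topic: String): Throwable \/ Publisher =
      if (topic.nonEmpty) Publisher(topic).right[Throwable]
      else new IllegalArgumentException(s"topic '$topic' doesn't exist").left[Publisher]

    val sink: Throwable \/ PubSubSink =
      validateAndCreatePublisher("enriched").map(new PubSubSink(_, "enriched"))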
