Skip to content

Commit

Permalink
Scala Stream Collector: use sbt-tpolecat (closes #4190)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet authored and benjben committed Dec 6, 2019
1 parent 1d7718d commit 1ee0fd5
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 59 deletions.
4 changes: 0 additions & 4 deletions 2-collectors/scala-stream-collector/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ lazy val buildSettings = Seq(
version := "0.17.0",
description := "Scala Stream Collector for Snowplow raw events",
scalaVersion := "2.12.10",
scalacOptions := BuildSettings.compilerOptions,
scalacOptions in (Compile, console) ~= { _.filterNot(Set("-Ywarn-unused-import")) },
scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value,
javacOptions := BuildSettings.javaCompilerOptions,
resolvers ++= Dependencies.resolutionRepos
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ trait CollectorRoute {
} ~
path("""ice\.png""".r | "i".r) { path =>
(get | head) {
val (r,l) = collectorService.cookie(
val (r, _) = collectorService.cookie(
qs,
None,
"/" + path,
Expand Down Expand Up @@ -135,7 +135,7 @@ trait CollectorRoute {
*/
def cookieIfWanted(name: Option[String]): Directive1[Option[HttpCookiePair]] = name match {
case Some(n) => optionalCookie(n)
case None => optionalHeaderValue(x => None)
case None => optionalHeaderValue(_ => None)
}

/**
Expand All @@ -151,13 +151,13 @@ trait CollectorRoute {
}

private def crossDomainRoute: Route = get {
path("""crossdomain\.xml""".r) { path =>
path("""crossdomain\.xml""".r) { _ =>
complete(collectorService.flashCrossDomainPolicy)
}
}

private def healthRoute: Route = get {
path("health".r) { path =>
path("health".r) { _ =>
complete(HttpResponse(200, entity = "OK"))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ class CollectorService(
)

val (httpResponse, badRedirectResponses) = buildHttpResponse(
event, partitionKey, queryParams, headers.toList, redirect, pixelExpected, bounce,
config.streams.sink, config.redirectMacro)
event, queryParams, headers.toList, redirect, pixelExpected, bounce, config.redirectMacro)
(httpResponse, badRedirectResponses ++ sinkResponses)
}

Expand Down Expand Up @@ -236,17 +235,15 @@ class CollectorService(
/** Builds the final http response from */
def buildHttpResponse(
event: CollectorPayload,
partitionKey: String,
queryParams: Map[String, String],
headers: List[HttpHeader],
redirect: Boolean,
pixelExpected: Boolean,
bounce: Boolean,
sinkConfig: SinkConfig,
redirectMacroConfig: RedirectMacroConfig
): (HttpResponse, List[Array[Byte]]) =
if (redirect) {
val (r, l) = buildRedirectHttpResponse(event, partitionKey, queryParams, redirectMacroConfig)
val (r, l) = buildRedirectHttpResponse(event, queryParams, redirectMacroConfig)
(r.withHeaders(r.headers ++ headers), l)
} else {
(buildUsualHttpResponse(pixelExpected, bounce).withHeaders(headers), Nil)
Expand All @@ -266,7 +263,6 @@ class CollectorService(
/** Builds the appropriate http response when dealing with click redirects. */
def buildRedirectHttpResponse(
event: CollectorPayload,
partitionKey: String,
queryParams: Map[String, String],
redirectMacroConfig: RedirectMacroConfig
): (HttpResponse, List[Array[Byte]]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import akka.http.scaladsl.server.Route
import akka.util.ByteString

trait MetricsRoute {

def metricsService: MetricsService

def metricsRoute: Route =
Expand All @@ -30,12 +29,9 @@ trait MetricsRoute {
entity = HttpEntity.Strict(MetricsRoute.`text/plain(UTF-8) v0.0.4`, ByteString(metricsService.report()))
))
}

}

object MetricsRoute {

val `text/plain(UTF-8) v0.0.4`: ContentType.WithCharset =
MediaTypes.`text/plain` withParams Map("version" -> "0.0.4") withCharset HttpCharsets.`UTF-8`

}
Original file line number Diff line number Diff line change
Expand Up @@ -204,31 +204,30 @@ class CollectorServiceSpec extends Specification {
}

"buildHttpResponse" in {
val sinkConf = TestUtils.testConf.streams.sink
val redirConf = TestUtils.testConf.redirectMacro
"rely on buildRedirectHttpResponse if redirect is true" in {
val (res, Nil) = service.buildHttpResponse(
event, "k", Map("u" -> "12"), hs, true, true, false, sinkConf, redirConf)
event, Map("u" -> "12"), hs, true, true, false, redirConf)
res shouldEqual HttpResponse(302)
.withHeaders(`RawHeader`("Location", "12") :: hs)
}
"send back a gif if pixelExpected is true" in {
val (res, Nil) = service.buildHttpResponse(
event, "k", Map.empty, hs, false, true, false, sinkConf, redirConf)
event, Map.empty, hs, false, true, false, redirConf)
res shouldEqual HttpResponse(200)
.withHeaders(hs)
.withEntity(HttpEntity(contentType = ContentType(MediaTypes.`image/gif`),
bytes = CollectorService.pixel))
}
"send back a found if pixelExpected and bounce is true" in {
val (res, Nil) = service.buildHttpResponse(
event, "k", Map.empty, hs, false, true, true, sinkConf, redirConf)
event, Map.empty, hs, false, true, true, redirConf)
res shouldEqual HttpResponse(302)
.withHeaders(hs)
}
"send back ok otherwise" in {
val (res, Nil) = service.buildHttpResponse(
event, "k", Map.empty, hs, false, false, false, sinkConf, redirConf)
event, Map.empty, hs, false, false, false, redirConf)
res shouldEqual HttpResponse(200, entity = "ok")
.withHeaders(hs)
}
Expand All @@ -251,39 +250,38 @@ class CollectorServiceSpec extends Specification {
"buildRedirectHttpResponse" in {
val redirConf = TestUtils.testConf.redirectMacro
"give back a 302 if redirecting and there is a u query param" in {
val (res, Nil) = service.buildRedirectHttpResponse(event, "k", Map("u" -> "12"), redirConf)
val (res, Nil) = service.buildRedirectHttpResponse(event, Map("u" -> "12"), redirConf)
res shouldEqual HttpResponse(302).withHeaders(`RawHeader`("Location", "12"))
}
/* scalaz incompat
"give back a 400 if redirecting and there are no u query params" in {
val (res, _) = service.buildRedirectHttpResponse(event, "k", Map.empty, redirConf)
val (res, _) = service.buildRedirectHttpResponse(event, Map.empty, redirConf)
res shouldEqual HttpResponse(400)
}*/
}
"the redirect url should ignore a cookie replacement macro on redirect if not enabled" in {
event.networkUserId = "1234"
val (res, Nil) = service.buildRedirectHttpResponse(
event, "k", Map("u" -> s"http://localhost/?uid=$${SP_NUID}"), redirConf)
event, Map("u" -> s"http://localhost/?uid=$${SP_NUID}"), redirConf)
res shouldEqual HttpResponse(302)
.withHeaders(`RawHeader`("Location", s"http://localhost/?uid=$${SP_NUID}"))
}
"the redirect url should support a cookie replacement macro on redirect if enabled" in {
event.networkUserId = "1234"
val (res, Nil) = service.buildRedirectHttpResponse(event, "k",
val (res, Nil) = service.buildRedirectHttpResponse(event,
Map("u" -> s"http://localhost/?uid=$${SP_NUID}"), redirConf.copy(enabled = true))
res shouldEqual HttpResponse(302)
.withHeaders(`RawHeader`("Location", "http://localhost/?uid=1234"))
}
"the redirect url should allow for custom token placeholders" in {
event.networkUserId = "1234"
val (res, Nil) = service.buildRedirectHttpResponse(
event, "k", Map("u" -> "http://localhost/?uid=[TOKEN]"),
event, Map("u" -> "http://localhost/?uid=[TOKEN]"),
redirConf.copy(enabled = true, Some("[TOKEN]")))
res shouldEqual HttpResponse(302)
.withHeaders(`RawHeader`("Location", "http://localhost/?uid=1234"))
}
"the redirect url should allow for double encoding for return redirects" in {
val (res, Nil) =
service.buildRedirectHttpResponse(event, "k", Map("u" -> "a%3Db"), redirConf)
service.buildRedirectHttpResponse(event, Map("u" -> "a%3Db"), redirConf)
res shouldEqual HttpResponse(302).withHeaders(`RawHeader`("Location", "a%3Db"))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object KinesisSink {
val client = for {
provider <- getProvider(kinesisConfig.aws)
client = createKinesisClient(provider, kinesisConfig.endpoint, kinesisConfig.region)
exists <-
_ <-
if (streamExists(client, streamName)) true.asRight
else new IllegalArgumentException(s"Kinesis stream $streamName doesn't exist").asLeft
} yield client
Expand Down Expand Up @@ -124,7 +124,7 @@ object KinesisSink {
val status = describeStreamResult.getStreamDescription.getStreamStatus
status == "ACTIVE" || status == "UPDATING"
} catch {
case rnfe: ResourceNotFoundException => false
case _: ResourceNotFoundException => false
}
}

Expand Down Expand Up @@ -157,9 +157,7 @@ class KinesisSink private (
/**
* Recursively schedule a task to send everthing in EventStorage
* Even if the incoming event flow dries up, all stored events will eventually get sent
*
* Whenever TimeThreshold milliseconds have passed since the last call to flush, call flush.
*
* @param interval When to schedule the next flush
*/
def scheduleFlush(interval: Long = TimeThreshold): Unit = {
Expand All @@ -175,6 +173,7 @@ class KinesisSink private (
}
}
}, interval, MILLISECONDS)
()
}

object EventStorage {
Expand Down Expand Up @@ -224,6 +223,7 @@ class KinesisSink private (
sendBatch(batch, nextBackoff)
}
}, lastBackoff, MILLISECONDS)
()
}

// TODO: limit max retries?
Expand Down Expand Up @@ -274,7 +274,6 @@ class KinesisSink private (

/**
* How long to wait before sending the next request
*
* @param lastBackoff The previous backoff time
* @return Minimum of maxBackoff and a random number between minBackoff and three times lastBackoff
*/
Expand All @@ -283,5 +282,6 @@ class KinesisSink private (
def shutdown(): Unit = {
executorService.shutdown()
executorService.awaitTermination(10000, MILLISECONDS)
()
}
}
22 changes: 0 additions & 22 deletions 2-collectors/scala-stream-collector/project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,6 @@ import sbt._
import Keys._

object BuildSettings {

lazy val compilerOptions = Seq(
"-deprecation",
"-encoding", "UTF-8",
"-feature",
"-language:existentials",
"-language:higherKinds",
"-language:implicitConversions",
"-unchecked",
"-Yno-adapted-args",
"-Ywarn-dead-code",
"-Ywarn-numeric-widen",
"-Ywarn-unused-import",
"-Xfuture",
"-Xlint"
)

lazy val javaCompilerOptions = Seq(
"-source", "1.8",
"-target", "1.8"
)

// sbt-assembly settings for building an executable
import sbtassembly.AssemblyPlugin.autoImport._
lazy val sbtAssemblySettings = Seq(
Expand Down
1 change: 1 addition & 0 deletions 2-collectors/scala-stream-collector/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.7.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.25")
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.6")
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class GooglePubSubSink private (publisher: Publisher, topicName: String) extends
.setData(ByteString.copyFrom(event))
.build()

private val logExecutor = Executors.newSingleThreadExecutor()

/**
* Store raw events in the PubSub topic
* @param events The list of events to send
Expand All @@ -134,7 +136,7 @@ class GooglePubSubSink private (publisher: Publisher, topicName: String) extends
apiEx.getMessage)
case t => log.error(s"Publishing message to $topicName failed with ${t.getMessage}")
}
}, Executors.newSingleThreadExecutor())
}, logExecutor)
}
}
Nil
Expand Down

0 comments on commit 1ee0fd5

Please sign in to comment.