diff --git a/2-collectors/scala-stream-collector/examples/config.hocon.sample b/2-collectors/scala-stream-collector/examples/config.hocon.sample index e320bd36f5..e238f83efe 100644 --- a/2-collectors/scala-stream-collector/examples/config.hocon.sample +++ b/2-collectors/scala-stream-collector/examples/config.hocon.sample @@ -28,6 +28,19 @@ collector { # records stored in the current stream. production = true + # Defines whether the "No-third party cookie check" should be performed. + # If this parameter is not specified, it defaults to false. + # This will create an auto-redirect to itself with `third-party-cookie-param` + # if set to true. + third-party-redirect-enabled = false + + # The name of the request parameter which will be used on redirect if 'third-party-redirect-enabled' + # is set to true. + third-party-cookie-param = "n3pc" + + # The network id which will be set in case the 3rd party cookie was not set. + fallback-network-id = "00000000-0000-4000-A000-000000000000" + # Configure the P3P policy header. 
p3p { policyref = "/w3c/p3p.xml" diff --git a/2-collectors/scala-stream-collector/project/Dependencies.scala b/2-collectors/scala-stream-collector/project/Dependencies.scala index 494a6c3c3d..5241fb5674 100644 --- a/2-collectors/scala-stream-collector/project/Dependencies.scala +++ b/2-collectors/scala-stream-collector/project/Dependencies.scala @@ -50,6 +50,7 @@ object Dependencies { // Using the newest version of spec (2.3.6) causes // conflicts with `spray` for `com.chuusai.shapeless` val specs2 = "2.2.3" + val mockito = "1.9.5" } object Libraries { @@ -77,6 +78,7 @@ object Dependencies { val json4sJackson = "org.json4s" %% "json4s-jackson" % V.json4s // Scala (test only) + val mockito = "org.mockito" % "mockito-all" % V.mockito % "test" val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test" val sprayTestkit = "io.spray" %% "spray-testkit" % V.spray % "test" } diff --git a/2-collectors/scala-stream-collector/project/ScalaCollectorBuild.scala b/2-collectors/scala-stream-collector/project/ScalaCollectorBuild.scala index 0f42ddd328..b72562aa5a 100644 --- a/2-collectors/scala-stream-collector/project/ScalaCollectorBuild.scala +++ b/2-collectors/scala-stream-collector/project/ScalaCollectorBuild.scala @@ -50,7 +50,8 @@ object ScalaCollectorBuild extends Build { Libraries.scalaz7, Libraries.snowplowRawEvent, Libraries.collectorPayload, - Libraries.json4sJackson + Libraries.json4sJackson, + Libraries.mockito ) ) } diff --git a/2-collectors/scala-stream-collector/src/main/scala/com.snowplowanalytics.snowplow.collectors/scalastream/ResponseHandler.scala b/2-collectors/scala-stream-collector/src/main/scala/com.snowplowanalytics.snowplow.collectors/scalastream/ResponseHandler.scala index 49cda070c5..0a5308dd9f 100644 --- a/2-collectors/scala-stream-collector/src/main/scala/com.snowplowanalytics.snowplow.collectors/scalastream/ResponseHandler.scala +++ 
b/2-collectors/scala-stream-collector/src/main/scala/com.snowplowanalytics.snowplow.collectors/scalastream/ResponseHandler.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.util.UUID import java.net.URI import org.apache.http.client.utils.URLEncodedUtils +import spray.http.{HttpHeader, HttpResponse, StatusCodes} // Apache Commons import org.apache.commons.codec.binary.Base64 @@ -96,122 +97,141 @@ class ResponseHandler(config: CollectorConfig, sinks: CollectorSinks)(implicit c // When `/i` is requested, this is called and stores an event in the // Kinisis sink and returns an invisible pixel with a cookie. def cookie(queryParams: String, body: String, requestCookie: Option[HttpCookie], - userAgent: Option[String], hostname: String, ip: RemoteAddress, - request: HttpRequest, refererUri: Option[String], path: String, pixelExpected: Boolean): - (HttpResponse, List[Array[Byte]]) = { - - // Make a Tuple2 with the ip address and the shard partition key - val (ipAddress, partitionKey) = ip.toOption.map(_.getHostAddress) match { - case None => ("unknown", UUID.randomUUID.toString) - case Some(ip) => (ip, if (config.useIpAddressAsPartitionKey) ip else UUID.randomUUID.toString) - } - - // Use the same UUID if the request cookie contains `sp`. 
- val networkUserId: String = requestCookie match { - case Some(rc) => rc.content - case None => UUID.randomUUID.toString - } + userAgent: Option[String], hostname: String, ip: RemoteAddress, + request: HttpRequest, refererUri: Option[String], path: String, pixelExpected: Boolean): + (HttpResponse, List[Array[Byte]]) = { + val thirdPartyCookieParamPresent = Option(queryParams).exists(_.contains(config.thirdPartyCookiesParameter)) + val shouldRedirect = config.n3pcRedirectEnabled && requestCookie.isEmpty && !thirdPartyCookieParamPresent && pixelExpected + + // Make a Tuple2 with the ip address and the shard partition key + val (ipAddress, partitionKey) = ip.toOption.map(_.getHostAddress) match { + case None => ("unknown", UUID.randomUUID.toString) + case Some(someIp) => (someIp, if (config.useIpAddressAsPartitionKey) someIp else UUID.randomUUID.toString) + } - // Construct an event object from the request. - val timestamp: Long = System.currentTimeMillis - - val event = new CollectorPayload( - "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0", - ipAddress, - timestamp, - "UTF-8", - Collector - ) - - event.path = path - event.querystring = queryParams - event.body = body - event.hostname = hostname - event.networkUserId = networkUserId - - userAgent.foreach(event.userAgent = _) - refererUri.foreach(event.refererUri = _) - event.headers = request.headers.flatMap { - case _: `Remote-Address` | _: `Raw-Request-URI` => None - case other => Some(other.toString) - } + // Use the same UUID if the request cookie contains `sp`. + val networkUserId: String = requestCookie match { + case Some(rc) => rc.content + case None => + if (thirdPartyCookieParamPresent) config.fallbackNetworkUserId + else UUID.randomUUID.toString + } - // Set the content type - request.headers.find(_ match {case `Content-Type`(ct) => true; case _ => false}) foreach { + // Construct an event object from the request. 
+ val timestamp: Long = System.currentTimeMillis + + val event = new CollectorPayload( + "iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0", + ipAddress, + timestamp, + "UTF-8", + Collector + ) + + event.path = path + event.querystring = queryParams + event.body = body + event.hostname = hostname + event.networkUserId = networkUserId + + userAgent.foreach(event.userAgent = _) + refererUri.foreach(event.refererUri = _) + event.headers = request.headers.flatMap { + case _: `Remote-Address` | _: `Raw-Request-URI` => None + case other => Some(other.toString) + } - // toLowerCase called because Spray seems to convert "utf" to "UTF" - ct => event.contentType = ct.value.toLowerCase - } + // Set the content type + request.headers.find { case `Content-Type`(ct) => true; case _ => false } foreach { - // Only send to Kinesis if we aren't shutting down - val sinkResponse = if (!KinesisSink.shuttingDown) { + // toLowerCase called because Spray seems to convert "utf" to "UTF" + ct => event.contentType = ct.value.toLowerCase + } - // Split events into Good and Bad - val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) + // Only send to Kinesis if we aren't shutting down and we aren't about to redirect + val sinkResponse = if (!shouldRedirect && !KinesisSink.shuttingDown) { + // Split events into Good and Bad + val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) - // Send events to respective sinks - val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) - val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) + // Send events to respective sinks + val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) + val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) - // Sink Responses for Test Sink - sinkResponseGood ++ sinkResponseBad - } else { - null - } - - val policyRef = config.p3pPolicyRef - val CP = config.p3pCP - - 
val headersWithoutCookie = List( - RawHeader("P3P", "policyref=\"%s\", CP=\"%s\"".format(policyRef, CP)), - getAccessControlAllowOriginHeader(request), - `Access-Control-Allow-Credentials`(true) - ) - - val headers = config.cookieConfig match { - case Some(cookieConfig) => - val responseCookie = HttpCookie( - cookieConfig.name, networkUserId, - expires=Some(DateTime.now + cookieConfig.expiration), - domain=cookieConfig.domain - ) - `Set-Cookie`(responseCookie) :: headersWithoutCookie - case None => headersWithoutCookie - } + // Sink Responses for Test Sink + sinkResponseGood ++ sinkResponseBad + } else { + Nil + } - val (httpResponse, badQsResponse) = if (path startsWith "/r/") { - // A click redirect - try { - // TODO: log errors to Kinesis as BadRows - val target = URLEncodedUtils.parse(URI.create("?" + queryParams), "UTF-8") - .find(_.getName == "u") - .map(_.getValue) - target match { - case Some(t) => HttpResponse(302).withHeaders(`Location`(t) :: headers) -> Nil - // case None => badRequest -> sinks.bad.storeRawEvents(List("TODO".getBytes), partitionKey) - case None => { - val everythingSerialized = new String(SplitBatch.ThriftSerializer.get().serialize(event)) - badRequest -> sinks.bad.storeRawEvents(List(createBadRow(event, s"Redirect failed due to lack of u parameter")), partitionKey) - } - } - } catch { - case NonFatal(e) => { + val headers = composeHeaders(request, shouldRedirect, networkUserId, queryParams) + + val (httpResponse, badQsResponse) = if (path startsWith "/r/") { + // A click redirect + try { + // TODO: log errors to Kinesis as BadRows + val target = URLEncodedUtils.parse(URI.create("?" 
+ queryParams), "UTF-8") + .find(_.getName == "u") + .map(_.getValue) + target match { + case Some(t) => HttpResponse(302).withHeaders(`Location`(t) :: headers) -> Nil + // case None => badRequest -> sinks.bad.storeRawEvents(List("TODO".getBytes), partitionKey) + case None => { val everythingSerialized = new String(SplitBatch.ThriftSerializer.get().serialize(event)) - badRequest -> sinks.bad.storeRawEvents(List(createBadRow(event, s"Redirect failed due to error $e")), partitionKey) + badRequest -> sinks.bad.storeRawEvents(List(createBadRow(event, s"Redirect failed due to lack of u parameter")), partitionKey) } } - } else if (KinesisSink.shuttingDown) { - // So that the tracker knows the request failed and can try to resend later - notFound -> Nil - } else (if (pixelExpected) { - HttpResponse(entity = HttpEntity(`image/gif`, ResponseHandler.pixel)) + } catch { + case NonFatal(e) => { + val everythingSerialized = new String(SplitBatch.ThriftSerializer.get().serialize(event)) + badRequest -> sinks.bad.storeRawEvents(List(createBadRow(event, s"Redirect failed due to error $e")), partitionKey) + } + } + } else if (KinesisSink.shuttingDown) { + // So that the tracker knows the request failed and can try to resend later + notFound -> Nil + } else { + if (pixelExpected) { + if (shouldRedirect) HttpResponse(StatusCodes.Found) else HttpResponse(entity = HttpEntity(`image/gif`, ResponseHandler.pixel)) } else { // See https://github.com/snowplow/snowplow-javascript-tracker/issues/482 HttpResponse(entity = "ok") - }).withHeaders(headers) -> Nil + } + }.withHeaders(headers) -> Nil - (httpResponse, badQsResponse ++ sinkResponse) + (httpResponse, badQsResponse ++ sinkResponse) + } + + private def composeHeaders(request: HttpRequest, shouldRedirect: Boolean, networkUserId: String, queryParams: String) = { + val headersWithoutCookie = List( + RawHeader("P3P", "policyref=\"%s\", CP=\"%s\"".format(config.p3pPolicyRef, config.p3pCP)), + getAccessControlAllowOriginHeader(request), + 
`Access-Control-Allow-Credentials`(true) + ) + + val resolvedCookies = config.cookieConfig match { + case Some(cookieConfig) => + val responseCookie = HttpCookie( + cookieConfig.name, networkUserId, + expires = Some(DateTime.now + cookieConfig.expiration), + domain = cookieConfig.domain + ) + `Set-Cookie`(responseCookie) :: headersWithoutCookie + case None => headersWithoutCookie } + lazy val protoValue = request.headers.find(_.name == "X-Forwarded-Proto").map(_.value).getOrElse("http") + + val headers = if (shouldRedirect) { + val originalUri = URLEncodedUtils.parse(URI.create("?" + queryParams), "UTF-8") + .find(_.getName == "url") + .map(_.getValue) + val originalUriScheme = originalUri.map(URI.create(_).getScheme).getOrElse(protoValue) + val uri = request.uri.withQuery(queryParams) + val allParams = uri.query.toMap ++ Map(config.thirdPartyCookiesParameter -> "true") + val redirectUri = request.uri.withQuery(allParams).withScheme(originalUriScheme) + `Location`(redirectUri) :: resolvedCookies + } else resolvedCookies + headers + } /** * Creates a response to the CORS preflight Options request @@ -222,7 +242,7 @@ class ResponseHandler(config: CollectorConfig, sinks: CollectorSinks)(implicit c def preflightResponse(request: HttpRequest) = HttpResponse().withHeaders(List( getAccessControlAllowOriginHeader(request), `Access-Control-Allow-Credentials`(true), - `Access-Control-Allow-Headers`( "Content-Type"))) + `Access-Control-Allow-Headers`("Content-Type"))) def flashCrossDomainPolicy = HttpEntity( contentType = ContentType(MediaTypes.`text/xml`, HttpCharsets.`ISO-8859-1`), @@ -242,10 +262,10 @@ class ResponseHandler(config: CollectorConfig, sinks: CollectorSinks)(implicit c * @return Header */ private def getAccessControlAllowOriginHeader(request: HttpRequest) = - `Access-Control-Allow-Origin`(request.headers.find(_ match { + `Access-Control-Allow-Origin`(request.headers.find { case `Origin`(origin) => true case _ => false - }) match { + } match { case 
Some(`Origin`(origin)) => SomeOrigins(origin) case _ => AllOrigins }) diff --git a/2-collectors/scala-stream-collector/src/main/scala/com.snowplowanalytics.snowplow.collectors/scalastream/ScalaCollectorApp.scala b/2-collectors/scala-stream-collector/src/main/scala/com.snowplowanalytics.snowplow.collectors/scalastream/ScalaCollectorApp.scala index e645535a4b..d7b7279afc 100644 --- a/2-collectors/scala-stream-collector/src/main/scala/com.snowplowanalytics.snowplow.collectors/scalastream/ScalaCollectorApp.scala +++ b/2-collectors/scala-stream-collector/src/main/scala/com.snowplowanalytics.snowplow.collectors/scalastream/ScalaCollectorApp.scala @@ -125,11 +125,11 @@ object ScalaCollector extends App { // Return Options from the configuration. object Helper { implicit class RichConfig(val underlying: Config) extends AnyVal { - def getOptionalString(path: String): Option[String] = try { - Some(underlying.getString(path)) - } catch { - case e: ConfigException.Missing => None - } + def catchMissing = util.control.Exception.catching(classOf[ConfigException.Missing]) + + def getOptionalString(path: String): Option[String] = catchMissing opt underlying.getString(path) + + def getOptionalBoolean(path: String): Option[Boolean] = catchMissing opt underlying.getBoolean(path) } } @@ -154,6 +154,11 @@ class CollectorConfig(config: Config) { val port = collector.getInt("port") val production = collector.getBoolean("production") + //Third party cookie config params. 
+ val n3pcRedirectEnabled = collector.getOptionalBoolean("third-party-redirect-enabled").getOrElse(false) + val thirdPartyCookiesParameter = collector.getOptionalString("third-party-cookie-param").getOrElse("n3pc") + val fallbackNetworkUserId = collector.getOptionalString("fallback-network-id").getOrElse("00000000-0000-4000-A000-000000000000") + private val p3p = collector.getConfig("p3p") val p3pPolicyRef = p3p.getString("policyref") val p3pCP = p3p.getString("CP") diff --git a/2-collectors/scala-stream-collector/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala b/2-collectors/scala-stream-collector/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala index d5c50c54c0..5f699d01e4 100644 --- a/2-collectors/scala-stream-collector/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala +++ b/2-collectors/scala-stream-collector/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorServiceSpec.scala @@ -17,6 +17,9 @@ package collectors package scalastream // Scala +import org.specs2.mock.Mockito +import spray.http.{Rendering, StatusCodes} + import scala.collection.mutable.MutableList // Akka @@ -25,20 +28,14 @@ import akka.actor.{ActorSystem, Props} // Specs2 and Spray testing import org.specs2.matcher.AnyMatchers import org.specs2.mutable.Specification -import org.specs2.specification.{Scope,Fragments} import spray.testkit.Specs2RouteTest // Spray import spray.http.{DateTime,HttpHeader,HttpRequest,HttpCookie,RemoteAddress} -import spray.http.HttpHeaders.{ - Cookie, - `Set-Cookie`, - `Remote-Address`, - `Raw-Request-URI` -} +import spray.http.HttpHeaders._ // Config -import com.typesafe.config.{ConfigFactory,Config,ConfigException} +import com.typesafe.config.{ConfigFactory,Config} // Thrift import org.apache.thrift.TDeserializer @@ -47,15 +44,17 @@ import org.apache.thrift.TDeserializer import sinks._ import 
CollectorPayload.thrift.model1.CollectorPayload -class CollectorServiceSpec extends Specification with Specs2RouteTest with +class CollectorServiceSpec extends Specification with Specs2RouteTest with Mockito with AnyMatchers { - val testConf: Config = ConfigFactory.parseString(""" + + + def testConf(n3pcEnabled: Boolean): Config = ConfigFactory.parseString(s""" collector { interface = "0.0.0.0" port = 8080 production = true - + third-party-redirect-enabled = $n3pcEnabled p3p { policyref = "/w3c/p3p.xml" CP = "NOI DSP COR NID PSA OUR IND COM NAV STA" @@ -94,32 +93,172 @@ collector { } } """) - val collectorConfig = new CollectorConfig(testConf) - val sink = new TestSink - val sinks = CollectorSinks(sink, sink) - val responseHandler = new ResponseHandler(collectorConfig, sinks) - val collectorService = new CollectorService(collectorConfig, responseHandler, system) - val thriftDeserializer = new TDeserializer - - // By default, spray will always add Remote-Address to every request - // when running with the `spray.can.server.remote-address-header` - // option. However, the testing does not read this option and a - // remote address always needs to be set. - def CollectorGet(uri: String, cookie: Option[`HttpCookie`] = None, - remoteAddr: String = "127.0.0.1") = { - val headers: MutableList[HttpHeader] = - MutableList(`Remote-Address`(remoteAddr),`Raw-Request-URI`(uri)) - cookie.foreach(headers += `Cookie`(_)) - Get(uri).withHeaders(headers.toList) - } - "Snowplow's Scala collector" should { - "return an invisible pixel" in { + "Snowplow's Scala collector with n3pc redirect " should { + val collectorConfig = new CollectorConfig(testConf(n3pcEnabled = true)) + + // By default, spray will always add Remote-Address to every request + // when running with the `spray.can.server.remote-address-header` + // option. However, the testing does not read this option and a + // remote address always needs to be set. 
+ def CollectorGet(uri: String, cookie: Option[`HttpCookie`] = None, + remoteAddr: String = "127.0.0.1", additionalHeaders: List[HttpHeader] = List()) = { + val headers: MutableList[HttpHeader] = + MutableList(`Remote-Address`(remoteAddr),`Raw-Request-URI`(uri)) ++ additionalHeaders + cookie.foreach(headers += `Cookie`(_)) + Get(uri).withHeaders(headers.toList) + } + "return a redirect with n3pc parameter while keeping request params and using original site schema" in { + val sink = smartMock[AbstractSink] + sink.storeRawEvents(any[List[Array[Byte]]], anyString).answers{(params, mock) => params match { + case Array(firstArg, secondArg) => firstArg.asInstanceOf[List[Array[Byte]]] + } + } + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) + val originalUrl = "/i?uid=someUid&cx=someCx&url=https://example.com/something" + CollectorGet(originalUrl) ~> collectorService.collectorRoute ~> check { + response.status.mustEqual(StatusCodes.Found) + //we're redirecting so we should not save anything to kinesis + there was no(sink).storeRawEvents(any, any) + val locationHeader: String = header("Location").get.value + val httpCookies: List[HttpCookie] = headers.collect { + case `Set-Cookie`(hc) => hc + } + locationHeader must beEqualTo(s"https://example.com$originalUrl&${collectorConfig.thirdPartyCookiesParameter}=true") + locationHeader must contain(collectorConfig.thirdPartyCookiesParameter) + //a cookie must be sent back + httpCookies must not be empty + } + } + + "return a redirect with n3pc parameter with protocol from X-Forwarded-Proto" in { + val sink = smartMock[AbstractSink] + + sink.storeRawEvents(any[List[Array[Byte]]], anyString).answers{(params, mock) => params match { + case Array(firstArg, secondArg) => firstArg.asInstanceOf[List[Array[Byte]]] + } + } + val sinks = CollectorSinks(sink, sink) + val responseHandler = new 
ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) + val originalUrl = "/i?uid=someUid&cx=someCx" + CollectorGet(originalUrl, additionalHeaders = List(RawHeader("X-Forwarded-Proto", "https"))) ~> collectorService.collectorRoute ~> check { + response.status.mustEqual(StatusCodes.Found) + //we're redirecting so we should not save anything to kinesis + there was no(sink).storeRawEvents(any, any) + val locationHeader: String = header("Location").get.value + val httpCookies: List[HttpCookie] = headers.collect { + case `Set-Cookie`(hc) => hc + } + locationHeader must beEqualTo(s"https://example.com$originalUrl&${collectorConfig.thirdPartyCookiesParameter}=true") + locationHeader must contain(collectorConfig.thirdPartyCookiesParameter) + //a cookie must be sent back + httpCookies must not be empty + } + } + + "return a redirect with n3pc parameter while keeping request params" in { + val sink = smartMock[AbstractSink] + sink.storeRawEvents(any[List[Array[Byte]]], anyString).answers{(params, mock) => params match { + case Array(firstArg, secondArg) => firstArg.asInstanceOf[List[Array[Byte]]] + } + } + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) + val originalUrl = "/i?uid=someUid&cx=someCx" + CollectorGet(originalUrl) ~> collectorService.collectorRoute ~> check { + response.status.mustEqual(StatusCodes.Found) + //we're redirecting so we should not save anything to kinesis + there was no(sink).storeRawEvents(any, any) + val locationHeader: String = header("Location").get.value + val httpCookies: List[HttpCookie] = headers.collect { + case `Set-Cookie`(hc) => hc + } + locationHeader must beEqualTo(s"http://example.com$originalUrl&${collectorConfig.thirdPartyCookiesParameter}=true") + locationHeader must 
contain(collectorConfig.thirdPartyCookiesParameter) + //a cookie must be sent back + httpCookies must not be empty + } + } + + "return a redirect with n3pc parameter " in { + val sink = smartMock[AbstractSink] + sink.storeRawEvents(any[List[Array[Byte]]], anyString).answers{(params, mock) => params match { + case Array(firstArg, secondArg) => firstArg.asInstanceOf[List[Array[Byte]]] + } + } + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) + CollectorGet("/i") ~> collectorService.collectorRoute ~> check { + response.status.mustEqual(StatusCodes.Found) + there was no(sink).storeRawEvents(any, any) + val locationHeader: String = header("Location").get.value + locationHeader must contain(collectorConfig.thirdPartyCookiesParameter) + } + } + + "set the fallback cookie value after calling it with n3pc parameter without cookies" in { + val sink = smartMock[AbstractSink] + sink.storeRawEvents(any[List[Array[Byte]]], anyString).answers{(params, mock) => params match { + case Array(firstArg, secondArg) => firstArg.asInstanceOf[List[Array[Byte]]] + } + } + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) + CollectorGet(s"/i?${collectorConfig.thirdPartyCookiesParameter}") ~> collectorService.collectorRoute ~> check { + response.status.mustEqual(StatusCodes.OK) + val httpCookies: List[HttpCookie] = headers.collect { + case `Set-Cookie`(hc) => hc + } + there was two(sink).storeRawEvents(any, any) + httpCookies must not be empty + val httpCookie = httpCookies.head + httpCookie.content mustEqual collectorConfig.fallbackNetworkUserId + } + } + + "return the correct cookie even after calling it with n3pc parameter" in { + val sink = smartMock[AbstractSink] + 
sink.storeRawEvents(any[List[Array[Byte]]], anyString).answers{(params, mock) => params match { + case Array(firstArg, secondArg) => firstArg.asInstanceOf[List[Array[Byte]]] + } + } + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) + + CollectorGet(s"/i?${collectorConfig.thirdPartyCookiesParameter}", Some(HttpCookie(collectorConfig.cookieName.get, "UUID_Test"))) ~> + collectorService.collectorRoute ~> check { + val httpCookies: List[HttpCookie] = headers.collect { + case `Set-Cookie`(hc) => hc + } + there was two(sink).storeRawEvents(any, any) + val httpCookie = httpCookies.head + httpCookie.content must beEqualTo("UUID_Test") + } + } + + "return an invisible pixel" in { + val sink = new TestSink + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) + CollectorGet(s"/i?${collectorConfig.thirdPartyCookiesParameter}") ~> collectorService.collectorRoute ~> check { responseAs[Array[Byte]] === ResponseHandler.pixel } } + "return a cookie expiring at the correct time" in { + val sink = new TestSink + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) CollectorGet("/i") ~> collectorService.collectorRoute ~> check { headers must not be empty @@ -144,6 +283,10 @@ collector { } } "return the same cookie as passed in" in { + val sink = new TestSink + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) CollectorGet("/i", Some(HttpCookie(collectorConfig.cookieName.get, "UUID_Test"))) ~> 
collectorService.collectorRoute ~> check { val httpCookies: List[HttpCookie] = headers.collect { @@ -158,6 +301,10 @@ collector { } } "return a P3P header" in { + val sink = new TestSink + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) CollectorGet("/i") ~> collectorService.collectorRoute ~> check { val p3pHeaders = headers.filter { h => h.name.equals("P3P") @@ -171,10 +318,27 @@ collector { "policyref=\"%s\", CP=\"%s\"".format(policyRef, CP)) } } - "store the expected event as a serialized Thrift object in the enabled sink" in { + + "do not store the expected event if there's no cookie" in { + val sink = new TestSink + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val payloadData = "param1=val1¶m2=val2" val storedRecordBytes = responseHandler.cookie(payloadData, null, None, - None, "localhost", RemoteAddress("127.0.0.1"), new HttpRequest(), None, "/i", true)._2 + None, "localhost", RemoteAddress("127.0.0.1"), new HttpRequest(), None, "/i", pixelExpected = true)._2 + storedRecordBytes must beEmpty + } + + "store the expected event as a serialized Thrift object in the enabled sink only if cookie is present" in { + val payloadData = "param1=val1¶m2=val2" + val sink = new TestSink + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val thriftDeserializer = new TDeserializer + val cookie = HttpCookie("SomeName", "SOME_UUID") + val storedRecordBytes = responseHandler.cookie(payloadData, null, Some(cookie), + None, "localhost", RemoteAddress("127.0.0.1"), new HttpRequest(), None, s"/i", pixelExpected = true)._2 val storedEvent = new CollectorPayload this.synchronized { @@ -188,10 +352,111 @@ collector { storedEvent.path must beEqualTo("/i") storedEvent.querystring must beEqualTo(payloadData) } + "report 
itself as healthy" in { + val sink = smartMock[AbstractSink] + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) CollectorGet("/health") ~> collectorService.collectorRoute ~> check { response.status must beEqualTo(spray.http.StatusCodes.OK) } } } + + "Snowplow's Scala collector with n3pc redirect disabled" should { + val collectorConfig = new CollectorConfig(testConf(n3pcEnabled = false)) + + // By default, spray will always add Remote-Address to every request + // when running with the `spray.can.server.remote-address-header` + // option. However, the testing does not read this option and a + // remote address always needs to be set. + def CollectorGet(uri: String, cookie: Option[`HttpCookie`] = None, + remoteAddr: String = "127.0.0.1") = { + val headers: MutableList[HttpHeader] = + MutableList(`Remote-Address`(remoteAddr),`Raw-Request-URI`(uri)) + cookie.foreach(headers += `Cookie`(_)) + Get(uri).withHeaders(headers.toList) + } + val sink = new TestSink + val sinks = CollectorSinks(sink, sink) + val responseHandler = new ResponseHandler(collectorConfig, sinks) + val collectorService = new CollectorService(collectorConfig, responseHandler, system) + val thriftDeserializer = new TDeserializer + "return an invisible pixel" in { + CollectorGet("/i") ~> collectorService.collectorRoute ~> check { + response.status.mustEqual(StatusCodes.OK) + responseAs[Array[Byte]] === ResponseHandler.pixel + } + } + "return a cookie expiring at the correct time" in { + CollectorGet("/i") ~> collectorService.collectorRoute ~> check { + headers must not be empty + + val httpCookies: List[HttpCookie] = headers.collect { + case `Set-Cookie`(hc) => hc + } + httpCookies must not be empty + + // Assume we only return a single cookie. + // If the collector is modified to return multiple cookies, + // this will need to be changed. 
+ val httpCookie = httpCookies(0) + + httpCookie.name must beEqualTo(collectorConfig.cookieName.get) + httpCookie.domain must beSome + httpCookie.domain.get must be(collectorConfig.cookieDomain.get) + httpCookie.expires must beSome + val expiration = httpCookie.expires.get + val offset = expiration.clicks - collectorConfig.cookieExpiration.get - + DateTime.now.clicks + offset.asInstanceOf[Int] must beCloseTo(0, 3600000) // 1 hour window. + } + } + "return the same cookie as passed in" in { + CollectorGet("/i", Some(HttpCookie(collectorConfig.cookieName.get, "UUID_Test"))) ~> + collectorService.collectorRoute ~> check { + val httpCookies: List[HttpCookie] = headers.collect { + case `Set-Cookie`(hc) => hc + } + // Assume we only return a single cookie. + // If the collector is modified to return multiple cookies, + // this will need to be changed. + val httpCookie = httpCookies(0) + + httpCookie.content must beEqualTo("UUID_Test") + } + } + "return a P3P header" in { + CollectorGet("/i") ~> collectorService.collectorRoute ~> check { + val p3pHeaders = headers.filter { + h => h.name.equals("P3P") + } + p3pHeaders.size must beEqualTo(1) + val p3pHeader = p3pHeaders(0) + + val policyRef = collectorConfig.p3pPolicyRef + val CP = collectorConfig.p3pCP + p3pHeader.value must beEqualTo( + "policyref=\"%s\", CP=\"%s\"".format(policyRef, CP)) + } + } + "store the expected event as a serialized Thrift object in the enabled sink" in { + val payloadData = "param1=val1¶m2=val2" + val storedRecordBytes = responseHandler.cookie(payloadData, null, None, + None, "localhost", RemoteAddress("127.0.0.1"), new HttpRequest(), None, "/i", true)._2 + + val storedEvent = new CollectorPayload + this.synchronized { + thriftDeserializer.deserialize(storedEvent, storedRecordBytes.head) + } + + storedEvent.timestamp must beCloseTo(DateTime.now.clicks, 60000) + storedEvent.encoding must beEqualTo("UTF-8") + storedEvent.ipAddress must beEqualTo("127.0.0.1") + storedEvent.collector must 
beEqualTo("ssc-0.7.0-test") + storedEvent.path must beEqualTo("/i") + storedEvent.querystring must beEqualTo(payloadData) + } + } }