Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for cookie bounce #2697 #2755

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions 2-collectors/scala-stream-collector/examples/config.hocon.sample
Expand Up @@ -28,6 +28,19 @@ collector {
# records stored in the current stream.
production = true

# Defines whether the "No-third party cookie check" should be perfomed.
# If this parameter is not specified, it defaults to false.
# This will create an auto-redirect to itself with `third-party-cookie-param`
# if set to true.
third-party-redirect-enabled = false

# The name of the request parameter which will be used on redirect if 'third-party-redirect-enabled'
# is set to true.
third-party-cookie-param = "n3pc"

#The network id which will be set in case the 3rd party cookie was not set.
fallback-network-id = "00000000-0000-4000-A000-000000000000"

# Configure the P3P policy header.
p3p {
policyref = "/w3c/p3p.xml"
Expand Down
Expand Up @@ -50,6 +50,7 @@ object Dependencies {
// Using the newest version of spec (2.3.6) causes
// conflicts with `spray` for `com.chuusai.shapeless`
val specs2 = "2.2.3"
val mockito = "1.9.5"
}

object Libraries {
Expand Down Expand Up @@ -77,6 +78,7 @@ object Dependencies {
val json4sJackson = "org.json4s" %% "json4s-jackson" % V.json4s

// Scala (test only)
val mockito = "org.mockito" % "mockito-all" % V.mockito % "test"
val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test"
val sprayTestkit = "io.spray" %% "spray-testkit" % V.spray % "test"
}
Expand Down
Expand Up @@ -50,7 +50,8 @@ object ScalaCollectorBuild extends Build {
Libraries.scalaz7,
Libraries.snowplowRawEvent,
Libraries.collectorPayload,
Libraries.json4sJackson
Libraries.json4sJackson,
Libraries.mockito
)
)
}
Expand Up @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets.UTF_8
import java.util.UUID
import java.net.URI
import org.apache.http.client.utils.URLEncodedUtils
import spray.http.{HttpHeader, HttpResponse, StatusCodes}

// Apache Commons
import org.apache.commons.codec.binary.Base64
Expand Down Expand Up @@ -96,122 +97,141 @@ class ResponseHandler(config: CollectorConfig, sinks: CollectorSinks)(implicit c
// When `/i` is requested, this is called and stores an event in the
// Kinisis sink and returns an invisible pixel with a cookie.
def cookie(queryParams: String, body: String, requestCookie: Option[HttpCookie],
userAgent: Option[String], hostname: String, ip: RemoteAddress,
request: HttpRequest, refererUri: Option[String], path: String, pixelExpected: Boolean):
(HttpResponse, List[Array[Byte]]) = {

// Make a Tuple2 with the ip address and the shard partition key
val (ipAddress, partitionKey) = ip.toOption.map(_.getHostAddress) match {
case None => ("unknown", UUID.randomUUID.toString)
case Some(ip) => (ip, if (config.useIpAddressAsPartitionKey) ip else UUID.randomUUID.toString)
}

// Use the same UUID if the request cookie contains `sp`.
val networkUserId: String = requestCookie match {
case Some(rc) => rc.content
case None => UUID.randomUUID.toString
}
userAgent: Option[String], hostname: String, ip: RemoteAddress,
request: HttpRequest, refererUri: Option[String], path: String, pixelExpected: Boolean):
(HttpResponse, List[Array[Byte]]) = {
val thirdPartyCookieParamPresent = Option(queryParams).exists(_.contains(config.thirdPartyCookiesParameter))
val shouldRedirect = config.n3pcRedirectEnabled && requestCookie.isEmpty && !thirdPartyCookieParamPresent && pixelExpected

// Make a Tuple2 with the ip address and the shard partition key
val (ipAddress, partitionKey) = ip.toOption.map(_.getHostAddress) match {
case None => ("unknown", UUID.randomUUID.toString)
case Some(someIp) => (someIp, if (config.useIpAddressAsPartitionKey) someIp else UUID.randomUUID.toString)
}

// Construct an event object from the request.
val timestamp: Long = System.currentTimeMillis

val event = new CollectorPayload(
"iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0",
ipAddress,
timestamp,
"UTF-8",
Collector
)

event.path = path
event.querystring = queryParams
event.body = body
event.hostname = hostname
event.networkUserId = networkUserId

userAgent.foreach(event.userAgent = _)
refererUri.foreach(event.refererUri = _)
event.headers = request.headers.flatMap {
case _: `Remote-Address` | _: `Raw-Request-URI` => None
case other => Some(other.toString)
}
// Use the same UUID if the request cookie contains `sp`.
val networkUserId: String = requestCookie match {
case Some(rc) => rc.content
case None =>
if (thirdPartyCookieParamPresent) config.fallbackNetworkUserId
else UUID.randomUUID.toString
}

// Set the content type
request.headers.find(_ match {case `Content-Type`(ct) => true; case _ => false}) foreach {
// Construct an event object from the request.
val timestamp: Long = System.currentTimeMillis

val event = new CollectorPayload(
"iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0",
ipAddress,
timestamp,
"UTF-8",
Collector
)

event.path = path
event.querystring = queryParams
event.body = body
event.hostname = hostname
event.networkUserId = networkUserId

userAgent.foreach(event.userAgent = _)
refererUri.foreach(event.refererUri = _)
event.headers = request.headers.flatMap {
case _: `Remote-Address` | _: `Raw-Request-URI` => None
case other => Some(other.toString)
}

// toLowerCase called because Spray seems to convert "utf" to "UTF"
ct => event.contentType = ct.value.toLowerCase
}
// Set the content type
request.headers.find { case `Content-Type`(ct) => true; case _ => false } foreach {

// Only send to Kinesis if we aren't shutting down
val sinkResponse = if (!KinesisSink.shuttingDown) {
// toLowerCase called because Spray seems to convert "utf" to "UTF"
ct => event.contentType = ct.value.toLowerCase
}

// Split events into Good and Bad
val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes)
// Only send to Kinesis if we aren't shutting down or we're expecting a redirect
val sinkResponse = if (!shouldRedirect && !KinesisSink.shuttingDown) {
// Split events into Good and Bad
val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes)

// Send events to respective sinks
val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey)
val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)
// Send events to respective sinks
val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey)
val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey)

// Sink Responses for Test Sink
sinkResponseGood ++ sinkResponseBad
} else {
null
}

val policyRef = config.p3pPolicyRef
val CP = config.p3pCP

val headersWithoutCookie = List(
RawHeader("P3P", "policyref=\"%s\", CP=\"%s\"".format(policyRef, CP)),
getAccessControlAllowOriginHeader(request),
`Access-Control-Allow-Credentials`(true)
)

val headers = config.cookieConfig match {
case Some(cookieConfig) =>
val responseCookie = HttpCookie(
cookieConfig.name, networkUserId,
expires=Some(DateTime.now + cookieConfig.expiration),
domain=cookieConfig.domain
)
`Set-Cookie`(responseCookie) :: headersWithoutCookie
case None => headersWithoutCookie
}
// Sink Responses for Test Sink
sinkResponseGood ++ sinkResponseBad
} else {
Nil
}

val (httpResponse, badQsResponse) = if (path startsWith "/r/") {
// A click redirect
try {
// TODO: log errors to Kinesis as BadRows
val target = URLEncodedUtils.parse(URI.create("?" + queryParams), "UTF-8")
.find(_.getName == "u")
.map(_.getValue)
target match {
case Some(t) => HttpResponse(302).withHeaders(`Location`(t) :: headers) -> Nil
// case None => badRequest -> sinks.bad.storeRawEvents(List("TODO".getBytes), partitionKey)
case None => {
val everythingSerialized = new String(SplitBatch.ThriftSerializer.get().serialize(event))
badRequest -> sinks.bad.storeRawEvents(List(createBadRow(event, s"Redirect failed due to lack of u parameter")), partitionKey)
}
}
} catch {
case NonFatal(e) => {
val headers = composeHeaders(request, shouldRedirect, networkUserId, queryParams)

val (httpResponse, badQsResponse) = if (path startsWith "/r/") {
// A click redirect
try {
// TODO: log errors to Kinesis as BadRows
val target = URLEncodedUtils.parse(URI.create("?" + queryParams), "UTF-8")
.find(_.getName == "u")
.map(_.getValue)
target match {
case Some(t) => HttpResponse(302).withHeaders(`Location`(t) :: headers) -> Nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// case None => badRequest -> sinks.bad.storeRawEvents(List("TODO".getBytes), partitionKey)
case None => {
val everythingSerialized = new String(SplitBatch.ThriftSerializer.get().serialize(event))
badRequest -> sinks.bad.storeRawEvents(List(createBadRow(event, s"Redirect failed due to error $e")), partitionKey)
badRequest -> sinks.bad.storeRawEvents(List(createBadRow(event, s"Redirect failed due to lack of u parameter")), partitionKey)
}
}
} else if (KinesisSink.shuttingDown) {
// So that the tracker knows the request failed and can try to resend later
notFound -> Nil
} else (if (pixelExpected) {
HttpResponse(entity = HttpEntity(`image/gif`, ResponseHandler.pixel))
} catch {
case NonFatal(e) => {
val everythingSerialized = new String(SplitBatch.ThriftSerializer.get().serialize(event))
badRequest -> sinks.bad.storeRawEvents(List(createBadRow(event, s"Redirect failed due to error $e")), partitionKey)
}
}
} else if (KinesisSink.shuttingDown) {
// So that the tracker knows the request failed and can try to resend later
notFound -> Nil
} else {
if (pixelExpected) {
if (shouldRedirect) HttpResponse(StatusCodes.Found) else HttpResponse(entity = HttpEntity(`image/gif`, ResponseHandler.pixel))
} else {
// See https://github.com/snowplow/snowplow-javascript-tracker/issues/482
HttpResponse(entity = "ok")
}).withHeaders(headers) -> Nil
}
}.withHeaders(headers) -> Nil

(httpResponse, badQsResponse ++ sinkResponse)
(httpResponse, badQsResponse ++ sinkResponse)
}

private def composeHeaders(request: HttpRequest, shouldRedirect: Boolean, networkUserId: String, queryParams: String) = {
val headersWithoutCookie = List(
RawHeader("P3P", "policyref=\"%s\", CP=\"%s\"".format(config.p3pPolicyRef, config.p3pCP)),
getAccessControlAllowOriginHeader(request),
`Access-Control-Allow-Credentials`(true)
)

val resolvedCookies = config.cookieConfig match {
case Some(cookieConfig) =>
val responseCookie = HttpCookie(
cookieConfig.name, networkUserId,
expires = Some(DateTime.now + cookieConfig.expiration),
domain = cookieConfig.domain
)
`Set-Cookie`(responseCookie) :: headersWithoutCookie
case None => headersWithoutCookie
}
lazy val protoValue = request.headers.find(_.name == "X-Forwarded-Proto").map(_.value).getOrElse("http")

val headers = if (shouldRedirect) {
val originalUri = URLEncodedUtils.parse(URI.create("?" + queryParams), "UTF-8")
.find(_.getName == "url")
.map(_.getValue)
val originalUriScheme = originalUri.map(URI.create(_).getScheme).getOrElse(protoValue)
val uri = request.uri.withQuery(queryParams)
val allParams = uri.query.toMap ++ Map(config.thirdPartyCookiesParameter -> "true")
val redirectUri = request.uri.withQuery(allParams).withScheme(originalUriScheme)
`Location`(redirectUri) :: resolvedCookies
} else resolvedCookies
headers
}

/**
* Creates a response to the CORS preflight Options request
Expand All @@ -222,7 +242,7 @@ class ResponseHandler(config: CollectorConfig, sinks: CollectorSinks)(implicit c
def preflightResponse(request: HttpRequest) = HttpResponse().withHeaders(List(
getAccessControlAllowOriginHeader(request),
`Access-Control-Allow-Credentials`(true),
`Access-Control-Allow-Headers`( "Content-Type")))
`Access-Control-Allow-Headers`("Content-Type")))

def flashCrossDomainPolicy = HttpEntity(
contentType = ContentType(MediaTypes.`text/xml`, HttpCharsets.`ISO-8859-1`),
Expand All @@ -242,10 +262,10 @@ class ResponseHandler(config: CollectorConfig, sinks: CollectorSinks)(implicit c
* @return Header
*/
private def getAccessControlAllowOriginHeader(request: HttpRequest) =
`Access-Control-Allow-Origin`(request.headers.find(_ match {
`Access-Control-Allow-Origin`(request.headers.find {
case `Origin`(origin) => true
case _ => false
}) match {
} match {
case Some(`Origin`(origin)) => SomeOrigins(origin)
case _ => AllOrigins
})
Expand Down
Expand Up @@ -125,11 +125,11 @@ object ScalaCollector extends App {
// Return Options from the configuration.
object Helper {
implicit class RichConfig(val underlying: Config) extends AnyVal {
def getOptionalString(path: String): Option[String] = try {
Some(underlying.getString(path))
} catch {
case e: ConfigException.Missing => None
}
def catchMissing = util.control.Exception.catching(classOf[ConfigException.Missing])

def getOptionalString(path: String): Option[String] = catchMissing opt underlying.getString(path)

def getOptionalBoolean(path: String): Option[Boolean] = catchMissing opt underlying.getBoolean(path)
}
}

Expand All @@ -154,6 +154,11 @@ class CollectorConfig(config: Config) {
val port = collector.getInt("port")
val production = collector.getBoolean("production")

//Third party cookie config params.
val n3pcRedirectEnabled = collector.getOptionalBoolean("third-party-redirect-enabled").getOrElse(false)
val thirdPartyCookiesParameter = collector.getOptionalString("third-party-cookie-param").getOrElse("n3pc")
val fallbackNetworkUserId = collector.getOptionalString("fallback-network-id").getOrElse("00000000-0000-4000-A000-000000000000")

private val p3p = collector.getConfig("p3p")
val p3pPolicyRef = p3p.getString("policyref")
val p3pCP = p3p.getString("CP")
Expand Down