Skip to content

Commit

Permalink
Merge branch 'release/r67'
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderdean committed Jul 13, 2015
2 parents 1287e6f + 519d04f commit 9ee272a
Show file tree
Hide file tree
Showing 94 changed files with 2,593 additions and 1,790 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object BuildSettings {
// Basic settings for our app
lazy val basicSettings = Seq[Setting[_]](
organization := "com.snowplowanalytics",
version := "0.4.0",
version := "0.5.0",
description := "Scala Stream Collector for Snowplow raw events",
scalaVersion := "2.10.1",
scalacOptions := Seq("-deprecation", "-encoding", "utf8",
Expand Down
10 changes: 7 additions & 3 deletions 2-collectors/scala-stream-collector/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ object Dependencies {
// Java
val mimepull = "1.9.4"
val awsSdk = "1.6.10"

val yodaTime = "2.1"
val yodaConvert = "1.2"
// Scala
val snowplowRawEvent = "0.1.0"
val collectorPayload = "0.0.0"
Expand All @@ -41,7 +42,7 @@ object Dependencies {
val commonsCodec = "1.5"
val scalazon = "0.11"
val argot = "1.0.1"

val json4s = "3.2.11"
// Scala (test only)
// Using the newest version of spec (2.3.6) causes
// conflicts with `spray` for `com.chuusai.shapeless`
Expand All @@ -51,7 +52,9 @@ object Dependencies {
object Libraries {
// Java
val mimepull = "org.jvnet.mimepull" % "mimepull" % V.mimepull
val awsSdk = "com.amazonaws" % "aws-java-sdk" % V.awsSdk
val awsSdk = "com.amazonaws" % "aws-java-sdk" % V.awsSdk
val yodaTime = "joda-time" % "joda-time" % V.yodaTime
val yodaConvert = "org.joda" % "joda-convert" % V.yodaConvert

// Scala
val snowplowRawEvent = "com.snowplowanalytics" % "snowplow-thrift-raw-event" % V.snowplowRawEvent
Expand All @@ -64,6 +67,7 @@ object Dependencies {
val logback = "ch.qos.logback" % "logback-classic" % V.logback
val commonsCodec = "commons-codec" % "commons-codec" % V.commonsCodec
val scalazon = "io.github.cloudify" %% "scalazon" % V.scalazon
val json4sJackson = "org.json4s" %% "json4s-jackson" % V.json4s

// Scala (test only)
val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ object ScalaCollectorBuild extends Build {
libraryDependencies ++= Seq(
Libraries.akkaActor,
Libraries.akkaSlf4j,
Libraries.yodaTime,
Libraries.yodaConvert,
Libraries.logback,
Libraries.mimepull,
Libraries.sprayCan,
Expand All @@ -44,7 +46,8 @@ object ScalaCollectorBuild extends Build {
Libraries.awsSdk,
Libraries.argot,
Libraries.snowplowRawEvent,
Libraries.collectorPayload
Libraries.collectorPayload,
Libraries.json4sJackson
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ collector {
# Data will be stored in the following stream.
stream {
region: "{{collectorSinkKinesisStreamRegion}}"
name: "{{collectorSinkKinesisStreamName}}"
good: "{{collectorKinesisStreamGoodName}}"
bad: "{{collectorKinesisStreamBadName}}"
}

# Minimum and maximum backoff periods
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow
package collectors

// Snowplow
import scalastream.sinks._

package object scalastream {

/**
* Whether the sink is for good rows or bad rows
*/
object InputType extends Enumeration {
type InputType = Value
val Good, Bad = Value
}

/**
* Case class for holding both good and
* bad sinks for the Stream Collector.
*
* @param good
* @param bad
*/
case class CollectorSinks(good: AbstractSink, bad: AbstractSink)

/**
* Case class for holding the results of
* splitAndSerializePayload.
*
* @param good All good results
* @param bad All bad results
*/
case class EventSerializeResult(good: List[Array[Byte]], bad: List[Array[Byte]])

/**
* Class for the result of splitting a too-large array of events in the body of a POST request
*
* @param goodBatches List of batches of events
* @param failedBigEvents List of events that were too large
*/
case class SplitBatchResult(goodBatches: List[List[String]], failedBigEvents: List[String])
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors
package com.snowplowanalytics.snowplow
package collectors
package scalastream

// Akka
Expand All @@ -33,12 +34,12 @@ import sinks._

// Actor accepting Http requests for the Scala collector.
class CollectorServiceActor(collectorConfig: CollectorConfig,
sink: AbstractSink) extends Actor with HttpService {
sinks: CollectorSinks) extends Actor with HttpService {
implicit val timeout: Timeout = 1.second // For the actor 'asks'
def actorRefFactory = context

// Deletage responses (content and storing) to the ResponseHandler.
private val responseHandler = new ResponseHandler(collectorConfig, sink)
private val responseHandler = new ResponseHandler(collectorConfig, sinks)

// Use CollectorService so the same route can be accessed differently
// in the testing framework.
Expand Down Expand Up @@ -85,7 +86,7 @@ class CollectorService(
reqCookie,
userAgent,
host,
ip.toString,
ip,
request,
refererURI,
"/" + path1 + "/" + path2,
Expand Down Expand Up @@ -121,7 +122,7 @@ class CollectorService(
reqCookie,
userAgent,
host,
ip.toString,
ip,
request,
refererURI,
"/" + path,
Expand Down Expand Up @@ -161,7 +162,7 @@ class CollectorService(
reqCookie,
userAgent,
host,
ip.toString,
ip,
request,
refererURI,
"/" + path1 + "/" + path2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import spray.http.{
AllOrigins,
ContentType,
MediaTypes,
HttpCharsets
HttpCharsets,
RemoteAddress
}
import spray.http.HttpHeaders.{
`Set-Cookie`,
Expand Down Expand Up @@ -62,6 +63,7 @@ import scala.collection.JavaConversions._
import generated._
import CollectorPayload.thrift.model1.CollectorPayload
import sinks._
import utils.SplitBatch

// Contains an invisible pixel to return for `/i` requests.
object ResponseHandler {
Expand All @@ -71,7 +73,7 @@ object ResponseHandler {
}

// Receive requests and store data into an output sink.
class ResponseHandler(config: CollectorConfig, sink: AbstractSink)(implicit context: ActorRefFactory) {
class ResponseHandler(config: CollectorConfig, sinks: CollectorSinks)(implicit context: ActorRefFactory) {

import context.dispatcher

Expand All @@ -80,14 +82,20 @@ class ResponseHandler(config: CollectorConfig, sink: AbstractSink)(implicit cont
// When `/i` is requested, this is called and stores an event in the
// Kinisis sink and returns an invisible pixel with a cookie.
def cookie(queryParams: String, body: String, requestCookie: Option[HttpCookie],
userAgent: Option[String], hostname: String, ip: String,
userAgent: Option[String], hostname: String, ip: RemoteAddress,
request: HttpRequest, refererUri: Option[String], path: String, pixelExpected: Boolean):
(HttpResponse, Array[Byte]) = {
(HttpResponse, List[Array[Byte]]) = {

if (KinesisSink.shuttingDown) {
(notFound, null)
} else {

// Make a Tuple2 with the ip address and the shard partition key
val ipKey = ip.toOption.map(_.getHostAddress) match {
case None => ("unknown", UUID.randomUUID.toString)
case Some(ip) => (ip, ip)
}

// Use the same UUID if the request cookie contains `sp`.
val networkUserId: String = requestCookie match {
case Some(rc) => rc.content
Expand All @@ -99,7 +107,7 @@ class ResponseHandler(config: CollectorConfig, sink: AbstractSink)(implicit cont

val event = new CollectorPayload(
"iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0",
ip,
ipKey._1,
timestamp,
"UTF-8",
Collector
Expand All @@ -125,8 +133,15 @@ class ResponseHandler(config: CollectorConfig, sink: AbstractSink)(implicit cont
ct => event.contentType = ct.value.toLowerCase
}

// Only the test sink responds with the serialized object.
val sinkResponse = sink.storeRawEvent(event, ip)
// Split events into Good and Bad
val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes)

// Send events to respective sinks
val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, ipKey._2)
val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, ipKey._2)

// Sink Responses for Test Sink
val sinkResponse = sinkResponseGood ++ sinkResponseBad

val policyRef = config.p3pPolicyRef
val CP = config.p3pCP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream
package com.snowplowanalytics.snowplow
package collectors
package scalastream

// Akka and Spray
import akka.actor.{ActorSystem, Props}
Expand All @@ -21,6 +23,7 @@ import spray.can.Http

// Java
import java.io.File
import java.nio.ByteBuffer

// Argot
import org.clapper.argot._
Expand All @@ -44,7 +47,7 @@ object ScalaCollector extends App {
val parser = new ArgotParser(
programName = generated.Settings.name,
compactUsage = true,
preUsage = Some("%s: Version %s. Copyright (c) 2013, %s.".format(
preUsage = Some("%s: Version %s. Copyright (c) 2015, %s.".format(
generated.Settings.name,
generated.Settings.version,
generated.Settings.organization)
Expand All @@ -62,25 +65,37 @@ object ScalaCollector extends App {
ConfigFactory.empty()
}
}

parser.parse(args)

val rawConf = config.value.getOrElse(throw new RuntimeException("--config option must be provided"))
implicit val system = ActorSystem.create("scala-stream-collector", rawConf)
val collectorConfig = new CollectorConfig(rawConf)
val sink = collectorConfig.sinkEnabled match {
case Sink.Kinesis => KinesisSink.createAndInitialize(collectorConfig)
case Sink.Stdout => new StdoutSink

implicit val system = ActorSystem.create("scala-stream-collector", rawConf)

val sinks = collectorConfig.sinkEnabled match {
case Sink.Kinesis => {
val good = KinesisSink.createAndInitialize(collectorConfig, InputType.Good)
val bad = KinesisSink.createAndInitialize(collectorConfig, InputType.Bad)
CollectorSinks(good, bad)
}
case Sink.Stdout => {
val good = new StdoutSink(InputType.Good)
val bad = new StdoutSink(InputType.Bad)
CollectorSinks(good, bad)
}
}

// The handler actor replies to incoming HttpRequests.
val handler = system.actorOf(
Props(classOf[CollectorServiceActor], collectorConfig, sink),
Props(classOf[CollectorServiceActor], collectorConfig, sinks),
name = "handler"
)

IO(Http) ! Http.Bind(handler,
interface=collectorConfig.interface, port=collectorConfig.port)
}

// Return Options from the configuration.
object Helper {
implicit class RichConfig(val underlying: Config) extends AnyVal {
Expand Down Expand Up @@ -120,6 +135,7 @@ class CollectorConfig(config: Config) {
val cookieDomain = cookie.getOptionalString("domain")

private val sink = collector.getConfig("sink")

// TODO: either change this to ADTs or switch to withName generation
val sinkEnabled = sink.getString("enabled") match {
case "kinesis" => Sink.Kinesis
Expand All @@ -133,7 +149,8 @@ class CollectorConfig(config: Config) {
val awsAccessKey = aws.getString("access-key")
val awsSecretKey = aws.getString("secret-key")
private val stream = kinesis.getConfig("stream")
val streamName = stream.getString("name")
val streamGoodName = stream.getString("good")
val streamBadName = stream.getString("bad")
private val streamRegion = stream.getString("region")
val streamEndpoint = s"https://kinesis.${streamRegion}.amazonaws.com"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* Copyright (c) 2013-2014 Snowplow Analytics Ltd.
* All rights reserved.
*
Expand All @@ -24,23 +24,22 @@ package sinks
// Java
import java.nio.ByteBuffer

// Apache Commons
import org.apache.commons.codec.binary.Base64

// Thrift
import org.apache.thrift.TSerializer

// Config
import com.typesafe.config.Config
// Logging
import org.slf4j.LoggerFactory

// Snowplow
import scalastream._
import CollectorPayload.thrift.model1.CollectorPayload

class StdoutSink extends AbstractSink {
// Print a Base64-encoded event.
def storeRawEvent(event: CollectorPayload, key: String) = {
println(Base64.encodeBase64String(serializeEvent(event)))
null
}
// Define an interface for all sinks to use to store events.
trait AbstractSink {

// Maximum number of bytes that a single record can contain
val MaxBytes: Long

lazy val log = LoggerFactory.getLogger(getClass())

def storeRawEvents(events: List[Array[Byte]], key: String): List[Array[Byte]]
}

0 comments on commit 9ee272a

Please sign in to comment.