Skip to content

Commit

Permalink
Clean and comment Kinesis enrichment for #489.
Browse files Browse the repository at this point in the history
  • Loading branch information
Brandon Amos committed Jan 16, 2014
1 parent a358330 commit 7168feb
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 273 deletions.
Expand Up @@ -24,24 +24,67 @@ import sources._
import sinks._

// Config
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.config.{Config,ConfigFactory}

// Argot
import org.clapper.argot.ArgotParser

// Java
import java.io.File

// The enrichment process takes input SnowplowRawEvent objects from
// an input source and outputs enriched objects to a sink,
// as defined in the following enumerations.
// Enumeration of the input sources the enrichment process can be
// configured with.
object Source extends Enumeration {
  type Source = Value

  // Declared one per line; the ids follow declaration order.
  val Kinesis = Value
  val Stdin = Value
  val Test = Value
}

// Enumeration of the sinks the enrichment process can be configured
// to write to.
object Sink extends Enumeration {
  type Sink = Value

  // Declared one per line; the ids follow declaration order.
  val Kinesis = Value
  val Stdouterr = Value
  val Test = Value
}

// The main entry point of the Scala Kinesis Enricher.
//
// Parses the command line, loads the configuration (either the file
// given via the "config" option or the bundled "default" resource),
// builds the configured source and runs it.
object KinesisEnrichApp extends App {
  val parser = new ArgotParser(
    programName = generated.Settings.name,
    compactUsage = true,
    preUsage = Some("%s: Version %s. Copyright (c) 2013, %s.".format(
      generated.Settings.name,
      generated.Settings.version,
      generated.Settings.organization)
    )
  )

  // Optional config argument.
  // Triple-quoted strings do not process backslash escapes, so the
  // quotes in the help text are written directly rather than as \".
  val config = parser.option[Config](
      List("config"), "filename", """
        |Configuration file. Defaults to "resources/default.conf"
        |(within .jar) if not set""".stripMargin) {
    (c, opt) =>
      val file = new File(c)
      if (file.exists) {
        ConfigFactory.parseFile(file)
      } else {
        // NOTE(review): parser.usage presumably aborts by throwing, which
        // would make the fallback below unreachable - confirm against Argot.
        parser.usage("Configuration file \"%s\" does not exist".format(c))
        ConfigFactory.empty()
      }
  }

  parser.parse(args)
  val kinesisEnrichConfig = new KinesisEnrichConfig(
    config.value.getOrElse(ConfigFactory.load("default"))
  )

  // Only the Kinesis and Stdin sources are constructed here. Any other
  // configured source is rejected with an explicit message instead of
  // surfacing as a bare scala.MatchError.
  val source = kinesisEnrichConfig.source match {
    case Source.Kinesis => new KinesisSource(kinesisEnrichConfig)
    case Source.Stdin => new StdinSource(kinesisEnrichConfig)
    case unsupported =>
      throw new IllegalArgumentException(
        s"Source '$unsupported' cannot be run by KinesisEnrichApp")
  }
  source.run
}

// Eagerly read every setting from the configuration here, so that a
// bad configuration fails when the enrichment process starts rather than later.
class KinesisEnrichConfig(config: Config) {
private val enrich = config.resolve.getConfig("enrich")

Expand Down Expand Up @@ -88,41 +131,3 @@ class KinesisEnrichConfig(config: Config) {
val anonIpEnabled = anonIp.getBoolean("enabled")
val anonOctets = anonIp.getInt("anon_octets")
}

// Entry point of the Scala Kinesis Enricher.
//
// Parses the command line, loads the configuration (either the file
// given via the "config" option or the bundled "default" resource),
// builds the configured source and runs it.
object KinesisEnrichApp extends App {
  val parser = new ArgotParser(
    programName = generated.Settings.name,
    compactUsage = true,
    preUsage = Some("%s: Version %s. Copyright (c) 2013, %s.".format(
      generated.Settings.name,
      generated.Settings.version,
      generated.Settings.organization)
    )
  )

  // Optional config argument.
  // Triple-quoted strings do not process backslash escapes, so the
  // quotes in the help text are written directly rather than as \".
  val config = parser.option[Config](
      List("config"), "filename", """
        |Configuration file. Defaults to "resources/default.conf"
        |(within .jar) if not set""".stripMargin) {
    (c, opt) =>
      val file = new File(c)
      if (file.exists) {
        ConfigFactory.parseFile(file)
      } else {
        // NOTE(review): parser.usage presumably aborts by throwing, which
        // would make the fallback below unreachable - confirm against Argot.
        parser.usage("Configuration file \"%s\" does not exist".format(c))
        ConfigFactory.empty()
      }
  }

  parser.parse(args)
  val kinesisEnrichConfig = new KinesisEnrichConfig(
    config.value.getOrElse(ConfigFactory.load("default"))
  )

  // Only the Kinesis and Stdin sources are constructed here. Any other
  // configured source is rejected with an explicit message instead of
  // surfacing as a bare scala.MatchError.
  val source = kinesisEnrichConfig.source match {
    case Source.Kinesis => new KinesisSource(kinesisEnrichConfig)
    case Source.Stdin => new StdinSource(kinesisEnrichConfig)
    case unsupported =>
      throw new IllegalArgumentException(
        s"Source '$unsupported' cannot be run by KinesisEnrichApp")
  }
  source.run
}
Expand Up @@ -26,6 +26,7 @@ import common.outputs.CanonicalOutput
// Amazon
import com.amazonaws.auth._

// Define an interface for all sinks to use to store events.
trait ISink {
  // Stores one canonical output record.
  //
  // `bytes` is the record content (passed as a String despite its name)
  // and `key` is the key to store it under. The result type is spelled
  // out as Unit; the previous declaration omitted it, which for an
  // abstract method means Unit as well.
  def storeCanonicalOutput(bytes: String, key: String): Unit
}
Expand Up @@ -18,7 +18,8 @@
*/

package com.snowplowanalytics.snowplow.enrich
package kinesis.sinks
package kinesis
package sinks

// Snowplow
import com.snowplowanalytics.snowplow.collectors.thrift._
Expand All @@ -29,9 +30,7 @@ import java.nio.ByteBuffer

// Amazon
import com.amazonaws.AmazonServiceException
import com.amazonaws.auth.{
AWSCredentialsProvider
}
import com.amazonaws.auth.AWSCredentialsProvider

// Scalazon (for Kinesis interaction)
import io.github.cloudify.scala.aws.kinesis.Client
Expand All @@ -51,38 +50,20 @@ import scala.concurrent.{Future,Await,TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Thrift.
import org.apache.thrift.TSerializer
import org.apache.thrift.TDeserializer

// Logging.
import org.slf4j.LoggerFactory

// Mutable data structures.
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.MutableList

/**
* Kinesis Sink for Scala enrichment.
*/
class KinesisSink(provider: AWSCredentialsProvider) extends ISink {
// Kinesis Sink for Scala enrichment.
class KinesisSink(provider: AWSCredentialsProvider,
config: KinesisEnrichConfig) extends ISink {
lazy val log = LoggerFactory.getLogger(getClass())
import log.{error, debug, info, trace}

// Initialize
// Create a Kinesis client for stream interactions.
private implicit val kinesis = Client.fromCredentials(provider)
private var stream: Option[Stream] = None
private val thriftSerializer = new TSerializer()
private val thriftDeserializer = new TDeserializer()

// Stores a tab-delimited canonical output line under the given key by
// delegating to storeEnrichedEvent.
//
// The text is encoded explicitly as UTF-8 so that the bytes written do
// not depend on the platform default charset of the machine running
// the enricher.
def storeCanonicalOutput(tabDelimCanonicalOutput: String, key: String) = {
  storeEnrichedEvent(tabDelimCanonicalOutput.getBytes("UTF-8"), key)
}

// Set the current stream to the stream called `name`.
// Replaces whatever stream was previously loaded.
def loadStream(name: String): Unit = {
  stream = Some(Kinesis.stream(name))
}
// The output stream for enriched events.
private val enrichedStream = createAndLoadStream()

/**
* Checks if a stream exists.
Expand All @@ -109,87 +90,45 @@ class KinesisSink(provider: AWSCredentialsProvider) extends ISink {
false
}

/**
 * Deletes a stream and blocks until the deletion request completes.
 *
 * @param name The name of the stream to delete
 * @param timeout How long to wait for the deletion, in seconds
 */
def deleteStream(name: String, timeout: Int = 60): Unit = {
  val target = Kinesis.stream(name)

  info(s"Deleting stream $name.")
  // Same shape as a single-generator for-comprehension that yields
  // its bound value unchanged.
  val pendingDelete = target.delete.map(outcome => outcome)

  Await.result(pendingDelete, Duration(timeout, SECONDS))
  info("Successfully deleted stream.")
}

/**
* Creates a new stream if one doesn't exist.
* Arguments are optional.
*
* @param name The name of the stream to create
* @param size The number of shards to support for this stream
* @param timeout How long to keep checking if the stream became active,
* in seconds
*
* @return a Boolean, where:
* 1. true means the stream was successfully created or already exists
* 2. false means an error occurred
*/
def createAndLoadStream(name: String, size: Int,
timeout: Int = 60): Boolean = {
// Creates a new stream if one doesn't exist.
def createAndLoadStream(timeout: Int = 60): Stream = {
val name = config.enrichedOutStream
val size = config.enrichedOutStreamShards
if (streamExists(name)) {
loadStream(name)
return true
}

info(s"Creating stream $name of size $size.")
val createStream = for {
s <- Kinesis.streams.create(name)
} yield s

try {
stream = Some(Await.result(createStream, Duration(timeout, SECONDS)))
Await.result(stream.get.waitActive.retrying(timeout),
Duration(timeout, SECONDS))
} catch {
case _: TimeoutException =>
info("Error: Timed out.")
false
Kinesis.stream(name)
} else {
info(s"Creating stream $name of size $size.")
val createStream = for {
s <- Kinesis.streams.create(name)
} yield s

try {
val stream = Await.result(createStream, Duration(timeout, SECONDS))

// Wait until the stream is active.
Await.result(stream.waitActive.retrying(timeout),
Duration(timeout, SECONDS))

stream
} catch {
case _: TimeoutException =>
throw new RuntimeException("Error: Timed out.")
}
}
info("Successfully created stream.")
true
}

def storeEnrichedEvent(rawEvent: Array[Byte], key: String): PutResult = {
//info(s"Writing Thrift record to Kinesis: ${new String(rawEvent)}")
val result = writeRecord(
data = ByteBuffer.wrap(rawEvent),
key = key
)
// Store successfully validated events in tab delimited form
// to the enriched output stream.
def storeCanonicalOutput(tabDelimCanonicalOutput: String, key: String) = {
val putData = for {
p <- enrichedStream.put(
ByteBuffer.wrap(tabDelimCanonicalOutput.getBytes),
key
)
} yield p
val result = Await.result(putData, Duration(60, SECONDS))
info(s"Writing successful.")
info(s" + ShardId: ${result.shardId}")
info(s" + SequenceNumber: ${result.sequenceNumber}")
result
}

/**
 * Stores an event to the currently loaded Kinesis stream.
 *
 * @param data The data for this record
 * @param key The partition key for this record
 * @param timeout Time in seconds to wait to put the data.
 *
 * @return A PutResult containing the ShardId and SequenceNumber
 *         of the record written to.
 * @throws NoSuchElementException if no stream has been loaded yet
 */
private def writeRecord(data: ByteBuffer, key: String,
    timeout: Int = 60): PutResult = {
  // Fail with an explanatory message rather than the bare
  // "None.get" that Option.get produces when no stream is set.
  val target = stream.getOrElse(
    throw new NoSuchElementException(
      "No Kinesis stream loaded; load or create a stream before writing records"))
  val putData = for {
    p <- target.put(data, key)
  } yield p
  Await.result(putData, Duration(timeout, SECONDS))
}
}

This file was deleted.

0 comments on commit 7168feb

Please sign in to comment.