Use Elephant Bird API to write Thrift-serialized blocks.
Phil Kallos committed Mar 19, 2014
1 parent 4af7870 commit 4ef855c
Showing 5 changed files with 43 additions and 30 deletions.
4-storage/kinesis-s3-sink/build.sbt (2 additions, 0 deletions)
@@ -28,6 +28,8 @@ libraryDependencies += "org.apache.hadoop" % "hadoop-core"

libraryDependencies += "com.hadoop.gplcompression" % "hadoop-lzo" % "0.4.19"

libraryDependencies += "com.twitter.elephantbird" % "elephant-bird-core" % "3.0.6"

assemblySettings

jarName in assembly := "kinesis-s3-sink"
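Note: the new elephant-bird-core dependency supplies ThriftBlockWriter, which frames Thrift structs into size-bounded binary blocks; paired with hadoop-lzo's indexed LZOP stream, the emitted files stay splittable by Hadoop jobs. A minimal sketch of the wiring, mirroring the emitter change below (the events value and the trailing writer argument of 100 are stand-ins, not part of this commit):

import java.io.{ByteArrayOutputStream, DataOutputStream}

import org.apache.hadoop.conf.Configuration
import com.hadoop.compression.lzo.LzopCodec
import com.twitter.elephantbird.mapreduce.io.ThriftBlockWriter

import com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent

val events: Seq[SnowplowRawEvent] = Seq.empty // stand-in for buffered records

val codec = new LzopCodec()
codec.setConf(new Configuration()) // hadoop-lzo codecs need a Hadoop conf

val data  = new ByteArrayOutputStream()
val index = new ByteArrayOutputStream()

// Compressed blocks go to `data`, block offsets to `index`; the index is
// what lets Hadoop split the resulting .lzo file.
val lzoOut = codec.createIndexedOutputStream(data, new DataOutputStream(index))

// Frame each Thrift struct into blocks before compression. 100 is a
// stand-in for the block-size argument; the emitter passes
// config.BUFFER_BYTE_SIZE_LIMIT there.
val writer = new ThriftBlockWriter[SnowplowRawEvent](
  lzoOut, classOf[SnowplowRawEvent], 100)
events.foreach(e => writer.write(e))
writer.close()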
@@ -3,11 +3,25 @@ package com.snowplowanalytics.snowplow.sinks
import scala.collection.JavaConverters._

// Java libs
-import java.io.{DataOutputStream,ByteArrayInputStream,ByteArrayOutputStream,IOException}
+import java.io.{
+  OutputStream,
+  DataOutputStream,
+  ByteArrayInputStream,
+  ByteArrayOutputStream,
+  IOException
+}

// Java lzo
import org.apache.hadoop.conf.Configuration
-import com.hadoop.compression.lzo.LzopCodec;
+import com.hadoop.compression.lzo.LzopCodec

+// Elephant bird
+import com.twitter.elephantbird.mapreduce.io.{
+  ThriftBlockWriter
+}
+
+// Snowplow
+import com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent

// Logging
import org.apache.commons.logging.{Log,LogFactory}
@@ -29,7 +43,7 @@ import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
*
* Once the buffer is full, the emit function is called.
*/
-class S3Emitter(config: KinesisConnectorConfiguration) extends IEmitter[ Array[Byte] ] {
+class S3Emitter(config: KinesisConnectorConfiguration) extends IEmitter[ SnowplowRawEvent ] {
val bucket = config.S3_BUCKET
val log = LogFactory.getLog(classOf[S3Emitter])
val client = new AmazonS3Client(config.AWS_CREDENTIALS_PROVIDER)
@@ -55,17 +69,22 @@ class S3Emitter(config: KinesisConnectorConfiguration) extends IEmitter[ Array[B
* failed to be written out to S3, under the assumption that
* the operation will be retried at some point later.
*/
-override def emit(buffer: UnmodifiableBuffer[ Array[Byte] ]): java.util.List[ Array[Byte] ] = {
+override def emit(buffer: UnmodifiableBuffer[ SnowplowRawEvent ]): java.util.List[ SnowplowRawEvent ] = {
val records = buffer.getRecords().asScala

val indexOutputStream = new ByteArrayOutputStream()
val outputStream = new ByteArrayOutputStream(config.BUFFER_BYTE_SIZE_LIMIT.toInt)

// This writes to the underlying output stream and index output stream
val lzoOutputStream = lzoCodec.createIndexedOutputStream(outputStream, new DataOutputStream(indexOutputStream))

+// This writes to the underlying lzo stream
+val thriftBlockWriter = new ThriftBlockWriter[SnowplowRawEvent](lzoOutputStream, classOf[SnowplowRawEvent], config.BUFFER_BYTE_SIZE_LIMIT.toInt)

// Populate the output stream with records
records.foreach({ record =>
try {
-lzoOutputStream.write(record)
+thriftBlockWriter.write(record)
} catch {
case e: IOException => {
log.error(e)
@@ -74,6 +93,8 @@ class S3Emitter(config: KinesisConnectorConfiguration) extends IEmitter[ Array[B
}
})

+thriftBlockWriter.close

val filename = getFileName(buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber)
val indexFilename = filename + ".index"
val obj = new ByteArrayInputStream(outputStream.toByteArray)
@@ -90,7 +111,7 @@ class S3Emitter(config: KinesisConnectorConfiguration) extends IEmitter[ Array[B
log.info("Successfully emitted " + buffer.getRecords.size + " records to S3 in s3://" + bucket + "/" + filename + " with index " + indexFilename)

// Success means we return an empty list i.e. there are no failed items to retry
-java.util.Collections.emptyList().asInstanceOf[ java.util.List[ Array[Byte] ] ]
+java.util.Collections.emptyList().asInstanceOf[ java.util.List[ SnowplowRawEvent ] ]
} catch {
case e: AmazonServiceException => {
log.error(e)
@@ -105,10 +126,10 @@ class S3Emitter(config: KinesisConnectorConfiguration) extends IEmitter[ Array[B
client.shutdown
}

-override def fail(records: java.util.List[ Array[Byte] ]) {
-  records.asScala.foreach({ record =>
-    log.error("Record failed: " + new String(record))
-  })
+override def fail(records: java.util.List[ SnowplowRawEvent ]) {
+  records.asScala.foreach { record =>
+    log.error("Record failed: " + record)
+  }
}

}
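To sanity-check emitted objects locally, elephant-bird-core also ships a matching block reader. A hedged sketch, assuming its ThriftBlockReader and TypeRef classes (the file name is illustrative; the sink actually names files by sequence-number range):

import java.io.FileInputStream

import org.apache.hadoop.conf.Configuration
import com.hadoop.compression.lzo.LzopCodec
import com.twitter.elephantbird.mapreduce.io.ThriftBlockReader
import com.twitter.elephantbird.util.TypeRef

import com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent

val codec = new LzopCodec()
codec.setConf(new Configuration())

// Decompress the LZOP stream, then iterate the Thrift blocks.
val in = codec.createInputStream(new FileInputStream("events.lzo"))
val reader = new ThriftBlockReader[SnowplowRawEvent](
  in, new TypeRef[SnowplowRawEvent]() {})

// readNext returns null once the stream is exhausted.
Iterator.continually(reader.readNext())
  .takeWhile(_ != null)
  .foreach(println)
reader.close()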
@@ -17,23 +17,15 @@ import com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent
/**
* S3Pipeline class sets up the Emitter/Buffer/Transformer/Filter
*/
-class S3Pipeline extends IKinesisConnectorPipeline[ SnowplowRawEvent, Array[Byte] ] {
+class S3Pipeline extends IKinesisConnectorPipeline[ SnowplowRawEvent, SnowplowRawEvent ] {

-override def getEmitter(configuration: KinesisConnectorConfiguration): IEmitter[ Array[Byte] ] = {
-  new S3Emitter(configuration)
-}
+override def getEmitter(configuration: KinesisConnectorConfiguration) = new S3Emitter(configuration)

-override def getBuffer(configuration: KinesisConnectorConfiguration): IBuffer[SnowplowRawEvent] = {
-  new BasicMemoryBuffer[SnowplowRawEvent](configuration)
-}
+override def getBuffer(configuration: KinesisConnectorConfiguration) = new BasicMemoryBuffer[SnowplowRawEvent](configuration)

-override def getTransformer(configuration: KinesisConnectorConfiguration): ITransformer[ SnowplowRawEvent, Array[Byte] ] = {
-  new SnowplowRawEventTransformer()
-}
+override def getTransformer(c: KinesisConnectorConfiguration) = new SnowplowRawEventTransformer()

-override def getFilter(configuration: KinesisConnectorConfiguration): IFilter[ SnowplowRawEvent ] = {
-  new AllPassFilter[SnowplowRawEvent]()
-}
+override def getFilter(c: KinesisConnectorConfiguration) = new AllPassFilter[SnowplowRawEvent]()

}

@@ -13,11 +13,11 @@ import com.amazonaws.services.kinesis.connectors.{
/**
* Boilerplate class for Kinesis Connector
*/
-class S3SinkExecutor(config: KinesisConnectorConfiguration) extends KinesisConnectorExecutorBase[ SnowplowRawEvent, Array[Byte] ] {
+class S3SinkExecutor(config: KinesisConnectorConfiguration) extends KinesisConnectorExecutorBase[ SnowplowRawEvent, SnowplowRawEvent ] {
super.initialize(config)

-override def getKinesisConnectorRecordProcessorFactory: KinesisConnectorRecordProcessorFactory[ SnowplowRawEvent, Array[Byte] ] = {
-  new KinesisConnectorRecordProcessorFactory[ SnowplowRawEvent, Array[Byte] ](new S3Pipeline(), config)
+override def getKinesisConnectorRecordProcessorFactory = {
+  new KinesisConnectorRecordProcessorFactory[ SnowplowRawEvent, SnowplowRawEvent ](new S3Pipeline(), config)
}

}
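For completeness, a sketch of how an executor like this is typically driven. The Properties key and the credentials provider here are illustrative assumptions, not part of this commit:

import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration

val props = new Properties()
props.setProperty("s3Bucket", "my-sink-bucket") // illustrative key and value

val config = new KinesisConnectorConfiguration(
  props, new DefaultAWSCredentialsProviderChain())

// initialize(config) already runs in the constructor body above;
// run() starts the Kinesis worker loop.
new S3SinkExecutor(config).run()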
@@ -15,7 +15,7 @@ import com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent
/**
* Thrift serializer/deserializer class
*/
-class SnowplowRawEventTransformer extends ITransformer[ SnowplowRawEvent, Array[Byte] ] {
+class SnowplowRawEventTransformer extends ITransformer[ SnowplowRawEvent, SnowplowRawEvent ] {
lazy val serializer = new TSerializer()
lazy val deserializer = new TDeserializer()

@@ -25,7 +25,5 @@ class SnowplowRawEventTransformer extends ITransformer[ SnowplowRawEvent, Array[
obj
}

-override def fromClass(record: SnowplowRawEvent): Array[Byte] = {
-  serializer.serialize(record)
-}
+override def fromClass(record: SnowplowRawEvent) = record
}
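After this change fromClass is an identity: Thrift encoding now happens inside the emitter via ThriftBlockWriter, while toClass (mostly elided above) still decodes the raw Kinesis payload. A minimal sketch of that libthrift round-trip, using a stand-in event:

import org.apache.thrift.{TSerializer, TDeserializer}

import com.snowplowanalytics.snowplow.collectors.thrift.SnowplowRawEvent

val serializer   = new TSerializer()
val deserializer = new TDeserializer()

val event = new SnowplowRawEvent()      // stand-in for a collector event
val bytes = serializer.serialize(event) // what arrives on the Kinesis stream

val decoded = new SnowplowRawEvent()
deserializer.deserialize(decoded, bytes) // what toClass reconstructs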
