Skip to content
This repository has been archived by the owner on Oct 19, 2021. It is now read-only.

Write records to Kinesis in a batch #32

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ object KinesisTee extends Tee {
content
.map(transform)
.filter(filter)
.grouped(100)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we have this parameter as a configuration?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the PR and made this configurable.

.foreach(route.write)
}

Expand Down
15 changes: 12 additions & 3 deletions src/main/scala/com/snowplowanalytics/kinesistee/StreamWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import java.nio.ByteBuffer
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{PutRecordsRequestEntry, PutRecordsRequest}
import com.snowplowanalytics.kinesistee.config.TargetAccount
import com.snowplowanalytics.kinesistee.models.{Content, Stream}
import scala.collection.JavaConverters._

/**
* Write a record to a predefined stream
Expand All @@ -31,10 +33,17 @@ class StreamWriter(stream: Stream, targetAccount: Option[TargetAccount], produce

/**
* push the given record to the requested stream
* @param content the record to push
* @param contents the sequence of records to push
*/
def write(content: Content): Unit = {
producer.putRecord(stream.name, ByteBuffer.wrap(content.row.getBytes("UTF-8")), content.partitionKey)
def write(contents: Seq[Content]): Unit = {
  // The Kinesis PutRecords API accepts between 1 and 500 records per request.
  val maxRecordsPerRequest = 500

  // grouped yields no chunk for an empty input (so no empty, invalid request is sent) and
  // exactly one chunk for inputs within the limit, so such inputs still result in a single
  // putRecords call. Larger inputs are split into several requests instead of being rejected.
  contents.grouped(maxRecordsPerRequest).foreach { chunk =>
    val requestEntries = chunk.map { content =>
      // Each row is sent as its UTF-8 bytes together with the record's own partition key.
      val data = ByteBuffer.wrap(content.row.getBytes("UTF-8"))

      new PutRecordsRequestEntry().withData(data).withPartitionKey(content.partitionKey)
    }

    val request = new PutRecordsRequest().withStreamName(stream.name).withRecords(requestEntries.asJava)
    // NOTE(review): the PutRecordsResult is discarded, so records that Kinesis rejects
    // individually (reported via getFailedRecordCount) are neither retried nor surfaced —
    // confirm this best-effort behaviour is intended.
    producer.putRecords(request)
  }
}

def flush: Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class KinesisTeeSpec extends Specification with Mockito {
val sampleContent = Seq(Content("a", "p"), Content("a", "p"), Content("a", "p"))
val route = mockRoute
KinesisTee.tee(route, None, None, sampleContent)
there was three (route.mockStreamWriter).write(eqTo(Content("a", "p")))
there was one (route.mockStreamWriter).write(eqTo(sampleContent))
}

"write to the stream writer only if the filter function returns false" in {
Expand All @@ -58,7 +58,7 @@ class KinesisTeeSpec extends Specification with Mockito {
transformationStrategy = None,
filterStrategy = Some(new FilterEverything),
content = sampleContent)
there was no (routeMock.mockStreamWriter).write(any[Content])
there was no (routeMock.mockStreamWriter).write(any[Seq[Content]])
}

"transform stream content using the given transformation strategy" in {
Expand All @@ -73,7 +73,8 @@ class KinesisTeeSpec extends Specification with Mockito {
val routeMock = mockRoute
KinesisTee.tee(routeMock, Some(new MakeEverythingB), None, sampleContent)

there was three (routeMock.mockStreamWriter).write(eqTo(Content("b", "p")))
val expectedContents = Seq(Content("b", "p"), Content("b", "p"), Content("b", "p"))
there was one (routeMock.mockStreamWriter).write(eqTo(expectedContents))
}

"run the transformation strategy prior to the filter strategy" in {
Expand All @@ -97,7 +98,8 @@ class KinesisTeeSpec extends Specification with Mockito {
val routeMock = mockRoute
KinesisTee.tee(routeMock, Some(new MakeEverythingB), Some(new FilterNotB), sampleContent)

there was three (routeMock.mockStreamWriter).write(eqTo(Content("b", "p")))
val expectedContents = Seq(Content("b", "p"), Content("b", "p"), Content("b", "p"))
there was one (routeMock.mockStreamWriter).write(eqTo(expectedContents))
}

"swallow failures in the filter strategy before pushing anything to the stream writer" in {
Expand All @@ -106,7 +108,7 @@ class KinesisTeeSpec extends Specification with Mockito {
}

val routeMock = mockRoute
there was no (routeMock.mockStreamWriter).write(any[Content])
there was no (routeMock.mockStreamWriter).write(any[Seq[Content]])
}

"throw failures in the transformation strategy before pushing anything to the stream writer" in {
Expand All @@ -116,7 +118,7 @@ class KinesisTeeSpec extends Specification with Mockito {

val routeMock = mockRoute
KinesisTee.tee(routeMock, Some(new FailureTransform), None, Seq(Content("b", "p")))
there was one (routeMock.mockStreamWriter).write(any[Content])
there was one (routeMock.mockStreamWriter).write(any[Seq[Content]])
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class ConfigurationBuilderSpec extends Specification with Mockito {
val items:util.List[java.util.Map[java.lang.String,com.amazonaws.services.dynamodbv2.model.AttributeValue]] = new util.ArrayList()

val one:util.Map[String,com.amazonaws.services.dynamodbv2.model.AttributeValue] = new util.HashMap()
one.put("id", new AttributeValue(Some("with-id")))
one.put("configuration", new AttributeValue(Some(sampleGoodConfig)))
one.put("id", new AttributeValue(Some("with-id"), l = Nil))
one.put("configuration", new AttributeValue(Some(sampleGoodConfig), l = Nil))
items.add(one)

res.getItems returns items
Expand All @@ -95,8 +95,8 @@ class ConfigurationBuilderSpec extends Specification with Mockito {
val items:util.List[java.util.Map[java.lang.String,com.amazonaws.services.dynamodbv2.model.AttributeValue]] = new util.ArrayList()

val one:util.Map[String,com.amazonaws.services.dynamodbv2.model.AttributeValue] = new util.HashMap()
one.put("id", new AttributeValue(Some("with-id")))
one.put("this-is-not-config", new AttributeValue(Some("abc")))
one.put("id", new AttributeValue(Some("with-id"), l = Nil))
one.put("this-is-not-config", new AttributeValue(Some("abc"), l = Nil))

items.add(one)
res.getItems returns items
Expand Down