Commit
Add badrows serializer checking max size (close #49)
pondzix committed Jan 3, 2024
1 parent 2aeb575 commit cc9a736
Showing 5 changed files with 88 additions and 0 deletions.
1 change: 1 addition & 0 deletions modules/kafka/src/main/resources/reference.conf
@@ -14,6 +14,7 @@ snowplow.defaults {
producerConf: {
"client.id": null # invalid value MUST be overriden by the application
}
maxRecordSize: 1000000
}
}
}
1 change: 1 addition & 0 deletions modules/kinesis/src/main/resources/reference.conf
@@ -20,6 +20,7 @@ snowplow.defaults: {
}
recordLimit: 500
byteLimit: 5242880
maxRecordSize: 1000000
}
}
}
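
For context, these per-destination defaults are meant to be decoded into the loader's output configuration. A minimal sketch of what the Kinesis section above might map to; only the three keys visible in the hunk are grounded in this commit, while the case class name, the comments, and any surrounding wiring are assumptions:

// Hypothetical decoded form of the Kinesis output defaults shown above.
case class KinesisOutputConfig(
  recordLimit: Int,   // 500 records per request batch
  byteLimit: Int,     // 5242880 bytes (5 MiB) per request batch
  maxRecordSize: Int  // 1000000 bytes per single serialized record; larger bad rows become a SizeViolation
)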
@@ -0,0 +1,35 @@
package com.snowplowanalytics.snowplow.loaders.transform

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Processor}

import java.nio.charset.StandardCharsets
import java.time.Instant

object BadRowsSerializer {

  /**
   * If the input bad row exceeds the provided max size in bytes, return a serialized SizeViolation
   * bad row with a trimmed copy of the original payload. Otherwise, return the original serialized bad row.
   */
  def withMaxSize(
    badRow: BadRow,
    processor: Processor,
    maxSize: Int
  ): Array[Byte] = {
    val originalSerialized = badRow.compactByteArray
    val originalSizeBytes = originalSerialized.length

    if (originalSizeBytes >= maxSize) {
      // Keep only the first maxSize / 10 bytes of the oversized bad row as the raw payload
      val trimmedPayload = new String(originalSerialized, 0, maxSize / 10, StandardCharsets.UTF_8)
      BadRow
        .SizeViolation(
          processor,
          Failure.SizeViolation(Instant.now(), maxSize, originalSizeBytes, "Badrow exceeds allowed max size"),
          Payload.RawPayload(trimmedPayload)
        )
        .compactByteArray
    } else {
      originalSerialized
    }
  }
}
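
For illustration, a loader's bad-row path could route every serialized bad row through this helper before writing to the sink. A minimal sketch, assuming a byte-oriented sink function and an illustrative application name and version; only BadRowsSerializer.withMaxSize and the Processor constructor come from this repository, the rest is hypothetical wiring:

import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
import com.snowplowanalytics.snowplow.loaders.transform.BadRowsSerializer

// Hypothetical call site: cap every bad row at the destination's record limit before emitting it.
def emitBadRow(badRow: BadRow, sink: Array[Byte] => Unit): Unit = {
  val processor = Processor("example-loader", "0.1.0") // illustrative app name and version
  val maxRecordSize = 1000000                          // mirrors the kafka/kinesis reference.conf default above
  val bytes = BadRowsSerializer.withMaxSize(badRow, processor, maxRecordSize)
  sink(bytes) // either the original serialized bad row or a much smaller SizeViolation
}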
@@ -0,0 +1,50 @@
package com.snowplowanalytics.snowplow.loaders.transform

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.badrows.Payload
import com.snowplowanalytics.snowplow.badrows.Processor
import io.circe.parser.decode
import org.specs2.Specification

import java.nio.charset.StandardCharsets
import java.time.Instant
import java.util.UUID

class BadRowsSerializerSpec extends Specification {

  private val processor = Processor("test-app", "0.1.0")
  private val maxSize = 3000

  def is = s2"""
  Bad row serializer should
    return original bad row if max size is not exceeded $e1
    return SizeViolation bad row if max size is exceeded $e2
  """

  def e1 = {
    val inputBadRow = loaderError(Event.minimal(UUID.randomUUID(), Instant.now(), "0.1.0", "0.1.0"))
    val output = serialize(inputBadRow)

    decode[SelfDescribingData[BadRow]](output).map(_.data) must beRight(inputBadRow)
  }

  def e2 = {
    val inputBadRow = loaderError(Event.minimal(UUID.randomUUID(), Instant.now(), "0.1.0", "0.1.0").copy(mkt_source = Some("A" * 1000)))
    val output = serialize(inputBadRow)

    decode[SelfDescribingData[BadRow]](output).map(_.data) must beRight.like { case sizeViolation: BadRow.SizeViolation =>
      sizeViolation.failure.maximumAllowedSizeBytes must beEqualTo(maxSize) and
        (sizeViolation.payload.event.size must beEqualTo(300)) // Max value divided by 10
    }
  }

  private def serialize(badRow: BadRow): String = {
    val output = BadRowsSerializer.withMaxSize(badRow, processor, maxSize)
    new String(output, StandardCharsets.UTF_8)
  }

  private def loaderError(event: Event) =
    BadRow.LoaderRuntimeError(processor, failure = "Some runtime loader error message", payload = Payload.LoaderPayload(event))
}
1 change: 1 addition & 0 deletions modules/pubsub/src/main/resources/reference.conf
@@ -17,6 +17,7 @@ snowplow.defaults: {
pubsub: {
batchSize: 1000
requestByteThreshold: 1000000
maxRecordSize: 10000000
gcpUserAgent: {
productName: "Snowplow OSS"
}
