Skip to content

Commit

Permalink
PubSub: check size of plain bad rows before sinking (close #510)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy authored and istreeter committed Oct 27, 2021
1 parent fcdb26b commit 7b3e481
Showing 1 changed file with 22 additions and 1 deletion.
Expand Up @@ -144,7 +144,7 @@ object Enrich {
}

/** Run the `badCount` metric effect, then hand the serialized bad row to `env.bad`. */
def sinkBad[F[_]: Monad](env: Environment[F], bad: BadRow): F[Unit] =
// NOTE(review): this page is a rendered diff reporting 1 deletion — the next line looks like the removed, pre-change body; confirm
env.metrics.badCount >> env.bad(bad.compact.getBytes(UTF_8))
// NOTE(review): presumably the added replacement body, which serializes through `badRowResize` instead — confirm
env.metrics.badCount >> env.bad(badRowResize(bad))

def sinkGood[F[_]: Concurrent: Parallel](env: Environment[F], enriched: EnrichedEvent): F[Unit] =
serializeEnriched(enriched) match {
Expand Down Expand Up @@ -181,6 +181,27 @@ object Enrich {
/**
 * Dispatch a single enrichment outcome to the matching sink:
 * an invalid `BadRow` is passed to `sinkBad`, a valid `EnrichedEvent` to `sinkGood`.
 */
def sinkResult[F[_]: Concurrent: Parallel](env: Environment[F])(result: Validated[BadRow, EnrichedEvent]): F[Unit] =
  result match {
    case Validated.Invalid(badRow) => sinkBad(env, badRow)
    case Validated.Valid(enriched) => sinkGood(env, enriched)
  }

/**
 * Serialize a plain bad row (such as `enrichment_failure`), guarding against
 * output larger than `MaxRecordSize`.
 *
 * The bad row is rendered with `compact` and encoded as UTF-8. When the encoded
 * length does not exceed `MaxRecordSize`, those bytes are returned unchanged.
 * Otherwise a `SizeViolation` bad row is serialized in its place, whose raw payload
 * is the first `MaxErrorMessageSize` characters of the original rendering.
 */
def badRowResize(badRow: BadRow): Array[Byte] = {
  val rendered = badRow.compact
  val encoded = rendered.getBytes(UTF_8)
  val encodedLength = encoded.size
  if (encodedLength <= MaxRecordSize) encoded
  else {
    // Too large to sink as it is: report the violation and keep only a trimmed excerpt
    val failure = Failure.SizeViolation(
      Instant.now(),
      MaxRecordSize,
      encodedLength,
      s"event failed enrichment, but resulting bad row exceeds allowed size $MaxRecordSize"
    )
    val trimmedPayload = BadRowPayload.RawPayload(rendered.take(MaxErrorMessageSize))
    BadRow.SizeViolation(Enrich.processor, failure, trimmedPayload).compact.getBytes(UTF_8)
  }
}

/**
* The maximum size of a serialized payload that can be written to pubsub.
*
Expand Down

0 comments on commit 7b3e481

Please sign in to comment.