Commit: Add the ability to partition TSV by app id

stanch committed May 24, 2024
1 parent 6eb910d · commit 047c211
Showing 8 changed files with 71 additions and 37 deletions.
KinesisS3Emitter.scala

@@ -52,6 +52,8 @@ class KinesisS3Emitter(client: AmazonS3,
serializer: ISerializer)
extends IEmitter[Result] {

+  private val partitionTsvByApp = output.s3.partitionForPurpose(purpose).exists(_.contains("{app}"))

/**
* Reads items from a buffer and saves them to s3.
*
@@ -67,9 +69,9 @@ class KinesisS3Emitter(client: AmazonS3,

val records = buffer.getRecords.asScala.toList
val partitionedBatch =
-      Common.partition(purpose, monitoring.isStatsDEnabled, records)
+      Common.partition(purpose, partitionTsvByApp, monitoring.isStatsDEnabled, records)

-    val getBase: Option[RowType.SelfDescribing] => String =
+    val getBase: Option[RowType] => String =
getBaseFilename(output.s3, purpose, buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber, LocalDateTime.now)
val afterEmit: () => Unit =
() => monitoring.report(partitionedBatch.meta)
@@ -78,7 +80,7 @@ class KinesisS3Emitter(client: AmazonS3,
case (RowType.Unpartitioned, partitionRecords) if partitionRecords.nonEmpty =>
emitRecords(partitionRecords, afterEmit, getBase(None))
.map(_.asLeft)
-      case (data: RowType.SelfDescribing, partitionRecords) if partitionRecords.nonEmpty =>
+      case (data @ (_: RowType.SelfDescribing | _: RowType.Tsv), partitionRecords) if partitionRecords.nonEmpty =>
emitRecords(partitionRecords, afterEmit, getBase(Some(data))).map(_.asLeft)
case _ =>
records // ReadingError or empty partition - should be handled later by serializer
@@ -238,14 +240,17 @@ object KinesisS3Emitter {
lastSeq: String,
now: LocalDateTime
)(
-    sdj: Option[RowType.SelfDescribing]
+    row: Option[RowType]
  ): String = {
+    val sdj = row.collect { case s: RowType.SelfDescribing => s }
+    val app = row.collect { case a: RowType.Tsv => a }
val partitionPath = s3Config.partitionForPurpose(purpose).map {
_.template("vendor", sdj.fold("unknown")(_.vendor))
.template("name", sdj.fold("unknown")(_.name))
.template("schema", sdj.fold("unknown")(_.name)) // allowed synonym
.template("format", sdj.fold("unknown")(_.format))
.template("model", sdj.fold(-1)(_.model).toString)
.template("app", app.fold("unknown")(_.appId))
.template("yy+", now.format(DateTimeFormatter.ofPattern("yyyy")))
.template("mm", now.format(DateTimeFormatter.ofPattern("MM")))
.template("dd", now.format(DateTimeFormatter.ofPattern("dd")))
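The `.template` calls above fill `{placeholder}` tokens in the partition path configured for this purpose; the new `{app}` token is resolved from a `RowType.Tsv`, falling back to "unknown" like the other placeholders. A minimal sketch of that substitution — the `TemplateOps` helper below is a hypothetical stand-in, not the loader's actual implementation:

```scala
object PathTemplateSketch {
  // Hypothetical stand-in for the `template` helper chained above:
  // each call replaces one {key} token in the partition path.
  implicit class TemplateOps(private val path: String) extends AnyVal {
    def template(key: String, value: String): String =
      path.replace(s"{$key}", value)
  }

  // A partition path that contains the new {app} placeholder:
  val rendered: String =
    "{app}/{yy+}/{mm}/{dd}"
      .template("app", "my-app-id")
      .template("yy+", "2024")
      .template("mm", "05")
      .template("dd", "24")
  // rendered == "my-app-id/2024/05/24"
}
```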
package.scala

@@ -31,4 +31,9 @@ package object loader {
* Final result of S3 Loader processing
*/
type Result = Either[GenericError, RawRecord]

+  /**
+   * The result of S3 Loader processing with a potentially parsed record
+   */
+  type ParsedResult = Either[GenericError, (RawRecord, Option[Array[String]])]
}
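`ParsedResult` keeps the raw bytes next to an optional tab-split view of the same record, so the split can be skipped when nothing consumes it. A small illustration with invented values, assuming the loader package (and its type aliases) is in scope:

```scala
import java.nio.charset.StandardCharsets.UTF_8

val line = "my-app\tweb\t2020-11-26 00:01:05"

// Parsed variant: bytes plus the tab-split fields.
val parsed: ParsedResult =
  Right((line.getBytes(UTF_8), Some(line.split("\t", -1))))

// Unparsed variant: the split was never computed.
val unparsed: ParsedResult =
  Right((line.getBytes(UTF_8), None))
```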
Batch.scala

@@ -13,9 +13,7 @@
package com.snowplowanalytics.s3.loader.processing

import java.time.Instant
-import java.nio.charset.StandardCharsets.UTF_8
-
-import com.snowplowanalytics.s3.loader.Result
+import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
import com.snowplowanalytics.s3.loader.processing.Batch.Meta

/** Content of a KCL buffer with metadata attached */
@@ -34,20 +32,19 @@ object Batch {

val EmptyMeta: Meta = Meta(None, 0)

-  def fromEnriched(inputs: List[Result]): Batch[List[Result]] = {
+  def fromEnriched(inputs: List[ParsedResult]): Batch[List[ParsedResult]] = {
val meta = inputs.foldLeft(EmptyMeta) {
case (Meta(tstamp, count), Left(_)) =>
Meta(tstamp, count + 1)
-      case (Meta(tstamp, count), Right(raw)) =>
-        val strRecord = new String(raw, UTF_8)
-        val extracted = Common.getTstamp(strRecord).toOption
+      case (Meta(tstamp, count), Right((_, array))) =>
+        val extracted = array.flatMap(Common.getTstamp(_).toOption)
val min = Common.compareTstamps(tstamp, extracted)
Meta(min, count + 1)
}

Batch(meta, inputs)
}

-  def from(inputs: List[Result]): Batch[List[Result]] =
+  def from[R](inputs: List[R]): Batch[List[R]] =
Batch(EmptyMeta, inputs)
}
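Note that `from` is widened from `List[Result]` to an arbitrary element type, so the same constructor can now batch raw and parsed records alike; a sketch, assuming the types above:

```scala
val records: List[Result] =
  List(Right("web\tfield2".getBytes), Right("mob\tfield2".getBytes))

// Raw records, as before:
val raw: Batch[List[Result]] = Batch.from(records)

// Parsed records, with no need for a second constructor:
val parsed: Batch[List[ParsedResult]] =
  Batch.from(records.map(Common.toParsedRecord(_, actuallyParse = false)))
```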
Common.scala

@@ -14,15 +14,11 @@ package com.snowplowanalytics.s3.loader.processing

import java.time.Instant
import java.nio.charset.StandardCharsets.UTF_8

import cats.syntax.either._

import io.circe.parser.parse

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.circe.implicits._

-import com.snowplowanalytics.s3.loader.Result
+import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
import com.snowplowanalytics.s3.loader.Config.Purpose
import com.snowplowanalytics.s3.loader.monitoring.StatsD.CollectorTstampIdx

@@ -40,14 +36,29 @@ object Common {
*/
def partition(
purpose: Purpose,
+    partitionTsvByApp: Boolean,
statsDEnabled: Boolean,
records: List[Result]
): Batch.Partitioned =
purpose match {
case Purpose.SelfDescribingJson =>
Batch.from(records).map(rs => partitionByType(rs).toList)
-      case Purpose.Enriched if statsDEnabled =>
-        Batch.fromEnriched(records).map(rs => List((RowType.Unpartitioned, rs)))
+      case Purpose.Enriched =>
+        // We need to parse the record from bytes to Array[String] to obtain time stats (for StatsD),
+        // as well as for partitioning by app id
+        val parsed = records.map(toParsedRecord(_, actuallyParse = statsDEnabled || partitionTsvByApp))
+        val batch =
+          if (statsDEnabled)
+            Batch.fromEnriched(parsed)
+          else
+            Batch.from(parsed)
+        if (partitionTsvByApp)
+          batch.map(rs =>
+            partitionByApp(rs).toList.map { case (row, records) =>
+              (row, records.map(fromParsedRecord))
+            })
+        else
+          batch.map(rs => List((RowType.Unpartitioned, rs.map(fromParsedRecord))))
case _ =>
Batch.from(records).map(rs => List((RowType.Unpartitioned, rs)))
}
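The reworked `Enriched` branch pays the parsing cost only when StatsD or app-id partitioning will actually consume the split fields. Roughly, with the new argument order (purpose, partitionTsvByApp, statsDEnabled, records):

```scala
val records: List[Result] = List(Right("web\tfield2".getBytes))

// Neither consumer: records pass through unparsed, one partition.
Common.partition(Config.Purpose.Enriched, false, false, records)

// StatsD only: parsed to extract the collector timestamp, one partition.
Common.partition(Config.Purpose.Enriched, false, true, records)

// {app} in the partition path: parsed and grouped under RowType.Tsv("web").
Common.partition(Config.Purpose.Enriched, true, false, records)
```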
@@ -70,16 +81,31 @@
case Left(_) => RowType.ReadingError
}

+  def toParsedRecord(record: Result, actuallyParse: Boolean): ParsedResult =
+    record.map { byteArray =>
+      val parsed = if (actuallyParse) Some(new String(byteArray, UTF_8).split("\t", -1)) else None
+      (byteArray, parsed)
+    }
+
+  def fromParsedRecord(record: ParsedResult): Result = record.map(_._1)

+  def partitionByApp(records: List[ParsedResult]): Map[RowType, List[ParsedResult]] =
+    records.groupBy {
+      case Right((_, array)) =>
+        // if there are no tabs, avoid treating the whole line as an app id
+        val appId = array.filter(_.size > 1).flatMap(_.headOption)
+        appId.fold[RowType](RowType.Unpartitioned)(RowType.Tsv)
+      case Left(_) => RowType.ReadingError
+    }

/** Extract a timestamp from enriched TSV line */
-  def getTstamp(row: String): Either[RuntimeException, Instant] = {
-    val array = row.split("\t", -1)
+  def getTstamp(array: Array[String]): Either[RuntimeException, Instant] =
for {
string <- Either
.catchOnly[IndexOutOfBoundsException](array(CollectorTstampIdx))
.map(_.replaceAll(" ", "T") + "Z")
tstamp <- Either.catchOnly[DateTimeParseException](Instant.parse(string))
} yield tstamp
-  }

def compareTstamps(a: Option[Instant], b: Option[Instant]): Option[Instant] =
(a, b) match {
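Putting the new helpers together, a hypothetical run (the sample lines are invented; real enriched events carry many more fields, with the app id in the first column):

```scala
import java.nio.charset.StandardCharsets.UTF_8

val inputs: List[Result] = List(
  "web\t2020-11-26 00:01:05",    // app id "web"
  "mobile\t2020-11-26 00:02:05", // app id "mobile"
  "no-tabs-here"                 // single field: cannot be partitioned
).map(s => Right(s.getBytes(UTF_8)))

val parsed = inputs.map(Common.toParsedRecord(_, actuallyParse = true))

Common.partitionByApp(parsed).keySet
// == Set(RowType.Tsv("web"), RowType.Tsv("mobile"), RowType.Unpartitioned)
```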
RowType.scala

@@ -23,6 +23,9 @@ object RowType {
*/
case object Unpartitioned extends RowType

+  /** TSV line with payload that can be partitioned */
+  final case class Tsv(appId: String) extends RowType

/** JSON line with self-describing payload that can be partitioned */
final case class SelfDescribing(vendor: String, name: String, format: String, model: Int) extends RowType

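A partition key is now one of four shapes (`ReadingError` is the fourth, referenced in `Common` above). A sketch of how a consumer might discriminate them, assuming no further members exist:

```scala
def describe(row: RowType): String = row match {
  case RowType.Unpartitioned              => "base prefix only; placeholders get fallbacks"
  case RowType.Tsv(appId)                 => s"TSV row, {app} -> $appId"
  case RowType.SelfDescribing(v, n, f, m) => s"self-describing row: $v/$n/$f, model $m"
  case RowType.ReadingError               => "unreadable record, handled later by the serializer"
}
```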
BatchSpec.scala

@@ -14,16 +14,12 @@ package com.snowplowanalytics.s3.loader.processing

import java.time.Instant
import java.util.UUID

import cats.data.NonEmptyList
import cats.syntax.either._

import com.snowplowanalytics.snowplow.badrows.BadRow.GenericError
import com.snowplowanalytics.snowplow.badrows.Failure.GenericFailure
import com.snowplowanalytics.snowplow.badrows.Payload.RawPayload

-import com.snowplowanalytics.s3.loader.{Result, S3Loader}
+import com.snowplowanalytics.s3.loader.{ParsedResult, Result, S3Loader}
import org.specs2.mutable.Specification

class BatchSpec extends Specification {
@@ -37,20 +33,22 @@

"fromEnriched" should {
"extract the earliest timestamp" in {
-      val input: List[Result] = List(
+      val input: List[ParsedResult] = List(
BatchSpec.getEvent("2020-11-26 00:02:05"),
BatchSpec.getEvent("2020-11-26 00:01:05"),
BatchSpec.getEvent("2020-11-26 00:03:05")
-      ).map(_.getBytes.asRight)
+      ).map(_.getBytes.asRight).map(Common.toParsedRecord(_, actuallyParse = true))

val expected = Batch.Meta(Some(Instant.parse("2020-11-26T00:01:05Z")), 3)

Batch.fromEnriched(input).meta must beEqualTo(expected)
}

"ignore invalid TSVs for timestamps, but preserve for count" in {
-      val input: List[Result] =
-        List("invalid event", "rubbish").map(_.getBytes.asRight)
+      val input: List[ParsedResult] =
+        List("invalid event", "rubbish")
+          .map(_.getBytes.asRight)
+          .map(Common.toParsedRecord(_, actuallyParse = true))

val expected = Batch(Batch.Meta(None, 2), input)

CommonSpec.scala

@@ -70,7 +70,7 @@ class CommonSpec extends Specification {

"getTimestamp" should {
"parse timestamp in proper format" in {
-      val input = List.fill(4)("2020-11-26 00:01:05").mkString("\t")
+      val input = List.fill(4)("2020-11-26 00:01:05").toArray
val expected = Instant.parse("2020-11-26T00:01:05Z")
Common.getTstamp(input) must beRight(expected)
}
@@ -79,13 +79,13 @@
"partition" should {
"add metadata for enriched if statsd is enabled" in {
val input = List("".getBytes.asRight)
-      val result = Common.partition(Config.Purpose.Enriched, true, input)
+      val result = Common.partition(Config.Purpose.Enriched, false, true, input)
result.meta should beEqualTo(Batch.Meta(None, 1))
}

"not add metadata for enriched if statsd is disabled" in {
val input = List("".getBytes.asRight)
-      val result = Common.partition(Config.Purpose.Enriched, false, input)
+      val result = Common.partition(Config.Purpose.Enriched, false, false, input)
result.meta should beEqualTo(Batch.EmptyMeta)
}

@@ -94,7 +94,7 @@

val input = List(dataType11.asRight, dataType21.asRight)
val result =
-        Common.partition(Config.Purpose.SelfDescribingJson, false, input)
+        Common.partition(Config.Purpose.SelfDescribingJson, false, false, input)
result should beEqualTo(
Batch(
Batch.EmptyMeta,
GZipSerializerSpec.scala

@@ -42,8 +42,8 @@ class GZipSerializerSpec extends Specification {
cleanup()

val binaryInputs = List(
(List("A", "B", 1000, "a", "b"):List[Any]).mkString("\t").getBytes.asRight,
(List("X", "Y", 2000, "x", "y"):List[Any]).mkString("\t").getBytes.asRight
(List("A", "B", 1000, "a", "b"): List[Any]).mkString("\t").getBytes.asRight,
(List("X", "Y", 2000, "x", "y"): List[Any]).mkString("\t").getBytes.asRight
)

val serializationResult =
