Add the ability to partition TSV by app id
stanch committed May 24, 2024
1 parent 6eb910d · commit acfd17d
Showing 5 changed files with 59 additions and 24 deletions.
KinesisS3Emitter.scala
@@ -52,6 +52,8 @@ class KinesisS3Emitter(client: AmazonS3,
serializer: ISerializer)
extends IEmitter[Result] {

+ private val partitionTsvByApp = output.s3.partitionForPurpose(purpose).exists(_.contains("{app}"))

/**
* Reads items from a buffer and saves them to s3.
*
@@ -67,9 +69,9 @@ class KinesisS3Emitter(client: AmazonS3,

val records = buffer.getRecords.asScala.toList
val partitionedBatch =
- Common.partition(purpose, monitoring.isStatsDEnabled, records)
+ Common.partition(purpose, partitionTsvByApp, monitoring.isStatsDEnabled, records)

- val getBase: Option[RowType.SelfDescribing] => String =
+ val getBase: Option[RowType] => String =
getBaseFilename(output.s3, purpose, buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber, LocalDateTime.now)
val afterEmit: () => Unit =
() => monitoring.report(partitionedBatch.meta)
@@ -78,7 +80,7 @@ class KinesisS3Emitter(client: AmazonS3,
case (RowType.Unpartitioned, partitionRecords) if partitionRecords.nonEmpty =>
emitRecords(partitionRecords, afterEmit, getBase(None))
.map(_.asLeft)
- case (data: RowType.SelfDescribing, partitionRecords) if partitionRecords.nonEmpty =>
+ case (data @ (_: RowType.SelfDescribing | _: RowType.TsvPerApp), partitionRecords) if partitionRecords.nonEmpty =>
emitRecords(partitionRecords, afterEmit, getBase(Some(data))).map(_.asLeft)
case _ =>
records // ReadingError or empty partition - should be handled later by serializer
@@ -238,14 +240,17 @@ object KinesisS3Emitter {
lastSeq: String,
now: LocalDateTime
)(
- sdj: Option[RowType.SelfDescribing]
+ row: Option[RowType]
): String = {
+ val sdj = row.collect { case s: RowType.SelfDescribing => s }
+ val app = row.collect { case a: RowType.TsvPerApp => a }
val partitionPath = s3Config.partitionForPurpose(purpose).map {
_.template("vendor", sdj.fold("unknown")(_.vendor))
.template("name", sdj.fold("unknown")(_.name))
.template("schema", sdj.fold("unknown")(_.name)) // allowed synonym
.template("format", sdj.fold("unknown")(_.format))
.template("model", sdj.fold(-1)(_.model).toString)
.template("app", app.fold("unknown")(_.appId))
.template("yy+", now.format(DateTimeFormatter.ofPattern("yyyy")))
.template("mm", now.format(DateTimeFormatter.ofPattern("MM")))
.template("dd", now.format(DateTimeFormatter.ofPattern("dd")))
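The template calls above substitute {placeholder} tokens in the configured partition path. As a rough sketch (the loader's real helper may differ, so the TemplateOps name and implementation below are hypothetical), here is a minimal version of such a substitution and the expansion it gives once {app} is in play:

  // Hypothetical stand-in for the `template` string helper used above.
  implicit class TemplateOps(private val path: String) extends AnyVal {
    def template(key: String, value: String): String =
      path.replace(s"{$key}", value)
  }

  // "{app}/{yy+}/{mm}/{dd}"
  //   .template("app", "mobile-app").template("yy+", "2024")
  //   .template("mm", "05").template("dd", "24")
  // yields "mobile-app/2024/05/24"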
package.scala
@@ -31,4 +31,9 @@ package object loader {
* Final result of S3 Loader processing
*/
type Result = Either[GenericError, RawRecord]

+ /**
+  * The result of S3 Loader processing with a potentially parsed record
+  */
+ type ParsedResult = Either[GenericError, (RawRecord, Option[Array[String]])]
}
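For illustration, two ParsedResult values under the assumption that RawRecord is the record's raw bytes (which toParsedRecord further down relies on): one record tab-split eagerly, one whose parsing was skipped.

  // Hypothetical values; the Option is None whenever parsing was not needed.
  val bytes: Array[Byte] = "mobile-app\tweb".getBytes("UTF-8")
  val eager: ParsedResult = Right((bytes, Some(new String(bytes, "UTF-8").split("\t", -1))))
  val skipped: ParsedResult = Right((bytes, None))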
Batch.scala
@@ -13,9 +13,7 @@
package com.snowplowanalytics.s3.loader.processing

import java.time.Instant
- import java.nio.charset.StandardCharsets.UTF_8

- import com.snowplowanalytics.s3.loader.Result
+ import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
import com.snowplowanalytics.s3.loader.processing.Batch.Meta

/** Content of a KCL buffer with metadata attached */
@@ -34,20 +32,19 @@ object Batch {

val EmptyMeta: Meta = Meta(None, 0)

- def fromEnriched(inputs: List[Result]): Batch[List[Result]] = {
+ def fromEnriched(inputs: List[ParsedResult]): Batch[List[ParsedResult]] = {
val meta = inputs.foldLeft(EmptyMeta) {
case (Meta(tstamp, count), Left(_)) =>
Meta(tstamp, count + 1)
- case (Meta(tstamp, count), Right(raw)) =>
-   val strRecord = new String(raw, UTF_8)
-   val extracted = Common.getTstamp(strRecord).toOption
+ case (Meta(tstamp, count), Right((_, array))) =>
+   val extracted = array.flatMap(Common.getTstamp(_).toOption)
val min = Common.compareTstamps(tstamp, extracted)
Meta(min, count + 1)
}

Batch(meta, inputs)
}

- def from(inputs: List[Result]): Batch[List[Result]] =
+ def from[R](inputs: List[R]): Batch[List[R]] =
Batch(EmptyMeta, inputs)
}
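A usage sketch of the two constructors (record values hypothetical): fromEnriched folds over the parsed fields to find the earliest collector timestamp and count the records, while the now-generic from attaches empty metadata to any element type.

  val parsedRecs: List[ParsedResult] = List(
    Right(("a\tb".getBytes("UTF-8"), Some(Array("a", "b")))), // fields parsed
    Right(("c\td".getBytes("UTF-8"), None))                   // parsing skipped
  )
  val withMeta = Batch.fromEnriched(parsedRecs) // meta.count == 2; tstamp only from parsed fields
  val plain    = Batch.from(parsedRecs)         // meta == Batch.EmptyMeta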
Common.scala
@@ -14,15 +14,11 @@ package com.snowplowanalytics.s3.loader.processing

import java.time.Instant
import java.nio.charset.StandardCharsets.UTF_8

import cats.syntax.either._

import io.circe.parser.parse

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.circe.implicits._

- import com.snowplowanalytics.s3.loader.Result
+ import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
import com.snowplowanalytics.s3.loader.Config.Purpose
import com.snowplowanalytics.s3.loader.monitoring.StatsD.CollectorTstampIdx

@@ -39,15 +35,28 @@ object Common {
* @param records raw records themselves
*/
def partition(
-   purpose: Purpose,
-   statsDEnabled: Boolean,
-   records: List[Result]
+   purpose: Purpose,
+   partitionTsvByApp: Boolean,
+   statsDEnabled: Boolean,
+   records: List[Result]
): Batch.Partitioned =
purpose match {
case Purpose.SelfDescribingJson =>
Batch.from(records).map(rs => partitionByType(rs).toList)
- case Purpose.Enriched if statsDEnabled =>
-   Batch.fromEnriched(records).map(rs => List((RowType.Unpartitioned, rs)))
+ case Purpose.Enriched =>
+   // We need to parse the record from bytes to Array[String] to obtain time stats (for StatsD),
+   // as well as for partitioning by app id
+   val parsed = records.map(toParsedRecord(_, actuallyParse = statsDEnabled || partitionTsvByApp))
+   val batch = if (statsDEnabled)
+     Batch.fromEnriched(parsed)
+   else
+     Batch.from(parsed)
+   if (partitionTsvByApp)
+     batch.map(rs => partitionByApp(rs).toList.map {
+       case (row, records) => (row, records.map(fromParsedRecord))
+     })
+   else
+     batch.map(rs => List((RowType.Unpartitioned, rs.map(fromParsedRecord))))
case _ =>
Batch.from(records).map(rs => List((RowType.Unpartitioned, rs)))
}
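A hedged sketch of the new call (input values hypothetical): records are parsed at most once, and only when StatsD metrics or app-id partitioning actually need the fields.

  val raw: List[Result] = List(Right("mobile-app\tweb\t2024-05-24 12:00:00.000".getBytes("UTF-8")))
  val partitioned: Batch.Partitioned =
    Common.partition(Purpose.Enriched, partitionTsvByApp = true, statsDEnabled = false, raw)
  // Groups are keyed by RowType.TsvPerApp("mobile-app") here; fromParsedRecord
  // then drops the parsed fields so the emitter writes the original bytes.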
Expand All @@ -70,9 +79,25 @@ object Common {
case Left(_) => RowType.ReadingError
}

+ def toParsedRecord(record: Result, actuallyParse: Boolean): ParsedResult =
+   record.map { byteArray =>
+     val parsed = if (actuallyParse) Some(new String(byteArray, UTF_8).split("\t", -1)) else None
+     (byteArray, parsed)
+   }
+
+ def fromParsedRecord(record: ParsedResult): Result = record.map(_._1)

+ def partitionByApp(records: List[ParsedResult]): Map[RowType, List[ParsedResult]] =
+   records.groupBy {
+     case Right((_, array)) =>
+       // if there are no tabs, avoid returning the whole string
+       val appId = array.filter(_.size > 1).flatMap(_.headOption)
+       appId.fold[RowType](RowType.Unpartitioned)(RowType.TsvPerApp)
+     case Left(_) => RowType.ReadingError
+   }

/** Extract a timestamp from enriched TSV line */
- def getTstamp(row: String): Either[RuntimeException, Instant] = {
-   val array = row.split("\t", -1)
+ def getTstamp(array: Array[String]): Either[RuntimeException, Instant] = {
for {
string <- Either
.catchOnly[IndexOutOfBoundsException](array(CollectorTstampIdx))
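A hedged example of the new signature; the column index and timestamp format are assumptions based on the Snowplow enriched TSV layout, where collector_tstamp is the fourth field (CollectorTstampIdx == 3).

  val fields = "app-1\tweb\t2024-05-24 12:00:01.000\t2024-05-24 12:00:00.000".split("\t", -1)
  Common.getTstamp(fields) // Right(2024-05-24T12:00:00Z) if that column parses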
RowType.scala
@@ -23,6 +23,9 @@ object RowType {
*/
case object Unpartitioned extends RowType

+ /** TSV line partitioned by app id */
+ final case class TsvPerApp(appId: String) extends RowType

/** JSON line with self-describing payload that can be partitioned */
final case class SelfDescribing(vendor: String, name: String, format: String, model: Int) extends RowType

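Taken together, the keys an enriched or self-describing batch can now be grouped under (values illustrative):

  val keys: List[RowType] = List(
    RowType.TsvPerApp("mobile-app"), // enriched TSV, grouped by its first (app_id) field
    RowType.SelfDescribing("com.acme", "checkout", "jsonschema", 1),
    RowType.Unpartitioned, // no usable app id, or no {app} in the partition path
    RowType.ReadingError   // unreadable record, handled later by the serializer
  )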
