From acfd17dc0bc79052366ea73019c478c4bcff983d Mon Sep 17 00:00:00 2001
From: Nick
Date: Fri, 24 May 2024 16:19:45 +0100
Subject: [PATCH] Add the ability to partition TSV by app id

---
 .../loader/connector/KinesisS3Emitter.scala   | 13 +++--
 .../snowplowanalytics/s3/loader/package.scala |  5 ++
 .../s3/loader/processing/Batch.scala          | 13 ++---
 .../s3/loader/processing/Common.scala         | 49 ++++++++++++++-----
 .../s3/loader/processing/RowType.scala        |  3 ++
 5 files changed, 59 insertions(+), 24 deletions(-)

diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala
index ac7e02c..e66dd77 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala
@@ -52,6 +52,8 @@ class KinesisS3Emitter(client: AmazonS3,
                        serializer: ISerializer)
     extends IEmitter[Result] {
 
+  private val partitionTsvByApp = output.s3.partitionForPurpose(purpose).exists(_.contains("{app}"))
+
   /**
    * Reads items from a buffer and saves them to s3.
    *
@@ -67,9 +69,9 @@ class KinesisS3Emitter(client: AmazonS3,
     val records = buffer.getRecords.asScala.toList
 
     val partitionedBatch =
-      Common.partition(purpose, monitoring.isStatsDEnabled, records)
+      Common.partition(purpose, partitionTsvByApp, monitoring.isStatsDEnabled, records)
 
-    val getBase: Option[RowType.SelfDescribing] => String =
+    val getBase: Option[RowType] => String =
       getBaseFilename(output.s3, purpose, buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber, LocalDateTime.now)
 
     val afterEmit: () => Unit = () => monitoring.report(partitionedBatch.meta)
@@ -78,7 +80,7 @@ class KinesisS3Emitter(client: AmazonS3,
       case (RowType.Unpartitioned, partitionRecords) if partitionRecords.nonEmpty =>
         emitRecords(partitionRecords, afterEmit, getBase(None))
           .map(_.asLeft)
-      case (data: RowType.SelfDescribing, partitionRecords) if partitionRecords.nonEmpty =>
+      case (data @ (_: RowType.SelfDescribing | _: RowType.TsvPerApp), partitionRecords) if partitionRecords.nonEmpty =>
         emitRecords(partitionRecords, afterEmit, getBase(Some(data))).map(_.asLeft)
       case _ =>
         records // ReadingError or empty partition - should be handled later by serializer
@@ -238,14 +240,17 @@ object KinesisS3Emitter {
     lastSeq: String,
     now: LocalDateTime
  )(
-    sdj: Option[RowType.SelfDescribing]
+    row: Option[RowType]
  ): String = {
+    val sdj = row.collect { case s: RowType.SelfDescribing => s }
+    val app = row.collect { case a: RowType.TsvPerApp => a }
     val partitionPath = s3Config.partitionForPurpose(purpose).map {
       _.template("vendor", sdj.fold("unknown")(_.vendor))
         .template("name", sdj.fold("unknown")(_.name))
         .template("schema", sdj.fold("unknown")(_.name)) // allowed synonym
         .template("format", sdj.fold("unknown")(_.format))
         .template("model", sdj.fold(-1)(_.model).toString)
+        .template("app", app.fold("unknown")(_.appId))
         .template("yy+", now.format(DateTimeFormatter.ofPattern("yyyy")))
         .template("mm", now.format(DateTimeFormatter.ofPattern("MM")))
         .template("dd", now.format(DateTimeFormatter.ofPattern("dd")))
diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/package.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/package.scala
index 5f64b49..00f5699 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/package.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/package.scala
@@ -31,4 +31,9 @@ package object loader {
    * Final result of S3 Loader processing
    */
   type Result = Either[GenericError, RawRecord]
+
+  /**
+   * The result of S3 Loader processing with a potentially parsed record
+   */
+  type ParsedResult = Either[GenericError, (RawRecord, Option[Array[String]])]
 }
diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Batch.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Batch.scala
index c205088..68d51ca 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Batch.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Batch.scala
@@ -13,9 +13,7 @@ package com.snowplowanalytics.s3.loader.processing
 
 import java.time.Instant
 
-import java.nio.charset.StandardCharsets.UTF_8
-
-import com.snowplowanalytics.s3.loader.Result
+import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
 import com.snowplowanalytics.s3.loader.processing.Batch.Meta
 
 /** Content of a KCL buffer with metadata attached */
@@ -34,13 +32,12 @@ object Batch {
 
   val EmptyMeta: Meta = Meta(None, 0)
 
-  def fromEnriched(inputs: List[Result]): Batch[List[Result]] = {
+  def fromEnriched(inputs: List[ParsedResult]): Batch[List[ParsedResult]] = {
     val meta = inputs.foldLeft(EmptyMeta) {
       case (Meta(tstamp, count), Left(_)) =>
         Meta(tstamp, count + 1)
-      case (Meta(tstamp, count), Right(raw)) =>
-        val strRecord = new String(raw, UTF_8)
-        val extracted = Common.getTstamp(strRecord).toOption
+      case (Meta(tstamp, count), Right((_, array))) =>
+        val extracted = array.flatMap(Common.getTstamp(_).toOption)
         val min = Common.compareTstamps(tstamp, extracted)
         Meta(min, count + 1)
     }
@@ -48,6 +45,6 @@ object Batch {
     Batch(meta, inputs)
   }
 
-  def from(inputs: List[Result]): Batch[List[Result]] =
+  def from[R](inputs: List[R]): Batch[List[R]] =
     Batch(EmptyMeta, inputs)
 }
diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Common.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Common.scala
index 6dd56c6..1e89689 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Common.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Common.scala
@@ -14,15 +14,11 @@ package com.snowplowanalytics.s3.loader.processing
 
 import java.time.Instant
 import java.nio.charset.StandardCharsets.UTF_8
-
 import cats.syntax.either._
-
 import io.circe.parser.parse
-
 import com.snowplowanalytics.iglu.core.SchemaKey
 import com.snowplowanalytics.iglu.core.circe.implicits._
-
-import com.snowplowanalytics.s3.loader.Result
+import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
 import com.snowplowanalytics.s3.loader.Config.Purpose
 import com.snowplowanalytics.s3.loader.monitoring.StatsD.CollectorTstampIdx
 
@@ -39,15 +35,28 @@ object Common {
    * @param records raw records themselves
    */
  def partition(
-    purpose: Purpose,
-    statsDEnabled: Boolean,
-    records: List[Result]
+    purpose:           Purpose,
+    partitionTsvByApp: Boolean,
+    statsDEnabled:     Boolean,
+    records:           List[Result]
  ): Batch.Partitioned =
     purpose match {
       case Purpose.SelfDescribingJson =>
         Batch.from(records).map(rs => partitionByType(rs).toList)
-      case Purpose.Enriched if statsDEnabled =>
-        Batch.fromEnriched(records).map(rs => List((RowType.Unpartitioned, rs)))
+      case Purpose.Enriched =>
+        // We need to parse the record from bytes to Array[String] to obtain time stats (for StatsD),
+        // as well as for partitioning by app id
+        val parsed = records.map(toParsedRecord(_, actuallyParse = statsDEnabled || partitionTsvByApp))
+        val batch = if (statsDEnabled)
+          Batch.fromEnriched(parsed)
+        else
+          Batch.from(parsed)
+        if (partitionTsvByApp)
+          batch.map(rs => partitionByApp(rs).toList.map {
+            case (row, records) => (row, records.map(fromParsedRecord))
+          })
+        else
+          batch.map(rs => List((RowType.Unpartitioned, rs.map(fromParsedRecord))))
       case _ =>
         Batch.from(records).map(rs => List((RowType.Unpartitioned, rs)))
     }
@@ -70,9 +79,25 @@ object Common {
       case Left(_) => RowType.ReadingError
     }
 
+  def toParsedRecord(record: Result, actuallyParse: Boolean): ParsedResult =
+    record.map { byteArray =>
+      val parsed = if (actuallyParse) Some(new String(byteArray, UTF_8).split("\t", -1)) else None
+      (byteArray, parsed)
+    }
+
+  def fromParsedRecord(record: ParsedResult): Result = record.map(_._1)
+
+  def partitionByApp(records: List[ParsedResult]): Map[RowType, List[ParsedResult]] =
+    records.groupBy {
+      case Right((_, array)) =>
+        // if there are no tabs, avoid returning the whole string
+        val appId = array.filter(_.size > 1).flatMap(_.headOption)
+        appId.fold[RowType](RowType.Unpartitioned)(RowType.TsvPerApp)
+      case Left(_) => RowType.ReadingError
+    }
+
   /** Extract a timestamp from enriched TSV line */
-  def getTstamp(row: String): Either[RuntimeException, Instant] = {
-    val array = row.split("\t", -1)
+  def getTstamp(array: Array[String]): Either[RuntimeException, Instant] = {
     for {
       string <- Either
         .catchOnly[IndexOutOfBoundsException](array(CollectorTstampIdx))
diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/RowType.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/RowType.scala
index 3e9471d..b69684d 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/RowType.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/RowType.scala
@@ -23,6 +23,9 @@ object RowType {
    */
   case object Unpartitioned extends RowType
 
+  /** TSV line partitioned by app id */
+  final case class TsvPerApp(appId: String) extends RowType
+
   /** JSON line with self-describing payload that can be partitioned */
   final case class SelfDescribing(vendor: String, name: String, format: String, model: Int) extends RowType
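
Reviewer note (not part of the patch): `toParsedRecord` splits each enriched TSV at most once per record, and only when some consumer needs the fields; with StatsD off and no `{app}` placeholder configured, the bytes pass through unparsed. A minimal standalone sketch of that behaviour, mirroring the code above:

import java.nio.charset.StandardCharsets.UTF_8

object ParseOnceSketch extends App {
  // Mirrors Common.toParsedRecord: split the TSV only when a consumer needs fields
  def toParsed(bytes: Array[Byte], actuallyParse: Boolean): (Array[Byte], Option[Array[String]]) = {
    val parsed = if (actuallyParse) Some(new String(bytes, UTF_8).split("\t", -1)) else None
    (bytes, parsed)
  }

  assert(toParsed("a\tb".getBytes(UTF_8), actuallyParse = false)._2.isEmpty) // no consumers: skip the split
  assert(toParsed("a\tb".getBytes(UTF_8), actuallyParse = true)._2.exists(_.sameElements(Array("a", "b"))))
}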
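
Reviewer note (not part of the patch): `partitionByApp` keys each parsed record by its first TSV field (the app_id of an enriched event) and falls back to `Unpartitioned` when the line contains no tabs. A standalone sketch of that extraction; the sample rows are invented:

object AppIdSketch extends App {
  // `fields` mirrors the Option[Array[String]] carried by ParsedResult
  def appIdOf(fields: Option[Array[String]]): Option[String] =
    fields
      .filter(_.size > 1)    // no tabs => single-element array: skip the whole line
      .flatMap(_.headOption) // app_id is the first field of an enriched TSV

  assert(appIdOf(Some("shop\tweb\t2024-05-24".split("\t", -1))).contains("shop"))
  assert(appIdOf(Some("not-a-tsv-line".split("\t", -1))).isEmpty) // stays Unpartitioned
  assert(appIdOf(None).isEmpty) // parsing was skipped upstream
}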
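
Reviewer note (not part of the patch): with this change, putting `{app}` in the enriched partition template routes files under per-app prefixes via the `.template(...)` chain in `getBaseFilename`. The sketch below assumes `template` substitutes `{key}` occurrences; its real definition lives elsewhere in the loader and the stand-in here is for illustration only:

object PartitionPathSketch extends App {
  // Hypothetical stand-in for the loader's template helper
  implicit class Template(private val s: String) extends AnyVal {
    def template(key: String, value: String): String =
      s.replace(s"{$key}", value)
  }

  // e.g. a configured partition template containing the new {app} placeholder
  val path = "{app}/{yy+}/{mm}/{dd}"
    .template("app", "shop") // from RowType.TsvPerApp(appId = "shop")
    .template("yy+", "2024")
    .template("mm", "05")
    .template("dd", "24")

  assert(path == "shop/2024/05/24") // rows without an app id expand with "unknown"
}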