diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala
index ac7e02c..283627f 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala
@@ -52,6 +52,8 @@ class KinesisS3Emitter(client: AmazonS3,
                        serializer: ISerializer)
     extends IEmitter[Result] {
 
+  private val partitionTsvByApp = output.s3.partitionForPurpose(purpose).exists(_.contains("{app}"))
+
   /**
    * Reads items from a buffer and saves them to s3.
    *
@@ -67,9 +69,9 @@
     val records = buffer.getRecords.asScala.toList
     val partitionedBatch =
-      Common.partition(purpose, monitoring.isStatsDEnabled, records)
+      Common.partition(purpose, partitionTsvByApp, monitoring.isStatsDEnabled, records)
 
-    val getBase: Option[RowType.SelfDescribing] => String =
+    val getBase: Option[RowType] => String =
       getBaseFilename(output.s3, purpose, buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber, LocalDateTime.now)
     val afterEmit: () => Unit =
       () => monitoring.report(partitionedBatch.meta)
 
@@ -78,7 +80,7 @@
       case (RowType.Unpartitioned, partitionRecords) if partitionRecords.nonEmpty =>
         emitRecords(partitionRecords, afterEmit, getBase(None))
           .map(_.asLeft)
-      case (data: RowType.SelfDescribing, partitionRecords) if partitionRecords.nonEmpty =>
+      case (data @ (_: RowType.SelfDescribing | _: RowType.Tsv), partitionRecords) if partitionRecords.nonEmpty =>
         emitRecords(partitionRecords, afterEmit, getBase(Some(data))).map(_.asLeft)
       case _ =>
         records // ReadingError or empty partition - should be handled later by serializer
@@ -238,14 +240,17 @@
     lastSeq: String,
     now: LocalDateTime
   )(
-    sdj: Option[RowType.SelfDescribing]
+    row: Option[RowType]
  ): String = {
+    val sdj = row.collect { case s: RowType.SelfDescribing => s }
+    val app = row.collect { case a: RowType.Tsv => a }
     val partitionPath = s3Config.partitionForPurpose(purpose).map {
       _.template("vendor", sdj.fold("unknown")(_.vendor))
         .template("name", sdj.fold("unknown")(_.name))
         .template("schema", sdj.fold("unknown")(_.name)) // allowed synonym
         .template("format", sdj.fold("unknown")(_.format))
         .template("model", sdj.fold(-1)(_.model).toString)
+        .template("app", app.fold("unknown")(_.appId))
         .template("yy+", now.format(DateTimeFormatter.ofPattern("yyyy")))
         .template("mm", now.format(DateTimeFormatter.ofPattern("MM")))
         .template("dd", now.format(DateTimeFormatter.ofPattern("dd")))
diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/package.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/package.scala
index 5f64b49..00f5699 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/package.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/package.scala
@@ -31,4 +31,9 @@ package object loader {
    * Final result of S3 Loader processing
    */
   type Result = Either[GenericError, RawRecord]
+
+  /**
+   * The result of S3 Loader processing with a potentially parsed record
+   */
+  type ParsedResult = Either[GenericError, (RawRecord, Option[Array[String]])]
 }
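Review note: the new `partitionTsvByApp` flag is derived purely from the configured partition template containing an `{app}` placeholder, which `getBaseFilename` later fills per partition. A minimal sketch of that placeholder flow, assuming a template value like `enriched/{app}/{yy+}/{mm}/{dd}` and a hypothetical `fill` helper standing in for the config's `template` method:

```scala
object AppPlaceholderSketch {
  // hypothetical stand-in for the real template(...) method on the S3 output config
  def fill(path: String, key: String, value: String): String =
    path.replace(s"{$key}", value)

  def main(args: Array[String]): Unit = {
    val template = "enriched/{app}/{yy+}/{mm}/{dd}" // assumed example value
    val partitionTsvByApp = template.contains("{app}") // mirrors the check in KinesisS3Emitter
    val base = fill(template, "app", "mobile-checkout")
    println(s"$partitionTsvByApp -> $base") // true -> enriched/mobile-checkout/{yy+}/{mm}/{dd}
  }
}
```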
diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Batch.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Batch.scala
index c205088..68d51ca 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Batch.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Batch.scala
@@ -13,9 +13,7 @@ package com.snowplowanalytics.s3.loader.processing
 
 import java.time.Instant
 
-import java.nio.charset.StandardCharsets.UTF_8
-
-import com.snowplowanalytics.s3.loader.Result
+import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
 import com.snowplowanalytics.s3.loader.processing.Batch.Meta
 
 /** Content of a KCL buffer with metadata attached */
@@ -34,13 +32,12 @@
 
   val EmptyMeta: Meta = Meta(None, 0)
 
-  def fromEnriched(inputs: List[Result]): Batch[List[Result]] = {
+  def fromEnriched(inputs: List[ParsedResult]): Batch[List[ParsedResult]] = {
     val meta = inputs.foldLeft(EmptyMeta) {
       case (Meta(tstamp, count), Left(_)) =>
         Meta(tstamp, count + 1)
-      case (Meta(tstamp, count), Right(raw)) =>
-        val strRecord = new String(raw, UTF_8)
-        val extracted = Common.getTstamp(strRecord).toOption
+      case (Meta(tstamp, count), Right((_, array))) =>
+        val extracted = array.flatMap(Common.getTstamp(_).toOption)
         val min = Common.compareTstamps(tstamp, extracted)
         Meta(min, count + 1)
     }
@@ -48,6 +45,6 @@
     Batch(meta, inputs)
   }
 
-  def from(inputs: List[Result]): Batch[List[Result]] =
+  def from[R](inputs: List[R]): Batch[List[R]] =
     Batch(EmptyMeta, inputs)
 }
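Review note: `fromEnriched` now folds over records whose TSV fields were split up front, so the earliest collector timestamp comes from the already-parsed array instead of re-decoding bytes per record. A simplified, self-contained sketch of that fold (this `Meta` is a stand-in for `Batch.Meta`; the inputs are invented):

```scala
import java.time.Instant

object MetaFoldSketch {
  final case class Meta(earliest: Option[Instant], count: Int)

  def main(args: Array[String]): Unit = {
    // each element stands for one record's optionally-extracted collector_tstamp
    val extracted: List[Option[Instant]] = List(
      Some(Instant.parse("2020-11-26T00:02:05Z")),
      None, // an unparseable TSV still increments the count
      Some(Instant.parse("2020-11-26T00:01:05Z"))
    )
    val meta = extracted.foldLeft(Meta(None, 0)) { case (Meta(acc, n), tstamp) =>
      // keep the minimum of the accumulator and the current timestamp
      val min = (acc, tstamp) match {
        case (Some(a), Some(b)) => Some(if (a.isBefore(b)) a else b)
        case (a, b)             => a.orElse(b)
      }
      Meta(min, n + 1)
    }
    println(meta) // Meta(Some(2020-11-26T00:01:05Z),3)
  }
}
```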
diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Common.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Common.scala
index 6dd56c6..f12a873 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Common.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Common.scala
@@ -14,15 +14,11 @@ package com.snowplowanalytics.s3.loader.processing
 
 import java.time.Instant
 import java.nio.charset.StandardCharsets.UTF_8
-
 import cats.syntax.either._
-
 import io.circe.parser.parse
-
 import com.snowplowanalytics.iglu.core.SchemaKey
 import com.snowplowanalytics.iglu.core.circe.implicits._
-
-import com.snowplowanalytics.s3.loader.Result
+import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
 import com.snowplowanalytics.s3.loader.Config.Purpose
 import com.snowplowanalytics.s3.loader.monitoring.StatsD.CollectorTstampIdx
 
@@ -40,14 +36,29 @@
    */
   def partition(
     purpose: Purpose,
+    partitionTsvByApp: Boolean,
     statsDEnabled: Boolean,
     records: List[Result]
   ): Batch.Partitioned =
     purpose match {
       case Purpose.SelfDescribingJson =>
         Batch.from(records).map(rs => partitionByType(rs).toList)
-      case Purpose.Enriched if statsDEnabled =>
-        Batch.fromEnriched(records).map(rs => List((RowType.Unpartitioned, rs)))
+      case Purpose.Enriched =>
+        // We need to parse the record from bytes to Array[String] to obtain time stats (for StatsD),
+        // as well as for partitioning by app id
+        val parsed = records.map(toParsedRecord(_, actuallyParse = statsDEnabled || partitionTsvByApp))
+        val batch =
+          if (statsDEnabled)
+            Batch.fromEnriched(parsed)
+          else
+            Batch.from(parsed)
+        if (partitionTsvByApp)
+          batch.map(rs =>
+            partitionByApp(rs).toList.map { case (row, records) =>
+              (row, records.map(fromParsedRecord))
+            })
+        else
+          batch.map(rs => List((RowType.Unpartitioned, rs.map(fromParsedRecord))))
       case _ =>
         Batch.from(records).map(rs => List((RowType.Unpartitioned, rs)))
     }
@@ -70,16 +81,31 @@
       case Left(_) => RowType.ReadingError
     }
 
+  def toParsedRecord(record: Result, actuallyParse: Boolean): ParsedResult =
+    record.map { byteArray =>
+      val parsed = if (actuallyParse) Some(new String(byteArray, UTF_8).split("\t", -1)) else None
+      (byteArray, parsed)
+    }
+
+  def fromParsedRecord(record: ParsedResult): Result = record.map(_._1)
+
+  def partitionByApp(records: List[ParsedResult]): Map[RowType, List[ParsedResult]] =
+    records.groupBy {
+      case Right((_, array)) =>
+        // if there are no tabs, avoid returning the whole string as the app id
+        val appId = array.filter(_.size > 1).flatMap(_.headOption)
+        appId.fold[RowType](RowType.Unpartitioned)(RowType.Tsv)
+      case Left(_) => RowType.ReadingError
+    }
+
   /** Extract a timestamp from enriched TSV line */
-  def getTstamp(row: String): Either[RuntimeException, Instant] = {
-    val array = row.split("\t", -1)
+  def getTstamp(array: Array[String]): Either[RuntimeException, Instant] =
     for {
       string <- Either
                   .catchOnly[IndexOutOfBoundsException](array(CollectorTstampIdx))
                   .map(_.replaceAll(" ", "T") + "Z")
       tstamp <- Either.catchOnly[DateTimeParseException](Instant.parse(string))
     } yield tstamp
-  }
 
   def compareTstamps(a: Option[Instant], b: Option[Instant]): Option[Instant] =
     (a, b) match {
diff --git a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/RowType.scala b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/RowType.scala
index 3e9471d..714eed5 100644
--- a/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/RowType.scala
+++ b/modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/RowType.scala
@@ -23,6 +23,9 @@ object RowType {
    */
   case object Unpartitioned extends RowType
 
+  /** TSV line with payload that can be partitioned by app id */
+  final case class Tsv(appId: String) extends RowType
+
   /** JSON line with self-describing payload that can be partitioned */
   final case class SelfDescribing(vendor: String, name: String, format: String, model: Int)
       extends RowType
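Review note: `partitionByApp` keys each parsed record by its first TSV field (app_id is the first column of an enriched event), falling back to `Unpartitioned` when the split produced a single field, i.e. the line had no tabs. The filter was transposed to match that comment: the size guard must apply to the field array, not to the head string. A rough, runnable sketch of the grouping with invented rows and simplified types:

```scala
object PartitionByAppSketch {
  sealed trait RowType
  object RowType {
    case object Unpartitioned extends RowType
    case object ReadingError extends RowType
    final case class Tsv(appId: String) extends RowType
  }

  def main(args: Array[String]): Unit = {
    val records: List[Either[String, Array[String]]] = List(
      Right("shop\tweb\tevent-1".split("\t", -1)),
      Right("shop\tweb\tevent-2".split("\t", -1)),
      Right("line-without-tabs".split("\t", -1)), // single field: no app id to extract
      Left("unreadable bytes")
    )
    val grouped = records.groupBy {
      case Right(array) =>
        // only trust the head as app_id when the split produced more than one field
        val appId = Some(array).filter(_.size > 1).flatMap(_.headOption)
        appId.fold[RowType](RowType.Unpartitioned)(RowType.Tsv)
      case Left(_) => RowType.ReadingError
    }
    grouped.foreach { case (row, rs) => println(s"$row -> ${rs.size} record(s)") }
    // Tsv(shop) -> 2, Unpartitioned -> 1, ReadingError -> 1
  }
}
```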
List("invalid event", "rubbish").map(_.getBytes.asRight) + val input: List[ParsedResult] = + List("invalid event", "rubbish") + .map(_.getBytes.asRight) + .map(Common.toParsedRecord(_, actuallyParse = true)) val expected = Batch(Batch.Meta(None, 2), input) diff --git a/modules/main/src/test/scala/com/snowplowanalytics/s3/loader/processing/CommonSpec.scala b/modules/main/src/test/scala/com/snowplowanalytics/s3/loader/processing/CommonSpec.scala index 135fea2..41caf7d 100644 --- a/modules/main/src/test/scala/com/snowplowanalytics/s3/loader/processing/CommonSpec.scala +++ b/modules/main/src/test/scala/com/snowplowanalytics/s3/loader/processing/CommonSpec.scala @@ -70,7 +70,7 @@ class CommonSpec extends Specification { "getTimestamp" should { "parse timestamp in proper format" in { - val input = List.fill(4)("2020-11-26 00:01:05").mkString("\t") + val input = List.fill(4)("2020-11-26 00:01:05").toArray val expected = Instant.parse("2020-11-26T00:01:05Z") Common.getTstamp(input) must beRight(expected) } @@ -79,13 +79,13 @@ class CommonSpec extends Specification { "partition" should { "add metadata for enriched if statsd is enabled" in { val input = List("".getBytes.asRight) - val result = Common.partition(Config.Purpose.Enriched, true, input) + val result = Common.partition(Config.Purpose.Enriched, false, true, input) result.meta should beEqualTo(Batch.Meta(None, 1)) } "not add metadata for enriched if statsd is disabled" in { val input = List("".getBytes.asRight) - val result = Common.partition(Config.Purpose.Enriched, false, input) + val result = Common.partition(Config.Purpose.Enriched, false, false, input) result.meta should beEqualTo(Batch.EmptyMeta) } @@ -94,7 +94,7 @@ class CommonSpec extends Specification { val input = List(dataType11.asRight, dataType21.asRight) val result = - Common.partition(Config.Purpose.SelfDescribingJson, false, input) + Common.partition(Config.Purpose.SelfDescribingJson, false, false, input) result should beEqualTo( Batch( Batch.EmptyMeta, diff --git a/modules/main/src/test/scala/com/snowplowanalytics/s3/loader/serializers/GZipSerializerSpec.scala b/modules/main/src/test/scala/com/snowplowanalytics/s3/loader/serializers/GZipSerializerSpec.scala index d0a823b..7903d64 100644 --- a/modules/main/src/test/scala/com/snowplowanalytics/s3/loader/serializers/GZipSerializerSpec.scala +++ b/modules/main/src/test/scala/com/snowplowanalytics/s3/loader/serializers/GZipSerializerSpec.scala @@ -42,8 +42,8 @@ class GZipSerializerSpec extends Specification { cleanup() val binaryInputs = List( - (List("A", "B", 1000, "a", "b"):List[Any]).mkString("\t").getBytes.asRight, - (List("X", "Y", 2000, "x", "y"):List[Any]).mkString("\t").getBytes.asRight + (List("A", "B", 1000, "a", "b"): List[Any]).mkString("\t").getBytes.asRight, + (List("X", "Y", 2000, "x", "y"): List[Any]).mkString("\t").getBytes.asRight ) val serializationResult =