Skip to content

Commit

Permalink
Fix dateFormat partitioning in output path (close #236)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 26, 2021
1 parent cafd473 commit 41c42ff
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 36 deletions.
7 changes: 5 additions & 2 deletions config/config.hocon.sample
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
"path": "s3://acme-snowplow-output/raw/",

# Partitioning format; Optional
"dateFormat": "%Y-%M-%d",
# Valid substitutions are {vendor}, {name}, {format}, {model} for self-describing jsons
# and {yy}, {mm}, {dd}, {hh} for year, month, day, hour
partitionFormat: "{vendor}.{name}/model={model}/date={yy}-{mm}-{dd}"

# Prefix for all file names; Optional
"filenamePrefix": "pre",

Expand Down Expand Up @@ -82,4 +85,4 @@
}
}
}
}
}
9 changes: 8 additions & 1 deletion src/main/scala/com/snowplowanalytics/s3/loader/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ object Config {
}

final case class S3Output(path: String,
dateFormat: Option[String],
partitionFormat: Option[String],
filenamePrefix: Option[String],
compression: Compression,
maxTimeout: Int,
Expand All @@ -146,6 +146,13 @@ object Config {
if (possiblyEmpty.isEmpty) None else Some(possiblyEmpty)
}

def partitionForPurpose(purpose: Purpose): Option[String] =
(partitionFormat, purpose) match {
case (Some(f), _) => Some(f)
case (None, Purpose.SelfDescribingJson) => Some("{vendor}.{schema}")
case (None, Purpose.Raw | Purpose.Enriched) => None
}

// For backward-compatibility
private val scheme = "s3://"
private val withoutPrefix =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class KinesisS3Emitter(client: AmazonS3,
val partitionedBatch =
Common.partition(purpose, monitoring.isStatsDEnabled, records)

val getBase: Option[String] => String =
getBaseFilename(output.s3, buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber)
val getBase: Option[RowType.SelfDescribing] => String =
getBaseFilename(output.s3, purpose, buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber, LocalDateTime.now)
val afterEmit: () => Unit =
() => monitoring.report(partitionedBatch.meta)

Expand All @@ -79,7 +79,7 @@ class KinesisS3Emitter(client: AmazonS3,
emitRecords(partitionRecords, afterEmit, getBase(None))
.map(_.asLeft)
case (data: RowType.SelfDescribing, partitionRecords) if partitionRecords.nonEmpty =>
emitRecords(partitionRecords, afterEmit, getBase(Some(data.partition))).map(_.asLeft)
emitRecords(partitionRecords, afterEmit, getBase(Some(data))).map(_.asLeft)
case _ =>
records // ReadingError or empty partition - should be handled later by serializer
}.asJava
Expand Down Expand Up @@ -234,20 +234,33 @@ object KinesisS3Emitter {
*/
def getBaseFilename(
s3Config: S3Output,
purpose: Purpose,
firstSeq: String,
lastSeq: String
lastSeq: String,
now: LocalDateTime
)(
partition: Option[String]
sdj: Option[RowType.SelfDescribing]
): String = {
val path = List(s3Config.outputDirectory, s3Config.dateFormat).flatten.mkString("/")
val fileName = (s3Config.filenamePrefix.toList ++ partition.toList ++ List(
LocalDateTime.now.format(
val partitionPath = s3Config.partitionForPurpose(purpose).map {
_.template("vendor", sdj.fold("unknown")(_.vendor))
.template("schema", sdj.fold("unknown")(_.name))
.template("format", sdj.fold("unknown")(_.format))
.template("model", sdj.fold(-1)(_.model).toString)
.template("yy+", now.format(DateTimeFormatter.ofPattern("yyyy")))
.template("mm", now.format(DateTimeFormatter.ofPattern("MM")))
.template("dd", now.format(DateTimeFormatter.ofPattern("dd")))
.template("hh", now.format(DateTimeFormatter.ofPattern("HH")))
}

val path = List(s3Config.outputDirectory, partitionPath).flatten
val fileName = (s3Config.filenamePrefix ++: List(
now.format(
DateTimeFormatter.ofPattern("yyyy-MM-dd-HHmmss")
),
firstSeq,
lastSeq
)).mkString("-")
val fullPath = List(path, fileName).filterNot(_.isEmpty).mkString("/")
val fullPath = (path :+ fileName).filterNot(_.isEmpty).mkString("/")

DynamicPath.normalize(fullPath)
}
Expand All @@ -262,4 +275,9 @@ object KinesisS3Emitter {
catch {
case _: InterruptedException => ()
}

private implicit class StringOps(val s: String) extends AnyVal {
def template(matcher: String, replacement: String): String =
s.replaceAll(s"""(?i)\\{$matcher\\}""", replacement)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ object RowType {
case object Unpartitioned extends RowType

/** JSON line with self-describing payload that can be partitioned */
final case class SelfDescribing(vendor: String, name: String, format: String, model: Int) extends RowType {
def partition: String = s"$vendor.$name/$format-$model"
}
final case class SelfDescribing(vendor: String, name: String, format: String, model: Int) extends RowType

/** Unrecognized line, e.g. non-string or non-SDJSON whereas partitioning is enabled */
case object ReadingError extends RowType
Expand Down
16 changes: 13 additions & 3 deletions src/test/scala/com/snowplowanalytics/s3/loader/ConfigSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ConfigSpec extends Specification {
"s3": {
"path": "s3://s3-loader-integration-test/usual",
"dateFormat": "%Y-%M-%d",
"partitionFormat": "schema={vendor}.{schema}/year={yy}",
"filenamePrefix": "pre",
"maxTimeout": 2000,
Expand Down Expand Up @@ -72,7 +72,12 @@ class ConfigSpec extends Specification {
Purpose.Raw,
Config.Input("acme-s3-loader", "enriched-events", InitialPosition.Latest, None, 10),
Config.Output(
S3Output("s3://s3-loader-integration-test/usual", Some("%Y-%M-%d"), Some("pre"), Compression.Gzip, 2000, None),
S3Output("s3://s3-loader-integration-test/usual",
Some("schema={vendor}.{schema}/year={yy}"),
Some("pre"),
Compression.Gzip,
2000,
None),
Config.KinesisOutput("stream-name")
),
Config.Buffer(2048L, 10L, 5000L),
Expand Down Expand Up @@ -103,7 +108,12 @@ class ConfigSpec extends Specification {
Purpose.Raw,
Config.Input("acme-s3-loader", "raw-events", InitialPosition.Latest, None, 10),
Config.Output(
S3Output("s3://acme-snowplow-output/raw/", Some("%Y-%M-%d"), Some("pre"), Compression.Gzip, 2000, None),
S3Output("s3://acme-snowplow-output/raw/",
Some("{vendor}.{name}/model={model}/date={yy}-{mm}-{dd}"),
Some("pre"),
Compression.Gzip,
2000,
None),
Config.KinesisOutput("stream-name")
),
Config.Buffer(2048L, 10L, 5000L),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,56 +12,88 @@
*/
package com.snowplowanalytics.s3.loader.connector

import java.time.LocalDateTime

import org.specs2.mutable.Specification

import com.snowplowanalytics.s3.loader.Config.{Compression, S3Output}
import com.snowplowanalytics.s3.loader.processing.RowType
import com.snowplowanalytics.s3.loader.Config.{Compression, Purpose, S3Output}

class KinesisS3EmitterSpec extends Specification {
"KinesisS3Emitter" should {
val firstSeq = "firstSeq"
val lastSeq = "lastSeq"
val partition = "com.snowplow.partition"
val sdj = RowType.SelfDescribing("com.snowplow", "myschema", "jsonschema", 42)
val outputDirectory = "outputDirectory"
val dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
val sdjPartitionFormat = "{vendor}.{schema}/model={model}/date={yy}-{mm}-{dd}"
val rawPartitionFormat = "date={yy}-{mm}-{dd}"
val filenamePrefix = "fileNamePrefix"
val now = LocalDateTime.of(2020, 2, 4, 6, 8, 10, 12)

"format file name with optional components" in {
"format file name for raw with optional components" in {
val purpose = Purpose.Raw
val s3Config =
S3Output(s"s3://no-bucket/$outputDirectory", Some(dateFormat), Some(filenamePrefix), Compression.Gzip, 0, None)
val actual = KinesisS3Emitter.getBaseFilename(s3Config, firstSeq, lastSeq)(Some(partition))
S3Output(s"s3://no-bucket/$outputDirectory", Some(rawPartitionFormat), Some(filenamePrefix), Compression.Gzip, 0, None)
val actual = KinesisS3Emitter.getBaseFilename(s3Config, purpose, firstSeq, lastSeq, now)(Some(sdj))

actual must beEqualTo(
s"$outputDirectory/date=2020-02-04/$filenamePrefix-2020-02-04-060810-$firstSeq-$lastSeq"
)
}

"format file name for sdj with optional components" in {
val purpose = Purpose.SelfDescribingJson
val s3Config =
S3Output(s"s3://no-bucket/$outputDirectory", Some(sdjPartitionFormat), Some(filenamePrefix), Compression.Gzip, 0, None)
val actual = KinesisS3Emitter.getBaseFilename(s3Config, purpose, firstSeq, lastSeq, now)(Some(sdj))

actual must beEqualTo(
s"$outputDirectory/com.snowplow.myschema/model=42/date=2020-02-04/$filenamePrefix-2020-02-04-060810-$firstSeq-$lastSeq"
)
}

"format file name for raw without optional components" in {
val purpose = Purpose.Raw
val s3Config =
S3Output("s3://no-bucket", None, None, Compression.Gzip, 0, None)
val actual =
KinesisS3Emitter.getBaseFilename(s3Config, purpose, firstSeq, lastSeq, now)(None)

actual.replaceAll("\\d{4}-\\d{2}-\\d{2}-\\d{6}", "2021-04-30-000000") must beEqualTo(
s"$outputDirectory/$dateFormat/$filenamePrefix-$partition-2021-04-30-000000-$firstSeq-$lastSeq"
actual must beEqualTo(
s"2020-02-04-060810-$firstSeq-$lastSeq"
)
}

"format file name without optional components" in {
"format file name for sdj without optional components" in {
val purpose = Purpose.SelfDescribingJson
val s3Config =
S3Output("s3://no-bucket", None, None, Compression.Gzip, 0, None)
val actual =
KinesisS3Emitter.getBaseFilename(s3Config, firstSeq, lastSeq)(None)
KinesisS3Emitter.getBaseFilename(s3Config, purpose, firstSeq, lastSeq, now)(None)

actual.replaceAll("\\d{4}-\\d{2}-\\d{2}-\\d{6}", "2021-04-30-000000") must beEqualTo(
s"2021-04-30-000000-$firstSeq-$lastSeq"
actual must beEqualTo(
s"unknown.unknown/2020-02-04-060810-$firstSeq-$lastSeq"
)
}

"format file name with path, but without optional components" in {
val purpose = Purpose.Raw
val s3Config = S3Output(s"s3://no-bucket/$outputDirectory", None, None, Compression.Gzip, 0, None)
val actual =
KinesisS3Emitter.getBaseFilename(s3Config, firstSeq, lastSeq)(None)
KinesisS3Emitter.getBaseFilename(s3Config, purpose, firstSeq, lastSeq, now)(None)

actual.replaceAll("\\d{4}-\\d{2}-\\d{2}-\\d{6}", "2021-04-30-000000") must beEqualTo(
s"$outputDirectory/2021-04-30-000000-$firstSeq-$lastSeq"
actual must beEqualTo(
s"$outputDirectory/2020-02-04-060810-$firstSeq-$lastSeq"
)
}

"format file name with path and partition" in {
val purpose = Purpose.SelfDescribingJson
val s3Config = S3Output(s"s3://no-bucket/$outputDirectory", None, None, Compression.Gzip, 0, None)
val actual = KinesisS3Emitter.getBaseFilename(s3Config, firstSeq, lastSeq)(Some(partition))
val actual = KinesisS3Emitter.getBaseFilename(s3Config, purpose, firstSeq, lastSeq, now)(Some(sdj))

actual.replaceAll("\\d{4}-\\d{2}-\\d{2}-\\d{6}", "2021-04-30-000000") must beEqualTo(
s"$outputDirectory/$partition-2021-04-30-000000-$firstSeq-$lastSeq"
actual must beEqualTo(
s"$outputDirectory/com.snowplow.myschema/2020-02-04-060810-$firstSeq-$lastSeq"
)
}
}
Expand Down

0 comments on commit 41c42ff

Please sign in to comment.