diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala
index 293c2a36a..435aa65fb 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala
@@ -24,13 +24,15 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{ShreddedTy
  */
 object Common {
 
+  val GoodPrefix = "kind=good"
+
   val AtomicSchema: SchemaKey =
     SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))
   val AtomicType = ShreddedType(AtomicSchema, Format.TSV)
   val AtomicPath: String = entityPath(AtomicType)
 
   def entityPath(entity: ShreddedType) =
-    s"vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"
+    s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"
 
   /**
    * Remove all occurrences of access key id and secret access key from message
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/Config.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/Config.scala
index 86acf5051..904d85860 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/Config.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/config/Config.scala
@@ -118,7 +118,7 @@ object Config {
       case class File(dir: String) extends StreamInput
     }
 
-    case class Output(good: URI, bad: URI, compression: Compression)
+    case class Output(path: URI, compression: Compression)
 
     sealed trait Compression extends StringEnum
     object Compression {
diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Shredded.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Shredded.scala
index 5dc1e7290..51de66a89 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Shredded.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Shredded.scala
@@ -14,15 +14,15 @@
  */
 package com.snowplowanalytics.snowplow.rdbloader.common.transformation
 
+import cats.{Show, Monad}
 import cats.data.{EitherT, NonEmptyList}
-import cats.{Monad, Show}
 import cats.implicits._
 
 import cats.effect.Clock
 
 import io.circe.{Json => CJson}
 
-import com.snowplowanalytics.iglu.core.SchemaKey
+import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
 
 import com.snowplowanalytics.iglu.client.{Resolver, Client}
 import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
@@ -34,24 +34,26 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.Format
 
 /** ADT, representing possible forms of data in blob storage */
 sealed trait Shredded {
+  def isGood: Boolean
   def vendor: String
   def name: String
   def format: Format
   def model: Int
   def data: String
 
-  def json: Option[(String, String, String, Int, String)] = this match {
-    case Shredded.Json(vendor, name, version, data) => Some((vendor, name, Format.JSON.path, version, data))
-    case Shredded.Tabular(_, _, _, _) => None
+  def json: Option[(String, String, String, String, Int, String)] = this match {
+    case _: Shredded.Json if isGood => Some(("good", vendor, name, format.path, model, data))
+    case _: Shredded.Json => Some(("bad", vendor, name, format.path, model, data))
+    case _: Shredded.Tabular => None
   }
 
-  def tsv: Option[(String, String, String, Int, String)] = this match {
-    case _: Shredded.Tabular => Some((vendor, name, format.path, model, data))
+  def tsv: Option[(String, String, String, String, Int, String)] = this match {
+    case _: Shredded.Tabular => Some(("good", vendor, name, format.path, model, data))
     case _: Shredded.Json => None
   }
 
-  def splitGood: (Shredded.Path, Shredded.Data) =
-    (Shredded.Path(true, vendor, name, format, model), Shredded.Data(data))
+  def split: (Shredded.Path, Shredded.Data) =
+    (Shredded.Path(isGood, vendor, name, format, model), Shredded.Data(data))
 }
 
 object Shredded {
@@ -65,16 +67,23 @@
     }
   }
 
+  def fromBadRow(badRow: BadRow): Shredded = {
+    val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey
+    val data = Shredded.Data(badRow.compact)
+    Json(false, vendor, name, model, data.value)
+  }
+
   implicit val pathShow: Show[Path] = Show(_.getDir)
 
   /** Data will be represented as JSON, with RDB Loader loading it using JSON Paths. Legacy format */
-  case class Json(vendor: String, name: String, model: Int, data: String) extends Shredded {
+  case class Json(isGood: Boolean, vendor: String, name: String, model: Int, data: String) extends Shredded {
     val format: Format = Format.JSON
   }
 
   /** Data will be represented as TSV, with RDB Loader loading it directly */
   case class Tabular(vendor: String, name: String, model: Int, data: String) extends Shredded {
+    val isGood = true // We don't support TSV shredding for bad data
     val format: Format = Format.TSV
   }
 
@@ -96,7 +105,7 @@ object Shredded {
           Tabular(vendor, name, hierarchy.entity.schema.version.model, (meta ++ columns).mkString("\t"))
         }
       } else
-        EitherT.pure[F, FailureDetails.LoaderIgluError](Json(vendor, name, hierarchy.entity.schema.version.model, hierarchy.dumpJson))
+        EitherT.pure[F, FailureDetails.LoaderIgluError](Json(true, vendor, name, hierarchy.entity.schema.version.model, hierarchy.dumpJson))
   }
 
   /**
diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CommonSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CommonSpec.scala
index 0606e1608..7d46bc8ff 100644
--- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CommonSpec.scala
+++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CommonSpec.scala
@@ -80,7 +80,6 @@ object CommonSpec {
       URI.create("s3://bucket/input/"),
       Config.Shredder.Output(
         URI.create("s3://bucket/good/"),
-        URI.create("s3://bucket/bad/"),
         Config.Shredder.Compression.Gzip
       )
     )
diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ConfigSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ConfigSpec.scala
index d8367ed9c..12574a707 100644
--- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ConfigSpec.scala
+++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ConfigSpec.scala
@@ -41,8 +41,7 @@ class ConfigSpec extends Specification {
       "type": "batch",
       "input": "s3://bucket/input/",
       "output" = {
-        "good": "s3://bucket/good/",
-        "bad": "s3://bucket/bad/",
+        "path": "s3://bucket/good/",
        "compression": "GZIP"
      }
    },
@@ -119,7 +118,7 @@ class ConfigSpec extends Specification {
      "type": "batch",
"input": "s3://bucket/input/", "output" = { - "good": "s3://bucket/good/", + "path": "s3://bucket/good/", "bad": "s3://bucket/bad/", "compression": "GZIP" } @@ -231,7 +230,6 @@ object ConfigSpec { URI.create("s3://bucket/input/"), Shredder.Output( URI.create("s3://bucket/good/"), - URI.create("s3://bucket/bad/"), Config.Shredder.Compression.Gzip ) ), diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala index 79afea68c..ff1c6fe12 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala @@ -56,7 +56,7 @@ object ShreddedType { */ final case class Json(info: Info, jsonPaths: S3.Key) extends ShreddedType { def getLoadPath: String = - s"${info.base}vendor=${info.vendor}/name=${info.name}/format=json/model=${info.model}" + s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=json/model=${info.model}" def show: String = s"${info.toCriterion.asString} ($jsonPaths)" } @@ -69,7 +69,7 @@ object ShreddedType { */ final case class Tabular(info: Info) extends ShreddedType { def getLoadPath: String = - s"${info.base}vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.model}" + s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.model}" def show: String = s"${info.toCriterion.asString} TSV" } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftStatements.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftStatements.scala index f88d5ea22..db0f22cfd 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftStatements.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftStatements.scala @@ -19,7 +19,7 @@ import com.snowplowanalytics.snowplow.rdbloader.loading.RedshiftStatements._ import com.snowplowanalytics.snowplow.rdbloader.loading.Load.SqlString import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Shredder.Compression import com.snowplowanalytics.snowplow.rdbloader.common.config.StorageTarget.Redshift -import com.snowplowanalytics.snowplow.rdbloader.common.config.{Step, Config} +import com.snowplowanalytics.snowplow.rdbloader.common.config.{Config, Step} /** diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala index c3949a481..38caa1b2a 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala @@ -59,7 +59,6 @@ object SpecHelpers { URI.create("s3://bucket/input/"), Config.Shredder.Output( URI.create("s3://bucket/good/"), - URI.create("s3://bucket/bad/"), Config.Shredder.Compression.Gzip ) ), diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala index 0e5296456..64b5ee02b 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala 
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/CliConfigSpec.scala @@ -74,8 +74,7 @@ object CliConfigSpec { "type": "batch", "input": "s3://bucket/input/", "output" = { - "good": "s3://bucket/good/", - "bad": "s3://bucket/bad/", + "path": "s3://bucket/good/", "compression": "GZIP" } }, diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala index 388bfe78a..2a554f882 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala @@ -42,8 +42,8 @@ class LoadSpec extends Specification { val expected = List( "BEGIN", - "COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.events FROM 's3://shredded/base/kind=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/kind=good/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", "COMMIT" ) @@ -62,8 +62,8 @@ class LoadSpec extends Specification { val expected = List( "BEGIN", - "COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.events FROM 's3://shredded/base/kind=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/kind=good/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 
'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", "ACK", "COMMIT" ) @@ -83,8 +83,8 @@ class LoadSpec extends Specification { val expected = List( "BEGIN", - "COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.events FROM 's3://shredded/base/kind=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/kind=good/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", ) val result = Load.load[Pure](SpecHelpers.validCliConfig, message).runS diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftLoaderSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftLoaderSpec.scala index 4eb2269da..2177691de 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftLoaderSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftLoaderSpec.scala @@ -98,10 +98,10 @@ class RedshiftLoaderSpec extends Specification { val (state, result) = RedshiftLoader.run[Pure](SpecHelpers.validConfig.copy(steps = Set(Step.Vacuum, Step.Analyze)), discovery).flatMap(identity).run val expected = List( - "COPY atomic.events FROM 's3://my-bucket/my-path/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_event_2 FROM 's3://my-bucket/my-path/vendor=com.acme/name=event/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/event_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_context_2 FROM 's3://my-bucket/my-path/vendor=com.acme/name=context/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/context_2.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_context_3 FROM 's3://my-bucket/my-path/vendor=com.acme/name=context/format=tsv/model=3' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.events FROM 
's3://my-bucket/my-path/kind=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_event_2 FROM 's3://my-bucket/my-path/kind=good/vendor=com.acme/name=event/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/event_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_context_2 FROM 's3://my-bucket/my-path/kind=good/vendor=com.acme/name=context/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/context_2.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_context_3 FROM 's3://my-bucket/my-path/kind=good/vendor=com.acme/name=context/format=tsv/model=3' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", "VACUUM SORT ONLY atomic.events", "VACUUM SORT ONLY atomic.com_acme_event_2", "VACUUM SORT ONLY atomic.com_acme_context_2", diff --git a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/ShredJob.scala b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/ShredJob.scala index 57668d2eb..7cf5ba0df 100644 --- a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/ShredJob.scala +++ b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/ShredJob.scala @@ -18,13 +18,12 @@ import cats.Id import cats.implicits._ import io.circe.Json - import java.util.UUID import java.time.Instant // Spark import org.apache.spark.SparkContext -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.SparkSession // Snowplow import com.snowplowanalytics.snowplow.badrows.Processor @@ -80,8 +79,7 @@ class ShredJob(@transient val spark: SparkSession, eventsManifest: Option[EventsManifestConfig]): LoaderMessage.ShreddingComplete = { val jobStarted: Instant = Instant.now() val inputFolder: S3.Folder = S3.Folder.coerce(shredderConfig.input.toString).append(folderName) - val outFolder: S3.Folder = S3.Folder.coerce(shredderConfig.output.good.toString).append(folderName) - val badFolder: S3.Folder = S3.Folder.coerce(shredderConfig.output.bad.toString).append(folderName) + val outFolder: S3.Folder = S3.Folder.coerce(shredderConfig.output.path.toString).append(folderName) // Enriched TSV lines along with their shredded components val common = sc.textFile(inputFolder) @@ -148,16 +146,17 @@ class ShredJob(@transient val spark: SparkSession, }.cache() // Update the shredded JSONs with the new deduplicated event IDs and stringify - val shreddedData = shredded.flatMap(_.getOrElse(Nil)) + val shreddedData = shredded.flatMap { + case Right(shredded) => shredded + case Left(row) => List(Shredded.fromBadRow(row)) + } // Data that failed TSV transformation - val shreddedBad = (common.flatMap(_.swap.toOption) ++ shredded.flatMap(_.swap.toOption)).map(bad => Row(bad.compact)) + val shreddedBad = common.flatMap(_.swap.toOption.map(Shredded.fromBadRow).flatMap(_.json)) // Final output - Sink.writeShredded(spark, shredderConfig.output.compression, 
formats, shreddedData, outFolder) - - // Bad data - Sink.writeBad(spark, shredderConfig.output.compression, shreddedBad, badFolder) + Sink.writeShredded(spark, shredderConfig.output.compression, shreddedData.flatMap(_.tsv), outFolder) + Sink.writeShredded(spark, shredderConfig.output.compression, shreddedData.flatMap(_.json) ++ shreddedBad, outFolder) val shreddedTypes = shreddedTypesAccumulator.value.toList val batchTimestamps = timestampsAccumulator.value @@ -187,7 +186,7 @@ object ShredJob { val atomicLengths = EventUtils.getAtomicLengths(IgluSingleton.get(igluConfig).resolver).fold(err => throw err, identity) val enrichedFolder = Folder.coerce(shredderConfig.input.toString) - val shreddedFolder = Folder.coerce(shredderConfig.output.good.toString) + val shreddedFolder = Folder.coerce(shredderConfig.output.path.toString) val (incomplete, unshredded) = Discovery .getState(region, enrichedFolder, shreddedFolder) diff --git a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/spark/Sink.scala b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/spark/Sink.scala index 6d7998821..5b77f4316 100644 --- a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/spark/Sink.scala +++ b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/spark/Sink.scala @@ -15,41 +15,23 @@ package com.snowplowanalytics.snowplow.rdbloader.shredder.batch.spark import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SparkSession, Row, SaveMode, DataFrameWriter} -import org.apache.spark.sql.types.{StructField, StructType, StringType} +import org.apache.spark.sql.{SparkSession, SaveMode, DataFrameWriter} -import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage -import com.snowplowanalytics.snowplow.rdbloader.common.config.Config import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Shredder.Compression -import com.snowplowanalytics.snowplow.rdbloader.common.transformation.Shredded - object Sink { - def writeShredded(spark: SparkSession, compression: Compression, formats: Config.Formats, shreddedData: RDD[Shredded], outFolder: String): Unit = { - writeShredded(spark, compression, shreddedData.flatMap(_.tsv), outFolder) - val canBeJson = formats.default == LoaderMessage.Format.JSON || formats.json.nonEmpty - if (canBeJson) writeShredded(spark, compression, shreddedData.flatMap(_.json), outFolder) - } - - def writeShredded(spark: SparkSession, compression: Compression, data: RDD[(String, String, String, Int, String)], outFolder: String): Unit = { + def writeShredded(spark: SparkSession, compression: Compression, data: RDD[(String, String, String, String, Int, String)], outFolder: String): Unit = { import spark.implicits._ data - .toDF("vendor", "name", "format", "model", "data") + .toDF("kind", "vendor", "name", "format", "model", "data") .write .withCompression(compression) - .partitionBy("vendor", "name", "format", "model") + .partitionBy("kind", "vendor", "name", "format", "model") .mode(SaveMode.Append) .text(outFolder) } - def writeBad(spark: SparkSession, compression: Compression, shreddedBad: RDD[Row], outFolder: String): Unit = - spark.createDataFrame(shreddedBad, StructType(StructField("_", StringType, true) :: Nil)) - .write - .withCompression(compression) - .mode(SaveMode.Overwrite) - .text(outFolder) - private implicit class DataframeOps[A](w: DataFrameWriter[A]) { def withCompression(compression: Compression): DataFrameWriter[A] = 
compression match { diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/ShredJobSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/ShredJobSpec.scala index 57988d068..66383c540 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/ShredJobSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/ShredJobSpec.scala @@ -15,13 +15,13 @@ package com.snowplowanalytics.snowplow.rdbloader.shredder.batch import java.io.{FileWriter, IOException, File, BufferedWriter} +import java.util.Base64 import scala.collection.JavaConverters._ import scala.io.Source import scala.util.Random // Commons -import org.apache.commons.codec.binary.Base64 import org.apache.commons.io.filefilter.IOFileFilter import org.apache.commons.io.FileUtils import org.apache.commons.io.filefilter.TrueFileFilter @@ -81,7 +81,10 @@ object ShredJobSpec { } /** Case class representing the directories where the output of the job has been written. */ - case class OutputDirs(output: File, badRows: File) + case class OutputDirs(output: File) { + val goodRows: File = new File(output, "kind=good") + val badRows: File = new File(output, "kind=bad") + } /** * Read a part file at the given path into a List of Strings @@ -214,7 +217,7 @@ object ShredJobSpec { } private def storageConfig(shredder: Shredder.Batch, tsv: Boolean, jsonSchemas: List[SchemaCriterion]) = { - val encoder = new Base64(true) + val encoder = Base64.getUrlEncoder val format = if (tsv) "TSV" else "JSON" val jsonCriterions = jsonSchemas.map(x => s""""${x.asString}"""").mkString(",") val configPlain = s"""|{ @@ -228,8 +231,7 @@ object ShredJobSpec { | "type": "batch", | "input": "${shredder.input}", | "output" = { - | "good": "${shredder.output.good}", - | "bad": "${shredder.output.bad}", + | "path": "${shredder.output.path}", | "compression": "${shredder.output.compression.toString.toUpperCase}" | } |}, @@ -255,7 +257,7 @@ object ShredJobSpec { } private val igluConfigWithLocal = { - val encoder = new Base64(true) + val encoder = Base64.getUrlEncoder new String(encoder.encode( """|{ |"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0", @@ -327,7 +329,7 @@ object ShredJobSpec { def getShredder(lines: Lines, dirs: OutputDirs): Shredder.Batch = { val input = mkTmpFile("input", createParents = true, containing = Some(lines)) - Shredder.Batch(input.toURI, Shredder.Output(dirs.output.toURI, dirs.badRows.toURI, Shredder.Compression.None)) + Shredder.Batch(input.toURI, Shredder.Output(dirs.output.toURI, Shredder.Compression.None)) } } @@ -335,7 +337,7 @@ object ShredJobSpec { trait ShredJobSpec extends SparkSpec { import ShredJobSpec._ - val dirs = OutputDirs(randomFile("output"), randomFile("bad-rows")) + val dirs = OutputDirs(randomFile("output")) /** * Run the shred job with the specified lines as input. 
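The rewritten Sink.writeShredded above relies on Spark's partitionBy: the five partition columns (kind, vendor, name, format, model) become nested directories, and only the remaining data column is written into the part files, which is why .text() still sees a single string column. What follows is a minimal, self-contained sketch of that behaviour, not code from this patch; the object name, sample rows and the /tmp output path are invented for illustration, and the compression option is left out.

import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical sketch: the five partition columns turn into directories,
// only the "data" column ends up inside the written part files.
object SinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sink-sketch").getOrCreate()
    import spark.implicits._

    val rows = Seq(
      ("good", "com.snowplowanalytics.snowplow", "atomic", "tsv", 1, "app\tweb\t..."),
      ("bad", "com.snowplowanalytics.snowplow.badrows", "loader_parsing_error", "json", 2, """{"schema":"iglu:...","data":{}}""")
    )

    rows.toDF("kind", "vendor", "name", "format", "model", "data")
      .write
      .partitionBy("kind", "vendor", "name", "format", "model")
      .mode(SaveMode.Append)
      .text("/tmp/shredded-sketch") // e.g. /tmp/shredded-sketch/kind=good/vendor=.../format=tsv/model=1/part-*.txt

    spark.stop()
  }
}

This is also why the test helper's OutputDirs below can derive goodRows and badRows as the kind=good and kind=bad subdirectories of a single output folder.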
@@ -349,7 +351,7 @@ trait ShredJobSpec extends SparkSpec { ) val (dedupeConfigCli, dedupeConfig) = if (crossBatchDedupe) { - val encoder = new Base64(true) + val encoder = Base64.getUrlEncoder val encoded = new String(encoder.encode(duplicateStorageConfig.noSpaces.getBytes())) val config = SelfDescribingData.parse(duplicateStorageConfig).leftMap(_.code).flatMap(EventsManifestConfig.DynamoDb.extract).valueOr(e => throw new RuntimeException(e)) (Array("--duplicate-storage-config", encoded), Some(config)) diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/InvalidEnrichedEventsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/InvalidEnrichedEventsSpec.scala index 7e5770988..81a3ccfa4 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/InvalidEnrichedEventsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/InvalidEnrichedEventsSpec.scala @@ -55,7 +55,7 @@ class InvalidEnrichedEventsSpec extends Specification with ShredJobSpec { } "not write any jsons" in { - dirs.output must beEmptyDir + dirs.goodRows must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/InvalidJsonsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/InvalidJsonsSpec.scala index 72848eaca..1c296e4a8 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/InvalidJsonsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/InvalidJsonsSpec.scala @@ -54,7 +54,7 @@ class InvalidJsonsSpec extends Specification with ShredJobSpec { } "not write any jsons" in { - dirs.output must beEmptyDir + dirs.goodRows must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/MissingJsonSchemaSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/MissingJsonSchemaSpec.scala index d2e8605af..d4addf315 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/MissingJsonSchemaSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/MissingJsonSchemaSpec.scala @@ -240,7 +240,7 @@ class MissingJsonSchemaSpec extends Specification with ShredJobSpec { } "not write any jsons" in { - dirs.output must beEmptyDir + dirs.goodRows must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/NotEnrichedEventsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/NotEnrichedEventsSpec.scala index f5c8eb11b..5bc3e1e2e 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/NotEnrichedEventsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/NotEnrichedEventsSpec.scala @@ -47,7 +47,7 @@ class NotEnrichedEventsSpec extends Specification with ShredJobSpec { } "not write any jsons" in { - dirs.output must beEmptyDir + dirs.goodRows must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/SchemaValidationFailedSpec.scala 
b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/SchemaValidationFailedSpec.scala index e15780311..5e4fa1a84 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/SchemaValidationFailedSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/bad/SchemaValidationFailedSpec.scala @@ -52,7 +52,7 @@ class SchemaValidationFailedSpec extends Specification with ShredJobSpec { } "not write any jsons" in { - dirs.output must beEmptyDir + dirs.goodRows must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/CrossBatchDeduplicationSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/CrossBatchDeduplicationSpec.scala index 792edbfad..900224701 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/CrossBatchDeduplicationSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/CrossBatchDeduplicationSpec.scala @@ -234,12 +234,12 @@ class CrossBatchDeduplicationSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "remove cross-batch duplicate and store left event in atomic events folder" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f lines.sorted mustEqual CrossBatchDeduplicationSpec.expected.events } "shred two unique events out of cross-batch and in-batch duplicates" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f val eventIds = lines.map(_.split("\t").apply(6)) eventIds must containTheSameElementsAs( @@ -247,7 +247,7 @@ class CrossBatchDeduplicationSpec extends Specification with ShredJobSpec { ) } "shred additional contexts into their appropriate path" in { - val Some((contexts, f)) = readPartFile(dirs.output, + val Some((contexts, f)) = readPartFile(dirs.goodRows, CrossBatchDeduplicationSpec.expected.additionalContextPath) expectedFiles += f contexts must containTheSameElementsAs(Seq(CrossBatchDeduplicationSpec.expected.additionalContextContents2, CrossBatchDeduplicationSpec.expected.additionalContextContents1)) @@ -259,7 +259,7 @@ class CrossBatchDeduplicationSpec extends Specification with ShredJobSpec { } "not shred any unexpected JSONs" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must be empty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must be empty } "not write any bad row JSONs" in { dirs.badRows must beEmptyDir diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/DerivedContextsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/DerivedContextsSpec.scala index 2c58df8c0..9959a0b90 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/DerivedContextsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/DerivedContextsSpec.scala @@ -65,19 +65,18 @@ class DerivedContextsSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "transform the 
enriched event and store it in atomic events" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f lines mustEqual Seq(DerivedContextsSpec.expected.event) } "shred the website page_context into its appropriate path" in { - println(dirs.output.list().toList) - val Some((lines, f)) = readPartFile(dirs.output, DerivedContextsSpec.expected.path) + val Some((lines, f)) = readPartFile(dirs.goodRows, DerivedContextsSpec.expected.path) expectedFiles += f lines mustEqual Seq(DerivedContextsSpec.expected.contents) } "not shred any unexpected JSONs" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must beEmpty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must beEmpty } "not write any bad row JSONs" in { dirs.badRows must beEmptyDir diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/EmptySchemaSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/EmptySchemaSpec.scala index bc483e197..65011bca5 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/EmptySchemaSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/EmptySchemaSpec.scala @@ -27,7 +27,7 @@ class EmptySchemaSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "transform the enriched event and store it in atomic folder" in { - readPartFile(dirs.output, ShredJobSpec.AtomicFolder) match { + readPartFile(dirs.goodRows, ShredJobSpec.AtomicFolder) match { case Some((lines, f)) => expectedFiles += f lines mustEqual Seq(EmptySchemaSpec.expected.event) @@ -36,7 +36,7 @@ class EmptySchemaSpec extends Specification with ShredJobSpec { } } "shred the context without any data into TSV with only metadata" in { - readPartFile(dirs.output, EmptySchemaSpec.expected.contextAPath) match { + readPartFile(dirs.goodRows, EmptySchemaSpec.expected.contextAPath) match { case Some((lines, f)) => expectedFiles += f lines mustEqual Seq(EmptySchemaSpec.expected.contexAContents) @@ -45,7 +45,7 @@ class EmptySchemaSpec extends Specification with ShredJobSpec { } } "shred the anything-b context with additional datat into TSV with only metadata" in { - readPartFile(dirs.output, EmptySchemaSpec.expected.contextBPath) match { + readPartFile(dirs.goodRows, EmptySchemaSpec.expected.contextBPath) match { case Some((lines, f)) => expectedFiles += f lines mustEqual Seq(EmptySchemaSpec.expected.contexBContents) @@ -54,7 +54,7 @@ class EmptySchemaSpec extends Specification with ShredJobSpec { } } "not shred any unexpected data" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must beEmpty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must beEmpty } "not write any bad rows" in { dirs.badRows must beEmptyDir diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/EventDeduplicationSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/EventDeduplicationSpec.scala index fabcadea0..c72858204 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/EventDeduplicationSpec.scala +++ 
b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/EventDeduplicationSpec.scala @@ -134,13 +134,13 @@ class EventDeduplicationSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "transform two enriched events and store them in atomic events folder" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f val updatedLines = lines.map(EventDeduplicationSpec.eraseEventId) updatedLines.sorted mustEqual EventDeduplicationSpec.expected.events } "shred two enriched events with deduplicated event ids" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f val eventIds = lines.map(_.split("\t").apply(6)) @@ -150,7 +150,7 @@ class EventDeduplicationSpec extends Specification with ShredJobSpec { exactTwoEventsIds.and(distinctIds) } "shred duplicate contexts into their appropriate path" in { - val Some((contexts, f)) = readPartFile(dirs.output, EventDeduplicationSpec.expected.path) + val Some((contexts, f)) = readPartFile(dirs.goodRows, EventDeduplicationSpec.expected.path) expectedFiles += f val updatedLines = contexts.map(EventDeduplicationSpec.eraseHierarchy) updatedLines must containTheSameElementsAs(Seq(EventDeduplicationSpec.expected.contents, EventDeduplicationSpec.expected.contents)) @@ -158,7 +158,7 @@ class EventDeduplicationSpec extends Specification with ShredJobSpec { } "shred additional, non-duplicate contexts into their appropriate path" in { val Some((contexts, f)) = - readPartFile(dirs.output, EventDeduplicationSpec.expected.additionalContextPath) + readPartFile(dirs.goodRows, EventDeduplicationSpec.expected.additionalContextPath) expectedFiles += f val rootIds = contexts.map(EventDeduplicationSpec.getRootId) @@ -173,7 +173,7 @@ class EventDeduplicationSpec extends Specification with ShredJobSpec { } "not shred any unexpected JSONs" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must beEmpty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must beEmpty } "not write any bad row JSONs" in { dirs.badRows must beEmptyDir diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/ForwardCompatibleContextSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/ForwardCompatibleContextSpec.scala index bedb4acac..498ca9867 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/ForwardCompatibleContextSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/ForwardCompatibleContextSpec.scala @@ -65,18 +65,18 @@ class ForwardCompatibleContextSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "transform the enriched event and store it in atomic events folder" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f lines mustEqual Seq(ForwardCompatibleContextSpec.expected.event) } "shred the website page_context into its appropriate path" in { - val Some((lines, f)) = readPartFile(dirs.output, ForwardCompatibleContextSpec.expected.path) + val Some((lines, f)) = readPartFile(dirs.goodRows, 
ForwardCompatibleContextSpec.expected.path) expectedFiles += f lines mustEqual Seq(ForwardCompatibleContextSpec.expected.contents) } "not shred any unexpected JSONs" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must be empty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must be empty } "not write any bad row JSONs" in { dirs.badRows must beEmptyDir diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/LinkClickEventSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/LinkClickEventSpec.scala index cdcd88a8a..2521365a6 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/LinkClickEventSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/LinkClickEventSpec.scala @@ -62,18 +62,18 @@ class LinkClickEventSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "transform the enriched event and store it in atomic folder" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f lines mustEqual Seq(LinkClickEventSpec.expected.event) } "shred the Snowplow link_click event into its appropriate path" in { - val Some((lines, f)) = readPartFile(dirs.output, LinkClickEventSpec.expected.path) + val Some((lines, f)) = readPartFile(dirs.goodRows, LinkClickEventSpec.expected.path) expectedFiles += f lines mustEqual Seq(LinkClickEventSpec.expected.contents) } "not shred any unexpected JSONs" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must be empty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must be empty } "not write any bad row JSONs" in { dirs.badRows must beEmptyDir diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/MultipleJsonsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/MultipleJsonsSpec.scala index e390a4eb0..22b57fba6 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/MultipleJsonsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/MultipleJsonsSpec.scala @@ -39,23 +39,23 @@ class MultipleJsonsSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "transform the enriched event and store it in atomic events folder" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f lines mustEqual Seq(MultipleJsonsSpec.expectedAtomicEvent) } "shred the Snowplow link_click event into its appropriate path" in { - val Some((lines, f)) = readPartFile(dirs.output, MultipleJsonsSpec.expectedEvent.path) + val Some((lines, f)) = readPartFile(dirs.goodRows, MultipleJsonsSpec.expectedEvent.path) expectedFiles += f lines mustEqual Seq(MultipleJsonsSpec.expectedEvent.contents) } "shred the website page_context into its appropriate path" in { - val Some((lines, f)) = readPartFile(dirs.output, MultipleJsonsSpec.expectedContext.path) + val Some((lines, f)) = readPartFile(dirs.goodRows, MultipleJsonsSpec.expectedContext.path) expectedFiles += f lines mustEqual 
Seq(MultipleJsonsSpec.expectedContext.contents) } "not shred any unexpected JSONs" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must be empty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must be empty } "not write any bad row JSONs" in { dirs.badRows must beEmptyDir diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/WebsitePageContextSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/WebsitePageContextSpec.scala index 190be7b07..37e319fec 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/WebsitePageContextSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/WebsitePageContextSpec.scala @@ -64,18 +64,18 @@ class WebsitePageContextSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "transform the enriched event and store it in atomic events folder" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f lines mustEqual Seq(WebsitePageContextSpec.expected.event) } "shred the website page_context into its appropriate path" in { - val Some((lines, f)) = readPartFile(dirs.output, WebsitePageContextSpec.expected.path) + val Some((lines, f)) = readPartFile(dirs.goodRows, WebsitePageContextSpec.expected.path) expectedFiles += f lines mustEqual Seq(WebsitePageContextSpec.expected.contents) } "not shred any unexpected JSONs" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must beEmpty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must beEmpty } "not write any bad row JSONs" in { dirs.badRows must beEmptyDir diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/tabular/NewlineSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/tabular/NewlineSpec.scala index ff5b83068..e8e492b7e 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/tabular/NewlineSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/tabular/NewlineSpec.scala @@ -30,17 +30,17 @@ class NewlineSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "transform the enriched event and store it in atomic events folder" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f lines mustEqual Seq(NewlineSpec.expected.event) } "shred the page_context TSV into its appropriate path" in { - val Some((lines, f)) = readPartFile(dirs.output, NewlineSpec.expected.contextPath) + val Some((lines, f)) = readPartFile(dirs.goodRows, NewlineSpec.expected.contextPath) expectedFiles += f lines mustEqual Seq(NewlineSpec.expected.contextContents) } "not shred any unexpected data" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must beEmpty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must beEmpty } "not write any bad rows" in { dirs.badRows must beEmptyDir diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/tabular/TabularOutputSpec.scala 
b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/tabular/TabularOutputSpec.scala index 609525dd5..ca692077f 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/tabular/TabularOutputSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/batch/good/tabular/TabularOutputSpec.scala @@ -29,22 +29,22 @@ class TabularOutputSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] "transform the enriched event and store it in atomic events folder" in { - val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) + val Some((lines, f)) = readPartFile(dirs.goodRows, AtomicFolder) expectedFiles += f lines mustEqual Seq(TabularOutputSpec.expected.event) } "shred the page_context TSV into its appropriate path" in { - val Some((lines, f)) = readPartFile(dirs.output, TabularOutputSpec.expected.contextPath) + val Some((lines, f)) = readPartFile(dirs.goodRows, TabularOutputSpec.expected.contextPath) expectedFiles += f lines mustEqual Seq(TabularOutputSpec.expected.contextContents) } "shred the application_error TSV into its appropriate path" in { - val Some((lines, f)) = readPartFile(dirs.output, TabularOutputSpec.expected.eventPath) + val Some((lines, f)) = readPartFile(dirs.goodRows, TabularOutputSpec.expected.eventPath) expectedFiles += f lines mustEqual Seq(TabularOutputSpec.expected.eventContents) } "not shred any unexpected data" in { - listFilesWithExclusions(dirs.output, expectedFiles.toList) must beEmpty + listFilesWithExclusions(dirs.goodRows, expectedFiles.toList) must beEmpty } "not write any bad rows" in { dirs.badRows must beEmptyDir diff --git a/modules/stream-shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/stream/Processing.scala b/modules/stream-shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/stream/Processing.scala index 6ebfbdf5c..445a6e474 100644 --- a/modules/stream-shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/stream/Processing.scala +++ b/modules/stream-shredder/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/shredder/stream/Processing.scala @@ -13,10 +13,9 @@ import io.circe.Json import org.typelevel.log4cats.slf4j.Slf4jLogger import com.snowplowanalytics.iglu.client.Client -import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey} +import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.snowplow.badrows.Processor -import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.Format import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Shredder.Compression import com.snowplowanalytics.snowplow.rdbloader.common.{S3, Common} import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.{Shredder, Formats} @@ -44,7 +43,7 @@ object Processing { val windowing: Pipe[F, ParsedF[F], Windowed[F, Parsed]] = Record.windowed(Window.fromNow[F](config.windowing.toMinutes.toInt)) val onComplete: Window => F[Unit] = - getOnComplete(config.output.compression, isTabular, config.output.good, queueName)(resources.windows) + getOnComplete(config.output.compression, isTabular, config.output.path, queueName)(resources.windows) val sinkId: Window => F[Int] = getSinkId(resources.windows) @@ -133,10 +132,9 @@ object Processing { } Stream.eval(shreddedRecord).flatMap { case Record.Data(window, checkpoint, Right(shredded)) => - Record.mapWithLast(shredded)(s => 
Record.Data(window, None, s.splitGood), s => Record.Data(window, checkpoint, s.splitGood)) + Record.mapWithLast(shredded)(s => Record.Data(window, None, s.split), s => Record.Data(window, checkpoint, s.split)) case Record.Data(window, checkpoint, Left(badRow)) => - val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey - Stream.emit(Record.Data(window, checkpoint, (Shredded.Path(false, vendor, name, Format.JSON, model), Shredded.Data(badRow.compact)))) + Stream.emit(Record.Data(window, checkpoint, Shredded.fromBadRow(badRow).split)) case Record.EndWindow(window, next, checkpoint) => Stream.emit(Record.EndWindow[F, Window, (Shredded.Path, Shredded.Data)](window, next, checkpoint)) }