Common: merge good and bad output folders (close #358)
chuwy committed Mar 24, 2021
1 parent 0f3727e commit fb54708
Showing 30 changed files with 110 additions and 124 deletions.
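This commit collapses the previously separate good/bad output folders into a single output path, with good and bad data distinguished by a new leading kind= partition. A minimal, self-contained sketch of the resulting key layout (the prefix shape and the atomic example follow the diff below; the bad-row schema values are illustrative):

object LayoutSketch {
  // Mirrors the new entityPath shape: kind=.../vendor=.../name=.../format=.../model=...
  def prefix(kind: String, vendor: String, name: String, format: String, model: Int): String =
    s"kind=$kind/vendor=$vendor/name=$name/format=$format/model=$model"

  def main(args: Array[String]): Unit = {
    // Good data, e.g. atomic events shredded to TSV:
    println(prefix("good", "com.snowplowanalytics.snowplow", "atomic", "tsv", 1))
    // Bad rows now live under the same base path (schema values illustrative):
    println(prefix("bad", "com.snowplowanalytics.snowplow.badrows", "loader_iglu_error", "json", 2))
  }
}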
@@ -24,13 +24,15 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{ShreddedTy
*/
object Common {

val GoodPrefix = "kind=good"

val AtomicSchema: SchemaKey =
SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))
val AtomicType = ShreddedType(AtomicSchema, Format.TSV)
val AtomicPath: String = entityPath(AtomicType)

def entityPath(entity: ShreddedType) =
s"vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"

/**
* Remove all occurrences of access key id and secret access key from message
@@ -118,7 +118,7 @@ object Config {
case class File(dir: String) extends StreamInput
}

case class Output(good: URI, bad: URI, compression: Compression)
case class Output(path: URI, compression: Compression)

sealed trait Compression extends StringEnum
object Compression {
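The shredder output configuration above now carries a single path instead of a good/bad pair. A minimal sketch of the changed shape, with the types reduced to what is visible in this hunk (bucket and prefix are placeholders):

import java.net.URI

object OutputConfigSketch {
  sealed trait Compression
  case object Gzip extends Compression

  // Before: Output(good: URI, bad: URI, compression: Compression)
  // After: one path; good and bad data are separated by the kind= partition instead
  final case class Output(path: URI, compression: Compression)

  val output: Output = Output(URI.create("s3://bucket/transformed/"), Gzip)
}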
@@ -14,15 +14,15 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.common.transformation

import cats.{Show, Monad}
import cats.data.{EitherT, NonEmptyList}
import cats.{Monad, Show}
import cats.implicits._

import cats.effect.Clock

import io.circe.{Json => CJson}

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

import com.snowplowanalytics.iglu.client.{Resolver, Client}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
@@ -34,24 +34,26 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.Format

/** ADT, representing possible forms of data in blob storage */
sealed trait Shredded {
def isGood: Boolean
def vendor: String
def name: String
def format: Format
def model: Int
def data: String

def json: Option[(String, String, String, Int, String)] = this match {
case Shredded.Json(vendor, name, version, data) => Some((vendor, name, Format.JSON.path, version, data))
case Shredded.Tabular(_, _, _, _) => None
def json: Option[(String, String, String, String, Int, String)] = this match {
case _: Shredded.Json if isGood => Some(("good", vendor, name, format.path, model, data))
case _: Shredded.Json => Some(("bad", vendor, name, format.path, model, data))
case _: Shredded.Tabular => None
}

def tsv: Option[(String, String, String, Int, String)] = this match {
case _: Shredded.Tabular => Some((vendor, name, format.path, model, data))
def tsv: Option[(String, String, String, String, Int, String)] = this match {
case _: Shredded.Tabular => Some(("good", vendor, name, format.path, model, data))
case _: Shredded.Json => None
}

def splitGood: (Shredded.Path, Shredded.Data) =
(Shredded.Path(true, vendor, name, format, model), Shredded.Data(data))
def split: (Shredded.Path, Shredded.Data) =
(Shredded.Path(isGood, vendor, name, format, model), Shredded.Data(data))
}

object Shredded {
@@ -65,16 +67,23 @@ object Shredded {
}
}

def fromBadRow(badRow: BadRow): Shredded = {
val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey
val data = Shredded.Data(badRow.compact)
Json(false, vendor, name, model, data.value)
}

implicit val pathShow: Show[Path] =
Show(_.getDir)

/** Data will be represented as JSON, with RDB Loader loading it using JSON Paths. Legacy format */
case class Json(vendor: String, name: String, model: Int, data: String) extends Shredded {
case class Json(isGood: Boolean, vendor: String, name: String, model: Int, data: String) extends Shredded {
val format: Format = Format.JSON
}

/** Data will be represented as TSV, with RDB Loader loading it directly */
case class Tabular(vendor: String, name: String, model: Int, data: String) extends Shredded {
val isGood = true // We don't support TSV shredding for bad data
val format: Format = Format.TSV
}

@@ -96,7 +105,7 @@ object Shredded {
Tabular(vendor, name, hierarchy.entity.schema.version.model, (meta ++ columns).mkString("\t"))
}
} else
EitherT.pure[F, FailureDetails.LoaderIgluError](Json(vendor, name, hierarchy.entity.schema.version.model, hierarchy.dumpJson))
EitherT.pure[F, FailureDetails.LoaderIgluError](Json(true, vendor, name, hierarchy.entity.schema.version.model, hierarchy.dumpJson))
}

/**
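The key change in the Shredded ADT above is the isGood flag plus the extra leading element in the json/tsv tuples, which becomes the kind= partition value when rows are written. A simplified, self-contained sketch of that mapping (illustrative names, not the project's classes):

object ShreddedSketch {
  sealed trait Record {
    def isGood: Boolean
    def vendor: String
    def name: String
    def format: String
    def model: Int
    def data: String
    // The first tuple element is the kind= partition: "good" or "bad"
    def asRow: (String, String, String, String, Int, String) =
      (if (isGood) "good" else "bad", vendor, name, format, model, data)
  }
  final case class JsonRecord(isGood: Boolean, vendor: String, name: String, model: Int, data: String) extends Record {
    val format = "json"
  }
  final case class TsvRecord(vendor: String, name: String, model: Int, data: String) extends Record {
    val isGood = true // bad data is never shredded to TSV, matching the Tabular case above
    val format = "tsv"
  }

  def main(args: Array[String]): Unit = {
    println(JsonRecord(isGood = false, "com.snowplowanalytics.snowplow.badrows", "loader_iglu_error", 2, "{}").asRow)
    println(TsvRecord("com.snowplowanalytics.snowplow", "atomic", 1, "...").asRow)
  }
}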
@@ -80,7 +80,6 @@ object CommonSpec {
URI.create("s3://bucket/input/"),
Config.Shredder.Output(
URI.create("s3://bucket/good/"),
URI.create("s3://bucket/bad/"),
Config.Shredder.Compression.Gzip
)
)
@@ -41,8 +41,7 @@ class ConfigSpec extends Specification {
"type": "batch",
"input": "s3://bucket/input/",
"output" = {
"good": "s3://bucket/good/",
"bad": "s3://bucket/bad/",
"path": "s3://bucket/good/",
"compression": "GZIP"
}
},
@@ -119,7 +118,7 @@
"type": "batch",
"input": "s3://bucket/input/",
"output" = {
"good": "s3://bucket/good/",
"path": "s3://bucket/good/",
"bad": "s3://bucket/bad/",
"compression": "GZIP"
}
@@ -231,7 +230,6 @@ object ConfigSpec {
URI.create("s3://bucket/input/"),
Shredder.Output(
URI.create("s3://bucket/good/"),
URI.create("s3://bucket/bad/"),
Config.Shredder.Compression.Gzip
)
),
@@ -56,7 +56,7 @@ object ShreddedType {
*/
final case class Json(info: Info, jsonPaths: S3.Key) extends ShreddedType {
def getLoadPath: String =
s"${info.base}vendor=${info.vendor}/name=${info.name}/format=json/model=${info.model}"
s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=json/model=${info.model}"

def show: String = s"${info.toCriterion.asString} ($jsonPaths)"
}
@@ -69,7 +69,7 @@
*/
final case class Tabular(info: Info) extends ShreddedType {
def getLoadPath: String =
s"${info.base}vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.model}"
s"${info.base}${Common.GoodPrefix}/vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.model}"

def show: String = s"${info.toCriterion.asString} TSV"
}
@@ -19,7 +19,7 @@ import com.snowplowanalytics.snowplow.rdbloader.loading.RedshiftStatements._
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.SqlString
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Shredder.Compression
import com.snowplowanalytics.snowplow.rdbloader.common.config.StorageTarget.Redshift
import com.snowplowanalytics.snowplow.rdbloader.common.config.{Step, Config}
import com.snowplowanalytics.snowplow.rdbloader.common.config.{Config, Step}


/**
@@ -59,7 +59,6 @@ object SpecHelpers {
URI.create("s3://bucket/input/"),
Config.Shredder.Output(
URI.create("s3://bucket/good/"),
URI.create("s3://bucket/bad/"),
Config.Shredder.Compression.Gzip
)
),
@@ -74,8 +74,7 @@ object CliConfigSpec {
"type": "batch",
"input": "s3://bucket/input/",
"output" = {
"good": "s3://bucket/good/",
"bad": "s3://bucket/bad/",
"path": "s3://bucket/good/",
"compression": "GZIP"
}
},
@@ -42,8 +42,8 @@ class LoadSpec extends Specification {

val expected = List(
"BEGIN",
"COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.events FROM 's3://shredded/base/kind=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/kind=good/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COMMIT"
)

@@ -62,8 +62,8 @@ class LoadSpec extends Specification {

val expected = List(
"BEGIN",
"COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.events FROM 's3://shredded/base/kind=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/kind=good/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"ACK",
"COMMIT"
)
@@ -83,8 +83,8 @@ class LoadSpec extends Specification {

val expected = List(
"BEGIN",
"COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.events FROM 's3://shredded/base/kind=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/kind=good/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
)

val result = Load.load[Pure](SpecHelpers.validCliConfig, message).runS
@@ -98,10 +98,10 @@ class RedshiftLoaderSpec extends Specification {
val (state, result) = RedshiftLoader.run[Pure](SpecHelpers.validConfig.copy(steps = Set(Step.Vacuum, Step.Analyze)), discovery).flatMap(identity).run

val expected = List(
"COPY atomic.events FROM 's3://my-bucket/my-path/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_event_2 FROM 's3://my-bucket/my-path/vendor=com.acme/name=event/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/event_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_context_2 FROM 's3://my-bucket/my-path/vendor=com.acme/name=context/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/context_2.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_context_3 FROM 's3://my-bucket/my-path/vendor=com.acme/name=context/format=tsv/model=3' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.events FROM 's3://my-bucket/my-path/kind=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_event_2 FROM 's3://my-bucket/my-path/kind=good/vendor=com.acme/name=event/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/event_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_context_2 FROM 's3://my-bucket/my-path/kind=good/vendor=com.acme/name=context/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/context_2.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_context_3 FROM 's3://my-bucket/my-path/kind=good/vendor=com.acme/name=context/format=tsv/model=3' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"VACUUM SORT ONLY atomic.events",
"VACUUM SORT ONLY atomic.com_acme_event_2",
"VACUUM SORT ONLY atomic.com_acme_context_2",
@@ -18,13 +18,12 @@ import cats.Id
import cats.implicits._

import io.circe.Json

import java.util.UUID
import java.time.Instant

// Spark
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.SparkSession

// Snowplow
import com.snowplowanalytics.snowplow.badrows.Processor
@@ -80,8 +79,7 @@ class ShredJob(@transient val spark: SparkSession,
eventsManifest: Option[EventsManifestConfig]): LoaderMessage.ShreddingComplete = {
val jobStarted: Instant = Instant.now()
val inputFolder: S3.Folder = S3.Folder.coerce(shredderConfig.input.toString).append(folderName)
val outFolder: S3.Folder = S3.Folder.coerce(shredderConfig.output.good.toString).append(folderName)
val badFolder: S3.Folder = S3.Folder.coerce(shredderConfig.output.bad.toString).append(folderName)
val outFolder: S3.Folder = S3.Folder.coerce(shredderConfig.output.path.toString).append(folderName)

// Enriched TSV lines along with their shredded components
val common = sc.textFile(inputFolder)
@@ -148,16 +146,17 @@ class ShredJob(@transient val spark: SparkSession,
}.cache()

// Update the shredded JSONs with the new deduplicated event IDs and stringify
val shreddedData = shredded.flatMap(_.getOrElse(Nil))
val shreddedData = shredded.flatMap {
case Right(shredded) => shredded
case Left(row) => List(Shredded.fromBadRow(row))
}

// Data that failed TSV transformation
val shreddedBad = (common.flatMap(_.swap.toOption) ++ shredded.flatMap(_.swap.toOption)).map(bad => Row(bad.compact))
val shreddedBad = common.flatMap(_.swap.toOption.map(Shredded.fromBadRow).flatMap(_.json))

// Final output
Sink.writeShredded(spark, shredderConfig.output.compression, formats, shreddedData, outFolder)

// Bad data
Sink.writeBad(spark, shredderConfig.output.compression, shreddedBad, badFolder)
Sink.writeShredded(spark, shredderConfig.output.compression, shreddedData.flatMap(_.tsv), outFolder)
Sink.writeShredded(spark, shredderConfig.output.compression, shreddedData.flatMap(_.json) ++ shreddedBad, outFolder)

val shreddedTypes = shreddedTypesAccumulator.value.toList
val batchTimestamps = timestampsAccumulator.value
@@ -187,7 +186,7 @@ object ShredJob {
val atomicLengths = EventUtils.getAtomicLengths(IgluSingleton.get(igluConfig).resolver).fold(err => throw err, identity)

val enrichedFolder = Folder.coerce(shredderConfig.input.toString)
val shreddedFolder = Folder.coerce(shredderConfig.output.good.toString)
val shreddedFolder = Folder.coerce(shredderConfig.output.path.toString)

val (incomplete, unshredded) = Discovery
.getState(region, enrichedFolder, shreddedFolder)
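In the job above the separate writeBad sink disappears: failed rows are converted to the same shredded row shape with kind = "bad" and appended through the single shredded writer. A simplified sketch of that merge step, using stand-in types rather than the project's BadRow and Shredded:

object MergeSketch {
  // Stand-ins for the project's bad-row and shredded-row types
  final case class BadRow(vendor: String, name: String, model: Int, payload: String)
  final case class Row(kind: String, vendor: String, name: String, format: String, model: Int, data: String)

  // Mirrors Shredded.fromBadRow: a bad row becomes a JSON row under kind=bad
  def fromBadRow(bad: BadRow): Row =
    Row("bad", bad.vendor, bad.name, "json", bad.model, bad.payload)

  // Mirrors the shreddedData flatMap in ShredJob: good rows pass through,
  // failures are re-expressed as bad rows in the same collection
  def merge(shredded: List[Either[BadRow, List[Row]]]): List[Row] =
    shredded.flatMap {
      case Right(rows) => rows
      case Left(bad)   => List(fromBadRow(bad))
    }
}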
@@ -15,41 +15,23 @@
package com.snowplowanalytics.snowplow.rdbloader.shredder.batch.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession, Row, SaveMode, DataFrameWriter}
import org.apache.spark.sql.types.{StructField, StructType, StringType}
import org.apache.spark.sql.{SparkSession, SaveMode, DataFrameWriter}

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config
import com.snowplowanalytics.snowplow.rdbloader.common.config.Config.Shredder.Compression

import com.snowplowanalytics.snowplow.rdbloader.common.transformation.Shredded

object Sink {

def writeShredded(spark: SparkSession, compression: Compression, formats: Config.Formats, shreddedData: RDD[Shredded], outFolder: String): Unit = {
writeShredded(spark, compression, shreddedData.flatMap(_.tsv), outFolder)
val canBeJson = formats.default == LoaderMessage.Format.JSON || formats.json.nonEmpty
if (canBeJson) writeShredded(spark, compression, shreddedData.flatMap(_.json), outFolder)
}

def writeShredded(spark: SparkSession, compression: Compression, data: RDD[(String, String, String, Int, String)], outFolder: String): Unit = {
def writeShredded(spark: SparkSession, compression: Compression, data: RDD[(String, String, String, String, Int, String)], outFolder: String): Unit = {
import spark.implicits._
data
.toDF("vendor", "name", "format", "model", "data")
.toDF("kind", "vendor", "name", "format", "model", "data")
.write
.withCompression(compression)
.partitionBy("vendor", "name", "format", "model")
.partitionBy("kind", "vendor", "name", "format", "model")
.mode(SaveMode.Append)
.text(outFolder)
}

def writeBad(spark: SparkSession, compression: Compression, shreddedBad: RDD[Row], outFolder: String): Unit =
spark.createDataFrame(shreddedBad, StructType(StructField("_", StringType, true) :: Nil))
.write
.withCompression(compression)
.mode(SaveMode.Overwrite)
.text(outFolder)

private implicit class DataframeOps[A](w: DataFrameWriter[A]) {
def withCompression(compression: Compression): DataFrameWriter[A] =
compression match {
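The Sink change above reduces the output to one partitioned text write in which "kind" is the leading partition column. A minimal runnable sketch of that write pattern (local master, paths and rows are placeholders):

import org.apache.spark.sql.{SaveMode, SparkSession}

object SingleSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("single-sink-sketch").getOrCreate()
    import spark.implicits._

    val rows = Seq(
      ("good", "com.snowplowanalytics.snowplow", "atomic", "tsv", 1, "event\ttsv\tline"),
      ("bad", "com.snowplowanalytics.snowplow.badrows", "loader_parsing_error", "json", 2, """{"schema":"...","data":{}}""")
    )

    rows.toDF("kind", "vendor", "name", "format", "model", "data")
      .write
      .partitionBy("kind", "vendor", "name", "format", "model") // "data" stays as the single text column
      .mode(SaveMode.Append)
      .text("/tmp/shredded-sketch") // placeholder output folder

    spark.stop()
  }
}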