Merge 869d700 into 66ea6cd
spenes committed Jul 31, 2023
2 parents 66ea6cd + 869d700, commit 961f19d
Showing 12 changed files with 255 additions and 17 deletions.
@@ -127,6 +127,15 @@
# "fileFormat": "json"
#}

# Schemas that won't be loaded
# Optional, default value []
"skipSchemas": [
"iglu:com.acme/skipped1/jsonschema/1-0-0",
"iglu:com.acme/skipped2/jsonschema/1-0-*",
"iglu:com.acme/skipped3/jsonschema/1-*-*",
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

# Specifies interval Transformer will work on
"runInterval": {
# Optional, Transformer will start to process after given timestamp
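For reference, a minimal sketch (not part of the diff) of how skipSchemas criteria like the ones above behave, assuming the iglu-scala-core SchemaCriterion.parse and SchemaKey.fromUri helpers; the com.acme URIs are the placeholder examples from the config:

// Sketch of the wildcard semantics behind the skipSchemas entries.
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}

object SkipSchemasMatchingSketch extends App {
  // "1-0-*" pins the model and revision, leaving the addition as a wildcard
  val criterion: SchemaCriterion =
    SchemaCriterion.parse("iglu:com.acme/skipped2/jsonschema/1-0-*").get

  val newerAddition = SchemaKey.fromUri("iglu:com.acme/skipped2/jsonschema/1-0-3")
  val newerRevision = SchemaKey.fromUri("iglu:com.acme/skipped2/jsonschema/1-1-0")

  println(newerAddition.map(k => criterion.matches(k))) // Right(true): only the addition differs
  println(newerRevision.map(k => criterion.matches(k))) // Right(false): the revision is pinned to 0
}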
@@ -25,6 +25,7 @@
"tsv": [],
"skip": []
}
"skipSchemas": []
"monitoring": {
"metrics": {}
}
@@ -14,7 +14,8 @@ import io.circe._
import io.circe.generic.semiauto._

import scala.concurrent.duration.FiniteDuration
import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.rdbloader.common._
import com.snowplowanalytics.snowplow.rdbloader.common.config.args.HoconOrPath
import com.snowplowanalytics.snowplow.rdbloader.common.config.{ConfigUtils, TransformerConfig}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
@@ -30,6 +31,7 @@ final case class Config(
deduplication: Config.Deduplication,
runInterval: Config.RunInterval,
featureFlags: TransformerConfig.FeatureFlags,
skipSchemas: List[SchemaCriterion],
validations: TransformerConfig.Validations
)

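The circe decoder that turns each skipSchemas string into a SchemaCriterion is not visible in the rendered hunks; a plausible sketch, assuming an equivalent is not already provided elsewhere in the common module, is:

import io.circe.Decoder
import com.snowplowanalytics.iglu.core.SchemaCriterion

object SchemaCriterionCodecSketch {
  // Decode "iglu:vendor/name/jsonschema/1-*-*"-style strings, failing config
  // parsing with a readable message on malformed input.
  implicit val schemaCriterionDecoder: Decoder[SchemaCriterion] =
    Decoder[String].emap { s =>
      SchemaCriterion.parse(s).toRight(s"$s is not a valid Iglu schema criterion")
    }
}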
@@ -170,7 +170,7 @@ class ShredJob[T](

LoaderMessage.ShreddingComplete(
outFolder,
transformer.typesInfo,
transformer.typesInfo(config.skipSchemas),
timestamps,
config.output.compression,
MessageProcessor,
@@ -231,7 +231,9 @@ class TypeAccumJob(@transient val spark: SparkSession, config: Config) extends S
}
.flatMap {
case Right(event) =>
event.inventory.map(TypesAccumulator.wideRowTypeConverter)
event.inventory
.filterNot(t => Transformer.inSkipSchemas(config.skipSchemas, t.schemaKey))
.map(TypesAccumulator.wideRowTypeConverter)
case Left(_) =>
List.empty
}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

// Snowplow
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer}
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage._
Expand All @@ -50,7 +50,7 @@ sealed trait Transformer[T] extends Product with Serializable {
def badTransform(badRow: BadRow, badEventsCounter: LongAccumulator): Transformed
def typesAccumulator: TypesAccumulator[T]
def timestampsAccumulator: TimestampsAccumulator
def typesInfo: TypesInfo
def typesInfo(skipSchemas: List[SchemaCriterion]): TypesInfo
def sink(
sc: SparkSession,
compression: TransformerConfig.Compression,
@@ -107,7 +107,10 @@ object Transformer {
Transformed.Shredded.Json(false, vendor, name, model, data)
}

def typesInfo: TypesInfo = TypesInfo.Shredded(typesAccumulator.value.toList)
def typesInfo(skipSchemas: List[SchemaCriterion]): TypesInfo = {
val types = typesAccumulator.value.toList.filterNot(t => inSkipSchemas(skipSchemas, t.schemaKey))
TypesInfo.Shredded(types)
}

def sink(
spark: SparkSession,
@@ -145,8 +148,10 @@ object Transformer {
Transformed.WideRow(false, data)
}

def typesInfo: TypesInfo =
TypesInfo.WideRow(TypesInfo.WideRow.WideRowFormat.JSON, typesAccumulator.value.toList)
def typesInfo(skipSchemas: List[SchemaCriterion]): TypesInfo = {
val types = typesAccumulator.value.toList.filterNot(t => inSkipSchemas(skipSchemas, t.schemaKey))
TypesInfo.WideRow(TypesInfo.WideRow.WideRowFormat.JSON, types)
}

def sink(
spark: SparkSession,
@@ -188,8 +193,10 @@ object Transformer {
Transformed.WideRow(false, data)
}

def typesInfo: TypesInfo =
TypesInfo.WideRow(TypesInfo.WideRow.WideRowFormat.PARQUET, typesAccumulator.value.toList)
def typesInfo(skipSchemas: List[SchemaCriterion]): TypesInfo = {
val types = typesAccumulator.value.toList.filterNot(t => inSkipSchemas(skipSchemas, t.schemaKey))
TypesInfo.WideRow(TypesInfo.WideRow.WideRowFormat.PARQUET, types)
}

def sink(
spark: SparkSession,
@@ -245,4 +252,7 @@ object Transformer {
}

}

def inSkipSchemas(skipSchemas: List[SchemaCriterion], schemaKey: SchemaKey): Boolean =
skipSchemas.exists(_.matches(schemaKey))
}
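A self-contained sketch (with hypothetical schema names) of the filterNot pattern that the typesInfo implementations above apply on top of the new inSkipSchemas helper:

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer}

object InSkipSchemasSketch extends App {
  def inSkipSchemas(skipSchemas: List[SchemaCriterion], schemaKey: SchemaKey): Boolean =
    skipSchemas.exists(_.matches(schemaKey))

  val skipSchemas = List(SchemaCriterion("com.acme", "skipped1", "jsonschema", Some(1)))

  // Hypothetical accumulated types; in the job these come from the types accumulator
  val accumulated = List(
    SchemaKey("com.acme", "skipped1", "jsonschema", SchemaVer.Full(1, 0, 0)),
    SchemaKey("com.acme", "kept", "jsonschema", SchemaVer.Full(1, 0, 0))
  )

  val remaining = accumulated.filterNot(k => inSkipSchemas(skipSchemas, k))
  println(remaining.map(_.toSchemaUri)) // List(iglu:com.acme/kept/jsonschema/1-0-0)
}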

Large diffs are not rendered by default.

Large diffs are not rendered by default.

@@ -0,0 +1,41 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema for client geolocation contexts",
"self": {
"vendor": "com.snowplowanalytics.snowplow",
"name": "geolocationContext",
"format": "jsonschema",
"version": "1-0-0"
},

"type": "object",
"properties": {
"latitude": {
"type": "number",
"minimum": -90,
"maximum": 90
},
"longitude": {
"type": "number",
"minimum": -180,
"maximum": 180
},
"latitudeLongitudeAccuracy": {
"type": "number"
},
"altitude": {
"type": "number"
},
"altitudeAccuracy": {
"type": "number"
},
"bearing": {
"type": "number"
},
"speed": {
"type": "number"
}
},
"required": ["latitude", "longitude"],
"additionalProperties": false
}
@@ -35,6 +35,7 @@ class ConfigSpec extends Specification {
exampleDeduplication,
exampleRunInterval,
exampleDefaultFeatureFlags,
exampleSkipSchemas,
exampleValidations
)
result must beRight(expected)
@@ -51,6 +52,7 @@
exampleDeduplication,
emptyRunInterval,
exampleDefaultFeatureFlags,
Nil,
emptyValidations
)
result must beRight(expected)
@@ -156,6 +158,12 @@ object TransformerConfigSpec {
val exampleDefaultFeatureFlags = TransformerConfig.FeatureFlags(false, None, false, false)
val exampleValidations = Validations(Some(Instant.parse("2021-11-18T11:00:00.00Z")))
val emptyValidations = Validations(None)
val exampleSkipSchemas = List(
SchemaCriterion("com.acme", "skipped1", "jsonschema", Some(1), Some(0), Some(0)),
SchemaCriterion("com.acme", "skipped2", "jsonschema", Some(1), Some(0), None),
SchemaCriterion("com.acme", "skipped3", "jsonschema", Some(1), None, None),
SchemaCriterion("com.acme", "skipped4", "jsonschema", None, None, None)
)

def getConfigFromResource[A](resourcePath: String, parse: HoconOrPath => Either[String, A]): Either[String, A] =
parse(Right(pathOf(resourcePath)))
@@ -42,9 +42,8 @@ import io.circe.parser.{parse => parseCirce}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse}

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._
import com.snowplowanalytics.iglu.core.SchemaCriterion

import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifestConfig

@@ -262,6 +261,7 @@ object ShredJobSpec {
val encoder = Base64.getUrlEncoder
val format = if (tsv) "TSV" else "JSON"
val jsonCriterions = jsonSchemas.map(x => s""""${x.asString}"""").mkString(",")
val skipSchemas = shredder.skipSchemas.map(x => s""""${x.asString}"""").mkString(",")
val formatsSection = wideRow match {
case Some(WideRow.PARQUET) =>
s"""
@@ -314,6 +314,7 @@
|"validations": {
| "minimumTimestamp": "0000-01-02T00:00:00.00Z"
|}
|"skipSchemas": [$skipSchemas]
|"monitoring": {"snowplow": null, "sentry": null}
|}""".stripMargin
new String(encoder.encode(configPlain.getBytes()))
@@ -418,10 +419,14 @@ object ShredJobSpec {
case None => s"Environment variable [$envvar] is not available".invalidNel
}

def inSkipSchemas(skipSchemas: List[SchemaCriterion], schemaKey: SchemaKey): Boolean =
skipSchemas.exists(_.matches(schemaKey))

def getShredder(
events: Events,
dirs: OutputDirs,
deduplication: Config.Deduplication
deduplication: Config.Deduplication,
skipSchemas: List[SchemaCriterion]
): Config = {
val input = events match {
case r: ResourceFile => r.toUri
@@ -443,6 +448,7 @@
deduplication,
Config.RunInterval(None, None, None),
TransformerConfig.FeatureFlags(false, None, false, false),
skipSchemas,
TransformerConfig.Validations(None)
)
}
@@ -468,9 +474,10 @@ trait ShredJobSpec extends SparkSpec {
jsonSchemas: List[SchemaCriterion] = Nil,
wideRow: Option[WideRow] = None,
outputDirs: Option[OutputDirs] = None,
deduplication: Config.Deduplication = Config.Deduplication(Config.Deduplication.Synthetic.Broadcast(1), true)
deduplication: Config.Deduplication = Config.Deduplication(Config.Deduplication.Synthetic.Broadcast(1), true),
skipSchemas: List[SchemaCriterion] = Nil
): LoaderMessage.ShreddingComplete = {
val shredder = getShredder(events, outputDirs.getOrElse(dirs), deduplication)
val shredder = getShredder(events, outputDirs.getOrElse(dirs), deduplication, skipSchemas)
val config = Array(
"--iglu-config",
igluConfigWithLocal,
@@ -7,6 +7,9 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.transformer.batch.good.widerow

import com.snowplowanalytics.iglu.core.SchemaCriterion

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage._
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.ShredJobSpec
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.ShredJobSpec._

@@ -15,11 +18,31 @@ import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.

import org.specs2.mutable.Specification

class WideRowJsonSpec extends Specification with ShredJobSpec {
abstract class WideRowJsonSpec extends Specification with ShredJobSpec {
override def appName = "wide-row"
def skipSchemas: List[SchemaCriterion]
def inputEventsPath: String
sequential
"A job which is configured for wide row json output" should {
runShredJob(events = ResourceFile("/widerow/json/input-events"), wideRow = Some(WideRow.JSON))
val shreddingComplete = runShredJob(
events = ResourceFile(inputEventsPath),
wideRow = Some(WideRow.JSON),
skipSchemas = skipSchemas
)

def schemaCount: Int =
shreddingComplete.typesInfo match {
case t: TypesInfo.Shredded => t.types.size
case t: TypesInfo.WideRow => t.types.size
}

def containSkipSchemas: Boolean = {
val schemas = shreddingComplete.typesInfo match {
case l: TypesInfo.Shredded => l.types.map(_.schemaKey)
case l: TypesInfo.WideRow => l.types.map(_.schemaKey)
}
schemas.exists(s => ShredJobSpec.inSkipSchemas(skipSchemas, s))
}

"transform the enriched event to wide row json" in {
val Some((lines, _)) = readPartFile(dirs.goodRows)
@@ -33,5 +56,22 @@ class WideRowJsonSpec extends Specification with ShredJobSpec {
.map(_.replace(VersionPlaceholder, BuildInfo.version))
lines.toSet mustEqual (expected.toSet)
}

"shouldn't contain skipped schemas" in {
schemaCount mustEqual (21 - skipSchemas.size)
containSkipSchemas must beFalse
}
}
}

class PlainWideRowJsonSpec extends WideRowJsonSpec {
def skipSchemas = Nil
def inputEventsPath: String = "/widerow/json/input-events"
}

class SkipSchemasWideRowJsonSpec extends WideRowJsonSpec {
def skipSchemas = List(
SchemaCriterion("com.snowplowanalytics.snowplow", "geolocation_context", "jsonschema", Some(1))
)
def inputEventsPath: String = "/widerow/json/input-events-skip-schemas"
}
@@ -14,6 +14,7 @@ import org.apache.spark.sql.types._
import io.circe.{Json, JsonObject}
import io.circe.syntax._

import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats.WideRow
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.ShredJobSpec
@@ -80,6 +81,27 @@ class WideRowParquetSpec extends Specification with ShredJobSpec {
}
}

"A job which is configured for wide row parquet output with skipSchemas" should {
val skipSchemas = List(
SchemaCriterion("com.snowplowanalytics.snowplow", "geolocationContext", "jsonschema", Some(1))
)
val testOutputDirs = OutputDirs(randomFile("output"))
runShredJob(
events = ResourceFile("/widerow/parquet/input-events-skip-schemas"),
wideRow = Some(WideRow.PARQUET),
outputDirs = Some(testOutputDirs),
skipSchemas = skipSchemas
)

"transform the enriched event to wide row parquet" in {
val lines = readParquetFile(spark, testOutputDirs.goodRows)
.sortBy(_.asObject.flatMap(_("event_id")).flatMap(_.asString))

assertGeneratedParquetSchema(testOutputDirs)
lines.size must beEqualTo(46)
}
}

// This test case uses events which contains contexts with
// different versions of test schema.
"A job which is configured for wide row parquet output" should {
