Skip to content

Commit

Permalink
Bump Snowplow Events Manifest to 0.4.0 (close #1303)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Jul 31, 2023
1 parent 66ea6cd commit 42d0822
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
package com.snowplowanalytics.snowplow.rdbloader.transformer.batch

import cats.Id
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.{Client, Resolver}
import com.snowplowanalytics.iglu.client.validator.CirceValidator
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.snowplow.rdbloader.common.catsClockIdInstance
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{AtomicFieldsProvider, NonAtomicFieldsProvider}
Expand Down Expand Up @@ -42,8 +43,6 @@ import io.circe.parser.{parse => parseCirce}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse}

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._
import com.snowplowanalytics.iglu.core.SchemaCriterion

import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifestConfig
Expand Down Expand Up @@ -390,6 +389,7 @@ object ShredJobSpec {
json"""{
"schema": "iglu:com.snowplowanalytics.snowplow.storage/amazon_dynamodb_config/jsonschema/2-0-0",
"data": {
"id": "72512e4c-d52e-4f41-abc2-87e341d8c49a",
"name": "local",
"auth": null,
"awsRegion": $dynamodbDuplicateStorageRegion,
Expand Down Expand Up @@ -478,25 +478,28 @@ trait ShredJobSpec extends SparkSpec {
storageConfig(shredder, tsv, jsonSchemas, wideRow)
)

val (dedupeConfigCli, dedupeConfig) = if (crossBatchDedupe) {
val dedupeConfigCli = if (crossBatchDedupe) {
val encoder = Base64.getUrlEncoder
val encoded = new String(encoder.encode(duplicateStorageConfig.noSpaces.getBytes()))
val config = SelfDescribingData
.parse(duplicateStorageConfig)
.leftMap(_.code)
.flatMap(EventsManifestConfig.DynamoDb.extract)
.valueOr(e => throw new RuntimeException(e))
(Array("--duplicate-storage-config", encoded), Some(config))
Array("--duplicate-storage-config", encoded)
} else {
(Array.empty[String], None)
Array.empty[String]
}

CliConfig.loadConfigFrom("snowplow-rdb-shredder", "Test specification for RDB Shrederr")(config ++ dedupeConfigCli) match {
case Right(cli) =>
val resolverConfig = Resolver
.parseConfig(cli.igluConfig)
.valueOr(error => throw new IllegalArgumentException(s"Could not parse iglu resolver config: ${error.getMessage()}"))

val dedupeConfig = if (crossBatchDedupe) {
val igluClient = Client[Id, Json](IgluSingleton.get(resolverConfig), CirceValidator)
val config = EventsManifestConfig
.parseJson[Id](igluClient, duplicateStorageConfig)
.valueOr(err => throw new IllegalArgumentException(err))
Some(config)
} else {
None
}
val transformer = cli.config.formats match {
case f: TransformerConfig.Formats.Shred => Transformer.ShredTransformer(resolverConfig, f, 0)
case TransformerConfig.Formats.WideRow.JSON => Transformer.WideRowJsonTransformer()
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object Dependencies {
val scalaTracker = "2.0.0"

val spark = "3.3.1"
val eventsManifest = "0.3.0"
val eventsManifest = "0.4.0"
val schemaDdl = "0.18.2"
val jacksonModule = "2.14.2" // Override incompatible version in spark runtime
val jacksonDatabind = "2.14.2"
Expand Down

0 comments on commit 42d0822

Please sign in to comment.