Merge 2314134 into 5e88175
oguzhanunlu committed Jun 6, 2023
2 parents 5e88175 + 2314134 commit 56c700a
Showing 43 changed files with 620 additions and 778 deletions.

@@ -73,7 +73,7 @@ object Processing {
implicit val lookup: RegistryLookup[F] = resources.registryLookup
val transformer: Transformer[F] = config.formats match {
case f: TransformerConfig.Formats.Shred =>
- Transformer.ShredTransformer(resources.igluResolver, resources.propertiesCache, f, processor)
+ Transformer.ShredTransformer(resources.igluResolver, resources.shredModelCache, f, processor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(resources.igluResolver, f, processor)
}
@@ -262,7 +262,9 @@ object Processing {
implicit class TransformedOps(t: Transformed) {
def getPath: SinkPath = t match {
case p: Transformed.Shredded =>
val suffix = Some(s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/")
val suffix = Some(
s"vendor=${p.vendor}/name=${p.name}/format=${p.format.path.toLowerCase}/model=${p.model}/revision=${p.revision}/addition=${p.addition}"
)
val pathType = if (p.isGood) SinkPath.PathType.Good else SinkPath.PathType.Bad
SinkPath(suffix, pathType)
case p: Transformed.WideRow =>
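For illustration (a sketch, not part of the diff; com.acme/link_click and version 1-2-3 are made-up values, and the Tabular constructor shape matches the TransformingSpec changes further down):

  val entity = Transformed.Shredded.Tabular("com.acme", "link_click", 1, 2, 3, Transformed.Data.DString("..."))
  // entity.getPath now yields a good-output path with the suffix
  //   vendor=com.acme/name=link_click/format=tsv/model=1/revision=2/addition=3
  // whereas the old code stopped at vendor=com.acme/name=link_click/format=tsv/model=1/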

@@ -30,14 +30,15 @@ import io.sentry.SentryClient
import com.snowplowanalytics.iglu.client.resolver.{CreateResolverCache, Resolver}
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}
- import com.snowplowanalytics.iglu.schemaddl.Properties
+ import com.snowplowanalytics.iglu.core.SchemaKey
+ import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.common.Sentry
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils.EventParser
- import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey}
+ import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, ShredModelCache}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Config.Output.Bad
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.metrics.Metrics
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.BadSink
@@ -49,7 +50,7 @@ import org.http4s.blaze.client.BlazeClientBuilder

case class Resources[F[_], C](
igluResolver: Resolver[F],
- propertiesCache: PropertiesCache[F],
+ shredModelCache: ShredModelCache[F],
eventParser: EventParser,
producer: Queue.Producer[F],
instanceId: String,
@@ -83,7 +84,7 @@ object Resources {
producer <- mkQueue(config.queue)
resolverConfig <- mkResolverConfig(igluConfig)
resolver <- mkResolver(resolverConfig)
- propertiesCache <- Resource.eval(CreateLruMap[F, PropertiesKey, Properties].create(resolverConfig.cacheSize))
+ shredModelCache <- Resource.eval(CreateLruMap[F, SchemaKey, ShredModel].create(resolverConfig.cacheSize))
httpClient <- BlazeClientBuilder[F].withExecutionContext(executionContext).resource
implicit0(registryLookup: RegistryLookup[F]) <- Resource.pure(Http4sRegistryLookup[F](httpClient))
eventParser <- mkEventParser(resolver, config)
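Judging by this construction, ShredModelCache is presumably an LRU map keyed by the full SchemaKey and holding one pre-computed ShredModel per schema version, replacing the old PropertiesKey -> Properties cache. A minimal sketch of the alias, assuming it sits in the common transformation package alongside the types imported above:

  import com.snowplowanalytics.iglu.core.SchemaKey
  import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
  import com.snowplowanalytics.lrumap.LruMap

  package object transformation {
    // one cached shred model per schema version, evicted LRU-style
    type ShredModelCache[F[_]] = LruMap[F, SchemaKey, ShredModel]
  }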
@@ -105,7 +106,7 @@
badSink <- mkBadSink(config, mkBadQueue)
} yield Resources(
resolver,
- propertiesCache,
+ shredModelCache,
eventParser,
producer,
instanceId.toString,

@@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats.WideRow
- import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, Transformed}
+ import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{
AtomicFieldsProvider,
@@ -45,7 +45,7 @@ sealed trait Transformer[F[_]] extends Product with Serializable {
object Transformer {
case class ShredTransformer[F[_]: Monad: RegistryLookup: Clock](
igluResolver: Resolver[F],
- propertiesCache: PropertiesCache[F],
+ shredModelCache: ShredModelCache[F],
formats: Formats.Shred,
processor: Processor
) extends Transformer[F] {
@@ -59,12 +59,12 @@ object Transformer {
else TypesInfo.Shredded.ShreddedFormat.JSON

def goodTransform(event: Event): EitherT[F, BadRow, List[Transformed]] =
- Transformed.shredEvent[F](igluResolver, propertiesCache, isTabular, processor)(event)
+ Transformed.shredEvent[F](igluResolver, shredModelCache, isTabular, processor)(event)

def badTransform(badRow: BadRow): Transformed = {
- val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey
+ val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = badRow.schemaKey
val data = Transformed.Data.DString(badRow.compact)
- Transformed.Shredded.Json(false, vendor, name, model, data)
+ Transformed.Shredded.Json(false, vendor, name, model, revision, addition, data)
}

def typesInfo(types: Set[Data.ShreddedType]): TypesInfo = {
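As a concrete example of badTransform above (illustrative only; the resulting folder matches the expected bad paths in the spec changes below):

  val key = SchemaKey("com.snowplowanalytics.snowplow.badrows", "loader_parsing_error", "jsonschema", SchemaVer.Full(2, 0, 0))
  // the new pattern binds (model, revision, addition) = (2, 0, 0), so the bad row becomes
  //   Transformed.Shredded.Json(false, "com.snowplowanalytics.snowplow.badrows", "loader_parsing_error", 2, 0, 0, data)
  // and is sunk under .../output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0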

@@ -39,28 +39,28 @@ class ShredTsvProcessingSpec extends BaseProcessingSpec {
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
)
)
actualOptimizelyRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.optimizely/name=state/format=tsv/model=1/revision=0/addition=0"
)
)
actualConsentRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=consent_document/format=tsv/model=1/revision=0/addition=0"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_parsing_error/format=json/model=2/revision=0/addition=0"
)
)

@@ -99,14 +99,14 @@
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=good/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/revision=0/addition=0"
)
)
actualBadRows <-
readStringRowsFrom(
Path(
outputDirectory.toString +
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/"
s"/run=1970-01-01-10-30-00-${AppId.appId}/output=bad/vendor=com.snowplowanalytics.snowplow.badrows/name=loader_iglu_error/format=json/model=2/revision=0/addition=0"
)
)


@@ -27,13 +27,14 @@ import io.circe.parser.{parse => parseCirce}
import org.http4s.client.{Client => Http4sClient}
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, Registry, RegistryLookup}
- import com.snowplowanalytics.iglu.schemaddl.Properties
+ import com.snowplowanalytics.iglu.core.SchemaKey
+ import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig
- import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey, Transformed}
+ import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{Processing, Transformer}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.{Checkpointer, ParsedC}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.TestApplication._
@@ -48,12 +49,12 @@ class TransformingSpec extends Specification {

val testFileNameMap = List(
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "atomic", 1, dummyTransformedData)
.Tabular("com.snowplowanalytics.snowplow", "atomic", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-atomic",
Transformed.Shredded
.Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, dummyTransformedData)
.Tabular("com.snowplowanalytics.snowplow", "consent_document", 1, 0, 0, dummyTransformedData)
.getPath -> "com.snowplowanalytics.snowplow-consent_document",
Transformed.Shredded.Tabular("com.optimizely", "state", 1, dummyTransformedData).getPath -> "com.optimizely-state"
Transformed.Shredded.Tabular("com.optimizely", "state", 1, 0, 0, dummyTransformedData).getPath -> "com.optimizely-state"
).toMap

val expectedTransformedMap =
@@ -135,12 +136,12 @@ object TransformingSpec {
val defaultWindow = Window(1, 1, 1, 1, 1)
val dummyTransformedData = Transformed.Data.DString("")

- def propertiesCache: PropertiesCache[IO] = CreateLruMap[IO, PropertiesKey, Properties].create(100).unsafeRunSync()
+ def shredModelCache: ShredModelCache[IO] = CreateLruMap[IO, SchemaKey, ShredModel].create(100).unsafeRunSync()

def createTransformer(formats: TransformerConfig.Formats): Transformer[IO] =
formats match {
case f: TransformerConfig.Formats.Shred =>
- Transformer.ShredTransformer(defaultIgluResolver, propertiesCache, f, TestProcessor)
+ Transformer.ShredTransformer(defaultIgluResolver, shredModelCache, f, TestProcessor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(defaultIgluResolver, f, TestProcessor)
}

@@ -37,7 +37,7 @@ object Common {
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss").withZone(ZoneId.from(ZoneOffset.UTC))

def entityPath(entity: TypesInfo.Shredded.Type) =
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"
s"$GoodPrefix/vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}/revision=${entity.schemaKey.version.revision}/addition=${entity.schemaKey.version.addition}"

def entityPathFull(base: BlobStorage.Folder, entity: TypesInfo.Shredded.Type): BlobStorage.Folder =
BlobStorage.Folder.append(base, entityPath(entity))
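The loader-side path has to agree with what the transformer writes, so the same revision and addition segments are appended here. For example (illustrative values; assuming GoodPrefix is the output=good segment seen in the transformer specs):

  // entityPath for a TSV entity at schema version 1-0-2 would be
  //   output=good/vendor=com.acme/name=link_click/format=tsv/model=1/revision=0/addition=2
  // and entityPathFull appends that to the base run folder, e.g.
  //   s3://bucket/transformed/run=2023-06-06-12-00-00/output=good/vendor=com.acme/name=link_click/format=tsv/model=1/revision=0/addition=2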

@@ -20,7 +20,7 @@ import java.time.format.DateTimeParseException

import io.circe.Json
import cats.Monad
- import cats.data.{EitherT, NonEmptyList}
+ import cats.data.NonEmptyList
import cats.implicits._

import cats.effect.Clock
@@ -30,16 +30,13 @@ import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

- import com.snowplowanalytics.iglu.schemaddl.migrations.FlatData
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._

import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, ParsingError}
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload, Processor}
- import com.snowplowanalytics.snowplow.rdbloader.common.Common
- import Flattening.NullCharacter
- import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.snowplow.analytics.scalasdk.decode.TSVParser
+ import com.snowplowanalytics.snowplow.rdbloader.common.Common

object EventUtils {

@@ -138,34 +135,6 @@ object EventUtils {
"events"
)

- /**
-  * Transform a self-desribing entity into tabular format, using its known schemas to get a correct
-  * order of columns
-  * @param resolver
-  *   Iglu resolver to get list of known schemas
-  * @param instance
-  *   self-describing JSON that needs to be transformed
-  * @return
-  *   list of columns or flattening error
-  */
- def flatten[F[_]: Monad: Clock: RegistryLookup](
-   resolver: Resolver[F],
-   propertiesCache: PropertiesCache[F],
-   instance: SelfDescribingData[Json]
- ): EitherT[F, FailureDetails.LoaderIgluError, List[String]] =
-   Flattening
-     .getDdlProperties(resolver, propertiesCache, instance.schema)
-     .map(props => mapProperties(props, instance))
-
- private def mapProperties(props: Properties, instance: SelfDescribingData[Json]) =
-   props
-     .map { case (pointer, _) =>
-       FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter)
-     }
-
- def getString(json: Json): String =
-   json.fold(NullCharacter, transformBool, _ => json.show, escape, _ => escape(json.noSpaces), _ => escape(json.noSpaces))
-
def getEntities(event: Event): List[SelfDescribingData[Json]] =
event.unstruct_event.data.toList ++
event.derived_contexts.data ++
@@ -178,11 +147,6 @@ object EventUtils {
case _ => 0
}

- /** Prevents data with newlines and tabs from breaking the loading process */
- private def escape(s: String): String =
-   if (s == NullCharacter) "\\\\N"
-   else s.replace('\t', ' ').replace('\n', ' ')

private def getLength(schema: Schema): Option[Int] =
schema.maxLength.map(_.value.toInt)
}