use new shredmodel
oguzhanunlu committed Apr 4, 2023
1 parent 9901ea5 commit 3b43079
Showing 13 changed files with 187 additions and 415 deletions.
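Reviewer note: at a glance, this commit swaps the transformer's PropertiesCache[F] (an LRU map from PropertiesKey to schema-ddl Properties) for a ShredModelCache[F] keyed by SchemaKey and storing the new ShredModel from com.snowplowanalytics.iglu.schemaddl.redshift. The alias itself is defined in a file not shown below; judging from the call sites (CreateLruMap[F, SchemaKey, ShredModel]), it presumably looks something like this sketch:

package com.snowplowanalytics.snowplow.rdbloader.common

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
import com.snowplowanalytics.lrumap.LruMap

package object transformation {
  // Assumed shape, inferred from CreateLruMap[F, SchemaKey, ShredModel] in the
  // hunks below; the real definition is not part of this diff.
  type ShredModelCache[F[_]] = LruMap[F, SchemaKey, ShredModel]
}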
@@ -71,7 +71,7 @@ object Processing {
   ): Stream[F, Unit] = {
     val transformer: Transformer[F] = config.formats match {
       case f: TransformerConfig.Formats.Shred =>
-        Transformer.ShredTransformer(resources.igluResolver, resources.propertiesCache, f, processor)
+        Transformer.ShredTransformer(resources.igluResolver, resources.shredModelCache, f, processor)
       case f: TransformerConfig.Formats.WideRow =>
         Transformer.WideRowTransformer(resources.igluResolver, f, processor)
     }
@@ -30,14 +30,15 @@ import io.sentry.SentryClient
 import com.snowplowanalytics.iglu.client.resolver.{CreateResolverCache, Resolver}
 import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
 import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}
-import com.snowplowanalytics.iglu.schemaddl.Properties
+import com.snowplowanalytics.iglu.core.SchemaKey
+import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
 import com.snowplowanalytics.lrumap.CreateLruMap
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.rdbloader.common.Sentry
 import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
 import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
 import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils.EventParser
-import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey}
+import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, ShredModelCache}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Config.Output.Bad
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.metrics.Metrics
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.BadSink
@@ -49,7 +50,7 @@ import org.http4s.blaze.client.BlazeClientBuilder
 
 case class Resources[F[_], C](
   igluResolver: Resolver[F],
-  propertiesCache: PropertiesCache[F],
+  shredModelCache: ShredModelCache[F],
   eventParser: EventParser,
   producer: Queue.Producer[F],
   instanceId: String,
@@ -82,7 +83,7 @@ object Resources {
       producer <- mkQueue(config.queue)
       resolverConfig <- mkResolverConfig(igluConfig)
       resolver <- mkResolver(resolverConfig)
-      propertiesCache <- Resource.eval(CreateLruMap[F, PropertiesKey, Properties].create(resolverConfig.cacheSize))
+      shredModelCache <- Resource.eval(CreateLruMap[F, SchemaKey, ShredModel].create(resolverConfig.cacheSize))
       httpClient <- BlazeClientBuilder[F].withExecutionContext(executionContext).resource
       implicit0(registryLookup: RegistryLookup[F]) <- Resource.pure(Http4sRegistryLookup[F](httpClient))
       eventParser <- mkEventParser(resolver, config)
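Reviewer note: the cache is created once per application run, sized by the resolver's cacheSize, as above. A minimal sketch of the get-or-compute pattern such an LRU map enables, with this file's imports in scope; deriveModel is a hypothetical stand-in for however downstream code builds a ShredModel, which is not part of this diff:

import cats.Monad
import cats.implicits._

def cachedShredModel[F[_]: Monad](
  cache: ShredModelCache[F],
  key: SchemaKey
)(deriveModel: SchemaKey => F[ShredModel]): F[ShredModel] =
  cache.get(key).flatMap {
    case Some(model) => model.pure[F]                               // hit: reuse the memoised model
    case None        => deriveModel(key).flatTap(cache.put(key, _)) // miss: compute once, then store
  }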
@@ -104,7 +105,7 @@ object Resources {
       badSink <- mkBadSink(config, mkBadQueue)
     } yield Resources(
       resolver,
-      propertiesCache,
+      shredModelCache,
       eventParser,
       producer,
       instanceId.toString,
@@ -145,13 +146,17 @@ object Resources {
       case Left(error) => Sync[F].raiseError[Resolver[F]](error)
     }
   }
-  private def mkEventParser[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] = Resource.eval {
-    mkAtomicLengths(igluResolver, config).flatMap {
-      case Right(atomicLengths) => Sync[F].pure(Event.parser(atomicLengths))
-      case Left(error) => Sync[F].raiseError[EventParser](error)
-    }
-  }
+  private def mkEventParser[F[_]: Sync: RegistryLookup: Clock](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] =
+    Resource.eval {
+      mkAtomicLengths(igluResolver, config).flatMap {
+        case Right(atomicLengths) => Sync[F].pure(Event.parser(atomicLengths))
+        case Left(error) => Sync[F].raiseError[EventParser](error)
+      }
+    }
-  private def mkAtomicLengths[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): F[Either[RuntimeException, Map[String, Int]]] =
+  private def mkAtomicLengths[F[_]: Sync: RegistryLookup: Clock](
+    igluResolver: Resolver[F],
+    config: Config
+  ): F[Either[RuntimeException, Map[String, Int]]] =
     if (config.featureFlags.truncateAtomicFields) {
       EventUtils.getAtomicLengths(igluResolver)
     } else {
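Reviewer note: the second change in this file is that mkEventParser and mkAtomicLengths now also demand an implicit RegistryLookup[F]. It is presumably satisfied by the implicit0(registryLookup: RegistryLookup[F]) <- Resource.pure(Http4sRegistryLookup[F](httpClient)) binding earlier in the resource initialisation (the implicit0 pattern comes from the better-monadic-for compiler plugin), so schema lookups made while deriving atomic field lengths go through the shared http4s client.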
@@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo}
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats.WideRow
-import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, Transformed}
+import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed}
 import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
 import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{
   AtomicFieldsProvider,
@@ -45,7 +45,7 @@ sealed trait Transformer[F[_]] extends Product with Serializable {
 object Transformer {
   case class ShredTransformer[F[_]: Monad: RegistryLookup: Clock](
     igluResolver: Resolver[F],
-    propertiesCache: PropertiesCache[F],
+    shredModelCache: ShredModelCache[F],
     formats: Formats.Shred,
     processor: Processor
   ) extends Transformer[F] {
@@ -59,7 +59,7 @@ object Transformer {
       else TypesInfo.Shredded.ShreddedFormat.JSON
 
     def goodTransform(event: Event): EitherT[F, BadRow, List[Transformed]] =
-      Transformed.shredEvent[F](igluResolver, propertiesCache, isTabular, processor)(event)
+      Transformed.shredEvent[F](igluResolver, shredModelCache, isTabular, processor)(event)
 
     def badTransform(badRow: BadRow): Transformed = {
       val SchemaKey(vendor, name, _, SchemaVer.Full(model, revision, addition)) = badRow.schemaKey
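Reviewer note: Transformed.shredEvent in the common module now threads the ShredModelCache instead of the old properties cache; its other parameters are unchanged in this hunk. A hypothetical end-to-end wiring, for illustration only, using the names as they appear in this diff:

val transformer: Transformer[F] =
  Transformer.ShredTransformer(igluResolver, shredModelCache, formats, processor)

// Shred one successfully parsed event; failures surface as a BadRow on the left.
val shredded: EitherT[F, BadRow, List[Transformed]] =
  transformer.goodTransform(event)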
@@ -26,13 +26,14 @@ import io.circe.optics.JsonPath._
 import io.circe.parser.{parse => parseCirce}
 import com.snowplowanalytics.iglu.client.Resolver
 import com.snowplowanalytics.iglu.client.resolver.registries.Registry
-import com.snowplowanalytics.iglu.schemaddl.Properties
+import com.snowplowanalytics.iglu.core.SchemaKey
+import com.snowplowanalytics.iglu.schemaddl.redshift.ShredModel
 import com.snowplowanalytics.lrumap.CreateLruMap
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig
-import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey, Transformed}
+import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{ShredModelCache, Transformed}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{Processing, Transformer}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.{Checkpointer, ParsedC}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.TestApplication._
@@ -128,12 +129,12 @@ object TransformingSpec {
   val defaultWindow = Window(1, 1, 1, 1, 1)
   val dummyTransformedData = Transformed.Data.DString("")
 
-  def propertiesCache: PropertiesCache[IO] = CreateLruMap[IO, PropertiesKey, Properties].create(100).unsafeRunSync()
+  def shredModelCache: ShredModelCache[IO] = CreateLruMap[IO, SchemaKey, ShredModel].create(100).unsafeRunSync()
 
   def createTransformer(formats: TransformerConfig.Formats): Transformer[IO] =
     formats match {
       case f: TransformerConfig.Formats.Shred =>
-        Transformer.ShredTransformer(defaultIgluResolver, propertiesCache, f, TestProcessor)
+        Transformer.ShredTransformer(defaultIgluResolver, shredModelCache, f, TestProcessor)
       case f: TransformerConfig.Formats.WideRow =>
         Transformer.WideRowTransformer(defaultIgluResolver, f, TestProcessor)
     }
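Reviewer note: shredModelCache in this fixture is a def, so each call runs CreateLruMap ... .create(100).unsafeRunSync() and hands back a fresh, empty cache. A test that wanted to observe memoisation across events would share a single instance instead, e.g.:

// Hypothetical variant for a caching-behaviour test, not part of this diff:
val sharedShredModelCache: ShredModelCache[IO] =
  CreateLruMap[IO, SchemaKey, ShredModel].create(100).unsafeRunSync()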
@@ -20,7 +20,7 @@ import java.time.format.DateTimeParseException
 
 import io.circe.Json
 import cats.Monad
-import cats.data.{EitherT, NonEmptyList}
+import cats.data.NonEmptyList
 import cats.implicits._
 
 import cats.effect.Clock
@@ -30,16 +30,13 @@ import com.snowplowanalytics.iglu.core._
 import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
 import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
 
-import com.snowplowanalytics.iglu.schemaddl.migrations.FlatData
 import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
 import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
 
 import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, ParsingError}
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload, Processor}
-import com.snowplowanalytics.snowplow.rdbloader.common.Common
-import Flattening.NullCharacter
-import com.snowplowanalytics.iglu.schemaddl.Properties
 import com.snowplowanalytics.snowplow.analytics.scalasdk.decode.TSVParser
+import com.snowplowanalytics.snowplow.rdbloader.common.Common
 
 object EventUtils {
@@ -138,34 +135,6 @@ object EventUtils {
     "events"
   )
 
-  /**
-   * Transform a self-desribing entity into tabular format, using its known schemas to get a correct
-   * order of columns
-   * @param resolver
-   *   Iglu resolver to get list of known schemas
-   * @param instance
-   *   self-describing JSON that needs to be transformed
-   * @return
-   *   list of columns or flattening error
-   */
-  def flatten[F[_]: Monad: Clock: RegistryLookup](
-    resolver: Resolver[F],
-    propertiesCache: PropertiesCache[F],
-    instance: SelfDescribingData[Json]
-  ): EitherT[F, FailureDetails.LoaderIgluError, List[String]] =
-    Flattening
-      .getDdlProperties(resolver, propertiesCache, instance.schema)
-      .map(props => mapProperties(props, instance))
-
-  private def mapProperties(props: Properties, instance: SelfDescribingData[Json]) =
-    props
-      .map { case (pointer, _) =>
-        FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter)
-      }
-
-  def getString(json: Json): String =
-    json.fold(NullCharacter, transformBool, _ => json.show, escape, _ => escape(json.noSpaces), _ => escape(json.noSpaces))
-
   def getEntities(event: Event): List[SelfDescribingData[Json]] =
     event.unstruct_event.data.toList ++
     event.derived_contexts.data ++
@@ -178,11 +147,6 @@
     case _ => 0
   }
 
-  /** Prevents data with newlines and tabs from breaking the loading process */
-  private def escape(s: String): String =
-    if (s == NullCharacter) "\\\\N"
-    else s.replace('\t', ' ').replace('\n', ' ')
-
   private def getLength(schema: Schema): Option[Int] =
     schema.maxLength.map(_.value.toInt)
 }
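Reviewer note: the deleted helpers were the transformer-side TSV flattening path: flatten looked up ordered columns via Flattening.getDdlProperties, getString serialised each JSON leaf, and escape sanitised cell values. With this commit that responsibility presumably moves behind the new ShredModel. For reference, a standalone sketch of the escaping rule that was removed, assuming NullCharacter was the TSV null sentinel "\N" as defined in the old Flattening object:

val NullCharacter: String = "\\N"

def escape(s: String): String =
  if (s == NullCharacter) "\\\\N"                  // a literal "\N" must not be read back as NULL
  else s.replace('\t', ' ').replace('\n', ' ')     // raw tabs/newlines would corrupt TSV rows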

This file was deleted.

