Skip to content

Commit

Permalink
common: use iglu-scala-client with improved caching (close #699)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and pondzix committed Oct 5, 2022
1 parent bf95a6e commit 5c30da0
Show file tree
Hide file tree
Showing 53 changed files with 239 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ import fs2.{Pipe, Stream}

import _root_.io.sentry.SentryClient

import _root_.io.circe.Json
import _root_.io.circe.syntax._

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Processor, Payload => BadRowPayload}
import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
Expand Down Expand Up @@ -116,7 +115,7 @@ object Enrich {
def enrichWith[F[_]: Clock: ContextShift: RegistryLookup: Sync: HttpClient](
enrichRegistry: F[EnrichmentRegistry[F]],
adapterRegistry: AdapterRegistry,
igluClient: Client[F, Json],
igluClient: IgluCirceClient[F],
sentry: Option[SentryClient],
processor: Processor,
featureFlags: FeatureFlags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@ import cats.effect.concurrent.{Ref, Semaphore}

import fs2.Stream

import _root_.io.circe.Json

import _root_.io.sentry.{Sentry, SentryClient}

import org.http4s.client.{Client => Http4sClient}
import org.http4s.Status
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.iglu.client.{Client => IgluClient}
import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}

import com.snowplowanalytics.snowplow.badrows.Processor
Expand Down Expand Up @@ -95,7 +93,7 @@ import java.util.concurrent.TimeoutException
* getPayload must be defined for this type, as well as checkpointing
*/
final case class Environment[F[_], A](
igluClient: IgluClient[F, Json],
igluClient: IgluCirceClient[F],
registryLookup: RegistryLookup[F],
enrichments: Ref[F, Environment.Enrichments[F]],
semaphore: Semaphore[F],
Expand Down Expand Up @@ -179,7 +177,7 @@ object Environment {
pii <- sinkPii.sequence
http <- Clients.mkHttp(ec = ec)
clts <- clients.map(Clients.init(http, _))
igluClient <- IgluClient.parseDefault[F](parsedConfigs.igluJson).resource
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty
metrics <- Resource.eval(Metrics.build[F](blocker, file.monitoring.metrics, remoteAdaptersEnabled))
metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import cats.Applicative
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.{IgluCirceClient, Resolver}

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
Expand Down Expand Up @@ -79,9 +79,12 @@ object ParsedConfigs {
)
goodAttributes = outputAttributes(configFile.output.good)
piiAttributes = configFile.output.pii.map(outputAttributes).getOrElse { _: EnrichedEvent => Map.empty[String, String] }
client <- Client.parseDefault[F](igluJson).leftMap(x => show"Cannot decode Iglu Client. $x")
resolverConfig <-
EitherT.fromEither[F](Resolver.parseConfig(igluJson)).leftMap(x => show"Cannot decode Iglu resolver from provided json. $x")
resolver <- Resolver.fromConfig[F](resolverConfig).leftMap(x => show"Cannot create Iglu resolver from provided json. $x")
client <- EitherT.liftF(IgluCirceClient.fromResolver[F](resolver, resolverConfig.cacheSize))
_ <- EitherT.liftF(
Logger[F].info(show"Parsed Iglu Client with following registries: ${client.resolver.repos.map(_.config.name).mkString(", ")}")
Logger[F].info(show"Parsed Iglu Client with following registries: ${resolver.repos.map(_.config.name).mkString(", ")}")
)
configs <- EitherT(EnrichmentRegistry.parse[F](enrichmentJsons, client, false).map(_.toEither)).leftMap { x =>
show"Cannot decode enrichments - ${x.mkString_(", ")}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.FeatureFlags

import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec.{Expected, minimalEvent, normalizeResult}
import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.createIgluClient
import com.snowplowanalytics.snowplow.enrich.common.fs2.test._

import org.specs2.ScalaCheck
Expand Down Expand Up @@ -68,47 +69,52 @@ class EnrichSpec extends Specification with CatsIO with ScalaCheck {
derived_tstamp = Some(Instant.ofEpochMilli(0L))
)
implicit val c = TestEnvironment.http4sClient
Enrich
.enrichWith(
TestEnvironment.enrichmentReg.pure[IO],
TestEnvironment.adapterRegistry,
TestEnvironment.igluClient,
None,
EnrichSpec.processor,
EnrichSpec.featureFlags,
IO.unit
)(
EnrichSpec.payload
)
.map(normalizeResult)
.map {
case List(Validated.Valid(event)) => event must beEqualTo(expected)
case other => ko(s"Expected one valid event, got $other")
}
}

"enrich a randomly generated page view event" in {
implicit val cpGen = PayloadGen.getPageViewArbitrary
implicit val c = TestEnvironment.http4sClient
prop { (collectorPayload: CollectorPayload) =>
val payload = collectorPayload.toRaw
createIgluClient(List(TestEnvironment.embeddedRegistry)).flatMap { igluClient =>
Enrich
.enrichWith(
TestEnvironment.enrichmentReg.pure[IO],
TestEnvironment.adapterRegistry,
TestEnvironment.igluClient,
igluClient,
None,
EnrichSpec.processor,
EnrichSpec.featureFlags,
IO.unit
)(
payload
EnrichSpec.payload
)
.map(normalizeResult)
.map {
case List(Validated.Valid(e)) => e.event must beSome("page_view")
case List(Validated.Valid(event)) => event must beEqualTo(expected)
case other => ko(s"Expected one valid event, got $other")
}
}

}

// Property-based test: every generated collector payload must be enriched into
// exactly one valid event whose `event` field is "page_view".
"enrich a randomly generated page view event" in {
// Implicit used by `prop` to generate CollectorPayload values
// (presumably page-view payloads, judging by the generator name — confirm in PayloadGen).
implicit val cpGen = PayloadGen.getPageViewArbitrary
// Implicit HTTP client picked up by `Enrich.enrichWith`.
implicit val c = TestEnvironment.http4sClient
prop { (collectorPayload: CollectorPayload) =>
val payload = collectorPayload.toRaw
// The Iglu client is created inside the property, so each generated payload
// is enriched with its own client instance.
createIgluClient(List(TestEnvironment.embeddedRegistry)).flatMap { igluClient =>
Enrich
.enrichWith(
TestEnvironment.enrichmentReg.pure[IO],
TestEnvironment.adapterRegistry,
igluClient,
None,
EnrichSpec.processor,
EnrichSpec.featureFlags,
IO.unit
)(
payload
)
.map(normalizeResult)
.map {
case List(Validated.Valid(e)) => e.event must beSome("page_view")
// Anything other than a single valid event fails the property.
case other => ko(s"Expected one valid event, got $other")
}
}
// Overrides the default ScalaCheck parameters for this property.
}.setParameters(Parameters(maxSize = 20, minTestsOk = 25))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import cats.data.{Validated, ValidatedNel}
import cats.effect.testing.specs2.CatsIO
import cats.effect.{Blocker, IO}
import cats.implicits._
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.Registry
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
Expand Down Expand Up @@ -176,7 +176,7 @@ class EventGenEtlPipelineSpec extends Specification with CatsIO {
val adapterRegistry = new AdapterRegistry()
val enrichmentReg = EnrichmentRegistry[IO]()
val igluCentral = Registry.IgluCentral
val client = Client.parseDefault[IO](json"""
val client = IgluCirceClient.parseDefault[IO](json"""
{
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
"data": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import cats.implicits._

import fs2.io.file.deleteIfExists

import com.snowplowanalytics.iglu.client.{IgluCirceClient, Resolver}
import com.snowplowanalytics.iglu.client.resolver.registries.Registry

import com.snowplowanalytics.snowplow.enrich.common.fs2.test._
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients

Expand Down Expand Up @@ -61,4 +64,7 @@ object SpecHelpers extends CatsIO {
/**
 * Resource that runs `filesCleanup` for `files` both when it is acquired and
 * when it is released, so that the files exist neither before nor after the
 * test that uses it.
 */
def filesResource(blocker: Blocker, files: List[Path]): Resource[IO, Unit] = {
  // The same cleanup action serves as acquire step and as finalizer.
  val cleanup = filesCleanup(blocker, files)
  Resource.make(cleanup)(_ => cleanup)
}

/**
 * Builds an Iglu client for tests on top of a resolver over the given registries.
 *
 * The resolver is created with `None` as its second argument and the client
 * with `cacheSize = 0` (presumably so that nothing is cached between lookups —
 * confirm against the iglu-scala-client documentation).
 *
 * @param registries registries the resolver is built from
 * @return an effect producing the client
 */
def createIgluClient(registries: List[Registry]): IO[IgluCirceClient[IO]] = {
  val resolver = Resolver[IO](registries, None)
  IgluCirceClient.fromResolver[IO](resolver, cacheSize = 0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import cats.data.Validated
import cats.data.Validated.{Invalid, Valid}

import io.circe.Json
import io.circe.literal._
import io.circe.syntax._

import org.apache.thrift.TSerializer
Expand All @@ -35,7 +34,7 @@ import org.http4s.client.{Client => Http4sClient, JavaNetClientBuilder}

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

import com.snowplowanalytics.iglu.client.{CirceValidator, Client, Resolver}
import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.Registry

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
Expand All @@ -49,13 +48,11 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.Enrich
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.FeatureFlags

import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec
import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.createIgluClient
import com.snowplowanalytics.snowplow.enrich.common.fs2.test.TestEnvironment

object BlackBoxTesting extends Specification with CatsIO {

val igluClient: Client[IO, Json] =
Client[IO, Json](Resolver(List(Registry.EmbeddedRegistry), None), CirceValidator)

val blocker: Blocker = Blocker.liftExecutionContext(ExecutionContext.global)
implicit val httpClient: Http4sClient[IO] = JavaNetClientBuilder[IO](blocker).create

Expand Down Expand Up @@ -98,20 +95,23 @@ object BlackBoxTesting extends Specification with CatsIO {
expected: Map[String, String],
enrichmentConfig: Option[Json] = None
) =
Enrich
.enrichWith(getEnrichmentRegistry(enrichmentConfig),
TestEnvironment.adapterRegistry,
igluClient,
None,
EnrichSpec.processor,
featureFlags,
IO.unit
)(
input
)
.map {
case (List(Validated.Valid(enriched)), _) => checkEnriched(enriched, expected)
case other => ko(s"there should be one enriched event but got $other")
createIgluClient(List(Registry.EmbeddedRegistry))
.flatMap { igluClient =>
Enrich
.enrichWith(getEnrichmentRegistry(enrichmentConfig, igluClient),
TestEnvironment.adapterRegistry,
igluClient,
None,
EnrichSpec.processor,
featureFlags,
IO.unit
)(
input
)
.map {
case (List(Validated.Valid(enriched)), _) => checkEnriched(enriched, expected)
case other => ko(s"there should be one enriched event but got $other")
}
}

private def checkEnriched(enriched: EnrichedEvent, expectedFields: Map[String, String]) = {
Expand All @@ -126,7 +126,7 @@ object BlackBoxTesting extends Specification with CatsIO {
/**
 * Renders an enriched event as a map from field name to string value.
 *
 * Each entry of `enrichedFields` is read from `enriched`; a `null` field value
 * becomes the empty string, any other value its `toString`.
 */
private def getMap(enriched: EnrichedEvent): Map[String, String] =
  enrichedFields.map { field =>
    // Option(...) turns a null returned by the field read into None.
    val rendered = Option(field.get(enriched)).fold("")(_.toString)
    (field.getName(), rendered)
  }.toMap

private def getEnrichmentRegistry(enrichmentConfig: Option[Json]): IO[EnrichmentRegistry[IO]] =
private def getEnrichmentRegistry(enrichmentConfig: Option[Json], igluClient: IgluCirceClient[IO]): IO[EnrichmentRegistry[IO]] =
enrichmentConfig match {
case None =>
IO.pure(EnrichmentRegistry[IO]())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.Enrich
import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec
import com.snowplowanalytics.snowplow.enrich.common.fs2.test.TestEnvironment
import com.snowplowanalytics.snowplow.enrich.common.fs2.blackbox.BlackBoxTesting
import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.createIgluClient

class Tp2AdapterSpec extends Specification with CatsIO {
"enrichWith" should {
Expand All @@ -35,22 +36,24 @@ class Tp2AdapterSpec extends Specification with CatsIO {
contentType = "application/json".some
)
implicit val c = TestEnvironment.http4sClient
Enrich
.enrichWith(
TestEnvironment.enrichmentReg.pure[IO],
TestEnvironment.adapterRegistry,
BlackBoxTesting.igluClient,
None,
EnrichSpec.processor,
EnrichSpec.featureFlags,
IO.unit
)(
input
)
.map {
case (l, _) if l.forall(_.isValid) => l must haveSize(10)
case other => ko(s"there should be 10 enriched events, got $other")
}
createIgluClient(List(TestEnvironment.embeddedRegistry)).flatMap { igluClient =>
Enrich
.enrichWith(
TestEnvironment.enrichmentReg.pure[IO],
TestEnvironment.adapterRegistry,
igluClient,
None,
EnrichSpec.processor,
EnrichSpec.featureFlags,
IO.unit
)(
input
)
.map {
case (l, _) if l.forall(_.isValid) => l must haveSize(10)
case other => ko(s"there should be 10 enriched events, got $other")
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ import cats.effect.testing.specs2.CatsIO

import fs2.Stream

import io.circe.{Json, parser}
import io.circe.parser

import com.snowplowanalytics.iglu.client.{CirceValidator, Client, Resolver}
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, Registry}

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
Expand All @@ -45,7 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF

import com.snowplowanalytics.snowplow.enrich.common.fs2.{Assets, AttributedData, Enrich, EnrichSpec, Environment}
import com.snowplowanalytics.snowplow.enrich.common.fs2.Environment.{Enrichments, StreamsSettings}
import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.{filesResource, ioClock}
import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.{createIgluClient, filesResource, ioClock}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Telemetry}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients
import org.http4s.client.{Client => Http4sClient}
Expand Down Expand Up @@ -116,9 +115,6 @@ object TestEnvironment extends CatsIO {

val adapterRegistry = new AdapterRegistry()

val igluClient: Client[IO, Json] =
Client[IO, Json](Resolver(List(embeddedRegistry), None), CirceValidator)

val http4sClient: Http4sClient[IO] = Http4sClient[IO] { _ =>
val dsl = new Http4sDsl[IO] {}; import dsl._
Resource.eval(Ok(""))
Expand Down Expand Up @@ -147,6 +143,7 @@ object TestEnvironment extends CatsIO {
goodRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
piiRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
badRef <- Resource.eval(Ref.of[IO, Vector[Array[Byte]]](Vector.empty))
igluClient <- Resource.eval(createIgluClient(List(embeddedRegistry)))
environment = Environment[IO, Array[Byte]](
igluClient,
Http4sRegistryLookup(http),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import cats.data.{Validated, ValidatedNel}
import cats.effect.Clock
import cats.implicits._

import io.circe.Json

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
Expand Down Expand Up @@ -64,7 +62,7 @@ object EtlPipeline {
def processEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](
adapterRegistry: AdapterRegistry,
enrichmentRegistry: EnrichmentRegistry[F],
client: Client[F, Json],
client: IgluCirceClient[F],
processor: Processor,
etlTstamp: DateTime,
input: ValidatedNel[BadRow, Option[CollectorPayload]],
Expand Down

0 comments on commit 5c30da0

Please sign in to comment.