From 5c30da03d99945fbf61922d20d392158c94d960d Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 23 Jun 2022 17:35:25 +0200 Subject: [PATCH] common: use iglu-scala-client with improved caching (close #699) --- .../snowplow/enrich/common/fs2/Enrich.scala | 5 +- .../enrich/common/fs2/Environment.scala | 8 +-- .../common/fs2/config/ParsedConfigs.scala | 9 ++- .../enrich/common/fs2/EnrichSpec.scala | 60 ++++++++++--------- .../common/fs2/EventGenEtlPipelineSpec.scala | 4 +- .../enrich/common/fs2/SpecHelpers.scala | 6 ++ .../common/fs2/blackbox/BlackBoxTesting.scala | 40 ++++++------- .../blackbox/adapters/Tp2AdapterSpec.scala | 35 ++++++----- .../common/fs2/test/TestEnvironment.scala | 9 +-- .../common/EtlPipeline.scala | 6 +- .../common/adapters/AdapterRegistry.scala | 6 +- .../common/adapters/registry/Adapter.scala | 4 +- .../adapters/registry/CallrailAdapter.scala | 6 +- .../registry/CloudfrontAccessLogAdapter.scala | 7 ++- .../registry/GoogleAnalyticsAdapter.scala | 4 +- .../adapters/registry/HubSpotAdapter.scala | 7 ++- .../adapters/registry/IgluAdapter.scala | 4 +- .../adapters/registry/MailchimpAdapter.scala | 7 ++- .../adapters/registry/MailgunAdapter.scala | 4 +- .../adapters/registry/MandrillAdapter.scala | 7 ++- .../adapters/registry/MarketoAdapter.scala | 7 ++- .../adapters/registry/OlarkAdapter.scala | 7 ++- .../adapters/registry/PagerdutyAdapter.scala | 7 ++- .../adapters/registry/PingdomAdapter.scala | 7 ++- .../adapters/registry/RemoteAdapter.scala | 7 ++- .../adapters/registry/SendgridAdapter.scala | 8 ++- .../registry/StatusGatorAdapter.scala | 8 ++- .../adapters/registry/UnbounceAdapter.scala | 7 ++- .../registry/UrbanAirshipAdapter.scala | 9 ++- .../adapters/registry/VeroAdapter.scala | 7 ++- .../registry/snowplow/RedirectAdapter.scala | 4 +- .../registry/snowplow/Tp1Adapter.scala | 6 +- .../registry/snowplow/Tp2Adapter.scala | 6 +- .../enrichments/EnrichmentManager.scala | 4 +- .../enrichments/EnrichmentRegistry.scala | 4 +- .../common/utils/IgluUtils.scala | 18 +++--- .../EtlPipelineSpec.scala | 7 +-- .../SpecHelpers.scala | 4 +- .../adapters/registry/AdapterSpec.scala | 4 +- .../pii/PiiPseudonymizerEnrichmentSpec.scala | 6 +- .../Enrich.scala | 10 ++-- .../sources/Source.scala | 5 +- .../SpecHelpers.scala | 4 +- .../sources/TestSource.scala | 5 +- .../KafkaEnrich.scala | 4 +- .../sources/KafkaSource.scala | 7 +-- .../KinesisEnrich.scala | 4 +- .../sources/KinesisSource.scala | 7 +-- .../NsqEnrich.scala | 4 +- .../sources/NsqSource.scala | 7 +-- .../StdinEnrich.scala | 4 +- .../sources/StdinSource.scala | 7 +-- project/Dependencies.scala | 2 +- 53 files changed, 239 insertions(+), 206 deletions(-) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala index 95288bcc7..2b75f1766 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala @@ -33,13 +33,12 @@ import fs2.{Pipe, Stream} import _root_.io.sentry.SentryClient -import _root_.io.circe.Json import _root_.io.circe.syntax._ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Processor, Payload => BadRowPayload} import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline @@ -116,7 +115,7 @@ object Enrich { def enrichWith[F[_]: Clock: ContextShift: RegistryLookup: Sync: HttpClient]( enrichRegistry: F[EnrichmentRegistry[F]], adapterRegistry: AdapterRegistry, - igluClient: Client[F, Json], + igluClient: IgluCirceClient[F], sentry: Option[SentryClient], processor: Processor, featureFlags: FeatureFlags, diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala index 058157cdd..7cfa39cf9 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala @@ -23,8 +23,6 @@ import cats.effect.concurrent.{Ref, Semaphore} import fs2.Stream -import _root_.io.circe.Json - import _root_.io.sentry.{Sentry, SentryClient} import org.http4s.client.{Client => Http4sClient} @@ -32,7 +30,7 @@ import org.http4s.Status import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import com.snowplowanalytics.iglu.client.{Client => IgluClient} +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup} import com.snowplowanalytics.snowplow.badrows.Processor @@ -95,7 +93,7 @@ import java.util.concurrent.TimeoutException * getPayload must be defined for this type, as well as checkpointing */ final case class Environment[F[_], A]( - igluClient: IgluClient[F, Json], + igluClient: IgluCirceClient[F], registryLookup: RegistryLookup[F], enrichments: Ref[F, Environment.Enrichments[F]], semaphore: Semaphore[F], @@ -179,7 +177,7 @@ object Environment { pii <- sinkPii.sequence http <- Clients.mkHttp(ec = ec) clts <- clients.map(Clients.init(http, _)) - igluClient <- IgluClient.parseDefault[F](parsedConfigs.igluJson).resource + igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty metrics <- Resource.eval(Metrics.build[F](blocker, file.monitoring.metrics, remoteAdaptersEnabled)) metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http)) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala index 2f999c86a..caa801c62 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala @@ -29,7 +29,7 @@ import cats.Applicative import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.{IgluCirceClient, Resolver} import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry @@ -79,9 +79,12 @@ object ParsedConfigs { ) goodAttributes = outputAttributes(configFile.output.good) piiAttributes = configFile.output.pii.map(outputAttributes).getOrElse { _: EnrichedEvent => Map.empty[String, String] } - client <- Client.parseDefault[F](igluJson).leftMap(x => show"Cannot decode Iglu Client. $x") + resolverConfig <- + EitherT.fromEither[F](Resolver.parseConfig(igluJson)).leftMap(x => show"Cannot decode Iglu resolver from provided json. $x") + resolver <- Resolver.fromConfig[F](resolverConfig).leftMap(x => show"Cannot create Iglu resolver from provided json. $x") + client <- EitherT.liftF(IgluCirceClient.fromResolver[F](resolver, resolverConfig.cacheSize)) _ <- EitherT.liftF( - Logger[F].info(show"Parsed Iglu Client with following registries: ${client.resolver.repos.map(_.config.name).mkString(", ")}") + Logger[F].info(show"Parsed Iglu Client with following registries: ${resolver.repos.map(_.config.name).mkString(", ")}") ) configs <- EitherT(EnrichmentRegistry.parse[F](enrichmentJsons, client, false).map(_.toEither)).leftMap { x => show"Cannot decode enrichments - ${x.mkString_(", ")}" diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala index 55f69fe89..4bf5f5e9e 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala @@ -41,6 +41,7 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.FeatureFlags import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec.{Expected, minimalEvent, normalizeResult} +import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.createIgluClient import com.snowplowanalytics.snowplow.enrich.common.fs2.test._ import org.specs2.ScalaCheck @@ -68,47 +69,52 @@ class EnrichSpec extends Specification with CatsIO with ScalaCheck { derived_tstamp = Some(Instant.ofEpochMilli(0L)) ) implicit val c = TestEnvironment.http4sClient - Enrich - .enrichWith( - TestEnvironment.enrichmentReg.pure[IO], - TestEnvironment.adapterRegistry, - TestEnvironment.igluClient, - None, - EnrichSpec.processor, - EnrichSpec.featureFlags, - IO.unit - )( - EnrichSpec.payload - ) - .map(normalizeResult) - .map { - case List(Validated.Valid(event)) => event must beEqualTo(expected) - case other => ko(s"Expected one valid event, got $other") - } - } - - "enrich a randomly generated page view event" in { - implicit val cpGen = PayloadGen.getPageViewArbitrary - implicit val c = TestEnvironment.http4sClient - prop { (collectorPayload: CollectorPayload) => - val payload = collectorPayload.toRaw + createIgluClient(List(TestEnvironment.embeddedRegistry)).flatMap { igluClient => Enrich .enrichWith( TestEnvironment.enrichmentReg.pure[IO], TestEnvironment.adapterRegistry, - TestEnvironment.igluClient, + igluClient, None, EnrichSpec.processor, EnrichSpec.featureFlags, IO.unit )( - payload + EnrichSpec.payload ) .map(normalizeResult) .map { - case List(Validated.Valid(e)) => e.event must beSome("page_view") + case List(Validated.Valid(event)) => event must beEqualTo(expected) case other => ko(s"Expected one valid event, got $other") } + } + + } + + "enrich a randomly generated page view event" in { + implicit val cpGen = PayloadGen.getPageViewArbitrary + implicit val c = TestEnvironment.http4sClient + prop { (collectorPayload: CollectorPayload) => + val payload = collectorPayload.toRaw + createIgluClient(List(TestEnvironment.embeddedRegistry)).flatMap { igluClient => + Enrich + .enrichWith( + TestEnvironment.enrichmentReg.pure[IO], + TestEnvironment.adapterRegistry, + igluClient, + None, + EnrichSpec.processor, + EnrichSpec.featureFlags, + IO.unit + )( + payload + ) + .map(normalizeResult) + .map { + case List(Validated.Valid(e)) => e.event must beSome("page_view") + case other => ko(s"Expected one valid event, got $other") + } + } }.setParameters(Parameters(maxSize = 20, minTestsOk = 25)) } } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala index 8bee950fd..4b2210061 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala @@ -16,7 +16,7 @@ import cats.data.{Validated, ValidatedNel} import cats.effect.testing.specs2.CatsIO import cats.effect.{Blocker, IO} import cats.implicits._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.Registry import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor} import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline @@ -176,7 +176,7 @@ class EventGenEtlPipelineSpec extends Specification with CatsIO { val adapterRegistry = new AdapterRegistry() val enrichmentReg = EnrichmentRegistry[IO]() val igluCentral = Registry.IgluCentral - val client = Client.parseDefault[IO](json""" + val client = IgluCirceClient.parseDefault[IO](json""" { "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1", "data": { diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/SpecHelpers.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/SpecHelpers.scala index e98364226..9e1be7e88 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/SpecHelpers.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/SpecHelpers.scala @@ -23,6 +23,9 @@ import cats.implicits._ import fs2.io.file.deleteIfExists +import com.snowplowanalytics.iglu.client.{IgluCirceClient, Resolver} +import com.snowplowanalytics.iglu.client.resolver.registries.Registry + import com.snowplowanalytics.snowplow.enrich.common.fs2.test._ import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients @@ -61,4 +64,7 @@ object SpecHelpers extends CatsIO { /** Make sure files don't exist before and after test starts */ def filesResource(blocker: Blocker, files: List[Path]): Resource[IO, Unit] = Resource.make(filesCleanup(blocker, files))(_ => filesCleanup(blocker, files)) + + def createIgluClient(registries: List[Registry]): IO[IgluCirceClient[IO]] = + IgluCirceClient.fromResolver[IO](Resolver(registries, None), cacheSize = 0) } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala index 012967542..b96d2eaf6 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala @@ -26,7 +26,6 @@ import cats.data.Validated import cats.data.Validated.{Invalid, Valid} import io.circe.Json -import io.circe.literal._ import io.circe.syntax._ import org.apache.thrift.TSerializer @@ -35,7 +34,7 @@ import org.http4s.client.{Client => Http4sClient, JavaNetClientBuilder} import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload -import com.snowplowanalytics.iglu.client.{CirceValidator, Client, Resolver} +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.Registry import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} @@ -49,13 +48,11 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.Enrich import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.FeatureFlags import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec +import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.createIgluClient import com.snowplowanalytics.snowplow.enrich.common.fs2.test.TestEnvironment object BlackBoxTesting extends Specification with CatsIO { - val igluClient: Client[IO, Json] = - Client[IO, Json](Resolver(List(Registry.EmbeddedRegistry), None), CirceValidator) - val blocker: Blocker = Blocker.liftExecutionContext(ExecutionContext.global) implicit val httpClient: Http4sClient[IO] = JavaNetClientBuilder[IO](blocker).create @@ -98,20 +95,23 @@ object BlackBoxTesting extends Specification with CatsIO { expected: Map[String, String], enrichmentConfig: Option[Json] = None ) = - Enrich - .enrichWith(getEnrichmentRegistry(enrichmentConfig), - TestEnvironment.adapterRegistry, - igluClient, - None, - EnrichSpec.processor, - featureFlags, - IO.unit - )( - input - ) - .map { - case (List(Validated.Valid(enriched)), _) => checkEnriched(enriched, expected) - case other => ko(s"there should be one enriched event but got $other") + createIgluClient(List(Registry.EmbeddedRegistry)) + .flatMap { igluClient => + Enrich + .enrichWith(getEnrichmentRegistry(enrichmentConfig, igluClient), + TestEnvironment.adapterRegistry, + igluClient, + None, + EnrichSpec.processor, + featureFlags, + IO.unit + )( + input + ) + .map { + case (List(Validated.Valid(enriched)), _) => checkEnriched(enriched, expected) + case other => ko(s"there should be one enriched event but got $other") + } } private def checkEnriched(enriched: EnrichedEvent, expectedFields: Map[String, String]) = { @@ -126,7 +126,7 @@ object BlackBoxTesting extends Specification with CatsIO { private def getMap(enriched: EnrichedEvent): Map[String, String] = enrichedFields.map(f => (f.getName(), Option(f.get(enriched)).map(_.toString).getOrElse(""))).toMap - private def getEnrichmentRegistry(enrichmentConfig: Option[Json]): IO[EnrichmentRegistry[IO]] = + private def getEnrichmentRegistry(enrichmentConfig: Option[Json], igluClient: IgluCirceClient[IO]): IO[EnrichmentRegistry[IO]] = enrichmentConfig match { case None => IO.pure(EnrichmentRegistry[IO]()) diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/adapters/Tp2AdapterSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/adapters/Tp2AdapterSpec.scala index 8007eb07a..cb4c6b929 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/adapters/Tp2AdapterSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/adapters/Tp2AdapterSpec.scala @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.Enrich import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec import com.snowplowanalytics.snowplow.enrich.common.fs2.test.TestEnvironment import com.snowplowanalytics.snowplow.enrich.common.fs2.blackbox.BlackBoxTesting +import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.createIgluClient class Tp2AdapterSpec extends Specification with CatsIO { "enrichWith" should { @@ -35,22 +36,24 @@ class Tp2AdapterSpec extends Specification with CatsIO { contentType = "application/json".some ) implicit val c = TestEnvironment.http4sClient - Enrich - .enrichWith( - TestEnvironment.enrichmentReg.pure[IO], - TestEnvironment.adapterRegistry, - BlackBoxTesting.igluClient, - None, - EnrichSpec.processor, - EnrichSpec.featureFlags, - IO.unit - )( - input - ) - .map { - case (l, _) if l.forall(_.isValid) => l must haveSize(10) - case other => ko(s"there should be 10 enriched events, got $other") - } + createIgluClient(List(TestEnvironment.embeddedRegistry)).flatMap { igluClient => + Enrich + .enrichWith( + TestEnvironment.enrichmentReg.pure[IO], + TestEnvironment.adapterRegistry, + igluClient, + None, + EnrichSpec.processor, + EnrichSpec.featureFlags, + IO.unit + )( + input + ) + .map { + case (l, _) if l.forall(_.isValid) => l must haveSize(10) + case other => ko(s"there should be 10 enriched events, got $other") + } + } } } } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala index ebc19d073..51e73c887 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala @@ -29,9 +29,8 @@ import cats.effect.testing.specs2.CatsIO import fs2.Stream -import io.circe.{Json, parser} +import io.circe.parser -import com.snowplowanalytics.iglu.client.{CirceValidator, Client, Resolver} import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, Registry} import com.snowplowanalytics.snowplow.analytics.scalasdk.Event @@ -45,7 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF import com.snowplowanalytics.snowplow.enrich.common.fs2.{Assets, AttributedData, Enrich, EnrichSpec, Environment} import com.snowplowanalytics.snowplow.enrich.common.fs2.Environment.{Enrichments, StreamsSettings} -import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.{filesResource, ioClock} +import com.snowplowanalytics.snowplow.enrich.common.fs2.SpecHelpers.{createIgluClient, filesResource, ioClock} import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Telemetry} import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients import org.http4s.client.{Client => Http4sClient} @@ -116,9 +115,6 @@ object TestEnvironment extends CatsIO { val adapterRegistry = new AdapterRegistry() - val igluClient: Client[IO, Json] = - Client[IO, Json](Resolver(List(embeddedRegistry), None), CirceValidator) - val http4sClient: Http4sClient[IO] = Http4sClient[IO] { _ => val dsl = new Http4sDsl[IO] {}; import dsl._ Resource.eval(Ok("")) @@ -147,6 +143,7 @@ object TestEnvironment extends CatsIO { goodRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty)) piiRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty)) badRef <- Resource.eval(Ref.of[IO, Vector[Array[Byte]]](Vector.empty)) + igluClient <- Resource.eval(createIgluClient(List(embeddedRegistry))) environment = Environment[IO, Array[Byte]]( igluClient, Http4sRegistryLookup(http), diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala index fcd57a254..c95b45b15 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala @@ -17,9 +17,7 @@ import cats.data.{Validated, ValidatedNel} import cats.effect.Clock import cats.implicits._ -import io.circe.Json - -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor} @@ -64,7 +62,7 @@ object EtlPipeline { def processEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[F], - client: Client[F, Json], + client: IgluCirceClient[F], processor: Processor, etlTstamp: DateTime, input: ValidatedNel[BadRow, Option[CollectorPayload]], diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/AdapterRegistry.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/AdapterRegistry.scala index a26b26095..206463e76 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/AdapterRegistry.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/AdapterRegistry.scala @@ -20,10 +20,8 @@ import cats.data.{NonEmptyList, Validated} import cats.effect.Clock import cats.implicits._ -import io.circe.Json - import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.badrows._ @@ -93,7 +91,7 @@ class AdapterRegistry(remoteAdapters: Map[(String, String), RemoteAdapter] = Map */ def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( payload: CollectorPayload, - client: Client[F, Json], + client: IgluCirceClient[F], processor: Processor ): F[Validated[BadRow, NonEmptyList[RawEvent]]] = (adapters.get((payload.api.vendor, payload.api.version)) match { diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala index 5d192a32d..eda63f7f9 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala @@ -30,7 +30,7 @@ import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.snowplow.badrows.FailureDetails @@ -115,7 +115,7 @@ trait Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[ + def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: IgluCirceClient[F]): F[ ValidatedNel[FailureDetails.AdapterFailureOrTrackerProtocolViolation, NonEmptyList[RawEvent]] ] diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/CallrailAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/CallrailAdapter.scala index ad60c090f..24cc7513b 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/CallrailAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/CallrailAdapter.scala @@ -19,12 +19,10 @@ import cats.data.NonEmptyList import cats.effect.Clock import cats.syntax.validated._ -import io.circe.Json - import org.joda.time.DateTimeZone import org.joda.time.format.DateTimeFormat -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} @@ -76,7 +74,7 @@ object CallrailAdapter extends Adapter { */ override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( payload: CollectorPayload, - client: Client[F, Json] + client: IgluCirceClient[F] ): F[Adapted] = { val _ = client val params = toMap(payload.querystring) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/CloudfrontAccessLogAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/CloudfrontAccessLogAdapter.scala index c4019ee9d..5a46b285c 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/CloudfrontAccessLogAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/CloudfrontAccessLogAdapter.scala @@ -31,7 +31,7 @@ import io.circe._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.snowplow.badrows.FailureDetails @@ -82,7 +82,10 @@ object CloudfrontAccessLogAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a validation boxing either a NEL of raw events or a NEL of failure strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = payload.body match { case Some(p) => val _ = client diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/GoogleAnalyticsAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/GoogleAnalyticsAdapter.scala index 1b50febfc..326e01aa1 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/GoogleAnalyticsAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/GoogleAnalyticsAdapter.scala @@ -26,7 +26,7 @@ import cats.effect.Clock import io.circe._ import io.circe.syntax._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} @@ -472,7 +472,7 @@ object GoogleAnalyticsAdapter extends Adapter { */ override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( payload: CollectorPayload, - client: Client[F, Json] + client: IgluCirceClient[F] ): F[Adapted] = { val events: Option[NonEmptyList[ValidatedNel[FailureDetails.AdapterFailure, RawEvent]]] = for { body <- payload.body diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/HubSpotAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/HubSpotAdapter.scala index 7833d3fda..5981783c3 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/HubSpotAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/HubSpotAdapter.scala @@ -25,7 +25,7 @@ import cats.effect.Clock import org.joda.time.DateTime -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ @@ -81,7 +81,10 @@ object HubSpotAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/IgluAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/IgluAdapter.scala index 66a16c96d..e0a049b8b 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/IgluAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/IgluAdapter.scala @@ -22,7 +22,7 @@ import cats.syntax.validated._ import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.snowplow.badrows._ @@ -68,7 +68,7 @@ object IgluAdapter extends Adapter { */ override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( payload: CollectorPayload, - client: Client[F, Json] + client: IgluCirceClient[F] ): F[Adapted] = { val _ = client val params = toMap(payload.querystring) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MailchimpAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MailchimpAdapter.scala index 7ee65561d..90f2e4560 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MailchimpAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MailchimpAdapter.scala @@ -20,7 +20,7 @@ import cats.effect.Clock import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ @@ -75,7 +75,10 @@ object MailchimpAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MailgunAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MailgunAdapter.scala index c0d3304e7..40cc09953 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MailgunAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MailgunAdapter.scala @@ -20,7 +20,7 @@ import cats.effect.Clock import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ @@ -67,7 +67,7 @@ object MailgunAdapter extends Adapter { */ override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( payload: CollectorPayload, - client: Client[F, Json] + client: IgluCirceClient[F] ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MandrillAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MandrillAdapter.scala index aef2b7235..d3b0e3290 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MandrillAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MandrillAdapter.scala @@ -20,7 +20,7 @@ import cats.effect.Clock import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ @@ -70,7 +70,10 @@ object MandrillAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MarketoAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MarketoAdapter.scala index 277dfed71..18b7f5153 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MarketoAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/MarketoAdapter.scala @@ -21,7 +21,7 @@ import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ @@ -72,7 +72,10 @@ object MarketoAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/OlarkAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/OlarkAdapter.scala index bfcfdb3df..dc1a0c540 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/OlarkAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/OlarkAdapter.scala @@ -29,7 +29,7 @@ import cats.syntax.validated._ import cats.effect.Clock -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows.FailureDetails @@ -72,7 +72,10 @@ object OlarkAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/PagerdutyAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/PagerdutyAdapter.scala index 8e85c13e9..2512bc0d6 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/PagerdutyAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/PagerdutyAdapter.scala @@ -20,7 +20,7 @@ import cats.effect.Clock import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ @@ -62,7 +62,10 @@ object PagerdutyAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/PingdomAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/PingdomAdapter.scala index 6c4cd849b..f2d3ebb7b 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/PingdomAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/PingdomAdapter.scala @@ -20,7 +20,7 @@ import cats.effect.Clock import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ @@ -63,7 +63,10 @@ object PingdomAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = payload.querystring match { case Nil => val msg = "empty querystring: no events to process" diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/RemoteAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/RemoteAdapter.scala index cb9d35c47..608d50112 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/RemoteAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/RemoteAdapter.scala @@ -21,7 +21,7 @@ import cats.syntax.either._ import cats.syntax.functor._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.snowplow.badrows._ import io.circe.Json @@ -49,7 +49,10 @@ final case class RemoteAdapter( * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = payload.body match { case Some(body) if body.nonEmpty => val _ = client diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/SendgridAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/SendgridAdapter.scala index 7089d5c76..931b1797e 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/SendgridAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/SendgridAdapter.scala @@ -20,11 +20,10 @@ import cats.effect.Clock import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ -import io.circe.Json import loaders.CollectorPayload import utils.{HttpClient, JsonUtils} @@ -69,7 +68,10 @@ object SendgridAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/StatusGatorAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/StatusGatorAdapter.scala index ed8722bed..f341a8b72 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/StatusGatorAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/StatusGatorAdapter.scala @@ -25,11 +25,10 @@ import cats.data.NonEmptyList import cats.effect.Clock import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ -import io.circe.Json import io.circe.syntax._ import org.apache.http.client.utils.URLEncodedUtils @@ -59,7 +58,10 @@ object StatusGatorAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/UnbounceAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/UnbounceAdapter.scala index 9d6e602de..8285a1983 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/UnbounceAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/UnbounceAdapter.scala @@ -27,7 +27,7 @@ import cats.syntax.apply._ import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ @@ -61,7 +61,10 @@ object UnbounceAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/UrbanAirshipAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/UrbanAirshipAdapter.scala index 5c508d785..b8bf7239e 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/UrbanAirshipAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/UrbanAirshipAdapter.scala @@ -21,11 +21,11 @@ import cats.syntax.apply._ import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ -import io.circe.{DecodingFailure, Json} +import io.circe.DecodingFailure import org.joda.time.{DateTime, DateTimeZone} import loaders.CollectorPayload @@ -75,7 +75,10 @@ object UrbanAirshipAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/VeroAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/VeroAdapter.scala index 1980085a7..b3aeab44e 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/VeroAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/VeroAdapter.scala @@ -20,7 +20,7 @@ import cats.syntax.either._ import cats.syntax.option._ import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} @@ -61,7 +61,10 @@ object VeroAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] = + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient]( + payload: CollectorPayload, + client: IgluCirceClient[F] + ): F[Adapted] = (payload.body, payload.contentType) match { case (None, _) => val failure = FailureDetails.AdapterFailure.InputData( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/RedirectAdapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/RedirectAdapter.scala index e05c3f43e..ec202cbc8 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/RedirectAdapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/RedirectAdapter.scala @@ -26,7 +26,7 @@ import io.circe.syntax._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.snowplow.badrows.FailureDetails @@ -71,7 +71,7 @@ object RedirectAdapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[ + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: IgluCirceClient[F]): F[ ValidatedNel[FailureDetails.AdapterFailureOrTrackerProtocolViolation, NonEmptyList[RawEvent]] ] = { val _ = client diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp1Adapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp1Adapter.scala index a5c9d0885..6a7b9ce31 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp1Adapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp1Adapter.scala @@ -18,9 +18,7 @@ import cats.data.{NonEmptyList, ValidatedNel} import cats.effect.Clock import cats.syntax.validated._ -import io.circe.Json - -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.snowplow.badrows.FailureDetails @@ -40,7 +38,7 @@ object Tp1Adapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[ + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: IgluCirceClient[F]): F[ ValidatedNel[FailureDetails.AdapterFailureOrTrackerProtocolViolation, NonEmptyList[RawEvent]] ] = { val _ = client diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp2Adapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp2Adapter.scala index 3a03e8ebb..94db6064f 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp2Adapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp2Adapter.scala @@ -21,7 +21,7 @@ import cats.effect.Clock import io.circe.Json -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaCriterion, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ @@ -56,7 +56,7 @@ object Tp2Adapter extends Adapter { * @param client The Iglu client used for schema lookup and validation * @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings */ - def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[ + def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: IgluCirceClient[F]): F[ ValidatedNel[FailureDetails.AdapterFailureOrTrackerProtocolViolation, NonEmptyList[RawEvent]] ] = { val qsParams = toMap(payload.querystring) @@ -210,7 +210,7 @@ object Tp2Adapter extends Adapter { private def extractAndValidateJson[F[_]: Monad: RegistryLookup: Clock]( schemaCriterion: SchemaCriterion, instance: String, - client: Client[F, Json] + client: IgluCirceClient[F] ): EitherT[F, NonEmptyList[FailureDetails.TrackerProtocolViolation], Json] = (for { j <- EitherT.fromEither[F]( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index 2d85ed05d..548af8692 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -25,7 +25,7 @@ import cats.implicits._ import com.snowplowanalytics.refererparser._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.SelfDescribingData @@ -60,7 +60,7 @@ object EnrichmentManager { */ def enrichEvent[F[_]: Monad: RegistryLookup: Clock]( registry: EnrichmentRegistry[F], - client: Client[F, Json], + client: IgluCirceClient[F], processor: Processor, etlTstamp: DateTime, raw: RawEvent, diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala index d2695a78d..dcc37b713 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala @@ -24,7 +24,7 @@ import io.circe.syntax._ import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.forex.CreateForex @@ -56,7 +56,7 @@ object EnrichmentRegistry { */ def parse[F[_]: Monad: RegistryLookup: Clock]( json: Json, - client: Client[F, Json], + client: IgluCirceClient[F], localMode: Boolean ): F[ValidatedNel[String, List[EnrichmentConf]]] = (for { diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala index a46db59fb..45cbff319 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala @@ -21,7 +21,7 @@ import io.circe.Json import java.time.Instant -import com.snowplowanalytics.iglu.client.{Client, ClientError} +import com.snowplowanalytics.iglu.client.{ClientError, IgluCirceClient} import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData} @@ -55,7 +55,7 @@ object IgluUtils { */ def extractAndValidateInputJsons[F[_]: Monad: RegistryLookup: Clock]( enriched: EnrichedEvent, - client: Client[F, Json], + client: IgluCirceClient[F], raw: RawEvent, processor: Processor ): EitherT[ @@ -95,7 +95,7 @@ object IgluUtils { */ private[common] def extractAndValidateUnstructEvent[F[_]: Monad: RegistryLookup: Clock]( enriched: EnrichedEvent, - client: Client[F, Json], + client: IgluCirceClient[F], field: String = "ue_properties", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0) ): F[Validated[FailureDetails.SchemaViolation, Option[SelfDescribingData[Json]]]] = @@ -132,7 +132,7 @@ object IgluUtils { */ private[common] def extractAndValidateInputContexts[F[_]: Monad: RegistryLookup: Clock]( enriched: EnrichedEvent, - client: Client[F, Json], + client: IgluCirceClient[F], field: String = "contexts", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0) ): F[ValidatedNel[FailureDetails.SchemaViolation, List[SelfDescribingData[Json]]]] = @@ -167,7 +167,7 @@ object IgluUtils { * @return Unit if all the contexts are valid */ private[common] def validateEnrichmentsContexts[F[_]: Monad: RegistryLookup: Clock]( - client: Client[F, Json], + client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]], raw: RawEvent, processor: Processor, @@ -199,7 +199,7 @@ object IgluUtils { rawJson: String, field: String, // to put in the bad row expectedCriterion: SchemaCriterion, - client: Client[F, Json] + client: IgluCirceClient[F] ): EitherT[F, FailureDetails.SchemaViolation, Json] = for { // Parse Json string with the SDJ @@ -236,7 +236,7 @@ object IgluUtils { /** Check that a SDJ is valid */ private def check[F[_]: Monad: RegistryLookup: Clock]( - client: Client[F, Json], + client: IgluCirceClient[F], sdj: SelfDescribingData[Json] ): EitherT[F, (SchemaKey, ClientError), Unit] = client @@ -245,7 +245,7 @@ object IgluUtils { /** Check a list of SDJs and merge the Iglu errors */ private def checkList[F[_]: Monad: RegistryLookup: Clock]( - client: Client[F, Json], + client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]] ): EitherT[F, NonEmptyList[(SchemaKey, ClientError)], Unit] = EitherT { @@ -258,7 +258,7 @@ object IgluUtils { /** Parse a Json as a SDJ and check that it's valid */ private def parseAndValidateSDJ_sv[F[_]: Monad: RegistryLookup: Clock]( // _sv for SchemaViolation json: Json, - client: Client[F, Json] + client: IgluCirceClient[F] ): EitherT[F, FailureDetails.SchemaViolation, SelfDescribingData[Json]] = for { sdj <- SelfDescribingData diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/EtlPipelineSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/EtlPipelineSpec.scala index 78854c67b..90968fe51 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/EtlPipelineSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/EtlPipelineSpec.scala @@ -16,10 +16,9 @@ import cats.Id import cats.data.Validated import cats.syntax.validated._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.Registry -import com.snowplowanalytics.iglu.client.validator.CirceValidator import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.badrows.BadRow @@ -28,8 +27,6 @@ import org.apache.thrift.TSerializer import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.{CollectorPayload => tCollectorPayload} -import io.circe.Json - import org.joda.time.DateTime import org.specs2.Specification @@ -52,7 +49,7 @@ class EtlPipelineSpec extends Specification with ValidatedMatchers { val adapterRegistry = new AdapterRegistry() val enrichmentReg = EnrichmentRegistry[Id]() val igluCentral = Registry.IgluCentral - val client = Client[Id, Json](Resolver(List(igluCentral), None), CirceValidator) + val client = IgluCirceClient.fromResolver[Id](Resolver(List(igluCentral), None), cacheSize = 0) val processor = Processor("sce-test-suite", "1.0.0") val dateTime = DateTime.now() diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala index 9aea95e95..5f2de3e9b 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala @@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.enrich.common import cats.Id import cats.implicits._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.core.SelfDescribingData import com.snowplowanalytics.iglu.core.circe.implicits._ @@ -62,7 +62,7 @@ object SpecHelpers { }""" /** Builds an Iglu client from the above Iglu configuration. */ - val client: Client[Id, Json] = Client + val client: IgluCirceClient[Id] = IgluCirceClient .parseDefault[Id](igluConfig) .value .getOrElse(throw new RuntimeException("invalid resolver configuration")) diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/adapters/registry/AdapterSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/adapters/registry/AdapterSpec.scala index d91ad417b..8e10d0cbd 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/adapters/registry/AdapterSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/adapters/registry/AdapterSpec.scala @@ -21,7 +21,7 @@ import cats.syntax.validated._ import cats.effect.Clock -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.badrows._ @@ -54,7 +54,7 @@ class AdapterSpec extends Specification with DataTables with ValidatedMatchers { // TODO: add test for buildFormatter() object BaseAdapter extends Adapter { - override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]) = { + override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: IgluCirceClient[F]) = { val _ = client Monad[F].pure( FailureDetails.AdapterFailure diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala index 835e07c5a..918838cca 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/pii/PiiPseudonymizerEnrichmentSpec.scala @@ -17,7 +17,6 @@ import cats.data.Validated import cats.syntax.option._ import cats.syntax.validated._ -import io.circe.Json import io.circe.literal._ import io.circe.parser._ @@ -30,10 +29,9 @@ import org.specs2.matcher.ValidatedMatchers import com.snowplowanalytics.iglu.core._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.Registry -import com.snowplowanalytics.iglu.client.validator.CirceValidator import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor} @@ -167,7 +165,7 @@ class PiiPseudonymizerEnrichmentSpec extends Specification with ValidatedMatcher List("com.snowplowanalytics.snowplow", "com.acme", "com.mailgun") ) val reg = Registry.Embedded(regConf, path = "/iglu-schemas") - val client = Client[Id, Json](Resolver(List(reg), None), CirceValidator) + val client = IgluCirceClient.fromResolver[Id](Resolver(List(reg), None), cacheSize = 0) EtlPipeline .processEvents[Id]( new AdapterRegistry(), diff --git a/modules/stream/common/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/Enrich.scala b/modules/stream/common/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/Enrich.scala index 0f824d712..67800a125 100644 --- a/modules/stream/common/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/Enrich.scala +++ b/modules/stream/common/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/Enrich.scala @@ -25,7 +25,7 @@ import scala.sys.process._ import cats.Id import cats.implicits._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ import com.snowplowanalytics.snowplow.badrows.Processor @@ -113,7 +113,7 @@ trait Enrich { def getSource( streamsConfig: StreamsConfig, sentryConfig: Option[SentryConfig], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], tracker: Option[Tracker[Id]], @@ -170,11 +170,11 @@ trait Enrich { * a * @param creds optionally necessary credentials to download the resolver * @return a validated iglu resolver */ - def parseClient(resolverArg: String)(implicit creds: Credentials): Either[String, Client[Id, Json]] = + def parseClient(resolverArg: String)(implicit creds: Credentials): Either[String, IgluCirceClient[Id]] = for { parsedResolver <- extractResolver(resolverArg) json <- JsonUtils.extractJson(parsedResolver) - client <- Client.parseDefault[Id](json).leftMap(_.toString).value + client <- IgluCirceClient.parseDefault[Id](json).leftMap(_.toString).value } yield client /** @@ -202,7 +202,7 @@ trait Enrich { */ def parseEnrichmentRegistry( enrichmentsDirArg: Option[String], - client: Client[Id, Json] + client: IgluCirceClient[Id] )( implicit creds: Credentials ): Either[String, List[EnrichmentConf]] = diff --git a/modules/stream/common/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/Source.scala b/modules/stream/common/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/Source.scala index a25083571..8425e8a91 100644 --- a/modules/stream/common/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/Source.scala +++ b/modules/stream/common/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/Source.scala @@ -30,7 +30,7 @@ import cats.Id import cats.data.{Validated, ValidatedNel} import cats.data.Validated.{Invalid, Valid} import cats.implicits._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.badrows._ import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry @@ -39,7 +39,6 @@ import com.snowplowanalytics.snowplow.enrich.common.loaders.{CollectorPayload, T import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils import com.snowplowanalytics.snowplow.enrich.stream.model.SentryConfig -import io.circe.Json import org.joda.time.DateTime import org.slf4j.LoggerFactory import io.sentry.Sentry @@ -105,7 +104,7 @@ object Source { /** Abstract base for the different sources we support. */ abstract class Source( - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], processor: Processor, diff --git a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/SpecHelpers.scala b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/SpecHelpers.scala index 65b2ece0f..e8dcbec67 100644 --- a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/SpecHelpers.scala +++ b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/SpecHelpers.scala @@ -18,7 +18,7 @@ import java.util.regex.Pattern import scala.util.matching.Regex import cats.Id -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry @@ -174,7 +174,7 @@ object SpecHelpers { } val validatedResolver = for { json <- JsonUtils.extractJson(igluConfig) - resolver <- Client.parseDefault[Id](json).leftMap(_.toString).value + resolver <- IgluCirceClient.parseDefault[Id](json).leftMap(_.toString).value } yield resolver val client = validatedResolver.fold( diff --git a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/TestSource.scala b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/TestSource.scala index 3b5d97e82..c372fa4b9 100644 --- a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/TestSource.scala +++ b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/TestSource.scala @@ -18,11 +18,10 @@ package com.snowplowanalytics.snowplow.enrich.stream package sources import cats.Id -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry -import io.circe.Json import sinks.Sink @@ -32,7 +31,7 @@ import sinks.Sink * sources. */ class TestSource( - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id] ) extends Source(client, adapterRegistry, enrichmentRegistry, Processor("sce-test", "1.0.0"), "", None) { diff --git a/modules/stream/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaEnrich.scala b/modules/stream/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaEnrich.scala index d75ffa20f..6ebf9f25a 100644 --- a/modules/stream/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaEnrich.scala +++ b/modules/stream/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaEnrich.scala @@ -19,7 +19,7 @@ package com.snowplowanalytics.snowplow.enrich.stream import cats.Id -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry @@ -38,7 +38,7 @@ object KafkaEnrich extends Enrich { override def getSource( streamsConfig: StreamsConfig, sentryConfig: Option[SentryConfig], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], tracker: Option[Tracker[Id]], diff --git a/modules/stream/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KafkaSource.scala b/modules/stream/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KafkaSource.scala index 1884011dd..7df4671fc 100644 --- a/modules/stream/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KafkaSource.scala +++ b/modules/stream/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KafkaSource.scala @@ -25,11 +25,10 @@ import scala.collection.JavaConverters._ import cats.Id import cats.syntax.either._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry -import io.circe.Json import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer._ @@ -42,7 +41,7 @@ object KafkaSource { def create( config: StreamsConfig, sentryConfig: Option[SentryConfig], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], processor: Processor @@ -82,7 +81,7 @@ class KafkaSource private ( goodProducer: KafkaProducer[String, String], piiProducer: Option[KafkaProducer[String, String]], badProducer: KafkaProducer[String, String], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], processor: Processor, diff --git a/modules/stream/kinesis/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/KinesisEnrich.scala b/modules/stream/kinesis/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/KinesisEnrich.scala index f282b940b..d34228796 100644 --- a/modules/stream/kinesis/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/KinesisEnrich.scala +++ b/modules/stream/kinesis/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/KinesisEnrich.scala @@ -30,7 +30,7 @@ import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ScanRequest} import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item} import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.core._ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ import com.snowplowanalytics.snowplow.badrows.Processor @@ -101,7 +101,7 @@ object KinesisEnrich extends Enrich { override def getSource( streamsConfig: StreamsConfig, sentryConfig: Option[SentryConfig], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], tracker: Option[Tracker[Id]], diff --git a/modules/stream/kinesis/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KinesisSource.scala b/modules/stream/kinesis/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KinesisSource.scala index 1e99d61a3..d1df45dcd 100644 --- a/modules/stream/kinesis/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KinesisSource.scala +++ b/modules/stream/kinesis/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KinesisSource.scala @@ -36,12 +36,11 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions._ import com.amazonaws.services.kinesis.clientlibrary.lib.worker._ import com.amazonaws.services.kinesis.model.Record import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.scalatracker.Tracker -import io.circe.Json import model.{Kinesis, SentryConfig, StreamsConfig} import sinks._ @@ -52,7 +51,7 @@ object KinesisSource { def createAndInitialize( config: StreamsConfig, sentryConfig: Option[SentryConfig], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], tracker: Option[Tracker[Id]], @@ -83,7 +82,7 @@ object KinesisSource { /** Source to read events from a Kinesis stream */ class KinesisSource private ( - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], tracker: Option[Tracker[Id]], diff --git a/modules/stream/nsq/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/NsqEnrich.scala b/modules/stream/nsq/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/NsqEnrich.scala index 50c25bacd..662338bbc 100644 --- a/modules/stream/nsq/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/NsqEnrich.scala +++ b/modules/stream/nsq/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/NsqEnrich.scala @@ -23,7 +23,7 @@ import cats.syntax.either._ import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.badrows.Processor -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.scalatracker.Tracker import io.circe.Json @@ -39,7 +39,7 @@ object NsqEnrich extends Enrich { def getSource( streamsConfig: StreamsConfig, sentryConfig: Option[SentryConfig], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], tracker: Option[Tracker[Id]], diff --git a/modules/stream/nsq/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/NsqSource.scala b/modules/stream/nsq/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/NsqSource.scala index 4b1a1da71..dd03672f7 100644 --- a/modules/stream/nsq/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/NsqSource.scala +++ b/modules/stream/nsq/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/NsqSource.scala @@ -26,11 +26,10 @@ import com.snowplowanalytics.client.nsq._ import com.snowplowanalytics.client.nsq.callbacks._ import com.snowplowanalytics.client.nsq.exceptions.NSQException import com.snowplowanalytics.client.nsq.lookup.DefaultNSQLookup -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry -import io.circe.Json import model.{Nsq, SentryConfig, StreamsConfig} import sinks.{NsqSink, Sink} @@ -40,7 +39,7 @@ object NsqSource { def create( config: StreamsConfig, sentryConfig: Option[SentryConfig], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], processor: Processor @@ -79,7 +78,7 @@ class NsqSource private ( goodProducer: NSQProducer, piiProducer: Option[NSQProducer], badProducer: NSQProducer, - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], processor: Processor, diff --git a/modules/stream/stdin/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/StdinEnrich.scala b/modules/stream/stdin/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/StdinEnrich.scala index 00b2dfe75..0821d3456 100644 --- a/modules/stream/stdin/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/StdinEnrich.scala +++ b/modules/stream/stdin/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/StdinEnrich.scala @@ -19,7 +19,7 @@ package com.snowplowanalytics.snowplow.enrich.stream import cats.Id -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry @@ -38,7 +38,7 @@ object StdinEnrich extends Enrich { override def getSource( streamsConfig: StreamsConfig, sentryConfig: Option[SentryConfig], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], tracker: Option[Tracker[Id]], diff --git a/modules/stream/stdin/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/StdinSource.scala b/modules/stream/stdin/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/StdinSource.scala index a2b499a13..45da1aa77 100644 --- a/modules/stream/stdin/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/StdinSource.scala +++ b/modules/stream/stdin/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/StdinSource.scala @@ -24,11 +24,10 @@ import org.apache.commons.codec.binary.Base64 import cats.Id import cats.syntax.either._ -import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry -import io.circe.Json import model.{SentryConfig, Stdin, StreamsConfig} import sinks.{Sink, StderrSink, StdoutSink} @@ -38,7 +37,7 @@ object StdinSource { def create( config: StreamsConfig, sentryConfig: Option[SentryConfig], - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], processor: Processor @@ -60,7 +59,7 @@ object StdinSource { /** Source to decode raw events (in base64) from stdin. */ class StdinSource private ( - client: Client[Id, Json], + client: IgluCirceClient[Id], adapterRegistry: AdapterRegistry, enrichmentRegistry: EnrichmentRegistry[Id], processor: Processor, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ac6a53460..b05dddb5c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -60,7 +60,7 @@ object Dependencies { val gatlingJsonpath = "0.6.14" val scalaUri = "1.5.1" val badRows = "2.1.1" - val igluClient = "1.1.1" + val igluClient = "1.2.0" val snowplowRawEvent = "0.1.0" val collectorPayload = "0.0.0"