From f2290163cc71ecb49af062bb5b50d66697cb38de Mon Sep 17 00:00:00 2001 From: spenes Date: Tue, 2 Apr 2024 15:32:16 +0300 Subject: [PATCH] Address Ben's comments - 2 --- .../common/enrichments/AtomicFields.scala | 7 +- .../AtomicFieldsLengthValidator.scala | 2 +- .../enrichments/EnrichmentManager.scala | 52 ++-- .../common/enrichments/Failure.scala | 195 ++++++++++++++ .../common/enrichments/FailureEntity.scala | 246 ------------------ .../common/utils/IgluUtils.scala | 40 +-- .../enrichments/AtomicFieldsSpec.scala | 2 +- .../enrichments/EnrichmentManagerSpec.scala | 39 +-- ...lureEntitySpec.scala => FailureSpec.scala} | 60 ++--- .../utils/IgluUtilsSpec.scala | 124 ++++----- 10 files changed, 354 insertions(+), 413 deletions(-) create mode 100644 modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala delete mode 100644 modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/FailureEntity.scala rename modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/{FailureEntitySpec.scala => FailureSpec.scala} (86%) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala index 67c950978..2ddbc83b4 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala @@ -12,7 +12,6 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments import cats.data.NonEmptyList -import io.circe.Json import io.circe.syntax._ import com.snowplowanalytics.snowplow.badrows.FailureDetails @@ -136,12 +135,12 @@ object AtomicFields { AtomicFields(withLimits) } - def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): 
FailureEntity.SchemaViolation = { + def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): Failure.SchemaViolation = { val clientError = ValidationError(ValidatorError.InvalidData(errors), None) - val failureData = Json.obj(errors.toList.flatMap(e => e.path.map(p => p := e.keyword)): _*) + val failureData = errors.toList.flatMap(e => e.path.map(p => p := e.keyword)).toMap.asJson - FailureEntity.SchemaViolation( + Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.IgluError( AtomicFields.atomicSchema, clientError diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala index 7771968ba..01929f75a 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala @@ -35,7 +35,7 @@ object AtomicFieldsLengthValidator { acceptInvalid: Boolean, invalidCount: F[Unit], atomicFields: AtomicFields - ): IorT[F, FailureEntity.SchemaViolation, Unit] = + ): IorT[F, Failure.SchemaViolation, Unit] = IorT { atomicFields.value .map(validateField(event, _).toValidatedNel) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index e396875aa..b0fdd3c10 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -30,7 +30,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, 
SelfDescribingData import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.badrows._ -import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor} +import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure} import com.snowplowanalytics.snowplow.enrich.common.{EtlPipeline, QueryStringParameters, RawEventParameters} import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent @@ -46,15 +46,17 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, Conversion object EnrichmentManager { + case class EnrichedWithContexts(enriched: EnrichedEvent, contexts: List[SelfDescribingData[Json]]) + private type EnrichmentResult[F[_]] = - IorT[F, NonEmptyList[IntermediateBadRow], (EnrichedEvent, List[SelfDescribingData[Json]])] + IorT[F, NonEmptyList[IntermediateBadRow], EnrichedWithContexts] // We need this intermediate representation because we have to create partially enriched event // right after an enrichment/validation step completed. If we don't do it like that and // create partially enriched event in the end instead, we might get partially enriched event // updated in the later steps. 
private case class IntermediateBadRow( - failureEntities: NonEmptyList[FailureEntity], + failureEntities: NonEmptyList[Failure], partiallyEnrichedEvent: Payload.PartiallyEnrichedEvent ) @@ -122,14 +124,14 @@ object EnrichmentManager { ) .leftMap(NonEmptyList.one) .possiblyExitingEarly(emitIncomplete) - } yield (enriched, validContexts ::: extractResult.validationInfoContexts) + } yield EnrichedWithContexts(enriched, validContexts ::: extractResult.validationInfoContexts) // derived contexts are set lastly because we want to include failure entities // to derived contexts as well and we can get failure entities only in the end // of the enrichment process setDerivedContexts(iorT, processor) .leftMap(createBadRow(_, RawEvent.toRawEvent(raw), processor)) - .map(_._1) + .map(_.enriched) } private def createBadRow( @@ -139,32 +141,32 @@ object EnrichmentManager { ): BadRow = { val intermediateBadRow = fe.head intermediateBadRow.failureEntities.head match { - case h: FailureEntity.SchemaViolation => - val sv = intermediateBadRow.failureEntities.tail.collect { case f: FailureEntity.SchemaViolation => f } + case h: Failure.SchemaViolation => + val sv = intermediateBadRow.failureEntities.tail.collect { case f: Failure.SchemaViolation => f } buildSchemaViolationsBadRow(NonEmptyList(h, sv), intermediateBadRow.partiallyEnrichedEvent, re, processor) - case h: FailureEntity.EnrichmentFailure => - val ef = intermediateBadRow.failureEntities.tail.collect { case f: FailureEntity.EnrichmentFailure => f } + case h: Failure.EnrichmentFailure => + val ef = intermediateBadRow.failureEntities.tail.collect { case f: Failure.EnrichmentFailure => f } buildEnrichmentFailuresBadRow(NonEmptyList(h, ef), intermediateBadRow.partiallyEnrichedEvent, re, processor) } } private def setDerivedContexts[F[_]: Sync](enriched: EnrichmentResult[F], processor: Processor): EnrichmentResult[F] = IorT( - enriched.value.flatTap(v => + enriched.value.flatTap { v => Sync[F].delay { val now = Instant.now() - val 
(derivedContexts, enriched) = v match { - case Ior.Right((e, l)) => (l, e.some) - case Ior.Left(l) => (convertFailureEntitiesToSDJ(l, now, processor), None) - case Ior.Both(b, (e, l)) => (l ::: convertFailureEntitiesToSDJ(b, now, processor), e.some) - } + val (derivedContexts, enriched) = v.fold( + l => (convertFailureEntitiesToSDJ(l, now, processor), None), + { case EnrichedWithContexts(e, l) => (l, e.some) }, + { case (b, EnrichedWithContexts(e, l)) => (l ::: convertFailureEntitiesToSDJ(b, now, processor), e.some) } + ) for { c <- ME.formatContexts(derivedContexts) e <- enriched _ = e.derived_contexts = c } yield () } - ) + } ) private def convertFailureEntitiesToSDJ( @@ -172,7 +174,7 @@ object EnrichmentManager { timestamp: Instant, processor: Processor ): List[SelfDescribingData[Json]] = - l.flatMap(_.failureEntities).map(FailureEntity.toSDJ(_, timestamp, processor)).toList + l.flatMap(_.failureEntities).map(_.toSDJ(timestamp, processor)).toList private def mapAndValidateInput[F[_]: Sync]( raw: RawEvent, @@ -187,7 +189,7 @@ object EnrichmentManager { .leftMap(NonEmptyList.one) extract <- IgluUtils .extractAndValidateInputJsons(enrichedEvent, client, registryLookup) - .leftMap { l: NonEmptyList[FailureEntity] => l } + .leftMap { l: NonEmptyList[Failure] => l } } yield extract iorT.leftMap(v => IntermediateBadRow(v, EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent))) @@ -215,7 +217,7 @@ object EnrichmentManager { failures.toNel match { case Some(nel) => Ior.both( - IntermediateBadRow(nel.map(FailureEntity.EnrichmentFailure), EnrichedEvent.toPartiallyEnrichedEvent(enriched)), + IntermediateBadRow(nel.map(Failure.EnrichmentFailure), EnrichedEvent.toPartiallyEnrichedEvent(enriched)), contexts ) case None => @@ -237,7 +239,7 @@ object EnrichmentManager { validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup) _ <- AtomicFieldsLengthValidator .validate[F](enriched, acceptInvalid, invalidCount, atomicFields) - .leftMap 
{ v: FailureEntity => NonEmptyList.one(v) } + .leftMap { v: Failure => NonEmptyList.one(v) } } yield validContexts iorT.leftMap(v => IntermediateBadRow(v, EnrichedEvent.toPartiallyEnrichedEvent(enriched))) @@ -369,7 +371,7 @@ object EnrichmentManager { e: EnrichedEvent, etlTstamp: DateTime, processor: Processor - ): IorT[F, FailureEntity.SchemaViolation, Unit] = + ): IorT[F, Failure.SchemaViolation, Unit] = IorT { Sync[F].delay { e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter @@ -881,7 +883,7 @@ object EnrichmentManager { } private def buildSchemaViolationsBadRow( - fe: NonEmptyList[FailureEntity.SchemaViolation], + fe: NonEmptyList[Failure.SchemaViolation], pee: Payload.PartiallyEnrichedEvent, re: Payload.RawEvent, processor: Processor @@ -889,13 +891,13 @@ object EnrichmentManager { val now = Instant.now() BadRow.SchemaViolations( processor, - Failure.SchemaViolations(now, fe.map(_.schemaViolation)), + BadRowFailure.SchemaViolations(now, fe.map(_.schemaViolation)), Payload.EnrichmentPayload(pee, re) ) } private def buildEnrichmentFailuresBadRow( - fe: NonEmptyList[FailureEntity.EnrichmentFailure], + fe: NonEmptyList[Failure.EnrichmentFailure], pee: Payload.PartiallyEnrichedEvent, re: Payload.RawEvent, processor: Processor @@ -903,7 +905,7 @@ object EnrichmentManager { val now = Instant.now() BadRow.EnrichmentFailures( processor, - Failure.EnrichmentFailures(now, fe.map(_.enrichmentFailure)), + BadRowFailure.EnrichmentFailures(now, fe.map(_.enrichmentFailure)), Payload.EnrichmentPayload(pee, re) ) } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala new file mode 100644 index 000000000..6e982866a --- /dev/null +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala @@ -0,0 +1,195 @@ +/* + * Copyright (c) 
2024-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.enrich.common.enrichments + +import java.time.Instant + +import cats.syntax.option._ + +import io.circe.{Encoder, Json} +import io.circe.generic.semiauto._ +import io.circe.syntax._ + +import com.snowplowanalytics.snowplow.badrows._ + +import com.snowplowanalytics.iglu.client.ClientError +import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport} + +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.core.circe.implicits.schemaKeyCirceJsonEncoder + +/** + * Represents a failure encountered during enrichment of the event. + * Failure entities will be attached to incomplete events as derived contexts. 
+ */ +sealed trait Failure { + def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] +} + +object Failure { + + val failureSchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "failure", "jsonschema", SchemaVer.Full(1, 0, 0)) + + case class SchemaViolation( + schemaViolation: FailureDetails.SchemaViolation, + source: String, + data: Json + ) extends Failure { + def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] = { + val feJson = fromSchemaViolation(this, timestamp, processor) + SelfDescribingData(failureSchemaKey, feJson.asJson) + } + + } + + case class EnrichmentFailure( + enrichmentFailure: FailureDetails.EnrichmentFailure + ) extends Failure { + def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] = { + val feJson = fromEnrichmentFailure(this, timestamp, processor) + SelfDescribingData(failureSchemaKey, feJson.asJson) + } + } + + case class FailureContext( + failureType: String, + errors: List[Json], + schema: Option[SchemaKey], + data: Option[Json], + timestamp: Instant, + componentName: String, + componentVersion: String + ) + + object FailureContext { + implicit val failureContextEncoder: Encoder[FailureContext] = deriveEncoder[FailureContext] + } + + def fromEnrichmentFailure( + ef: EnrichmentFailure, + timestamp: Instant, + processor: Processor + ): FailureContext = { + val failureType = s"EnrichmentError: ${ef.enrichmentFailure.enrichment.map(_.identifier).getOrElse("")}" + val schemaKey = ef.enrichmentFailure.enrichment.map(_.schemaKey) + val (errors, data) = ef.enrichmentFailure.message match { + case FailureDetails.EnrichmentFailureMessage.InputData(field, value, expectation) => + ( + List( + Json.obj( + "message" := s"$field - $expectation", + "source" := field + ) + ), + Json.obj(field := value).some + ) + case FailureDetails.EnrichmentFailureMessage.Simple(error) => + ( + List( + Json.obj( + "message" := error + ) + ), + None + ) + case 
FailureDetails.EnrichmentFailureMessage.IgluError(_, error) => + // EnrichmentFailureMessage.IgluError isn't used anywhere in the project. + // We return this value for completeness. + ( + List( + Json.obj( + "message" := error + ) + ), + None + ) + } + FailureContext( + failureType = failureType, + errors = errors, + schema = schemaKey, + data = data, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + } + + def fromSchemaViolation( + v: SchemaViolation, + timestamp: Instant, + processor: Processor + ): FailureContext = { + val (failureType, errors, schema, data) = v.schemaViolation match { + case FailureDetails.SchemaViolation.NotJson(_, _, err) => + val error = Json.obj("message" := err, "source" := v.source) + ("NotJSON", List(error), None, Json.obj(v.source := v.data).some) + case FailureDetails.SchemaViolation.NotIglu(_, err) => + val message = err.message("").split(":").headOption + val error = Json.obj("message" := message, "source" := v.source) + ("NotIglu", List(error), None, v.data.some) + case FailureDetails.SchemaViolation.CriterionMismatch(schemaKey, schemaCriterion) => + val message = s"Unexpected schema: ${schemaKey.toSchemaUri} does not match the criterion" + val error = Json.obj( + "message" := message, + "source" := v.source, + "criterion" := schemaCriterion.asString + ) + ("CriterionMismatch", List(error), schemaKey.some, v.data.some) + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ResolutionError(lh)) => + val message = s"Resolution error: schema ${schemaKey.toSchemaUri} not found" + val lookupHistory = lh.toList + .map { + case (repo, lookups) => + lookups.asJson.deepMerge(Json.obj("repository" := repo.asJson)) + } + val error = Json.obj( + "message" := message, + "source" := v.source, + "lookupHistory" := lookupHistory + ) + ("ResolutionError", List(error), schemaKey.some, v.data.some) + case FailureDetails.SchemaViolation.IgluError(schemaKey, 
ClientError.ValidationError(ValidatorError.InvalidData(e), _)) => + val isAtomicField = schemaKey == AtomicFields.atomicSchema + // If error is for atomic field, we want to set the source to atomic field name. Since ValidatorReport.path + // is set to atomic field name, we are using path as source. + def source(r: ValidatorReport) = if (isAtomicField) r.path.getOrElse(v.source) else v.source + val errors = e.toList.map { r => + Json.obj( + "message" := r.message, + "source" := source(r), + "path" := r.path, + "keyword" := r.keyword, + "targets" := r.targets + ) + } + ("ValidationError", errors, schemaKey.some, v.data.some) + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) => + val errors = e.toList.map { r => + Json.obj( + "message" := s"Invalid schema: ${schemaKey.toSchemaUri} - ${r.message}", + "source" := v.source, + "path" := r.path + ) + } + ("ValidationError", errors, schemaKey.some, v.data.some) + } + FailureContext( + failureType = failureType, + errors = errors, + schema = schema, + data = data, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + } +} diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/FailureEntity.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/FailureEntity.scala deleted file mode 100644 index 7b69a4d59..000000000 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/FailureEntity.scala +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Copyright (c) 2024-present Snowplow Analytics Ltd. - * All rights reserved. 
- * - * This software is made available by Snowplow Analytics, Ltd., - * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 - * located at https://docs.snowplow.io/limited-use-license-1.0 - * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION - * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. - */ -package com.snowplowanalytics.snowplow.enrich.common.enrichments - -import java.time.Instant - -import cats.syntax.option._ - -import io.circe.{Encoder, Json} -import io.circe.generic.semiauto._ -import io.circe.syntax._ - -import com.snowplowanalytics.snowplow.badrows._ - -import com.snowplowanalytics.iglu.client.ClientError -import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport} - -import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} -import com.snowplowanalytics.iglu.core.circe.implicits.schemaKeyCirceJsonEncoder - -/** - * Represents a failure encountered during enrichment of the event. - * Failure entities will be attached to incomplete events as derived contexts. 
- */ -sealed trait FailureEntity - -object FailureEntity { - - val failureEntitySchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "failure", "jsonschema", SchemaVer.Full(1, 0, 0)) - - case class SchemaViolation( - schemaViolation: FailureDetails.SchemaViolation, - source: String, - data: Json - ) extends FailureEntity - - case class EnrichmentFailure( - enrichmentFailure: FailureDetails.EnrichmentFailure - ) extends FailureEntity - - case class FailureEntityContext( - failureType: String, - errors: List[Json], - schema: Option[SchemaKey], - data: Option[Json], - timestamp: Instant, - componentName: String, - componentVersion: String - ) - - object FailureEntityContext { - implicit val failureEntityContextEncoder: Encoder[FailureEntityContext] = deriveEncoder[FailureEntityContext] - } - - def toSDJ( - fe: FailureEntity, - timestamp: Instant, - processor: Processor - ): SelfDescribingData[Json] = { - val feJson = fe match { - case sv: SchemaViolation => fromSchemaViolation(sv, timestamp, processor) - case ef: EnrichmentFailure => fromEnrichmentFailure(ef, timestamp, processor) - } - SelfDescribingData(failureEntitySchemaKey, feJson.asJson) - } - - def fromEnrichmentFailure( - ef: EnrichmentFailure, - timestamp: Instant, - processor: Processor - ): FailureEntityContext = { - val failureType = s"EnrichmentError: ${ef.enrichmentFailure.enrichment.map(_.identifier).getOrElse("")}" - val createContext = ef.enrichmentFailure.message match { - case m: FailureDetails.EnrichmentFailureMessage.InputData => - FailureEntityContext( - failureType = failureType, - errors = List( - Json.obj( - "message" := s"${m.field} - ${m.expectation}", - "source" := m.field - ) - ), - schema = ef.enrichmentFailure.enrichment.map(_.schemaKey), - data = Json.obj(m.field := m.value).some, - _, - _, - _ - ) - case m: FailureDetails.EnrichmentFailureMessage.Simple => - FailureEntityContext( - failureType = failureType, - errors = List( - Json.obj( - "message" := m.error - ) - ), - schema = 
ef.enrichmentFailure.enrichment.map(_.schemaKey), - data = None, - _, - _, - _ - ) - case m: FailureDetails.EnrichmentFailureMessage.IgluError => - // EnrichmentFailureMessage.IgluError isn't used anywhere in the project. - // We are return this value for completeness. - FailureEntityContext( - failureType = failureType, - errors = List( - Json.obj( - "message" := m.error - ) - ), - schema = ef.enrichmentFailure.enrichment.map(_.schemaKey), - data = None, - _, - _, - _ - ) - } - createContext(timestamp, processor.artifact, processor.version) - } - - def fromSchemaViolation( - v: SchemaViolation, - timestamp: Instant, - processor: Processor - ): FailureEntityContext = { - val createContext = v.schemaViolation match { - case f: FailureDetails.SchemaViolation.NotJson => - val error = Json.obj( - "message" := f.error, - "source" := v.source - ) - FailureEntityContext( - failureType = "NotJSON", - errors = List(error), - schema = None, - data = Json.obj(v.source := v.data).some, - _, - _, - _ - ) - case f: FailureDetails.SchemaViolation.NotIglu => - val message = f.error.message("").split(":").headOption - val error = Json.obj( - "message" := message, - "source" := v.source - ) - FailureEntityContext( - failureType = "NotIglu", - errors = List(error), - schema = None, - data = v.data.some, - _, - _, - _ - ) - case f: FailureDetails.SchemaViolation.CriterionMismatch => - val message = s"Unexpected schema: ${f.schemaKey.toSchemaUri} does not match the criterion" - val error = Json.obj( - "message" := message, - "source" := v.source, - "criterion" := f.schemaCriterion.asString - ) - FailureEntityContext( - failureType = "CriterionMismatch", - errors = List(error), - schema = f.schemaKey.some, - data = v.data.some, - _, - _, - _ - ) - case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ResolutionError(lh)) => - val message = s"Resolution error: schema ${schemaKey.toSchemaUri} not found" - val lookupHistory = lh.toList - .map { - case (repo, lookups) => - 
lookups.asJson.deepMerge(Json.obj("repository" := repo.asJson)) - } - val error = Json.obj( - "message" := message, - "source" := v.source, - "lookupHistory" := lookupHistory - ) - FailureEntityContext( - failureType = "ResolutionError", - errors = List(error), - schema = schemaKey.some, - data = v.data.some, - _, - _, - _ - ) - case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidData(e), _)) => - val isAtomicField = schemaKey == AtomicFields.atomicSchema - // If error is for atomic field, we want to set the source to atomic field name. Since ValidatorReport.path - // is set to atomic field name, we are using path as source. - def source(r: ValidatorReport) = if (isAtomicField) r.path.getOrElse(v.source) else v.source - val errors = e.toList.map { r => - Json.obj( - "message" := r.message, - "source" := source(r), - "path" := r.path, - "keyword" := r.keyword, - "targets" := r.targets - ) - } - FailureEntityContext( - failureType = "ValidationError", - errors = errors, - schema = schemaKey.some, - data = v.data.some, - _, - _, - _ - ) - case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) => - val errors = e.toList.map { r => - Json.obj( - "message" := s"Invalid schema: ${schemaKey.toSchemaUri} - ${r.message}", - "source" := v.source, - "path" := r.path - ) - } - FailureEntityContext( - failureType = "ValidationError", - errors = errors, - schema = schemaKey.some, - data = v.data.some, - _, - _, - _ - ) - } - createContext(timestamp, processor.artifact, processor.version) - } -} diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala index 7f0d92868..6cea198d6 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala +++ 
b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala @@ -25,7 +25,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ -import com.snowplowanalytics.snowplow.enrich.common.enrichments.FailureEntity +import com.snowplowanalytics.snowplow.enrich.common.enrichments.Failure import com.snowplowanalytics.snowplow.badrows._ @@ -53,7 +53,7 @@ object IgluUtils { enriched: EnrichedEvent, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[FailureEntity.SchemaViolation], EventExtractResult] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], EventExtractResult] = for { contexts <- extractAndValidateInputContexts(enriched, client, registryLookup) unstruct <- extractAndValidateUnstructEvent(enriched, client, registryLookup) @@ -79,9 +79,9 @@ object IgluUtils { enriched: EnrichedEvent, client: IgluCirceClient[F], registryLookup: RegistryLookup[F], - field: String = "ue_properties", + field: String = "unstruct", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0) - ): IorT[F, NonEmptyList[FailureEntity.SchemaViolation], Option[SdjExtractResult]] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], Option[SdjExtractResult]] = Option(enriched.unstruct_event) match { case Some(rawUnstructEvent) => val iorT = for { @@ -94,7 +94,7 @@ object IgluUtils { } yield unstructSDJ.some iorT.recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, None)) } case None => - IorT.rightT[F, NonEmptyList[FailureEntity.SchemaViolation]](none[SdjExtractResult]) + IorT.rightT[F, NonEmptyList[Failure.SchemaViolation]](none[SdjExtractResult]) } /** @@ -112,7 +112,7 @@ object IgluUtils { registryLookup: RegistryLookup[F], field: String = "contexts", criterion: 
SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0) - ): IorT[F, NonEmptyList[FailureEntity.SchemaViolation], List[SdjExtractResult]] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], List[SdjExtractResult]] = Option(enriched.contexts) match { case Some(rawContexts) => val iorT = for { @@ -132,7 +132,7 @@ object IgluUtils { } yield contextsSdj iorT.recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } case None => - IorT.rightT[F, NonEmptyList[FailureEntity.SchemaViolation]](Nil) + IorT.rightT[F, NonEmptyList[Failure.SchemaViolation]](Nil) } /** @@ -148,12 +148,12 @@ object IgluUtils { client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]], registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[FailureEntity.SchemaViolation], List[SelfDescribingData[Json]]] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], List[SelfDescribingData[Json]]] = checkList(client, sdjs, registryLookup) .leftMap( _.map { case (sdj, clientError) => - FailureEntity.SchemaViolation( + Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.IgluError(sdj.schema, clientError), source = "derived_contexts", data = sdj.asJson @@ -168,13 +168,13 @@ object IgluUtils { expectedCriterion: SchemaCriterion, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): EitherT[F, FailureEntity.SchemaViolation, Json] = + ): EitherT[F, Failure.SchemaViolation, Json] = for { // Parse Json string with the SDJ json <- JsonUtils .extractJson(rawJson) .leftMap(e => - FailureEntity.SchemaViolation( + Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.NotJson(field, rawJson.some, e), source = field, data = rawJson.asJson @@ -185,7 +185,7 @@ object IgluUtils { sdj <- SelfDescribingData .parse(json) .leftMap(e => - FailureEntity.SchemaViolation( + Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.NotIglu(json, e), source = field, data = json @@ -194,11 +194,11 
@@ object IgluUtils { .toEitherT[F] // Check that the schema of SelfDescribingData[Json] is the expected one _ <- if (validateCriterion(sdj, expectedCriterion)) - EitherT.rightT[F, FailureEntity.SchemaViolation](sdj) + EitherT.rightT[F, Failure.SchemaViolation](sdj) else EitherT .leftT[F, SelfDescribingData[Json]]( - FailureEntity.SchemaViolation( + Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.CriterionMismatch(sdj.schema, expectedCriterion), source = field, data = sdj.asJson @@ -208,14 +208,14 @@ object IgluUtils { _ <- check(client, sdj, registryLookup) .leftMap { case (schemaKey, clientError) => - FailureEntity.SchemaViolation( + Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), source = field, data = sdj.asJson ) } // Extract .data of SelfDescribingData[Json] - data <- EitherT.rightT[F, FailureEntity.SchemaViolation](sdj.data) + data <- EitherT.rightT[F, Failure.SchemaViolation](sdj.data) } yield data /** Check that the schema of a SDJ matches the expected one */ @@ -257,12 +257,12 @@ object IgluUtils { client: IgluCirceClient[F], registryLookup: RegistryLookup[F], field: String - ): IorT[F, NonEmptyList[FailureEntity.SchemaViolation], SdjExtractResult] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], SdjExtractResult] = for { sdj <- IorT .fromEither[F](SelfDescribingData.parse(json)) - .leftMap[FailureEntity.SchemaViolation](e => - FailureEntity.SchemaViolation( + .leftMap[Failure.SchemaViolation](e => + Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.NotIglu(json, e), source = field, data = json.asJson @@ -272,7 +272,7 @@ object IgluUtils { supersedingSchema <- check(client, sdj, registryLookup) .leftMap { case (schemaKey, clientError) => - FailureEntity.SchemaViolation( + Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation .IgluError(schemaKey, clientError): FailureDetails.SchemaViolation, source = field, diff --git 
a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala index 590004f50..22b5e6baf 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala @@ -34,7 +34,7 @@ class AtomicFieldsSpec extends Specification { ValidatorReport(message = "testMessage", path = "testPath4".some, targets = List.empty, keyword = "testKeyword4".some) ) ) - val expected = FailureEntity.SchemaViolation( + val expected = Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.IgluError( schemaKey = AtomicFields.atomicSchema, error = ValidationError(ValidatorError.InvalidData(vrList), None) diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala index 73eea3bf6..ef787ca79 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala @@ -29,6 +29,7 @@ import io.circe.syntax._ import org.joda.time.DateTime import com.snowplowanalytics.snowplow.badrows._ +import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure} import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ @@ -176,10 +177,10 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations(_, - 
NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), - List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) - ) + BadRowFailure.SchemaViolations(_, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), + List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) + ) ), _ ) @@ -243,7 +244,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.EnrichmentFailures( _, - Failure.EnrichmentFailures( + BadRowFailure.EnrichmentFailures( _, NonEmptyList( FailureDetails.EnrichmentFailure( @@ -312,7 +313,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations( + BadRowFailure.SchemaViolations( _, NonEmptyList( _: FailureDetails.SchemaViolation.IgluError, @@ -1272,12 +1273,12 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = { val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get SpecHelpers.listContexts(enriched.derived_contexts) match { - case List(`emailSentSDJ`, SelfDescribingData(FailureEntity.`failureEntitySchemaKey`, feJson)) + case List(`emailSentSDJ`, SelfDescribingData(Failure.`failureSchemaKey`, feJson)) if feJson.field("failureType") == "ValidationError".asJson && feJson.field("errors") == Json.arr( Json.obj( "message" := "$.unallowedAdditionalField: is not defined in the schema and the schema does not allow additional properties", - "source" := "ue_properties", + "source" := "unstruct", "path" := "$", "keyword" := "additionalProperties", "targets" := List("unallowedAdditionalField") @@ -1361,7 +1362,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = { val emailSentSDJ = 
SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get SpecHelpers.listContexts(enriched.derived_contexts) match { - case List(`emailSentSDJ`, SelfDescribingData(FailureEntity.`failureEntitySchemaKey`, feJson)) + case List(`emailSentSDJ`, SelfDescribingData(Failure.`failureSchemaKey`, feJson)) if feJson.field("failureType") == "ValidationError".asJson && feJson.field("errors") == Json.arr( Json.obj( @@ -1460,7 +1461,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = { val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get SpecHelpers.listContexts(enriched.derived_contexts) match { - case List(`emailSentSDJ`, SelfDescribingData(FailureEntity.`failureEntitySchemaKey`, feJson)) + case List(`emailSentSDJ`, SelfDescribingData(Failure.`failureSchemaKey`, feJson)) if feJson.field("failureType") == "ValidationError".asJson && feJson.field("errors") == Json.arr( Json.obj( @@ -1543,7 +1544,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE SpecHelpers.listContexts(enriched.derived_contexts) match { case List( SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _), - SelfDescribingData(FailureEntity.`failureEntitySchemaKey`, feJson) + SelfDescribingData(Failure.`failureSchemaKey`, feJson) ) if feJson.field("failureType") == "EnrichmentError: Javascript enrichment".asJson && feJson.field("errors") == Json.arr( @@ -1613,8 +1614,8 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = SpecHelpers.listContexts(enriched.derived_contexts) match { case List( - SelfDescribingData(FailureEntity.`failureEntitySchemaKey`, validationError), - SelfDescribingData(FailureEntity.`failureEntitySchemaKey`, enrichmentError) + SelfDescribingData(Failure.`failureSchemaKey`, 
validationError), + SelfDescribingData(Failure.`failureSchemaKey`, enrichmentError) ) if validationError.field("failureType") == "ValidationError".asJson && validationError.field("errors") == Json.arr( @@ -1704,7 +1705,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE SpecHelpers.listContexts(enriched.derived_contexts) match { case List( SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _), - SelfDescribingData(FailureEntity.`failureEntitySchemaKey`, feJson) + SelfDescribingData(Failure.`failureSchemaKey`, feJson) ) if feJson.field("failureType") == "ValidationError".asJson && feJson.field("errors") == Json.arr( @@ -2230,7 +2231,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations(_, NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), Nil)), + BadRowFailure.SchemaViolations(_, NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), Nil)), _ ) ) => @@ -2311,10 +2312,10 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations(_, - NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), - List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) - ) + BadRowFailure.SchemaViolations(_, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), + List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) + ) ), _ ) diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureEntitySpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala similarity index 86% rename from modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureEntitySpec.scala 
rename to modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala index a83fecebc..678d65d51 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureEntitySpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala @@ -38,7 +38,7 @@ import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorRep import com.snowplowanalytics.iglu.client.resolver.LookupHistory import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEffect with ScalaCheck { +class FailureSpec extends Specification with ValidatedMatchers with CatsEffect with ScalaCheck { val timestamp = Instant.now() val processor = Processor("unit tests SCE", "v42") @@ -59,7 +59,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf errors <- Gen.listOf(jsonGen) data <- Gen.option(jsonGen) schema <- Gen.option(Gen.const(schemaKey)) - } yield FailureEntity.FailureEntityContext( + } yield Failure.FailureContext( failureType = failureType, errors = errors, schema = schema, @@ -69,8 +69,8 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf componentVersion = processor.version ) - Prop.forAll(genFeContext) { feContext: FailureEntity.FailureEntityContext => - val sdj = SelfDescribingData(schema = FailureEntity.failureEntitySchemaKey, data = feContext.asJson) + Prop.forAll(genFeContext) { feContext: Failure.FailureContext => + val sdj = SelfDescribingData(schema = Failure.failureSchemaKey, data = feContext.asJson) SpecHelpers.client .check(sdj) .value @@ -81,7 +81,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf "fromEnrichmentFailure" should { "convert InputData correctly" >> { - val ef = FailureEntity.EnrichmentFailure( + val ef = 
Failure.EnrichmentFailure( enrichmentFailure = FailureDetails.EnrichmentFailure( enrichment = FailureDetails .EnrichmentInformation( @@ -96,8 +96,8 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf ) ) ) - val result = FailureEntity.fromEnrichmentFailure(ef, timestamp, processor) - val expected = FailureEntity.FailureEntityContext( + val result = Failure.fromEnrichmentFailure(ef, timestamp, processor) + val expected = Failure.FailureContext( failureType = "EnrichmentError: enrichmentId", errors = List( Json.obj( @@ -115,7 +115,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf } "convert Simple correctly" >> { - val ef = FailureEntity.EnrichmentFailure( + val ef = Failure.EnrichmentFailure( enrichmentFailure = FailureDetails.EnrichmentFailure( enrichment = FailureDetails .EnrichmentInformation( @@ -126,8 +126,8 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf message = FailureDetails.EnrichmentFailureMessage.Simple(error = "testError") ) ) - val result = FailureEntity.fromEnrichmentFailure(ef, timestamp, processor) - val expected = FailureEntity.FailureEntityContext( + val result = Failure.fromEnrichmentFailure(ef, timestamp, processor) + val expected = Failure.FailureContext( failureType = "EnrichmentError: enrichmentId", errors = List(Json.obj("message" := "testError")), schema = schemaKey.some, @@ -142,7 +142,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf "fromSchemaViolation" should { "convert NotJson correctly" >> { - val sv = FailureEntity.SchemaViolation( + val sv = Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.NotJson( field = "testField", value = "testValue".some, @@ -151,8 +151,8 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf source = "testSource", data = "testData".asJson ) - val fe = FailureEntity.fromSchemaViolation(sv, timestamp, processor) - val 
expected = FailureEntity.FailureEntityContext( + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( failureType = "NotJSON", errors = List( Json.obj( @@ -170,7 +170,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf } "convert NotIglu correctly" >> { - val sv = FailureEntity.SchemaViolation( + val sv = Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.NotIglu( json = Json.Null, error = ParseError.InvalidSchema @@ -178,8 +178,8 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf source = "testSource", data = "testData".asJson ) - val fe = FailureEntity.fromSchemaViolation(sv, timestamp, processor) - val expected = FailureEntity.FailureEntityContext( + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( failureType = "NotIglu", errors = List( Json.obj( @@ -197,7 +197,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf } "convert CriterionMismatch correctly" >> { - val sv = FailureEntity.SchemaViolation( + val sv = Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.CriterionMismatch( schemaKey = schemaKey, schemaCriterion = schemaCriterion @@ -205,8 +205,8 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf source = "testSource", data = "testData".asJson ) - val fe = FailureEntity.fromSchemaViolation(sv, timestamp, processor) - val expected = FailureEntity.FailureEntityContext( + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( failureType = "CriterionMismatch", errors = List( Json.obj( @@ -225,7 +225,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf } "convert ResolutionError correctly" >> { - val sv = FailureEntity.SchemaViolation( + val sv = Failure.SchemaViolation( schemaViolation = 
FailureDetails.SchemaViolation.IgluError( schemaKey = schemaKey, error = ClientError.ResolutionError( @@ -246,8 +246,8 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf source = "testSource", data = "testData".asJson ) - val fe = FailureEntity.fromSchemaViolation(sv, timestamp, processor) - val expected = FailureEntity.FailureEntityContext( + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( failureType = "ResolutionError", errors = List( Json.obj( @@ -270,7 +270,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf "convert InvalidData correctly" >> { def createSv(schemaKey: SchemaKey) = - FailureEntity.SchemaViolation( + Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.IgluError( schemaKey = schemaKey, error = ClientError.ValidationError( @@ -297,9 +297,9 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf val svWithAtomicSchema = createSv(AtomicFields.atomicSchema) val svWithOrdinarySchema = createSv(schemaKey) - val feWithAtomicSchema = FailureEntity.fromSchemaViolation(svWithAtomicSchema, timestamp, processor) - val feWithOrdinarySchema = FailureEntity.fromSchemaViolation(svWithOrdinarySchema, timestamp, processor) - val expectedWithAtomicSchema = FailureEntity.FailureEntityContext( + val feWithAtomicSchema = Failure.fromSchemaViolation(svWithAtomicSchema, timestamp, processor) + val feWithOrdinarySchema = Failure.fromSchemaViolation(svWithOrdinarySchema, timestamp, processor) + val expectedWithAtomicSchema = Failure.FailureContext( failureType = "ValidationError", errors = List( Json.obj("message" := "testMessage1", @@ -321,7 +321,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf componentName = processor.artifact, componentVersion = processor.version ) - val expectedWithOrdinarySchema = FailureEntity.FailureEntityContext( + val 
expectedWithOrdinarySchema = Failure.FailureContext( failureType = "ValidationError", errors = List( Json.obj("message" := "testMessage1", @@ -349,7 +349,7 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf } "convert InvalidSchema correctly" >> { - val sv = FailureEntity.SchemaViolation( + val sv = Failure.SchemaViolation( schemaViolation = FailureDetails.SchemaViolation.IgluError( schemaKey = schemaKey, error = ClientError.ValidationError( @@ -365,8 +365,8 @@ class FailureEntitySpec extends Specification with ValidatedMatchers with CatsEf source = "testSource", data = "testData".asJson ) - val fe = FailureEntity.fromSchemaViolation(sv, timestamp, processor) - val expected = FailureEntity.FailureEntityContext( + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( failureType = "ValidationError", errors = List( Json.obj( diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala index ed595cda4..38aa5500d 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala @@ -28,7 +28,7 @@ import com.snowplowanalytics.iglu.client.ClientError.{ResolutionError, Validatio import com.snowplowanalytics.snowplow.badrows._ import com.snowplowanalytics.snowplow.badrows.FailureDetails -import com.snowplowanalytics.snowplow.enrich.common.enrichments.FailureEntity +import com.snowplowanalytics.snowplow.enrich.common.enrichments.Failure import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent @@ -46,7 +46,7 @@ class IgluUtilsSpec extends 
Specification with ValidatedMatchers with CatsEffect val processor = Processor("unit tests SCE", "v42") val enriched = new EnrichedEvent() - val uePropertiesFieldName = "ue_properties" + val unstructFieldName = "unstruct" val contextsFieldName = "contexts" val derivedContextsFieldName = "derived_contexts" @@ -151,7 +151,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect } } - "return a FailureDetails.FailureDetails.SchemaViolation.NotJson if unstruct_event does not contain a properly formatted JSON string" >> { + "return a FailureDetails.SchemaViolation.NotJson if unstruct_event does not contain a properly formatted JSON string" >> { val input = new EnrichedEvent input.setUnstruct_event(notJson) @@ -161,7 +161,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - FailureEntity.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `uePropertiesFieldName`, `jsonNotJson`), + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `unstructFieldName`, `jsonNotJson`), _ ), None @@ -181,7 +181,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureEntity.SchemaViolation(_: FailureDetails.SchemaViolation.NotIglu, `uePropertiesFieldName`, `json`), _), + NonEmptyList(Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotIglu, `unstructFieldName`, `json`), _), None ) => ok @@ -200,7 +200,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - FailureEntity.SchemaViolation(_: FailureDetails.SchemaViolation.CriterionMismatch, `uePropertiesFieldName`, `json`), + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.CriterionMismatch, `unstructFieldName`, `json`), _ ), None @@ -221,7 +221,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case 
Ior.Both(NonEmptyList( - FailureEntity.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `uePropertiesFieldName`, `ueJson`), + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `unstructFieldName`, `ueJson`), _ ), None @@ -241,9 +241,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - `uePropertiesFieldName`, - `ueJson` + case Ior.Both(NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `unstructFieldName`, + `ueJson` ), _ ), @@ -263,9 +263,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - `uePropertiesFieldName`, - `json` + case Ior.Both(NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `unstructFieldName`, + `json` ), _ ), @@ -286,11 +286,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), - `uePropertiesFieldName`, - `json` - ), - _ + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), `unstructFieldName`, `json`), + _ ), None ) => @@ -376,9 +374,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureEntity.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `contextsFieldName`, `jsonNotJson`), - Nil - ), + 
NonEmptyList(Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `contextsFieldName`, `jsonNotJson`), Nil), Nil ) => ok @@ -396,7 +392,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureEntity.SchemaViolation(_: FailureDetails.SchemaViolation.NotIglu, `contextsFieldName`, `json`), Nil), + NonEmptyList(Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotIglu, `contextsFieldName`, `json`), Nil), Nil ) => ok @@ -414,7 +410,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both(NonEmptyList( - FailureEntity.SchemaViolation(_: FailureDetails.SchemaViolation.CriterionMismatch, `contextsFieldName`, `json`), + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.CriterionMismatch, `contextsFieldName`, `json`), Nil ), Nil @@ -436,11 +432,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - `contextsFieldName`, - `json` - ), - Nil + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), `contextsFieldName`, `json`), + Nil ), Nil ) => @@ -459,11 +453,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - `contextsFieldName`, - `json` - ), - Nil + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), `contextsFieldName`, `json`), + Nil ), Nil ) => @@ -482,11 +474,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: 
ResolutionError), - `contextsFieldName`, - `json` - ), - Nil + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), `contextsFieldName`, `json`), + Nil ), Nil ) => @@ -506,14 +496,14 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both(NonEmptyList( - FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - `contextsFieldName`, - `invalidEmailSentJson` + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `contextsFieldName`, + `invalidEmailSentJson` ), List( - FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), - `contextsFieldName`, - `noSchemaJson` + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), + `contextsFieldName`, + `noSchemaJson` ) ) ), @@ -534,7 +524,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both(NonEmptyList( - FailureEntity.SchemaViolation(_: FailureDetails.SchemaViolation.IgluError, `contextsFieldName`, `noSchemaJson`), + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.IgluError, `contextsFieldName`, `noSchemaJson`), Nil ), List(extract) @@ -602,9 +592,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - `derivedContextsFieldName`, - `json` + NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `derivedContextsFieldName`, + `json` ), Nil ), @@ -628,14 +618,14 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both(NonEmptyList( - FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - 
`derivedContextsFieldName`, - `invalidEmailSentJson` + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `derivedContextsFieldName`, + `invalidEmailSentJson` ), List( - FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), - `derivedContextsFieldName`, - `noSchemaJson` + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), + `derivedContextsFieldName`, + `noSchemaJson` ) ) ), @@ -658,9 +648,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both(NonEmptyList( - FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - `derivedContextsFieldName`, - `invalidEmailSentJson` + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `derivedContextsFieldName`, + `invalidEmailSentJson` ), Nil ), @@ -702,7 +692,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - _: FailureEntity.SchemaViolation, + _: Failure.SchemaViolation, Nil ), IgluUtils.EventExtractResult(Nil, None, Nil) @@ -726,7 +716,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - _: FailureEntity.SchemaViolation, + _: Failure.SchemaViolation, Nil ), IgluUtils.EventExtractResult(Nil, None, Nil) @@ -751,8 +741,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - _: FailureEntity.SchemaViolation, - List(_: FailureEntity.SchemaViolation) + _: Failure.SchemaViolation, + List(_: Failure.SchemaViolation) ), IgluUtils.EventExtractResult(Nil, None, Nil) ) => @@ -801,9 +791,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - 
NonEmptyList(FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - `contextsFieldName`, - `invalidEmailSentJson` + NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `contextsFieldName`, + `invalidEmailSentJson` ), _ ), @@ -832,9 +822,9 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureEntity.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), - `uePropertiesFieldName`, - `invalidEmailSentJson` + NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `unstructFieldName`, + `invalidEmailSentJson` ), _ ),