Skip to content

Commit

Permalink
Address Ben's comments - 2
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Apr 3, 2024
1 parent d1eb35e commit f229016
Show file tree
Hide file tree
Showing 10 changed files with 354 additions and 413 deletions.
Expand Up @@ -12,7 +12,6 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments

import cats.data.NonEmptyList

import io.circe.Json
import io.circe.syntax._

import com.snowplowanalytics.snowplow.badrows.FailureDetails
Expand Down Expand Up @@ -136,12 +135,12 @@ object AtomicFields {
AtomicFields(withLimits)
}

def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureEntity.SchemaViolation = {
def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): Failure.SchemaViolation = {
val clientError = ValidationError(ValidatorError.InvalidData(errors), None)

val failureData = Json.obj(errors.toList.flatMap(e => e.path.map(p => p := e.keyword)): _*)
val failureData = errors.toList.flatMap(e => e.path.map(p => p := e.keyword)).toMap.asJson

FailureEntity.SchemaViolation(
Failure.SchemaViolation(
schemaViolation = FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
Expand Down
Expand Up @@ -35,7 +35,7 @@ object AtomicFieldsLengthValidator {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, FailureEntity.SchemaViolation, Unit] =
): IorT[F, Failure.SchemaViolation, Unit] =
IorT {
atomicFields.value
.map(validateField(event, _).toValidatedNel)
Expand Down
Expand Up @@ -30,7 +30,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows._
import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor}
import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure}

import com.snowplowanalytics.snowplow.enrich.common.{EtlPipeline, QueryStringParameters, RawEventParameters}
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
Expand All @@ -46,15 +46,17 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, Conversion

object EnrichmentManager {

case class EnrichedWithContexts(enriched: EnrichedEvent, contexts: List[SelfDescribingData[Json]])

private type EnrichmentResult[F[_]] =
IorT[F, NonEmptyList[IntermediateBadRow], (EnrichedEvent, List[SelfDescribingData[Json]])]
IorT[F, NonEmptyList[IntermediateBadRow], EnrichedWithContexts]

// We need this intermediate representation because the partially enriched event has to be
// created right after each enrichment/validation step completes. If we created it only
// at the end instead, it could already contain updates made by later steps.
private case class IntermediateBadRow(
failureEntities: NonEmptyList[FailureEntity],
failureEntities: NonEmptyList[Failure],
partiallyEnrichedEvent: Payload.PartiallyEnrichedEvent
)

Expand Down Expand Up @@ -122,14 +124,14 @@ object EnrichmentManager {
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
} yield (enriched, validContexts ::: extractResult.validationInfoContexts)
} yield EnrichedWithContexts(enriched, validContexts ::: extractResult.validationInfoContexts)

// Derived contexts are set last because we also want to include the failure entities
// in the derived contexts, and the failure entities are only available at the end
// of the enrichment process.
setDerivedContexts(iorT, processor)
.leftMap(createBadRow(_, RawEvent.toRawEvent(raw), processor))
.map(_._1)
.map(_.enriched)
}

private def createBadRow(
Expand All @@ -139,40 +141,40 @@ object EnrichmentManager {
): BadRow = {
val intermediateBadRow = fe.head
intermediateBadRow.failureEntities.head match {
case h: FailureEntity.SchemaViolation =>
val sv = intermediateBadRow.failureEntities.tail.collect { case f: FailureEntity.SchemaViolation => f }
case h: Failure.SchemaViolation =>
val sv = intermediateBadRow.failureEntities.tail.collect { case f: Failure.SchemaViolation => f }
buildSchemaViolationsBadRow(NonEmptyList(h, sv), intermediateBadRow.partiallyEnrichedEvent, re, processor)
case h: FailureEntity.EnrichmentFailure =>
val ef = intermediateBadRow.failureEntities.tail.collect { case f: FailureEntity.EnrichmentFailure => f }
case h: Failure.EnrichmentFailure =>
val ef = intermediateBadRow.failureEntities.tail.collect { case f: Failure.EnrichmentFailure => f }
buildEnrichmentFailuresBadRow(NonEmptyList(h, ef), intermediateBadRow.partiallyEnrichedEvent, re, processor)
}
}

private def setDerivedContexts[F[_]: Sync](enriched: EnrichmentResult[F], processor: Processor): EnrichmentResult[F] =
IorT(
enriched.value.flatTap(v =>
enriched.value.flatTap { v =>
Sync[F].delay {
val now = Instant.now()
val (derivedContexts, enriched) = v match {
case Ior.Right((e, l)) => (l, e.some)
case Ior.Left(l) => (convertFailureEntitiesToSDJ(l, now, processor), None)
case Ior.Both(b, (e, l)) => (l ::: convertFailureEntitiesToSDJ(b, now, processor), e.some)
}
val (derivedContexts, enriched) = v.fold(
l => (convertFailureEntitiesToSDJ(l, now, processor), None),
{ case EnrichedWithContexts(e, l) => (l, e.some) },
{ case (b, EnrichedWithContexts(e, l)) => (l ::: convertFailureEntitiesToSDJ(b, now, processor), e.some) }
)
for {
c <- ME.formatContexts(derivedContexts)
e <- enriched
_ = e.derived_contexts = c
} yield ()
}
)
}
)

private def convertFailureEntitiesToSDJ(
l: NonEmptyList[IntermediateBadRow],
timestamp: Instant,
processor: Processor
): List[SelfDescribingData[Json]] =
l.flatMap(_.failureEntities).map(FailureEntity.toSDJ(_, timestamp, processor)).toList
l.flatMap(_.failureEntities).map(_.toSDJ(timestamp, processor)).toList

private def mapAndValidateInput[F[_]: Sync](
raw: RawEvent,
Expand All @@ -187,7 +189,7 @@ object EnrichmentManager {
.leftMap(NonEmptyList.one)
extract <- IgluUtils
.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
.leftMap { l: NonEmptyList[FailureEntity] => l }
.leftMap { l: NonEmptyList[Failure] => l }
} yield extract

iorT.leftMap(v => IntermediateBadRow(v, EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent)))
Expand Down Expand Up @@ -215,7 +217,7 @@ object EnrichmentManager {
failures.toNel match {
case Some(nel) =>
Ior.both(
IntermediateBadRow(nel.map(FailureEntity.EnrichmentFailure), EnrichedEvent.toPartiallyEnrichedEvent(enriched)),
IntermediateBadRow(nel.map(Failure.EnrichmentFailure), EnrichedEvent.toPartiallyEnrichedEvent(enriched)),
contexts
)
case None =>
Expand All @@ -237,7 +239,7 @@ object EnrichmentManager {
validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup)
_ <- AtomicFieldsLengthValidator
.validate[F](enriched, acceptInvalid, invalidCount, atomicFields)
.leftMap { v: FailureEntity => NonEmptyList.one(v) }
.leftMap { v: Failure => NonEmptyList.one(v) }
} yield validContexts

iorT.leftMap(v => IntermediateBadRow(v, EnrichedEvent.toPartiallyEnrichedEvent(enriched)))
Expand Down Expand Up @@ -369,7 +371,7 @@ object EnrichmentManager {
e: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor
): IorT[F, FailureEntity.SchemaViolation, Unit] =
): IorT[F, Failure.SchemaViolation, Unit] =
IorT {
Sync[F].delay {
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
Expand Down Expand Up @@ -881,29 +883,29 @@ object EnrichmentManager {
}

private def buildSchemaViolationsBadRow(
fe: NonEmptyList[FailureEntity.SchemaViolation],
fe: NonEmptyList[Failure.SchemaViolation],
pee: Payload.PartiallyEnrichedEvent,
re: Payload.RawEvent,
processor: Processor
): BadRow = {
val now = Instant.now()
BadRow.SchemaViolations(
processor,
Failure.SchemaViolations(now, fe.map(_.schemaViolation)),
BadRowFailure.SchemaViolations(now, fe.map(_.schemaViolation)),
Payload.EnrichmentPayload(pee, re)
)
}

private def buildEnrichmentFailuresBadRow(
fe: NonEmptyList[FailureEntity.EnrichmentFailure],
fe: NonEmptyList[Failure.EnrichmentFailure],
pee: Payload.PartiallyEnrichedEvent,
re: Payload.RawEvent,
processor: Processor
): BadRow = {
val now = Instant.now()
BadRow.EnrichmentFailures(
processor,
Failure.EnrichmentFailures(now, fe.map(_.enrichmentFailure)),
BadRowFailure.EnrichmentFailures(now, fe.map(_.enrichmentFailure)),
Payload.EnrichmentPayload(pee, re)
)
}
Expand Down
@@ -0,0 +1,195 @@
/*
* Copyright (c) 2024-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments

import java.time.Instant

import cats.syntax.option._

import io.circe.{Encoder, Json}
import io.circe.generic.semiauto._
import io.circe.syntax._

import com.snowplowanalytics.snowplow.badrows._

import com.snowplowanalytics.iglu.client.ClientError
import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport}

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits.schemaKeyCirceJsonEncoder

/**
 * Represents a failure encountered during enrichment of the event.
 * Failure entities will be attached to incomplete events as derived contexts.
 */
sealed trait Failure {
/**
 * Converts this failure into a self-describing JSON entity.
 *
 * @param timestamp timestamp to record in the resulting entity
 * @param processor processor to record in the resulting entity
 * @return the failure as self-describing data
 */
def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json]
}

object Failure {

/** Schema key (`com.snowplowanalytics.snowplow/failure/jsonschema/1-0-0`) used for every failure entity. */
val failureSchemaKey: SchemaKey =
  SchemaKey(
    "com.snowplowanalytics.snowplow",
    "failure",
    "jsonschema",
    SchemaVer.Full(1, 0, 0)
  )

/**
 * Failure wrapping a schema violation.
 *
 * @param schemaViolation the underlying bad-row schema violation details
 * @param source value reported as the `source` of the errors in the failure entity
 * @param data JSON reported as the `data` of the failure entity
 */
case class SchemaViolation(
  schemaViolation: FailureDetails.SchemaViolation,
  source: String,
  data: Json
) extends Failure {

  /** Wraps the context built by `fromSchemaViolation` into self-describing data keyed by `failureSchemaKey`. */
  override def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] =
    SelfDescribingData(failureSchemaKey, fromSchemaViolation(this, timestamp, processor).asJson)
}

/**
 * Failure wrapping an enrichment failure.
 *
 * @param enrichmentFailure the underlying bad-row enrichment failure details
 */
case class EnrichmentFailure(
  enrichmentFailure: FailureDetails.EnrichmentFailure
) extends Failure {

  /** Wraps the context built by `fromEnrichmentFailure` into self-describing data keyed by `failureSchemaKey`. */
  override def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] =
    SelfDescribingData(failureSchemaKey, fromEnrichmentFailure(this, timestamp, processor).asJson)
}

/**
 * Flat description of a failure. Its JSON encoding is used as the data of the
 * self-describing failure entity produced by `toSDJ`.
 */
case class FailureContext(
// Kind of failure, e.g. "NotJSON", "ValidationError" or "EnrichmentError: <enrichment identifier>"
failureType: String,
// One JSON object per error; every object built in this file carries at least a "message" key
errors: List[Json],
// Schema key associated with the failure, when there is one
schema: Option[SchemaKey],
// JSON data associated with the failure, when there is any
data: Option[Json],
// Timestamp supplied by the caller of toSDJ
timestamp: Instant,
// Filled from processor.artifact
componentName: String,
// Filled from processor.version
componentVersion: String
)

object FailureContext {
// Circe encoder derived (semiauto) from the fields of the case class.
implicit val failureContextEncoder: Encoder[FailureContext] = deriveEncoder[FailureContext]
}

/**
 * Builds the [[FailureContext]] describing an enrichment failure.
 *
 * The failure type is `EnrichmentError: ` followed by the enrichment identifier (empty when the
 * failure carries no enrichment information). The schema is the enrichment's schema key, if any.
 *
 * @param ef enrichment failure to describe
 * @param timestamp timestamp stored in the resulting context
 * @param processor processor whose artifact and version become the component name and version
 * @return the context for this failure
 */
def fromEnrichmentFailure(
  ef: EnrichmentFailure,
  timestamp: Instant,
  processor: Processor
): FailureContext = {
  val details = ef.enrichmentFailure
  val enrichmentId = details.enrichment.fold("")(_.identifier)

  val (errorList, failedInput) = details.message match {
    case FailureDetails.EnrichmentFailureMessage.InputData(field, value, expectation) =>
      // The offending field is reported both as the error source and as the key of the data.
      val error = Json.obj("message" := s"$field - $expectation", "source" := field)
      (List(error), Json.obj(field := value).some)
    case FailureDetails.EnrichmentFailureMessage.Simple(error) =>
      (List(Json.obj("message" := error)), None)
    case FailureDetails.EnrichmentFailureMessage.IgluError(_, error) =>
      // EnrichmentFailureMessage.IgluError isn't used anywhere in the project.
      // This case is handled only for completeness.
      (List(Json.obj("message" := error)), None)
  }

  FailureContext(
    failureType = s"EnrichmentError: $enrichmentId",
    errors = errorList,
    schema = details.enrichment.map(_.schemaKey),
    data = failedInput,
    timestamp = timestamp,
    componentName = processor.artifact,
    componentVersion = processor.version
  )
}

/**
 * Builds the [[FailureContext]] describing a schema violation.
 *
 * The failure type, the error objects, the schema and the data depend on the kind of violation;
 * timestamp and component name/version are the same for every kind.
 *
 * @param v schema violation to describe
 * @param timestamp timestamp stored in the resulting context
 * @param processor processor whose artifact and version become the component name and version
 * @return the context for this violation
 */
def fromSchemaViolation(
  v: SchemaViolation,
  timestamp: Instant,
  processor: Processor
): FailureContext = {
  // Completes a context from the four parts that vary between violation kinds.
  def build(
    failureType: String,
    errors: List[Json],
    schema: Option[SchemaKey],
    data: Option[Json]
  ): FailureContext =
    FailureContext(
      failureType = failureType,
      errors = errors,
      schema = schema,
      data = data,
      timestamp = timestamp,
      componentName = processor.artifact,
      componentVersion = processor.version
    )

  v.schemaViolation match {
    case FailureDetails.SchemaViolation.NotJson(_, _, err) =>
      // There is no schema here; the data is keyed by the source.
      val notJson = Json.obj("message" := err, "source" := v.source)
      build("NotJSON", List(notJson), None, Some(Json.obj(v.source := v.data)))

    case FailureDetails.SchemaViolation.NotIglu(_, err) =>
      // Only the part of the parse error message before the first ':' is kept.
      val shortMessage = err.message("").split(":").headOption
      val notIglu = Json.obj("message" := shortMessage, "source" := v.source)
      build("NotIglu", List(notIglu), None, Some(v.data))

    case FailureDetails.SchemaViolation.CriterionMismatch(schemaKey, schemaCriterion) =>
      val mismatch = Json.obj(
        "message" := s"Unexpected schema: ${schemaKey.toSchemaUri} does not match the criterion",
        "source" := v.source,
        "criterion" := schemaCriterion.asString
      )
      build("CriterionMismatch", List(mismatch), Some(schemaKey), Some(v.data))

    case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ResolutionError(lh)) =>
      // Each lookup entry is merged with the name of the repository it belongs to.
      val history = lh.toList.map {
        case (repository, lookups) =>
          lookups.asJson.deepMerge(Json.obj("repository" := repository.asJson))
      }
      val unresolved = Json.obj(
        "message" := s"Resolution error: schema ${schemaKey.toSchemaUri} not found",
        "source" := v.source,
        "lookupHistory" := history
      )
      build("ResolutionError", List(unresolved), Some(schemaKey), Some(v.data))

    case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidData(e), _)) =>
      // If the error is for an atomic field, the source is set to the atomic field name.
      // ValidatorReport.path holds that name, so path is used as the source in that case.
      val isAtomicField = schemaKey == AtomicFields.atomicSchema
      val invalidData = e.toList.map { report =>
        val reportSource = if (isAtomicField) report.path.getOrElse(v.source) else v.source
        Json.obj(
          "message" := report.message,
          "source" := reportSource,
          "path" := report.path,
          "keyword" := report.keyword,
          "targets" := report.targets
        )
      }
      build("ValidationError", invalidData, Some(schemaKey), Some(v.data))

    case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) =>
      val invalidSchema = e.toList.map { issue =>
        Json.obj(
          "message" := s"Invalid schema: ${schemaKey.toSchemaUri} - ${issue.message}",
          "source" := v.source,
          "path" := issue.path
        )
      }
      build("ValidationError", invalidSchema, Some(schemaKey), Some(v.data))
  }
}
}

0 comments on commit f229016

Please sign in to comment.