-
Notifications
You must be signed in to change notification settings - Fork 36
/
AtomicFieldsLengthValidator.scala
83 lines (73 loc) · 2.83 KB
/
AtomicFieldsLengthValidator.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*
* Copyright (c) 2022-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments
import org.slf4j.LoggerFactory
import cats.Monad
import cats.data.Validated.{Invalid, Valid}
import cats.data.{Ior, IorT, NonEmptyList}
import cats.implicits._
import com.snowplowanalytics.iglu.client.validator.ValidatorReport
import com.snowplowanalytics.snowplow.enrich.common.enrichments.AtomicFields.LimitedAtomicField
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
/**
 * Validates the length of the atomic fields of an [[EnrichedEvent]] against the
 * per-field limits carried by [[AtomicFields]].
 *
 * Inspired by the atomic fields validation of the Snowplow Scala Analytics SDK:
 * https://github.com/snowplow/snowplow-scala-analytics-sdk/blob/master/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/validate/package.scala
 */
object AtomicFieldsLengthValidator {

  // Named logger (rather than one derived from this class) used to report events whose
  // atomic field length violations were accepted (`acceptInvalid = true`).
  private val logger = LoggerFactory.getLogger("InvalidEnriched")

  /**
   * Checks every limited atomic field of `event` against its maximum length.
   *
   * @param event          the event to check. When `emitIncomplete` is true, every field found
   *                       to be too long is nullified on this instance (it is mutated in place).
   * @param acceptInvalid  when true, violations do not surface as a failure: `invalidCount` is
   *                       run, the invalid field paths are logged at debug level, and the result
   *                       is `Ior.Right(())`.
   * @param invalidCount   effect run once when violations are found and `acceptInvalid` is true
   * @param atomicFields   the fields to check, each paired with its length limit
   * @param emitIncomplete when true, over-long fields are set to null on `event`, regardless of
   *                       `acceptInvalid`
   * @return `Ior.Right(())` when all fields are within their limits or when violations are
   *         accepted; otherwise `Ior.Both(violation, ())`, where `violation` is built from all
   *         collected reports by `AtomicFields.errorsToSchemaViolation`.
   */
  def validate[F[_]: Monad](
    event: EnrichedEvent,
    acceptInvalid: Boolean,
    invalidCount: F[Unit],
    atomicFields: AtomicFields,
    emitIncomplete: Boolean
  ): IorT[F, Failure.SchemaViolation, Unit] =
    IorT {
      // Every field is checked; combining the ValidatedNel results accumulates the reports of
      // all failing fields instead of stopping at the first one.
      atomicFields.value
        .map(validateField(event, _, emitIncomplete).toValidatedNel)
        .combineAll match {
        case Invalid(errors) if acceptInvalid =>
          handleAcceptableErrors(invalidCount, event, errors) *> Monad[F].pure(Ior.Right(()))
        case Invalid(errors) =>
          Monad[F].pure(Ior.Both(AtomicFields.errorsToSchemaViolation(errors), ()))
        case Valid(()) =>
          Monad[F].pure(Ior.Right(()))
      }
    }

  /**
   * Checks a single atomic field of `event` against its length limit.
   *
   * @return `Left(report)` describing the violation (including the original value) when the
   *         field is non-null and longer than its limit, `Right(())` otherwise. When
   *         `emitIncomplete` is true, a too-long field is also nullified on `event`.
   */
  private def validateField(
    event: EnrichedEvent,
    atomicField: LimitedAtomicField,
    emitIncomplete: Boolean
  ): Either[ValidatorReport, Unit] = {
    val actualValue = atomicField.value.enrichedValueExtractor(event)
    // NOTE(review): String.length counts UTF-16 code units, not code points or bytes;
    // confirm this matches the unit in which the limits are defined.
    if (actualValue != null && actualValue.length > atomicField.limit) {
      // The value has already been read into `actualValue`, so nullifying the field on the
      // event does not affect the value carried by the report below.
      if (emitIncomplete) atomicField.value.nullify(event)
      ValidatorReport(
        s"Field is longer than maximum allowed size ${atomicField.limit}",
        Some(atomicField.value.name),
        Nil,
        Some(actualValue)
      ).asLeft
    } else
      Right(())
  }

  /**
   * Runs `invalidCount`, then logs at debug level the id of `event` and the paths of the
   * fields reported in `errors`.
   *
   * The log statement is sequenced inside `F` (via `map`) instead of being passed strictly to
   * `Monad[F].pure`. For a lazy `F` it therefore happens when the effect runs, after the
   * counter. The message is only built when debug logging is enabled.
   */
  private def handleAcceptableErrors[F[_]: Monad](
    invalidCount: F[Unit],
    event: EnrichedEvent,
    errors: NonEmptyList[ValidatorReport]
  ): F[Unit] = {
    // Read eagerly: the event is mutable (see `nullify` above), so capture the id as seen at
    // validation time rather than whenever the deferred effect eventually runs.
    val eventId = event.event_id
    invalidCount.map { _ =>
      if (logger.isDebugEnabled) {
        val invalidFields = errors.map(_.path).toList.flatten.mkString(", ")
        logger.debug(
          s"Enriched event not valid against atomic schema. Event id: $eventId. Invalid fields: $invalidFields"
        )
      }
    }
  }
}