Skip to content

Commit

Permalink
common: ignore API/SQL enrichments when failing (close #760)
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed May 8, 2023
1 parent 6a11d15 commit f67673e
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 48 deletions.
5 changes: 3 additions & 2 deletions config/enrichments/api_request_enrichment_config.json
@@ -1,5 +1,5 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.enrichments/api_request_enrichment_config/jsonschema/1-0-0",
"schema": "iglu:com.snowplowanalytics.snowplow.enrichments/api_request_enrichment_config/jsonschema/1-0-2",

"data": {

Expand Down Expand Up @@ -53,7 +53,8 @@
"cache": {
"size": 3000,
"ttl": 60
}
},
"ignoreOnError": false
}
}
}
5 changes: 3 additions & 2 deletions config/enrichments/sql_query_enrichment_config.json
@@ -1,5 +1,5 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.enrichments/sql_query_enrichment_config/jsonschema/1-0-0",
"schema": "iglu:com.snowplowanalytics.snowplow.enrichments/sql_query_enrichment_config/jsonschema/1-0-1",

"data": {

Expand Down Expand Up @@ -53,7 +53,8 @@
"cache": {
"size": 3000,
"ttl": 60
}
},
"ignoreOnError": false
}
}
}
Expand Up @@ -67,7 +67,8 @@ class ApiRequestEnrichmentSpec extends Specification with CatsIO {
List(Input.Json("key1", "unstruct_event", SchemaCriterion("com.acme", "test", "jsonschema", 1), "$.path.id")),
HttpApi("GET", "http://localhost:8080/enrichment/api/{{key1}}", 2000, Authentication(None)),
List(RegistryOutput("iglu:com.acme/output/jsonschema/1-0-0", Some(JsonOutput("$")))),
Cache(1, 1000)
Cache(1, 1000),
ignoreOnError = false
)

val expected = Contexts(
Expand Down
Expand Up @@ -54,7 +54,8 @@ object EnrichmentConf {
inputs: List[apirequest.Input],
api: HttpApi,
outputs: List[apirequest.Output],
cache: apirequest.Cache
cache: apirequest.Cache,
ignoreOnError: Boolean
) extends EnrichmentConf {
def enrichment[F[_]: CreateApiRequestEnrichment]: F[ApiRequestEnrichment[F]] =
ApiRequestEnrichment[F](this)
Expand All @@ -76,7 +77,8 @@ object EnrichmentConf {
db: Rdbms,
query: SqlQueryEnrichment.Query,
output: sqlquery.Output,
cache: SqlQueryEnrichment.Cache
cache: SqlQueryEnrichment.Cache,
ignoreOnError: Boolean
) extends EnrichmentConf {
def enrichment[F[_]: Monad: CreateSqlQueryEnrichment](blocker: BlockerF[F], shifter: ShiftExecution[F]): F[SqlQueryEnrichment[F]] =
SqlQueryEnrichment[F](this, blocker, shifter)
Expand Down
Expand Up @@ -13,7 +13,7 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest

import cats.Monad
import cats.data.{EitherT, NonEmptyList, ValidatedNel}
import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel}
import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.iglu.core.circe.implicits._
Expand Down Expand Up @@ -69,9 +69,16 @@ object ApiRequestEnrichment extends ParseableEnrichment {
inputs,
CirceUtils.extract[HttpApi](c, "parameters", "api", "http").toValidatedNel,
CirceUtils.extract[List[Output]](c, "parameters", "outputs").toValidatedNel,
CirceUtils.extract[Cache](c, "parameters", "cache").toValidatedNel
).mapN { (inputs, api, outputs, cache) =>
ApiRequestConf(schemaKey, inputs, api, outputs, cache)
CirceUtils.extract[Cache](c, "parameters", "cache").toValidatedNel,
CirceUtils
.extract[Option[Boolean]](c, "parameters", "ignoreOnError")
.map {
case Some(value) => value
case None => false
}
.toValidatedNel
).mapN { (inputs, api, outputs, cache, ignoreOnError) =>
ApiRequestConf(schemaKey, inputs, api, outputs, cache, ignoreOnError)
}.toEither
}
.toValidated
Expand All @@ -96,7 +103,8 @@ final case class ApiRequestEnrichment[F[_]: Monad: HttpClient: Clock](
inputs: List[Input],
api: HttpApi,
outputs: List[Output],
apiRequestEvaluator: ApiRequestEvaluator[F]
apiRequestEvaluator: ApiRequestEvaluator[F],
ignoreOnError: Boolean
) extends Enrichment {
import ApiRequestEnrichment._

Expand Down Expand Up @@ -137,7 +145,10 @@ final case class ApiRequestEnrichment[F[_]: Monad: HttpClient: Clock](
outputs <- EitherT.fromEither[F](contexts)
} yield outputs

contexts.leftMap(failureDetails).toValidated
contexts.leftMap(failureDetails).toValidated.map {
case Validated.Invalid(_) if ignoreOnError => Validated.Valid(List.empty)
case other => other
}
}

/**
Expand Down Expand Up @@ -221,7 +232,8 @@ object CreateApiRequestEnrichment {
conf.inputs,
conf.api,
conf.outputs,
evaluator
evaluator,
conf.ignoreOnError
)
}
}
Expand Down
Expand Up @@ -64,7 +64,8 @@ object CreateSqlQueryEnrichment {
evaluator,
blocker,
shifter,
getDataSource(conf.db)
getDataSource(conf.db),
conf.ignoreOnError
)
}
}
Expand Down
Expand Up @@ -13,7 +13,7 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery

import cats.Monad
import cats.data.{EitherT, NonEmptyList, ValidatedNel}
import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel}
import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
Expand All @@ -37,7 +37,6 @@ object SqlQueryEnrichment extends ParseableEnrichment {
"sql_query_enrichment_config",
"jsonschema",
1,
0,
0
)

Expand Down Expand Up @@ -72,8 +71,15 @@ object SqlQueryEnrichment extends ParseableEnrichment {
CirceUtils.extract[Rdbms](c, "parameters", "database").toValidatedNel,
CirceUtils.extract[Query](c, "parameters", "query").toValidatedNel,
output,
CirceUtils.extract[Cache](c, "parameters", "cache").toValidatedNel
).mapN(SqlQueryConf(schemaKey, _, _, _, _, _)).toEither
CirceUtils.extract[Cache](c, "parameters", "cache").toValidatedNel,
CirceUtils
.extract[Option[Boolean]](c, "parameters", "ignoreOnError")
.map {
case Some(value) => value
case None => false
}
.toValidatedNel
).mapN(SqlQueryConf(schemaKey, _, _, _, _, _, _)).toEither
}.toValidated

def apply[F[_]: CreateSqlQueryEnrichment](
Expand Down Expand Up @@ -115,7 +121,8 @@ final case class SqlQueryEnrichment[F[_]: Monad: DbExecutor: ResourceF: Clock](
sqlQueryEvaluator: SqlQueryEvaluator[F],
blocker: BlockerF[F],
shifter: ShiftExecution[F],
dataSource: DataSource
dataSource: DataSource,
ignoreOnError: Boolean
) extends Enrichment {
private val enrichmentInfo =
FailureDetails.EnrichmentInformation(schemaKey, "sql-query").some
Expand Down Expand Up @@ -143,7 +150,10 @@ final case class SqlQueryEnrichment[F[_]: Monad: DbExecutor: ResourceF: Clock](
result <- maybeLookup(placeholders)
} yield result

contexts.leftMap(failureDetails).value.map(_.toValidated)
contexts.leftMap(failureDetails).toValidated.map {
case Validated.Invalid(_) if ignoreOnError => Validated.Valid(List.empty)
case other => other
}
}

private def maybeLookup(placeholders: Input.PlaceholderMap): EitherT[F, NonEmptyList[String], List[SelfDescribingData[Json]]] =
Expand Down
Expand Up @@ -13,19 +13,17 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest

import cats.Id
import cats.data.ValidatedNel
import cats.syntax.either._

import io.circe.Json
import io.circe.literal._
import io.circe.parser._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData}

import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.HttpClient
import com.snowplowanalytics.snowplow.enrich.common.utils.Clock.idClock

import org.specs2.Specification
import org.specs2.matcher.ValidatedMatchers
import org.specs2.mock.Mockito
Expand All @@ -37,6 +35,8 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with
skip incorrect input (both json and pojo) in configuration $e3
extract correct configuration for POST request and perform the request $e4
parse API output with number field successfully $e5
return enrichment failure when API returns an error $e6
return empty list of contexts when API returns an error $e7
"""

val SCHEMA_KEY =
Expand Down Expand Up @@ -79,7 +79,7 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with
}
val output = Output("iglu:com.acme/user/jsonschema/1-0-0", Some(JsonOutput("$.record")))
val cache = Cache(3000, 60)
val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache)
val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache, ignoreOnError = false)

val fakeEnrichedEvent = new EnrichedEvent {
app_id = "some-fancy-app-id"
Expand Down Expand Up @@ -316,7 +316,7 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with
val output =
Output(schema = "iglu:com.acme/user/jsonschema/1-0-0", json = Some(JsonOutput("$.record")))
val cache = Cache(size = 3000, ttl = 60)
val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache)
val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache, ignoreOnError = false)

val fakeEnrichedEvent = new EnrichedEvent {
app_id = "some-fancy-app-id"
Expand Down Expand Up @@ -441,7 +441,7 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with
}
val output = Output("iglu:com.acme/geo/jsonschema/1-0-0", Some(JsonOutput("$")))
val cache = Cache(3000, 60)
val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache)
val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache, ignoreOnError = false)

val expectedDerivation =
SelfDescribingData(
Expand All @@ -453,4 +453,32 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with

enrichedContextResult must beValid(List(expectedDerivation))
}

def e6 =
failingLookup(ignoreOnError = false) must beInvalid

def e7 =
failingLookup(ignoreOnError = true) must beValid(List.empty)

private def failingLookup(ignoreOnError: Boolean): ValidatedNel[FailureDetails.EnrichmentFailure, List[SelfDescribingData[Json]]] = {
val inputs = List()
val api = HttpApi("GET", "unused", 1000, Authentication(None))
implicit val idHttpClient: HttpClient[Id] = new HttpClient[Id] {
override def getResponse(
uri: String,
authUser: Option[String],
authPassword: Option[String],
body: Option[String],
method: String,
connectionTimeout: Option[Long],
readTimeout: Option[Long]
): Id[Either[Throwable, String]] =
Left(new RuntimeException("API failed!!!"))
}
val output = Output("unused", None)
val cache = Cache(3000, 60)
val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache, ignoreOnError)

config.enrichment[Id].lookup(new EnrichedEvent, Nil, Nil, None)
}
}
Expand Up @@ -60,7 +60,8 @@ class HttpApiSpec extends Specification with ValidatedMatchers with Mockito {
Nil,
HttpApi("GET", "http://thishostdoesntexist31337:8123/endpoint", 1000, Authentication(None)),
List(Output("", Some(JsonOutput("")))),
Cache(1, 1)
Cache(1, 1),
ignoreOnError = false
).enrichment[Id]

val event = new EnrichedEvent
Expand Down
Expand Up @@ -252,7 +252,8 @@ class InputSpec extends Specification with ValidatedMatchers {
List(input1, input2),
HttpApi("GET", uriTemplate, 1000, Authentication(None)),
List(Output("iglu:someschema", JsonOutput("$").some)),
Cache(10, 5)
Cache(10, 5),
ignoreOnError = false
).enrichment[Id]
val event = new EnrichedEvent
event.setUser_id("chuwy")
Expand Down
Expand Up @@ -40,10 +40,11 @@ import SqlQueryEnrichmentIntegrationTest._
class SqlQueryEnrichmentIntegrationTest extends Specification {
def is =
skipAllUnless(continuousIntegration) ^ s2"""
Basic case $e1
All-features test $e2
Null test case $e3
Invalid creds $e4
Basic case $e1
All-features test $e2
Null test case $e3
Invalid creds $e4
Invalid creds - ignore error $e5
"""

val SCHEMA_KEY =
Expand Down Expand Up @@ -415,7 +416,22 @@ class SqlQueryEnrichmentIntegrationTest extends Specification {
}

def e4 = {
val configuration = json"""
val result = invalidCreds(ignoreOnError = false)
result must beLeft.like {
case NonEmptyList(one, two :: Nil)
if one.toString.contains("Error while executing the sql lookup") &&
two.toString.contains("FATAL: password authentication failed for user") =>
ok
case left => ko(s"error(s) don't contain the expected error messages: $left")
}
}

def e5 =
invalidCreds(ignoreOnError = true) must beRight(List.empty)

private def invalidCreds(ignoreOnError: Boolean) = {
val configuration =
json"""
{
"vendor": "com.snowplowanalytics.snowplow.enrichments",
"name": "sql_query_enrichment_config",
Expand Down Expand Up @@ -446,21 +462,14 @@ class SqlQueryEnrichmentIntegrationTest extends Specification {
"cache": {
"size": 3000,
"ttl": 60
}
},
"ignoreOnError": $ignoreOnError
}
}
"""

val event = new EnrichedEvent
val config = SqlQueryEnrichment.parse(configuration, SCHEMA_KEY).map(_.enrichment[Id](BlockerF.noop, ShiftExecution.noop))
val context = config.toEither.flatMap(_.lookup(event, Nil, Nil, None).toEither)

context must beLeft.like {
case NonEmptyList(one, two :: Nil)
if one.toString.contains("Error while executing the sql lookup") &&
two.toString.contains("FATAL: password authentication failed for user") =>
ok
case left => ko(s"error(s) don't contain the expected error messages: $left")
}
config.toEither.flatMap(_.lookup(event, Nil, Nil, None).toEither)
}
}
Expand Up @@ -35,7 +35,7 @@ class SqlQueryEnrichmentSpec extends Specification with ValidatedMatchers {
"com.snowplowanalytics.snowplow.enrichments",
"sql_query_enrichment_config",
"jsonschema",
SchemaVer.Full(1, 0, 0)
SchemaVer.Full(1, 0, 1)
)

def e1 = {
Expand Down Expand Up @@ -68,7 +68,7 @@ class SqlQueryEnrichmentSpec extends Specification with ValidatedMatchers {
"SELECT username, email_address, date_of_birth FROM tbl_users WHERE user = ? AND client = ? LIMIT 1"
)
val config =
SqlQueryConf(SCHEMA_KEY, inputs, db, query, Output(output, Output.AtMostOne), cache)
SqlQueryConf(SCHEMA_KEY, inputs, db, query, Output(output, Output.AtMostOne), cache, ignoreOnError = true)

val configuration = parse(
"""
Expand Down Expand Up @@ -123,7 +123,8 @@ class SqlQueryEnrichmentSpec extends Specification with ValidatedMatchers {
"cache": {
"size": 3000,
"ttl": 60
}
},
"ignoreOnError": true
}
}"""
).toOption.get
Expand Down

0 comments on commit f67673e

Please sign in to comment.