Skip to content

Commit

Permalink
Take superseding schema into account during validation (close #231)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes authored and istreeter committed May 25, 2023
1 parent dd6932d commit d94f20b
Show file tree
Hide file tree
Showing 19 changed files with 518 additions and 61 deletions.
Expand Up @@ -39,9 +39,9 @@ final case class Client[F[_], A](resolver: Resolver[F], validator: Validator[A])
for {
schema <- EitherT(resolver.lookupSchema(instance.schema))
schemaValidation = validator.validateSchema(schema)
_ <- EitherT.fromEither[F](schemaValidation).leftMap(_.toClientError)
_ <- EitherT.fromEither[F](schemaValidation).leftMap(_.toClientError(None))
validation = validator.validate(instance.data, schema)
_ <- EitherT.fromEither[F](validation).leftMap(_.toClientError)
_ <- EitherT.fromEither[F](validation).leftMap(_.toClientError(None))
} yield ()
}

Expand Down
Expand Up @@ -18,6 +18,7 @@ import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.resolver.CreateResolverCache
import com.snowplowanalytics.iglu.client.resolver.Resolver.SupersededBy
import com.snowplowanalytics.iglu.client.validator.CirceValidator.WithCaching.{
InitValidatorCache,
SchemaEvaluationCache,
Expand All @@ -43,13 +44,19 @@ final class IgluCirceClient[F[_]] private (
M: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): EitherT[F, ClientError, Unit] =
): EitherT[F, ClientError, SupersededBy] =
for {
resolverResult <- EitherT(resolver.lookupSchemaResult(instance.schema))
resolverResult <- EitherT(
resolver.lookupSchemaResult(instance.schema, resolveSupersedingSchema = true)
)
validation =
CirceValidator.WithCaching.validate(schemaEvaluationCache)(instance.data, resolverResult)
_ <- EitherT(validation).leftMap(_.toClientError)
} yield ()
_ <- EitherT(validation).leftMap(e =>
e.toClientError(resolverResult.value.supersededBy.map(_.asString))
)
// Also return the superseding schema version, so the caller knows that the self-describing JSON
// was validated against the superseding schema whenever the original schema is superseded.
} yield resolverResult.value.supersededBy
}

object IgluCirceClient {
Expand Down
Expand Up @@ -26,7 +26,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{
}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
import com.snowplowanalytics.iglu.core._
import io.circe.{Decoder, DecodingFailure, HCursor, Json}
import io.circe.{Decoder, DecodingFailure, FailedCursor, HCursor, Json}

import java.time.Instant
import scala.collection.immutable.SortedMap
Expand All @@ -43,19 +43,69 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
* If any of the repositories gives a non-not-found error, the lookup will be retried
*
* @param schemaKey The SchemaKey uniquely identifying the schema in Iglu
* @param resolveSupersedingSchema Specify whether superseding schema version should be taken into account
* @return a [[Resolver.ResolverResult]] boxing the [[Resolver.SchemaItem]] (schema Json plus superseding version, if any) on success, or a ResolutionError on failure
*/
def lookupSchemaResult(
schemaKey: SchemaKey
schemaKey: SchemaKey,
resolveSupersedingSchema: Boolean = false
)(implicit
F: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): F[Either[ResolutionError, SchemaLookupResult]] = {
val get: Registry => F[Either[RegistryError, Json]] = r => L.lookup(r, schemaKey)
// Reads the optional "$supersededBy" field of a schema.
// - field absent: succeeds with None
// - field present and decodable as a full schema version: succeeds with Some(version)
// - field present but not decodable: fails with a ClientFailure describing the decoding error
def extractSupersededBy(schema: Json): Either[RegistryError, SupersededBy] = {
  val supersededByCursor = schema.hcursor.downField("$supersededBy")
  supersededByCursor match {
    case _: FailedCursor =>
      // No such field: the schema is simply not superseded.
      Right(None)
    case present =>
      present.as[SchemaVer.Full] match {
        case Right(version) =>
          Right(Some(version))
        case Left(e) =>
          Left(
            RegistryError.ClientFailure(
              s"Error while trying to decode superseding version: ${e.toString()}"
            )
          )
      }
  }
}

// Ensures that a declared superseding version is strictly greater than the version of the
// schema that declares it. A schema without a superseding version always passes the check;
// otherwise a ClientFailure is returned when the ordering requirement is violated.
def checkSupersedingVersion(
  schemaKey: SchemaKey,
  supersededBy: SupersededBy
): Either[RegistryError, Unit] =
  supersededBy.fold[Either[RegistryError, Unit]](Right(())) { superseding =>
    Either.cond(
      Ordering[SchemaVer.Full].gt(superseding, schemaKey.version),
      (),
      RegistryError.ClientFailure(
        s"Superseding version ${superseding.asString} isn't greater than the version of schema ${schemaKey.toPath}"
      )
    )
  }

// Per-registry fetch function handed to the repository traversal.
// NOTE(review): the resolver cache appears to be keyed by schemaKey only - confirm that items
// fetched with and without superseding resolution cannot be served to the other kind of caller.
val get: Registry => F[Either[RegistryError, SchemaItem]] = {
  // Fetches exactly the requested schema; no superseding information is attached.
  def fetchRequested(registry: Registry): F[Either[RegistryError, SchemaItem]] =
    EitherT(L.lookup(registry, schemaKey))
      .map(requested => SchemaItem(requested, Option.empty))
      .value

  // Fetches the requested schema and, when it declares a valid superseding version,
  // fetches the superseding schema from the same registry and returns that one instead.
  def fetchResolved(registry: Registry): F[Either[RegistryError, SchemaItem]] = {
    val resolved = for {
      requested <- EitherT(L.lookup(registry, schemaKey))
      superseding <- EitherT.fromEither[F](extractSupersededBy(requested))
      _ <- EitherT.fromEither[F](checkSupersedingVersion(schemaKey, superseding))
      item <- superseding.fold(
        EitherT.rightT[F, RegistryError](SchemaItem(requested, Option.empty[SchemaVer.Full]))
      ) { supersedingVersion =>
        // Same vendor/name/format, only the version is swapped for the superseding one.
        val supersedingSchemaKey = schemaKey.copy(version = supersedingVersion)
        EitherT(L.lookup(registry, supersedingSchemaKey))
          .map(supersedingSchema => SchemaItem(supersedingSchema, supersedingVersion.some))
      }
    } yield item
    resolved.value
  }

  registry =>
    if (resolveSupersedingSchema) fetchResolved(registry)
    else fetchRequested(registry)
}

def handleAfterFetch(
result: Either[LookupFailureMap, Json]
result: Either[LookupFailureMap, SchemaItem]
): F[Either[ResolutionError, SchemaLookupResult]] =
cache match {
case Some(c) =>
Expand All @@ -81,15 +131,19 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
case Some(TimestampedItem(Left(failures), _)) =>
for {
toBeRetried <- reposForRetry(failures)
result <- traverseRepos[F, Json](
result <- traverseRepos[F, SchemaItem](
get,
prioritize(schemaKey.vendor, toBeRetried),
failures
)
fixed <- handleAfterFetch(result)
} yield fixed
case None =>
traverseRepos[F, Json](get, prioritize(schemaKey.vendor, allRepos.toList), Map.empty)
traverseRepos[F, SchemaItem](
get,
prioritize(schemaKey.vendor, allRepos.toList),
Map.empty
)
.flatMap(handleAfterFetch)
}
}
Expand Down Expand Up @@ -125,7 +179,7 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
L: RegistryLookup[F],
C: Clock[F]
): F[Either[ResolutionError, Json]] =
lookupSchemaResult(schemaKey).map(_.map(_.value))
lookupSchemaResult(schemaKey).map(_.map(_.value.schema))

/**
* Get list of available schemas for particular vendor and name part
Expand Down Expand Up @@ -312,8 +366,18 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
object Resolver {

type SchemaListKey = (Vendor, Name, Model)
type SchemaLookupResult = ResolverResult[SchemaKey, Json]
type SchemaLookupResult = ResolverResult[SchemaKey, SchemaItem]
type SchemaListLookupResult = ResolverResult[SchemaListKey, SchemaList]
type SupersededBy = Option[SchemaVer.Full]

/**
* The result of doing schema lookup
*
* @param schema Schema json. When the lookup resolved a superseding schema, this is the json
*               of the superseding schema rather than of the originally requested one.
* @param supersededBy Superseding schema version if the schema is superseded by another schema.
*                     Otherwise, it is None. It is also None when the lookup was performed
*                     without resolving superseding schemas.
*/
case class SchemaItem(schema: Json, supersededBy: SupersededBy)

/** The result of doing a lookup with the resolver, carrying information on whether the cache was used */
sealed trait ResolverResult[+K, +A] {
Expand Down
Expand Up @@ -17,17 +17,16 @@ import cats.{Applicative, Monad}
import cats.data.OptionT
import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.iglu.core.SchemaList
// circe
import io.circe.Json

import scala.concurrent.duration.{DurationInt, FiniteDuration}

// LruMap
import com.snowplowanalytics.lrumap.LruMap

// Iglu core
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList}

import Resolver.SchemaItem

/**
* Resolver cache and associated logic to (in)validate entities,
Expand Down Expand Up @@ -96,7 +95,7 @@ class ResolverCache[F[_]] private (
)(implicit
F: Monad[F],
C: Clock[F]
): F[Either[LookupFailureMap, TimestampedItem[Json]]] =
): F[Either[LookupFailureMap, TimestampedItem[SchemaItem]]] =
putItemResult(schemas, schemaKey, freshResult)

/** Lookup a `SchemaList`, no TTL is available */
Expand Down
Expand Up @@ -12,16 +12,14 @@
*/
package com.snowplowanalytics.iglu.client

// circe
import io.circe.Json

import scala.concurrent.duration.FiniteDuration

// Iglu Core
import com.snowplowanalytics.iglu.core.SchemaList

// This project
import resolver.registries.Registry
import resolver.Resolver.SchemaItem

package object resolver {

Expand All @@ -46,7 +44,7 @@ package object resolver {
* SchemaItem (schema Json plus superseding version, if any) in case of success
* or Map of all currently failed repositories in case of failure
*/
type SchemaLookup = Either[LookupFailureMap, Json]
type SchemaLookup = Either[LookupFailureMap, SchemaItem]

/**
* Validated schema list lookup result containing, cache result which is
Expand Down
Expand Up @@ -20,7 +20,7 @@ import com.snowplowanalytics.iglu.core.circe.MetaSchemas
// Scala
import com.fasterxml.jackson.databind.JsonNode
import com.networknt.schema.uri.URIFetcher
import com.snowplowanalytics.iglu.client.resolver.Resolver.SchemaLookupResult
import com.snowplowanalytics.iglu.client.resolver.Resolver.{SchemaItem, SchemaLookupResult}
import java.io.{ByteArrayInputStream, InputStream}
import java.net.URI
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -168,7 +168,7 @@ object CirceValidator extends Validator[Json] {
evaluationCache: SchemaEvaluationCache[F]
)(result: SchemaLookupResult): F[Either[ValidatorError.InvalidSchema, JsonSchema]] = {
result match {
case ResolverResult.Cached(key, schema, timestamp) =>
case ResolverResult.Cached(key, SchemaItem(schema, _), timestamp) =>
evaluationCache.get((key, timestamp)).flatMap {
case Some(alreadyEvaluatedSchema) =>
alreadyEvaluatedSchema.pure[F]
Expand All @@ -177,7 +177,7 @@ object CirceValidator extends Validator[Json] {
.pure[F]
.flatTap(result => evaluationCache.put((key, timestamp), result))
}
case ResolverResult.NotCached(schema) =>
case ResolverResult.NotCached(SchemaItem(schema, _)) =>
provideNewJsonSchema(schema).pure[F]
}
}
Expand Down
@@ -0,0 +1,20 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"$supersededBy": "1-0",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "invalid-superseded-schema",
"format": "jsonschema",
"version": "1-0-0"
},
"type": "object",
"properties": {
"id": {
"type": "string"
}
},

"required": ["id"],
"additionalProperties": false
}
@@ -0,0 +1,24 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"$supersededBy": "1-0-3",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "invalid-superseded-schema",
"format": "jsonschema",
"version": "1-0-1"
},

"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
}
},

"required": ["id"],
"additionalProperties": false
}
@@ -0,0 +1,27 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"$supersededBy": "1-0-1",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "invalid-superseded-schema",
"format": "jsonschema",
"version": "1-0-2"
},

"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"price": {
"type": "number"
}
},

"required": ["id"],
"additionalProperties": false
}
@@ -0,0 +1,21 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"$supersededBy": "1-0-2",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "superseded-schema",
"format": "jsonschema",
"version": "1-0-0"
},

"type": "object",
"properties": {
"id": {
"type": "string"
}
},

"required": ["id"],
"additionalProperties": false
}
@@ -0,0 +1,23 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "superseded-schema",
"format": "jsonschema",
"version": "1-0-1"
},

"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
}
},

"required": ["id"],
"additionalProperties": false
}

0 comments on commit d94f20b

Please sign in to comment.