Skip to content

Commit

Permalink
Merge 03b2ede into 70c3e80
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Apr 18, 2023
2 parents 70c3e80 + 03b2ede commit 4394dc8
Show file tree
Hide file tree
Showing 19 changed files with 512 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ final case class Client[F[_], A](resolver: Resolver[F], validator: Validator[A])
for {
schema <- EitherT(resolver.lookupSchema(instance.schema))
schemaValidation = validator.validateSchema(schema)
_ <- EitherT.fromEither[F](schemaValidation).leftMap(_.toClientError)
_ <- EitherT.fromEither[F](schemaValidation).leftMap(_.toClientError(None))
validation = validator.validate(instance.data, schema)
_ <- EitherT.fromEither[F](validation).leftMap(_.toClientError)
_ <- EitherT.fromEither[F](validation).leftMap(_.toClientError(None))
} yield ()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.resolver.CreateResolverCache
import com.snowplowanalytics.iglu.client.resolver.Resolver.SupersededBy
import com.snowplowanalytics.iglu.client.validator.CirceValidator.WithCaching.{
InitValidatorCache,
SchemaEvaluationCache,
Expand All @@ -43,13 +44,19 @@ final class IgluCirceClient[F[_]] private (
M: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): EitherT[F, ClientError, Unit] =
): EitherT[F, ClientError, SupersededBy] =
for {
resolverResult <- EitherT(resolver.lookupSchemaResult(instance.schema))
resolverResult <- EitherT(
resolver.lookupSchemaResult(instance.schema, resolveSupersedingSchema = true)
)
validation =
CirceValidator.WithCaching.validate(schemaEvaluationCache)(instance.data, resolverResult)
_ <- EitherT(validation).leftMap(_.toClientError)
} yield ()
_ <- EitherT(validation).leftMap(e =>
e.toClientError(resolverResult.value.supersededBy.map(_.asString))
)
// Returning superseding schema info as well since we want to inform caller that sdj is validated
// against superseding schema if it is superseded by another schema.
} yield resolverResult.value.supersededBy
}

object IgluCirceClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{
}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
import com.snowplowanalytics.iglu.core._
import io.circe.{Decoder, DecodingFailure, HCursor, Json}
import io.circe.{Decoder, DecodingFailure, FailedCursor, HCursor, Json}

import java.time.Instant
import scala.collection.immutable.SortedMap
Expand All @@ -43,19 +43,69 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
* If any of repositories gives non-non-found error, lookup will retried
*
* @param schemaKey The SchemaKey uniquely identifying the schema in Iglu
* @param resolveSupersedingSchema Specify whether superseding schema version should be taken into account
* @return a [[Resolver.ResolverResult]] boxing the schema Json on success, or a ResolutionError on failure
*/
def lookupSchemaResult(
schemaKey: SchemaKey
schemaKey: SchemaKey,
resolveSupersedingSchema: Boolean = false
)(implicit
F: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): F[Either[ResolutionError, SchemaLookupResult]] = {
val get: Registry => F[Either[RegistryError, Json]] = r => L.lookup(r, schemaKey)
def extractSupersededBy(schema: Json): Either[RegistryError, SupersededBy] =
schema.hcursor.downField("$supersededBy") match {
case _: FailedCursor => None.asRight
case c =>
c.as[SchemaVer.Full]
.bimap(
e =>
RegistryError.ClientFailure(
s"Error while trying to decode superseding version: ${e.toString()}"
),
_.some
)
}

def checkSupersedingVersion(
schemaKey: SchemaKey,
supersededBy: SupersededBy
): Either[RegistryError, Unit] =
supersededBy match {
case None => ().asRight
case Some(superseding) =>
if (Ordering[SchemaVer.Full].gt(superseding, schemaKey.version)) ().asRight
else
RegistryError
.ClientFailure(
s"Superseding version ${superseding.asString} isn't greater than the version of schema ${schemaKey.toPath}"
)
.asLeft
}

val get: Registry => F[Either[RegistryError, SchemaItem]] = {
if (resolveSupersedingSchema)
r =>
(for {
schema <- EitherT(L.lookup(r, schemaKey))
supersededByOpt <- EitherT.fromEither[F](extractSupersededBy(schema))
_ <- EitherT.fromEither[F](checkSupersedingVersion(schemaKey, supersededByOpt))
res <- supersededByOpt match {
case None =>
EitherT.rightT[F, RegistryError](SchemaItem(schema, Option.empty[SchemaVer.Full]))
case Some(supersededBy) =>
val supersedingSchemaKey = schemaKey.copy(version = supersededBy)
EitherT(L.lookup(r, supersedingSchemaKey))
.map(supersedingSchema => SchemaItem(supersedingSchema, supersededBy.some))
}
} yield res).value
else
r => EitherT(L.lookup(r, schemaKey)).map(s => SchemaItem(s, Option.empty)).value
}

def handleAfterFetch(
result: Either[LookupFailureMap, Json]
result: Either[LookupFailureMap, SchemaItem]
): F[Either[ResolutionError, SchemaLookupResult]] =
cache match {
case Some(c) =>
Expand All @@ -81,15 +131,19 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
case Some(TimestampedItem(Left(failures), _)) =>
for {
toBeRetried <- reposForRetry(failures)
result <- traverseRepos[F, Json](
result <- traverseRepos[F, SchemaItem](
get,
prioritize(schemaKey.vendor, toBeRetried),
failures
)
fixed <- handleAfterFetch(result)
} yield fixed
case None =>
traverseRepos[F, Json](get, prioritize(schemaKey.vendor, allRepos.toList), Map.empty)
traverseRepos[F, SchemaItem](
get,
prioritize(schemaKey.vendor, allRepos.toList),
Map.empty
)
.flatMap(handleAfterFetch)
}
}
Expand Down Expand Up @@ -125,7 +179,7 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
L: RegistryLookup[F],
C: Clock[F]
): F[Either[ResolutionError, Json]] =
lookupSchemaResult(schemaKey).map(_.map(_.value))
lookupSchemaResult(schemaKey).map(_.map(_.value.schema))

/**
* Get list of available schemas for particular vendor and name part
Expand Down Expand Up @@ -312,8 +366,18 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
object Resolver {

type SchemaListKey = (Vendor, Name, Model)
type SchemaLookupResult = ResolverResult[SchemaKey, Json]
type SchemaLookupResult = ResolverResult[SchemaKey, SchemaItem]
type SchemaListLookupResult = ResolverResult[SchemaListKey, SchemaList]
type SupersededBy = Option[SchemaVer.Full]

/**
* The result of doing schema lookup
*
* @param schema Schema json
* @param supersededBy Superseding schema version if the schema is superseded by another schema.
* Otherwise, it is None.
*/
case class SchemaItem(schema: Json, supersededBy: SupersededBy)

/** The result of doing a lookup with the resolver, carrying information on whether the cache was used */
sealed trait ResolverResult[+K, +A] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ import cats.{Applicative, Monad}
import cats.data.OptionT
import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.iglu.core.SchemaList
// circe
import io.circe.Json

import scala.concurrent.duration.{DurationInt, FiniteDuration}

// LruMap
import com.snowplowanalytics.lrumap.LruMap

// Iglu core
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList}

import Resolver.SchemaItem

/**
* Resolver cache and associated logic to (in)validate entities,
Expand Down Expand Up @@ -96,7 +95,7 @@ class ResolverCache[F[_]] private (
)(implicit
F: Monad[F],
C: Clock[F]
): F[Either[LookupFailureMap, TimestampedItem[Json]]] =
): F[Either[LookupFailureMap, TimestampedItem[SchemaItem]]] =
putItemResult(schemas, schemaKey, freshResult)

/** Lookup a `SchemaList`, no TTL is available */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@
*/
package com.snowplowanalytics.iglu.client

// circe
import io.circe.Json

import scala.concurrent.duration.FiniteDuration

// Iglu Core
import com.snowplowanalytics.iglu.core.SchemaList

// This project
import resolver.registries.Registry
import resolver.Resolver.SchemaItem

package object resolver {

Expand All @@ -46,7 +44,7 @@ package object resolver {
* Json in case of success or Map of all currently failed repositories
* in case of failure
*/
type SchemaLookup = Either[LookupFailureMap, Json]
type SchemaLookup = Either[LookupFailureMap, SchemaItem]

/**
* Validated schema list lookup result containing, cache result which is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.snowplowanalytics.iglu.client.resolver.StorageTime
import com.snowplowanalytics.iglu.core.circe.MetaSchemas
// Scala
import com.fasterxml.jackson.databind.JsonNode
import com.snowplowanalytics.iglu.client.resolver.Resolver.SchemaLookupResult
import com.snowplowanalytics.iglu.client.resolver.Resolver.{SchemaItem, SchemaLookupResult}
import scala.jdk.CollectionConverters._

// Cats
Expand Down Expand Up @@ -155,7 +155,7 @@ object CirceValidator extends Validator[Json] {
evaluationCache: SchemaEvaluationCache[F]
)(result: SchemaLookupResult): F[Either[ValidatorError.InvalidSchema, JsonSchema]] = {
result match {
case ResolverResult.Cached(key, schema, timestamp) =>
case ResolverResult.Cached(key, SchemaItem(schema, _), timestamp) =>
evaluationCache.get((key, timestamp)).flatMap {
case Some(alreadyEvaluatedSchema) =>
alreadyEvaluatedSchema.pure[F]
Expand All @@ -164,7 +164,7 @@ object CirceValidator extends Validator[Json] {
.pure[F]
.flatTap(result => evaluationCache.put((key, timestamp), result))
}
case ResolverResult.NotCached(schema) =>
case ResolverResult.NotCached(SchemaItem(schema, _)) =>
provideNewJsonSchema(schema).pure[F]
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"$supersededBy": "1-0",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "invalid-superseded-schema",
"format": "jsonschema",
"version": "1-0-0"
},
"type": "object",
"properties": {
"id": {
"type": "string"
}
},

"required": ["id"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"$supersededBy": "1-0-3",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "invalid-superseded-schema",
"format": "jsonschema",
"version": "1-0-1"
},

"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
}
},

"required": ["id"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"$supersededBy": "1-0-1",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "invalid-superseded-schema",
"format": "jsonschema",
"version": "1-0-2"
},

"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"price": {
"type": "number"
}
},

"required": ["id"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"$supersededBy": "1-0-2",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "superseded-schema",
"format": "jsonschema",
"version": "1-0-0"
},

"type": "object",
"properties": {
"id": {
"type": "string"
}
},

"required": ["id"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Test schema",
"self": {
"vendor": "com.snowplowanalytics.iglu-test",
"name": "superseded-schema",
"format": "jsonschema",
"version": "1-0-1"
},

"type": "object",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
}
},

"required": ["id"],
"additionalProperties": false
}
Loading

0 comments on commit 4394dc8

Please sign in to comment.