Skip to content

Commit

Permalink
Make superseding schema optional with lookupSchemaResult
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Apr 18, 2023
1 parent 981a33f commit c4d56a1
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ final class IgluCirceClient[F[_]] private (
C: Clock[F]
): EitherT[F, ClientError, SupersededBy] =
for {
resolverResult <- EitherT(resolver.lookupSchemaResult(instance.schema))
resolverResult <- EitherT(
resolver.lookupSchemaResult(instance.schema, resolveSupersedingSchema = true)
)
validation =
CirceValidator.WithCaching.validate(schemaEvaluationCache)(instance.data, resolverResult)
_ <- EitherT(validation).leftMap(e =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
* If any of repositories gives non-non-found error, lookup will retried
*
* @param schemaKey The SchemaKey uniquely identifying the schema in Iglu
* @param resolveSupersedingSchema Specify whether superseding schema version should be taken into account
* @return a [[Resolver.ResolverResult]] boxing the schema Json on success, or a ResolutionError on failure
*/
def lookupSchemaResult(
schemaKey: SchemaKey
schemaKey: SchemaKey,
resolveSupersedingSchema: Boolean = false
)(implicit
F: Monad[F],
L: RegistryLookup[F],
Expand Down Expand Up @@ -82,20 +84,25 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
.asLeft
}

val get: Registry => F[Either[RegistryError, SchemaItem]] = r =>
(for {
schema <- EitherT(L.lookup(r, schemaKey))
supersededByOpt <- EitherT.fromEither[F](extractSupersededBy(schema))
_ <- EitherT.fromEither[F](checkSupersedingVersion(schemaKey, supersededByOpt))
res <- supersededByOpt match {
case None =>
EitherT.rightT[F, RegistryError](SchemaItem(schema, Option.empty[SchemaVer.Full]))
case Some(supersededBy) =>
val supersedingSchemaKey = schemaKey.copy(version = supersededBy)
EitherT(L.lookup(r, supersedingSchemaKey))
.map(supersedingSchema => SchemaItem(supersedingSchema, supersededBy.some))
}
} yield res).value
val get: Registry => F[Either[RegistryError, SchemaItem]] = {
if (resolveSupersedingSchema)
r =>
(for {
schema <- EitherT(L.lookup(r, schemaKey))
supersededByOpt <- EitherT.fromEither[F](extractSupersededBy(schema))
_ <- EitherT.fromEither[F](checkSupersedingVersion(schemaKey, supersededByOpt))
res <- supersededByOpt match {
case None =>
EitherT.rightT[F, RegistryError](SchemaItem(schema, Option.empty[SchemaVer.Full]))
case Some(supersededBy) =>
val supersedingSchemaKey = schemaKey.copy(version = supersededBy)
EitherT(L.lookup(r, supersedingSchemaKey))
.map(supersedingSchema => SchemaItem(supersedingSchema, supersededBy.some))
}
} yield res).value
else
r => EitherT(L.lookup(r, schemaKey)).map(s => SchemaItem(s, Option.empty)).value
}

def handleAfterFetch(
result: Either[LookupFailureMap, SchemaItem]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE
a Resolver should return cached schema when ttl not exceeded $e13
a Resolver should return cached schema when ttl exceeded $e14
a Resolver should not spam the registry with similar requests $e15
a Resolver should return superseded schema $e16
a Resolver should return superseding schema if resolveSupersedingSchema is true $e16
a Resolver shouldn't return superseding schema if resolveSupersedingSchema is false $e17
"""

import ResolverSpec._
Expand Down Expand Up @@ -583,11 +584,42 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE

Resolver
.init[IO](cacheSize = 0, cacheTtl = None, refs = EmbeddedTest)
.flatMap(resolver => resolver.lookupSchemaResult(schemaKey))
.flatMap(resolver => resolver.lookupSchemaResult(schemaKey, resolveSupersedingSchema = true))
.map { result =>
result must beRight(
ResolverResult.NotCached(SchemaItem(expectedSchema, Some(SchemaVer.Full(1, 0, 2))))
)
}
}

def e17 = {

val expectedSchema: Json =
parse(
scala.io.Source
.fromInputStream(
getClass.getResourceAsStream(
"/iglu-test-embedded/schemas/com.snowplowanalytics.iglu-test/superseded-schema/jsonschema/1-0-0"
)
)
.mkString
)
.fold(e => throw new RuntimeException(s"Cannot parse superseded schema, $e"), identity)

val schemaKey = SchemaKey(
"com.snowplowanalytics.iglu-test",
"superseded-schema",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)

Resolver
.init[IO](cacheSize = 0, cacheTtl = None, refs = EmbeddedTest)
.flatMap(resolver => resolver.lookupSchemaResult(schemaKey))
.map { result =>
result must beRight(
ResolverResult.NotCached(SchemaItem(expectedSchema, None))
)
}
}
}

0 comments on commit c4d56a1

Please sign in to comment.