From 34a20fe9293640e1e5acb16e2918a6abd0baacce Mon Sep 17 00:00:00 2001 From: "pavel.voropaev" Date: Fri, 18 Nov 2022 12:31:24 +0000 Subject: [PATCH] Add invalidation for the schema-list cache (close #215) --- .../client/resolver/Resolver.scala | 48 ++++++++++++++++- .../resolver/ResolverSpec.scala | 52 ++++++++++++++++++- 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala index 2295bc50..604a78b3 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala @@ -108,10 +108,36 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac * Get list of available schemas for particular vendor and name part * Server supposed to return them in proper order */ + def listSchemasResult(vendor: Vendor, name: Name, model: Model)(implicit + F: Monad[F], + L: RegistryLookup[F], + C: Clock[F] + ): F[Either[ResolutionError, SchemaListLookupResult]] = + listSchemasResult(vendor, name, model, None) + + /** + * Vendor, name, model are extracted from supplied schema key to call on the `listSchemas`. The important difference + * from `listSchemas` is that it would invalidate cache, if returned list did not contain SchemaKey supplied in + * argument. This makes it a safer option when the latest schema bound is known. 
+ */ + def listSchemasLikeResult(schemaKey: SchemaKey)(implicit + F: Monad[F], + L: RegistryLookup[F], + C: Clock[F] + ): F[Either[ResolutionError, SchemaListLookupResult]] = + listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model, Some(schemaKey)) + + /** + * Get list of available schemas for particular vendor and name part + * Has an extra argument `mustIncludeKey` which is used to invalidate cache if SchemaKey supplied in it is not in the + * list. + * Server supposed to return them in proper order + */ def listSchemasResult( vendor: Vendor, name: Name, - model: Model + model: Model, + mustIncludeKey: Option[SchemaKey] = None )(implicit F: Monad[F], L: RegistryLookup[F], @@ -140,7 +166,11 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac getSchemaListFromCache(vendor, name, model).flatMap { case Some(TimestampedItem(Right(schemaList), timestamp)) => - Monad[F].pure(Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp))) + if (mustIncludeKey.forall(schemaList.schemas.contains)) + Monad[F].pure(Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp))) + else + traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty) + .flatMap(handleAfterFetch) case Some(TimestampedItem(Left(failures), _)) => retryCached[F, SchemaList](get, vendor)(failures) .flatMap(handleAfterFetch) @@ -165,6 +195,19 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac ): F[Either[ResolutionError, SchemaList]] = listSchemasResult(vendor, name, model).map(_.map(_.value)) + /** + * Vendor, name, model are extracted from supplied schema key to call on the `listSchemas`. The important difference + * from `listSchemas` is that it would invalidate cache, if returned list did not contain SchemaKey supplied in + * argument. This makes it a safer option when the latest schema bound is known. 
+ */ + def listSchemasLike(schemaKey: SchemaKey)(implicit + F: Monad[F], + L: RegistryLookup[F], + C: Clock[F] + ): F[Either[ResolutionError, SchemaList]] = + listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model, Some(schemaKey)) + .map(_.map(_.value)) + /** Get list of full self-describing schemas available on Iglu Server for particular vendor/name pair */ def fetchSchemas( vendor: Vendor, @@ -365,6 +408,7 @@ object Resolver { result.value } + def parseConfig( config: Json ): Either[DecodingFailure, ResolverConfig] = { diff --git a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpec.scala index 7143e05f..f1cbfff3 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpec.scala @@ -12,15 +12,17 @@ */ package com.snowplowanalytics.iglu.client.resolver -import java.net.URI +import com.snowplowanalytics.iglu.core.SchemaList + import java.time.Instant +import java.net.URI import scala.collection.immutable.SortedMap import scala.concurrent.duration.DurationInt // Cats import cats.Id import cats.effect.IO -import cats.implicits._ +import cats.syntax.all._ // circe import io.circe.Json @@ -69,6 +71,7 @@ class ResolverSpec extends Specification with CatsEffect { a Resolver should accumulate errors from all repositories $e8 we can construct a Resolver from a valid resolver 1-0-2 configuration JSON $e10 a Resolver should cache SchemaLists with different models separately $e11 + a Resolver should use schemaKey provided in SchemaListLike for result validation $e12 """ import ResolverSpec._ @@ -404,4 +407,49 @@ class ResolverSpec extends Specification with CatsEffect { case _ => ko("Unexpected result for two consequent listSchemas") } } + + def e12 = { + val IgluCentralServer = Registry.Http( + 
Registry.Config("Iglu Central EU1", 0, List("com.snowplowanalytics")), + Registry + .HttpConnection(URI.create("https://com-iglucentral-eu1-prod.iglu.snplow.net/api"), None) + ) + + val schema100 = SchemaKey( + "com.snowplowanalytics.snowplow", + "link_click", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val schema101 = SchemaKey( + "com.snowplowanalytics.snowplow", + "link_click", + "jsonschema", + SchemaVer.Full(1, 0, 1) + ) + + val resolverRef = Resolver.init[Id](10, None, IgluCentralServer) + val resolver = resolverRef.map(res => + new Resolver( + res.repos, + res.cache.flatMap { c => + c.putSchemaList( + "com.snowplowanalytics.snowplow", + "link_click", + 1, + SchemaList(List(schema100)).asRight + ) + c.some + } + ) + ) + + val resultOne = resolver.listSchemasLike(schema100) + val resultTwo = resolver.listSchemasLike(schema101) + + resultOne must beRight(SchemaList(List(schema100))) + resultTwo.map(s => s.copy(schemas = s.schemas.take(2))) must beRight( + SchemaList(List(schema100, schema101)) + ) + } }