From 5777ec305b9947f9b5c00f2cf5823840f20e72f8 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Wed, 22 Feb 2023 07:11:21 +0000
Subject: [PATCH] Resolver caches suffer from races and http server overload during the cold start (close #227)

---
 .../client/Client.scala                       |   4 +-
 .../client/IgluCirceClient.scala              |   4 +-
 .../client/resolver/CreateResolverCache.scala |  69 +++++++++++
 .../client/resolver/Resolver.scala            | 108 +++++++++++++-----
 .../client/resolver/ResolverCache.scala       |  23 +++-
 .../client/resolver/ResolverMutex.scala       |  77 +++++++++++++
 .../client/resolver/package.scala             |   8 +-
 .../client/resolver/registries/Registry.scala |   2 -
 .../resolver/ResolverCacheSpec.scala          |   5 +-
 .../resolver/ResolverResultSpec.scala         |  33 +++---
 .../resolver/ResolverSpec.scala               |  23 ++--
 .../resolver/ResolverSpecHelpers.scala        |  24 ++--
 project/BuildSettings.scala                   |   6 +-
 13 files changed, 295 insertions(+), 91 deletions(-)
 create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/CreateResolverCache.scala
 create mode 100644 modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverMutex.scala

diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/Client.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/Client.scala
index 5ba0f831..6587e712 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/Client.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/Client.scala
@@ -17,7 +17,7 @@ import cats.data.EitherT
 import cats.effect.{Clock, IO}
 import io.circe.{DecodingFailure, Json}
 import com.snowplowanalytics.iglu.core.SelfDescribingData
-import resolver.{InitListCache, InitSchemaCache}
+import resolver.CreateResolverCache
 import resolver.registries.{Registry, RegistryLookup}
 
 /**
@@ -51,7 +51,7 @@ object Client {
   val IgluCentral: Client[IO, Json] =
     Client[IO, Json](Resolver(List(Registry.IgluCentral), None), CirceValidator)
 
-  def parseDefault[F[_]: Monad: InitSchemaCache: InitListCache](
+  def parseDefault[F[_]: Monad: CreateResolverCache](
     json: Json
   ): EitherT[F, DecodingFailure, Client[F, Json]] =
     EitherT(Resolver.parse[F](json)).map { resolver =>
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/IgluCirceClient.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/IgluCirceClient.scala
index 73ecd5bf..23c77717 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/IgluCirceClient.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/IgluCirceClient.scala
@@ -17,7 +17,7 @@ import cats.data.EitherT
 import cats.effect.Clock
 import cats.implicits._
 import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
-import com.snowplowanalytics.iglu.client.resolver.{InitListCache, InitSchemaCache}
+import com.snowplowanalytics.iglu.client.resolver.CreateResolverCache
 import com.snowplowanalytics.iglu.client.validator.CirceValidator.WithCaching.{
   InitValidatorCache,
   SchemaEvaluationCache,
@@ -54,7 +54,7 @@ object IgluCirceClient {
 
-  def parseDefault[F[_]: Monad: InitSchemaCache: InitListCache: InitValidatorCache](
+  def parseDefault[F[_]: Monad: CreateResolverCache: InitValidatorCache](
     json: Json
   ): EitherT[F, DecodingFailure, IgluCirceClient[F]] =
     for {
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/CreateResolverCache.scala 
b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/CreateResolverCache.scala new file mode 100644 index 00000000..caf7543b --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/CreateResolverCache.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.iglu.client.resolver + +import cats.Id +import cats.effect.Async +import cats.effect.std.Mutex +import cats.implicits._ +import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap} +import com.snowplowanalytics.iglu.core.SchemaKey + +trait CreateResolverCache[F[_]] { + + def createSchemaCache(size: Int): F[LruMap[F, SchemaKey, SchemaCacheEntry]] + + def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]] + + def createMutex[K]: F[ResolverMutex[F, K]] + +} + +object CreateResolverCache { + + def apply[F[_]](implicit instance: CreateResolverCache[F]): CreateResolverCache[F] = instance + + private trait SimpleCreateResolverCache[F[_]] extends CreateResolverCache[F] { + + def createLruMap[K, V](size: Int): F[LruMap[F, K, V]] + + override def createSchemaCache(size: Int): F[LruMap[F, SchemaKey, SchemaCacheEntry]] = + createLruMap(size) + + override def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]] = + createLruMap(size) + + } + + implicit def idCreateResolverCache: CreateResolverCache[Id] = + new SimpleCreateResolverCache[Id] { + def createLruMap[K, V](size: Int): LruMap[Id, K, V] = + CreateLruMap[Id, K, V].create(size) + + def createMutex[K]: ResolverMutex[Id, K] = + ResolverMutex.idResolverMutex(new Object, createLruMap[K, Object](1000)) + } + + implicit def asyncCreateResolverCache[F[_]: Async]: CreateResolverCache[F] = + new SimpleCreateResolverCache[F] { + + def createLruMap[K, V](size: Int): F[LruMap[F, K, V]] = + CreateLruMap[F, K, V].create(size) + + def createMutex[K]: F[ResolverMutex[F, K]] = + for { + mutex <- Mutex[F] + lrumap <- createLruMap[K, Mutex[F]](1000) + } yield ResolverMutex.asyncResolverMutex(mutex, lrumap) + } +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala index 6a65d677..adeb1512 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/Resolver.scala @@ -18,7 +18,6 @@ import cats.effect.Clock import cats.implicits._ import cats.{Applicative, Id, Monad} import com.snowplowanalytics.iglu.client.ClientError.ResolutionError -import com.snowplowanalytics.iglu.client.resolver.registries.Registry.Get import com.snowplowanalytics.iglu.client.resolver.ResolverCache.TimestampedItem import com.snowplowanalytics.iglu.client.resolver.registries.{ Registry, @@ 
-74,15 +73,39 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac .pure[F] } + def lockAndLookup: F[Either[ResolutionError, SchemaLookupResult]] = + withLockOnSchemaKey(schemaKey) { + getSchemaFromCache(schemaKey).flatMap { + case Some(TimestampedItem(Right(schema), timestamp)) => + Monad[F].pure(Right(ResolverResult.Cached(schemaKey, schema, timestamp))) + case Some(TimestampedItem(Left(failures), _)) => + for { + toBeRetried <- reposForRetry(failures) + result <- traverseRepos[F, Json]( + get, + prioritize(schemaKey.vendor, toBeRetried), + failures + ) + fixed <- handleAfterFetch(result) + } yield fixed + case None => + traverseRepos[F, Json](get, prioritize(schemaKey.vendor, allRepos.toList), Map.empty) + .flatMap(handleAfterFetch) + } + } + getSchemaFromCache(schemaKey).flatMap { case Some(TimestampedItem(Right(schema), timestamp)) => Monad[F].pure(Right(ResolverResult.Cached(schemaKey, schema, timestamp))) case Some(TimestampedItem(Left(failures), _)) => - retryCached[F, Json](get, schemaKey.vendor)(failures) - .flatMap(handleAfterFetch) + reposForRetry(failures).flatMap { + case Nil => + Monad[F].pure(Left(resolutionError(failures))) + case _ => + lockAndLookup + } case None => - traverseRepos[F, Json](get, prioritize(schemaKey.vendor, allRepos.toList), Map.empty) - .flatMap(handleAfterFetch) + lockAndLookup } } @@ -164,19 +187,44 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac .pure[F] } + def lockAndLookup: F[Either[ResolutionError, SchemaListLookupResult]] = + withLockOnSchemaModel(vendor, name, model) { + getSchemaListFromCache(vendor, name, model).flatMap { + case Some(TimestampedItem(Right(schemaList), timestamp)) => + if (mustIncludeKey.forall(schemaList.schemas.contains)) + Monad[F].pure( + Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp)) + ) + else + traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty) + .flatMap(handleAfterFetch) + case Some(TimestampedItem(Left(failures), _)) => + for { + toBeRetried <- reposForRetry(failures) + result <- traverseRepos[F, SchemaList](get, prioritize(vendor, toBeRetried), failures) + fixed <- handleAfterFetch(result) + } yield fixed + case None => + traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty) + .flatMap(handleAfterFetch) + } + } + getSchemaListFromCache(vendor, name, model).flatMap { case Some(TimestampedItem(Right(schemaList), timestamp)) => if (mustIncludeKey.forall(schemaList.schemas.contains)) Monad[F].pure(Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp))) else - traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty) - .flatMap(handleAfterFetch) + lockAndLookup case Some(TimestampedItem(Left(failures), _)) => - retryCached[F, SchemaList](get, vendor)(failures) - .flatMap(handleAfterFetch) + reposForRetry(failures).flatMap { + case Nil => + Monad[F].pure(Left(resolutionError(failures))) + case _ => + lockAndLookup + } case None => - traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty) - .flatMap(handleAfterFetch) + lockAndLookup } } @@ -233,6 +281,18 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac case None => Monad[F].pure(None) } + private def withLockOnSchemaKey[A](schemaKey: SchemaKey)(f: => F[A]): F[A] = + cache match { + case Some(c) => c.withLockOnSchemaKey(schemaKey)(f) + case None => f + } + + private def withLockOnSchemaModel[A](vendor: Vendor, name: 
Name, model: Model)(f: => F[A]): F[A] = + cache match { + case Some(c) => c.withLockOnSchemaModel(vendor, name, model)(f) + case None => f + } + private def getSchemaListFromCache( vendor: Vendor, name: Name, @@ -278,17 +338,12 @@ object Resolver { /** The result of a lookup when the resolver is not configured to use a cache */ case class NotCached[A](value: A) extends ResolverResult[Nothing, A] } - def retryCached[F[_]: Clock: Monad: RegistryLookup, A]( - get: Get[F, A], - vendor: Vendor - )( - cachedErrors: LookupFailureMap - ): F[Either[LookupFailureMap, A]] = - for { - now <- Clock[F].realTimeInstant - reposForRetry = getReposForRetry(cachedErrors, now) - result <- traverseRepos[F, A](get, prioritize(vendor, reposForRetry), cachedErrors) - } yield result + + private def reposForRetry[F[_]: Clock: Monad](cachedErrors: LookupFailureMap): F[List[Registry]] = + Clock[F].realTimeInstant + .map { now => + getReposForRetry(cachedErrors, now) + } /** * Tail-recursive function to find our schema in one of our repositories @@ -353,7 +408,7 @@ object Resolver { * @param refs Any RepositoryRef to add to this resolver * @return a configured Resolver instance */ - def init[F[_]: Monad: InitSchemaCache: InitListCache]( + def init[F[_]: Monad: CreateResolverCache]( cacheSize: Int, cacheTtl: Option[TTL], refs: Registry* @@ -370,7 +425,7 @@ object Resolver { private[client] val EmbeddedSchemaCount = 4 /** A Resolver which only looks at our embedded repo */ - def bootstrap[F[_]: Monad: InitSchemaCache: InitListCache]: F[Resolver[F]] = + def bootstrap[F[_]: Monad: CreateResolverCache]: F[Resolver[F]] = Resolver.init[F](EmbeddedSchemaCount, None, Registry.EmbeddedRegistry) final case class ResolverConfig( @@ -398,7 +453,7 @@ object Resolver { * for this resolver * @return a configured Resolver instance */ - def parse[F[_]: Monad: InitSchemaCache: InitListCache]( + def parse[F[_]: Monad: CreateResolverCache]( config: Json ): F[Either[DecodingFailure, Resolver[F]]] = { val result: EitherT[F, DecodingFailure, Resolver[F]] = for { @@ -419,7 +474,7 @@ object Resolver { } yield config } - def fromConfig[F[_]: Monad: InitSchemaCache: InitListCache]( + def fromConfig[F[_]: Monad: CreateResolverCache]( config: ResolverConfig ): EitherT[F, DecodingFailure, Resolver[F]] = { for { @@ -498,6 +553,7 @@ object Resolver { private val MinBackoff = 500 // ms // Count how many milliseconds the Resolver needs to wait before retrying + // TODO: This should not exceed TTL private def backoff(retryCount: Int): Long = retryCount match { case c if c > 20 => 1200000L + (retryCount * 100L) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverCache.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverCache.scala index faa4c0a9..ef8e555d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverCache.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverCache.scala @@ -24,7 +24,7 @@ import io.circe.Json import scala.concurrent.duration.{DurationInt, FiniteDuration} // LruMap -import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap} +import com.snowplowanalytics.lrumap.LruMap // Iglu core import com.snowplowanalytics.iglu.core.SchemaKey @@ -39,6 +39,8 @@ import com.snowplowanalytics.iglu.core.SchemaKey class ResolverCache[F[_]] private ( schemas: LruMap[F, SchemaKey, SchemaCacheEntry], schemaLists: LruMap[F, ListCacheKey, ListCacheEntry], + schemaMutex: ResolverMutex[F, SchemaKey], + 
schemaListMutex: ResolverMutex[F, ListCacheKey], val ttl: Option[TTL] ) { @@ -133,6 +135,14 @@ class ResolverCache[F[_]] private ( freshResult: ListLookup )(implicit F: Monad[F], C: Clock[F]): F[Either[LookupFailureMap, TimestampedItem[SchemaList]]] = putItemResult(schemaLists, (vendor, name, model), freshResult) + + private[resolver] def withLockOnSchemaKey[A](key: SchemaKey)(f: => F[A]): F[A] = + schemaMutex.withLockOn(key)(f) + + private[resolver] def withLockOnSchemaModel[A](vendor: Vendor, name: Name, model: Model)( + f: => F[A] + ): F[A] = + schemaListMutex.withLockOn((vendor, name, model))(f) } object ResolverCache { @@ -144,14 +154,15 @@ object ResolverCache { size: Int, ttl: Option[TTL] )(implicit - C: CreateLruMap[F, SchemaKey, SchemaCacheEntry], - L: CreateLruMap[F, ListCacheKey, ListCacheEntry] + C: CreateResolverCache[F] ): F[Option[ResolverCache[F]]] = { if (shouldCreateResolverCache(size, ttl)) { for { - schemas <- CreateLruMap[F, SchemaKey, SchemaCacheEntry].create(size) - schemaLists <- CreateLruMap[F, ListCacheKey, ListCacheEntry].create(size) - } yield new ResolverCache[F](schemas, schemaLists, ttl).some + schemas <- C.createSchemaCache(size) + schemaLists <- C.createSchemaListCache(size) + schemaMutex <- C.createMutex[SchemaKey] + listMutex <- C.createMutex[ListCacheKey] + } yield new ResolverCache[F](schemas, schemaLists, schemaMutex, listMutex, ttl).some } else Applicative[F].pure(none) } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverMutex.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverMutex.scala new file mode 100644 index 00000000..252add85 --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/ResolverMutex.scala @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.iglu.client.resolver + +import cats.Id +import cats.effect.std.Mutex +import cats.effect.Async +import cats.implicits._ +import com.snowplowanalytics.lrumap.LruMap + +trait ResolverMutex[F[_], K] { + + def withLockOn[A](key: K)(f: => F[A]): F[A] + +} + +object ResolverMutex { + + private[resolver] def idResolverMutex[K]( + topMutex: Object, + keyedMutex: LruMap[Id, K, Object] + ): ResolverMutex[Id, K] = + new ResolverMutex[Id, K] { + def withLockOn[A](key: K)(f: => A): A = { + val mutexForKey = topMutex.synchronized { + val current = keyedMutex.get(key) + current match { + case Some(o) => + o + case None => + val o = new Object + keyedMutex.put(key, o) + o + } + } + + mutexForKey.synchronized(f) + } + } + + private[resolver] def asyncResolverMutex[F[_]: Async, K]( + topMutex: Mutex[F], + keyedMutex: LruMap[F, K, Mutex[F]] + ): ResolverMutex[F, K] = + new ResolverMutex[F, K] { + + def withLockOn[A](key: K)(f: => F[A]): F[A] = + topMutex.lock + .surround { + for { + current <- keyedMutex.get(key) + fixed <- current match { + case Some(m) => Async[F].pure(m) + case None => + for { + m <- Mutex[F] + _ <- keyedMutex.put(key, m) + } yield m + } + } yield fixed + } + .flatMap { mutexForKey => + mutexForKey.lock.surround(f) + } + } + +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/package.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/package.scala index 83bca016..b169f1ce 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/package.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/package.scala @@ -17,11 +17,8 @@ import io.circe.Json import scala.concurrent.duration.FiniteDuration -// LRU -import com.snowplowanalytics.lrumap.CreateLruMap - // Iglu Core -import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList} +import com.snowplowanalytics.iglu.core.SchemaList // This project import resolver.registries.Registry @@ -82,7 +79,4 @@ package object resolver { /** Cache entry for schema list lookup results */ type ListCacheEntry = CacheEntry[ListLookup] - /** Ability to initialize the cache */ - type InitSchemaCache[F[_]] = CreateLruMap[F, SchemaKey, SchemaCacheEntry] - type InitListCache[F[_]] = CreateLruMap[F, ListCacheKey, ListCacheEntry] } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/registries/Registry.scala b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/registries/Registry.scala index 869de974..7d3dbd76 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/registries/Registry.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.iglu/client/resolver/registries/Registry.scala @@ -37,8 +37,6 @@ sealed trait Registry extends Product with Serializable { object Registry { import Utils._ - type Get[F[_], A] = Registry => F[Either[RegistryError, A]] - /** * An embedded repository is one which is embedded inside the calling code, * e.g. 
inside the jar's resources folder diff --git a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverCacheSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverCacheSpec.scala index da988503..3b7916b6 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverCacheSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverCacheSpec.scala @@ -116,7 +116,6 @@ class ResolverCacheSpec extends Specification { object ResolverCacheSpec { // No need to overwrite anything - implicit val a: InitSchemaCache[StaticLookup] = staticCache - implicit val c: InitListCache[StaticLookup] = staticCacheForList - implicit val b: Clock[StaticLookup] = staticClock + implicit val a: CreateResolverCache[StaticLookup] = staticResolverCache + implicit val b: Clock[StaticLookup] = staticClock } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverResultSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverResultSpec.scala index 82338fd8..af5a7be2 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverResultSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverResultSpec.scala @@ -195,15 +195,14 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE RegistryError.RepoFailure("shouldn't matter").asLeft[Json] val correctResult = Json.Null.asRight[RegistryError] - val time = Instant.ofEpochMilli(2L) + val time = Instant.ofEpochMilli(3L) val responses = List(timeoutError, correctResult) val httpRep = Registry.Http(Registry.Config("Mock Repo", 1, List("com.snowplowanalytics.iglu-test")), null) - implicit val cache = ResolverSpecHelpers.staticCache - implicit val cacheList = ResolverSpecHelpers.staticCacheForList - implicit val clock = ResolverSpecHelpers.staticClock + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock implicit val registryLookup: RegistryLookup[StaticLookup] = ResolverSpecHelpers.getLookup(responses, Nil) @@ -258,9 +257,8 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE val httpRep = Registry.Http(Registry.Config("Mock Repo", 1, List("com.snowplowanalytics.iglu-test")), null) - implicit val cache = ResolverSpecHelpers.staticCache - implicit val cacheList = ResolverSpecHelpers.staticCacheForList - implicit val clock = ResolverSpecHelpers.staticClock + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock implicit val registryLookup: RegistryLookup[StaticLookup] = ResolverSpecHelpers.getLookup(responses, Nil) @@ -318,9 +316,8 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE null ) - implicit val cache = ResolverSpecHelpers.staticCache - implicit val cacheList = ResolverSpecHelpers.staticCacheForList - implicit val clock = ResolverSpecHelpers.staticClock + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock implicit val registryLookup: RegistryLookup[StaticLookup] = ResolverSpecHelpers.getLookupByRepo( Map( "Mock Repo 1" -> List(error1.asLeft, error2.asLeft), @@ -331,12 +328,12 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE val expected = ResolutionError( SortedMap( - "Mock Repo 1" -> 
LookupHistory(Set(error1, error2), 2, Instant.ofEpochMilli(2008L)), - "Mock Repo 2" -> LookupHistory(Set(error3, error4), 2, Instant.ofEpochMilli(2009L)), + "Mock Repo 1" -> LookupHistory(Set(error1, error2), 2, Instant.ofEpochMilli(2010L)), + "Mock Repo 2" -> LookupHistory(Set(error3, error4), 2, Instant.ofEpochMilli(2011L)), "Iglu Client Embedded" -> LookupHistory( Set(RegistryError.NotFound), 1, - Instant.ofEpochMilli(4L) + Instant.ofEpochMilli(5L) ) ) ) @@ -452,9 +449,8 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE val httpRep = Registry.Http(Registry.Config("Mock Repo", 1, List("com.snowplowanalytics.iglu-test")), null) - implicit val cache = ResolverSpecHelpers.staticCache - implicit val cacheList = ResolverSpecHelpers.staticCacheForList - implicit val clock = ResolverSpecHelpers.staticClock + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock implicit val registryLookup: RegistryLookup[StaticLookup] = ResolverSpecHelpers.getLookup(responses, Nil) @@ -491,9 +487,8 @@ class ResolverResultSpec extends Specification with ValidatedMatchers with CatsE val httpRep = Registry.Http(Registry.Config("Mock Repo", 1, List("com.snowplowanalytics.iglu-test")), null) - implicit val cache = ResolverSpecHelpers.staticCache - implicit val cacheList = ResolverSpecHelpers.staticCacheForList - implicit val clock = ResolverSpecHelpers.staticClock + implicit val cache = ResolverSpecHelpers.staticResolverCache + implicit val clock = ResolverSpecHelpers.staticClock implicit val registryLookup: RegistryLookup[StaticLookup] = ResolverSpecHelpers.getLookup(responses, Nil) diff --git a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpec.scala index 54ba8772..6272f031 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpec.scala @@ -204,15 +204,14 @@ class ResolverSpec extends Specification with CatsEffect { RegistryError.RepoFailure("shouldn't matter").asLeft[Json] val correctSchema = Json.Null.asRight[RegistryError] - val time = Instant.ofEpochMilli(2L) + val time = Instant.ofEpochMilli(3L) val responses = List(timeoutError, correctSchema) val httpRep = Registry.Http(Registry.Config("Mock Repo", 1, List("com.snowplowanalytics.iglu-test")), null) - implicit val cache: InitSchemaCache[StaticLookup] = ResolverSpecHelpers.staticCache - implicit val cacheList: InitListCache[StaticLookup] = ResolverSpecHelpers.staticCacheForList - implicit val clock: Clock[StaticLookup] = ResolverSpecHelpers.staticClock + implicit val cache: CreateResolverCache[StaticLookup] = ResolverSpecHelpers.staticResolverCache + implicit val clock: Clock[StaticLookup] = ResolverSpecHelpers.staticClock implicit val registryLookup: RegistryLookup[StaticLookup] = ResolverSpecHelpers.getLookup(responses, Nil) @@ -264,9 +263,8 @@ class ResolverSpec extends Specification with CatsEffect { val httpRep = Registry.Http(Registry.Config("Mock Repo", 1, List("com.snowplowanalytics.iglu-test")), null) - implicit val cache: InitSchemaCache[StaticLookup] = ResolverSpecHelpers.staticCache - implicit val cacheList: InitListCache[StaticLookup] = ResolverSpecHelpers.staticCacheForList - implicit val clock: Clock[StaticLookup] = ResolverSpecHelpers.staticClock + implicit val cache: 
CreateResolverCache[StaticLookup] = ResolverSpecHelpers.staticResolverCache + implicit val clock: Clock[StaticLookup] = ResolverSpecHelpers.staticClock implicit val registryLookup: RegistryLookup[StaticLookup] = ResolverSpecHelpers.getLookup(responses, Nil) @@ -317,9 +315,8 @@ class ResolverSpec extends Specification with CatsEffect { null ) - implicit val cache: InitSchemaCache[StaticLookup] = ResolverSpecHelpers.staticCache - implicit val cacheList: InitListCache[StaticLookup] = ResolverSpecHelpers.staticCacheForList - implicit val clock: Clock[StaticLookup] = ResolverSpecHelpers.staticClock + implicit val cache: CreateResolverCache[StaticLookup] = ResolverSpecHelpers.staticResolverCache + implicit val clock: Clock[StaticLookup] = ResolverSpecHelpers.staticClock implicit val registryLookup: RegistryLookup[StaticLookup] = ResolverSpecHelpers.getLookupByRepo( Map( "Mock Repo 1" -> List(error1.asLeft, error2.asLeft), @@ -330,12 +327,12 @@ class ResolverSpec extends Specification with CatsEffect { val expected = ResolutionError( SortedMap( - "Mock Repo 1" -> LookupHistory(Set(error1, error2), 2, Instant.ofEpochMilli(2008L)), - "Mock Repo 2" -> LookupHistory(Set(error3, error4), 2, Instant.ofEpochMilli(2009L)), + "Mock Repo 1" -> LookupHistory(Set(error1, error2), 2, Instant.ofEpochMilli(2010L)), + "Mock Repo 2" -> LookupHistory(Set(error3, error4), 2, Instant.ofEpochMilli(2011L)), "Iglu Client Embedded" -> LookupHistory( Set(RegistryError.NotFound), 1, - Instant.ofEpochMilli(4L) + Instant.ofEpochMilli(5L) ) ) ) diff --git a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpecHelpers.scala b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpecHelpers.scala index 4cbfb9ad..202feac7 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpecHelpers.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.iglu.client/resolver/ResolverSpecHelpers.scala @@ -27,7 +27,7 @@ import io.circe.Json // LRU Map import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList} -import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap} +import com.snowplowanalytics.lrumap.LruMap // This project import com.snowplowanalytics.iglu.client.resolver.registries.{ @@ -89,15 +89,14 @@ object ResolverSpecHelpers { State(s => (s.copy(time = s.time + delta), ())) } - val staticCache: InitSchemaCache[StaticLookup] = - new CreateLruMap[StaticLookup, SchemaKey, SchemaCacheEntry] { - def create(size: Int): StaticLookup[LruMap[StaticLookup, SchemaKey, SchemaCacheEntry]] = + val staticResolverCache: CreateResolverCache[StaticLookup] = + new CreateResolverCache[StaticLookup] { + def createSchemaCache( + size: Int + ): StaticLookup[LruMap[StaticLookup, SchemaKey, SchemaCacheEntry]] = State(s => (s.copy(cacheSize = size), StateCache)) - } - val staticCacheForList: InitListCache[StaticLookup] = - new CreateLruMap[StaticLookup, ListCacheKey, ListCacheEntry] { - def create( + def createSchemaListCache( size: Int ): StaticLookup[LruMap[StaticLookup, ListCacheKey, ListCacheEntry]] = State { s => @@ -105,6 +104,9 @@ object ResolverSpecHelpers { val state = s.copy(cacheSize = size) (state, cache) } + + def createMutex[K]: StaticLookup[ResolverMutex[StaticLookup, K]] = + State.pure(stateMutex[K]) } val staticClock: Clock[StaticLookup] = @@ -215,4 +217,10 @@ object ResolverSpecHelpers { (state.copy(schemaLists = value :: state.schemaLists).tick, ()) } } + + private def 
stateMutex[K]: ResolverMutex[StaticLookup, K] = + new ResolverMutex[StaticLookup, K] { + def withLockOn[A](key: K)(f: => StaticLookup[A]): StaticLookup[A] = + f + } } diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 45a4c18f..fdbfc033 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -68,9 +68,9 @@ object BuildSettings { // clear-out mimaBinaryIssueFilters and mimaPreviousVersions. // Otherwise, add previous version to set without // removing other versions. - val mimaPreviousVersionsData = Set("2.1.0", "2.2.0") - val mimaPreviousVersionsCore = Set("2.1.0", "2.2.0") - val mimaPreviousVersionsHttp4s = Set("2.0.0", "2.1.0", "2.2.0") + val mimaPreviousVersionsData = Set() + val mimaPreviousVersionsCore = Set() + val mimaPreviousVersionsHttp4s = Set() val mimaPreviousVersionsScala3 = Set() lazy val mimaSettings = Seq(