test for the mutex version of resolver #229

Closed · wants to merge 4 commits
Client.scala
@@ -17,7 +17,7 @@ import cats.data.EitherT
import cats.effect.{Clock, IO}
import io.circe.{DecodingFailure, Json}
import com.snowplowanalytics.iglu.core.SelfDescribingData
-import resolver.{InitListCache, InitSchemaCache}
+import resolver.CreateResolverCache
import resolver.registries.{Registry, RegistryLookup}

/**
@@ -51,7 +51,7 @@ object Client {
val IgluCentral: Client[IO, Json] =
Client[IO, Json](Resolver(List(Registry.IgluCentral), None), CirceValidator)

-def parseDefault[F[_]: Monad: InitSchemaCache: InitListCache](
+def parseDefault[F[_]: Monad: CreateResolverCache](
json: Json
): EitherT[F, DecodingFailure, Client[F, Json]] =
EitherT(Resolver.parse[F](json)).map { resolver =>
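Note: call sites are unaffected by the constraint change, since IO satisfies the new bound through asyncCreateResolverCache (see CreateResolverCache.scala below). A minimal sketch of using the updated parseDefault; the makeClient wrapper is illustrative, not part of this PR:

import cats.effect.IO
import io.circe.Json

// parseDefault now needs only Monad + CreateResolverCache; for IO the
// asyncCreateResolverCache instance added in this PR is found implicitly.
def makeClient(configJson: Json): IO[Client[IO, Json]] =
  Client.parseDefault[IO](configJson).value.flatMap {
    case Right(client) => IO.pure(client)
    case Left(err)     => IO.raiseError(err)
  }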
IgluCirceClient.scala
@@ -17,7 +17,7 @@ import cats.data.EitherT
import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
-import com.snowplowanalytics.iglu.client.resolver.{InitListCache, InitSchemaCache}
+import com.snowplowanalytics.iglu.client.resolver.CreateResolverCache
import com.snowplowanalytics.iglu.client.validator.CirceValidator.WithCaching.{
InitValidatorCache,
SchemaEvaluationCache,
@@ -54,7 +54,7 @@ final class IgluCirceClient[F[_]] private (

object IgluCirceClient {

-def parseDefault[F[_]: Monad: InitSchemaCache: InitListCache: InitValidatorCache](
+def parseDefault[F[_]: Monad: CreateResolverCache: InitValidatorCache](
json: Json
): EitherT[F, DecodingFailure, IgluCirceClient[F]] =
for {
CreateResolverCache.scala (new file)
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.iglu.client.resolver

import cats.Id
import cats.effect.Async
import cats.effect.std.Mutex
import cats.implicits._
import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap}
import com.snowplowanalytics.iglu.core.SchemaKey

trait CreateResolverCache[F[_]] {

def createSchemaCache(size: Int): F[LruMap[F, SchemaKey, SchemaCacheEntry]]

def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]]

def createMutex[K]: F[ResolverMutex[F, K]]

}

object CreateResolverCache {

def apply[F[_]](implicit instance: CreateResolverCache[F]): CreateResolverCache[F] = instance

private trait SimpleCreateResolverCache[F[_]] extends CreateResolverCache[F] {

def createLruMap[K, V](size: Int): F[LruMap[F, K, V]]

override def createSchemaCache(size: Int): F[LruMap[F, SchemaKey, SchemaCacheEntry]] =
createLruMap(size)

override def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]] =
createLruMap(size)

}

implicit def idCreateResolverCache: CreateResolverCache[Id] =
new SimpleCreateResolverCache[Id] {
def createLruMap[K, V](size: Int): LruMap[Id, K, V] =
CreateLruMap[Id, K, V].create(size)

def createMutex[K]: ResolverMutex[Id, K] =
ResolverMutex.idResolverMutex(new Object, createLruMap[K, Object](1000))
}

implicit def asyncCreateResolverCache[F[_]: Async]: CreateResolverCache[F] =
new SimpleCreateResolverCache[F] {

def createLruMap[K, V](size: Int): F[LruMap[F, K, V]] =
CreateLruMap[F, K, V].create(size)

def createMutex[K]: F[ResolverMutex[F, K]] =
for {
mutex <- Mutex[F]
lrumap <- createLruMap[K, Mutex[F]](1000)
} yield ResolverMutex.asyncResolverMutex(mutex, lrumap)
}
}
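ResolverMutex itself is not part of this diff. From its call sites (idResolverMutex over a plain lock object plus an LruMap of per-key locks, asyncResolverMutex over a cats-effect Mutex plus an LruMap of per-key mutexes, and withLockOn in ResolverCache.scala below), its interface is presumably a single key-scoped locking operation. A sketch of the assumed shape, not the PR's actual file:

// Assumed interface, inferred from how it is used in this PR: run `f`
// while holding a lock scoped to `key`, so concurrent lookups of the same
// key are serialized while lookups of different keys stay independent.
trait ResolverMutex[F[_], K] {
  def withLockOn[A](key: K)(f: => F[A]): F[A]
}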
Resolver.scala
@@ -18,7 +18,6 @@ import cats.effect.Clock
import cats.implicits._
import cats.{Applicative, Id, Monad}
import com.snowplowanalytics.iglu.client.ClientError.ResolutionError
-import com.snowplowanalytics.iglu.client.resolver.registries.Registry.Get
import com.snowplowanalytics.iglu.client.resolver.ResolverCache.TimestampedItem
import com.snowplowanalytics.iglu.client.resolver.registries.{
Registry,
@@ -74,15 +73,39 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCache[F]])
.pure[F]
}

+def lockAndLookup: F[Either[ResolutionError, SchemaLookupResult]] =
+withLockOnSchemaKey(schemaKey) {
+getSchemaFromCache(schemaKey).flatMap {
+case Some(TimestampedItem(Right(schema), timestamp)) =>
+Monad[F].pure(Right(ResolverResult.Cached(schemaKey, schema, timestamp)))
+case Some(TimestampedItem(Left(failures), _)) =>
+for {
+toBeRetried <- reposForRetry(failures)
+result <- traverseRepos[F, Json](
+get,
+prioritize(schemaKey.vendor, toBeRetried),
+failures
+)
+fixed <- handleAfterFetch(result)
+} yield fixed
+case None =>
+traverseRepos[F, Json](get, prioritize(schemaKey.vendor, allRepos.toList), Map.empty)
+.flatMap(handleAfterFetch)
+}
+}

getSchemaFromCache(schemaKey).flatMap {
case Some(TimestampedItem(Right(schema), timestamp)) =>
Monad[F].pure(Right(ResolverResult.Cached(schemaKey, schema, timestamp)))
case Some(TimestampedItem(Left(failures), _)) =>
-retryCached[F, Json](get, schemaKey.vendor)(failures)
-.flatMap(handleAfterFetch)
+reposForRetry(failures).flatMap {
+case Nil =>
+Monad[F].pure(Left(resolutionError(failures)))
+case _ =>
+lockAndLookup
+}
case None =>
-traverseRepos[F, Json](get, prioritize(schemaKey.vendor, allRepos.toList), Map.empty)
-.flatMap(handleAfterFetch)
+lockAndLookup
}
}
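The lookup above is double-checked locking: the cache is read once on the unlocked fast path, and only a miss (or a cached failure that is due for retry) takes the per-key lock, where the cache is read a second time before fetching, because another fiber may have completed the same lookup while this one was waiting on the mutex. A generic sketch of the pattern with cats-effect, using none of the Iglu types (all names are illustrative; the PR additionally scopes the lock per key via ResolverMutex, while this sketch uses a single Mutex for brevity):

import cats.effect.{IO, Ref}
import cats.effect.std.Mutex

def cachedLookup[K, V](cache: Ref[IO, Map[K, V]], mutex: Mutex[IO])(key: K)(
  fetch: K => IO[V]
): IO[V] =
  cache.get.map(_.get(key)).flatMap {
    case Some(v) => IO.pure(v) // fast path: hit, no lock taken
    case None =>
      mutex.lock.surround {
        cache.get.map(_.get(key)).flatMap {
          case Some(v) => IO.pure(v) // another fiber filled it while we waited
          case None    => fetch(key).flatTap(v => cache.update(_ + (key -> v)))
        }
      }
  }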

@@ -164,19 +187,44 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCache[F]])
.pure[F]
}

+def lockAndLookup: F[Either[ResolutionError, SchemaListLookupResult]] =
+withLockOnSchemaModel(vendor, name, model) {
+getSchemaListFromCache(vendor, name, model).flatMap {
+case Some(TimestampedItem(Right(schemaList), timestamp)) =>
+if (mustIncludeKey.forall(schemaList.schemas.contains))
+Monad[F].pure(
+Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp))
+)
+else
+traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty)
+.flatMap(handleAfterFetch)
+case Some(TimestampedItem(Left(failures), _)) =>
+for {
+toBeRetried <- reposForRetry(failures)
+result <- traverseRepos[F, SchemaList](get, prioritize(vendor, toBeRetried), failures)
+fixed <- handleAfterFetch(result)
+} yield fixed
+case None =>
+traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty)
+.flatMap(handleAfterFetch)
+}
+}

getSchemaListFromCache(vendor, name, model).flatMap {
case Some(TimestampedItem(Right(schemaList), timestamp)) =>
if (mustIncludeKey.forall(schemaList.schemas.contains))
Monad[F].pure(Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp)))
else
-traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty)
-.flatMap(handleAfterFetch)
+lockAndLookup
case Some(TimestampedItem(Left(failures), _)) =>
-retryCached[F, SchemaList](get, vendor)(failures)
-.flatMap(handleAfterFetch)
+reposForRetry(failures).flatMap {
+case Nil =>
+Monad[F].pure(Left(resolutionError(failures)))
+case _ =>
+lockAndLookup
+}
case None =>
-traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty)
-.flatMap(handleAfterFetch)
+lockAndLookup
}
}

@@ -233,6 +281,18 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCache[F]])
case None => Monad[F].pure(None)
}

+private def withLockOnSchemaKey[A](schemaKey: SchemaKey)(f: => F[A]): F[A] =
+cache match {
+case Some(c) => c.withLockOnSchemaKey(schemaKey)(f)
+case None => f
+}
+
+private def withLockOnSchemaModel[A](vendor: Vendor, name: Name, model: Model)(f: => F[A]): F[A] =
+cache match {
+case Some(c) => c.withLockOnSchemaModel(vendor, name, model)(f)
+case None => f
+}

private def getSchemaListFromCache(
vendor: Vendor,
name: Name,
@@ -278,17 +338,12 @@ object Resolver {
/** The result of a lookup when the resolver is not configured to use a cache */
case class NotCached[A](value: A) extends ResolverResult[Nothing, A]
}
-def retryCached[F[_]: Clock: Monad: RegistryLookup, A](
-get: Get[F, A],
-vendor: Vendor
-)(
-cachedErrors: LookupFailureMap
-): F[Either[LookupFailureMap, A]] =
-for {
-now <- Clock[F].realTimeInstant
-reposForRetry = getReposForRetry(cachedErrors, now)
-result <- traverseRepos[F, A](get, prioritize(vendor, reposForRetry), cachedErrors)
-} yield result

+private def reposForRetry[F[_]: Clock: Monad](cachedErrors: LookupFailureMap): F[List[Registry]] =
+Clock[F].realTimeInstant
+.map { now =>
+getReposForRetry(cachedErrors, now)
+}

/**
* Tail-recursive function to find our schema in one of our repositories
@@ -353,7 +408,7 @@ object Resolver {
* @param refs Any RepositoryRef to add to this resolver
* @return a configured Resolver instance
*/
-def init[F[_]: Monad: InitSchemaCache: InitListCache](
+def init[F[_]: Monad: CreateResolverCache](
cacheSize: Int,
cacheTtl: Option[TTL],
refs: Registry*
@@ -370,7 +425,7 @@
private[client] val EmbeddedSchemaCount = 4

/** A Resolver which only looks at our embedded repo */
-def bootstrap[F[_]: Monad: InitSchemaCache: InitListCache]: F[Resolver[F]] =
+def bootstrap[F[_]: Monad: CreateResolverCache]: F[Resolver[F]] =
Resolver.init[F](EmbeddedSchemaCount, None, Registry.EmbeddedRegistry)
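With the two cache constraints collapsed into one, wiring a resolver up in IO needs nothing beyond the implicit instances in CreateResolverCache.scala. A minimal sketch, assuming TTL is a FiniteDuration (as the DurationInt import in ResolverCache.scala below suggests):

import cats.effect.IO
import scala.concurrent.duration.DurationInt

// 500-entry caches with a 10-minute TTL; asyncCreateResolverCache[IO]
// satisfies the CreateResolverCache bound.
val resolver: IO[Resolver[IO]] =
  Resolver.init[IO](500, Some(10.minutes), Registry.IgluCentral)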

final case class ResolverConfig(
@@ -398,7 +453,7 @@ object Resolver {
* for this resolver
* @return a configured Resolver instance
*/
-def parse[F[_]: Monad: InitSchemaCache: InitListCache](
+def parse[F[_]: Monad: CreateResolverCache](
config: Json
): F[Either[DecodingFailure, Resolver[F]]] = {
val result: EitherT[F, DecodingFailure, Resolver[F]] = for {
@@ -419,7 +474,7 @@
} yield config
}

-def fromConfig[F[_]: Monad: InitSchemaCache: InitListCache](
+def fromConfig[F[_]: Monad: CreateResolverCache](
config: ResolverConfig
): EitherT[F, DecodingFailure, Resolver[F]] = {
for {
@@ -498,6 +553,7 @@ object Resolver {
private val MinBackoff = 500 // ms

// Count how many milliseconds the Resolver needs to wait before retrying
+// TODO: This should not exceed TTL
private def backoff(retryCount: Int): Long =
retryCount match {
case c if c > 20 => 1200000L + (retryCount * 100L)
ResolverCache.scala
@@ -24,7 +24,7 @@ import io.circe.Json
import scala.concurrent.duration.{DurationInt, FiniteDuration}

// LruMap
-import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap}
+import com.snowplowanalytics.lrumap.LruMap

// Iglu core
import com.snowplowanalytics.iglu.core.SchemaKey
@@ -39,6 +39,8 @@ import com.snowplowanalytics.iglu.core.SchemaKey
class ResolverCache[F[_]] private (
schemas: LruMap[F, SchemaKey, SchemaCacheEntry],
schemaLists: LruMap[F, ListCacheKey, ListCacheEntry],
+schemaMutex: ResolverMutex[F, SchemaKey],
+schemaListMutex: ResolverMutex[F, ListCacheKey],
val ttl: Option[TTL]
) {

@@ -133,6 +135,14 @@ class ResolverCache[F[_]] private (
freshResult: ListLookup
)(implicit F: Monad[F], C: Clock[F]): F[Either[LookupFailureMap, TimestampedItem[SchemaList]]] =
putItemResult(schemaLists, (vendor, name, model), freshResult)

+private[resolver] def withLockOnSchemaKey[A](key: SchemaKey)(f: => F[A]): F[A] =
+schemaMutex.withLockOn(key)(f)
+
+private[resolver] def withLockOnSchemaModel[A](vendor: Vendor, name: Name, model: Model)(
+f: => F[A]
+): F[A] =
+schemaListMutex.withLockOn((vendor, name, model))(f)
}

object ResolverCache {
@@ -144,14 +154,15 @@
size: Int,
ttl: Option[TTL]
)(implicit
-C: CreateLruMap[F, SchemaKey, SchemaCacheEntry],
-L: CreateLruMap[F, ListCacheKey, ListCacheEntry]
+C: CreateResolverCache[F]
): F[Option[ResolverCache[F]]] = {
if (shouldCreateResolverCache(size, ttl)) {
for {
-schemas <- CreateLruMap[F, SchemaKey, SchemaCacheEntry].create(size)
-schemaLists <- CreateLruMap[F, ListCacheKey, ListCacheEntry].create(size)
-} yield new ResolverCache[F](schemas, schemaLists, ttl).some
+schemas <- C.createSchemaCache(size)
+schemaLists <- C.createSchemaListCache(size)
+schemaMutex <- C.createMutex[SchemaKey]
+listMutex <- C.createMutex[ListCacheKey]
+} yield new ResolverCache[F](schemas, schemaLists, schemaMutex, listMutex, ttl).some
} else
Applicative[F].pure(none)
}
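The factory now allocates four resources instead of two: both LruMaps plus one key-scoped mutex per cache, while the caller-facing shape is unchanged; a sketch, assuming IO:

import cats.effect.IO
import scala.concurrent.duration.DurationInt

// None when shouldCreateResolverCache rejects the size/ttl combination;
// otherwise the two caches and the two mutexes are created.
val cache: IO[Option[ResolverCache[IO]]] =
  ResolverCache.init[IO](500, Some(10.minutes))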