Commit 5777ec3

Resolver caches suffer from races and HTTP server overload during the cold start (close #227)

istreeter committed Feb 22, 2023 · 1 parent c4864bb
Showing 13 changed files with 295 additions and 91 deletions.
Client.scala
@@ -17,7 +17,7 @@ import cats.data.EitherT
 import cats.effect.{Clock, IO}
 import io.circe.{DecodingFailure, Json}
 import com.snowplowanalytics.iglu.core.SelfDescribingData
-import resolver.{InitListCache, InitSchemaCache}
+import resolver.CreateResolverCache
 import resolver.registries.{Registry, RegistryLookup}

 /**
@@ -51,7 +51,7 @@ object Client {
   val IgluCentral: Client[IO, Json] =
     Client[IO, Json](Resolver(List(Registry.IgluCentral), None), CirceValidator)

-  def parseDefault[F[_]: Monad: InitSchemaCache: InitListCache](
+  def parseDefault[F[_]: Monad: CreateResolverCache](
     json: Json
   ): EitherT[F, DecodingFailure, Client[F, Json]] =
     EitherT(Resolver.parse[F](json)).map { resolver =>
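With this change, callers of parseDefault need a single CreateResolverCache capability instead of the two separate cache-init capabilities. A minimal usage sketch for F = IO (the resolver-config JSON is a made-up example; the CreateResolverCache[IO] instance comes from asyncCreateResolverCache, added later in this commit):

  import cats.effect.IO
  import io.circe.{DecodingFailure, Json}
  import io.circe.parser.parse

  val resolverConfig: Json = parse(
    """{
      "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-3",
      "data": { "cacheSize": 500, "repositories": [] }
    }"""
  ).fold(throw _, identity)

  // Monad[IO] and CreateResolverCache[IO] are resolved implicitly
  val client: IO[Either[DecodingFailure, Client[IO, Json]]] =
    Client.parseDefault[IO](resolverConfig).value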
IgluCirceClient.scala
@@ -17,7 +17,7 @@ import cats.data.EitherT
 import cats.effect.Clock
 import cats.implicits._
 import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
-import com.snowplowanalytics.iglu.client.resolver.{InitListCache, InitSchemaCache}
+import com.snowplowanalytics.iglu.client.resolver.CreateResolverCache
 import com.snowplowanalytics.iglu.client.validator.CirceValidator.WithCaching.{
   InitValidatorCache,
   SchemaEvaluationCache,
@@ -54,7 +54,7 @@ final class IgluCirceClient[F[_]] private (

 object IgluCirceClient {

-  def parseDefault[F[_]: Monad: InitSchemaCache: InitListCache: InitValidatorCache](
+  def parseDefault[F[_]: Monad: CreateResolverCache: InitValidatorCache](
     json: Json
   ): EitherT[F, DecodingFailure, IgluCirceClient[F]] =
     for {
CreateResolverCache.scala (new file)
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2018-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.iglu.client.resolver

import cats.Id
import cats.effect.Async
import cats.effect.std.Mutex
import cats.implicits._
import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap}
import com.snowplowanalytics.iglu.core.SchemaKey

trait CreateResolverCache[F[_]] {

  def createSchemaCache(size: Int): F[LruMap[F, SchemaKey, SchemaCacheEntry]]

  def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]]

  def createMutex[K]: F[ResolverMutex[F, K]]

}

object CreateResolverCache {

  def apply[F[_]](implicit instance: CreateResolverCache[F]): CreateResolverCache[F] = instance

  private trait SimpleCreateResolverCache[F[_]] extends CreateResolverCache[F] {

    def createLruMap[K, V](size: Int): F[LruMap[F, K, V]]

    override def createSchemaCache(size: Int): F[LruMap[F, SchemaKey, SchemaCacheEntry]] =
      createLruMap(size)

    override def createSchemaListCache(size: Int): F[LruMap[F, ListCacheKey, ListCacheEntry]] =
      createLruMap(size)

  }

  implicit def idCreateResolverCache: CreateResolverCache[Id] =
    new SimpleCreateResolverCache[Id] {
      def createLruMap[K, V](size: Int): LruMap[Id, K, V] =
        CreateLruMap[Id, K, V].create(size)

      def createMutex[K]: ResolverMutex[Id, K] =
        ResolverMutex.idResolverMutex(new Object, createLruMap[K, Object](1000))
    }

  implicit def asyncCreateResolverCache[F[_]: Async]: CreateResolverCache[F] =
    new SimpleCreateResolverCache[F] {

      def createLruMap[K, V](size: Int): F[LruMap[F, K, V]] =
        CreateLruMap[F, K, V].create(size)

      def createMutex[K]: F[ResolverMutex[F, K]] =
        for {
          mutex <- Mutex[F]
          lrumap <- createLruMap[K, Mutex[F]](1000)
        } yield ResolverMutex.asyncResolverMutex(mutex, lrumap)
    }
}
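
For illustration, the per-key locking that createMutex enables can be sketched as follows. This is an assumed simplification, not the real ResolverMutex implementation (that file is among the 13 changed but is not expanded on this page): a global Mutex guards an LruMap of per-key Mutexes, so lookups of different keys never block each other.

  import cats.effect.Async
  import cats.effect.std.Mutex
  import cats.implicits._
  import com.snowplowanalytics.lrumap.LruMap

  def withLockOn[F[_]: Async, K, A](
    global: Mutex[F],               // guards access to the LruMap itself
    lrumap: LruMap[F, K, Mutex[F]]  // one mutex per recently used key
  )(key: K)(f: F[A]): F[A] =
    for {
      keyed <- global.lock.surround {
        // get-or-create the per-key mutex under the global lock
        lrumap.get(key).flatMap {
          case Some(m) => Async[F].pure(m)
          case None    => Mutex[F].flatTap(m => lrumap.put(key, m))
        }
      }
      a <- keyed.lock.surround(f)   // at most one fiber per key runs f
    } yield a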
Resolver.scala
@@ -18,7 +18,6 @@ import cats.effect.Clock
 import cats.implicits._
 import cats.{Applicative, Id, Monad}
 import com.snowplowanalytics.iglu.client.ClientError.ResolutionError
-import com.snowplowanalytics.iglu.client.resolver.registries.Registry.Get
 import com.snowplowanalytics.iglu.client.resolver.ResolverCache.TimestampedItem
 import com.snowplowanalytics.iglu.client.resolver.registries.{
   Registry,
@@ -74,15 +73,39 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCache[F]])
           .pure[F]
       }

+    def lockAndLookup: F[Either[ResolutionError, SchemaLookupResult]] =
+      withLockOnSchemaKey(schemaKey) {
+        getSchemaFromCache(schemaKey).flatMap {
+          case Some(TimestampedItem(Right(schema), timestamp)) =>
+            Monad[F].pure(Right(ResolverResult.Cached(schemaKey, schema, timestamp)))
+          case Some(TimestampedItem(Left(failures), _)) =>
+            for {
+              toBeRetried <- reposForRetry(failures)
+              result <- traverseRepos[F, Json](
+                get,
+                prioritize(schemaKey.vendor, toBeRetried),
+                failures
+              )
+              fixed <- handleAfterFetch(result)
+            } yield fixed
+          case None =>
+            traverseRepos[F, Json](get, prioritize(schemaKey.vendor, allRepos.toList), Map.empty)
+              .flatMap(handleAfterFetch)
+        }
+      }
+
     getSchemaFromCache(schemaKey).flatMap {
       case Some(TimestampedItem(Right(schema), timestamp)) =>
         Monad[F].pure(Right(ResolverResult.Cached(schemaKey, schema, timestamp)))
       case Some(TimestampedItem(Left(failures), _)) =>
-        retryCached[F, Json](get, schemaKey.vendor)(failures)
-          .flatMap(handleAfterFetch)
+        reposForRetry(failures).flatMap {
+          case Nil =>
+            Monad[F].pure(Left(resolutionError(failures)))
+          case _ =>
+            lockAndLookup
+        }
       case None =>
-        traverseRepos[F, Json](get, prioritize(schemaKey.vendor, allRepos.toList), Map.empty)
-          .flatMap(handleAfterFetch)
+        lockAndLookup
     }
   }

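The hunk above is classic double-checked locking: a cache miss (or a retryable cached failure) takes the per-key lock and re-reads the cache before fetching, so at most one fiber per schema key hits the upstream registries during a cold start. Stripped of resolver details, the pattern looks like this (all names here are stand-ins, not APIs from this codebase, and fetch is assumed to write its result back to the cache):

  import cats.Monad
  import cats.implicits._

  def cachedLookup[F[_]: Monad, K, V](key: K)(
    cacheGet: K => F[Option[V]],   // cheap first check, no lock taken
    fetch: K => F[V],              // expensive call to the registry
    withLock: (K, F[V]) => F[V]    // per-key mutual exclusion
  ): F[V] =
    cacheGet(key).flatMap {
      case Some(v) => Monad[F].pure(v)
      case None =>
        withLock(key, cacheGet(key).flatMap { // re-check under the lock:
          case Some(v) => Monad[F].pure(v)    // another fiber may have won
          case None    => fetch(key)          // we are the one that fetches
        })
    }
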
@@ -164,19 +187,44 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCache[F]])
           .pure[F]
       }

+    def lockAndLookup: F[Either[ResolutionError, SchemaListLookupResult]] =
+      withLockOnSchemaModel(vendor, name, model) {
+        getSchemaListFromCache(vendor, name, model).flatMap {
+          case Some(TimestampedItem(Right(schemaList), timestamp)) =>
+            if (mustIncludeKey.forall(schemaList.schemas.contains))
+              Monad[F].pure(
+                Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp))
+              )
+            else
+              traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty)
+                .flatMap(handleAfterFetch)
+          case Some(TimestampedItem(Left(failures), _)) =>
+            for {
+              toBeRetried <- reposForRetry(failures)
+              result <- traverseRepos[F, SchemaList](get, prioritize(vendor, toBeRetried), failures)
+              fixed <- handleAfterFetch(result)
+            } yield fixed
+          case None =>
+            traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty)
+              .flatMap(handleAfterFetch)
+        }
+      }
+
     getSchemaListFromCache(vendor, name, model).flatMap {
       case Some(TimestampedItem(Right(schemaList), timestamp)) =>
         if (mustIncludeKey.forall(schemaList.schemas.contains))
           Monad[F].pure(Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp)))
         else
-          traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty)
-            .flatMap(handleAfterFetch)
+          lockAndLookup
       case Some(TimestampedItem(Left(failures), _)) =>
-        retryCached[F, SchemaList](get, vendor)(failures)
-          .flatMap(handleAfterFetch)
+        reposForRetry(failures).flatMap {
+          case Nil =>
+            Monad[F].pure(Left(resolutionError(failures)))
+          case _ =>
+            lockAndLookup
+        }
       case None =>
-        traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty)
-          .flatMap(handleAfterFetch)
+        lockAndLookup
     }
   }

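Note the asymmetry between the two Right-cached branches: a cached schema is served as-is, but a cached schema list is re-fetched (inside the lock) whenever mustIncludeKey is missing from it, because a newly published schema version can make a cached list stale.
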
@@ -233,6 +281,18 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCache[F]])
       case None => Monad[F].pure(None)
     }

+  private def withLockOnSchemaKey[A](schemaKey: SchemaKey)(f: => F[A]): F[A] =
+    cache match {
+      case Some(c) => c.withLockOnSchemaKey(schemaKey)(f)
+      case None => f
+    }
+
+  private def withLockOnSchemaModel[A](vendor: Vendor, name: Name, model: Model)(f: => F[A]): F[A] =
+    cache match {
+      case Some(c) => c.withLockOnSchemaModel(vendor, name, model)(f)
+      case None => f
+    }
+
   private def getSchemaListFromCache(
     vendor: Vendor,
     name: Name,
@@ -278,17 +338,12 @@ object Resolver {
     /** The result of a lookup when the resolver is not configured to use a cache */
     case class NotCached[A](value: A) extends ResolverResult[Nothing, A]
   }
-  def retryCached[F[_]: Clock: Monad: RegistryLookup, A](
-    get: Get[F, A],
-    vendor: Vendor
-  )(
-    cachedErrors: LookupFailureMap
-  ): F[Either[LookupFailureMap, A]] =
-    for {
-      now <- Clock[F].realTimeInstant
-      reposForRetry = getReposForRetry(cachedErrors, now)
-      result <- traverseRepos[F, A](get, prioritize(vendor, reposForRetry), cachedErrors)
-    } yield result

+  private def reposForRetry[F[_]: Clock: Monad](cachedErrors: LookupFailureMap): F[List[Registry]] =
+    Clock[F].realTimeInstant
+      .map { now =>
+        getReposForRetry(cachedErrors, now)
+      }

   /**
    * Tail-recursive function to find our schema in one of our repositories
@@ -353,7 +408,7 @@ object Resolver {
    * @param refs Any RepositoryRef to add to this resolver
    * @return a configured Resolver instance
    */
-  def init[F[_]: Monad: InitSchemaCache: InitListCache](
+  def init[F[_]: Monad: CreateResolverCache](
     cacheSize: Int,
     cacheTtl: Option[TTL],
     refs: Registry*
@@ -370,7 +425,7 @@
   private[client] val EmbeddedSchemaCount = 4

   /** A Resolver which only looks at our embedded repo */
-  def bootstrap[F[_]: Monad: InitSchemaCache: InitListCache]: F[Resolver[F]] =
+  def bootstrap[F[_]: Monad: CreateResolverCache]: F[Resolver[F]] =
     Resolver.init[F](EmbeddedSchemaCount, None, Registry.EmbeddedRegistry)

   final case class ResolverConfig(
@@ -398,7 +453,7 @@ object Resolver {
    * for this resolver
    * @return a configured Resolver instance
    */
-  def parse[F[_]: Monad: InitSchemaCache: InitListCache](
+  def parse[F[_]: Monad: CreateResolverCache](
     config: Json
   ): F[Either[DecodingFailure, Resolver[F]]] = {
     val result: EitherT[F, DecodingFailure, Resolver[F]] = for {
@@ -419,7 +474,7 @@
     } yield config
   }

-  def fromConfig[F[_]: Monad: InitSchemaCache: InitListCache](
+  def fromConfig[F[_]: Monad: CreateResolverCache](
     config: ResolverConfig
   ): EitherT[F, DecodingFailure, Resolver[F]] = {
     for {
@@ -498,6 +553,7 @@ object Resolver {
   private val MinBackoff = 500 // ms

   // Count how many milliseconds the Resolver needs to wait before retrying
+  // TODO: This should not exceed TTL
   private def backoff(retryCount: Int): Long =
     retryCount match {
       case c if c > 20 => 1200000L + (retryCount * 100L)
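On the new TODO: one way to satisfy it (a sketch, not part of this commit) would be to clamp the computed delay to the cache TTL, since scheduling a retry beyond the entry's expiry is pointless:

  import scala.concurrent.duration.FiniteDuration

  // Hypothetical helper: never back off past the cache TTL, if one is set.
  private def cappedBackoff(retryCount: Int, ttl: Option[FiniteDuration]): Long =
    ttl.fold(backoff(retryCount))(t => math.min(backoff(retryCount), t.toMillis))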
ResolverCache.scala
@@ -24,7 +24,7 @@ import io.circe.Json
 import scala.concurrent.duration.{DurationInt, FiniteDuration}

 // LruMap
-import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap}
+import com.snowplowanalytics.lrumap.LruMap

 // Iglu core
 import com.snowplowanalytics.iglu.core.SchemaKey
@@ -39,6 +39,8 @@ import com.snowplowanalytics.iglu.core.SchemaKey
 class ResolverCache[F[_]] private (
   schemas: LruMap[F, SchemaKey, SchemaCacheEntry],
   schemaLists: LruMap[F, ListCacheKey, ListCacheEntry],
+  schemaMutex: ResolverMutex[F, SchemaKey],
+  schemaListMutex: ResolverMutex[F, ListCacheKey],
   val ttl: Option[TTL]
 ) {

@@ -133,6 +135,14 @@ class ResolverCache[F[_]] private (
     freshResult: ListLookup
   )(implicit F: Monad[F], C: Clock[F]): F[Either[LookupFailureMap, TimestampedItem[SchemaList]]] =
     putItemResult(schemaLists, (vendor, name, model), freshResult)
+
+  private[resolver] def withLockOnSchemaKey[A](key: SchemaKey)(f: => F[A]): F[A] =
+    schemaMutex.withLockOn(key)(f)
+
+  private[resolver] def withLockOnSchemaModel[A](vendor: Vendor, name: Name, model: Model)(
+    f: => F[A]
+  ): F[A] =
+    schemaListMutex.withLockOn((vendor, name, model))(f)
 }

 object ResolverCache {
@@ -144,14 +154,15 @@ object ResolverCache {
     size: Int,
     ttl: Option[TTL]
   )(implicit
-    C: CreateLruMap[F, SchemaKey, SchemaCacheEntry],
-    L: CreateLruMap[F, ListCacheKey, ListCacheEntry]
+    C: CreateResolverCache[F]
   ): F[Option[ResolverCache[F]]] = {
     if (shouldCreateResolverCache(size, ttl)) {
       for {
-        schemas <- CreateLruMap[F, SchemaKey, SchemaCacheEntry].create(size)
-        schemaLists <- CreateLruMap[F, ListCacheKey, ListCacheEntry].create(size)
-      } yield new ResolverCache[F](schemas, schemaLists, ttl).some
+        schemas <- C.createSchemaCache(size)
+        schemaLists <- C.createSchemaListCache(size)
+        schemaMutex <- C.createMutex[SchemaKey]
+        listMutex <- C.createMutex[ListCacheKey]
+      } yield new ResolverCache[F](schemas, schemaLists, schemaMutex, listMutex, ttl).some
     } else
       Applicative[F].pure(none)
   }
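ResolverCache.init now asks the single CreateResolverCache capability for both LruMaps and both mutexes. A minimal construction sketch for F = IO (assuming the TTL alias is a FiniteDuration, in line with the duration imports above):

  import cats.effect.IO
  import scala.concurrent.duration._

  // Yields None when the size/ttl combination says caching should be disabled.
  val cache: IO[Option[ResolverCache[IO]]] =
    ResolverCache.init[IO](size = 500, ttl = Some(10.minutes))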
(The remaining changed files are not expanded on this page.)
