Skip to content

Commit

Permalink
Stop invalidating cache on registry failures (close #77)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Aug 8, 2019
1 parent f74c66d commit fcc108a
Show file tree
Hide file tree
Showing 16 changed files with 476 additions and 260 deletions.
1 change: 1 addition & 0 deletions build.sbt
Expand Up @@ -28,6 +28,7 @@ lazy val root = (project in file("."))
Dependencies.Libraries.igluCore,
Dependencies.Libraries.igluCoreCirce,
Dependencies.Libraries.cats,
Dependencies.Libraries.circeTime,
Dependencies.Libraries.circeParser,
Dependencies.Libraries.lruMap,
Dependencies.Libraries.scalaj,
Expand Down
2 changes: 2 additions & 0 deletions project/BuildSettings.scala
Expand Up @@ -42,6 +42,8 @@ object BuildSettings {

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.8"),

addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.0"),

parallelExecution in Test := false // possible race bugs
)

Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Expand Up @@ -35,6 +35,7 @@ object Dependencies {
val igluCoreCirce = "com.snowplowanalytics" %% "iglu-core-circe" % V.igluCore
val cats = "org.typelevel" %% "cats-core" % V.cats
val circeParser = "io.circe" %% "circe-parser" % V.circe
val circeTime = "io.circe" %% "circe-java8" % V.circe
val circeJackson = "io.circe" %% "circe-jackson29" % V.circe
val lruMap = "com.snowplowanalytics" %% "scala-lru-map" % V.lruMap
val scalaj = "org.scalaj" %% "scalaj-http" % V.scalaj
Expand Down
Expand Up @@ -39,7 +39,7 @@ final case class Client[F[_], A](resolver: Resolver[F], validator: Validator[A])
L: RegistryLookup[F],
C: Clock[F]): EitherT[F, ClientError, Unit] =
for {
schema <- EitherT(resolver.lookupSchema(instance.schema, 3))
schema <- EitherT(resolver.lookupSchema(instance.schema))
schemaValidation = validator.validateSchema(schema)
_ <- EitherT.fromEither(schemaValidation).leftMap(_.toClientError)
validation = validator.validate(instance.data, schema)
Expand Down
48 changes: 26 additions & 22 deletions src/main/scala/com.snowplowanalytics.iglu/client/ClientError.scala
Expand Up @@ -12,17 +12,21 @@
*/
package com.snowplowanalytics.iglu.client

import java.time.Instant

import cats.Show
import cats.syntax.show._
import cats.syntax.either._

import io.circe.{Decoder, DecodingFailure, Encoder, Json}
import io.circe.java8.time._
import io.circe.syntax._

import validator.ValidatorError
import resolver.LookupHistory
import resolver.registries.RegistryError

/** Common type for Resolver's and Validator's errors */
sealed trait ClientError extends Product with Serializable {
def getMessage: String =
ClientError.igluClientResolutionErrorCirceEncoder(this).noSpaces
Expand Down Expand Up @@ -82,28 +86,6 @@ object ClientError {

}

// Auxiliary entity, helping to decode Map[String, LookupHistory]
private case class RepoLookupHistory(
repository: String,
errors: Set[RegistryError],
attempts: Int,
fatal: Boolean) {
def toField: (String, LookupHistory) =
(repository, LookupHistory(errors, attempts, fatal))
}

private object RepoLookupHistory {
implicit val repoLookupHistoryDecoder: Decoder[RepoLookupHistory] =
Decoder.instance { cursor =>
for {
repository <- cursor.downField("repository").as[String]
errors <- cursor.downField("errors").as[Set[RegistryError]]
attempts <- cursor.downField("attempts").as[Int]
fatal <- cursor.downField("fatal").as[Boolean]
} yield RepoLookupHistory(repository, errors, attempts, fatal)
}
}

implicit val igluClientShowInstance: Show[ClientError] =
Show.show {
case ClientError.ValidationError(ValidatorError.InvalidData(reports)) =>
Expand All @@ -128,4 +110,26 @@ object ClientError {
}
s"Schema cannot be resolved in following repositories:\n${errors.mkString("\n")}"
}

// Auxiliary entity, helping to decode Map[String, LookupHistory]
private case class RepoLookupHistory(
repository: String,
errors: Set[RegistryError],
attempts: Int,
lastAttempt: Instant) {
def toField: (String, LookupHistory) =
(repository, LookupHistory(errors, attempts, lastAttempt))
}

private object RepoLookupHistory {
implicit val repoLookupHistoryDecoder: Decoder[RepoLookupHistory] =
Decoder.instance { cursor =>
for {
repository <- cursor.downField("repository").as[String]
errors <- cursor.downField("errors").as[Set[RegistryError]]
attempts <- cursor.downField("attempts").as[Int]
last <- cursor.downField("lastAttempt").as[Instant]
} yield RepoLookupHistory(repository, errors, attempts, last)
}
}
}
Expand Up @@ -12,12 +12,15 @@
*/
package com.snowplowanalytics.iglu.client.resolver

import java.time.Instant

import cats.Semigroup
import cats.instances.set._
import cats.syntax.semigroup._
import cats.syntax.either._

import io.circe.{Decoder, Encoder, Json}
import io.circe.java8.time._
import io.circe.syntax._

import registries.RegistryError
Expand All @@ -27,10 +30,13 @@ import registries.RegistryError
* Using to aggregate all errors for single schema for single repo during all retries
*
* @param errors set of all errors happened during all attempts
* @param attempts amount of undertaken attempts
* @param fatal indicates whether among failures were unrecoverable ones (like invalid schema)
* @param attempts amount of undertaken attempts, *since last TTL invalidation*
* @param lastAttempt when Resolver tried to fetch it last time
*/
case class LookupHistory(errors: Set[RegistryError], attempts: Int, fatal: Boolean)
case class LookupHistory(errors: Set[RegistryError], attempts: Int, lastAttempt: Instant) {
def incrementAttempt: LookupHistory =
LookupHistory(errors, attempts + 1, lastAttempt)
}

object LookupHistory {

Expand All @@ -42,23 +48,26 @@ object LookupHistory {
override def combine(a: LookupHistory, b: LookupHistory): LookupHistory =
LookupHistory(
(a.errors |+| b.errors).take(MaxErrors),
a.attempts.max(b.attempts) + 1,
a.fatal || b.fatal)
a.attempts.max(b.attempts),
maxInstant(a.lastAttempt, b.lastAttempt))
}

implicit val lookupHistoryEncoder: Encoder[LookupHistory] = Encoder.instance { history =>
Json.obj(
"errors" := history.errors.asJson,
"attempts" := history.attempts.asJson,
"fatal" := history.fatal.asJson
"lastAttempt" := history.lastAttempt.asJson
)
}

implicit val lookupHistoryDecoder: Decoder[LookupHistory] = Decoder.instance { cursor =>
for {
errors <- cursor.downField("errors").as[Set[RegistryError]]
attempts <- cursor.downField("attempts").as[Int]
fatal <- cursor.downField("fatal").as[Boolean]
} yield LookupHistory(errors, attempts, fatal)
last <- cursor.downField("lastAttempt").as[Instant]
} yield LookupHistory(errors, attempts, last)
}

private def maxInstant(a: Instant, b: Instant): Instant =
if (a.isAfter(b)) a else b
}

0 comments on commit fcc108a

Please sign in to comment.