Skip to content
This repository has been archived by the owner on Dec 10, 2018. It is now read-only.

Commit

Permalink
iss #6: storage actor: handle GetAggregatedResults: some tests & refa…
Browse files Browse the repository at this point in the history
…ctoring
  • Loading branch information
maizy committed Mar 25, 2016
1 parent 283f52e commit de85a5d
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 28 deletions.
10 changes: 5 additions & 5 deletions server/src/main/scala/ru/maizy/cheesecake/server/ServerApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import com.typesafe.config.ConfigFactory
import ru.maizy.cheesecake.server.resultsstorage.{ Aggregate, AggregateType, AggregatedResults, AllEndpoints }
import ru.maizy.cheesecake.server.resultsstorage.{ EndpointCheckResults, GetAggregatedResults, GetAllEndpoints }
import ru.maizy.cheesecake.server.resultsstorage.{ GetEndpointCheckResults, InMemoryResultStorageActor }
import ru.maizy.cheesecake.server.resultsstorage.SimpleAggregate
import ru.maizy.cheesecake.server.checker.HttpCheckerActor
import ru.maizy.cheesecake.server.resultsstorage.{ LastResultAggregate, SimpleAggregate }
import ru.maizy.cheesecake.server.checker.{ CheckStatus, HttpCheckerActor }
import ru.maizy.cheesecake.server.service.{ AddEndpoints, Endpoint, HttpEndpoint, Service, ServiceActor }
import ru.maizy.cheesecake.server.service.SymbolicAddress
import ru.maizy.cheesecake.server.utils.ActorUtils.escapeActorName
Expand Down Expand Up @@ -87,9 +87,9 @@ object ServerApp extends App {
)

val allAggregates: Seq[Aggregate] = Seq(
SimpleAggregate(AggregateType.LastFailedTimestamp),
SimpleAggregate(AggregateType.LastSuccessTimestamp),
SimpleAggregate(AggregateType.LastUnavailableTimestamp),
LastResultAggregate(CheckStatus.Unavailable),
LastResultAggregate(CheckStatus.Ok),
LastResultAggregate(CheckStatus.UnableToCheck),
SimpleAggregate(AggregateType.UptimeChecks),
SimpleAggregate(AggregateType.UptimeDuration)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ru.maizy.cheesecake.server.resultsstorage

import ru.maizy.cheesecake.server.checker.CheckStatus
/**
* Copyright (c) Nikita Kovaliov, maizy.ru, 2016
* See LICENSE.txt for details.
Expand All @@ -8,11 +9,15 @@ package ru.maizy.cheesecake.server.resultsstorage
object AggregateType extends Enumeration {
type TypeKey = Value

val UptimeDuration, UptimeChecks, LastSuccessTimestamp, LastFailedTimestamp, LastUnavailableTimestamp = Value
val UptimeDuration, UptimeChecks, LastResult = Value
}

sealed trait Aggregate {
def key: AggregateType.TypeKey
}

case class SimpleAggregate(key: AggregateType.TypeKey) extends Aggregate

case class LastResultAggregate(status: CheckStatus.Type) extends Aggregate {
val key = AggregateType.LastResult
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class InMemoryResultStorageActor extends ResultStorageActor with ActorLogging {
// TODO: make those vars configurable
val checksLimit: Int = 10
val maxFutureShift = 2000 // ms
val maxPastShift = 10000 // ms
val maxPastShift = 60 * 1000 // ms

def uptimeDuration(to: ZonedDateTime = ZonedDateTime.now()): Option[Duration] =
uptimeUntil.map(Duration.between(_, to))
Expand Down Expand Up @@ -110,7 +110,8 @@ class InMemoryResultStorageActor extends ResultStorageActor with ActorLogging {
sender() ! AllServices(results.keys.map(_.service).toSeq)

case GetAggregatedResults(endpointFqns, aggregates) =>
sender() ! AggregatedResults(endpointFqns.map(_ -> Seq.empty).toMap) // FIXME
val res = for (endpoint <- endpointFqns) yield endpoint -> getEndpointAggregates(endpoint, aggregates)
sender() ! AggregatedResults(res.toMap)

case GetEndpointCheckResults(endpointsFqn, limit) =>
val res = endpointsFqn.map { endpointFqn =>
Expand All @@ -124,6 +125,48 @@ class InMemoryResultStorageActor extends ResultStorageActor with ActorLogging {
sender() ! EndpointCheckResults(res)

}

def getEndpointAggregates(endpoint: EndpointFQN, aggregates: Seq[Aggregate]): Map[Aggregate, AggregateResult[Any]] = {
val pairs = for(
aggregate <- aggregates;
res <- getAggregate(endpoint, aggregate) match {
case Left(error) =>
log.warning(s"Unable to get aggregate $aggregate for $endpoint: $error")
None
case Right(result) =>
Some(result)
}
) yield aggregate -> res
pairs.toMap
}

def getAggregate(endpoint: EndpointFQN, aggregate: Aggregate): Either[String, AggregateResult[Any]] = {
val cell = results.get(endpoint)
aggregate match {
case LastResultAggregate(status) =>
Right(OptionalDateTimeResult(
aggregate,
cell.flatMap(_.lastStatus.get(status))
))

case SimpleAggregate(AggregateType.UptimeChecks) =>
Right(IntResult(
aggregate,
cell.map(_.uptimeChecks).getOrElse(0)
))

case SimpleAggregate(AggregateType.UptimeDuration) =>
Right(DurationResult(
aggregate,
cell
.flatMap(_.uptimeUntil)
.map { r => Duration.between(r, ZonedDateTime.now()) }
.getOrElse(Duration.ZERO)
))

case other: Aggregate => Left(s"Unknown aggregate: $other")
}
}
}

object InMemoryResultStorageActor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ case class AllServices(services: Seq[Service])
case class GetAggregatedResults(endpointsFqns: Seq[EndpointFQN], aggregates: Seq[Aggregate])
extends ResultStorageProtocol

case class AggregatedResults(results: Map[EndpointFQN, Seq[AggregateResult[Any]]])
case class AggregatedResults(results: Map[EndpointFQN, Map[Aggregate, AggregateResult[Any]]])


case class GetEndpointCheckResults(endpointsFqns: Seq[EndpointFQN], limit: Int = Int.MaxValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ package ru.maizy.cheesecake.server.tests.resultsstorage
import java.time.temporal.ChronoUnit
import java.time.{ Duration, ZonedDateTime }
import scala.collection.immutable.Queue
import scala.concurrent.duration.DurationInt
import akka.testkit.TestActorRef
import org.scalatest.FlatSpecLike
import ru.maizy.cheesecake.server.checker.{ CheckResult, CheckStatus, HttpCheckResult }
import ru.maizy.cheesecake.server.resultsstorage.{ ClearEndpointCheckResults, AddEndpointCheckResults, AggregateType }
import ru.maizy.cheesecake.server.resultsstorage.{ SimpleAggregate, EndpointCheckResults, GetEndpointCheckResults}
import ru.maizy.cheesecake.server.resultsstorage.{ AggregatedResults, GetAggregatedResults, AllEndpoints }
import ru.maizy.cheesecake.server.resultsstorage.{ GetAllEndpoints, InMemoryResultStorageActor }
import ru.maizy.cheesecake.server.service.{ EndpointFQN, Service, SymbolicAddress, HttpEndpoint }
import ru.maizy.cheesecake.server.resultsstorage.{ ClearEndpointCheckResults, OptionalDateTimeResult }
import ru.maizy.cheesecake.server.resultsstorage.{ AddEndpointCheckResults, AggregateType, SimpleAggregate }
import ru.maizy.cheesecake.server.resultsstorage.{ AggregatedResults, EndpointCheckResults, GetEndpointCheckResults }
import ru.maizy.cheesecake.server.resultsstorage.{ AllEndpoints, GetAggregatedResults, GetAllEndpoints }
import ru.maizy.cheesecake.server.resultsstorage.{ InMemoryResultStorageActor, LastResultAggregate }
import ru.maizy.cheesecake.server.service.{ EndpointFQN, HttpEndpoint, Service, SymbolicAddress }
import ru.maizy.cheesecake.server.tests.ActorSystemBaseSpec


Expand All @@ -34,21 +36,28 @@ class InMemoryResultStorageActorSpec extends ActorSystemBaseSpec with FlatSpecLi

val baseTime = ZonedDateTime.now()

val successfulCheckResults: IndexedSeq[HttpCheckResult] = Range(0, CHECKS_LIMIT + 5)
.map { i =>
HttpCheckResult(
CheckStatus.Ok,
baseTime.plus(Duration.of((i + 1) * 10, ChronoUnit.MILLIS))
)
}
val successfulCheckResults: IndexedSeq[HttpCheckResult] =
Range(0, CHECKS_LIMIT + 5).map(i => checkResult((i + 1) * 10, CheckStatus.Ok))


val lastUnavailable = LastResultAggregate(CheckStatus.Unavailable)
val lastOk = LastResultAggregate(CheckStatus.Ok)
val lastUnableToCheck = LastResultAggregate(CheckStatus.UnableToCheck)
val uptimeChecks = SimpleAggregate(AggregateType.UptimeChecks)
val uptimeDuration = SimpleAggregate(AggregateType.UptimeDuration)
val allAggregates = Seq(
SimpleAggregate(AggregateType.LastFailedTimestamp),
SimpleAggregate(AggregateType.LastSuccessTimestamp),
SimpleAggregate(AggregateType.LastUnavailableTimestamp),
SimpleAggregate(AggregateType.UptimeChecks),
SimpleAggregate(AggregateType.UptimeDuration)
lastUnavailable,
lastOk,
lastUnableToCheck,
uptimeChecks,
uptimeDuration
)

def checkResult(shift: Int, status: CheckStatus.Type): HttpCheckResult =
HttpCheckResult(
status,
baseTime.plus(Duration.of(shift, ChronoUnit.MILLIS))
)
}

trait WithSyncActorAndSampleData extends SampleData {
Expand Down Expand Up @@ -95,12 +104,76 @@ class InMemoryResultStorageActorSpec extends ActorSystemBaseSpec with FlatSpecLi

ref ! GetAggregatedResults(Seq(endpointFqn, otherEndpointFqn), Seq.empty)
expectMsg(AggregatedResults(Map(
endpointFqn -> Seq.empty,
otherEndpointFqn -> Seq.empty
endpointFqn -> Map.empty,
otherEndpointFqn -> Map.empty
)))

ref ! GetAggregatedResults(Seq.empty, allAggregates)
expectMsg(AggregatedResults(Map.empty))

// TODO: an empty result aggregate with an existing endpoint
}
}

it should "returns Last*" in {
new SampleData {
import CheckStatus._
val timeout = 3.seconds // may be increased for a debugger session

val timeShift = Stream.iterate(0)(_ + 1).iterator
for (status <- CheckStatus.values) {
println(s"Check $status")
val ref = system.actorOf(InMemoryResultStorageActor.props())
val aggregate = LastResultAggregate(status)
val resultBuilder = () => checkResult(timeShift.next(), status)
val otherResultsBuilder: () => Seq[HttpCheckResult] = () =>
CheckStatus.values.filterNot(_ == status).map(checkResult(timeShift.next(), _)).toSeq

// add other check status types
ref ! AddEndpointCheckResults(endpointFqn, otherResultsBuilder())

// should not affect the current aggregate
ref ! GetAggregatedResults(Seq(endpointFqn), Seq(aggregate))
expectMsg(timeout, AggregatedResults(Map(
endpointFqn -> Map(
aggregate -> OptionalDateTimeResult(aggregate, None)
)
)))

// add current type check
val result = resultBuilder()
ref ! AddEndpointCheckResults(endpointFqn, Seq(result))

// should returns it as a result
ref ! GetAggregatedResults(Seq(endpointFqn), Seq(aggregate))
expectMsg(timeout, AggregatedResults(Map(
endpointFqn -> Map(
aggregate -> OptionalDateTimeResult(aggregate, Some(result.checkTime))
)
)))

// add other check status types ...
ref ! AddEndpointCheckResults(endpointFqn, otherResultsBuilder())

// should not affect the current aggregate
ref ! GetAggregatedResults(Seq(endpointFqn), Seq(aggregate))
expectMsg(timeout, AggregatedResults(Map(
endpointFqn -> Map(
aggregate -> OptionalDateTimeResult(aggregate, Some(result.checkTime))
)
)))

// an old result should replaced by the new result
val newerResult = resultBuilder()
ref ! AddEndpointCheckResults(endpointFqn, Seq(newerResult))

ref ! GetAggregatedResults(Seq(endpointFqn), Seq(aggregate))
expectMsg(timeout, AggregatedResults(Map(
endpointFqn -> Map(
aggregate -> OptionalDateTimeResult(aggregate, Some(newerResult.checkTime))
)
)))
}
}
}

Expand Down

0 comments on commit de85a5d

Please sign in to comment.