diff --git a/.dockerignore b/.dockerignore index 684f5bf..c42202a 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,5 +1,5 @@ **/* !project/build.properties !project/plugins.sbt -!src/ +!src/main/ !build.sbt \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 016f45b..754f49e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,11 +12,10 @@ LABEL org.opencontainers.image.vendor="neuland – Büro für Informatik GmbH" LABEL org.opencontainers.image.licenses="Apache-2.0" LABEL org.opencontainers.image.title="bandwhichd-server" LABEL org.opencontainers.image.description="bandwhichd server collecting measurements and calculating statistics" -LABEL org.opencontainers.image.version="0.6.0-rc3" +LABEL org.opencontainers.image.version="0.6.0-rc4" USER guest ENTRYPOINT ["/opt/java/openjdk/bin/java"] CMD ["-jar", "/opt/bandwhichd-server.jar"] EXPOSE 8080 -HEALTHCHECK --interval=5s --timeout=1s --start-period=2s --retries=2 \ - CMD wget --spider http://localhost:8080/v1/health || exit 1 -COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.3/bandwhichd-server-assembly-0.6.0-rc3.jar /opt/bandwhichd-server.jar \ No newline at end of file +STOPSIGNAL SIGTERM +COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.3/bandwhichd-server-assembly-0.6.0-rc4.jar /opt/bandwhichd-server.jar \ No newline at end of file diff --git a/build.sbt b/build.sbt index f55addc..97c6134 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ lazy val root = (project in file(".")) .settings( organization := "de.neuland-bfi", name := "bandwhichd-server", - version := "0.6.0-rc3", + version := "0.6.0-rc4", scalaVersion := "3.1.3", Compile / scalaSource := baseDirectory.value / "src" / "main" / "scala", Test / scalaSource := baseDirectory.value / "src" / "test" / "scala", diff --git a/src/main/scala/de/neuland/bandwhichd/server/adapter/in/scheduler/AggregationScheduler.scala b/src/main/scala/de/neuland/bandwhichd/server/adapter/in/scheduler/AggregationScheduler.scala deleted file mode 100644 index 188085a..0000000 --- a/src/main/scala/de/neuland/bandwhichd/server/adapter/in/scheduler/AggregationScheduler.scala +++ /dev/null @@ -1,23 +0,0 @@ -package de.neuland.bandwhichd.server.adapter.in.scheduler - -import cats.Monad -import de.neuland.bandwhichd.server.application.StatsApplicationService -import de.neuland.bandwhichd.server.lib.scheduling.{Schedule, Scheduler, Work} - -import scala.concurrent.duration.{FiniteDuration, SECONDS} -import de.neuland.bandwhichd.server.boot.Configuration -import scala.jdk.DurationConverters.* - -class AggregationScheduler[F[_]: Monad]( - private val configuration: Configuration, - private val statsApplicationService: StatsApplicationService[F] -) extends Scheduler[F] { - override def schedule: F[Schedule[F]] = - Monad[F].pure( - Schedule.Pausing( - getClass.getSimpleName, - configuration.aggregationSchedulerInterval.toScala, - Work(statsApplicationService.recalculate) - ) - ) -} diff --git a/src/main/scala/de/neuland/bandwhichd/server/adapter/in/v1/message/MessageController.scala b/src/main/scala/de/neuland/bandwhichd/server/adapter/in/v1/message/MessageController.scala index 23a6cf8..2ff6ee7 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/adapter/in/v1/message/MessageController.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/adapter/in/v1/message/MessageController.scala @@ -1,29 +1,25 @@ package de.neuland.bandwhichd.server.adapter.in.v1.message -import io.circe.Encoder import cats.data.Validated.{Invalid, Valid} import cats.data.ValidatedNel import cats.effect.{Concurrent, Resource} import cats.implicits.* import de.neuland.bandwhichd.server.adapter.in.v1.message.Message.MeasurementMessage -import de.neuland.bandwhichd.server.application.{ - MeasurementApplicationService, - RecordMeasurementCommand -} +import de.neuland.bandwhichd.server.application.MeasurementApplicationService import de.neuland.bandwhichd.server.boot.Configuration import de.neuland.bandwhichd.server.domain.measurement.Timing import de.neuland.bandwhichd.server.domain.stats.Stats import de.neuland.bandwhichd.server.lib.http4s.Helpers import de.neuland.bandwhichd.server.lib.time.Interval import de.neuland.bandwhichd.server.lib.time.cats.TimeContext -import io.circe.{Decoder, Json} +import io.circe.{Decoder, Encoder, Json} import org.http4s.dsl.{io as _, *} import org.http4s.headers.Allow import org.http4s.implicits.* import org.http4s.{Message as _, *} -import java.time.{Instant, OffsetDateTime} import java.time.format.DateTimeFormatter +import java.time.{Instant, OffsetDateTime} class MessageController[F[_]: Concurrent]( private val configuration: Configuration, @@ -93,9 +89,7 @@ class MessageController[F[_]: Concurrent]( _ <- { message match case Message.MeasurementMessage(measurement) => - measurementApplicationService.recordMeasurement( - RecordMeasurementCommand(measurement) - ) + measurementApplicationService.record(measurement) } response <- Ok("") } yield response @@ -125,5 +119,10 @@ class MessageController[F[_]: Concurrent]( ) ) ) - case (None, None) => Stats.defaultTimeframe(timeContext) + case (None, None) => + for { + now <- timeContext.now + } yield Timing.Timeframe( + Interval(now.minus(Stats.defaultTimeframeDuration), now) + ) } diff --git a/src/main/scala/de/neuland/bandwhichd/server/adapter/out/stats/StatsInMemoryRepository.scala b/src/main/scala/de/neuland/bandwhichd/server/adapter/out/stats/StatsInMemoryRepository.scala index 753851c..26fee7f 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/adapter/out/stats/StatsInMemoryRepository.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/adapter/out/stats/StatsInMemoryRepository.scala @@ -19,4 +19,9 @@ class StatsInMemoryRepository[F[_]: Sync] extends StatsRepository[F] { Sync[F].blocking { statsStore.get() } + + override def update(f: MonitoredStats => MonitoredStats): F[MonitoredStats] = + Sync[F].blocking { + statsStore.updateAndGet(stats => f(stats)) + } } diff --git a/src/main/scala/de/neuland/bandwhichd/server/application/MeasurementApplicationService.scala b/src/main/scala/de/neuland/bandwhichd/server/application/MeasurementApplicationService.scala index a00104f..e629895 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/application/MeasurementApplicationService.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/application/MeasurementApplicationService.scala @@ -3,21 +3,29 @@ package de.neuland.bandwhichd.server.application import cats.Monad import cats.effect.Sync import cats.implicits.* -import de.neuland.bandwhichd.server.domain.measurement.{ - Measurement, - MeasurementsRepository, - Timing -} +import de.neuland.bandwhichd.server.domain.measurement.* +import de.neuland.bandwhichd.server.domain.stats.* +import de.neuland.bandwhichd.server.lib.time.cats.TimeContext import fs2.Stream class MeasurementApplicationService[F[_]: Sync]( - private val measurementsRepository: MeasurementsRepository[F] + private val measurementsRepository: MeasurementsRepository[F], + private val statsRepository: StatsRepository[F], + private val timeContext: TimeContext[F] ) { def get(timeframe: Timing.Timeframe): Stream[F, Measurement[Timing]] = measurementsRepository.get(timeframe) - def recordMeasurement( - recordMeasurementCommand: RecordMeasurementCommand - ): F[Unit] = - measurementsRepository.record(recordMeasurementCommand.measurement) + def record(measurement: Measurement[Timing]): F[Unit] = + for { + _ <- measurementsRepository.record(measurement) + now <- timeContext.now + _ <- statsRepository.update { stats => + stats + .append(measurement) + .dropBefore( + Timing.Timestamp(now.minus(Stats.defaultTimeframeDuration)) + ) + } + } yield () } diff --git a/src/main/scala/de/neuland/bandwhichd/server/application/RecordMeasurementCommand.scala b/src/main/scala/de/neuland/bandwhichd/server/application/RecordMeasurementCommand.scala deleted file mode 100644 index bfae446..0000000 --- a/src/main/scala/de/neuland/bandwhichd/server/application/RecordMeasurementCommand.scala +++ /dev/null @@ -1,5 +0,0 @@ -package de.neuland.bandwhichd.server.application - -import de.neuland.bandwhichd.server.domain.measurement.{Measurement, Timing} - -case class RecordMeasurementCommand(measurement: Measurement[Timing]) diff --git a/src/main/scala/de/neuland/bandwhichd/server/application/StatsApplicationService.scala b/src/main/scala/de/neuland/bandwhichd/server/application/StatsApplicationService.scala index d8fbbfc..6ee49d5 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/application/StatsApplicationService.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/application/StatsApplicationService.scala @@ -7,17 +7,8 @@ import de.neuland.bandwhichd.server.domain.stats.* import de.neuland.bandwhichd.server.lib.time.cats.TimeContext class StatsApplicationService[F[_]: Concurrent]( - private val timeContext: TimeContext[F], - private val measurementsRepository: MeasurementsRepository[F], private val statsRepository: StatsRepository[F] ) { def get: F[MonitoredStats] = statsRepository.get - - def recalculate: F[Unit] = - for { - defaultTimeframe <- Stats.defaultTimeframe(timeContext) - stats <- Stats(measurementsRepository.get(defaultTimeframe)) - _ <- statsRepository.safe(stats) - } yield () } diff --git a/src/main/scala/de/neuland/bandwhichd/server/boot/App.scala b/src/main/scala/de/neuland/bandwhichd/server/boot/App.scala index fa9028c..e275544 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/boot/App.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/boot/App.scala @@ -3,7 +3,6 @@ package de.neuland.bandwhichd.server.boot import cats.effect.* import cats.effect.kernel.Outcome import cats.implicits.* -import de.neuland.bandwhichd.server.adapter.in.scheduler.AggregationScheduler import de.neuland.bandwhichd.server.adapter.in.v1.health.HealthController import de.neuland.bandwhichd.server.adapter.in.v1.message.MessageController import de.neuland.bandwhichd.server.adapter.in.v1.stats.StatsController @@ -17,11 +16,6 @@ import de.neuland.bandwhichd.server.application.{ import de.neuland.bandwhichd.server.domain.measurement.MeasurementsRepository import de.neuland.bandwhichd.server.domain.stats.StatsRepository import de.neuland.bandwhichd.server.lib.cassandra.CassandraContext -import de.neuland.bandwhichd.server.lib.scheduling.{ - Operator, - Scheduler, - SchedulersOperator -} import de.neuland.bandwhichd.server.lib.time.cats.TimeContext import org.http4s.dsl.io.* import org.http4s.ember.server.EmberServerBuilder @@ -53,29 +47,16 @@ open class App[F[_]: Async]( // application lazy val measurementApplicationService: MeasurementApplicationService[F] = MeasurementApplicationService[F]( - measurementsRepository = measurementsRepository + measurementsRepository = measurementsRepository, + statsRepository = statsRepository, + timeContext = timeContext ) lazy val statsApplicationService: StatsApplicationService[F] = StatsApplicationService[F]( - timeContext = timeContext, - measurementsRepository = measurementsRepository, statsRepository = statsRepository ) - // in scheduling - lazy val aggregationScheduler: Scheduler[F] = - AggregationScheduler[F]( - configuration = configuration, - statsApplicationService = statsApplicationService - ) - - // scheduling - lazy val schedulersOperator: SchedulersOperator[F] = - SchedulersOperator[F]( - aggregationScheduler - ) - // in http lazy val healthController: HealthController[F] = HealthController[F]() @@ -102,9 +83,9 @@ open class App[F[_]: Async]( routes.routes.orNotFound } -object App extends IOApp { - override def run(args: List[String]): IO[ExitCode] = { - val outcomeR = for { +object App extends IOApp.Simple { + override def run: IO[Unit] = + (for { configuration <- Configuration.resource[IO] cassandraContext <- CassandraContext.resource[IO](configuration) _ <- Resource.eval( @@ -115,35 +96,19 @@ object App extends IOApp { cassandraContext, configuration ) - schedulerOutcomeF <- main.schedulersOperator.resource server <- EmberServerBuilder .default[IO] .withHostOption(None) .withHttpApp(main.httpApp) .build - logger <- Resource.eval(Slf4jLogger.create[IO]) - _ <- Resource.eval( - logger.info( - s"bandwhichd-server startup complete - ${main.schedulersOperator.size} scheduler - listening on ${server.address}" - ) - ) - lineF <- Resource.eval(IO.delay { - for { - line <- IO.interruptible { - StdIn.readLine() - } - _ <- if (line == null) IO.never else IO.unit - } yield () - }) - } yield schedulerOutcomeF.race(lineF) - - outcomeR.use { outcomeF => + } yield server).use { server => for { - outcome <- outcomeF - } yield outcome match - case Right(_) => ExitCode.Success - case Left(Outcome.Succeeded(_)) => ExitCode.Success - case _ => ExitCode.Error + logger <- Slf4jLogger.create[IO] + _ <- logger.info( + s"bandwhichd-server startup complete - listening on ${server.address}" + ) + line <- IO.interruptible { StdIn.readLine() } + _ <- if (line == null) IO.never else IO.unit + } yield () } - } } diff --git a/src/main/scala/de/neuland/bandwhichd/server/domain/stats/Stats.scala b/src/main/scala/de/neuland/bandwhichd/server/domain/stats/Stats.scala index 2868206..0dc022d 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/domain/stats/Stats.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/domain/stats/Stats.scala @@ -1,8 +1,6 @@ package de.neuland.bandwhichd.server.domain.stats -import cats.Monad -import cats.effect.kernel.Concurrent -import cats.implicits.* +import cats.effect.Concurrent import com.comcast.ip4s.* import de.neuland.bandwhichd.server.domain.* import de.neuland.bandwhichd.server.domain.measurement.{Measurement, Timing} @@ -51,19 +49,11 @@ class Stats[L <: HostId, H <: AnyHost[L], R <: HostId] private ( ) } -type AnyStats = Stats[HostId, AnyHost[HostId], HostId] type MonitoredStats = Stats[HostId.MachineId, MonitoredHost, HostId] object Stats { val defaultTimeframeDuration: Duration = Duration.ofHours(2) - def defaultTimeframe[F[_]: Monad]( - timeContext: TimeContext[F] - ): F[Timing.Timeframe] = - for { - now <- timeContext.now - } yield Timing.Timeframe(Interval(now.minus(defaultTimeframeDuration), now)) - val empty: MonitoredStats = new Stats(Map.empty) def apply[F[_]: Concurrent]( diff --git a/src/main/scala/de/neuland/bandwhichd/server/domain/stats/StatsRepository.scala b/src/main/scala/de/neuland/bandwhichd/server/domain/stats/StatsRepository.scala index 23a1c67..8207a9d 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/domain/stats/StatsRepository.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/domain/stats/StatsRepository.scala @@ -4,4 +4,6 @@ trait StatsRepository[F[_]] { def safe(stats: MonitoredStats): F[Unit] def get: F[MonitoredStats] + + def update(f: MonitoredStats => MonitoredStats): F[MonitoredStats] } diff --git a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Operator.scala b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Operator.scala deleted file mode 100644 index dee1f67..0000000 --- a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Operator.scala +++ /dev/null @@ -1,10 +0,0 @@ -package de.neuland.bandwhichd.server.lib.scheduling - -import cats.effect.{Async, Outcome, Resource} -import cats.implicits.* - -import java.util.concurrent.atomic.AtomicBoolean - -trait Operator[F[_]] { - def resource: Resource[F, F[Outcome[F, Throwable, Unit]]] -} diff --git a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Schedule.scala b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Schedule.scala deleted file mode 100644 index 0ca60cc..0000000 --- a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Schedule.scala +++ /dev/null @@ -1,15 +0,0 @@ -package de.neuland.bandwhichd.server.lib.scheduling - -import scala.concurrent.duration.FiniteDuration - -sealed trait Schedule[F[_]] { - def work: Work[F] -} - -object Schedule { - case class Pausing[F[_]]( - name: String, - pauseDuration: FiniteDuration, - work: Work[F] - ) extends Schedule[F] -} diff --git a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Scheduler.scala b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Scheduler.scala deleted file mode 100644 index a995c03..0000000 --- a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Scheduler.scala +++ /dev/null @@ -1,5 +0,0 @@ -package de.neuland.bandwhichd.server.lib.scheduling - -trait Scheduler[F[_]] { - def schedule: F[Schedule[F]] -} diff --git a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulerOperator.scala b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulerOperator.scala deleted file mode 100644 index 431a201..0000000 --- a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulerOperator.scala +++ /dev/null @@ -1,33 +0,0 @@ -package de.neuland.bandwhichd.server.lib.scheduling - -import cats.effect.* -import cats.implicits.* -import org.typelevel.log4cats.slf4j.Slf4jLogger - -import java.util.concurrent.atomic.AtomicBoolean -import scala.util.{Failure, Success, Try} - -class SchedulerOperator[F[_]: Async]( - private val scheduler: Scheduler[F] -) extends Operator[F] { - override def resource: Resource[F, F[Outcome[F, Throwable, Unit]]] = - for { - logger <- Resource.eval(Slf4jLogger.create[F]) - schedule <- Resource.eval(Async[F].defer(scheduler.schedule)) - outcome: F[Outcome[F, Throwable, Unit]] <- Async[F].background { - schedule match - case Schedule.Pausing(name, pauseDuration, work) => - def cycle: F[Unit] = - for { - _ <- logger.debug(s"Running $name") - _ <- Async[F].onError(work.run) { case e => - logger.error(e)(s"Scheduler $name failed") - } - _ <- logger.debug(s"Pausing $name for $pauseDuration") - _ <- Async[F].sleep(pauseDuration) - _ <- cycle - } yield () - cycle - } - } yield outcome -} diff --git a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulersOperator.scala b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulersOperator.scala deleted file mode 100644 index 0c1dda5..0000000 --- a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/SchedulersOperator.scala +++ /dev/null @@ -1,42 +0,0 @@ -package de.neuland.bandwhichd.server.lib.scheduling - -import cats.effect.{Async, Outcome, Resource} -import cats.implicits.* - -class SchedulersOperator[F[_]: Async](private val schedulers: Scheduler[F]*) - extends Operator[F] { - def size: Int = schedulers.size - - override def resource: Resource[F, F[Outcome[F, Throwable, Unit]]] = - schedulers - .map(scheduler => SchedulerOperator(scheduler)) - .map(schedulerOperator => schedulerOperator.resource) - .foldLeft[Resource[F, F[Outcome[F, Throwable, Unit]]]]( - Resource.make( - Async[F].pure( - Async[F].pure( - Outcome.succeeded( - Async[F].pure(()) - ) - ) - ) - )(_ => Async[F].pure(())) - ) { case (accR, curR) => - for { - accF <- accR - curF <- curR - } yield { - for { - accAndCur: ( - Outcome[F, Throwable, Unit], - Outcome[F, Throwable, Unit] - ) <- Async[F].both(accF, curF) - } yield { - if (accAndCur._1.isError) - accAndCur._1 - else - accAndCur._2 - } - } - } -} diff --git a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Work.scala b/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Work.scala deleted file mode 100644 index b6c6ebd..0000000 --- a/src/main/scala/de/neuland/bandwhichd/server/lib/scheduling/Work.scala +++ /dev/null @@ -1,5 +0,0 @@ -package de.neuland.bandwhichd.server.lib.scheduling - -class Work[F[_]](f: => F[Unit]) { - def run: F[Unit] = f -} diff --git a/src/test/scala/de/neuland/bandwhichd/server/BandwhichdServerApiV1Spec.scala b/src/test/scala/de/neuland/bandwhichd/server/BandwhichdServerApiV1Spec.scala index ea9979f..74893fa 100644 --- a/src/test/scala/de/neuland/bandwhichd/server/BandwhichdServerApiV1Spec.scala +++ b/src/test/scala/de/neuland/bandwhichd/server/BandwhichdServerApiV1Spec.scala @@ -129,7 +129,7 @@ class BandwhichdServerApiV1Spec ) val httpApp = inMemoryApp[IO]( - unimplementedTimeContext, + fixedTimeContext(MeasurementFixtures.fullTimeframe.end.instant), configuration ).httpApp @@ -277,12 +277,10 @@ class BandwhichdServerApiV1Spec val httpApp = app.httpApp for { - _ <- app.measurementsRepository + _ <- app.measurementApplicationService .record(MeasurementFixtures.exampleNetworkConfigurationMeasurement) - _ <- app.measurementsRepository + _ <- app.measurementApplicationService .record(MeasurementFixtures.exampleNetworkUtilizationMeasurement) - aggregationSchedule <- app.aggregationScheduler.schedule - _ <- aggregationSchedule.work.run // when result <- httpApp.run(request) @@ -331,14 +329,12 @@ class BandwhichdServerApiV1Spec val httpApp = app.httpApp for { - _ <- app.measurementsRepository.record( + _ <- app.measurementApplicationService.record( MeasurementFixtures.exampleNetworkConfigurationMeasurement ) - _ <- app.measurementsRepository.record( + _ <- app.measurementApplicationService.record( MeasurementFixtures.exampleNetworkUtilizationMeasurement ) - aggregationSchedule <- app.aggregationScheduler.schedule - _ <- aggregationSchedule.work.run // when result <- httpApp.run(request) diff --git a/src/test/scala/de/neuland/bandwhichd/server/application/MeasurementApplicationServiceSpec.scala b/src/test/scala/de/neuland/bandwhichd/server/application/MeasurementApplicationServiceSpec.scala new file mode 100644 index 0000000..22fc5dc --- /dev/null +++ b/src/test/scala/de/neuland/bandwhichd/server/application/MeasurementApplicationServiceSpec.scala @@ -0,0 +1,74 @@ +package de.neuland.bandwhichd.server.application + +import cats.effect.IO +import cats.effect.testing.scalatest.AsyncIOSpec +import com.comcast.ip4s.* +import de.neuland.bandwhichd.server.adapter.out.measurement.MeasurementsInMemoryRepository +import de.neuland.bandwhichd.server.adapter.out.stats.StatsInMemoryRepository +import de.neuland.bandwhichd.server.domain.* +import de.neuland.bandwhichd.server.domain.measurement.* +import de.neuland.bandwhichd.server.domain.stats.* +import de.neuland.bandwhichd.server.lib.time.cats.TimeContextMocks +import de.neuland.bandwhichd.server.lib.time.cats.TimeContextMocks.fixedTimeContext +import de.neuland.bandwhichd.server.test.Arbitraries.{sample, given} +import org.scalacheck.Gen +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.{AnyWordSpec, AsyncWordSpec} + +class MeasurementApplicationServiceSpec + extends AsyncWordSpec + with AsyncIOSpec + with Matchers { + + "MeasurementApplicationService" should { + "drop outdated and not updated data when recording messages" in { + val nc1 = ncGen().copy( + hostname = host"host1" + ) + val nc2 = ncGen().copy( + timing = Timing.Timestamp( + nc1.timing.instant + .plus(Stats.defaultTimeframeDuration) + ), + hostname = host"host2" + ) + val nc3 = ncGen().copy( + timing = Timing.Timestamp( + nc1.timing.instant + .plus(Stats.defaultTimeframeDuration.multipliedBy(2)) + ), + hostname = host"host3" + ) + + val now = + nc2.timing.instant.plus(Stats.defaultTimeframeDuration.dividedBy(2)) + + val statsRepository = new StatsInMemoryRepository[IO]() + + val service = new MeasurementApplicationService[IO]( + measurementsRepository = new MeasurementsInMemoryRepository(), + statsRepository = statsRepository, + timeContext = fixedTimeContext(now) + ) + + for { + // given + _ <- statsRepository.update(_.append(nc1)) + _ <- statsRepository.update(_.append(nc2)) + + // when + _ <- service.record(nc3) + result <- statsRepository.get + + // then + } yield { + result.hosts.map(_.hostname) should contain theSameElementsAs Set( + host"host2", + host"host3" + ) + } + } + } + + private def ncGen = sample[Measurement.NetworkConfiguration] +} diff --git a/src/test/scala/de/neuland/bandwhichd/server/domain/stats/StatsSpec.scala b/src/test/scala/de/neuland/bandwhichd/server/domain/stats/StatsSpec.scala index fe082ab..679a920 100644 --- a/src/test/scala/de/neuland/bandwhichd/server/domain/stats/StatsSpec.scala +++ b/src/test/scala/de/neuland/bandwhichd/server/domain/stats/StatsSpec.scala @@ -4,18 +4,18 @@ import cats.effect.IO import cats.effect.testing.scalatest.AsyncIOSpec import cats.implicits.* import com.comcast.ip4s.* +import com.comcast.ip4s.Arbitraries.given import de.neuland.bandwhichd.server.domain.* -import de.neuland.bandwhichd.server.domain.stats.* import de.neuland.bandwhichd.server.domain.measurement.* -import de.neuland.bandwhichd.server.test.Arbitraries.given +import de.neuland.bandwhichd.server.domain.stats.* +import de.neuland.bandwhichd.server.lib.time.Interval +import de.neuland.bandwhichd.server.test.Arbitraries.{sample, given} import fs2.Stream +import org.scalacheck.Gen +import org.scalatest.Assertion import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.{AnyWordSpec, AsyncWordSpec} import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks -import com.comcast.ip4s.Arbitraries.given -import de.neuland.bandwhichd.server.lib.time.Interval -import org.scalacheck.Gen -import org.scalatest.Assertion import java.time.{Duration, Instant, ZoneOffset, ZonedDateTime} import java.util.UUID @@ -357,8 +357,8 @@ class StatsSpec "dropping" should { "not keep host without update after drop" in { // given - val ncTemplate = ncGen.sample.get - val nuTemplate = nuGen.sample.get + val ncTemplate = ncGen() + val nuTemplate = nuGen() val baseTiming = ncTemplate.timestamp.instant @@ -385,7 +385,7 @@ class StatsSpec "keep host with configuration update after drop" in { // given - val ncTemplate = ncGen.sample.get + val ncTemplate = ncGen() val baseTiming = ncTemplate.timestamp.instant @@ -410,8 +410,8 @@ class StatsSpec "keep host with utilization update timeframe ending after drop" in { // given - val ncTemplate = ncGen.sample.get - val nuTemplate = nuGen.sample.get.copy(agentId = ncTemplate.agentId) + val ncTemplate = ncGen() + val nuTemplate = nuGen().copy(agentId = ncTemplate.agentId) val baseTiming = ncTemplate.timestamp.instant @@ -442,15 +442,15 @@ class StatsSpec val host2 = host"host2" val host3 = host"host3" - val ncTemplate = ncGen.sample.get - val nuTemplate = nuGen.sample.get.copy(agentId = ncTemplate.agentId) - val con1 = conGen.sample.get.copy( + val ncTemplate = ncGen() + val nuTemplate = nuGen().copy(agentId = ncTemplate.agentId) + val con1 = conGen().copy( remoteSocket = Remote(SocketAddress(host1, port"8080")) ) - val con2 = conGen.sample.get.copy( + val con2 = conGen().copy( remoteSocket = Remote(SocketAddress(host2, port"8080")) ) - val con3 = conGen.sample.get.copy( + val con3 = conGen().copy( remoteSocket = Remote(SocketAddress(host3, port"8080")) ) @@ -489,9 +489,9 @@ class StatsSpec } } - private def ncGen = summon[Gen[Measurement.NetworkConfiguration]] - private def nuGen = summon[Gen[Measurement.NetworkUtilization]] - private def conGen = summon[Gen[Connection]] + private def ncGen = sample[Measurement.NetworkConfiguration] + private def nuGen = sample[Measurement.NetworkUtilization] + private def conGen = sample[Connection] private def buildStats(measurements: Measurement[Timing]*): MonitoredStats = measurements.foldLeft(Stats.empty) { case (stats, measurement) => diff --git a/src/test/scala/de/neuland/bandwhichd/server/test/Arbitraries.scala b/src/test/scala/de/neuland/bandwhichd/server/test/Arbitraries.scala index 0c6e262..d8ead1e 100644 --- a/src/test/scala/de/neuland/bandwhichd/server/test/Arbitraries.scala +++ b/src/test/scala/de/neuland/bandwhichd/server/test/Arbitraries.scala @@ -12,6 +12,11 @@ object Arbitraries { given genArbitrary[A](using Gen[A]): Arbitrary[A] = Arbitrary[A].apply(summon[Gen[A]]) + def sample[A](using Gen[A]): () => A = { + val provider = LazyList.continually(summon[Gen[A]].sample).take(100).flatten + () => provider.head + } + /////////////////////// given Gen[Instant] = Gen.calendar.map(_.toInstant)