Skip to content
This repository has been archived by the owner on Mar 27, 2023. It is now read-only.

Commit

Permalink
Refactor from scheduled stats collection towards ongoing calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
Michel Zimmer committed Jul 21, 2022
1 parent 4506161 commit c59c1b0
Show file tree
Hide file tree
Showing 22 changed files with 158 additions and 262 deletions.
2 changes: 1 addition & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
**/*
!project/build.properties
!project/plugins.sbt
!src/
!src/main/
!build.sbt
7 changes: 3 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ LABEL org.opencontainers.image.vendor="neuland – Büro für Informatik GmbH"
LABEL org.opencontainers.image.licenses="Apache-2.0"
LABEL org.opencontainers.image.title="bandwhichd-server"
LABEL org.opencontainers.image.description="bandwhichd server collecting measurements and calculating statistics"
LABEL org.opencontainers.image.version="0.6.0-rc3"
LABEL org.opencontainers.image.version="0.6.0-rc4"
USER guest
ENTRYPOINT ["/opt/java/openjdk/bin/java"]
CMD ["-jar", "/opt/bandwhichd-server.jar"]
EXPOSE 8080
HEALTHCHECK --interval=5s --timeout=1s --start-period=2s --retries=2 \
CMD wget --spider http://localhost:8080/v1/health || exit 1
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.3/bandwhichd-server-assembly-0.6.0-rc3.jar /opt/bandwhichd-server.jar
STOPSIGNAL SIGTERM
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.3/bandwhichd-server-assembly-0.6.0-rc4.jar /opt/bandwhichd-server.jar
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ lazy val root = (project in file("."))
.settings(
organization := "de.neuland-bfi",
name := "bandwhichd-server",
version := "0.6.0-rc3",
version := "0.6.0-rc4",
scalaVersion := "3.1.3",
Compile / scalaSource := baseDirectory.value / "src" / "main" / "scala",
Test / scalaSource := baseDirectory.value / "src" / "test" / "scala",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,29 +1,25 @@
package de.neuland.bandwhichd.server.adapter.in.v1.message

import io.circe.Encoder
import cats.data.Validated.{Invalid, Valid}
import cats.data.ValidatedNel
import cats.effect.{Concurrent, Resource}
import cats.implicits.*
import de.neuland.bandwhichd.server.adapter.in.v1.message.Message.MeasurementMessage
import de.neuland.bandwhichd.server.application.{
MeasurementApplicationService,
RecordMeasurementCommand
}
import de.neuland.bandwhichd.server.application.MeasurementApplicationService
import de.neuland.bandwhichd.server.boot.Configuration
import de.neuland.bandwhichd.server.domain.measurement.Timing
import de.neuland.bandwhichd.server.domain.stats.Stats
import de.neuland.bandwhichd.server.lib.http4s.Helpers
import de.neuland.bandwhichd.server.lib.time.Interval
import de.neuland.bandwhichd.server.lib.time.cats.TimeContext
import io.circe.{Decoder, Json}
import io.circe.{Decoder, Encoder, Json}
import org.http4s.dsl.{io as _, *}
import org.http4s.headers.Allow
import org.http4s.implicits.*
import org.http4s.{Message as _, *}

import java.time.{Instant, OffsetDateTime}
import java.time.format.DateTimeFormatter
import java.time.{Instant, OffsetDateTime}

class MessageController[F[_]: Concurrent](
private val configuration: Configuration,
Expand Down Expand Up @@ -93,9 +89,7 @@ class MessageController[F[_]: Concurrent](
_ <- {
message match
case Message.MeasurementMessage(measurement) =>
measurementApplicationService.recordMeasurement(
RecordMeasurementCommand(measurement)
)
measurementApplicationService.record(measurement)
}
response <- Ok("")
} yield response
Expand Down Expand Up @@ -125,5 +119,10 @@ class MessageController[F[_]: Concurrent](
)
)
)
case (None, None) => Stats.defaultTimeframe(timeContext)
case (None, None) =>
for {
now <- timeContext.now
} yield Timing.Timeframe(
Interval(now.minus(Stats.defaultTimeframeDuration), now)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ class StatsInMemoryRepository[F[_]: Sync] extends StatsRepository[F] {
Sync[F].blocking {
statsStore.get()
}

override def update(f: MonitoredStats => MonitoredStats): F[MonitoredStats] =
Sync[F].blocking {
statsStore.updateAndGet(stats => f(stats))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@ package de.neuland.bandwhichd.server.application
import cats.Monad
import cats.effect.Sync
import cats.implicits.*
import de.neuland.bandwhichd.server.domain.measurement.{
Measurement,
MeasurementsRepository,
Timing
}
import de.neuland.bandwhichd.server.domain.measurement.*
import de.neuland.bandwhichd.server.domain.stats.*
import de.neuland.bandwhichd.server.lib.time.cats.TimeContext
import fs2.Stream

class MeasurementApplicationService[F[_]: Sync](
private val measurementsRepository: MeasurementsRepository[F]
private val measurementsRepository: MeasurementsRepository[F],
private val statsRepository: StatsRepository[F],
private val timeContext: TimeContext[F]
) {
def get(timeframe: Timing.Timeframe): Stream[F, Measurement[Timing]] =
measurementsRepository.get(timeframe)

def recordMeasurement(
recordMeasurementCommand: RecordMeasurementCommand
): F[Unit] =
measurementsRepository.record(recordMeasurementCommand.measurement)
def record(measurement: Measurement[Timing]): F[Unit] =
for {
_ <- measurementsRepository.record(measurement)
now <- timeContext.now
_ <- statsRepository.update { stats =>
stats
.append(measurement)
.dropBefore(
Timing.Timestamp(now.minus(Stats.defaultTimeframeDuration))
)
}
} yield ()
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,8 @@ import de.neuland.bandwhichd.server.domain.stats.*
import de.neuland.bandwhichd.server.lib.time.cats.TimeContext

class StatsApplicationService[F[_]: Concurrent](
private val timeContext: TimeContext[F],
private val measurementsRepository: MeasurementsRepository[F],
private val statsRepository: StatsRepository[F]
) {
def get: F[MonitoredStats] =
statsRepository.get

def recalculate: F[Unit] =
for {
defaultTimeframe <- Stats.defaultTimeframe(timeContext)
stats <- Stats(measurementsRepository.get(defaultTimeframe))
_ <- statsRepository.safe(stats)
} yield ()
}
63 changes: 14 additions & 49 deletions src/main/scala/de/neuland/bandwhichd/server/boot/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package de.neuland.bandwhichd.server.boot
import cats.effect.*
import cats.effect.kernel.Outcome
import cats.implicits.*
import de.neuland.bandwhichd.server.adapter.in.scheduler.AggregationScheduler
import de.neuland.bandwhichd.server.adapter.in.v1.health.HealthController
import de.neuland.bandwhichd.server.adapter.in.v1.message.MessageController
import de.neuland.bandwhichd.server.adapter.in.v1.stats.StatsController
Expand All @@ -17,11 +16,6 @@ import de.neuland.bandwhichd.server.application.{
import de.neuland.bandwhichd.server.domain.measurement.MeasurementsRepository
import de.neuland.bandwhichd.server.domain.stats.StatsRepository
import de.neuland.bandwhichd.server.lib.cassandra.CassandraContext
import de.neuland.bandwhichd.server.lib.scheduling.{
Operator,
Scheduler,
SchedulersOperator
}
import de.neuland.bandwhichd.server.lib.time.cats.TimeContext
import org.http4s.dsl.io.*
import org.http4s.ember.server.EmberServerBuilder
Expand Down Expand Up @@ -53,29 +47,16 @@ open class App[F[_]: Async](
// application
lazy val measurementApplicationService: MeasurementApplicationService[F] =
MeasurementApplicationService[F](
measurementsRepository = measurementsRepository
measurementsRepository = measurementsRepository,
statsRepository = statsRepository,
timeContext = timeContext
)

lazy val statsApplicationService: StatsApplicationService[F] =
StatsApplicationService[F](
timeContext = timeContext,
measurementsRepository = measurementsRepository,
statsRepository = statsRepository
)

// in scheduling
lazy val aggregationScheduler: Scheduler[F] =
AggregationScheduler[F](
configuration = configuration,
statsApplicationService = statsApplicationService
)

// scheduling
lazy val schedulersOperator: SchedulersOperator[F] =
SchedulersOperator[F](
aggregationScheduler
)

// in http
lazy val healthController: HealthController[F] =
HealthController[F]()
Expand All @@ -102,9 +83,9 @@ open class App[F[_]: Async](
routes.routes.orNotFound
}

object App extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val outcomeR = for {
object App extends IOApp.Simple {
override def run: IO[Unit] =
(for {
configuration <- Configuration.resource[IO]
cassandraContext <- CassandraContext.resource[IO](configuration)
_ <- Resource.eval(
Expand All @@ -115,35 +96,19 @@ object App extends IOApp {
cassandraContext,
configuration
)
schedulerOutcomeF <- main.schedulersOperator.resource
server <- EmberServerBuilder
.default[IO]
.withHostOption(None)
.withHttpApp(main.httpApp)
.build
logger <- Resource.eval(Slf4jLogger.create[IO])
_ <- Resource.eval(
logger.info(
s"bandwhichd-server startup complete - ${main.schedulersOperator.size} scheduler - listening on ${server.address}"
)
)
lineF <- Resource.eval(IO.delay {
for {
line <- IO.interruptible {
StdIn.readLine()
}
_ <- if (line == null) IO.never else IO.unit
} yield ()
})
} yield schedulerOutcomeF.race(lineF)

outcomeR.use { outcomeF =>
} yield server).use { server =>
for {
outcome <- outcomeF
} yield outcome match
case Right(_) => ExitCode.Success
case Left(Outcome.Succeeded(_)) => ExitCode.Success
case _ => ExitCode.Error
logger <- Slf4jLogger.create[IO]
_ <- logger.info(
s"bandwhichd-server startup complete - listening on ${server.address}"
)
line <- IO.interruptible { StdIn.readLine() }
_ <- if (line == null) IO.never else IO.unit
} yield ()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package de.neuland.bandwhichd.server.domain.stats

import cats.Monad
import cats.effect.kernel.Concurrent
import cats.implicits.*
import cats.effect.Concurrent
import com.comcast.ip4s.*
import de.neuland.bandwhichd.server.domain.*
import de.neuland.bandwhichd.server.domain.measurement.{Measurement, Timing}
Expand Down Expand Up @@ -51,19 +49,11 @@ class Stats[L <: HostId, H <: AnyHost[L], R <: HostId] private (
)
}

type AnyStats = Stats[HostId, AnyHost[HostId], HostId]
type MonitoredStats = Stats[HostId.MachineId, MonitoredHost, HostId]

object Stats {
val defaultTimeframeDuration: Duration = Duration.ofHours(2)

def defaultTimeframe[F[_]: Monad](
timeContext: TimeContext[F]
): F[Timing.Timeframe] =
for {
now <- timeContext.now
} yield Timing.Timeframe(Interval(now.minus(defaultTimeframeDuration), now))

val empty: MonitoredStats = new Stats(Map.empty)

def apply[F[_]: Concurrent](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ trait StatsRepository[F[_]] {
def safe(stats: MonitoredStats): F[Unit]

def get: F[MonitoredStats]

def update(f: MonitoredStats => MonitoredStats): F[MonitoredStats]
}

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit c59c1b0

Please sign in to comment.