Add health status reporter to the Source interface (close #40)
istreeter committed Dec 5, 2023
1 parent 43f351b commit acdc70b
Showing 4 changed files with 104 additions and 37 deletions.
@@ -50,7 +50,7 @@ object PubsubSource {
def checkpointer: PubSubCheckpointer[F] = pubsubCheckpointer

def stream: Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] =
Stream.emit(pubsubStream(config))
pubsubStream(config)
}

private def pubsubCheckpointer[F[_]: Async]: PubSubCheckpointer[F] = new PubSubCheckpointer[F] {
@@ -114,12 +114,11 @@ object PubsubSource {
}
}

private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]] =
private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, Stream[F, LowLevelEvents[Chunk[AckReplyConsumer]]]] =
for {
control <- Stream.eval(Control.build(config))
_ <- Stream.resource(runSubscriber(config, control))
events <- consumeFromQueue(config, control)
} yield events
} yield consumeFromQueue(config, control)

private def consumeFromQueue[F[_]: Sync](
config: PubsubSourceConfig,
@@ -212,7 +211,7 @@ object PubsubSource {
_ <- Sync[F].delay(apiService.stopAsync())
fiber <- drainQueue(control).start
_ <- Logger[F].info("Waiting for the PubSub Subscriber to finish cleanly...")
_ <- Sync[F].blocking(apiService.awaitTerminated(config.shutdownTimeout.toMillis, TimeUnit.MILLISECONDS))
_ <- Sync[F].blocking(apiService.awaitTerminated())
_ <- Sync[F].delay(control.phaser.forceTermination())
_ <- fiber.join
} yield ()
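The change to pubsubStream above makes it yield a stream of inner streams instead of a single flat stream: the outer stream owns the subscriber lifecycle, and each inner stream delivers events only while that subscriber is running. A minimal fs2 sketch of the same shape (hypothetical names, not part of the PubSub code):

import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object NestedStreamSketch extends IOApp.Simple {
  // Hypothetical stand-in for pubsubStream: each inner stream represents one
  // "connection" to the source; the outer stream starts a fresh connection
  // whenever the previous inner stream finishes.
  def connections: Stream[IO, Stream[IO, Int]] =
    Stream.iterate(0)(_ + 1).map { connection =>
      Stream.range(0, 3).covary[IO].metered(100.millis).map(n => connection * 10 + n)
    }

  def run: IO[Unit] =
    connections
      .take(2)  // two consecutive connections
      .flatten  // drain each inner stream before starting the next
      .evalMap(n => IO.println(s"event $n"))
      .compile
      .drain
}

Keeping the outer/inner split visible to LowLevelSource is what lets the new health check distinguish "connected but slow" from "not connected at all", as shown in the later files of this commit.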
@@ -7,6 +7,8 @@
*/
package com.snowplowanalytics.snowplow.sources

import cats.Show
import cats.implicits._
import fs2.Stream
import scala.concurrent.duration.FiniteDuration

@@ -33,14 +35,46 @@ trait SourceAndAck[F[_]] {
def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing]

/**
* Measurement of how long the EventProcessor has spent processing any pending un-acked events.
* Reports on whether the source of events is healthy
*
* Note, unlike our statsd metrics, this measurement does not consider min/max values over a
* period of time. It is a snapshot measurement for a single point in time.
* @param maxAllowedProcessingLatency
* A maximum allowed value for how long the `EventProcessor` may spend processing any pending
* un-acked events. If this cutoff is exceeded then `isHealthy` returns an unhealthy status.
*
* This measurement is designed to be used as a health probe. If events are getting processed
* quickly then latency is low and the probe should report healthy. If any event is "stuck" then
* latency is high and the probe should report unhealthy.
* Note, unlike our statsd metrics, this latency measurement does not consider min/max values over
* a period of time. It is a snapshot measurement for a single point in time.
*
* If events are getting processed quickly then latency is low and the probe should report
* healthy. If any event is "stuck" then latency is high and the probe should report unhealthy.
*/
def processingLatency: F[FiniteDuration]
def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus]
}

object SourceAndAck {

sealed trait HealthStatus
case object Healthy extends HealthStatus
sealed trait Unhealthy extends HealthStatus

/**
* The health status expected if the source is at a stage of its lifecycle where it cannot
* provide events
*
* For Pubsub this could be because the Subscriber is not yet running. For Kafka this could be due
* to re-balancing.
*/
case object Disconnected extends Unhealthy

/**
* The health status expected if an event is "stuck" in the EventProcessor
*
* @param latency
* How long the EventProcessor has spent trying to process the stuck event
*/
case class LaggingEventProcessor(latency: FiniteDuration) extends Unhealthy

implicit def showUnhealthy: Show[Unhealthy] = Show {
case Disconnected => "No connection to a source of events"
case LaggingEventProcessor(latency) => show"Processing latency is $latency"
}
}
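For callers, isHealthy replaces the raw processingLatency measurement with a ready-made verdict. Below is a sketch of how an application might turn it into a liveness answer; the healthProbe function and the 30-second threshold are illustrative, not part of this commit:

import cats.Functor
import cats.implicits._
import scala.concurrent.duration._
import com.snowplowanalytics.snowplow.sources.SourceAndAck

object HealthProbeSketch {
  // Hypothetical probe: maps the health status onto a (status code, message)
  // pair that an HTTP health endpoint could serve.
  def healthProbe[F[_]: Functor](sourceAndAck: SourceAndAck[F]): F[(Int, String)] =
    sourceAndAck.isHealthy(maxAllowedProcessingLatency = 30.seconds).map {
      case SourceAndAck.Healthy              => (200, "OK")
      case unhealthy: SourceAndAck.Unhealthy => (503, unhealthy.show)
    }
}

The Show instance above is what renders the 503 message, e.g. "Processing latency is 5 minutes" for LaggingEventProcessor(5.minutes).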
@@ -17,7 +17,7 @@ import fs2.{Pipe, Pull, Stream}
import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration}
import scala.concurrent.duration.{DurationLong, FiniteDuration}
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}

/**
@@ -76,16 +76,19 @@ private[sources] object LowLevelSource {
def toSourceAndAck[F[_]: Async, C](source: LowLevelSource[F, C]): F[SourceAndAck[F]] =
for {
latencyRef <- Ref[F].of(Option.empty[FiniteDuration])
} yield sourceAndAckImpl(source, latencyRef)
isConnectedRef <- Ref[F].of(false)
} yield sourceAndAckImpl(source, latencyRef, isConnectedRef)

private def sourceAndAckImpl[F[_]: Async, C](
source: LowLevelSource[F, C],
latencyRef: LatencyRef[F]
latencyRef: LatencyRef[F],
isConnectedRef: Ref[F, Boolean]
): SourceAndAck[F] = new SourceAndAck[F] {
def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing] = {
val str = for {
acksRef <- Stream.bracket(Ref[F].of(Map.empty[Unique.Token, C]))(nackUnhandled(source.checkpointer, _))
s2 <- source.stream
_ <- Stream.bracket(isConnectedRef.set(true))(_ => isConnectedRef.set(false))
} yield {
val tokenedSources = s2
.through(monitorLatency(latencyRef))
@@ -105,13 +108,13 @@
str.flatten
}

def processingLatency: F[FiniteDuration] =
latencyRef.get.flatMap {
case None => Duration.Zero.pure[F]
case Some(lastPullTime) =>
Async[F].realTime.map { now =>
now - lastPullTime
}
def isHealthy(maxAllowedProcessingLatency: FiniteDuration): F[SourceAndAck.HealthStatus] =
(isConnectedRef.get, latencyRef.get, Sync[F].realTime).mapN {
case (false, _, _) =>
SourceAndAck.Disconnected
case (_, Some(lastPullTime), now) if now - lastPullTime > maxAllowedProcessingLatency =>
SourceAndAck.LaggingEventProcessor(now - lastPullTime)
case _ => SourceAndAck.Healthy
}
}

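The implementation keeps two pieces of state: the existing latency Ref recording the last pull time, and a new Ref[F, Boolean] that is true only while an inner stream from the source is active (the Stream.bracket above flips it back to false when the inner stream ends, e.g. during a Kafka rebalance). isHealthy combines both with the current clock, and the disconnected check takes precedence over the lagging check. The decision itself can be written as a pure function; the names below are hypothetical and only illustrate the ordering of the checks:

import scala.concurrent.duration._

object HealthDecisionSketch {
  // Simplified, pure rendering of the check inside sourceAndAckImpl. The real
  // code reads isConnected and lastPullTime from Refs, and now from the
  // effect's clock.
  sealed trait Status
  case object Ok extends Status
  case object NotConnected extends Status
  final case class Lagging(latency: FiniteDuration) extends Status

  def decide(
    isConnected: Boolean,
    lastPullTime: Option[FiniteDuration],
    now: FiniteDuration,
    maxAllowed: FiniteDuration
  ): Status =
    (isConnected, lastPullTime) match {
      case (false, _)                                     => NotConnected
      case (_, Some(pulled)) if now - pulled > maxAllowed => Lagging(now - pulled)
      case _                                              => Ok
    }
}

For example, HealthDecisionSketch.decide(isConnected = true, lastPullTime = Some(0.seconds), now = 5.minutes, maxAllowed = 10.seconds) returns Lagging(5.minutes), which mirrors the LaggingEventProcessor(5.minutes) expectation in the spec below.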
@@ -18,7 +18,7 @@ import org.specs2.matcher.Matcher
import scala.concurrent.duration.{Duration, DurationInt}

import java.nio.charset.StandardCharsets
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}

import java.nio.ByteBuffer

@@ -37,11 +37,12 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
cleanly checkpoint pending window when a stream is interrupted $e5
not checkpoint events if the event processor throws an exception $e6

When reporting processing latency
report zero latency when there are no events $e7
report non-zero latency while there are unprocessed events $e8
report zero latency if events are processed but not yet acked (e.g. a batch-oriented loader) $e9
report zero latency after all events have been processed and acked $e10
When reporting healthy status
report healthy when there are no events $e7
report lagging if there are unprocessed events $e8
report healthy if events are processed but not yet acked (e.g. a batch-oriented loader) $e9
report healthy after all events have been processed and acked $e10
report disconnected while source is in between two active streams of events (e.g. during kafka rebalance) $e11
"""

def e1 = {
@@ -226,7 +227,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
// A source that emits nothing
val lowLevelSource = new LowLevelSource[IO, Unit] {
def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.never[IO]
def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] = Stream.emit(Stream.never[IO])
}

val io = for {
@@ -235,9 +236,9 @@
processor = testProcessor(refProcessed)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(1.hour)
latency <- sourceAndAck.processingLatency
health <- sourceAndAck.isHealthy(Duration.Zero)
_ <- fiber.cancel
} yield latency must beEqualTo(Duration.Zero)
} yield health must beEqualTo(SourceAndAck.Healthy)

TestControl.executeEmbed(io)
}
@@ -264,9 +265,9 @@
sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(5.minutes)
latency <- sourceAndAck.processingLatency
health <- sourceAndAck.isHealthy(10.seconds)
_ <- fiber.cancel
} yield latency must beEqualTo(5.minutes)
} yield health must beEqualTo(SourceAndAck.LaggingEventProcessor(5.minutes))

TestControl.executeEmbed(io)
}
@@ -291,9 +292,9 @@
sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(342.seconds)
latency <- sourceAndAck.processingLatency
health <- sourceAndAck.isHealthy(90.seconds)
_ <- fiber.cancel
} yield latency must beEqualTo(42.seconds)
} yield health must beEqualTo(SourceAndAck.Healthy)

TestControl.executeEmbed(io)
}
@@ -320,9 +321,39 @@
sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(5.minutes)
latency <- sourceAndAck.processingLatency
health <- sourceAndAck.isHealthy(1.microsecond)
_ <- fiber.cancel
} yield latency must beEqualTo(Duration.Zero)
} yield health must beEqualTo(SourceAndAck.Healthy)

TestControl.executeEmbed(io)
}

def e11 = {

val config = EventProcessingConfig(EventProcessingConfig.NoWindowing)

// A source that emits one batch per inner stream, with a 5 minute "rebalancing" in between
val lowLevelSource = new LowLevelSource[IO, Unit] {
def checkpointer: Checkpointer[IO, Unit] = Checkpointer.acksOnly[IO, Unit](_ => IO.unit)
def stream: Stream[IO, Stream[IO, LowLevelEvents[Unit]]] =
Stream.fixedDelay[IO](5.minutes).map { _ =>
Stream.emit(LowLevelEvents(Chunk.empty, (), None))
}
}

// A processor which takes 5 seconds to process each batch
val processor: EventProcessor[IO] =
_.evalMap { case TokenedEvents(_, token, _) =>
IO.sleep(5.seconds).as(token)
}

val io = for {
sourceAndAck <- LowLevelSource.toSourceAndAck(lowLevelSource)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(2.minutes)
health <- sourceAndAck.isHealthy(1.microsecond)
_ <- fiber.cancel
} yield health must beEqualTo(SourceAndAck.Disconnected)

TestControl.executeEmbed(io)
}
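All of these tests run under TestControl.executeEmbed, which executes the IO on a simulated clock: IO.sleep(5.minutes) advances virtual time instantly, so the latency observed by isHealthy is deterministic and the suite still finishes quickly. A standalone illustration of that behaviour (not part of the commit):

import cats.effect.IO
import cats.effect.testkit.TestControl
import scala.concurrent.duration._

object VirtualTimeSketch {
  // The sleep is skipped on the simulated clock, yet realTime observes the
  // full hour having elapsed, so the program yields exactly 1.hour.
  val program: IO[FiniteDuration] =
    for {
      start <- IO.realTime
      _     <- IO.sleep(1.hour)
      end   <- IO.realTime
    } yield end - start

  val result: IO[FiniteDuration] = TestControl.executeEmbed(program)
}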
