Checkpoint outstanding records when loader terminates (close #69)
istreeter committed Oct 30, 2021
1 parent 3c5aed8 commit e3b253b
Showing 5 changed files with 65 additions and 17 deletions.
@@ -19,6 +19,7 @@ import cats.effect.{Async, Blocker, ContextShift, Timer, ConcurrentEffect}
import doobie.hikari.HikariTransactor

import fs2.{Stream, Pipe}
+ import fs2.concurrent.SignallingRef

import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.postgres.streaming.{SinkPipe, StreamSink, DummyStreamSink}
@@ -33,7 +34,8 @@ final case class Environment[F[_], A](
badRowSink: StreamSink[F],
getPayload: A => F[Either[BadRow, String]],
checkpointer: Pipe[F, A, Unit],
- sinkPipe: HikariTransactor[F] => SinkPipe[F]
+ sinkPipe: HikariTransactor[F] => SinkPipe[F],
+ halt: SignallingRef[F, Boolean]
)

object Environment {
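The new `halt` field added to Environment is an fs2 `SignallingRef[F, Boolean]`: a mutable flag whose value can also be observed as a `Signal`, so a running stream can be interrupted by flipping the flag to true from another fiber. A minimal standalone sketch of that idea (not part of this commit; assumes fs2 2.x with cats-effect 2, and the object name is made up):

import scala.concurrent.duration._

import cats.effect.{ExitCode, IO, IOApp}

import fs2.Stream
import fs2.concurrent.SignallingRef

object HaltSketch extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    SignallingRef[IO, Boolean](false).flatMap { halt =>
      // A ticking stream that stops as soon as `halt` becomes true.
      val ticks = Stream.awakeEvery[IO](100.millis).interruptWhen(halt)
      // A second fiber flips the flag after one second, standing in for a signal handler.
      val stop = Stream.sleep[IO](1.second) ++ Stream.eval(halt.set(true))
      ticks.concurrently(stop).compile.drain.map(_ => ExitCode.Success)
    }
}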
@@ -23,6 +23,7 @@ import cats.data.{EitherT, NonEmptyList}
import cats.effect.{Blocker, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer}

import fs2.{Stream, Pipe}
+ import fs2.concurrent.SignallingRef
import fs2.aws.kinesis.{CommittableRecord, Kinesis}

import software.amazon.awssdk.regions.Region
@@ -57,12 +58,14 @@ object KinesisEnv {
dynamoClient <- mkDynamoDbClient[F](config.region)
cloudWatchClient <- mkCloudWatchClient[F](config.region)
kinesis = Kinesis.create(blocker, scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, metrics, _))
+ halt <- Resource.eval(SignallingRef(false))
} yield Environment(
getSource(kinesis),
badSink,
getPayload[F](purpose, _),
checkpointer(kinesis, config.checkpointSettings),
- SinkPipe.OrderedPipe.forTransactor[F]
+ SinkPipe.OrderedPipe.forTransactor[F],
+ halt
)

private def getSource[F[_]](kinesis: Kinesis[F]): Stream[F, CommittableRecord] =
@@ -17,6 +17,7 @@ import cats.implicits._
import cats.effect.{ContextShift, Blocker, Resource, Timer, ConcurrentEffect}

import fs2.{Pipe, Stream}
+ import fs2.concurrent.SignallingRef

import blobstore.fs.FileStore
import blobstore.Store
@@ -30,15 +31,16 @@ object LocalEnv {

def create[F[_]: ConcurrentEffect : ContextShift : Timer](blocker: Blocker, config: Source.Local, badSink: StreamSink[F]): Resource[F, Environment[F, String]] = {
Resource.eval {
- ConcurrentEffect[F].delay(
+ SignallingRef(false).map { halt =>
Environment[F, String](
getSource(blocker, config),
badSink,
getPayload[F](_),
checkpointer,
- SinkPipe.UnorderedPipe.forTransactor[F]
+ SinkPipe.UnorderedPipe.forTransactor[F],
+ halt
)
- )
+ }
}
}

@@ -17,6 +17,7 @@ import cats.implicits._
import cats.effect.{ContextShift, Blocker, Resource, Timer, ConcurrentEffect, Sync}

import fs2.Pipe
+ import fs2.concurrent.SignallingRef

import org.log4s.getLogger

@@ -41,17 +42,18 @@ object PubSubEnv {
val project = ProjectId(config.projectId)
val subscription = Subscription(config.subscriptionId)
val pubsubConfig = PubsubGoogleConsumerConfig[F](onFailedTerminate = pubsubOnFailedTerminate[F])
- Resource.eval(
- ConcurrentEffect[F].delay(
+ Resource.eval {
+ SignallingRef(false).map { halt =>
Environment[F, ConsumerRecord[F, String]](
PubsubGoogleConsumer.subscribe[F, String](blocker, project, subscription, pubsubErrorHandler[F], pubsubConfig),
badSink,
getPayload(_),
checkpointer(config.checkpointSettings.maxConcurrent),
- SinkPipe.UnorderedPipe.forTransactor[F]
+ SinkPipe.UnorderedPipe.forTransactor[F],
+ halt
)
- )
- )
+ }
+ }
}

private def getPayload[F[_]: Applicative](record: ConsumerRecord[F, String]): F[Either[BadRow, String]] = record.value.asRight.pure[F]
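In both LocalEnv and PubSubEnv the Environment used to be wrapped in `ConcurrentEffect[F].delay(...)`; now that the Environment carries a SignallingRef, which must be allocated effectfully, the construction maps over `SignallingRef(false)` instead. A rough illustration of the shape of that change, using a hypothetical cut-down Env (assumes cats-effect 2 and fs2 2.x):

import scala.concurrent.ExecutionContext

import cats.effect.{ContextShift, IO, Resource}

import fs2.concurrent.SignallingRef

object EnvSketch {
  // Needed for Concurrent[IO] outside of IOApp.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Hypothetical, cut-down Environment that only carries the halt signal.
  final case class Env(halt: SignallingRef[IO, Boolean])

  // Before: the value was pure, so delaying its construction was enough.
  // After: allocating the SignallingRef is itself an effect, so we map over it.
  val envResource: Resource[IO, Env] =
    Resource.eval {
      SignallingRef[IO, Boolean](false).map(halt => Env(halt))
    }
}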
@@ -15,13 +15,14 @@ package com.snowplowanalytics.snowplow.postgres.loader
import scala.concurrent.ExecutionContext

import cats.data.EitherT
- import cats.effect.{IOApp, IO, ExitCode}
+ import cats.effect.{IOApp, IO, ExitCase, ExitCode, Fiber}

import org.log4s.getLogger

import doobie.hikari.HikariTransactor

import fs2.Pipe
+ import fs2.concurrent.Queue

import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
import com.snowplowanalytics.snowplow.postgres.api.{State, DB}
@@ -72,15 +73,53 @@ object Main extends IOApp {
case Purpose.Enriched => utils.prepare[IO](cli.config.output.good.schema, xa)
case Purpose.SelfDescribing => IO.unit
}
- _ <- env.source
-   .through(parsePayload(env.getPayload, cli.config.purpose))
-   .through(sinkAll(env.sinkPipe(xa), Sink.sinkResult(state, cli.iglu, processor, env.badRowSink)))
-   .through(env.checkpointer)
-   .compile
-   .drain
+ _ <- runWithShutdown(env) {
+   _.through(parsePayload(env.getPayload, cli.config.purpose))
+     .through(sinkAll(env.sinkPipe(xa), Sink.sinkResult(state, cli.iglu, processor, env.badRowSink)))
+     .through(env.checkpointer)
+ }
} yield ExitCode.Success
}

+ /**
+  * This is the machinery needed to make sure outstanding records are checkpointed before the app
+  * terminates.
+  *
+  * The stream runs on a separate fiber. We call `join` on the fiber, so any runtime error on the
+  * stream causes the app to shut down. When we receive a SIGINT, we terminate the fiber by
+  * setting `env.halt` to true.
+  *
+  * We use a queue as a level of indirection between the source and the sink. We call
+  * `interruptWhen` on `queue.dequeue` because it causes all outstanding records to get
+  * checkpointed before the app shuts down.
+  *
+  * We must not call `interruptWhen` on the source itself, because this would shut down the Kinesis
+  * scheduler too early, and then we cannot checkpoint the outstanding records.
+  */
+ private def runWithShutdown[A](env: Environment[IO, A])(sink: Pipe[IO, A, Unit]): IO[Unit] =
+   Queue.synchronous[IO, A].flatMap { queue =>
+     queue
+       .dequeue
+       .interruptWhen(env.halt)
+       .through(sink)
+       .concurrently(env.source.evalMap(queue.enqueue1))
+       .compile
+       .drain
+       .start
+       .bracketCase(_.join) {
+         case (fiber, ExitCase.Canceled) => terminateStream(env, fiber)
+         case (_, ExitCase.Completed | ExitCase.Error(_)) => IO.unit
+       }
+   }


+ private def terminateStream[A](env: Environment[IO, A], fiber: Fiber[IO, Unit]): IO[Unit] =
+   for {
+     _ <- IO.delay(println("Halting the source")) // can't use logger here or else it gets stuck
+     _ <- env.halt.set(true)
+     _ <- fiber.join
+   } yield ()

private def parsePayload[A](getPayload: A => IO[Either[BadRow, String]], purpose: Purpose): Pipe[IO, A, (A, Either[BadRow, Data])] =
_.evalMap { record =>
val p = for {
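The scaladoc on `runWithShutdown` above is the heart of the change: interrupt only the consumer side of the queue, and let `concurrently` cancel the producer gracefully so that its cleanup (checkpointing, in the Kinesis case) still runs. A self-contained toy version of that pattern (hypothetical names; assumes fs2 2.x with cats-effect 2, with a print statement standing in for the checkpoint):

import scala.concurrent.duration._

import cats.effect.{ExitCode, IO, IOApp}

import fs2.Stream
import fs2.concurrent.{Queue, SignallingRef}

object ShutdownSketch extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    for {
      halt  <- SignallingRef[IO, Boolean](false)
      queue <- Queue.synchronous[IO, Int]
      // The producer owns its own resource; its finalizer stands in for
      // "checkpoint outstanding records" in the real loader.
      source = Stream.range(0, 1000).covary[IO].metered(50.millis)
                 .onFinalize(IO(println("source finalizer: checkpoint outstanding records")))
      // Interrupt the consumer side only; `concurrently` then cancels the
      // producer, which lets its finalizer run before the app exits.
      _ <- queue.dequeue
             .interruptWhen(halt)
             .evalMap(n => IO(println(s"processed $n")))
             .concurrently(source.evalMap(queue.enqueue1))
             .concurrently(Stream.sleep[IO](300.millis) ++ Stream.eval(halt.set(true)))
             .compile
             .drain
    } yield ExitCode.Success
}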
