Checkpoint outstanding records when loader terminates (close #69)
istreeter committed Oct 30, 2021
1 parent 3c5aed8 commit 6e34071
Showing 1 changed file with 46 additions and 8 deletions.
@@ -15,13 +15,14 @@ package com.snowplowanalytics.snowplow.postgres.loader
 import scala.concurrent.ExecutionContext
 
 import cats.data.EitherT
-import cats.effect.{IOApp, IO, ExitCode}
+import cats.effect.{IOApp, IO, ExitCase, ExitCode, Fiber}
 
 import org.log4s.getLogger
 
 import doobie.hikari.HikariTransactor
 
-import fs2.Pipe
+import fs2.{Pipe, Stream}
+import fs2.concurrent.{Queue, NoneTerminatedQueue}
 
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
 import com.snowplowanalytics.snowplow.postgres.api.{State, DB}
@@ -72,15 +73,52 @@ object Main extends IOApp {
       case Purpose.Enriched => utils.prepare[IO](cli.config.output.good.schema, xa)
       case Purpose.SelfDescribing => IO.unit
     }
-    _ <- env.source
-      .through(parsePayload(env.getPayload, cli.config.purpose))
-      .through(sinkAll(env.sinkPipe(xa), Sink.sinkResult(state, cli.iglu, processor, env.badRowSink)))
-      .through(env.checkpointer)
-      .compile
-      .drain
+    _ <- runWithShutdown(env.source) {
+      _.through(parsePayload(env.getPayload, cli.config.purpose))
+        .through(sinkAll(env.sinkPipe(xa), Sink.sinkResult(state, cli.iglu, processor, env.badRowSink)))
+        .through(env.checkpointer)
+    }
   } yield ExitCode.Success
 }
 
+  /**
+   * This is the machinery needed to make sure outstanding records are checkpointed before the app
+   * terminates.
+   *
+   * The stream runs on a separate fiber. We call `join` on the fiber, so any runtime error on the
+   * stream causes the app to shut down.
+   *
+   * We use a queue as a level of indirection between the source and the sink. When we receive a
+   * SIGINT, we terminate the fiber by pushing a `None` to the queue.
+   *
+   * We terminate the `queue.dequeue` stream, rather than interrupting the source directly, because
+   * draining the queue lets all outstanding records get checkpointed before the app shuts down.
+   * Interrupting the source itself would shut down the Kinesis scheduler too early, and then we
+   * could not checkpoint the outstanding records.
+   */
+  private def runWithShutdown[A](source: Stream[IO, A])(sink: Pipe[IO, A, Unit]): IO[Unit] =
+    Queue.synchronousNoneTerminated[IO, A].flatMap { queue =>
+      queue
+        .dequeue
+        .through(sink)
+        .concurrently(source.evalMap(x => queue.enqueue1(Some(x))))
+        .compile
+        .drain
+        .start
+        .bracketCase(_.join) {
+          case (fiber, ExitCase.Canceled) => terminateStream(queue, fiber)
+          case (_, ExitCase.Completed | ExitCase.Error(_)) => IO.unit
+        }
+    }
+
+  private def terminateStream[A](queue: NoneTerminatedQueue[IO, A], fiber: Fiber[IO, Unit]): IO[Unit] =
+    for {
+      _ <- IO.delay(println("Halting the source")) // can't use the logger here or else it gets stuck
+      _ <- queue.enqueue1(None)
+      _ <- fiber.join
+    } yield ()
+
   private def parsePayload[A](getPayload: A => IO[Either[BadRow, String]], purpose: Purpose): Pipe[IO, A, (A, Either[BadRow, Data])] =
     _.evalMap { record =>
       val p = for {
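Below is a hypothetical, self-contained sketch (not part of this commit) showing how the queue-plus-`bracketCase` shutdown pattern above behaves, assuming cats-effect 2.x and fs2 2.x as in the diff's imports. `GracefulDemo`, the record counts, and the printed messages are illustrative only:

import scala.concurrent.duration._

import cats.effect.{ExitCase, ExitCode, IO, IOApp}
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue

// Hypothetical demo, not part of this commit: a slow source feeds a
// synchronous none-terminated queue, and the sink pretends to checkpoint.
object GracefulDemo extends IOApp {

  def run(args: List[String]): IO[ExitCode] =
    Queue.synchronousNoneTerminated[IO, Int].flatMap { queue =>
      val source = Stream.range(0, 100).covary[IO].metered(500.millis)

      queue.dequeue
        .evalMap(i => IO.delay(println(s"checkpointed record $i")))
        .concurrently(source.evalMap(i => queue.enqueue1(Some(i))))
        .compile
        .drain
        .start
        .bracketCase(_.join) {
          // On SIGINT the enclosing IO is cancelled: terminate the queue so
          // the dequeue side drains, then wait for the fiber to finish.
          case (fiber, ExitCase.Canceled) => queue.enqueue1(None) *> fiber.join
          case _                          => IO.unit
        }
        .as(ExitCode.Success)
    }
}

Pressing Ctrl-C while this runs should let any in-flight record print before the JVM exits; the same sequencing is what lets the loader checkpoint outstanding Kinesis records.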
