Skip to content

Commit

Permalink
Improve timings of overlapping windows (#64)
Browse files Browse the repository at this point in the history
In Lake Loader, there is an edge-case scenario where the time taken to
"commit" events to the Lake becomes longer than the duration of the
configured timed window.

Previously, because of how windows got timed, the edge case scenario caused the
window sizes to get smaller and smaller as it waited for the events to be
committed.

This PR changes how a window length is timed. After this PR, the processing
time of a window remains constant, even if the "commit" phase starts to
exceed the window size.  It allows the loader to stay healthier even
during edge-case scenarios.
  • Loading branch information
istreeter committed Mar 19, 2024
1 parent 91a3862 commit d545721
Showing 1 changed file with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,21 +241,20 @@ private[sources] object LowLevelSource {
case Some(q) => Pull.eval(q.offer(None)) >> Pull.done
}
case Some((Left(_), next)) =>
val openWindow =
Pull.eval(Logger[F].info(s"Opening new window with duration ${config.duration}")) >> next.timeout(config.duration)
current match {
case None => openWindow >> go(next, None)
case Some(q) => openWindow >> Pull.eval(q.offer(None)) >> go(next, None)
case None => go(next, None)
case Some(q) => Pull.eval(q.offer(None)) >> go(next, None)
}
case Some((Right(chunk), next)) =>
current match {
case None =>
for {
val pull = for {
q <- Pull.eval(Queue.synchronous[F, Option[A]])
_ <- Pull.output1(Stream.fromQueueNoneTerminated(q))
_ <- Pull.eval(chunk.traverse(a => q.offer(Some(a))))
_ <- go(next, Some(q))
} yield ()
_ <- Pull.eval(Logger[F].info(s"Opening new window with duration ${config.duration}")) >> next.timeout(config.duration)
} yield go(next, Some(q))
pull.flatten
case Some(q) =>
Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q))
}
Expand All @@ -265,11 +264,11 @@ private[sources] object LowLevelSource {
in.pull
.timed { timedPull: Pull.Timed[F, A] =>
val timeout = timeoutForFirstWindow(config)
for {
val pull = for {
_ <- Pull.eval(Logger[F].info(s"Opening first window with randomly adjusted duration of $timeout"))
_ <- timedPull.timeout(timeout)
_ <- go(timedPull, None)
} yield ()
} yield go(timedPull, None)
pull.flatten
}
.stream
.prefetch // This prefetch is required to pull items into the emitted stream
Expand Down

0 comments on commit d545721

Please sign in to comment.