Skip to content

Commit

Permalink
Feed: Add SinglePassFeedSource (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Oct 20, 2022
1 parent 451f59f commit d3246c9
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -12,6 +12,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

- `Feed`: `Monitor.AwaitCompletion` enables **quasi** deterministic waiting for the processing of async reactions within integration tests [#170](https://github.com/jet/propulsion/pull/170)
- `Feed`: `Checkpoint` enables committing progress (and obtaining the achieved positions) without stopping the Sink [#162](https://github.com/jet/propulsion/pull/162)
- `Feed.SinglePassFeedSource`: Coordinates reads of a set of tranches until each reaches its Tail [#179](https://github.com/jet/propulsion/pull/179)
- `Scheduler`: `purgeInterval` to control memory usage [#97](https://github.com/jet/propulsion/pull/97)
- `Scheduler`: `wakeForResults` option to maximize throughput (without having to drop sleep interval to zero) [#161](https://github.com/jet/propulsion/pull/161)
- `Ingester`: Expose optional `ingesterStatsInterval` control [#154](https://github.com/jet/propulsion/pull/154)
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
Expand Up @@ -30,7 +30,7 @@ type internal SourcePipeline =

static member Start(log : ILogger, start, maybeStartChild, stop, observer : IDisposable) =
let cts = new CancellationTokenSource()
let triggerStop () =
let triggerStop _disposing =
let level = if cts.IsCancellationRequested then Events.LogEventLevel.Debug else Events.LogEventLevel.Information
log.Write(level, "Source stopping...")
observer.Dispose()
Expand Down
6 changes: 4 additions & 2 deletions src/Propulsion.Feed/FeedReader.fs
Expand Up @@ -114,7 +114,9 @@ type FeedReader
// permitted to throw if it fails; failures are counted and/or retried with throttling
-> Async<unit>,
renderPos,
?logCommitFailure) =
?logCommitFailure,
// Stop processing when the crawl function yields a Batch that isTail. Default false.
?stopAtTail) =

let log = log.ForContext("source", sourceId).ForContext("tranche", trancheId)
let stats = Stats(log, statsInterval, sourceId, trancheId, renderPos)
Expand Down Expand Up @@ -147,7 +149,7 @@ type FeedReader
let! ct = Async.CancellationToken
Task.start (fun () -> stats.Pump ct)
let mutable currentPos, lastWasTail = initialPosition, false
while not ct.IsCancellationRequested do
while not (ct.IsCancellationRequested || (lastWasTail && defaultArg stopAtTail false)) do
for readLatency, batch in crawl (lastWasTail, currentPos) do
do! submitPage (readLatency, batch) |> Async.AwaitTaskCorrect
currentPos <- batch.checkpoint
Expand Down
76 changes: 46 additions & 30 deletions src/Propulsion.Feed/FeedSource.fs
Expand Up @@ -13,14 +13,15 @@ type FeedSourceBase internal
checkpoints : IFeedCheckpointStore, establishOrigin : (TrancheId -> Async<Position>) option,
sink : Propulsion.Streams.Default.Sink,
renderPos : Position -> string,
?logCommitFailure) =
?logCommitFailure, ?stopAtTail) =
let log = log.ForContext("source", sourceId)
let logForTranche trancheId = log.ForContext("tranche", trancheId)
let positions = TranchePositions()
let pumpPartition crawl (ingester : Ingestion.Ingester<_>) trancheId = async {
let log = logForTranche trancheId
let ingest = positions.Intercept(trancheId) >> ingester.Ingest
let reader = FeedReader(log, sourceId, trancheId, statsInterval, crawl trancheId, ingest, checkpoints.Commit, renderPos, ?logCommitFailure = logCommitFailure)
let reader = FeedReader(log, sourceId, trancheId, statsInterval, crawl trancheId, ingest, checkpoints.Commit, renderPos,
?logCommitFailure = logCommitFailure, ?stopAtTail = stopAtTail)
try try let! freq, pos = checkpoints.Start(sourceId, trancheId, ?establishOrigin = (establishOrigin |> Option.map (fun f -> f trancheId)))
log.Information("Reading {source:l}/{tranche:l} From {pos} Checkpoint Event interval {checkpointFreq:n1}m",
sourceId, trancheId, renderPos pos, freq.TotalMinutes)
Expand Down Expand Up @@ -56,21 +57,25 @@ type FeedSourceBase internal
member x.Start(pump) =
let ct, stop =
let cts = new System.Threading.CancellationTokenSource()
let stop () =
if not cts.IsCancellationRequested then log.Information "Source stopping..."
let stop disposing =
if not cts.IsCancellationRequested && not disposing then log.Information "Source stopping..."
cts.Cancel()
cts.Token, stop

let supervise, markCompleted, outcomeTask =
let tcs = System.Threading.Tasks.TaskCompletionSource<unit>()
let markCompleted () = tcs.TrySetResult () |> ignore
let recordExn (e : exn) = tcs.TrySetException e |> ignore
// first exception from a supervised task becomes the outcome if that happens
let supervise inner = async {
try do! inner
// If the source completes all reading cleanly, declare completion
log.Information "Source drained..."
markCompleted ()
with e ->
log.Warning(e, "Exception encountered while running source, exiting loop")
recordExn e }
supervise, (fun () -> tcs.TrySetResult () |> ignore), tcs.Task
supervise, markCompleted, tcs.Task

let supervise () = task {
// external cancellation should yield a success result (in the absence of failures from the supervised tasks)
Expand Down Expand Up @@ -114,27 +119,23 @@ and FeedMonitor internal (log : Serilog.ILogger, positions : TranchePositions, s

let notEol () = not sink.IsCompleted && not (completed ())
let choose f (xs : KeyValuePair<_, _> array) = [| for x in xs do match f x.Value with ValueNone -> () | ValueSome v' -> struct (x.Key, v') |]
let activeReadPositions () =
match positions.Current() with
| xs when xs |> Array.forall (fun (kv : KeyValuePair<_, TrancheState>) -> kv.Value.IsEmpty) -> Array.empty
| originals -> originals |> choose (fun v -> v.read)
// Waits for up to propagationDelay, returning the opening tranche positions observed (or empty if the wait has timed out)
let awaitPropagation (sleepMs : int) (propagationDelay : TimeSpan) = async {
let awaitPropagation (sleepMs : int) (propagationDelay : TimeSpan) positions = async {
let timeout = IntervalTimer propagationDelay
let mutable startPositions = activeReadPositions ()
let mutable startPositions = positions ()
while Array.isEmpty startPositions && not timeout.IsDue && notEol () do
do! Async.Sleep sleepMs
startPositions <- activeReadPositions ()
startPositions <- positions ()
return startPositions }
// Waits for up to lingerTime for work to arrive. If any work arrives, it waits for activity (including extra work that arrived during the wait) to quiesce.
// If the lingerTime expires without work having completed, returns the start positions from when activity commenced
let awaitLinger (sleepMs : int) (lingerTime : TimeSpan) = async {
let awaitLinger (sleepMs : int) (lingerTime : TimeSpan) positions = async {
let timeout = IntervalTimer lingerTime
let mutable startPositions = activeReadPositions ()
let mutable startPositions = positions ()
let mutable worked = false
while (Array.any startPositions || not worked) && not timeout.IsDue && notEol () do
do! Async.Sleep sleepMs
let current = activeReadPositions ()
let current = positions ()
if not worked && Array.any current then startPositions <- current; worked <- true // Starting: Record start position (for if we exit due to timeout)
elif worked && Array.isEmpty current then startPositions <- current // Finished now: clear starting position record, triggering normal exit of loop
return startPositions }
Expand Down Expand Up @@ -196,21 +197,26 @@ and FeedMonitor internal (log : Serilog.ILogger, positions : TranchePositions, s
let sw = Stopwatch.start ()
let delayMs = delay |> Option.map (fun (d : TimeSpan) -> int d.TotalMilliseconds) |> Option.defaultValue 1
let currentCompleted = seq { for kv in positions.Current() -> struct (kv.Key, ValueOption.toNullable kv.Value.completed) }
match! awaitPropagation delayMs propagationDelay with
let waitMode =
match ignoreSubsequent, awaitFullyCaughtUp with
| Some true, Some true -> invalidArg (nameof awaitFullyCaughtUp) "cannot be combined with ignoreSubsequent"
| _, Some true -> AwaitFullyCaughtUp
| Some true, _ -> IncludeSubsequent
| _ -> OriginalWorkOnly
let requireTail = match waitMode with AwaitFullyCaughtUp -> true | _ -> false
let activeTranches () =
match positions.Current() with
| xs when xs |> Array.forall (fun (kv : KeyValuePair<_, TrancheState>) -> kv.Value.IsEmpty && (not requireTail || kv.Value.isTail)) -> Array.empty
| originals -> originals |> choose (fun v -> v.read)
match! awaitPropagation delayMs propagationDelay activeTranches with
| [||] ->
if propagationDelay = TimeSpan.Zero then log.Debug("FeedSource Wait Skipped; no processing pending. Completed {completed}", currentCompleted)
else log.Information("FeedMonitor Wait {propagationDelay:n1}s Timeout. Completed {completed}", sw.ElapsedSeconds, currentCompleted)
| starting ->
let propUsed = sw.Elapsed
let logInterval = defaultArg logInterval (TimeSpan.FromSeconds 5.)
let swProcessing = Stopwatch.start ()
let waitMode =
match ignoreSubsequent, awaitFullyCaughtUp with
| Some true, Some true -> invalidArg (nameof awaitFullyCaughtUp) "cannot be combined with ignoreSubsequent"
| _, Some true -> AwaitFullyCaughtUp
| Some true, _ -> IncludeSubsequent
| _ -> OriginalWorkOnly
do! awaitCompletion (delayMs , logInterval) swProcessing starting waitMode
do! awaitCompletion (delayMs, logInterval) swProcessing starting waitMode
let procUsed = swProcessing.Elapsed
let isDrainedNow () = positions.Current() |> isDrained
let linger = match lingerTime with None -> TimeSpan.Zero | Some lingerF -> lingerF (isDrainedNow ()) propagationDelay propUsed procUsed
Expand All @@ -223,7 +229,7 @@ and FeedMonitor internal (log : Serilog.ILogger, positions : TranchePositions, s
sw.ElapsedSeconds, propUsed.TotalSeconds, propagationDelay.TotalSeconds, procUsed.TotalSeconds, isDrainedNow (), starting, completed)
if not skipLinger then
let swLinger = Stopwatch.start ()
match! awaitLinger delayMs linger with
match! awaitLinger delayMs linger activeTranches with
| [||] ->
log.Information("FeedMonitor Wait {totalTime:n1}s OK Propagate {propagate:n1}/{propTimeout:n1}s Process {process:n1}s Linger {lingered:n1}/{linger:n1}s Tail {allAtTail}. Starting {starting} Completed {completed}",
sw.ElapsedSeconds, propUsed.TotalSeconds, propagationDelay.TotalSeconds, procUsed.TotalSeconds, swLinger.ElapsedSeconds, linger.TotalSeconds, isDrainedNow (), starting, originalCompleted)
Expand All @@ -235,18 +241,15 @@ and FeedMonitor internal (log : Serilog.ILogger, positions : TranchePositions, s
if sink.IsCompleted && not sink.RanToCompletion then
return! sink.AwaitShutdown() }

/// Drives reading and checkpointing from a source that contains data from multiple streams. While a TrancheId is always required,
/// it may have a default value of `"0"` if the underlying source representation does not involve autonomous shards/physical partitions etc
/// Drives reading and checkpointing from a source that contains data from multiple streams
type TailingFeedSource
( log : Serilog.ILogger, statsInterval : TimeSpan,
sourceId, tailSleepInterval : TimeSpan,
checkpoints : IFeedCheckpointStore, establishOrigin : (TrancheId -> Async<Position>) option, sink : Propulsion.Streams.Default.Sink,
crawl : TrancheId * Position -> AsyncSeq<struct (TimeSpan * Batch<_>)>,
renderPos,
?logReadFailure,
?readFailureSleepInterval : TimeSpan,
?logCommitFailure) =
inherit FeedSourceBase(log, statsInterval, sourceId, checkpoints, establishOrigin, sink, renderPos, ?logCommitFailure = logCommitFailure)
?logReadFailure, ?readFailureSleepInterval, ?logCommitFailure, ?stopAtTail) =
inherit FeedSourceBase(log, statsInterval, sourceId, checkpoints, establishOrigin, sink, renderPos, ?logCommitFailure = logCommitFailure, ?stopAtTail = stopAtTail)

let crawl trancheId (wasLast, startPos) = asyncSeq {
if wasLast then
Expand Down Expand Up @@ -290,6 +293,19 @@ type AllFeedSource
member x.Start() =
base.Start x.Pump

/// Drives reading from the Source until the Tail of each Tranche has been reached,
/// at which point the processing loop for that Tranche terminates (rather than continuing to tail the feed)
/// Useful for ingestion scenarios, e.g. one-shot catch-up/backfill runs
type SinglePassFeedSource
( log : Serilog.ILogger, statsInterval : TimeSpan,
sourceId,
checkpoints : IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
// Queries the source for a given tranche from a position; read loop for a tranche ends when a yielded Batch is flagged isTail
crawl : TrancheId * Position -> AsyncSeq<struct (TimeSpan * Batch<_>)>,
renderPos,
?logReadFailure, ?readFailureSleepInterval, ?logCommitFailure) =
// tailSleepInterval is zero as stopAtTail = true means the reader exits at the Tail instead of sleeping and re-polling
// establishOrigin is None: starting positions come from the checkpoint store (no custom origin logic is supplied)
inherit TailingFeedSource(log, statsInterval, sourceId, (*tailSleepInterval*)TimeSpan.Zero, checkpoints, (*establishOrigin*)None, sink, crawl, renderPos,
?logReadFailure = logReadFailure, ?readFailureSleepInterval = readFailureSleepInterval, ?logCommitFailure = logCommitFailure,
stopAtTail = true)

namespace Propulsion.Feed

open FSharp.Control
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.MemoryStore/MemoryStoreSource.fs
Expand Up @@ -51,7 +51,7 @@ type MemoryStoreSource<'F>(log, store : Equinox.MemoryStore.VolatileStore<'F>, c
member x.Start() =
let ct, stop =
let cts = new CancellationTokenSource()
cts.Token, fun () -> log.Information "Source stopping..."; cts.Cancel()
cts.Token, fun _disposing -> log.Information "Source stopping..."; cts.Cancel()

let setSuccess, awaitCompletion =
let tcs = System.Threading.Tasks.TaskCompletionSource<unit>()
Expand Down
16 changes: 8 additions & 8 deletions src/Propulsion/Pipeline.fs
Expand Up @@ -10,7 +10,7 @@ open System.Threading.Tasks
/// Conclusion of processing can be awaited by via `AwaitShutdown` or `AwaitWithStopOnCancellation` (or synchronously via IsCompleted)
type Pipeline(task : Task<unit>, triggerStop) =

interface IDisposable with member x.Dispose() = x.Stop()
interface IDisposable with member x.Dispose() = triggerStop true

/// Inspects current status of task representing the Pipeline's overall state
member _.Status = task.Status
Expand All @@ -22,7 +22,7 @@ type Pipeline(task : Task<unit>, triggerStop) =
member _.RanToCompletion = task.Status = TaskStatus.RanToCompletion

/// Request completion of processing and shutdown of the Pipeline
member _.Stop() = triggerStop ()
member _.Stop() = triggerStop false

/// Asynchronously waits until Stop()ped or the Pipeline Faults (in which case the underlying Exception is observed)
member _.AwaitShutdown() = Async.AwaitTaskCorrect task
Expand All @@ -36,9 +36,9 @@ type Pipeline(task : Task<unit>, triggerStop) =

static member Prepare(log : ILogger, pumpDispatcher, pumpScheduler, pumpSubmitter, ?pumpIngester) =
let cts = new CancellationTokenSource()
let triggerStop () =
let level = if cts.IsCancellationRequested then Events.LogEventLevel.Debug else Events.LogEventLevel.Information
log.Write(level, "Projector stopping...")
let triggerStop disposing =
let level = if disposing || cts.IsCancellationRequested then Events.LogEventLevel.Debug else Events.LogEventLevel.Information
log.Write(level, "Sink stopping...")
cts.Cancel()
let ct = cts.Token

Expand All @@ -53,10 +53,10 @@ type Pipeline(task : Task<unit>, triggerStop) =
let start (name : string) (f : CancellationToken -> Task<unit>) =
let wrap () = task {
try do! f ct
log.Information("Exiting {name}", name)
log.Information("... {name} stopped", name)
with e ->
log.Fatal(e, "Abend from {name}", name)
triggerStop () }
triggerStop false }
Task.start wrap

let supervise () = task {
Expand All @@ -71,7 +71,7 @@ type Pipeline(task : Task<unit>, triggerStop) =

// await for either handler-driven abend or external cancellation via Stop()
try return! tcs.Task
finally log.Information("... projector stopped") }
finally log.Information("... sink stopped") }

let task = Task.Run<unit>(supervise)
task, triggerStop
Expand Down

0 comments on commit d3246c9

Please sign in to comment.