Skip to content

Commit

Permalink
DRY RunUntilCaughtUp
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Oct 11, 2023
1 parent 4af4596 commit b26b76b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 46 deletions.
25 changes: 3 additions & 22 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
namespace Propulsion.DynamoStore

open Equinox.DynamoStore
open FSharp.Control
open FSharp.Control // taskSeq
open Propulsion.Internal
open System
open System.Threading
open System.Threading.Tasks

module private Impl =

Expand Down Expand Up @@ -194,22 +192,5 @@ type DynamoStoreSource
default x.Start() = base.Start(x.Pump)

/// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed
member x.RunUntilCaughtUp(timeout: TimeSpan, statsInterval: IntervalTimer) = task {
let sw = Stopwatch.start ()
// Kick off reading from the source (Disposal will Stop it if we're exiting due to a timeout; we'll spin up a fresh one when re-triggered)
use pipeline = x.Start()

try // In the case of sustained activity and/or catch-up scenarios, proactively trigger an orderly shutdown of the Source
// in advance of the Lambda being killed (no point starting new work or incurring DynamoDB CU consumption that won't finish)
Task.Delay(timeout).ContinueWith(fun _ -> pipeline.Stop()) |> ignore

// If for some reason we're not provisioned well enough to read something within 1m, no point in paying for a full Lambda timeout
let initialReaderTimeout = TimeSpan.FromMinutes 1.
do! pipeline.Monitor.AwaitCompletion(initialReaderTimeout, awaitFullyCaughtUp = true, logInterval = TimeSpan.FromSeconds 30)
// Shut down all processing (we create a fresh Source per Lambda invocation)
pipeline.Stop()

if sw.ElapsedSeconds > 2 then statsInterval.Trigger()
// force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals)
return! x.Checkpoint(CancellationToken.None)
finally statsInterval.SleepUntilTriggerCleared() }
member x.RunUntilCaughtUp(timeout: TimeSpan, statsInterval: IntervalTimer) =
Propulsion.Feed.Core.FeedMonitor.runUntilCaughtUp x.Start x.Checkpoint (timeout, statsInterval)
35 changes: 28 additions & 7 deletions src/Propulsion.Feed/FeedSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ open Propulsion.Feed
open Propulsion.Internal
open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

/// Drives reading and checkpointing for a set of feeds (tranches) of a custom source feed
type FeedSourceBase internal
Expand All @@ -30,7 +28,7 @@ type FeedSourceBase internal
finally ingester.Stop() }
let mutable partitions = Array.empty<struct(Ingestion.Ingester<_> * FeedReader)>
let dumpStats () = for _i, r in partitions do r.DumpStats()
let rec pumpStats ct: Task = task {
let rec pumpStats ct: System.Threading.Tasks.Task = task {
try do! Task.delay statsInterval ct
finally dumpStats () // finally is so we do a final write after we are cancelled, which would otherwise stop us after the sleep
return! pumpStats ct }
Expand Down Expand Up @@ -68,7 +66,7 @@ type FeedSourceBase internal

member x.Start(pump) =
let ct, stop =
let cts = new CancellationTokenSource()
let cts = new System.Threading.CancellationTokenSource()
let stop disposing =
if not cts.IsCancellationRequested && not disposing then log.Information "Source stopping..."
cts.Cancel()
Expand Down Expand Up @@ -256,6 +254,31 @@ and FeedMonitor internal (log: Serilog.ILogger, positions: TranchePositions, sin
if sink.IsCompleted && not sink.RanToCompletion then
return! sink.Wait() }

module FeedMonitor =
/// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed
let runUntilCaughtUp
(start: unit -> SourcePipeline<FeedMonitor>)
(checkpoint: CancellationToken -> Task<'R>)
(timeout: TimeSpan, statsInterval: IntervalTimer) = task {
let sw = Stopwatch.start ()
// Kick off reading from the source (Disposal will Stop it if we're exiting due to a timeout; we'll spin up a fresh one when re-triggered)
use pipeline = start ()

try // In the case of sustained activity and/or catch-up scenarios, proactively trigger an orderly shutdown of the Source
// in advance of the Lambda being killed (no point starting new work or incurring DynamoDB CU consumption that won't finish)
Task.Delay(timeout).ContinueWith(fun _ -> pipeline.Stop()) |> ignore

// If for some reason we're not provisioned well enough to read something within 1m, no point in paying for a full Lambda timeout
let initialReaderTimeout = TimeSpan.FromMinutes 1.
do! pipeline.Monitor.AwaitCompletion(initialReaderTimeout, awaitFullyCaughtUp = true, logInterval = TimeSpan.FromSeconds 30)
// Shut down all processing (we create a fresh Source per Lambda invocation)
pipeline.Stop()

if sw.ElapsedSeconds > 2 then statsInterval.Trigger()
// force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals)
return! checkpoint CancellationToken.None
finally statsInterval.SleepUntilTriggerCleared() }

/// Drives reading and checkpointing from a source that contains data from multiple streams
type TailingFeedSource
( log: Serilog.ILogger, statsInterval: TimeSpan,
Expand Down Expand Up @@ -342,11 +365,9 @@ module Categories =

namespace Propulsion.Feed

open FSharp.Control
open FSharp.Control // taskSeq
open Propulsion.Internal
open System
open System.Threading
open System.Threading.Tasks

[<NoComparison; NoEquality>]
type Page<'F> = { items: FsCodec.ITimelineEvent<'F>[]; checkpoint: Position; isTail: bool }
Expand Down
19 changes: 2 additions & 17 deletions src/Propulsion.MessageDb/MessageDbSource.fs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
namespace Propulsion.MessageDb

open FSharp.Control
open Npgsql
open NpgsqlTypes
open Propulsion.Feed
open Propulsion.Internal
open System
open System.Threading
open System.Threading.Tasks

module private GetCategoryMessages =
[<Literal>]
Expand Down Expand Up @@ -109,17 +106,5 @@ type MessageDbSource =
default x.Start() = base.Start(x.Pump)

/// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed
member x.RunUntilCaughtUp(timeout: TimeSpan, statsInterval: IntervalTimer) = task {
let sw = Stopwatch.start ()
use pipeline = x.Start()

try Task.Delay(timeout).ContinueWith(fun _ -> pipeline.Stop()) |> ignore

let initialReaderTimeout = TimeSpan.FromMinutes 1.
do! pipeline.Monitor.AwaitCompletion(initialReaderTimeout, awaitFullyCaughtUp = true, logInterval = TimeSpan.FromSeconds 30)
pipeline.Stop()

if sw.ElapsedSeconds > 2 then statsInterval.Trigger()
// force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals)
return! x.Checkpoint(CancellationToken.None)
finally statsInterval.SleepUntilTriggerCleared() }
member x.RunUntilCaughtUp(timeout: TimeSpan, statsInterval: IntervalTimer) =
Core.FeedMonitor.runUntilCaughtUp x.Start x.Checkpoint (timeout, statsInterval)

0 comments on commit b26b76b

Please sign in to comment.