From b26b76b8e446bb1a755d0dd67ecf5c9e9cb3dff8 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 11 Oct 2023 16:44:45 +0100 Subject: [PATCH] DRY RunUntilCaughtUp --- .../DynamoStoreSource.fs | 25 ++----------- src/Propulsion.Feed/FeedSource.fs | 35 +++++++++++++++---- src/Propulsion.MessageDb/MessageDbSource.fs | 19 ++-------- 3 files changed, 33 insertions(+), 46 deletions(-) diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs index ef49f5fd..f2a1be59 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreSource.fs @@ -1,11 +1,9 @@ namespace Propulsion.DynamoStore open Equinox.DynamoStore -open FSharp.Control +open FSharp.Control // taskSeq open Propulsion.Internal open System -open System.Threading -open System.Threading.Tasks module private Impl = @@ -194,22 +192,5 @@ type DynamoStoreSource default x.Start() = base.Start(x.Pump) /// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed - member x.RunUntilCaughtUp(timeout: TimeSpan, statsInterval: IntervalTimer) = task { - let sw = Stopwatch.start () - // Kick off reading from the source (Disposal will Stop it if we're exiting due to a timeout; we'll spin up a fresh one when re-triggered) - use pipeline = x.Start() - - try // In the case of sustained activity and/or catch-up scenarios, proactively trigger an orderly shutdown of the Source - // in advance of the Lambda being killed (no point starting new work or incurring DynamoDB CU consumption that won't finish) - Task.Delay(timeout).ContinueWith(fun _ -> pipeline.Stop()) |> ignore - - // If for some reason we're not provisioned well enough to read something within 1m, no point for paying for a full lambda timeout - let initialReaderTimeout = TimeSpan.FromMinutes 1. - do! pipeline.Monitor.AwaitCompletion(initialReaderTimeout, awaitFullyCaughtUp = true, logInterval = TimeSpan.FromSeconds 30) - // Shut down all processing (we create a fresh Source per Lambda invocation) - pipeline.Stop() - - if sw.ElapsedSeconds > 2 then statsInterval.Trigger() - // force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals) - return! x.Checkpoint(CancellationToken.None) - finally statsInterval.SleepUntilTriggerCleared() } + member x.RunUntilCaughtUp(timeout: TimeSpan, statsInterval: IntervalTimer) = + Propulsion.Feed.Core.FeedMonitor.runUntilCaughtUp x.Start x.Checkpoint (timeout, statsInterval) diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs index d9cd941f..2b9944ff 100644 --- a/src/Propulsion.Feed/FeedSource.fs +++ b/src/Propulsion.Feed/FeedSource.fs @@ -7,8 +7,6 @@ open Propulsion.Feed open Propulsion.Internal open System open System.Collections.Generic -open System.Threading -open System.Threading.Tasks /// Drives reading and checkpointing for a set of feeds (tranches) of a custom source feed type FeedSourceBase internal @@ -30,7 +28,7 @@ type FeedSourceBase internal finally ingester.Stop() } let mutable partitions = Array.empty * FeedReader)> let dumpStats () = for _i, r in partitions do r.DumpStats() - let rec pumpStats ct: Task = task { + let rec pumpStats ct: System.Threading.Tasks.Task = task { try do! Task.delay statsInterval ct finally dumpStats () // finally is so we do a final write after we are cancelled, which would otherwise stop us after the sleep return! pumpStats ct } @@ -68,7 +66,7 @@ type FeedSourceBase internal member x.Start(pump) = let ct, stop = - let cts = new CancellationTokenSource() + let cts = new System.Threading.CancellationTokenSource() let stop disposing = if not cts.IsCancellationRequested && not disposing then log.Information "Source stopping..." cts.Cancel() @@ -256,6 +254,31 @@ and FeedMonitor internal (log: Serilog.ILogger, positions: TranchePositions, sin if sink.IsCompleted && not sink.RanToCompletion then return! sink.Wait() } +module FeedMonitor = + /// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed + let runUntilCaughtUp + (start: unit -> SourcePipeline) + (checkpoint: CancellationToken -> Task<'R>) + (timeout: TimeSpan, statsInterval: IntervalTimer) = task { + let sw = Stopwatch.start () + // Kick off reading from the source (Disposal will Stop it if we're exiting due to a timeout; we'll spin up a fresh one when re-triggered) + use pipeline = start () + + try // In the case of sustained activity and/or catch-up scenarios, proactively trigger an orderly shutdown of the Source + // in advance of the Lambda being killed (no point starting new work or incurring DynamoDB CU consumption that won't finish) + Task.Delay(timeout).ContinueWith(fun _ -> pipeline.Stop()) |> ignore + + // If for some reason we're not provisioned well enough to read something within 1m, no point for paying for a full lambda timeout + let initialReaderTimeout = TimeSpan.FromMinutes 1. + do! pipeline.Monitor.AwaitCompletion(initialReaderTimeout, awaitFullyCaughtUp = true, logInterval = TimeSpan.FromSeconds 30) + // Shut down all processing (we create a fresh Source per Lambda invocation) + pipeline.Stop() + + if sw.ElapsedSeconds > 2 then statsInterval.Trigger() + // force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals) + return! checkpoint CancellationToken.None + finally statsInterval.SleepUntilTriggerCleared() } + /// Drives reading and checkpointing from a source that contains data from multiple streams type TailingFeedSource ( log: Serilog.ILogger, statsInterval: TimeSpan, @@ -342,11 +365,9 @@ module Categories = namespace Propulsion.Feed -open FSharp.Control +open FSharp.Control // taskSeq open Propulsion.Internal open System -open System.Threading -open System.Threading.Tasks [] type Page<'F> = { items: FsCodec.ITimelineEvent<'F>[]; checkpoint: Position; isTail: bool } diff --git a/src/Propulsion.MessageDb/MessageDbSource.fs b/src/Propulsion.MessageDb/MessageDbSource.fs index 6cb3b057..37656826 100644 --- a/src/Propulsion.MessageDb/MessageDbSource.fs +++ b/src/Propulsion.MessageDb/MessageDbSource.fs @@ -1,13 +1,10 @@ namespace Propulsion.MessageDb -open FSharp.Control open Npgsql open NpgsqlTypes open Propulsion.Feed open Propulsion.Internal open System -open System.Threading -open System.Threading.Tasks module private GetCategoryMessages = [] @@ -109,17 +106,5 @@ type MessageDbSource = default x.Start() = base.Start(x.Pump) /// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed - member x.RunUntilCaughtUp(timeout: TimeSpan, statsInterval: IntervalTimer) = task { - let sw = Stopwatch.start () - use pipeline = x.Start() - - try Task.Delay(timeout).ContinueWith(fun _ -> pipeline.Stop()) |> ignore - - let initialReaderTimeout = TimeSpan.FromMinutes 1. - do! pipeline.Monitor.AwaitCompletion(initialReaderTimeout, awaitFullyCaughtUp = true, logInterval = TimeSpan.FromSeconds 30) - pipeline.Stop() - - if sw.ElapsedSeconds > 2 then statsInterval.Trigger() - // force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals) - return! x.Checkpoint(CancellationToken.None) - finally statsInterval.SleepUntilTriggerCleared() } + member x.RunUntilCaughtUp(timeout: TimeSpan, statsInterval: IntervalTimer) = + Core.FeedMonitor.runUntilCaughtUp x.Start x.Checkpoint (timeout, statsInterval)