diff --git a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs index ff3b1ed4..011b073b 100644 --- a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs +++ b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs @@ -28,7 +28,10 @@ type [] ChangeFeedProcessor private () = return! observers.Ingest(ctx, changes, checkpointAsync, ct) with Exception.Log log () -> () } let notifyError = - let log = match notifyError with Some f -> f | None -> Action<_, _>(fun i ex -> observers.LogReaderExn(i, ex)) + let isNoise: exn -> bool = function // TODO tell MS to stop misreporting it, and remove the filter when that's released + | :? Microsoft.Azure.Cosmos.ChangeFeedProcessorUserException as e when (e.InnerException :? OperationCanceledException) -> true + | _ -> false + let log = match notifyError with Some f -> f | None -> Action<_, _>(fun i ex -> observers.LogReaderExn(i, ex, isNoise ex)) fun (TokenRangeId rangeId) ex -> log.Invoke(rangeId, ex); Task.CompletedTask let logStateChange acquired (TokenRangeId rangeId) = observers.RecordStateChange(rangeId, acquired); Task.CompletedTask monitored @@ -72,6 +75,7 @@ type [] ChangeFeedProcessor private () = return! processor.StartAsync() } let shutdown () = task { try do! processor.StopAsync() with _ -> () - (observers : IDisposable).Dispose() // Stop the ingesters do! estimateAndLog CancellationToken.None } - Propulsion.PipelineFactory.PrepareSource2(log, startup, shutdown) + // On shutdown, Readers waiting for Ingestion capacity need to be released + let stopIngesters = (observers : IDisposable).Dispose + Propulsion.PipelineFactory.PrepareSource2(log, startup, shutdown, stopIngesters) diff --git a/src/Propulsion.CosmosStore/FeedObserver.fs b/src/Propulsion.CosmosStore/FeedObserver.fs index 7bdaa2ff..7bba42f7 100644 --- a/src/Propulsion.CosmosStore/FeedObserver.fs +++ b/src/Propulsion.CosmosStore/FeedObserver.fs @@ -160,8 +160,8 @@ type internal Observers<'Items>(log: Serilog.ILogger, processorName, buildObserv member _.LogStart(leaseAcquireInterval: TimeSpan, leaseTtl: TimeSpan, leaseRenewInterval: TimeSpan, feedPollInterval: TimeSpan, startFromTail: bool, ?maxItems) = log.Information("ChangeFeed {processorName} Lease acquire {leaseAcquireIntervalS:n0}s ttl {ttlS:n0}s renew {renewS:n0}s feedPollInterval {feedPollIntervalS:n0}s Items limit {maxItems} fromTail {fromTail}", processorName, leaseAcquireInterval.TotalSeconds, leaseTtl.TotalSeconds, leaseRenewInterval.TotalSeconds, feedPollInterval.TotalSeconds, Option.toNullable maxItems, startFromTail) - member _.LogReaderExn(rangeId: int, ex: exn) = - log.Error(ex, "ChangeFeed {processorName}/{partition} error", processorName, rangeId) + member _.LogReaderExn(rangeId: int, ex: exn, isNoise: bool) = + log.Write((if isNoise then LogEventLevel.Debug else LogEventLevel.Error), ex, "ChangeFeed {processorName}/{partition} error", processorName, rangeId) member _.LogHandlerExn(rangeId: int, ex: exn) = log.Error(ex, "ChangeFeed {processorName}/{partition} Handler Threw", processorName, rangeId) member _.Ingest(context, docs, checkpoint, ct) = diff --git a/src/Propulsion.Feed/FeedReader.fs b/src/Propulsion.Feed/FeedReader.fs index 6c5af1b5..5c69945d 100644 --- a/src/Propulsion.Feed/FeedReader.fs +++ b/src/Propulsion.Feed/FeedReader.fs @@ -166,7 +166,7 @@ type FeedReader do! submitPage (readLatency, batch) currentPos <- batch.checkpoint lastWasTail <- batch.isTail - if stopAtTail then + if not ct.IsCancellationRequested && stopAtTail then stats.EnteringShutdown() let! struct (cur, max) = ingester.AwaitCompleted() stats.ShutdownCompleted(cur, max) } diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs index 56f5110c..b08a4520 100644 --- a/src/Propulsion.Feed/FeedSource.fs +++ b/src/Propulsion.Feed/FeedSource.fs @@ -26,7 +26,7 @@ type FeedSourceBase internal try let! pos = checkpoints.Start(sourceId, trancheId, establishOrigin = (establishOrigin |> Option.map establishTrancheOrigin), ct = ct) reader.LogPartitionStarting(pos) return! reader.Pump(pos, ct) - with//:? System.Threading.Tasks.TaskCanceledException when ct.IsCancellationRequested -> () + with:? System.Threading.Tasks.TaskCanceledException | :? OperationCanceledException -> () | Exception.Log reader.LogPartitionExn () -> () finally ingester.Stop() } @@ -66,7 +66,8 @@ type FeedSourceBase internal /// Would be protected if that existed - derived types are expected to use this in implementing a parameterless `Start()` member x.Start(pump): SourcePipeline = - let machine, triggerStop, outcomeTask = PipelineFactory.PrepareSource(log, pump) + let stopIngesters () = for i, _ in partitions do i.Stop() + let machine, triggerStop, outcomeTask = PipelineFactory.PrepareSource(log, pump, stopIngesters) let monitor = lazy FeedMonitor(log, positions.Current, sink, fun () -> outcomeTask.IsCompleted) new SourcePipeline<_, _>(Task.run machine, x.Checkpoint, triggerStop, monitor) diff --git a/src/Propulsion/Pipeline.fs b/src/Propulsion/Pipeline.fs index f674d1e0..e37dd2cb 100755 --- a/src/Propulsion/Pipeline.fs +++ b/src/Propulsion/Pipeline.fs @@ -48,7 +48,7 @@ type SinkPipeline<'Ingester> internal (task: Task, triggerStop, startInges type [] PipelineFactory private () = - static member PrepareSource(log: Serilog.ILogger, pump: CancellationToken -> Task) = + static member PrepareSource(log: Serilog.ILogger, pump: CancellationToken -> Task, markCompleted: unit -> unit) = let ct, stop = let cts = new System.Threading.CancellationTokenSource() cts.Token, fun disposing -> @@ -58,16 +58,19 @@ type [] PipelineFactory private () = let inner, outcomeTask, markCompleted = let tcs = System.Threading.Tasks.TaskCompletionSource() - let markCompleted () = tcs.TrySetResult () |> ignore + let markCompleted () = + markCompleted () + tcs.TrySetResult () |> ignore let recordExn (e: exn) = tcs.TrySetException e |> ignore let inner () = task { try do! pump ct - // If the source completes all reading cleanly, declare completion - log.Information "Source drained..." + // If the source completes all reading cleanly, convey that fact () + if not ct.IsCancellationRequested then log.Information "Source drained..." markCompleted () with e -> + let level = if ct.IsCancellationRequested then LogEventLevel.Debug else LogEventLevel.Warning // first exception from a supervised task becomes the outcome if that happens - log.Warning(e, "Exception encountered while running source, exiting loop") + log.Write(level, e, "Exception encountered while running source, exiting loop") recordExn e return! tcs.Task } inner, tcs.Task, markCompleted @@ -81,7 +84,7 @@ type [] PipelineFactory private () = finally log.Information "Source stopped" } machine, stop, outcomeTask - static member PrepareSource2(log: Serilog.ILogger, startup: CancellationToken -> Task, shutdown: unit -> Task) = + static member PrepareSource2(log: Serilog.ILogger, startup: CancellationToken -> Task, shutdown: unit -> Task, markCompleted) = let ct, stop = let cts = new System.Threading.CancellationTokenSource() cts.Token, fun disposing -> @@ -91,7 +94,9 @@ type [] PipelineFactory private () = let outcomeTask, markCompleted = let tcs = System.Threading.Tasks.TaskCompletionSource() - let markCompleted () = tcs.TrySetResult () |> ignore + let markCompleted () = + markCompleted () + tcs.TrySetResult () |> ignore tcs.Task, markCompleted let machine () = task {