Skip to content

Commit

Permalink
Extract runWithCancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 25, 2023
1 parent 5ddddec commit 1b4221b
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 16 deletions.
7 changes: 4 additions & 3 deletions src/Propulsion/Ingestion.fs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ type Ingester<'Items> private
while applyIncoming handleIncoming || applyMessages stats.Handle do ()
stats.RecordCycle()
if stats.Interval.IfDueRestart() then let struct (active, max) = maxRead.State in stats.DumpStats(active, max)
use cts = CancellationTokenSource.CreateLinkedTokenSource(ct)
do! Task.WhenAny(awaitIncoming cts.Token, awaitMessage cts.Token, Task.Delay(stats.Interval.RemainingMs, cts.Token)) :> Task
cts.Cancel() }
let startWaits ct = [| awaitIncoming ct :> Task
awaitMessage ct
Task.Delay(stats.Interval.RemainingMs, ct) |]
do! Task.runWithCancellation ct (fun ct -> Task.WhenAny(startWaits ct)) }

/// Starts an independent Task that handles
/// a) `unpack`ing of `incoming` items
Expand Down
5 changes: 5 additions & 0 deletions src/Propulsion/Internal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ module Task =
let inline Catch (t : Task<'t>) = task { try let! r = t in return Ok r with e -> return Error e }
let private parallel_ maxDop ct (xs: seq<CancellationToken -> Task<'t>>) : Task<'t []> =
Async.Parallel(xs |> Seq.map Async.call, ?maxDegreeOfParallelism = match maxDop with 0 -> None | x -> Some x) |> Async.executeAsTask ct
/// Runs an inner task with a dedicated Linked Token Source. Cancels via the ct upon completion, before Disposing the LCTS
let inline runWithCancellation (ct: CancellationToken) ([<InlineIfLambda>]f: CancellationToken -> Task) = task {
use cts = CancellationTokenSource.CreateLinkedTokenSource(ct) // https://stackoverflow.com/questions/6960520/when-to-dispose-cancellationtokensource
try do! f cts.Token
finally cts.Cancel() }
let parallelLimit maxDop ct xs : Task<'t []> =
parallel_ maxDop ct xs
let sequential ct xs : Task<'t []> =
Expand Down
13 changes: 5 additions & 8 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -770,14 +770,11 @@ module Scheduling =
let waitForIncomingBatches = hasCapacity
let waitForDispatcherCapacity = not hasCapacity && not wakeForResults
let sleepTs = Stopwatch.timestamp ()
use cts = CancellationTokenSource.CreateLinkedTokenSource(ct)
let wakeConditions : Task[] = [|
if wakeForResults then awaitResults cts.Token
elif waitForDispatcherCapacity then dispatcher.AwaitCapacity(cts.Token)
if waitForIncomingBatches then awaitPending cts.Token
Task.Delay(sleepIntervalMs, cts.Token) |]
do! Task.WhenAny(wakeConditions) :> Task
cts.Cancel()
let startWaits ct = [| if wakeForResults then awaitResults ct :> Task
elif waitForDispatcherCapacity then dispatcher.AwaitCapacity(ct)
if waitForIncomingBatches then awaitPending ct
Task.Delay(sleepIntervalMs, ct) |]
do! Task.runWithCancellation ct (fun ct -> Task.WhenAny(startWaits ct))
t.RecordSleep sleepTs
// 4. Record completion state once per iteration; dumping streams is expensive so needs to be done infrequently
let dispatcherState = if not hasCapacity then Full elif idle then Idle else Active
Expand Down
9 changes: 4 additions & 5 deletions src/Propulsion/Submission.fs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,10 @@ type SubmissionEngine<'P, 'M, 'S, 'B when 'P : equality>
while tryPropagate () |> shouldLoop do ()
stats.RecordCycle()
if stats.Interval.IfDueRestart() then stats.Dump(queueStats)
use cts = CancellationTokenSource.CreateLinkedTokenSource(ct)
do! Task.WhenAny[| if awaitCapacity then waitToSubmitBatch cts.Token
awaitIncoming cts.Token :> Task
Task.Delay(stats.Interval.RemainingMs, cts.Token) |] :> Task
cts.Cancel() }
let startWaits ct = [| if awaitCapacity then waitToSubmitBatch ct
awaitIncoming ct
Task.Delay(stats.Interval.RemainingMs, ct) |]
do! Task.runWithCancellation ct (fun ct -> Task.WhenAny(startWaits ct)) }

/// Supplies a set of Batches for holding and forwarding to scheduler at the right time
member _.Ingest(items : Batch<'P, 'M>[]) =
Expand Down

0 comments on commit 1b4221b

Please sign in to comment.