Skip to content

Commit

Permalink
Add comments re exceptions protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jan 30, 2024
1 parent 00bb0a0 commit 59a2076
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<PackageProjectUrl>https://github.com/jet/propulsion</PackageProjectUrl>
<PackageTags>fsharp eventsourcing cosmosdb changefeedprocessor dynamodb equinox eventstoredb messagedb kafka</PackageTags>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<Copyright>Copyright © 2018-23</Copyright>
<Copyright>Copyright © 2018-24</Copyright>

<!-- Global Project config flags -->
<WarningLevel>5</WarningLevel>
Expand Down
1 change: 0 additions & 1 deletion src/Propulsion/Parallel.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ module Scheduling =
/// Single instance per system; coordinates the dispatching of work, subject to the maxDop concurrent processors constraint
/// Semaphore is allocated on queueing, deallocated on completion of the processing
type Dispatcher(maxDop) =
// Using a Queue as a) the ordering is more correct, favoring more important work b) we are adding from many threads so no value in ConcurrentBag's thread-affinity
let tryWrite, wait, apply =
let c = Channel.unboundedSwSr<_> in let r, w = c.Reader, c.Writer
w.TryWrite, Channel.awaitRead r, Channel.apply r
Expand Down
7 changes: 4 additions & 3 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,6 @@ module Dispatcher =
let result = Event<'R>()
let dop = Sem maxDop

// NOTE this obviously depends on the passed computation never throwing, or we'd leak dop
let runHandler struct (computation: CancellationToken -> Task<'R>, ct) = task {
let! res = computation ct
dop.Release()
Expand All @@ -886,7 +885,8 @@ module Dispatcher =
member _.State = dop.State
member _.HasCapacity = dop.HasCapacity
member _.AwaitButRelease(ct) = dop.WaitButRelease(ct)
member _.TryAdd(item) = dop.TryTake() && tryWrite item
// NOTE computation is required/trusted to have an outer catch (or results would not be posted and dop would leak)
member _.TryAdd(computation) = dop.TryTake() && tryWrite computation

member _.Pump(ct: CancellationToken) = task {
while not ct.IsCancellationRequested do
Expand Down Expand Up @@ -926,6 +926,7 @@ module Dispatcher =
interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P,'E> -> struct (int64 voption * Result<'R, 'E>)) =
static member Create
( maxDop,
// NOTE `project` must not throw under any circumstances, or the exception will go unobserved, and DOP will leak in the dispatcher
project: FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task<Result<'P, 'E>>,
interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>)) =
let project struct (startTs, item: Scheduling.Item<'F>) (ct: CancellationToken) = task {
Expand Down Expand Up @@ -955,7 +956,7 @@ module Dispatcher =
/// Implementation of IDispatcher that allows a supplied handler select work and declare completion based on arbitrarily defined criteria
type Batched<'F>
( select: Func<Scheduling.Item<'F> seq, Scheduling.Item<'F>[]>,
// NOTE Handler must not throw under any circumstances, or the exception will go unobserved
// NOTE `handle` must not throw under any circumstances, or the exception will go unobserved
handle: Scheduling.Item<'F>[] -> CancellationToken ->
Task<Scheduling.InternalRes<Result<struct (StreamSpan.Metrics * int64), struct (StreamSpan.Metrics * exn)>>[]>) =
let inner = DopDispatcher 1
Expand Down

0 comments on commit 59a2076

Please sign in to comment.