
Commit

Res
bartelink committed Sep 26, 2023
1 parent dbef2ff commit 087cd92
Showing 2 changed files with 28 additions and 19 deletions.
10 changes: 5 additions & 5 deletions src/Propulsion.Kafka/Consumers.fs
@@ -348,7 +348,7 @@ type Factory private () =
 select, handle : Func<Scheduling.Item<_>[], CancellationToken, Task<seq<Result<int64, exn>>>>, stats,
 ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) =
 let handle (items : Scheduling.Item<EventBody>[]) ct
-: Task<struct (TimeSpan * FsCodec.StreamName * int64 * bool * Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>)[]> = task {
+: Task<Scheduling.Res<Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>>[]> = task {
 let sw = Stopwatch.start ()
 let avgElapsed () =
 let tot = float sw.ElapsedMilliseconds
@@ -361,16 +361,16 @@ type Factory private () =
 | item, Ok index' ->
 let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq
 let metrics = StreamSpan.metrics Event.storedSize used
-struct (ae, item.stream, Events.index item.span, true, Ok struct (index', struct (metrics, ())))
-| item, Error exn ->
+Scheduling.Res.create (ae, item.stream, Events.index item.span, not (Array.isEmpty used), Ok struct (index', struct (metrics, ())))
+| item, Error e ->
 let metrics = StreamSpan.metrics Event.renderedSize item.span
-ae, item.stream, Events.index item.span, false, Result.Error struct (metrics, exn) |]
+Scheduling.Item.createResE (ae, item, metrics, e) |]
 with e ->
 let ae = avgElapsed ()
 return
 [| for x in items ->
 let metrics = StreamSpan.metrics Event.renderedSize x.span
-ae, x.stream, Events.index x.span, false, Result.Error struct (metrics, e) |] }
+Scheduling.Item.createResE (ae, x, metrics, e) |] }
 let dispatcher = Dispatcher.Batched(select, handle)
 let dumpStreams logStreamStates log =
 logExternalState |> Option.iter (fun f -> f log)
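The net effect in this file: the batched Kafka handler now returns the Scheduling.Res record (added in src/Propulsion/Streams.fs below) instead of a positional five-element struct tuple, and the success case now derives progressed from whether any events were actually consumed (not (Array.isEmpty used)) rather than always reporting true. A minimal sketch of the shape change, using string in place of FsCodec.StreamName and a simplified result payload (both stand-ins are assumptions for brevity, not the library's actual types):

    open System

    // Simplified stand-in for the Res record added in src/Propulsion/Streams.fs below
    [<Struct; NoComparison; NoEquality>]
    type Res<'R> = { duration: TimeSpan; stream: string; index: int64; progressed: bool; result: 'R }

    // Old shape: each call site assembled, and each consumer destructured, a positional 5-tuple
    let oldShape : struct (TimeSpan * string * int64 * bool * Result<int64, exn>) =
        struct (TimeSpan.FromMilliseconds(12.), "Cart-123", 0L, true, Ok 5L)

    // New shape: a named record, so the meaning of each field is visible at the point of use
    let newShape : Res<Result<int64, exn>> =
        { duration = TimeSpan.FromMilliseconds(12.); stream = "Cart-123"; index = 0L; progressed = true; result = Ok 5L }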
37 changes: 23 additions & 14 deletions src/Propulsion/Streams.fs
@@ -645,9 +645,13 @@ module Scheduling =
 if xs.MoveNext() then Seq.append (prioritizeHead f xs.Current) (collectUniqueStreams xs)
 else Seq.empty

+[<Struct; NoComparison; NoEquality>]
+type Res<'R> = { duration: TimeSpan; stream: FsCodec.StreamName; index: int64; progressed: bool; result: 'R }
+module Res = let create (d, s, i, p, r) = { duration = d; stream = s; index = i; progressed = p; result = r }
+
 /// Defines interface between Scheduler (which owns the pending work) and the Dispatcher which periodically selects work to commence based on a policy
 type IDispatcher<'P, 'R, 'E, 'F> =
-[<CLIEvent>] abstract member Result : IEvent<struct (TimeSpan * FsCodec.StreamName * int64 * bool * Result<'P, 'E>)>
+[<CLIEvent>] abstract member Result : IEvent<Res<Result<'P, 'E>>>
 abstract member Pump : CancellationToken -> Task<unit>
 abstract member State : struct (int * int)
 abstract member HasCapacity : bool with get
@@ -656,7 +660,12 @@ module Scheduling =
 abstract member InterpretProgress : StreamStates<'F> * FsCodec.StreamName * Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>)
 and [<Struct; NoComparison; NoEquality>]
 Item<'Format> = { stream: FsCodec.StreamName; nextIndex: int64 voption; span: FsCodec.ITimelineEvent<'Format>[] }
-
+module Item =
+let createResE (d, i: Item<'F>, m, e) =
+Res.create (d, i.stream, StreamSpan.idx i.span, false, Error struct (m, e))
+let createResO (d, i: Item<'F>, m, i') =
+let index = StreamSpan.idx i.span
+Res.create (d, i.stream, index, i' > index, Ok struct (i', struct (m, ())))
 /// Consolidates ingested events into streams; coordinates dispatching of these to projector/ingester in the order implied by the submission order
 /// a) does not itself perform any reading activities
 /// b) triggers synchronous callbacks as batches complete; writing of progress is managed asynchronously by the TrancheEngine(s)
Expand Down Expand Up @@ -717,7 +726,7 @@ module Scheduling =
 dispatcher.TryReplenish(candidateItems, handleStarted)

 // Ingest information to be gleaned from processing the results into `streams` (i.e. remove stream requirements as they are completed)
-let handleResult { duration = duration; stream = stream; index = i; progressed = p; result = r } =
+let handleResult ({ duration = duration; stream = stream; index = i; progressed = p; result = r } : Res<_>) =
 match dispatcher.InterpretProgress(streams, stream, r) with
 | ValueSome index, Ok (r : 'R) ->
 batches.MarkStreamProgress(stream, index)
Expand Down Expand Up @@ -776,7 +785,7 @@ module Scheduling =
 let wakeForResults = defaultArg wakeForResults false

 member _.Pump(abend, ct : CancellationToken) = task {
-use _ = dispatcher.Result.Subscribe(fun struct (t, s, i, pr, r) -> writeResult { duration = t; stream = s; index = i; progressed = pr; result = r })
+use _ = dispatcher.Result.Subscribe writeResult
 Task.start (fun () -> task { try do! dispatcher.Pump ct
 with e -> abend (AggregateException e) })
 let inline ts () = Stopwatch.timestamp ()
Expand Down Expand Up @@ -855,7 +864,7 @@ module Dispatcher =

 /// Kicks off enough work to fill the inner Dispatcher up to capacity
 type internal ItemDispatcher<'R, 'F>(maxDop) =
-let inner = DopDispatcher<struct (TimeSpan * FsCodec.StreamName * int64 * bool * 'R)>(maxDop)
+let inner = DopDispatcher<Scheduling.Res<'R>>(maxDop)

 // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
 let tryFillDispatcher (potential : seq<Scheduling.Item<'F>>) markStarted project =
@@ -880,15 +889,15 @@ module Dispatcher =
 /// Implementation of IDispatcher that feeds items to an item dispatcher that maximizes concurrent requests (within a limit)
 type Concurrent<'P, 'R, 'E, 'F> internal
 ( inner : ItemDispatcher<Result<'P, 'E>, 'F>,
-project : struct (int64 * Scheduling.Item<'F>) -> CancellationToken -> Task<struct (TimeSpan * FsCodec.StreamName * int64 * bool * Result<'P, 'E>)>,
+project : struct (int64 * Scheduling.Item<'F>) -> CancellationToken -> Task<Scheduling.Res<Result<'P, 'E>>>,
 interpretProgress : Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>)) =
 static member Create
 ( maxDop,
 project : FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task<struct (int64 * bool * Result<'P, 'E>)>,
 interpretProgress) =
 let project struct (startTs, item : Scheduling.Item<'F>) (ct : CancellationToken) = task {
 let! struct (index, progressed, res) = project item.stream item.span ct
-return struct (Stopwatch.elapsed startTs, item.stream, index, progressed, res) }
+return Scheduling.Res.create (Stopwatch.elapsed startTs, item.stream, index, progressed, res) }
 Concurrent<_, _, _, _>(ItemDispatcher(maxDop), project, interpretProgress)
 static member Create(maxDop, prepare : Func<_, _, _>, handle : Func<_, _, CancellationToken, Task<_>>, toIndex : Func<_, 'R, int64>) =
 let project sn span ct = task {
@@ -914,9 +923,9 @@ module Dispatcher =
 type Batched<'F>
 ( select : Func<Scheduling.Item<'F> seq, Scheduling.Item<'F>[]>,
 handle : Scheduling.Item<'F>[] -> CancellationToken ->
-Task<struct (TimeSpan * FsCodec.StreamName * int64 * bool * Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>)[]>) =
+Task<Scheduling.Res<Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>>[]>) =
 let inner = DopDispatcher 1
-let result = Event<struct (TimeSpan * FsCodec.StreamName * int64 * bool * Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>)>()
+let result = Event<Scheduling.Res<Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>>>()

 // On each iteration, we offer the ordered work queue to the selector
 // we propagate the selected streams to the handler
@@ -1085,7 +1094,7 @@ type Batched private () =
 ?ingesterStatsInterval, ?requireCompleteStreams)
 : Sink<Ingestion.Ingester<StreamEvent<'F> seq>> =
 let handle (items : Scheduling.Item<'F>[]) ct
-: Task<struct (TimeSpan * FsCodec.StreamName * int64 * bool * Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>)[]> = task {
+: Task<Scheduling.Res<Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>>[]> = task {
 let sw = Stopwatch.start ()
 let avgElapsed () =
 let tot = float sw.ElapsedMilliseconds
@@ -1098,16 +1107,16 @@ type Batched private () =
 | item, Ok index' ->
 let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq
 let metrics = StreamSpan.metrics eventSize used
-struct (ae, item.stream, StreamSpan.idx item.span, index' > item.span[0].Index, Ok struct (index', struct (metrics, ())))
-| item, Error exn ->
+Scheduling.Item.createResO (ae, item, metrics, index')
+| item, Error e ->
 let metrics = StreamSpan.metrics eventSize item.span
-ae, item.stream, StreamSpan.idx item.span, false, Error struct (metrics, exn) |]
+Scheduling.Item.createResE (ae, item, metrics, e) |]
 with e ->
 let ae = avgElapsed ()
 return
 [| for x in items ->
 let metrics = StreamSpan.metrics eventSize x.span
-ae, x.stream, StreamSpan.idx x.span, false, Error struct (metrics, e) |] }
+Scheduling.Item.createResE (ae, x, metrics, e) |] }
 let dispatcher = Dispatcher.Batched(select, handle)
 let dumpStreams logStreamStates _log = logStreamStates eventSize
 let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams,
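The Item.createResO / createResE helpers added above centralize how a per-stream outcome is packaged into a Res, including deriving the progressed flag from the achieved index. A rough sketch of that logic, reusing the Res stand-in from the earlier sketch and a trimmed-down Item (an assumption for brevity; the real Item carries the full event span, and spanIndex here stands in for StreamSpan.idx i.span):

    // Trimmed-down Item stand-in: stream name plus the index of the first event in the span
    type Item = { stream: string; spanIndex: int64 }

    // Success: 'progressed' is derived by comparing the achieved write position with the span's start index
    let createResO (d, i: Item, metrics, index') =
        { duration = d; stream = i.stream; index = i.spanIndex
          progressed = index' > i.spanIndex
          result = Ok struct (index', struct (metrics, ())) }

    // Failure: no progress is recorded; the metrics and the exception travel together in the Error payload
    let createResE (d, i: Item, metrics, e: exn) =
        { duration = d; stream = i.stream; index = i.spanIndex
          progressed = false
          result = Error struct (metrics, e) }

    // Example: writing up to index 5 from a span that starts at index 2 counts as progress
    let partial = createResO (TimeSpan.FromMilliseconds(3.), { stream = "Cart-123"; spanIndex = 2L }, 42, 5L)
    // partial.progressed evaluates to true because 5L > 2L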
