Skip to content

Commit

Permalink
Estimator tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Oct 11, 2023
1 parent 03ba20f commit c6effae
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
6 changes: 3 additions & 3 deletions src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ type ChangeFeedProcessor =
return result.ToArray() }
fun (ct: CancellationToken) -> task {
while not ct.IsCancellationRequested do
let! leasesState = fetchEstimatorStates (fun s -> leaseTokenToPartitionId s.LeaseToken, s.EstimatedLag) ct
Array.sortInPlaceBy fst leasesState
do! lagMonitorCallback leasesState } )
let! leasesStates = fetchEstimatorStates (fun s -> struct (leaseTokenToPartitionId s.LeaseToken, s.EstimatedLag)) ct
Array.sortInPlaceBy ValueTuple.fst leasesStates
do! lagMonitorCallback leasesStates } )
let wrap (f: unit -> Task) () = task { return! f () }
SourcePipeline.Start(log, wrap processor.StartAsync, maybePumpMetrics, wrap processor.StopAsync, observer)
static member private mkLeaseOwnerIdForProcess() =
Expand Down
6 changes: 3 additions & 3 deletions src/Propulsion.CosmosStore/CosmosStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module Log =
ingestLatency: TimeSpan; ingestQueued: int }
type LagMetric =
{ database: string; container: string; group: string
rangeLags: (int * int64)[] }
rangeLags: struct (int * int64)[] }
[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Metric =
| Read of ReadMetric
Expand Down Expand Up @@ -88,7 +88,7 @@ type CosmosStoreSource =
[<O; D null>] ?notifyError,
[<O; D null>] ?customize) =
let databaseId, containerId = monitored.Database.Id, monitored.Id
let logLag (interval: TimeSpan) (remainingWork: (int * int64)[]) = task {
let logLag (interval: TimeSpan) (remainingWork: struct (int * int64)[]) = task {
let mutable synced, lagged, count, total = ResizeArray(), ResizeArray(), 0, 0L
for partitionId, gap as partitionAndGap in remainingWork do
total <- total + gap
Expand All @@ -102,7 +102,7 @@ type CosmosStoreSource =
let startFromTail = defaultArg startFromTail false
let source =
ChangeFeedProcessor.Start
( log, monitored, leases, processorName, observer, ?notifyError=notifyError, ?customize=customize,
( log, monitored, leases, processorName, observer, ?notifyError = notifyError, ?customize = customize,
?maxItems = maxItems, ?feedPollDelay = tailSleepInterval, ?reportLagAndAwaitNextEstimation = maybeLogLag,
startFromTail = startFromTail,
leaseAcquireInterval = TimeSpan.FromSeconds 5., leaseRenewInterval = TimeSpan.FromSeconds 5., leaseTtl = TimeSpan.FromSeconds 10.)
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion/Pipeline.fs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Pipeline(task: Task<unit>, triggerStop) =
/// Reacts to cancellation by aborting the processing via <c>Stop()</c>; see <c>Await</c> if such semantics are not desired.
member x.AwaitWithStopOnCancellation() = async {
let! ct = Async.CancellationToken
use _ = ct.Register(fun () -> x.Stop())
use _ = ct.Register(Action x.Stop)
return! x.Await() }

static member Prepare(log: ILogger, pumpScheduler, pumpSubmitter, ?pumpIngester, ?pumpDispatcher) =
Expand Down

0 comments on commit c6effae

Please sign in to comment.