diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 72bbd880..6617e042 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -51,6 +51,6 @@ jobs: displayName: 'Install .NET Core sdk' inputs: packageType: sdk - version: 6.x + version: 8.x - script: dotnet pack build.proj displayName: dotnet pack build.proj diff --git a/global.json b/global.json index 0f050ba5..9bb4dcbe 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "6.0.300", + "version": "8.0.101", "rollForward": "latestMajor" } } diff --git a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs index e1a136f5..b59df5f3 100644 --- a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs +++ b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs @@ -4,8 +4,8 @@ open Equinox.CosmosStore.Core open Propulsion.Sinks -/// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams -/// NOTE No attempt is made to filter out Tip (`id=-1`) batches from the ChangeFeed; Equinox versions >= 3, Tip batches can bear events. +/// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams. +/// NOTE No attempt is made to filter out Tip (`id=-1`) batches from the ChangeFeed, as in Equinox versions >= 3, Tip batches can bear events. [] #if !COSMOSV3 module EquinoxSystemTextJsonParser = diff --git a/src/Propulsion.DynamoStore/AppendsEpoch.fs b/src/Propulsion.DynamoStore/AppendsEpoch.fs index 4fb361d2..ed079151 100644 --- a/src/Propulsion.DynamoStore/AppendsEpoch.fs +++ b/src/Propulsion.DynamoStore/AppendsEpoch.fs @@ -45,7 +45,7 @@ module Events = let next (x: Events.StreamSpan) = int x.i + x.c.Length /// Aggregates all spans per stream into a single Span from the lowest index to the highest let flatten: Events.StreamSpan seq -> Events.StreamSpan seq = - Seq.groupBy (fun x -> x.p) + Seq.groupBy _.p >> Seq.map (fun (p, xs) -> let mutable i = -1L let c = ResizeArray() @@ -175,7 +175,7 @@ module Reader = member _.ReadVersion(partitionId, epochId): Async = let decider = resolve (partitionId, epochId, System.Int64.MaxValue) - decider.QueryEx(fun c -> c.Version) + decider.QueryEx _.Version module Factory = diff --git a/src/Propulsion.DynamoStore/DynamoDbExport.fs b/src/Propulsion.DynamoStore/DynamoDbExport.fs index 93cf4f99..015ed9e2 100644 --- a/src/Propulsion.DynamoStore/DynamoDbExport.fs +++ b/src/Propulsion.DynamoStore/DynamoDbExport.fs @@ -47,7 +47,7 @@ type Importer(buffer: DynamoStoreIndex.Buffer, emit, dump) = let _ok, _ = buffer.LogIndexed(string streamSpan.p, { i = int streamSpan.i; c = streamSpan.c }) pending.Remove(string streamSpan.p) |> ignore totalIngestedSpans <- totalIngestedSpans + batch.LongLength - return pending.Values |> Seq.sumBy (fun x -> x.c.Length) } + return pending.Values |> Seq.sumBy _.c.Length } /// Ingest a file worth of data, flushing whenever we've accumulated enough pending data to be written member _.IngestDynamoDbJsonFile(file, bufferedEventsFlushThreshold) = async { diff --git a/src/Propulsion.DynamoStore/DynamoStoreIndex.fs b/src/Propulsion.DynamoStore/DynamoStoreIndex.fs index e213eef0..f31a591c 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreIndex.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreIndex.fs @@ -100,7 +100,7 @@ module Reader = let! maybeStreamBytes, _version, state = epochs.Read(partitionId, epochId, 0) let sizeB, loadS = defaultValueArg maybeStreamBytes 0L, Stopwatch.elapsedSeconds ts let spans = state.changes |> Array.collect (fun struct (_i, spans) -> spans) - let totalEvents = spans |> Array.sumBy (fun x -> x.c.Length) + let totalEvents = spans |> Array.sumBy _.c.Length let totalStreams = spans |> AppendsEpoch.flatten |> Seq.length log.Information("Epoch {epochId} {totalE} events {totalS} streams ({spans} spans, {batches} batches, {k:n3} MiB) {loadS:n1}s", string epochId, totalEvents, totalStreams, spans.Length, state.changes.Length, Log.miB sizeB, loadS) diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs index 0ab5f3e7..96fc6b4d 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreSource.fs @@ -50,7 +50,7 @@ module private Impl = sw.Stop() let totalStreams, chosenEvents, totalEvents, streamEvents = let all = state.changes |> Seq.collect (fun struct (_i, xs) -> xs) |> AppendsEpoch.flatten |> Array.ofSeq - let totalEvents = all |> Array.sumBy (fun x -> x.c.Length) + let totalEvents = all |> Array.sumBy _.c.Length let mutable chosenEvents = 0 let chooseStream (span: AppendsEpoch.Events.StreamSpan) = match maybeLoad (IndexStreamId.toStreamName span.p) (span.i, span.c) with diff --git a/src/Propulsion.EventStoreDb/EventStoreSource.fs b/src/Propulsion.EventStoreDb/EventStoreSource.fs index 302426d8..ae4918fb 100644 --- a/src/Propulsion.EventStoreDb/EventStoreSource.fs +++ b/src/Propulsion.EventStoreDb/EventStoreSource.fs @@ -16,7 +16,7 @@ module private Impl = let readBatch withData batchSize streamFilter (store: EventStoreClient) pos ct = task { let pos = let p = pos |> Propulsion.Feed.Position.toInt64 |> uint64 in Position(p, p) let res = store.ReadAllAsync(Direction.Forwards, pos, batchSize, withData, cancellationToken = ct) - let! batch = res |> TaskSeq.map (fun e -> e.Event) |> TaskSeq.toArrayAsync + let! batch = res |> TaskSeq.map _.Event |> TaskSeq.toArrayAsync return ({ checkpoint = checkpointPos batch; items = toItems streamFilter batch; isTail = batch.LongLength <> batchSize }: Propulsion.Feed.Core.Batch<_>) } // @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk diff --git a/src/Propulsion.MemoryStore/MemoryStoreLogger.fs b/src/Propulsion.MemoryStore/MemoryStoreLogger.fs index 3be9d71f..94586730 100644 --- a/src/Propulsion.MemoryStore/MemoryStoreLogger.fs +++ b/src/Propulsion.MemoryStore/MemoryStoreLogger.fs @@ -19,7 +19,7 @@ let renderSubmit (log: Serilog.ILogger) struct (epoch, categoryName, streamId, e if (not << log.IsEnabled) Serilog.Events.LogEventLevel.Debug then log elif typedefof<'F> <> typeof> then log else log |> propEventJsonUtf8 "Json" (unbox events) - let types = events |> Seq.map (fun e -> e.EventType) + let types = events |> Seq.map _.EventType log.ForContext("types", types).Debug("Submit #{epoch} {categoryName}-{streamId}x{count}", epoch, categoryName, streamId, events.Length) elif log.IsEnabled Serilog.Events.LogEventLevel.Debug then let types = seq { for e in events -> e.EventType } |> Seq.truncate 5 diff --git a/src/Propulsion/FeedMonitor.fs b/src/Propulsion/FeedMonitor.fs index 688be4d3..f4767ba9 100644 --- a/src/Propulsion/FeedMonitor.fs +++ b/src/Propulsion/FeedMonitor.fs @@ -77,7 +77,7 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId let logInterval = IntervalTimer logInterval let logWaitStatusUpdateNow () = let current = fetchPositions () - let currentRead, completed = current |> choose (fun v -> v.ReadPos), current |> choose (fun v -> v.CompletedPos) + let currentRead, completed = current |> choose _.ReadPos, current |> choose _.CompletedPos match waitMode with | OriginalWorkOnly -> log.Information("FeedMonitor {totalTime:n1}s Awaiting Started {starting} Completed {completed}", sw.ElapsedSeconds, startReadPositions, completed) @@ -89,7 +89,7 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId let busy () = let current = fetchPositions () match waitMode with - | OriginalWorkOnly -> let completed = current |> choose (fun v -> v.CompletedPos) + | OriginalWorkOnly -> let completed = current |> choose _.CompletedPos let trancheCompletedPos = System.Collections.Generic.Dictionary(completed |> Seq.map ValueTuple.toKvp) let startPosStillPendingCompletion trancheStartPos trancheId = match trancheCompletedPos.TryGetValue trancheId with @@ -145,7 +145,7 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId | xs when Array.any xs && requireTail && xs |> Array.forall (ValueTuple.snd >> TranchePosition.isDrained) -> xs |> choose (fun v -> v.ReadPos |> orDummyValue) | xs when xs |> Array.forall (fun struct (_, v) -> TranchePosition.isEmpty v && (not requireTail || v.IsTail)) -> Array.empty - | originals -> originals |> choose (fun v -> v.ReadPos) + | originals -> originals |> choose _.ReadPos match! awaitPropagation sleep propagationDelay activeTranches ct with | [||] -> if propagationDelay = TimeSpan.Zero then log.Debug("FeedSource Wait Skipped; no processing pending. Completed {completed}", currentCompleted) diff --git a/src/Propulsion/Internal.fs b/src/Propulsion/Internal.fs index 21fb74e3..1b477dd0 100644 --- a/src/Propulsion/Internal.fs +++ b/src/Propulsion/Internal.fs @@ -234,7 +234,7 @@ module Stats = let emit = logStatsPadded log keys let summary = cats.Values - |> Seq.collect (fun x -> x.All) + |> Seq.collect _.All |> Seq.groupBy ValueTuple.fst |> Seq.map (fun (g, xs) -> struct (g, Seq.sumBy ValueTuple.snd xs)) |> Seq.sortByDescending ValueTuple.snd @@ -254,7 +254,7 @@ module Stats = open MathNet.Numerics.Statistics let private logLatencyPercentiles (log: Serilog.ILogger) (label: string) (xs: TimeSpan seq) = - let sortedLatencies = xs |> Seq.map (fun ts -> ts.TotalSeconds) |> Seq.sort |> Seq.toArray + let sortedLatencies = xs |> Seq.map _.TotalSeconds |> Seq.sort |> Seq.toArray let pc p = SortedArrayStatistics.Percentile(sortedLatencies, p) |> TimeSpan.FromSeconds let l = { @@ -310,9 +310,9 @@ module Stats = if buckets.Count <> 0 then let clusters = buckets |> Seq.groupBy (fun kv -> bucketGroup kv.Key) |> Seq.sortBy fst |> Seq.toArray let emit = logLatencyPercentilesPadded log (clusters |> Seq.map fst) - totalLabel |> Option.iter (fun l -> emit l (buckets |> Seq.collect (fun kv -> kv.Value))) + totalLabel |> Option.iter (fun l -> emit l (buckets |> Seq.collect _.Value)) for name, items in clusters do - emit name (items |> Seq.collect (fun kv -> kv.Value)) + emit name (items |> Seq.collect _.Value) member _.Clear() = buckets.Clear() type LogEventLevel = Serilog.Events.LogEventLevel diff --git a/src/Propulsion/Parallel.fs b/src/Propulsion/Parallel.fs index 62628832..33b47dd5 100755 --- a/src/Propulsion/Parallel.fs +++ b/src/Propulsion/Parallel.fs @@ -98,9 +98,9 @@ module Scheduling = log.Information("Scheduler {cycles} cycles Started {startedBatches}b {startedItems}i Completed {completedBatches}b {completedItems}i latency {completedLatency:f1}ms Ready {readyitems} Waiting {waitingBatches}b", cycles, statsTotal startedB, statsTotal startedI, statsTotal completedB, totalItemsCompleted, latencyMs, waiting.Count, incoming.Count) let active = - seq { for KeyValue(pid,q) in active -> pid, q |> Seq.sumBy (fun x -> x.remaining) } - |> Seq.filter (fun (_,snd) -> snd <> 0) - |> Seq.sortBy (fun (_,snd) -> -snd) + seq { for KeyValue (pid, q) in active -> pid, q |> Seq.sumBy _.remaining } + |> Seq.filter (fun (_, snd) -> snd <> 0) + |> Seq.sortBy (fun (_, snd) -> -snd) log.Information("Partitions Active items {@active} Started batches {@startedBatches} items {@startedItems} Completed batches {@completedBatches} items {@completedItems}", active, startedB, startedI, completedB, completedI) cycles <- 0; processingDuration <- TimeSpan.Zero; startedBatches.Clear(); completedBatches.Clear(); startedItems.Clear(); completedItems.Clear() diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 6bcbb826..e045182c 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -198,7 +198,7 @@ module Buffer = let sn, wp = FsCodec.StreamName.toString stream, defaultValueArg state.WritePos 0L waitingStreams.Ingest(sprintf "%s@%dx%d" sn wp state.queue[0].Length, (sz + 512L) / 1024L) waiting <- waiting + 1 - waitingE <- waitingE + (state.queue |> Array.sumBy (fun x -> x.Length)) + waitingE <- waitingE + (state.queue |> Array.sumBy _.Length) waitingB <- waitingB + sz let m = Log.Metric.BufferReport { cats = waitingCats.Count; streams = waiting; events = waitingE; bytes = waitingB } (log |> Log.withMetric m).Information(" Streams Waiting {busy:n0}/{busyMb:n1}MB", waiting, Log.miB waitingB) diff --git a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs index 8521363a..cd225e25 100644 --- a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs +++ b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs @@ -272,7 +272,7 @@ and [] ConsumerIntegration(testOutputHelper, expectConcurrentSche // ``should have consumed all expected messages` let unconsumed = allMessages - |> Array.groupBy (fun msg -> msg.payload.producerId) + |> Array.groupBy _.payload.producerId |> Array.map (fun (_, gp) -> gp |> Array.distinctBy (fun msg -> msg.payload.messageId)) |> Array.where (fun gp -> gp.Length <> messagesPerProducer) let unconsumedCounts = diff --git a/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj b/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj index f67727ca..74b429fb 100644 --- a/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj +++ b/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj @@ -1,7 +1,7 @@  - net6.0 + net8.0 false diff --git a/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj index 171da889..0c5a9d63 100644 --- a/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj +++ b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj @@ -1,7 +1,7 @@ - net6.0 + net8.0 diff --git a/tests/Propulsion.Tests/Propulsion.Tests.fsproj b/tests/Propulsion.Tests/Propulsion.Tests.fsproj index 70be998d..f5117433 100644 --- a/tests/Propulsion.Tests/Propulsion.Tests.fsproj +++ b/tests/Propulsion.Tests/Propulsion.Tests.fsproj @@ -1,7 +1,7 @@  - net6.0 + net8.0 TRIM_FEED diff --git a/tools/Propulsion.Tool/Propulsion.Tool.fsproj b/tools/Propulsion.Tool/Propulsion.Tool.fsproj index f1f8fddc..4959f367 100644 --- a/tools/Propulsion.Tool/Propulsion.Tool.fsproj +++ b/tools/Propulsion.Tool/Propulsion.Tool.fsproj @@ -1,7 +1,7 @@  - net6.0 + net8.0 Exe true