Skip to content

Commit

Permalink
Target sdk 8
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jan 31, 2024
1 parent 59a2076 commit 049b7be
Show file tree
Hide file tree
Showing 18 changed files with 26 additions and 31 deletions.
5 changes: 0 additions & 5 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,5 @@ jobs:
pool:
vmImage: 'macOS-latest'
steps:
- task: UseDotNet@2
displayName: 'Install .NET Core sdk'
inputs:
packageType: sdk
version: 6.x
- script: dotnet pack build.proj
displayName: dotnet pack build.proj
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "6.0.300",
"version": "8.0.101",
"rollForward": "latestMajor"
}
}
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ open Equinox.CosmosStore.Core

open Propulsion.Sinks

/// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams
/// <remarks>NOTE No attempt is made to filter out Tip (`id=-1`) batches from the ChangeFeed; Equinox versions >= 3, Tip batches can bear events.</remarks>
/// <summary>Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams.</summary>
/// <remarks>NOTE No attempt is made to filter out Tip (`id=-1`) batches from the ChangeFeed, as in Equinox versions >= 3, Tip batches can bear events.</remarks>
[<RequireQualifiedAccess>]
#if !COSMOSV3
module EquinoxSystemTextJsonParser =
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.DynamoStore/AppendsEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ module Events =
let next (x: Events.StreamSpan) = int x.i + x.c.Length
/// Aggregates all spans per stream into a single Span from the lowest index to the highest
let flatten: Events.StreamSpan seq -> Events.StreamSpan seq =
Seq.groupBy (fun x -> x.p)
Seq.groupBy _.p
>> Seq.map (fun (p, xs) ->
let mutable i = -1L
let c = ResizeArray()
Expand Down Expand Up @@ -175,7 +175,7 @@ module Reader =

member _.ReadVersion(partitionId, epochId): Async<int64> =
let decider = resolve (partitionId, epochId, System.Int64.MaxValue)
decider.QueryEx(fun c -> c.Version)
decider.QueryEx _.Version

module Factory =

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/DynamoDbExport.fs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Importer(buffer: DynamoStoreIndex.Buffer, emit, dump) =
let _ok, _ = buffer.LogIndexed(string streamSpan.p, { i = int streamSpan.i; c = streamSpan.c })
pending.Remove(string streamSpan.p) |> ignore
totalIngestedSpans <- totalIngestedSpans + batch.LongLength
return pending.Values |> Seq.sumBy (fun x -> x.c.Length) }
return pending.Values |> Seq.sumBy _.c.Length }

/// Ingest a file worth of data, flushing whenever we've accumulated enough pending data to be written
member _.IngestDynamoDbJsonFile(file, bufferedEventsFlushThreshold) = async {
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/DynamoStoreIndex.fs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ module Reader =
let! maybeStreamBytes, _version, state = epochs.Read(partitionId, epochId, 0)
let sizeB, loadS = defaultValueArg maybeStreamBytes 0L, Stopwatch.elapsedSeconds ts
let spans = state.changes |> Array.collect (fun struct (_i, spans) -> spans)
let totalEvents = spans |> Array.sumBy (fun x -> x.c.Length)
let totalEvents = spans |> Array.sumBy _.c.Length
let totalStreams = spans |> AppendsEpoch.flatten |> Seq.length
log.Information("Epoch {epochId} {totalE} events {totalS} streams ({spans} spans, {batches} batches, {k:n3} MiB) {loadS:n1}s",
string epochId, totalEvents, totalStreams, spans.Length, state.changes.Length, Log.miB sizeB, loadS)
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ module private Impl =
sw.Stop()
let totalStreams, chosenEvents, totalEvents, streamEvents =
let all = state.changes |> Seq.collect (fun struct (_i, xs) -> xs) |> AppendsEpoch.flatten |> Array.ofSeq
let totalEvents = all |> Array.sumBy (fun x -> x.c.Length)
let totalEvents = all |> Array.sumBy _.c.Length
let mutable chosenEvents = 0
let chooseStream (span: AppendsEpoch.Events.StreamSpan) =
match maybeLoad (IndexStreamId.toStreamName span.p) (span.i, span.c) with
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStoreDb/EventStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module private Impl =
let readBatch withData batchSize streamFilter (store: EventStoreClient) pos ct = task {
let pos = let p = pos |> Propulsion.Feed.Position.toInt64 |> uint64 in Position(p, p)
let res = store.ReadAllAsync(Direction.Forwards, pos, batchSize, withData, cancellationToken = ct)
let! batch = res |> TaskSeq.map (fun e -> e.Event) |> TaskSeq.toArrayAsync
let! batch = res |> TaskSeq.map _.Event |> TaskSeq.toArrayAsync
return ({ checkpoint = checkpointPos batch; items = toItems streamFilter batch; isTail = batch.LongLength <> batchSize }: Propulsion.Feed.Core.Batch<_>) }

// @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.MemoryStore/MemoryStoreLogger.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ let renderSubmit (log: Serilog.ILogger) struct (epoch, categoryName, streamId, e
if (not << log.IsEnabled) Serilog.Events.LogEventLevel.Debug then log
elif typedefof<'F> <> typeof<ReadOnlyMemory<byte>> then log
else log |> propEventJsonUtf8 "Json" (unbox events)
let types = events |> Seq.map (fun e -> e.EventType)
let types = events |> Seq.map _.EventType
log.ForContext("types", types).Debug("Submit #{epoch} {categoryName}-{streamId}x{count}", epoch, categoryName, streamId, events.Length)
elif log.IsEnabled Serilog.Events.LogEventLevel.Debug then
let types = seq { for e in events -> e.EventType } |> Seq.truncate 5
Expand Down
6 changes: 3 additions & 3 deletions src/Propulsion/FeedMonitor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId
let logInterval = IntervalTimer logInterval
let logWaitStatusUpdateNow () =
let current = fetchPositions ()
let currentRead, completed = current |> choose (fun v -> v.ReadPos), current |> choose (fun v -> v.CompletedPos)
let currentRead, completed = current |> choose _.ReadPos, current |> choose _.CompletedPos
match waitMode with
| OriginalWorkOnly -> log.Information("FeedMonitor {totalTime:n1}s Awaiting Started {starting} Completed {completed}",
sw.ElapsedSeconds, startReadPositions, completed)
Expand All @@ -89,7 +89,7 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId
let busy () =
let current = fetchPositions ()
match waitMode with
| OriginalWorkOnly -> let completed = current |> choose (fun v -> v.CompletedPos)
| OriginalWorkOnly -> let completed = current |> choose _.CompletedPos
let trancheCompletedPos = System.Collections.Generic.Dictionary(completed |> Seq.map ValueTuple.toKvp)
let startPosStillPendingCompletion trancheStartPos trancheId =
match trancheCompletedPos.TryGetValue trancheId with
Expand Down Expand Up @@ -145,7 +145,7 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId
| xs when Array.any xs && requireTail && xs |> Array.forall (ValueTuple.snd >> TranchePosition.isDrained) ->
xs |> choose (fun v -> v.ReadPos |> orDummyValue)
| xs when xs |> Array.forall (fun struct (_, v) -> TranchePosition.isEmpty v && (not requireTail || v.IsTail)) -> Array.empty
| originals -> originals |> choose (fun v -> v.ReadPos)
| originals -> originals |> choose _.ReadPos
match! awaitPropagation sleep propagationDelay activeTranches ct with
| [||] ->
if propagationDelay = TimeSpan.Zero then log.Debug("FeedSource Wait Skipped; no processing pending. Completed {completed}", currentCompleted)
Expand Down
8 changes: 4 additions & 4 deletions src/Propulsion/Internal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ module Stats =
let emit = logStatsPadded log keys
let summary =
cats.Values
|> Seq.collect (fun x -> x.All)
|> Seq.collect _.All
|> Seq.groupBy ValueTuple.fst
|> Seq.map (fun (g, xs) -> struct (g, Seq.sumBy ValueTuple.snd xs))
|> Seq.sortByDescending ValueTuple.snd
Expand All @@ -254,7 +254,7 @@ module Stats =

open MathNet.Numerics.Statistics
let private logLatencyPercentiles (log: Serilog.ILogger) (label: string) (xs: TimeSpan seq) =
let sortedLatencies = xs |> Seq.map (fun ts -> ts.TotalSeconds) |> Seq.sort |> Seq.toArray
let sortedLatencies = xs |> Seq.map _.TotalSeconds |> Seq.sort |> Seq.toArray

let pc p = SortedArrayStatistics.Percentile(sortedLatencies, p) |> TimeSpan.FromSeconds
let l = {
Expand Down Expand Up @@ -310,9 +310,9 @@ module Stats =
if buckets.Count <> 0 then
let clusters = buckets |> Seq.groupBy (fun kv -> bucketGroup kv.Key) |> Seq.sortBy fst |> Seq.toArray
let emit = logLatencyPercentilesPadded log (clusters |> Seq.map fst)
totalLabel |> Option.iter (fun l -> emit l (buckets |> Seq.collect (fun kv -> kv.Value)))
totalLabel |> Option.iter (fun l -> emit l (buckets |> Seq.collect _.Value))
for name, items in clusters do
emit name (items |> Seq.collect (fun kv -> kv.Value))
emit name (items |> Seq.collect _.Value)
member _.Clear() = buckets.Clear()

type LogEventLevel = Serilog.Events.LogEventLevel
Expand Down
6 changes: 3 additions & 3 deletions src/Propulsion/Parallel.fs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ module Scheduling =
log.Information("Scheduler {cycles} cycles Started {startedBatches}b {startedItems}i Completed {completedBatches}b {completedItems}i latency {completedLatency:f1}ms Ready {readyitems} Waiting {waitingBatches}b",
cycles, statsTotal startedB, statsTotal startedI, statsTotal completedB, totalItemsCompleted, latencyMs, waiting.Count, incoming.Count)
let active =
seq { for KeyValue(pid,q) in active -> pid, q |> Seq.sumBy (fun x -> x.remaining) }
|> Seq.filter (fun (_,snd) -> snd <> 0)
|> Seq.sortBy (fun (_,snd) -> -snd)
seq { for KeyValue (pid, q) in active -> pid, q |> Seq.sumBy _.remaining }
|> Seq.filter (fun (_, snd) -> snd <> 0)
|> Seq.sortBy (fun (_, snd) -> -snd)
log.Information("Partitions Active items {@active} Started batches {@startedBatches} items {@startedItems} Completed batches {@completedBatches} items {@completedItems}",
active, startedB, startedI, completedB, completedI)
cycles <- 0; processingDuration <- TimeSpan.Zero; startedBatches.Clear(); completedBatches.Clear(); startedItems.Clear(); completedItems.Clear()
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ module Buffer =
let sn, wp = FsCodec.StreamName.toString stream, defaultValueArg state.WritePos 0L
waitingStreams.Ingest(sprintf "%s@%dx%d" sn wp state.queue[0].Length, (sz + 512L) / 1024L)
waiting <- waiting + 1
waitingE <- waitingE + (state.queue |> Array.sumBy (fun x -> x.Length))
waitingE <- waitingE + (state.queue |> Array.sumBy _.Length)
waitingB <- waitingB + sz
let m = Log.Metric.BufferReport { cats = waitingCats.Count; streams = waiting; events = waitingE; bytes = waitingB }
(log |> Log.withMetric m).Information(" Streams Waiting {busy:n0}/{busyMb:n1}MB", waiting, Log.miB waitingB)
Expand Down
2 changes: 1 addition & 1 deletion tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ and [<AbstractClass>] ConsumerIntegration(testOutputHelper, expectConcurrentSche
// ``should have consumed all expected messages`
let unconsumed =
allMessages
|> Array.groupBy (fun msg -> msg.payload.producerId)
|> Array.groupBy _.payload.producerId
|> Array.map (fun (_, gp) -> gp |> Array.distinctBy (fun msg -> msg.payload.messageId))
|> Array.where (fun gp -> gp.Length <> messagesPerProducer)
let unconsumedCounts =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<DisableImplicitFSharpCoreReference>false</DisableImplicitFSharpCoreReference>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion tests/Propulsion.Tests/Propulsion.Tests.fsproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<DefineConstants>TRIM_FEED</DefineConstants>
</PropertyGroup>

Expand Down
2 changes: 1 addition & 1 deletion tools/Propulsion.Tool/Propulsion.Tool.fsproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<OutputType>Exe</OutputType>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>

Expand Down

0 comments on commit 049b7be

Please sign in to comment.