Commit 599754c

feat(Tool)!: Add sync cosmos from cosmos|json (#252)

bartelink committed Feb 20, 2024 · 1 parent 01d0e88

Showing 14 changed files with 207 additions and 111 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -14,6 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next release
- `Feed`: `Checkpoint` enables committing progress (and obtaining the achieved positions) without stopping the Sink [#162](https://github.com/jet/propulsion/pull/162)
- `Feed.SinglePassFeedSource`: Coordinates reads of a set of tranches until each reaches its Tail [#179](https://github.com/jet/propulsion/pull/179)
- `Streams.Stats.abendThreshold`: Abends Sink's processing with a `HealthCheckException` if a stream continually fails to progress, or continually errors with non-transient exceptions [#246](https://github.com/jet/propulsion/pull/246)
- `StreamFilter`: Generic logic for filtering source events based on Category name, Stream name or Event type [#252](https://github.com/jet/propulsion/pull/252)
- `Ingester, Sinks`: Expose optional `ingesterStateInterval` and `commitInterval` control on Sink factories [#154](https://github.com/jet/propulsion/pull/154) [#239](https://github.com/jet/propulsion/pull/239)
- `Scheduler`: Split out stats re `rateLimited` and `timedOut` vs `exceptions` [#194](https://github.com/jet/propulsion/pull/194)
- `Scheduler`: Added `index`, `eventType` to error logging [#237](https://github.com/jet/propulsion/pull/237)
@@ -27,8 +28,9 @@ The `Unreleased` section name is replaced by the expected version of next release
- `Propulsion.MemoryStore`: `MemoryStoreSource` to align with other sources for integration testing. Includes *deterministic* `AwaitCompletion` as per `Propulsion.Feed`-based Sources [#165](https://github.com/jet/propulsion/pull/165)
- `Propulsion.SqlStreamStore`: Added `startFromTail` [#173](https://github.com/jet/propulsion/pull/173)
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
- `Propulsion.Tool`: `sync <kafka|stats>` supports `from json` source option [#250](https://github.com/jet/propulsion/pull/250)
- `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142) :pray: [@brihadish](https://github.com/brihadish)
- `Propulsion.Tool`: `project` supports `json` source option [#250](https://github.com/jet/propulsion/pull/250)
- `Propulsion.Tool`: `sync cosmos from <cosmos|json>` [#252](https://github.com/jet/propulsion/pull/252)

### Changed

@@ -51,6 +53,7 @@ The `Unreleased` section name is replaced by the expected version of next release
- `Propulsion.EventStoreDb.EventStoreSource`: Changed API to match `Propulsion.SqlStreamStore` API rather than `Propulsion.EventStore` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.Feed`,`Kafka`: Replaced `Async` with `task` for supervision [#158](https://github.com/jet/propulsion/pull/158), [#159](https://github.com/jet/propulsion/pull/159)
- `Propulsion.Kafka`: Target `FsCodec.NewtonsoftJson` v `3.0.0` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.Tool`: `project` renamed to `sync`; sources now have a `from` prefix [#252](https://github.com/jet/propulsion/pull/252)

### Removed

15 changes: 8 additions & 7 deletions README.md
@@ -24,8 +24,9 @@ If you're looking for a good discussion forum on these kinds of topics, look no
1. `FeedSource`: Handles continual reading and checkpointing of events from a set of feeds ('tranches') of a 'source' that collectively represent a change data capture source for a given system (roughly analogous to how a CosmosDB Container presents a changefeed). A `readTranches` function is used to identify the Tranches (sub-feeds) on startup. The Feed Source then operates a logical reader thread per Tranche. Tranches represent content as an incrementally retrievable change feed consisting of batches of `FsCodec.ITimelineEvent` records. Each batch has an optional associated checkpointing callback that's triggered only when the Sink has handled all events within it.
2. `Monitor.AwaitCompletion`: Enables efficient waiting for completion of reaction processing within an integration test.
3. `PeriodicSource`: Handles regular crawling of an external datasource (such as a SQL database) where there is no way to save progress and then resume from that saved token (based on either the intrinsic properties of the data, or of the store itself). The source is expected to present its content as an `IAsyncEnumerable` of `FsCodec.StreamName * FsCodec.IEventData * context`. Checkpointing occurs only when all events have been deemed handled by the Sink.
4. `SinglePassFeedSource`: Handles single pass loading of large datasets (such as a SQL database), completing when the full data has been ingested.
5. `Prometheus`: Exposes reading statistics to Prometheus (including metrics from `DynamoStore.DynamoStoreSource`, `EventStoreDb.EventStoreSource`, `MessageDb.MessageDbSource` and `SqlStreamStore.SqlStreamStoreSource`). (NOTE all other statistics relating to processing throughput and latency etc are exposed from the Scheduler component on the Sink side)
4. `JsonSource`: Simple source that feeds items from a file containing JSON (such a file can be generated via `eqx query -o JSONFILE from cosmos` etc.; see the sketch after this list)
5. `SinglePassFeedSource`: Handles single pass loading of large datasets (such as a SQL database), completing when the full data has been ingested.
6. `Prometheus`: Exposes reading statistics to Prometheus (including metrics from `DynamoStore.DynamoStoreSource`, `EventStoreDb.EventStoreSource`, `MessageDb.MessageDbSource` and `SqlStreamStore.SqlStreamStoreSource`). (NOTE all other statistics relating to processing throughput and latency etc are exposed from the Scheduler component on the Sink side)
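
To make the `JsonSource` round trip concrete, here is a hedged sketch; the exact spelling of the file argument in the `propulsion` invocation is an assumption, not something this diff confirms:

```powershell
# dump Items from a Cosmos container, one JSON document per line
eqx query -o dump.json from cosmos
# replay the dump through the stats Sink (the dump.json positional argument is illustrative)
propulsion sync stats from json dump.json
```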

- `Propulsion.MemoryStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MemoryStore.svg)](https://www.nuget.org/packages/Propulsion.MemoryStore/). Provides bindings to `Equinox.MemoryStore`. [Depends](https://www.fuget.org/packages/Propulsion.MemoryStore) on `Equinox.MemoryStore` v `4.0.0`, `FsCodec.Box`, `Propulsion`

@@ -152,7 +153,7 @@ adjusting package references while retaining source compatibility to the maximum

- [Propulsion+Equinox templates](https://github.com/jet/dotnet-templates#producerreactor-templates-combining-usage-of-equinox-and-propulsion):
- `eqxShipping`: Event-sourced example with a Process Manager. Includes a `Watchdog` component that uses a `StreamsSink`, with example wiring for `CosmosStore`, `DynamoStore` and `EventStoreDb`
- `proCosmosReactor`. single-source `StreamsSink` based Reactor. More legible version of `proReactor` template, currently only supports `Propulsion.CosmosStore`
- `proIndexer`. single-source `StreamsSink` based Reactor. More legible version of `proReactor` template, currently only supports `Propulsion.CosmosStore`, and provides some specific extensions such as updating snapshots.
- `proReactor` generic template, supporting multiple sources and multiple processing modes
- `summaryConsumer` consumes from the output of a `proReactor --kafka`, saving them in an `Equinox.CosmosStore` store
- `trackingConsumer` consumes from Kafka, feeding into example Ingester logic in an `Equinox.CosmosStore` store
@@ -199,17 +200,17 @@ The relevant pieces of the above break down as follows, when we emphasize the [C

```powershell
dotnet tool uninstall Propulsion.Tool -g
dotnet tool install Propulsion.Tool -g
dotnet tool install Propulsion.Tool -g --prerelease
propulsion init -ru 400 cosmos # generates a -aux container for the ChangeFeedProcessor to maintain consumer group progress within
# -V for verbose ChangeFeedProcessor logging
# `-g projector1` represents the consumer group - >=1 are allowed, allowing multiple independent projections to run concurrently
# stats specifies one only wants stats regarding items (other options include `kafka` to project to Kafka)
# cosmos specifies source overrides (using defaults in step 1 in this instance)
propulsion -V project -g projector1 stats cosmos
propulsion -V sync -g projector1 stats from cosmos
# load events with 2 parallel readers, detailed store logging and a read timeout of 20s
propulsion -VS project -g projector1 stats dynamo -rt 20 -d 2
propulsion -VS sync -g projector1 stats from dynamo -rt 20 -d 2
```

### 2. Use `propulsion` tool to Run a CosmosDb ChangeFeedProcessor or DynamoStoreSource projector, emitting to a Kafka topic
@@ -223,7 +224,7 @@ $env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b
# `kafka` specifies one wants to emit to Kafka
# `temp-topic` is the topic to emit to
# `cosmos` specifies source overrides (using defaults in step 1 in this instance)
propulsion -V project -g projector3 -l 5 kafka temp-topic cosmos
propulsion -V sync -g projector3 -l 5 kafka temp-topic from cosmos
```

### 3. Use `propulsion` tool to inspect DynamoStore Index
5 changes: 5 additions & 0 deletions src/Propulsion.CosmosStore/ReaderCheckpoint.fs
@@ -163,6 +163,11 @@ module MemoryStore =
let cat = MemoryStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial)
let resolve = Equinox.Decider.forStream log cat
Service(Stream.id >> resolve, consumerGroupName, defaultCheckpointFrequency)

let createNull () =
let checkpointStore = VolatileStore()
create Serilog.Log.Logger ("consumerGroup", TimeSpan.minutes 1) checkpointStore

#else
let private defaultCacheDuration = TimeSpan.FromMinutes 20.
#if COSMOSV3
src/Propulsion.Feed/JsonSource.fs (moved and renamed from `CosmosDumpSource` in Propulsion.Tool)
@@ -1,20 +1,17 @@
namespace Propulsion.Tool
namespace Propulsion.Feed

open FSharp.Control
open Propulsion.Feed
open Propulsion.Internal

/// <summary>Parses CR separated file with items dumped from a Cosmos Container containing Equinox Items</summary>
/// <remarks>The recommended process is via the Equinox tool's eqx query mechanism <br/>
/// But any alternate way way that yields the full JSON will also work /// e.g. the cosmic tool at https://github.com/creyke/Cosmic <br/>
/// <summary>Parses a CR-separated file with items dumped from a Cosmos Container containing Equinox Items<br/>
/// Such items can be extracted via Equinox.Tool's <c>eqx query -o JSONFILE cosmos</c>.</summary>
/// <remarks>Any alternate way that yields the full JSON will also work, e.g. the cosmic tool at https://github.com/creyke/Cosmic <br/>
/// dotnet tool install -g cosmic <br/>
/// # then connect/select db per https://github.com/creyke/Cosmic#basic-usage <br/>
/// cosmic query 'select * from c order by c._ts' > file.out <br/>
/// </remarks>
///
type [<Sealed; AbstractClass>] CosmosDumpSource private () =
type [<Sealed; AbstractClass>] JsonSource private () =

static member Start(log, statsInterval, filePath, skip, parseFeedDoc, sink, ?truncateTo) =
static member Start(log, statsInterval, filePath, skip, parseFeedDoc, checkpoints, sink, ?truncateTo) =
    let isNonCommentLine (line: string) = System.Text.RegularExpressions.Regex.IsMatch(line, @"^\s*#") |> not
let truncate = match truncateTo with Some count -> Seq.truncate count | None -> id
let lines = Seq.append (System.IO.File.ReadLines filePath |> truncate) (Seq.singleton null) // Add a trailing EOF sentinel so checkpoint positions can be line numbers even when finished reading
@@ -29,8 +26,5 @@ type [<Sealed; AbstractClass>] CosmosDumpSource private () =
else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray
struct (System.TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Core.Batch<_>))
with e -> raise <| exn($"File Parse error on L{lineNo}: '{line.Substring(0, 200)}'", e) }
let source =
let checkpointStore = Equinox.MemoryStore.VolatileStore()
let checkpoints = ReaderCheckpoint.MemoryStore.create log ("consumerGroup", TimeSpan.minutes 1) checkpointStore
Propulsion.Feed.Core.SinglePassFeedSource(log, statsInterval, SourceId.parse filePath, crawl, checkpoints, sink, string)
let source = Propulsion.Feed.Core.SinglePassFeedSource(log, statsInterval, SourceId.parse filePath, crawl, checkpoints, sink, string)
source.Start(fun _ct -> task { return [| TrancheId.parse "0" |] })
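
With the checkpoint store now a parameter of `Start` rather than being constructed internally, the caller has to supply one. A minimal sketch of wiring the source back up with the new `createNull` helper, assuming a `parseFeedDoc` function and a `sink` already exist (both hypothetical names here):

```fsharp
// Hedged sketch: replay a JSON dump through a sink, with throwaway in-memory checkpoints.
// The `0` argument assumes `skip` counts leading lines to ignore.
let startJsonReplay log statsInterval filePath parseFeedDoc sink =
    let checkpoints = Propulsion.Feed.ReaderCheckpoint.MemoryStore.createNull ()
    Propulsion.Feed.JsonSource.Start(log, statsInterval, filePath, 0, parseFeedDoc, checkpoints, sink)
```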
1 change: 1 addition & 0 deletions src/Propulsion.Feed/Propulsion.Feed.fsproj
@@ -7,6 +7,7 @@
<ItemGroup>
<Compile Include="FeedReader.fs" />
<Compile Include="FeedSource.fs" />
<Compile Include="JsonSource.fs" />
<Compile Include="PeriodicSource.fs" />
<Compile Include="FeedPrometheus.fs" />
</ItemGroup>
7 changes: 7 additions & 0 deletions src/Propulsion/Internal.fs
@@ -196,6 +196,13 @@ module ValueOption =

module Seq =

let partition predicate xs =
let ham = ResizeArray()
let spam = ResizeArray()
for x in xs do
if predicate x then ham.Add x
else spam.Add x
ham.ToArray(), spam.ToArray()
let tryPickV f (xs: _ seq) =
use e = xs.GetEnumerator()
let mutable res = ValueNone
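
The new `Seq.partition` helper splits a sequence into two arrays in a single pass. An illustrative check of its behavior (it lives in the library-internal `Propulsion.Internal` module, so application code would not normally see it):

```fsharp
open Propulsion.Internal

// elements satisfying the predicate land in the first array, the rest in the second
let evens, odds = Seq.partition (fun x -> x % 2 = 0) (seq { 1..5 })
// evens = [| 2; 4 |]; odds = [| 1; 3; 5 |]
```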
1 change: 1 addition & 0 deletions src/Propulsion/Propulsion.fsproj
@@ -17,6 +17,7 @@
<Compile Include="Feed.fs" />
<Compile Include="FeedMonitor.fs" />
<Compile Include="PropulsionPrometheus.fs" />
<Compile Include="StreamFilter.fs" />
</ItemGroup>

<ItemGroup>
45 changes: 45 additions & 0 deletions src/Propulsion/StreamFilter.fs
@@ -0,0 +1,45 @@
namespace Propulsion

open System.Runtime.InteropServices
open Propulsion.Internal

type StreamFilter([<Optional>] allowCats, [<Optional>] denyCats, [<Optional>] allowSns, [<Optional>] denySns,
[<Optional>] allowEts, [<Optional>] denyEts,
[<Optional; DefaultParameterValue(false)>] ?incIndexes,
[<Optional; DefaultParameterValue(null)>] ?log) =
let log = defaultArg log Serilog.Log.Logger
let defA x = match x with null -> Array.empty | xs -> Seq.toArray xs

let allowCats, denyCats, incIndexes = defA allowCats, defA denyCats, defaultArg incIndexes false
let allowSns, denySns = defA allowSns, defA denySns
let allowEts, denyEts = defA allowEts, defA denyEts
let isPlain = Seq.forall (fun x -> System.Char.IsLetterOrDigit x || x = '_')
let asRe = Seq.map (fun x -> if isPlain x then $"^{x}$" else x)
let (|Filter|) exprs =
let values, pats = Seq.partition isPlain exprs
let valuesContains = let set = System.Collections.Generic.HashSet(values) in set.Contains
let aPatternMatches (x: string) = pats |> Seq.exists (fun p -> System.Text.RegularExpressions.Regex.IsMatch(x, p))
fun cat -> valuesContains cat || aPatternMatches cat
let filter map (allow, deny) =
match allow, deny with
| [||], [||] -> fun _ -> true
| Filter includes, Filter excludes -> fun x -> let x = map x in (Array.isEmpty allow || includes x) && not (excludes x)
let validStream = filter FsCodec.StreamName.toString (allowSns, denySns)
let isTransactionalStream (sn: FsCodec.StreamName) = let sn = FsCodec.StreamName.toString sn in not (sn.StartsWith('$'))

member _.CreateStreamFilter([<Optional>] maybeCategories) =
let handlerCats = defA maybeCategories
let allowCats = Array.append handlerCats allowCats
let validCat = filter FsCodec.StreamName.Category.ofStreamName (allowCats, denyCats)
let allowCats = match allowCats with [||] -> [| ".*" |] | xs -> xs
        let denyCats = if incIndexes then denyCats else Array.append denyCats [| @"^\$" |]
let allowSns, denySns = match allowSns, denySns with [||], [||] -> [|".*"|], [||] | x -> x
let allowEts, denyEts = match allowEts, denyEts with [||], [||] -> [|".*"|], [||] | x -> x
log.Information("Categories ☑️ {@allowCats} 🚫{@denyCats} Streams ☑️ {@allowStreams} 🚫{denyStreams} Events ☑️ {allowEts} 🚫{@denyEts}",
asRe allowCats, asRe denyCats, asRe allowSns, asRe denySns, asRe allowEts, asRe denyEts)
fun sn ->
validCat sn
&& validStream sn
&& (incIndexes || isTransactionalStream sn)

member val EventFilter = filter (fun (x: Propulsion.Sinks.Event) -> x.EventType) (allowEts, denyEts)
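
A hedged sketch of how the new `StreamFilter` might be exercised; the category and event type values are illustrative:

```fsharp
// Allow only two categories, and drop a given event type; all other axes default to allow-all.
let filter = Propulsion.StreamFilter(allowCats = [| "Order"; "Cart" |], denyEts = [| "Snapshotted" |])

// Stream-level predicate (also logs the effective allow/deny sets when created)
let isRelevantStream: FsCodec.StreamName -> bool = filter.CreateStreamFilter()

// Event-level predicate, gating on EventType
let isRelevantEvent: Propulsion.Sinks.Event -> bool = filter.EventFilter
```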
3 changes: 1 addition & 2 deletions tests/Propulsion.Tests/SinkHealthTests.fs
@@ -10,8 +10,7 @@ type Scenario(testOutput) =

let log = TestOutputLogger.forTestOutputEx true testOutput

let store = Equinox.MemoryStore.VolatileStore()
let checkpoints = ReaderCheckpoint.MemoryStore.create log ("consumerGroup", TimeSpan.FromMinutes 1) store
let checkpoints = ReaderCheckpoint.MemoryStore.createNull()
let abendThreshold = TimeSpan.FromSeconds 3.
let stats = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromSeconds 2, TimeSpan.FromSeconds 10, abendThreshold = abendThreshold)
with member _.HandleOk x = ()
3 changes: 1 addition & 2 deletions tests/Propulsion.Tests/SourceTests.fs
@@ -10,8 +10,7 @@ type Scenario(testOutput) =

let log = TestOutputLogger.forTestOutput testOutput

let store = Equinox.MemoryStore.VolatileStore()
let checkpoints = ReaderCheckpoint.MemoryStore.create log ("consumerGroup", TimeSpan.FromMinutes 1) store
let checkpoints = ReaderCheckpoint.MemoryStore.createNull()
let stats = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1)
with member _.HandleOk x = ()
member _.HandleExn(log, x) = () }
8 changes: 6 additions & 2 deletions tools/Propulsion.Tool/Args.fs
@@ -78,20 +78,24 @@ module Cosmos =
| LagFreqM _ -> "Specify frequency to dump lag stats. Default: off"

type Arguments(c: Configuration, p: ParseResults<Parameters>) =
let connection = p.GetResult(Connection, fun () -> c.CosmosConnection)
let connector =
let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString
let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.seconds
let retries = p.GetResult(Retries, 1)
let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.seconds
let mode = p.TryGetResult ConnectionMode
Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode)
Equinox.CosmosStore.CosmosStoreConnector((connection |> Equinox.CosmosStore.Discovery.ConnectionString), timeout, retries, maxRetryWaitTime, ?mode = mode)
let databaseId = p.GetResult(Database, fun () -> c.CosmosDatabase)
let containerId = p.GetResult(Container, fun () -> c.CosmosContainer)
let leasesContainerName = p.GetResult(LeaseContainer, fun () -> containerId + p.GetResult(Suffix, "-aux"))
let checkpointInterval = TimeSpan.hours 1.
member val Connection = connection
member val Database = databaseId
member val MaybeLogLagInterval = p.TryGetResult(LagFreqM, TimeSpan.minutes)
member _.CreateLeasesContainer() = connector.CreateLeasesContainer(databaseId, leasesContainerName)
member _.ConnectFeed() = connector.ConnectFeed(databaseId, containerId, leasesContainerName)
member _.ConnectFeedReadOnly(auxClient, auxDatabase, auxContainerId) =
connector.ConnectFeedReadOnly(databaseId, containerId, auxClient, auxDatabase, auxContainerId)
member x.CreateCheckpointStore(group, cache, storeLog) = async {
let! context = connector.ConnectContext("Checkpoints", databaseId, containerId, 256)
return Propulsion.Feed.ReaderCheckpoint.CosmosStore.create storeLog (group, checkpointInterval) (context, cache) }
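
The refactor hoists the raw connection string into its own binding so it can be re-exposed via the new `Connection` member (presumably so the `sync cosmos from cosmos` mode can connect a second, read-only feed). A minimal sketch of the construction it performs, with a placeholder connection string:

```fsharp
open System

let connection = "AccountEndpoint=https://...;AccountKey=..." // placeholder
let connector =
    let discovery = connection |> Equinox.CosmosStore.Discovery.ConnectionString
    // 5s timeout, 1 retry, 5s max retry wait, per the defaults in the diff above
    Equinox.CosmosStore.CosmosStoreConnector(discovery, TimeSpan.FromSeconds 5., 1, TimeSpan.FromSeconds 5.)
```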
(The remaining 3 of the 14 changed files are not shown.)
