Skip to content

Commit

Permalink
feat(Tool): Add JSON file support
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 19, 2024
1 parent a9fba70 commit e2d2670
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.SqlStreamStore`: Added `startFromTail` [#173](https://github.com/jet/propulsion/pull/173)
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
- `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142) :pray: [@brihadish](https://github.com/brihadish)
- `Propulsion.Tool`: `project` supports `json` source option [#250](https://github.com/jet/propulsion/pull/250)

### Changed

Expand Down
18 changes: 18 additions & 0 deletions tools/Propulsion.Tool/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,21 @@ module Mdb =
let checkpointStore = x.CreateCheckpointStore("nil")
do! checkpointStore.CreateSchemaIfNotExists(?ct = ct)
Log.Information("Table created") }

/// Command line arguments for the `json` source, which reads items from a line-oriented JSON file
module Json =

    type [<NoEquality; NoComparison>] Parameters =
        | [<AltCommandLine "-f"; Mandatory; MainCommand>] Path of filename: string
        | [<AltCommandLine "-s"; Unique>]       Skip of lines: int
        | [<AltCommandLine "-eof"; Unique>]     Truncate of lines: int
        | [<AltCommandLine "-pos"; Unique>]     LineNo of int
        interface IArgParserTemplate with
            member p.Usage = p |> function
                | Path _ ->                     "specify file path"
                | Skip _ ->                     "specify number of lines to skip"
                | Truncate _ ->                 "specify line number to pretend is End of File"
                | LineNo _ ->                   "specify line number to start (1-based)"
    /// Resolved view of the parsed <c>Parameters</c>
    and Arguments(c: Configuration, p: ParseResults<Parameters>) =
        /// Path of the file to read (mandatory)
        member val Filepath =                   p.GetResult Path
        /// Number of leading lines to skip: a 1-based LineNo (converted to a count by subtracting one) takes precedence over Skip; defaults to 0
        // NOTE TryGetResult does not accept a mapping function (its optional second argument is a ParseSource), so the conversion is applied via Option.map
        member val Skip =                       p.TryGetResult LineNo |> Option.map (fun l -> l - 1) |> Option.defaultWith (fun () -> p.GetResult(Skip, 0))
        /// Optional limit on the number of lines read from the file before End of File is signalled
        member val Trunc =                      p.TryGetResult Truncate
36 changes: 36 additions & 0 deletions tools/Propulsion.Tool/CosmosDumpSource.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace Propulsion.Tool

open FSharp.Control
open Propulsion.Feed
open Propulsion.Internal

/// <summary>Parses a newline separated file with items dumped from a Cosmos Container containing Equinox Items</summary>
/// <remarks>The recommended process is via the Equinox tool's eqx query mechanism <br/>
/// But any alternate way that yields the full JSON will also work <br/>
/// e.g. the cosmic tool at https://github.com/creyke/Cosmic <br/>
///   dotnet tool install -g cosmic <br/>
///   # then connect/select db per https://github.com/creyke/Cosmic#basic-usage <br/>
///   cosmic query 'select * from c order by c._ts' > file.out <br/>
/// </remarks>
type [<Sealed; AbstractClass>] CosmosDumpSource private () =

    /// <summary>Starts a single-pass feed over the lines of <c>filePath</c>, submitting one batch per parsed line to <c>sink</c>.</summary>
    /// <param name="log">Logger passed to the checkpoint store and the feed source.</param>
    /// <param name="statsInterval">Stats interval passed to the feed source.</param>
    /// <param name="filePath">File to read; also used as the <c>SourceId</c>.</param>
    /// <param name="skip">Number of leading lines to ignore; line <c>skip + 1</c> (1-based) is the first one processed.</param>
    /// <param name="parseFeedDoc">Maps each line's parsed <c>JsonDocument</c> to the items of the batch.</param>
    /// <param name="sink">Sink that receives the batches.</param>
    /// <param name="truncateTo">When supplied, only this many lines are read before End of File is signalled.</param>
    static member Start(log, statsInterval, filePath, skip, parseFeedDoc, sink, ?truncateTo) =
        // Lines whose first non-whitespace character is '#' are treated as comments and ignored
        let isNonCommentLine (line: string) = System.Text.RegularExpressions.Regex.IsMatch(line, "^\s*#") |> not
        let truncate = match truncateTo with Some count -> Seq.truncate count | None -> id
        let lines = Seq.append (System.IO.File.ReadLines filePath |> truncate) (Seq.singleton null) // Add a trailing EOF sentinel so checkpoint positions can be line numbers even when finished reading
        let crawl _ _ _ = taskSeq {
            let mutable i = 0
            for line in lines do
                i <- i + 1 // 1-based number of the current line
                let isEof = line = null
                // `i > skip` (not `>=`): skipping N lines means line N + 1 is the first one processed
                if isEof || (i > skip && isNonCommentLine line) then
                    let lineNo = i // the line being parsed, for error reporting
                    let nextLineNo = int64 i + 1L // the checkpoint is the number of the line that follows this one
                    try let items = if isEof then Array.empty
                                    else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray
                        struct (System.TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse nextLineNo }: Core.Batch<_>))
                    with e ->
                        // Clamp the excerpt: Substring(0, 200) throws for shorter lines (and for the null sentinel), which would mask the real error
                        let excerpt = if isEof then "" else line.Substring(0, min 200 line.Length)
                        raise <| exn($"File Parse error on L{lineNo}: '{excerpt}'", e) }
        let source =
            // Checkpoints are held in a volatile in-memory store
            let checkpointStore = Equinox.MemoryStore.VolatileStore()
            let checkpoints = ReaderCheckpoint.MemoryStore.create log ("consumerGroup", TimeSpan.minutes 1) checkpointStore
            Propulsion.Feed.Core.SinglePassFeedSource(log, statsInterval, SourceId.parse filePath, crawl, checkpoints, sink, string)
        source.Start(fun _ct -> task { return [| TrancheId.parse "0" |] })
5 changes: 3 additions & 2 deletions tools/Propulsion.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type Arguments(c: Args.Configuration, p: ParseResults<Parameters>) =

let isExpectedShutdownSignalException: exn -> bool = function
| :? ArguParseException -> true // Via Arguments.Parse and/or Configuration.tryGet
| :? System.Threading.Tasks.TaskCanceledException -> true // via AwaitKeyboardInterruptAsTaskCanceledException
| :? System.Threading.Tasks.TaskCanceledException -> true // via AwaitKeyboardInterruptAsTaskCanceledException or Project.eofSignalException
| _ -> false

[<EntryPoint>]
Expand All @@ -110,5 +110,6 @@ let main argv =
try a.ExecuteSubCommand() |> Async.RunSynchronously; 0
with e when not (isExpectedShutdownSignalException e) -> Log.Fatal(e, "Exiting"); 2
finally Log.CloseAndFlush()
with :? ArguParseException as e -> eprintfn $"%s{e.Message}"; 1
with x when x = Project.eofSignalException -> printfn "Processing COMPLETE"; 0
| :? ArguParseException as e -> eprintfn $"%s{e.Message}"; 1
| e -> eprintfn $"EXCEPTION: %s{e.Message}"; 1
47 changes: 33 additions & 14 deletions tools/Propulsion.Tool/Project.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ open Propulsion.Internal
open Serilog

type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
| [<AltCommandLine "-g"; Mandatory>] ConsumerGroupName of string
| [<AltCommandLine "-g"; Unique>] ConsumerGroupName of string
| [<AltCommandLine "-r"; Unique>] MaxReadAhead of int
| [<AltCommandLine "-w"; Unique>] MaxWriters of int
| [<AltCommandLine "-Z"; Unique>] FromTail
| [<AltCommandLine "-F"; Unique>] Follow
| [<AltCommandLine "-b"; Unique>] MaxItems of int
| [<CliPrefix(CliPrefix.None); Last>] Stats of ParseResults<SourceParameters>
| [<CliPrefix(CliPrefix.None); Last>] Kafka of ParseResults<KafkaParameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| ConsumerGroupName _ -> "Projector instance context name."
| ConsumerGroupName _ -> "Projector instance context name. Optional if source is JSON"
| MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: File: 32768 Other: 2."
| MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8 (Sync: 16)."
| FromTail -> "(iff fresh projection) - force starting from present Position. Default: Ensure each and every event is projected from the start."
| Follow -> "Stop when the Tail is reached."
| MaxItems _ -> "Controls checkpointing granularity by adjusting the batch size being loaded from the feed. Default: Unlimited"
Expand All @@ -33,28 +37,32 @@ and [<NoEquality; NoComparison; RequireSubcommand>] SourceParameters =
| [<CliPrefix(CliPrefix.None); Last; Unique>] Cosmos of ParseResults<Args.Cosmos.Parameters>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Dynamo of ParseResults<Args.Dynamo.Parameters>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Mdb of ParseResults<Args.Mdb.Parameters>
| [<CliPrefix(CliPrefix.None); Last; Unique>] Json of ParseResults<Args.Json.Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Cosmos _ -> "Specify CosmosDB parameters."
| Dynamo _ -> "Specify DynamoDB parameters."
| Mdb _ -> "Specify MessageDb parameters."
| Json _ -> "Specify JSON file parameters."
type [<NoEquality; NoComparison>] SourceArgs =
| Cosmos of Args.Cosmos.Arguments
| Dynamo of Args.Dynamo.Arguments
| Mdb of Args.Mdb.Arguments
| Json of Args.Json.Arguments
type KafkaArguments(c: Args.Configuration, p: ParseResults<KafkaParameters>) =
member val Broker = p.GetResult(Broker, fun () -> c.KafkaBroker)
member val Topic = p.GetResult(Topic, fun () -> c.KafkaTopic)
member val Source = SourceArguments(c, p.GetResult KafkaParameters.Source)
member val Broker = p.GetResult(Broker, fun () -> c.KafkaBroker)
member val Topic = p.GetResult(Topic, fun () -> c.KafkaTopic)
member val Source = SourceArguments(c, p.GetResult KafkaParameters.Source)
and SourceArguments(c, p: ParseResults<SourceParameters>) =
member val StoreArgs =
match p.GetSubCommand() with
| SourceParameters.Cosmos p -> Cosmos (Args.Cosmos.Arguments (c, p))
| SourceParameters.Dynamo p -> Dynamo (Args.Dynamo.Arguments (c, p))
| SourceParameters.Mdb p -> Mdb (Args.Mdb.Arguments (c, p))
| SourceParameters.Cosmos p -> Cosmos (Args.Cosmos.Arguments (c, p))
| SourceParameters.Dynamo p -> Dynamo (Args.Dynamo.Arguments (c, p))
| SourceParameters.Mdb p -> Mdb (Args.Mdb.Arguments (c, p))
| SourceParameters.Json p -> Json (Args.Json.Arguments (c, p))

type Arguments(c, p: ParseResults<Parameters>) =
member val IdleDelay = TimeSpan.ms 10.
member val IdleDelay = TimeSpan.ms 10.
member val StoreArgs =
match p.GetSubCommand() with
| Kafka a -> KafkaArguments(c, a).Source.StoreArgs
Expand All @@ -70,14 +78,21 @@ type Stats(statsInterval, statesInterval, logExternalStats) =
base.DumpStats()
logExternalStats Log.Logger

let eofSignalException = System.Threading.Tasks.TaskCanceledException "Stopping; FeedMonitor wait completed"
let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
let a = Arguments(c, p)
let dumpStoreStats =
match a.StoreArgs with
| Cosmos _ -> Equinox.CosmosStore.Core.Log.InternalMetrics.dump
| Dynamo _ -> Equinox.DynamoStore.Core.Log.InternalMetrics.dump
| Mdb _ -> ignore
let group, startFromTail, follow, maxItems = p.GetResult ConsumerGroupName, p.Contains FromTail, p.Contains Follow, p.TryGetResult MaxItems
| Json _ -> ignore
let group =
match p.TryGetResult ConsumerGroupName, a.StoreArgs with
| Some x, _ -> x
| None, Json _ -> System.Guid.NewGuid() |> _.ToString("N")
| None, _ -> p.Raise "ConsumerGroupName is mandatory, unless consuming from a JSON file"
let startFromTail, follow, maxItems = p.Contains FromTail, p.Contains Follow, p.TryGetResult MaxItems
let producer =
match p.GetSubCommand() with
| Kafka a ->
Expand All @@ -88,22 +103,24 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
Some p
| Stats _ -> None
| x -> p.Raise $"unexpected subcommand %A{x}"
// Only the Json source reads from a file; every other source must yield false so the non-file MaxReadAhead default applies
let isFileSource = match a.StoreArgs with Json _ -> true | _ -> false
let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2)
let maxConcurrentProcessors = p.GetResult(MaxWriters, 8)
let stats = Stats(TimeSpan.minutes 1., TimeSpan.minutes 5., logExternalStats = dumpStoreStats)
let sink =
let maxReadAhead, maxConcurrentStreams = 2, 16
let handle (stream: FsCodec.StreamName) (span: Propulsion.Sinks.Event[]) = async {
match producer with
| None -> ()
| Some producer ->
let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream span |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
do! producer.ProduceAsync(FsCodec.StreamName.toString stream, json) |> Async.Ignore
return Propulsion.Sinks.AllProcessed, () }
Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentStreams, handle, stats, idleDelay = a.IdleDelay)
Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle, stats, idleDelay = a.IdleDelay)
let source =
let parseFeedDoc = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream (fun _sn -> true)
match a.StoreArgs with
| Cosmos sa ->
let monitored, leases = sa.ConnectFeed() |> Async.RunSynchronously
let parseFeedDoc = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream (fun _sn -> true)
Propulsion.CosmosStore.CosmosStoreSource(
Log.Logger, stats.StatsInterval, monitored, leases, group, parseFeedDoc, sink,
startFromTail = startFromTail, ?maxItems = maxItems, ?lagEstimationInterval = sa.MaybeLogLagInterval
Expand All @@ -127,6 +144,8 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
client, defaultArg maxItems 100, TimeSpan.seconds 0.5,
checkpoints, sink, categories
).Start()
| Json sa ->
CosmosDumpSource.Start(Log.Logger, stats.StatsInterval, sa.Filepath, sa.Skip, parseFeedDoc, sink, ?truncateTo = sa.Trunc)

let work = [
Async.AwaitKeyboardInterruptAsTaskCanceledException()
Expand All @@ -138,6 +157,6 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
source.Stop()
do! source.Await() // Let it emit the stats
do! source.Flush() |> Async.Ignore<Propulsion.Feed.TranchePositions> // flush checkpoints (currently a no-op)
raise <| System.Threading.Tasks.TaskCanceledException "Stopping; FeedMonitor wait completed" } // trigger tear down of sibling waits
raise eofSignalException } // trigger tear down of sibling waits
sink.AwaitWithStopOnCancellation() ]
return! work |> Async.Parallel |> Async.Ignore<unit[]> }
2 changes: 2 additions & 0 deletions tools/Propulsion.Tool/Propulsion.Tool.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Args.fs" />
<Compile Include="CosmosDumpSource.fs" />
<Compile Include="Project.fs" />
<Compile Include="Program.fs" />
</ItemGroup>
Expand All @@ -23,6 +24,7 @@
<ProjectReference Include="..\..\src\Propulsion.CosmosStore\Propulsion.CosmosStore.fsproj" />
<ProjectReference Include="..\..\src\Propulsion.DynamoStore\Propulsion.DynamoStore.fsproj" />
<ProjectReference Include="..\..\src\Propulsion.MessageDb\Propulsion.MessageDb.fsproj" />
<ProjectReference Include="..\..\src\Propulsion.MemoryStore\Propulsion.MemoryStore.fsproj" />
<ProjectReference Include="..\..\src\Propulsion.Kafka\Propulsion.Kafka.fsproj" />
</ItemGroup>

Expand Down

0 comments on commit e2d2670

Please sign in to comment.