Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DynamoDB #321

Merged
merged 56 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
c4b5f28
tmp
bartelink Mar 10, 2022
574a050
Relax netstandard requirements
bartelink Mar 17, 2022
ecea707
Tidy
bartelink Apr 13, 2022
199124f
Add DynamoDb stub Docker
bartelink Mar 15, 2022
0300998
ddb wip
bartelink Mar 15, 2022
df1e68b
wip
bartelink Mar 16, 2022
e6e977e
Port initialization logic
bartelink Mar 21, 2022
3629a37
Port Tip reading, metrics
bartelink Mar 22, 2022
493bb03
Port more querying
bartelink Mar 22, 2022
30c3548
Port connection logic
bartelink Mar 23, 2022
86504f4
Generalize store integration tests
bartelink Mar 23, 2022
a3f2f51
Basic Sync impl
bartelink Mar 25, 2022
ca07dfe
Add Dynamo wiring and docs
bartelink Mar 30, 2022
5db074b
Polish docs
bartelink Mar 30, 2022
5b0f3f4
Overdue credit to @pierregoudjo
bartelink Mar 30, 2022
91e94be
Remove Resync, Exp.Any
bartelink Mar 30, 2022
08f52ed
Tidy
bartelink Mar 30, 2022
d9f1065
Remove Tip vs Batch
bartelink Mar 31, 2022
5ac19c6
Implement calving
bartelink Mar 31, 2022
ea31665
Fix expectations re Split algorithm
bartelink Apr 2, 2022
e35da67
Collect RCs
bartelink Apr 2, 2022
639fead
Add Dynamo temp tables fixture
bartelink Apr 3, 2022
e404c7c
Clarify secondary/fallback/2 naming
bartelink Apr 4, 2022
32f40df
Update to FSAWSDDB 0.9.4-beta
bartelink Apr 8, 2022
5e963db
Target FSAWSDDB 0.10.1-beta
bartelink Apr 12, 2022
36443a8
make maxEvents optional
bartelink Apr 12, 2022
62e098f
Add DynamoStore.Prometheus
bartelink Apr 13, 2022
0b2a0d0
Tidy
bartelink Apr 13, 2022
6a3512c
Split Schema; add 'a' field
bartelink Apr 15, 2022
e3f9fdf
Fix message
bartelink Apr 18, 2022
b131634
Optimize Event Encoding
bartelink Apr 18, 2022
e165224
Surface (and fix) RUs
bartelink Apr 18, 2022
e3146b7
Streaming init support
bartelink Apr 19, 2022
07d2139
Port prune, get
bartelink Apr 19, 2022
52e0f86
Tiny tweaks
bartelink Apr 26, 2022
1f55a93
Remove initialization path
bartelink Apr 27, 2022
d871b3f
Separate read/write RU stats
bartelink Apr 29, 2022
5321752
Polish Tip logging
bartelink Apr 28, 2022
3c79c8b
Fix RU breakdown
bartelink May 3, 2022
8c52b08
Using Actual Transactions
bartelink May 3, 2022
596de86
Switch EventBody to ReadOnlyMemory
bartelink May 4, 2022
ecdfae2
Minor Formatting
bartelink May 5, 2022
ed63357
Finesse Compression/Serialization
bartelink May 6, 2022
20b0461
Fix RU expectation
bartelink May 9, 2022
e37ed98
Push down Sync logic
bartelink May 9, 2022
3bb00b3
Cleanup retries/timeout
bartelink May 9, 2022
0c069f2
Split read vs write RUs for Cosmos too
bartelink May 9, 2022
eb93241
Tiny tidy
bartelink May 9, 2022
cd96221
Finalize encoding
bartelink May 11, 2022
724232a
Tidy
bartelink May 11, 2022
d946132
Add "rut" tag to metrics
bartelink May 11, 2022
9950521
Shift t back into e to avoid empty rows
bartelink May 11, 2022
1f2e24e
Use FsCodec 3
bartelink May 11, 2022
5643eb7
Tidy changelog
bartelink May 12, 2022
95fe47d
More READMEing
bartelink May 12, 2022
f2f5691
Ref binary for FSharp.AWS.DynamoDB
bartelink May 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

.DS_Store

# DynamoDb local simulator data directory
docker-dynamodblocal-data/

# ephemeral EventStoreDB certs created via `docker compose up`
certs/

Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `Equinox`: `Decider.Transact(interpret : 'state -> Async<'event list>)` [#314](https://github.com/jet/equinox/pull/314)
- `CosmosStore.Prometheus`: Add `rut` tag to enable filtering/grouping by Read vs Write activity as per `DynamoDB` [#321](https://github.com/jet/equinox/pull/321)
- `DynamoDb`/`DynamoDb.Prometheus`: Implements the majority of the `CosmosStore` functionality via `FSharp.AWS.DynamoDB` [#321](https://github.com/jet/equinox/pull/321)
- `EventStoreDb`: As per `EventStore` module, but using the modern `EventStore.Client.Grpc.Streams` client [#196](https://github.com/jet/equinox/pull/196)

### Changed
Expand Down
18 changes: 18 additions & 0 deletions Equinox.sln
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.EventStoreDb", "src
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.EventStoreDb.Integration", "tests\Equinox.EventStoreDb.Integration\Equinox.EventStoreDb.Integration.fsproj", "{BA63048B-3CA3-448D-A4CD-0C772D57B6F8}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore", "src\Equinox.DynamoStore\Equinox.DynamoStore.fsproj", "{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Integration", "tests\Equinox.DynamoStore.Integration\Equinox.DynamoStore.Integration.fsproj", "{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Prometheus", "src\Equinox.DynamoStore.Prometheus\Equinox.DynamoStore.Prometheus.fsproj", "{A9AF41B3-AB28-4296-B4A4-B90DA7821476}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -209,6 +215,18 @@ Global
{BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Release|Any CPU.Build.0 = Release|Any CPU
{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}.Release|Any CPU.Build.0 = Release|Any CPU
{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}.Release|Any CPU.Build.0 = Release|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
76 changes: 70 additions & 6 deletions README.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions build.proj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
<Exec Command="dotnet pack src/Equinox.Core $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.CosmosStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.CosmosStore.Prometheus $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.DynamoStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.DynamoStore.Prometheus $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.EventStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.EventStoreDb $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.MemoryStore $(Cfg) $(PackOptions)" />
Expand Down
23 changes: 23 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,29 @@ services:
clusternetwork:
ipv4_address: 172.30.240.13

dynamodb-local:
image: amazon/dynamodb-local
container_name: dynamodb-local
hostname: dynamodb-local
restart: always
volumes:
- ./docker-dynamodblocal-data:/home/dynamodblocal/data
ports:
- 8000:8000
command: "-jar DynamoDBLocal.jar -sharedDb -dbPath /home/dynamodblocal/data/"

dynamodb-admin:
image: aaronshaf/dynamodb-admin
ports:
- "8001:8001"
environment:
DYNAMO_ENDPOINT: "http://dynamodb-local:8000"
AWS_REGION: "us-west-2"
AWS_ACCESS_KEY_ID: local
AWS_SECRET_ACCESS_KEY: local
depends_on:
- dynamodb-local

networks:
clusternetwork:
name: eventstoredb.local
Expand Down
3 changes: 2 additions & 1 deletion samples/Infrastructure/Infrastructure.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<ProjectReference Include="..\..\src\Equinox.SqlStreamStore.MySql\Equinox.SqlStreamStore.MySql.fsproj" />
<ProjectReference Include="..\..\src\Equinox.SqlStreamStore.Postgres\Equinox.SqlStreamStore.Postgres.fsproj" />
<ProjectReference Include="..\..\src\Equinox.CosmosStore\Equinox.CosmosStore.fsproj" />
<ProjectReference Include="..\..\src\Equinox.DynamoStore\Equinox.DynamoStore.fsproj" />
<ProjectReference Include="..\..\src\Equinox.EventStoreDb\Equinox.EventStoreDb.fsproj" />
<ProjectReference Include="..\..\src\Equinox.MemoryStore\Equinox.MemoryStore.fsproj" />
<ProjectReference Include="..\Store\Domain\Domain.fsproj" />
Expand All @@ -25,7 +26,7 @@
<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="FSharp.Core" Version="4.5.4" />
<PackageReference Include="FSharp.Core" Version="4.7.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="5.1.1" />
Expand Down
3 changes: 3 additions & 0 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type StreamResolver(storage) =
| Storage.StorageConfig.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy).Resolve
| Storage.StorageConfig.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.DynamoStore.AccessStrategy.Snapshot snapshot else Equinox.DynamoStore.AccessStrategy.Unoptimized
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStoreDb.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Expand Down
64 changes: 64 additions & 0 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type StorageConfig =
// For MemoryStore, we keep the events as UTF8 arrays - we could use FsCodec.Codec.Box to remove the JSON encoding, which would improve perf but can conceal problems
| Memory of Equinox.MemoryStore.VolatileStore<ReadOnlyMemory<byte>>
| Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.CosmosStore.CachingStrategy * unfolds: bool
| Dynamo of Equinox.DynamoStore.DynamoStoreContext * Equinox.DynamoStore.CachingStrategy * unfolds: bool
| Es of Equinox.EventStoreDb.EventStoreContext * Equinox.EventStoreDb.CachingStrategy option * unfolds: bool
| Sql of Equinox.SqlStreamStore.SqlStreamStoreContext * Equinox.SqlStreamStore.CachingStrategy option * unfolds: bool

Expand Down Expand Up @@ -125,6 +126,69 @@ module Cosmos =
let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
StorageConfig.Cosmos (context, cacheStrategy, unfolds)

module Dynamo =

open Equinox.DynamoStore
let [<Literal>] SERVICE_URL = "EQUINOX_DYNAMO_SERVICE_URL"
let [<Literal>] ACCESS_KEY = "EQUINOX_DYNAMO_ACCESS_KEY_ID"
let [<Literal>] SECRET_KEY = "EQUINOX_DYNAMO_SECRET_ACCESS_KEY"
let [<Literal>] TABLE = "EQUINOX_DYNAMO_TABLE"
type [<NoEquality; NoComparison>] Arguments =
| [<AltCommandLine "-V">] VerboseStore
| [<AltCommandLine "-s">] ServiceUrl of string
| [<AltCommandLine "-sa">] AccessKey of string
| [<AltCommandLine "-ss">] SecretKey of string
| [<AltCommandLine "-t">] Table of string
| [<AltCommandLine "-ta">] ArchiveTable of string
| [<AltCommandLine "-r">] Retries of int
| [<AltCommandLine "-rt">] RetriesTimeoutS of float
| [<AltCommandLine "-tb">] TipMaxBytes of int
| [<AltCommandLine "-te">] TipMaxEvents of int
| [<AltCommandLine "-b">] QueryMaxItems of int
interface IArgParserTemplate with
member a.Usage = a |> function
| VerboseStore -> "Include low level Store logging."
| ServiceUrl _ -> "specify a server endpoint for a Dynamo account. (optional if environment variable " + SERVICE_URL + " specified)"
| AccessKey _ -> "specify an access key id for a Dynamo account. (optional if environment variable " + ACCESS_KEY + " specified)"
| SecretKey _ -> "specify a secret access key for a Dynamo account. (optional if environment variable " + SECRET_KEY + " specified)"
| Table _ -> "specify a table name for the primary store. (optional if environment variable " + TABLE + " specified)"
| ArchiveTable _ -> "specify a table name for the Archive. Default: Do not attempt to look in an Archive store as a Fallback to locate pruned events."
| Retries _ -> "specify operation retries (default: 1)."
| RetriesTimeoutS _ -> "specify max wait-time including retries in seconds (default: 5)"
| TipMaxBytes _ -> "specify maximum number of bytes to hold in Tip before calving off to a frozen Batch. Default: 32K"
| TipMaxEvents _ -> "specify maximum number of events to hold in Tip before calving off to a frozen Batch. Default: limited by Max Bytes"
| QueryMaxItems _ -> "specify maximum number of batches of events to retrieve in per query response. Default: 10"
type Info(args : ParseResults<Arguments>) =
let serviceUrl = args.TryGetResult ServiceUrl |> defaultWithEnvVar SERVICE_URL "ServiceUrl"
let accessKey = args.TryGetResult AccessKey |> defaultWithEnvVar ACCESS_KEY "AccessKey"
let secretKey = args.TryGetResult SecretKey |> defaultWithEnvVar SECRET_KEY "SecretKey"
let retries = args.GetResult(Retries, 1)
let timeout = args.GetResult(RetriesTimeoutS, 5.) |> TimeSpan.FromSeconds
member val Connector = DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout)

member val Table = args.TryGetResult Table |> defaultWithEnvVar TABLE "Table"
member val ArchiveTable = args.TryGetResult ArchiveTable

member x.TipMaxEvents = args.TryGetResult TipMaxEvents
member x.TipMaxBytes = args.GetResult(TipMaxBytes, 32 * 1024)
member x.QueryMaxItems = args.GetResult(QueryMaxItems, 10)

let logTable (log: ILogger) endpoint role table =
log.Information("DynamoDB {name:l} {endpoint} Table {table}", role, endpoint, table)
let createStoreClient (log : ILogger) (a : Info) =
let client = a.Connector.CreateClient()
let storeClient = DynamoStoreClient(client, a.Table, ?archiveTableName = a.ArchiveTable)
logTable log a.Connector.Endpoint "Primary" a.Table
match a.ArchiveTable with None -> () | Some at -> logTable log a.Connector.Endpoint "Archive" at
storeClient
let config (log : ILogger) (cache, unfolds) (a : Info) =
let storeClient = createStoreClient log a
log.Information("DynamoStore Max Events in Tip: {maxTipBytes}b {maxTipEvents}e Query Limit: {queryMaxItems} items",
a.TipMaxBytes, a.TipMaxEvents, a.QueryMaxItems)
let context = DynamoStoreContext(storeClient, maxBytes = a.TipMaxBytes, queryMaxItems = a.QueryMaxItems, ?tipMaxEvents = a.TipMaxEvents)
let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
StorageConfig.Dynamo (context, cacheStrategy, unfolds)

/// To establish a local node to run the tests against, follow https://developers.eventstore.com/server/v21.10/installation.html#use-docker-compose
/// and/or do `docker compose up` in github.com/jet/equinox
module EventStore =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/AsAt.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// - the same general point applies to over-using querying of streams for read purposes as we do here;
// applying CQRS principles can often lead to a better model regardless of raw necessity

#if !LOCAL
#if LOCAL
// Compile Tutorial.fsproj by either a) right-clicking or b) typing
// dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter
#if VISUALSTUDIO
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/FulfilmentCenter.fsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if !LOCAL
#if LOCAL
#I "bin/Debug/net6.0/"
#r "Serilog.dll"
#r "Serilog.Sinks.Console.dll"
Expand Down
6 changes: 6 additions & 0 deletions samples/Web/Startup.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Arguments =
| [<AltCommandLine "-U">] Unfolds
| [<CliPrefix(CliPrefix.None); Last>] Memory of ParseResults<Storage.MemoryStore.Arguments>
| [<CliPrefix(CliPrefix.None); Last>] Cosmos of ParseResults<Storage.Cosmos.Arguments>
| [<CliPrefix(CliPrefix.None); Last>] Dynamo of ParseResults<Storage.Dynamo.Arguments>
| [<CliPrefix(CliPrefix.None); Last>] Es of ParseResults<Storage.EventStore.Arguments>
| [<CliPrefix(CliPrefix.None); Last; AltCommandLine "ms">] MsSql of ParseResults<Storage.Sql.Ms.Arguments>
| [<CliPrefix(CliPrefix.None); Last; AltCommandLine "my">] MySql of ParseResults<Storage.Sql.My.Arguments>
Expand All @@ -27,6 +28,7 @@ type Arguments =
| Cached -> "employ a 50MB cache."
| Unfolds -> "employ a store-appropriate Rolling Snapshots and/or Unfolding strategy."
| Cosmos _ -> "specify storage in CosmosDB (--help for options)."
| Dynamo _ -> "specify storage in DynamoDB (--help for options)."
| Es _ -> "specify storage in EventStore (--help for options)."
| Memory _ -> "specify In-Memory Volatile Store (Default store)."
| MsSql _ -> "specify storage in Sql Server (--help for options)."
Expand Down Expand Up @@ -68,6 +70,10 @@ type Startup() =
let storeLog = createStoreLog <| sargs.Contains Storage.Cosmos.Arguments.VerboseStore
log.Information("CosmosDB Storage options: {options:l}", options)
Storage.Cosmos.config log (cache, unfolds) (Storage.Cosmos.Info sargs), storeLog
| Some (Dynamo sargs) ->
let storeLog = createStoreLog <| sargs.Contains Storage.Dynamo.Arguments.VerboseStore
log.Information("DynamoDB Storage options: {options:l}", options)
Storage.Dynamo.config log (cache, unfolds) (Storage.Dynamo.Info sargs), storeLog
| Some (Es sargs) ->
let storeLog = createStoreLog <| sargs.Contains Storage.EventStore.Arguments.VerboseStore
log.Information("EventStoreDB Storage options: {options:l}", options)
Expand Down
Loading