Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/AppendRaw.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,23 @@ module AppendRaw =
(appendVersion: AppendVersion)
(messageDetails: MessageDetails)
: Async<AppendResult> =
store.AppendToStream
(StreamId(streamName),
fromAppendVersion appendVersion,
[| newStreamMessageFromMessageDetails messageDetails |])
|> Async.AwaitTask
async.Delay(fun () ->
store.AppendToStream
(StreamId(streamName),
fromAppendVersion appendVersion,
[| newStreamMessageFromMessageDetails messageDetails |])
|> Async.AwaitTask)

let appendNewMessages (store: SqlStreamStore.IStreamStore)
(streamName: string)
(appendVersion: AppendVersion)
(messages: MessageDetails list)
: Async<AppendResult> =
store.AppendToStream
(StreamId(streamName),
fromAppendVersion appendVersion,
messages
|> List.map newStreamMessageFromMessageDetails
|> List.toArray)
|> Async.AwaitTask
async.Delay(fun () ->
store.AppendToStream
(StreamId(streamName),
fromAppendVersion appendVersion,
messages
|> List.map newStreamMessageFromMessageDetails
|> List.toArray)
|> Async.AwaitTask)
2 changes: 1 addition & 1 deletion src/Postgres.fs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ module Postgres =
new SqlStreamStore.PostgresStreamStore(SqlStreamStore.PostgresStreamStoreSettings(config))

let createSchemaRaw (store: SqlStreamStore.PostgresStreamStore): Async<unit> =
store.CreateSchemaIfNotExists() |> Async.AwaitTask
async.Delay(fun () -> store.CreateSchemaIfNotExists() |> Async.AwaitTask)

let createSchema (store: SqlStreamStore.PostgresStreamStore): Async<Result<unit, string>> =
createSchemaRaw store
Expand Down
85 changes: 46 additions & 39 deletions src/ReadRaw.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,41 @@ module ReadRaw =
(startPositionInclusive: StartPosition)
(msgCount: int)
: Async<ReadAllPage> =
match readingDirection with
| ReadingDirection.Forward -> store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount)
| ReadingDirection.Backward ->
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount)
|> Async.AwaitTask
async.Delay(fun () ->
match readingDirection with
| ReadingDirection.Forward ->
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount)
| ReadingDirection.Backward ->
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount)
|> Async.AwaitTask)

let readFromStream (store: SqlStreamStore.IStreamStore)
(readingDirection: ReadingDirection)
(streamName: string)
(readVersion: ReadVersion)
(msgCount: int)
: Async<ReadStreamPage> =
match readingDirection with
| ReadingDirection.Forward ->
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
| ReadingDirection.Backward ->
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
|> Async.AwaitTask
async.Delay(fun () ->
match readingDirection with
| ReadingDirection.Forward ->
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
| ReadingDirection.Backward ->
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
|> Async.AwaitTask)

let readFromAllStream' (store: SqlStreamStore.IStreamStore)
(readingDirection: ReadingDirection)
(startPositionInclusive: StartPosition)
(msgCount: int)
(prefetchJson: bool)
: Async<ReadAllPage> =
match readingDirection with
| ReadingDirection.Forward ->
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
| ReadingDirection.Backward ->
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
|> Async.AwaitTask
async.Delay(fun () ->
match readingDirection with
| ReadingDirection.Forward ->
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
| ReadingDirection.Backward ->
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
|> Async.AwaitTask)

let readFromStream' (store: SqlStreamStore.IStreamStore)
(readingDirection: ReadingDirection)
Expand All @@ -60,12 +64,13 @@ module ReadRaw =
(msgCount: int)
(prefetchJson: bool)
: Async<ReadStreamPage> =
match readingDirection with
| ReadingDirection.Forward ->
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
| ReadingDirection.Backward ->
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
|> Async.AwaitTask
async.Delay(fun () ->
match readingDirection with
| ReadingDirection.Forward ->
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
| ReadingDirection.Backward ->
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
|> Async.AwaitTask)

let readFromAllStream'' (store: SqlStreamStore.IStreamStore)
(readingDirection: ReadingDirection)
Expand All @@ -74,14 +79,15 @@ module ReadRaw =
(prefetchJson: bool)
(cancellationToken: CancellationToken)
: Async<ReadAllPage> =
match readingDirection with
| ReadingDirection.Forward ->
store.ReadAllForwards
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
| ReadingDirection.Backward ->
store.ReadAllBackwards
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
|> Async.AwaitTask
async.Delay(fun () ->
match readingDirection with
| ReadingDirection.Forward ->
store.ReadAllForwards
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
| ReadingDirection.Backward ->
store.ReadAllBackwards
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
|> Async.AwaitTask)

let readFromStream'' (store: SqlStreamStore.IStreamStore)
(readingDirection: ReadingDirection)
Expand All @@ -91,11 +97,12 @@ module ReadRaw =
(prefetchJson: bool)
(cancellationToken: CancellationToken)
: Async<ReadStreamPage> =
match readingDirection with
| ReadingDirection.Forward ->
store.ReadStreamForwards
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
| ReadingDirection.Backward ->
store.ReadStreamBackwards
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
|> Async.AwaitTask
async.Delay(fun () ->
match readingDirection with
| ReadingDirection.Forward ->
store.ReadStreamForwards
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
| ReadingDirection.Backward ->
store.ReadStreamBackwards
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
|> Async.AwaitTask)