From 65f417be983032ba103641009f5296ad1126f6b3 Mon Sep 17 00:00:00 2001 From: Simon Herteby Date: Fri, 23 Oct 2020 21:40:21 +0200 Subject: [PATCH] Wrap all code that uses Tasks with async.Delay, so that it behaves properly and doesn't execute until the Async computation is run. --- src/AppendRaw.fs | 26 ++++++++------- src/Postgres.fs | 2 +- src/ReadRaw.fs | 85 ++++++++++++++++++++++++++---------------------- 3 files changed, 61 insertions(+), 52 deletions(-) diff --git a/src/AppendRaw.fs b/src/AppendRaw.fs index 60e2142..7dd6288 100644 --- a/src/AppendRaw.fs +++ b/src/AppendRaw.fs @@ -26,21 +26,23 @@ module AppendRaw = (appendVersion: AppendVersion) (messageDetails: MessageDetails) : Async = - store.AppendToStream - (StreamId(streamName), - fromAppendVersion appendVersion, - [| newStreamMessageFromMessageDetails messageDetails |]) - |> Async.AwaitTask + async.Delay(fun () -> + store.AppendToStream + (StreamId(streamName), + fromAppendVersion appendVersion, + [| newStreamMessageFromMessageDetails messageDetails |]) + |> Async.AwaitTask) let appendNewMessages (store: SqlStreamStore.IStreamStore) (streamName: string) (appendVersion: AppendVersion) (messages: MessageDetails list) : Async = - store.AppendToStream - (StreamId(streamName), - fromAppendVersion appendVersion, - messages - |> List.map newStreamMessageFromMessageDetails - |> List.toArray) - |> Async.AwaitTask + async.Delay(fun () -> + store.AppendToStream + (StreamId(streamName), + fromAppendVersion appendVersion, + messages + |> List.map newStreamMessageFromMessageDetails + |> List.toArray) + |> Async.AwaitTask) diff --git a/src/Postgres.fs b/src/Postgres.fs index 3f858b7..8e8a7ff 100644 --- a/src/Postgres.fs +++ b/src/Postgres.fs @@ -61,7 +61,7 @@ module Postgres = new SqlStreamStore.PostgresStreamStore(SqlStreamStore.PostgresStreamStoreSettings(config)) let createSchemaRaw (store: SqlStreamStore.PostgresStreamStore): Async = - store.CreateSchemaIfNotExists() |> Async.AwaitTask + async.Delay(fun () -> store.CreateSchemaIfNotExists() |> Async.AwaitTask) let createSchema (store: SqlStreamStore.PostgresStreamStore): Async> = createSchemaRaw store diff --git a/src/ReadRaw.fs b/src/ReadRaw.fs index 4c0d7c7..969a675 100644 --- a/src/ReadRaw.fs +++ b/src/ReadRaw.fs @@ -21,11 +21,13 @@ module ReadRaw = (startPositionInclusive: StartPosition) (msgCount: int) : Async = - match readingDirection with - | ReadingDirection.Forward -> store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount) - | ReadingDirection.Backward -> - store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount) - |> Async.AwaitTask + async.Delay(fun () -> + match readingDirection with + | ReadingDirection.Forward -> + store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount) + | ReadingDirection.Backward -> + store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount) + |> Async.AwaitTask) let readFromStream (store: SqlStreamStore.IStreamStore) (readingDirection: ReadingDirection) @@ -33,12 +35,13 @@ module ReadRaw = (readVersion: ReadVersion) (msgCount: int) : Async = - match readingDirection with - | ReadingDirection.Forward -> - store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount) - | ReadingDirection.Backward -> - store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount) - |> Async.AwaitTask + async.Delay(fun () -> + match readingDirection with + | ReadingDirection.Forward -> + store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount) + | ReadingDirection.Backward -> + store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount) + |> Async.AwaitTask) let readFromAllStream' (store: SqlStreamStore.IStreamStore) (readingDirection: ReadingDirection) @@ -46,12 +49,13 @@ module ReadRaw = (msgCount: int) (prefetchJson: bool) : Async = - match readingDirection with - | ReadingDirection.Forward -> - store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson) - | ReadingDirection.Backward -> - store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson) - |> Async.AwaitTask + async.Delay(fun () -> + match readingDirection with + | ReadingDirection.Forward -> + store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson) + | ReadingDirection.Backward -> + store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson) + |> Async.AwaitTask) let readFromStream' (store: SqlStreamStore.IStreamStore) (readingDirection: ReadingDirection) @@ -60,12 +64,13 @@ module ReadRaw = (msgCount: int) (prefetchJson: bool) : Async = - match readingDirection with - | ReadingDirection.Forward -> - store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson) - | ReadingDirection.Backward -> - store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson) - |> Async.AwaitTask + async.Delay(fun () -> + match readingDirection with + | ReadingDirection.Forward -> + store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson) + | ReadingDirection.Backward -> + store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson) + |> Async.AwaitTask) let readFromAllStream'' (store: SqlStreamStore.IStreamStore) (readingDirection: ReadingDirection) @@ -74,14 +79,15 @@ module ReadRaw = (prefetchJson: bool) (cancellationToken: CancellationToken) : Async = - match readingDirection with - | ReadingDirection.Forward -> - store.ReadAllForwards - (fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken) - | ReadingDirection.Backward -> - store.ReadAllBackwards - (fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken) - |> Async.AwaitTask + async.Delay(fun () -> + match readingDirection with + | ReadingDirection.Forward -> + store.ReadAllForwards + (fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken) + | ReadingDirection.Backward -> + store.ReadAllBackwards + (fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken) + |> Async.AwaitTask) let readFromStream'' (store: SqlStreamStore.IStreamStore) (readingDirection: ReadingDirection) @@ -91,11 +97,12 @@ module ReadRaw = (prefetchJson: bool) (cancellationToken: CancellationToken) : Async = - match readingDirection with - | ReadingDirection.Forward -> - store.ReadStreamForwards - (StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken) - | ReadingDirection.Backward -> - store.ReadStreamBackwards - (StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken) - |> Async.AwaitTask + async.Delay(fun () -> + match readingDirection with + | ReadingDirection.Forward -> + store.ReadStreamForwards + (StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken) + | ReadingDirection.Backward -> + store.ReadStreamBackwards + (StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken) + |> Async.AwaitTask)