Skip to content

Commit

Permalink
wip: LSQ monad
Browse files Browse the repository at this point in the history
Goals
- Make AcquireFailures impossible (for wallet API users)
- Eliminate integration test flakiness
- Reduce boilerplate

by adding a monad for composing queries to be run against any, but the
same, node tip.
  • Loading branch information
Anviking committed Jan 14, 2021
1 parent 501db44 commit eaf80fa
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 267 deletions.
3 changes: 1 addition & 2 deletions lib/core/src/Cardano/Wallet/Network.hs
Expand Up @@ -151,8 +151,7 @@ data NetworkLayer m block = NetworkLayer
-- ^ Broadcast a transaction to the chain producer

, stakeDistribution
:: BlockHeader -- Point of interest
-> Coin -- Stake to consider for rewards
:: Coin -- Stake to consider for rewards
-> ExceptT ErrStakeDistribution m StakePoolsSummary

, getAccountBalance
Expand Down
132 changes: 91 additions & 41 deletions lib/core/src/Ouroboros/Network/Client/Wallet.hs
Expand Up @@ -31,12 +31,13 @@ module Ouroboros.Network.Client.Wallet
, localTxSubmission

-- * LocalStateQuery
, LSQ (..)
, LocalStateQueryCmd (..)
, LocalStateQueryResult
, localStateQuery

-- * Helpers
, send
, sendAsync

-- * Logs
, ChainSyncLog (..)
Expand All @@ -53,6 +54,8 @@ import Cardano.Slotting.Slot
( WithOrigin (..) )
import Cardano.Wallet.Network
( NextBlocksResult (..) )
import Control.Monad
( ap, join, liftM, (>=>) )
import Control.Monad.Class.MonadSTM
( MonadSTM
, TQueue
Expand All @@ -67,6 +70,8 @@ import Control.Monad.Class.MonadSTM
)
import Control.Monad.Class.MonadThrow
( MonadThrow )
import Control.Monad.Class.MonadTimer
( MonadTimer, threadDelay )
import Control.Tracer
( Tracer, traceWith )
import Data.Functor
Expand Down Expand Up @@ -440,12 +445,11 @@ chainSyncWithBlocks tr fromTip queue responseBuffer =
--
-- LocalStateQuery

-- | Command to send to the localStateQuery client. See also 'ChainSyncCmd'.
data LocalStateQueryCmd block (m :: * -> *)
= forall state. CmdQueryLocalState
(Point block)
(Query block state)
(LocalStateQueryResult state -> m ())
---- | Command to send to the localStateQuery client. See also 'ChainSyncCmd'.
--data LocalStateQueryCmd block (m :: * -> *)
-- = forall state. CmdQueryLocalState
-- (Query block state)
-- (LocalStateQueryResult state -> m ())

-- | Shorthand for the possible outcomes of acquiring local state parameters.
type LocalStateQueryResult state = Either AcquireFailure state
Expand Down Expand Up @@ -475,62 +479,99 @@ type LocalStateQueryResult state = Either AcquireFailure state
-- │ ┌───────────┴───┐ ┌────────┴───────┐
-- └───────┤ Acquired │────────────>│ Querying │
-- └───────────────┘ └────────────────┘
--
-- FIXME: I think the diagram is wrong. It is possible to query multiple times
-- while we're acquired.
localStateQuery
:: forall m block . (MonadThrow m, MonadSTM m)
=> TQueue m (LocalStateQueryCmd block m)
:: forall m block . (MonadThrow m, MonadSTM m, MonadTimer m, Eq (Point block))
=> m (Point block)
-> TQueue m (LocalStateQueryCmd block m)
-- ^ We use a 'TQueue' as a communication channel to drive queries from
-- outside of the network client to the client itself.
-- Requests are pushed to the queue which are then transformed into
-- messages to keep the state-machine moving.
-> LocalStateQueryClient block (Point block) (Query block) m Void
localStateQuery queue =
localStateQuery getTip queue =
LocalStateQueryClient clientStIdle
where
clientStIdle
:: m (LSQ.ClientStIdle block (Point block) (Query block) m Void)
clientStIdle = awaitNextCmd <&> \case
CmdQueryLocalState pt query respond ->
LSQ.SendMsgAcquire pt (clientStAcquiring query respond)
clientStIdle = do
cmd <- awaitNextCmd
pt <- getTip
pure $ LSQ.SendMsgAcquire pt (clientStAcquiring cmd pt 0)

clientStAcquiring
:: forall state. Query block state
-> (LocalStateQueryResult state -> m ())
:: LocalStateQueryCmd block m
-> Point block
-> Int
-> LSQ.ClientStAcquiring block (Point block) (Query block) m Void
clientStAcquiring query respond = LSQ.ClientStAcquiring
{ recvMsgAcquired = clientStAcquired query respond
clientStAcquiring qry oldPt retries = LSQ.ClientStAcquiring
{ recvMsgAcquired = clientStAcquired qry
, recvMsgFailure = \failure -> do
respond (Left failure)
if retries < 1000
then do
let go = do
newPt <- getTip
if oldPt == newPt
-- FIXME: Not the most elegant approach of waiting for a
-- new tip.
then threadDelay 0.01 >> go
else do
pure $ LSQ.SendMsgAcquire newPt (clientStAcquiring qry newPt (retries + 1))
go
else do
--respond (Left failure)
-- TODO: Fail
clientStIdle
}

clientStAcquired
:: forall state. Query block state
-> (LocalStateQueryResult state -> m ())
-> LSQ.ClientStAcquired block (Point block) (Query block) m Void
clientStAcquired query respond =
LSQ.SendMsgQuery query (clientStQuerying respond)

-- By re-acquiring rather releasing the state with 'MsgRelease' it
-- enables optimisations on the server side.
clientStAcquiredAgain
:: m (LSQ.ClientStAcquired block (Point block) (Query block) m Void)
clientStAcquiredAgain = awaitNextCmd <&> \case
CmdQueryLocalState pt query respond ->
LSQ.SendMsgReAcquire pt (clientStAcquiring query respond)

clientStQuerying
:: forall state. (LocalStateQueryResult state -> m ())
-> LSQ.ClientStQuerying block (Point block) (Query block) m Void state
clientStQuerying respond = LSQ.ClientStQuerying
{ recvMsgResult = \result -> do
respond (Right result)
clientStAcquiredAgain
}
:: LocalStateQueryCmd block m
-> (LSQ.ClientStAcquired block (Point block) (Query block) m Void)
clientStAcquired (SomeLSQ cmd respond) = go cmd $ \res -> do
LSQ.SendMsgRelease (respond res >> clientStIdle)
-- TODO: We might not want to release. Re-acquiring, or even staying
-- acquired should be more efficient better.
where
go
:: forall a. LSQ block m a
-> (a -> (LSQ.ClientStAcquired block (Point block) (Query block) m Void))
-> (LSQ.ClientStAcquired block (Point block) (Query block) m Void)
go (LSQPure a) cont = (cont a)
go (LSQry qry) cont = LSQ.SendMsgQuery qry $ LSQ.ClientStQuerying $ \res -> do
pure $ cont res
go (LSQBind ma f) cont = go ma $ \a -> do
go (f a) $ \b -> cont b

awaitNextCmd :: m (LocalStateQueryCmd block m)
awaitNextCmd = atomically $ readTQueue queue

data LocalStateQueryCmd block m = forall a. SomeLSQ
(LSQ block m a)
(a -> m ())

-- Monad for composing local state queries for the node /tip/.
data LSQ block (m :: * -> *) a where
LSQPure :: a -> LSQ block m a
LSQBind :: LSQ block m a -> (a -> LSQ block m b) -> LSQ block m b
LSQry :: (Query block res) -> LSQ block m res

-- TODO: Allow reading of the tip and era, such that queries can bo chosen based
-- on it, and one can easily create era-agnostic wrappers.
-- LSQTip :: LSQ block m (Tip block)
-- LSQEra :: LSQ block m (Tip block)

instance Functor (LSQ block m) where
fmap = liftM

instance Applicative (LSQ block m) where
pure = LSQPure
(<*>) = ap

instance Monad (LSQ block m) where
return = pure
(>>=) = LSQBind

--------------------------------------------------------------------------------
--
-- LocalTxSubmission
Expand Down Expand Up @@ -607,6 +648,15 @@ send queue cmd = do
atomically $ writeTQueue queue (cmd (atomically . putTMVar tvar))
atomically $ takeTMVar tvar


sendAsync
:: MonadSTM m
=> TQueue m (cmd m)
-> (cmd m)
-> m ()
sendAsync queue cmd = do
atomically $ writeTQueue queue cmd

-- Tracing

data ChainSyncLog block point
Expand Down

0 comments on commit eaf80fa

Please sign in to comment.