Skip to content

Commit

Permalink
Re-write local state query client code with LSQ monad
Browse files Browse the repository at this point in the history
Goals
- Make AcquireFailures impossible (for wallet API users)
- Eliminate integration test flakiness
- Reduce boilerplate

by adding a monad for composing queries to be run against any, but the
same, node tip.

* wip: leverage STM retrying

This adopts the state observation pattern Duncan described in a recent
STM seminar.

We now use pure TVar's. Not TMVar or Chan. We can still efficiently
observe for changes by using `guard`.

This seems to make the tests much greener. Hoping this fixes several
flaky tests on master.

* Reduce boilerplate, and re-add support for past eras
  • Loading branch information
Anviking committed Jan 18, 2021
1 parent 922319c commit b2b4550
Show file tree
Hide file tree
Showing 4 changed files with 334 additions and 417 deletions.
3 changes: 1 addition & 2 deletions lib/core/src/Cardano/Wallet/Network.hs
Expand Up @@ -151,8 +151,7 @@ data NetworkLayer m block = NetworkLayer
-- ^ Broadcast a transaction to the chain producer

, stakeDistribution
:: BlockHeader -- Point of interest
-> Coin -- Stake to consider for rewards
:: Coin -- Stake to consider for rewards
-> ExceptT ErrStakeDistribution m StakePoolsSummary

, getAccountBalance
Expand Down
171 changes: 116 additions & 55 deletions lib/core/src/Ouroboros/Network/Client/Wallet.hs
Expand Up @@ -31,12 +31,15 @@ module Ouroboros.Network.Client.Wallet
, localTxSubmission

-- * LocalStateQuery
, LSQ (..)
, LocalStateQueryCmd (..)
, LocalStateQueryResult
, localStateQuery
, query
, currentEra

-- * Helpers
, send
, sendAsync

-- * Logs
, ChainSyncLog (..)
Expand All @@ -45,6 +48,8 @@ module Ouroboros.Network.Client.Wallet

import Prelude

import Cardano.Api
( AnyCardanoEra )
import Cardano.BM.Data.Severity
( Severity (..) )
import Cardano.BM.Data.Tracer
Expand All @@ -53,20 +58,32 @@ import Cardano.Slotting.Slot
( WithOrigin (..) )
import Cardano.Wallet.Network
( NextBlocksResult (..) )
import Control.Arrow
( second )
import Control.Monad
( ap, guard, join, liftM, (>=>) )
import Control.Monad.Class.MonadSTM
( MonadSTM
, STM
, TMVar
, TQueue
, TVar
, atomically
, isEmptyTQueue
, newEmptyTMVarIO
, putTMVar
, readTQueue
, readTVar
, takeTMVar
, tryReadTQueue
, writeTQueue
)
import Control.Monad.Class.MonadThrow
( MonadThrow )
import Control.Monad.Class.MonadTimer
( MonadTimer, threadDelay )
import Control.Monad.IO.Class
( MonadIO )
import Control.Tracer
( Tracer, traceWith )
import Data.Functor
Expand Down Expand Up @@ -104,12 +121,7 @@ import Ouroboros.Network.Protocol.ChainSync.Client
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
( ChainSyncClientPipelined (..) )
import Ouroboros.Network.Protocol.LocalStateQuery.Client
( ClientStAcquiring (..)
, ClientStQuerying (..)
, LocalStateQueryClient (..)
)
import Ouroboros.Network.Protocol.LocalStateQuery.Type
( AcquireFailure )
( ClientStAcquiring (..), LocalStateQueryClient (..) )
import Ouroboros.Network.Protocol.LocalTxSubmission.Client
( LocalTxClientStIdle (..), LocalTxSubmissionClient (..) )
import Ouroboros.Network.Protocol.LocalTxSubmission.Type
Expand All @@ -131,8 +143,10 @@ import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as LSQ
chainSyncFollowTip
:: forall m block era. (Monad m)
=> (block -> era)
-> (Maybe era -> Tip block -> m ())
-> (era -> Tip block -> m ())
-- ^ Callback for when the tip changes.
--
-- Will not be called on rollbacks(!), but on the subsequent roll-forward.
-> ChainSyncClient block (Point block) (Tip block) m Void
chainSyncFollowTip toCardanoEra onTipUpdate =
ChainSyncClient (clientStIdle False)
Expand Down Expand Up @@ -165,13 +179,15 @@ chainSyncFollowTip toCardanoEra onTipUpdate =
-- era-agnostic (for now at least!) which isn't a big deal really because
-- the era will simply be updated on the next RollForward which follows
-- immediately after.
--
-- NOTE: Let's try not updating the tip on rollbacks?
clientStNext True = ClientStNext
{ recvMsgRollBackward = doUpdate . const Nothing
, recvMsgRollForward = doUpdate . Just . toCardanoEra
{ recvMsgRollBackward = \_ _ -> ChainSyncClient $ clientStIdle True
, recvMsgRollForward = doUpdate . toCardanoEra
}
where
doUpdate
:: Maybe era
:: era
-> Tip block
-> ChainSyncClient block (Point block) (Tip block) m Void
doUpdate era tip = ChainSyncClient $ do
Expand Down Expand Up @@ -440,15 +456,10 @@ chainSyncWithBlocks tr fromTip queue responseBuffer =
--
-- LocalStateQuery

-- | Command to send to the localStateQuery client. See also 'ChainSyncCmd'.
data LocalStateQueryCmd block (m :: * -> *)
= forall state. CmdQueryLocalState
(Point block)
(Query block state)
(LocalStateQueryResult state -> m ())

-- | Shorthand for the possible outcomes of acquiring local state parameters.
type LocalStateQueryResult state = Either AcquireFailure state
---- | Command to send to the localStateQuery client. See also 'ChainSyncCmd'.
data LocalStateQueryCmd block m = forall a. SomeLSQ
(LSQ block m a)
(a -> m ())

-- | Client for the 'Local State Query' mini-protocol.
--
Expand All @@ -475,62 +486,103 @@ type LocalStateQueryResult state = Either AcquireFailure state
-- │ ┌───────────┴───┐ ┌────────┴───────┐
-- └───────┤ Acquired │────────────>│ Querying │
-- └───────────────┘ └────────────────┘
-- FIXME: I think the diagram is wrong. It is possible to query multiple times
-- while we're acquired.
--
-- NOTE: Using AnyCardanoEra arguably goes against the grain of the abstract
-- block type. We might be able to use the (Header block) as replacemet for
-- (AnyCardanoEra, Tip block).
localStateQuery
:: forall m block . (MonadThrow m, MonadSTM m)
=> TQueue m (LocalStateQueryCmd block m)
:: forall m block . (MonadIO m, MonadSTM m, MonadTimer m, Eq (Point block))
=> STM m (AnyCardanoEra, Point block)
-- ^ A way to fetch the current node tip
-> TQueue m (LocalStateQueryCmd block m)
-- ^ We use a 'TQueue' as a communication channel to drive queries from
-- outside of the network client to the client itself.
-- Requests are pushed to the queue which are then transformed into
-- messages to keep the state-machine moving.
-> LocalStateQueryClient block (Point block) (Query block) m Void
localStateQuery queue =
localStateQuery getTip queue =
LocalStateQueryClient clientStIdle
where
clientStIdle
:: m (LSQ.ClientStIdle block (Point block) (Query block) m Void)
clientStIdle = awaitNextCmd <&> \case
CmdQueryLocalState pt query respond ->
LSQ.SendMsgAcquire pt (clientStAcquiring query respond)
clientStIdle = do
cmd <- awaitNextCmd
(era, pt) <- atomically getTip
pure $ LSQ.SendMsgAcquire pt (clientStAcquiring cmd pt era)

clientStAcquiring
:: forall state. Query block state
-> (LocalStateQueryResult state -> m ())
:: LocalStateQueryCmd block m
-> Point block
-> AnyCardanoEra
-> LSQ.ClientStAcquiring block (Point block) (Query block) m Void
clientStAcquiring query respond = LSQ.ClientStAcquiring
{ recvMsgAcquired = clientStAcquired query respond
, recvMsgFailure = \failure -> do
respond (Left failure)
clientStIdle
clientStAcquiring qry oldPt era = LSQ.ClientStAcquiring
{ recvMsgAcquired = clientStAcquired qry era
, recvMsgFailure = \_failure -> do
(newEra, newPt) <- atomically $ do
t <- getTip
guard (((snd t) /= oldPt))
return t
pure $ LSQ.SendMsgAcquire newPt (clientStAcquiring qry newPt newEra)
}

clientStAcquired
:: forall state. Query block state
-> (LocalStateQueryResult state -> m ())
-> LSQ.ClientStAcquired block (Point block) (Query block) m Void
clientStAcquired query respond =
LSQ.SendMsgQuery query (clientStQuerying respond)

-- By re-acquiring rather releasing the state with 'MsgRelease' it
-- enables optimisations on the server side.
clientStAcquiredAgain
:: m (LSQ.ClientStAcquired block (Point block) (Query block) m Void)
clientStAcquiredAgain = awaitNextCmd <&> \case
CmdQueryLocalState pt query respond ->
LSQ.SendMsgReAcquire pt (clientStAcquiring query respond)

clientStQuerying
:: forall state. (LocalStateQueryResult state -> m ())
-> LSQ.ClientStQuerying block (Point block) (Query block) m Void state
clientStQuerying respond = LSQ.ClientStQuerying
{ recvMsgResult = \result -> do
respond (Right result)
clientStAcquiredAgain
}
:: LocalStateQueryCmd block m
-> AnyCardanoEra
-> (LSQ.ClientStAcquired block (Point block) (Query block) m Void)
clientStAcquired (SomeLSQ cmd respond) era = go cmd $ \res -> do
LSQ.SendMsgRelease (respond res >> clientStIdle)
-- TODO: We might not want to release. Re-acquiring, or even staying
-- acquired should be more efficient.
where
go
:: forall a. LSQ block m a
-> (a -> (LSQ.ClientStAcquired block (Point block) (Query block) m Void))
-> (LSQ.ClientStAcquired block (Point block) (Query block) m Void)
go (LSQPure a) cont = cont a
go (LSQry qry) cont = LSQ.SendMsgQuery qry $ LSQ.ClientStQuerying $ \res -> do
pure $ cont res
-- TODO: It would be nice to trace the time it takes to run the
-- queries.
go (LSQBind ma f) cont = go ma $ \a -> do
go (f a) $ \b -> cont b
go (LSQEra) cont = cont era

awaitNextCmd :: m (LocalStateQueryCmd block m)
awaitNextCmd = atomically $ readTQueue queue

-- | Monad for composing local state queries for the node /tip/.
data LSQ block (m :: * -> *) a where
LSQPure :: a -> LSQ block m a
LSQBind :: LSQ block m a -> (a -> LSQ block m b) -> LSQ block m b

-- | A local state query.
LSQry :: (Query block res) -> LSQ block m res

-- | The era of the node tip the query is run against.
LSQEra :: LSQ block m AnyCardanoEra

instance Functor (LSQ block m) where
fmap = liftM

instance Applicative (LSQ block m) where
pure = LSQPure
(<*>) = ap

instance Monad (LSQ block m) where
return = pure
(>>=) = LSQBind

--
-- Helpers

query :: (Query block res) -> LSQ block m res
query = LSQry

currentEra :: LSQ block m AnyCardanoEra
currentEra = LSQEra

--------------------------------------------------------------------------------
--
-- LocalTxSubmission
Expand Down Expand Up @@ -607,6 +659,15 @@ send queue cmd = do
atomically $ writeTQueue queue (cmd (atomically . putTMVar tvar))
atomically $ takeTMVar tvar


sendAsync
:: MonadSTM m
=> TQueue m (cmd m)
-> (cmd m)
-> m ()
sendAsync queue cmd = do
atomically $ writeTQueue queue cmd

-- Tracing

data ChainSyncLog block point
Expand Down

0 comments on commit b2b4550

Please sign in to comment.