Skip to content

Commit

Permalink
Rework mock server to use a TVar as shared database
Browse files Browse the repository at this point in the history
  The tx submission pushes things in the TVar, and the chain sync reads things from the TVar using a local cursor. The cursor is local to every client and corresponds to the syncing state of the client.
  • Loading branch information
KtorZ committed Sep 16, 2021
1 parent 86b4c00 commit c1e142f
Showing 1 changed file with 35 additions and 25 deletions.
60 changes: 35 additions & 25 deletions hydra-node/src/Hydra/Chain/Direct.hs
Expand Up @@ -27,13 +27,15 @@ import Cardano.Slotting.Slot (
WithOrigin (Origin),
)
import Control.Monad.Class.MonadSTM (
newTQueueIO,
readTQueue,
writeTQueue,
modifyTVar',
newTVarIO,
readTVar,
retry,
)
import Control.Tracer (
nullTracer,
)
import Data.List ((!!))
import Data.Map.Strict (
(!),
)
Expand Down Expand Up @@ -183,15 +185,15 @@ withMockServer ::
-- See also 'defaultEpochSlots' and 'defaultNodeToClientVersionData' for playing
-- around.
(NodeToClientVersionData, EpochSlots) ->
-- | Socket used between clients.
-- | Socket used to connect to the server.
FilePath ->
-- | Action to run in-between.
IO a ->
IO a
withMockServer (vData, epochSlots) addr action = withIOManager $ \iocp -> do
let snocket = localSnocket iocp addr
networkState <- newNetworkMutableState
queue <- newTQueueIO
db <- newTVarIO mempty
withServerNode
snocket
tracers
Expand All @@ -202,16 +204,16 @@ withMockServer (vData, epochSlots) addr action = withIOManager $ \iocp -> do
noTimeLimitsHandshake
(cborTermVersionDataCodec nodeToClientCodecCBORTerm)
acceptableVersion
(SomeResponderApplication <$> versions queue)
(SomeResponderApplication <$> versions db)
errorPolicies
(\_ _ -> action)
where
-- NOTE: written in such a way to make it easier to add support for multiple
-- versions if needed. A bit YAGNI but also tiny enough to be too much
-- overhead.
versions queue =
versions db =
combineVersions
[ simpleSingletonVersions v vData (mockServer (defaultCodecs epochSlots v) queue)
[ simpleSingletonVersions v vData (mockServer (defaultCodecs epochSlots v) db)
| v <- [nodeToClientVLatest]
]

Expand All @@ -238,9 +240,9 @@ type Era = AlonzoEra StandardCrypto
mockServer ::
MonadSTM m =>
ClientCodecs Block m ->
TQueue m (ValidatedTx Era) ->
TVar m [ValidatedTx Era] ->
OuroborosApplication 'ResponderMode LocalAddress LByteString m Void ()
mockServer codecs queue =
mockServer codecs db =
OuroborosApplication $ \_connectionId _controlMessageSTM ->
[ localChainSyncMiniProtocol
, localTxSubmissionMiniProtocol
Expand All @@ -259,7 +261,7 @@ mockServer codecs queue =
MuxPeer
nullTracer
(cChainSyncCodec codecs)
(chainSyncServerPeer $ mockChainSyncServer queue)
(chainSyncServerPeer $ mockChainSyncServer db)

localTxSubmissionMiniProtocol =
MiniProtocol
Expand All @@ -272,7 +274,7 @@ mockServer codecs queue =
MuxPeer
nullTracer
(cTxSubmissionCodec codecs)
(localTxSubmissionServerPeer $ pure $ mockTxSubmissionServer queue)
(localTxSubmissionServerPeer $ pure $ mockTxSubmissionServer db)

maximumMiniProtocolLimits :: MiniProtocolLimits
maximumMiniProtocolLimits =
Expand All @@ -281,10 +283,10 @@ mockServer codecs queue =
mockChainSyncServer ::
forall m.
MonadSTM m =>
TQueue m (ValidatedTx Era) ->
TVar m [ValidatedTx Era] ->
ChainSyncServer Block (Point Block) (Tip Block) m ()
mockChainSyncServer queue =
ChainSyncServer (pure serverStIdle)
mockChainSyncServer db =
ChainSyncServer (pure $ serverStIdle 1)
where
tip :: Tip Block
tip = TipGenesis
Expand All @@ -299,33 +301,41 @@ mockChainSyncServer queue =
body = toTxSeq $ StrictSeq.singleton tx
in BlockAlonzo $ mkShelleyBlock $ Ledger.Block header body

serverStIdle :: ServerStIdle Block (Point Block) (Tip Block) m ()
serverStIdle =
serverStIdle :: Int -> ServerStIdle Block (Point Block) (Tip Block) m ()
serverStIdle cursor =
ServerStIdle
{ -- recvMsgRequestNext :: m (Either (ServerStNext header point tip m a) (m (ServerStNext header point tip m a))),
recvMsgRequestNext = do
tx <- atomically $ readTQueue queue
pure $ Left $ SendMsgRollForward (nextBlock tx) tip (mockChainSyncServer queue)
tx <- atomically $ do
txs <- readTVar db
let ix = length txs - cursor
if ix < 0 then retry else pure (txs !! ix)
let st = ChainSyncServer $ pure $ serverStIdle (cursor + 1)
pure $ Left $ SendMsgRollForward (nextBlock tx) tip st
, recvMsgFindIntersect = \case
[] -> pure $ SendMsgIntersectFound origin tip (ChainSyncServer $ pure serverStIdle)
h : _ -> pure $ SendMsgIntersectFound h tip (ChainSyncServer $ pure serverStIdle)
[] ->
let st = ChainSyncServer $ pure $ serverStIdle cursor
in pure $ SendMsgIntersectFound origin tip st
h : _ ->
let st = ChainSyncServer $ pure $ serverStIdle cursor
in pure $ SendMsgIntersectFound h tip st
, recvMsgDoneClient = pure ()
}

mockTxSubmissionServer ::
MonadSTM m =>
TQueue m (ValidatedTx Era) ->
TVar m [ValidatedTx Era] ->
LocalTxSubmissionServer (GenTx Block) reject m ()
mockTxSubmissionServer queue =
mockTxSubmissionServer db =
LocalTxSubmissionServer
{ recvMsgSubmitTx = \tx -> do
case tx of
GenTxAlonzo genTx ->
atomically $ writeTQueue queue (toValidatedTx genTx)
atomically $ modifyTVar' db (toValidatedTx genTx :)
_ ->
-- FIXME: This should really fail? (i.e. SubmitFail)
pure ()
pure (LocalTxSubmission.SubmitSuccess, mockTxSubmissionServer queue)
pure (LocalTxSubmission.SubmitSuccess, mockTxSubmissionServer db)
, recvMsgDone = ()
}
where
Expand Down

0 comments on commit c1e142f

Please sign in to comment.