Skip to content

Commit

Permalink
reset me: Add blocksCallback blocksCallbackPipelined
Browse files Browse the repository at this point in the history
  • Loading branch information
eyeinsky committed Nov 29, 2022
1 parent e63cdb1 commit 77d1249
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 104 deletions.
175 changes: 140 additions & 35 deletions cardano-streaming/src/Cardano/Streaming/LedgerState.hs
Expand Up @@ -19,13 +19,18 @@ module Cardano.Streaming.LedgerState
, foldBlocks
, ledgerStatesPipelined
, ledgerStates

-- * Raw chain-sync clients using callback
, blocksCallbackPipelined
, blocksCallback
)
where

import Prelude

import Control.Concurrent.Async qualified as IO
import Control.Concurrent.Chan qualified as IO
import Control.Exception (Exception, SomeException (SomeException), catch, throw)
import Control.Exception qualified as E
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
Expand All @@ -45,7 +50,8 @@ import Cardano.Api qualified as C
import Cardano.Chain.Genesis qualified
import Cardano.Crypto (ProtocolMagicId (unProtocolMagicId), RequiresNetworkMagic (RequiresMagic, RequiresNoMagic))
import Cardano.Slotting.Slot (WithOrigin (At, Origin))
import Cardano.Streaming (ChainSyncEvent (RollForward), blocks)
import Cardano.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound),
blocks)
import Ouroboros.Consensus.Cardano.CanHardFork qualified as Consensus
import Ouroboros.Consensus.HardFork.Combinator qualified as Consensus
import Ouroboros.Consensus.HardFork.Combinator.AcrossEras qualified as HFC
Expand Down Expand Up @@ -120,7 +126,7 @@ foldBlocksPipelinedIO env initialLedgerState' nodeConfigFilePath socketPath vali

clientNextN :: Nat n -> LedgerStateHistory -> CSP.ClientStNext n (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
clientNextN rqsInFlight knownLedgerStates =
CSP.ClientStNext {
CSP.ClientStNext { -- xxx
CSP.recvMsgRollForward = \blockInMode@(C.BlockInMode block@(C.Block (C.BlockHeader slotNo _ currBlockNo) _) _era) serverChainTip -> do
newLedgerState <- applyBlock_ (getLastLedgerState knownLedgerStates) block
let (knownLedgerStates', committedStates) = pushLedgerState env knownLedgerStates slotNo newLedgerState blockInMode
Expand Down Expand Up @@ -173,45 +179,59 @@ foldBlocksIO
-> FilePath -> FilePath -> C.ValidationMode -> IO.IORef a
-> (C.LedgerState -> [C.LedgerEvent] -> C.BlockInMode C.CardanoMode -> a -> IO a)
-> IO ()
foldBlocksIO env initialLedgerState' nodeConfigFilePath socketPath validationMode stateIORef accumulate =
C.connectToLocalNode (mkConnectInfo env socketPath) $ C.LocalNodeClientProtocols
{ C.localChainSyncClient = C.LocalChainSyncClient (chainSyncClient $ singletonLedgerStateHistory initialLedgerState')
, C.localTxSubmissionClient = Nothing
, C.localStateQueryClient = Nothing
, C.localTxMonitoringClient = Nothing
}
foldBlocksIO env initialLedgerState' nodeConfigFilePath socketPath validationMode stateIORef accumulate = do

lhsIORef <- IO.newIORef $ singletonLedgerStateHistory initialLedgerState'
blocksCallback (mkConnectInfo env socketPath) C.ChainPointAtGenesis $ \case
RollForward blockInMode@(C.BlockInMode block _) _ -> do
knownLedgerStates <- IO.readIORef lhsIORef
newLedgerState <- applyBlock_ (getLastLedgerState knownLedgerStates) block
let
slotNo = bimSlotNo blockInMode
(knownLedgerStates', committedStates) = pushLedgerState env knownLedgerStates slotNo newLedgerState blockInMode
forM_ committedStates $ \(_, (ledgerState, ledgerEvents), currBlockMay) -> case currBlockMay of
Origin -> return ()
At currBlock -> do
newState <- accumulate ledgerState ledgerEvents currBlock =<< IO.readIORef stateIORef
IO.writeIORef stateIORef newState
IO.writeIORef lhsIORef knownLedgerStates'

RollBackward cp ct -> TODO

where
-- | Pre-applied applyBlock to env and validation mode as these don't change over the fold.
applyBlock_ :: C.LedgerState -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent])
applyBlock_ ledgerState block = applyBlockThrow env ledgerState validationMode block

-- | Defines the client side of the chain sync protocol.
chainSyncClient :: LedgerStateHistory -> CS.ChainSyncClient (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
chainSyncClient lsh = CS.ChainSyncClient $ do
pure $ clientIdle_RequestMoreN lsh
where
clientIdle_RequestMoreN :: LedgerStateHistory -> CS.ClientStIdle (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
clientIdle_RequestMoreN knownLedgerStates
= let action = clientNextN knownLedgerStates
in CS.SendMsgRequestNext action (pure action)

clientNextN :: LedgerStateHistory -> CS.ClientStNext (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
clientNextN knownLedgerStates = CS.ClientStNext
{ CS.recvMsgRollForward = \blockInMode@(C.BlockInMode block@(C.Block (C.BlockHeader slotNo _ _currBlockNo) _) _era) _serverChainTip ->
CS.ChainSyncClient $ do
newLedgerState <- applyBlock_ (getLastLedgerState knownLedgerStates) block
let (knownLedgerStates', committedStates) = pushLedgerState env knownLedgerStates slotNo newLedgerState blockInMode
forM_ committedStates $ \(_, (ledgerState, ledgerEvents), currBlockMay) -> case currBlockMay of
Origin -> return ()
At currBlock -> do
newState <- accumulate ledgerState ledgerEvents currBlock =<< IO.readIORef stateIORef
IO.writeIORef stateIORef newState
return (clientIdle_RequestMoreN knownLedgerStates')
, CS.recvMsgRollBackward = \chainPoint _ -> chainSyncClient $ case chainPoint of
C.ChainPointAtGenesis -> lsh
C.ChainPoint slotNo _ -> rollBackLedgerStateHist knownLedgerStates slotNo
}
-- C.connectToLocalNode (mkConnectInfo env socketPath) $ C.LocalNodeClientProtocols
-- { C.localChainSyncClient = C.LocalChainSyncClient (chainSyncClient $ singletonLedgerStateHistory initialLedgerState')

-- -- | Defines the client side of the chain sync protocol.
-- chainSyncClient :: LedgerStateHistory -> CS.ChainSyncClient (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
-- chainSyncClient lsh = CS.ChainSyncClient $ do
-- pure $ clientIdle_RequestMoreN lsh
-- where
-- clientIdle_RequestMoreN :: LedgerStateHistory -> CS.ClientStIdle (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
-- clientIdle_RequestMoreN knownLedgerStates
-- = let action = clientNextN knownLedgerStates
-- in CS.SendMsgRequestNext action (pure action)

-- clientNextN :: LedgerStateHistory -> CS.ClientStNext (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
-- clientNextN knownLedgerStates = CS.ClientStNext
-- { CS.recvMsgRollForward = \blockInMode@(C.BlockInMode block@(C.Block (C.BlockHeader slotNo _ _currBlockNo) _) _era) _serverChainTip ->
-- CS.ChainSyncClient $ do
-- newLedgerState <- applyBlock_ (getLastLedgerState knownLedgerStates) block
-- let (knownLedgerStates', committedStates) = pushLedgerState env knownLedgerStates slotNo newLedgerState blockInMode
-- forM_ committedStates $ \(_, (ledgerState, ledgerEvents), currBlockMay) -> case currBlockMay of
-- Origin -> return ()
-- At currBlock -> do
-- newState <- accumulate ledgerState ledgerEvents currBlock =<< IO.readIORef stateIORef
-- IO.writeIORef stateIORef newState
-- return (clientIdle_RequestMoreN knownLedgerStates')
-- , CS.recvMsgRollBackward = \chainPoint _ -> chainSyncClient $ case chainPoint of
-- C.ChainPointAtGenesis -> lsh
-- C.ChainPoint slotNo _ -> rollBackLedgerStateHist knownLedgerStates slotNo
-- }

applyBlockThrow :: C.Env -> C.LedgerState -> C.ValidationMode -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent])
applyBlockThrow env ledgerState validationMode block = case C.applyBlock env ledgerState validationMode block of
Expand Down Expand Up @@ -323,3 +343,88 @@ ledgerStates_ config validationMode socketPath networkId point = do
blocks socketPath networkId point
& S.mapMaybe (\case (RollForward e _) -> Just e; _ -> Nothing)
& loop (singletonLedgerStateHistory initialLedgerState)

-- * Raw chain-sync clients using callback

blocksCallbackPipelined
:: forall a. Word32 -> C.LocalNodeConnectInfo C.CardanoMode
-> (ChainSyncEvent (C.BlockInMode C.CardanoMode) -> IO ())
-> IO ()
blocksCallbackPipelined n con callback =
C.connectToLocalNode con $ C.LocalNodeClientProtocols
{ C.localChainSyncClient = C.LocalChainSyncClientPipelined (work n)
, C.localTxSubmissionClient = Nothing
, C.localStateQueryClient = Nothing
, C.localTxMonitoringClient = Nothing
}
where
work :: Word32 -> CSP.ChainSyncClientPipelined (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
work pipelineSize = CSP.ChainSyncClientPipelined $ pure $ requestMore Origin Origin Zero []
where
requestMore -- was clientIdle_RequestMoreN
:: WithOrigin C.BlockNo -> WithOrigin C.BlockNo -> Nat n
-> [ChainSyncEvent (C.BlockInMode C.CardanoMode)]
-> CSP.ClientPipelinedStIdle n (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
requestMore clientTip serverTip rqsInFlight bims = let
in case pipelineDecisionMax pipelineSize rqsInFlight clientTip serverTip of
-- handle a response
Collect -> case rqsInFlight of
Succ predN -> CSP.CollectResponse Nothing (clientNextN predN bims)
-- request more: client and server tip unchanged, one more request in flight, accumulator (bims) unchanged
_ -> CSP.SendMsgRequestNextPipelined (requestMore clientTip serverTip (Succ rqsInFlight) bims)

clientNextN
:: Nat n
-> [ChainSyncEvent (C.BlockInMode C.CardanoMode)]
-> CSP.ClientStNext n (C.BlockInMode C.CardanoMode) C.ChainPoint C.ChainTip IO ()
clientNextN rqsInFlight bims = CSP.ClientStNext
{ CSP.recvMsgRollForward = \bim ct -> do
mapM_ callback bims -- emit collected batch
return $ requestMore (At $ bimBlockNo bim) (fromChainTip ct) rqsInFlight []
, CSP.recvMsgRollBackward = \cp ct -> do
return $ requestMore Origin (fromChainTip ct) rqsInFlight (RollBackward cp ct : bims)
}

fromChainTip :: C.ChainTip -> WithOrigin C.BlockNo
fromChainTip ct = case ct of
C.ChainTipAtGenesis -> Origin
C.ChainTip _ _ bno -> At bno

blocksCallback
:: C.LocalNodeConnectInfo C.CardanoMode -> C.ChainPoint
-> (ChainSyncEvent (C.BlockInMode C.CardanoMode) -> IO ())
-> IO ()
blocksCallback con point callback =
C.connectToLocalNode con $ C.LocalNodeClientProtocols
{ C.localChainSyncClient = C.LocalChainSyncClient $ CS.ChainSyncClient $ pure $ CS.SendMsgFindIntersect [point] onIntersect
, C.localTxSubmissionClient = Nothing
, C.localStateQueryClient = Nothing
, C.localTxMonitoringClient = Nothing
}
where
onIntersect =
CS.ClientStIntersect
{ CS.recvMsgIntersectFound = \_ _ -> CS.ChainSyncClient sendRequestNext
, CS.recvMsgIntersectNotFound = throw NoIntersectionFound
}
sendRequestNext = pure $ CS.SendMsgRequestNext onNext (pure onNext)
where
onNext = CS.ClientStNext
{ CS.recvMsgRollForward = \bim ct -> CS.ChainSyncClient $ do
callback $ RollForward bim ct
sendRequestNext
, CS.recvMsgRollBackward = \cp ct -> CS.ChainSyncClient $ do
callback $ RollBackward cp ct
sendRequestNext
}

-- * Helpers

bimBlockNo :: C.BlockInMode C.CardanoMode -> C.BlockNo
bimBlockNo (C.BlockInMode (C.Block (C.BlockHeader _ _ blockNo) _) _) = blockNo

bimSlotNo :: C.BlockInMode C.CardanoMode -> C.SlotNo
bimSlotNo (C.BlockInMode (C.Block (C.BlockHeader slotNo _ _) _) _) = slotNo

u :: a
u = undefined

0 comments on commit 77d1249

Please sign in to comment.