Skip to content

Commit

Permalink
cardano-streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
eyeinsky committed Dec 2, 2022
1 parent 99723db commit edd1090
Show file tree
Hide file tree
Showing 4 changed files with 522 additions and 53 deletions.
37 changes: 35 additions & 2 deletions cardano-streaming/cardano-streaming.cabal
Expand Up @@ -29,7 +29,10 @@ common lang
library
import: lang
hs-source-dirs: src
exposed-modules: Cardano.Streaming
exposed-modules:
Cardano.Streaming
Cardano.Streaming.Callbacks
Cardano.Streaming.Helpers

--------------------------
-- Other IOG dependencies
Expand All @@ -42,9 +45,39 @@ library
-- Non-IOG dependencies
------------------------
build-depends:
, aeson
, async
, base >=4.9 && <5
, base >=4.9 && <5
, base16-bytestring
, bytestring
, cardano-api
, cardano-binary
, cardano-crypto-class
, cardano-crypto-wrapper
, cardano-data
, cardano-ledger-alonzo
, cardano-ledger-byron
, cardano-ledger-core
, cardano-ledger-shelley
, cardano-protocol-tpraos
, cardano-slotting
, containers
, filepath
, formatting
, memory
, ouroboros-consensus
, ouroboros-consensus-byron
, ouroboros-consensus-cardano
, ouroboros-consensus-protocol
, ouroboros-consensus-shelley
, primitive
, small-steps
, streaming
, transformers
, transformers-except
, typed-protocols
, vector
, yaml

executable cardano-streaming-example-1
import: lang
Expand Down
252 changes: 201 additions & 51 deletions cardano-streaming/src/Cardano/Streaming.hs
@@ -1,48 +1,63 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
module Cardano.Streaming
( withChainSyncEventStream,
ChainSyncEvent (..),
ChainSyncEventException (..),
( withChainSyncEventStream
, ChainSyncEvent (..)
, ChainSyncEventException (..)

--
, mkConnectInfo
, mkLocalNodeConnectInfo

-- * Stream blokcs and ledger states
, foldLedgerState
, getEnvAndInitialLedgerStateHistory
, blocks
, ignoreRollbacks
, ledgerStates
, ledgerStates0
, ledgerStatesPipelined
)
where

import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (ChainSyncClient), ChainTip,
ConsensusModeParams (CardanoModeParams), EpochSlots (EpochSlots),
LocalChainSyncClient (LocalChainSyncClient),
LocalNodeClientProtocols (LocalNodeClientProtocols, localChainSyncClient, localStateQueryClient, localTxMonitoringClient, localTxSubmissionClient),
LocalNodeConnectInfo (LocalNodeConnectInfo, localConsensusModeParams, localNodeNetworkId, localNodeSocketPath),
NetworkId, connectToLocalNode)
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext),
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync)
import Control.Concurrent qualified as IO
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), async, link, withAsync)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, SomeException (SomeException), catch, throw)
import Control.Exception qualified as IO
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Except (ExceptT, runExceptT, withExceptT)
import Data.Foldable (forM_)
import Data.Function ((&))
import Data.Sequence (Seq)
import Data.Sequence qualified as Seq
import Data.Word (Word32)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S

data ChainSyncEvent a
= RollForward a ChainTip
| RollBackward ChainPoint ChainTip
deriving (Show, Functor, Generic)

data ChainSyncEventException
= NoIntersectionFound
deriving (Show)
import Cardano.Api qualified as C
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext),
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Cardano.Slotting.Slot (WithOrigin (At, Origin))

instance Exception ChainSyncEventException
import Cardano.Streaming.Callbacks
import Cardano.Streaming.Helpers

-- | `withChainSyncEventStream` uses the chain-sync mini-protocol to
-- connect to a locally running node and fetch blocks from the given
-- starting point.
withChainSyncEventStream ::
-- | Path to the node socket
FilePath ->
NetworkId ->
C.NetworkId ->
-- | The point on the chain to start streaming from
ChainPoint ->
C.ChainPoint ->
-- | The stream consumer
(Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) ->
(Stream (Of (ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r -> IO b) ->
IO b
withChainSyncEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread passing the blocks it
Expand All @@ -65,29 +80,16 @@ withChainSyncEventStream socketPath networkId point consumer = do
nextBlockVar <- newEmptyMVar

let client = chainSyncStreamingClient point nextBlockVar

localNodeClientProtocols =
LocalNodeClientProtocols
{ localChainSyncClient = LocalChainSyncClient client,
localStateQueryClient = Nothing,
localTxMonitoringClient = Nothing,
localTxSubmissionClient = Nothing
}

connectInfo =
LocalNodeConnectInfo
{ localConsensusModeParams = CardanoModeParams epochSlots,
localNodeNetworkId = networkId,
localNodeSocketPath = socketPath
C.LocalNodeClientProtocols
{ C.localChainSyncClient = C.LocalChainSyncClient client,
C.localStateQueryClient = Nothing,
C.localTxMonitoringClient = Nothing,
C.localTxSubmissionClient = Nothing
}
connectInfo = mkLocalNodeConnectInfo networkId socketPath

-- This a parameter needed only for the Byron era. Since the Byron
-- era is over and the parameter has never changed it is ok to
-- hardcode this. See comment on `Cardano.Api.ConsensusModeParams` in
-- cardano-node.
epochSlots = EpochSlots 21600

withAsync (connectToLocalNode connectInfo localNodeClientProtocols) $ \a -> do
withAsync (C.connectToLocalNode connectInfo localNodeClientProtocols) $ \a -> do
-- Make sure all exceptions in the client thread are passed to the consumer thread
link a
-- Run the consumer
Expand All @@ -103,16 +105,16 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- If the starting point is such that an intersection cannot be found, this
-- client will throw a NoIntersectionFound exception.
chainSyncStreamingClient ::
ChainPoint ->
C.ChainPoint ->
MVar (ChainSyncEvent e) ->
ChainSyncClient e ChainPoint ChainTip IO ()
C.ChainSyncClient e C.ChainPoint C.ChainTip IO ()
chainSyncStreamingClient point nextChainEventVar =
ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
C.ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
where
onIntersect =
ClientStIntersect
{ recvMsgIntersectFound = \_ _ ->
ChainSyncClient sendRequestNext,
C.ChainSyncClient sendRequestNext,
recvMsgIntersectNotFound =
-- There is nothing we can do here
throw NoIntersectionFound
Expand All @@ -124,12 +126,160 @@ chainSyncStreamingClient point nextChainEventVar =
onNext =
ClientStNext
{ recvMsgRollForward = \bim ct ->
ChainSyncClient $ do
C.ChainSyncClient $ do
putMVar nextChainEventVar (RollForward bim ct)
sendRequestNext,
recvMsgRollBackward = \cp ct ->
ChainSyncClient $ do
C.ChainSyncClient $ do
putMVar nextChainEventVar (RollBackward cp ct)
sendRequestNext
}

-- | Create stream of @ChainSyncEvent (BlockInMode CardanoMode)@ from
-- a node at @socketPath@ with @networkId@ at @point@.
blocks
:: C.LocalNodeConnectInfo C.CardanoMode -> C.ChainPoint
-> S.Stream (S.Of (ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
blocks con chainPoint = do
chan <- liftIO IO.newChan
void $ liftIO $ linkedAsync $ blocksCallback con chainPoint $ IO.writeChan chan
S.repeatM $ IO.readChan chan

blocksPipelined
:: Word32 -> C.LocalNodeConnectInfo C.CardanoMode -> C.ChainPoint
-> S.Stream (S.Of (ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
blocksPipelined pipelineSize con chainPoint = do
chan <- liftIO IO.newChan
void $ liftIO $ linkedAsync $ blocksCallbackPipelined pipelineSize con chainPoint $ IO.writeChan chan
S.repeatM $ IO.readChan chan

-- * Ledger states

-- | Get a stream of permanent ledger states
ledgerStates :: FilePath -> FilePath -> C.ValidationMode -> S.Stream (S.Of C.LedgerState) IO r
ledgerStates config socket validationMode = do
(env, initialLedgerStateHistory) <- liftIO $ getEnvAndInitialLedgerStateHistory config
blocks (mkConnectInfo env socket) C.ChainPointAtGenesis
& foldLedgerState env initialLedgerStateHistory validationMode

-- | TODO: fix this. Get a stream of permanent ledger states, rollbacks handled by rollbackRingBuffer
ledgerStates0 :: FilePath -> FilePath -> C.ValidationMode -> S.Stream (S.Of C.LedgerState) IO r
ledgerStates0 config socket validationMode = do
(env, initialLedgerState) <- either IO.throw pure =<< (liftIO $ runExceptT $ C.initialLedgerState config)

let applyBlock_ :: C.LedgerState -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent])
applyBlock_ ledgerState block = applyBlockThrow env ledgerState validationMode block

loop :: C.LedgerState -> S.Stream (S.Of (C.BlockInMode C.CardanoMode)) IO r -> S.Stream (S.Of C.LedgerState) IO r
loop ledgerState source = lift (S.next source) >>= \case
Left r -> pure r
Right (C.BlockInMode block _, source') -> do
(newLedgerState, _) <- liftIO $ applyBlock_ ledgerState block
S.yield newLedgerState
loop newLedgerState source'

blocks (mkConnectInfo env socket) C.ChainPointAtGenesis
& rollbackRingBuffer (fromIntegral $ C.envSecurityParam env)
& loop initialLedgerState

-- | Get a stream of ledger states over a pipelined chain sync
ledgerStatesPipelined
:: Word32 -> FilePath -> FilePath -> C.ValidationMode -> S.Stream (S.Of C.LedgerState) IO r
ledgerStatesPipelined pipelineSize config socket validationMode = do
(env, initialLedgerStateHistory) <- liftIO $ getEnvAndInitialLedgerStateHistory config
blocksPipelined pipelineSize (mkConnectInfo env socket) C.ChainPointAtGenesis
& foldLedgerState env initialLedgerStateHistory validationMode

-- * Apply block

-- | Fold a stream of blocks into a stream of ledger states. This is
-- implemented in a similar way as `foldBlocks` in
-- cardano-api:Cardano.Api.LedgerState, the difference being that this
-- keeps waiting for more blocks when chainsync server and client are
-- fully synchronized.
foldLedgerState
:: C.Env -> LedgerStateHistory -> C.ValidationMode
-> S.Stream (S.Of (ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
-> S.Stream (S.Of C.LedgerState) IO r
foldLedgerState env initialLedgerStateHistory validationMode = loop initialLedgerStateHistory
where
applyBlock_ :: C.LedgerState -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent])
applyBlock_ ledgerState block = applyBlockThrow env ledgerState validationMode block

loop
:: LedgerStateHistory
-> S.Stream (S.Of (ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
-> S.Stream (S.Of C.LedgerState) IO r
loop ledgerStateHistory source = lift (S.next source) >>= \case
Left r -> pure r
Right (chainSyncEvent, source') -> do
ledgerStateHistory1 <- case chainSyncEvent of
RollForward (blockInMode@(C.BlockInMode block _)) ct -> do
newLedgerState <- liftIO $ applyBlock_ (getLastLedgerState ledgerStateHistory) block
let (ledgerStateHistory1, committedStates) = pushLedgerState env ledgerStateHistory (bimSlotNo blockInMode) newLedgerState blockInMode
forM_ committedStates $ \(_, (ledgerState, _ledgerEvents), currBlockMay) -> case currBlockMay of
Origin -> return ()
At _currBlock -> S.yield ledgerState
pure ledgerStateHistory1
RollBackward cp ct -> do
let newClientTip = Origin
newServerTip = fromChainTip ct
pure $ case cp of
C.ChainPointAtGenesis -> initialLedgerStateHistory
C.ChainPoint slotNo _ -> rollBackLedgerStateHist ledgerStateHistory slotNo
loop ledgerStateHistory1 source'

getEnvAndInitialLedgerStateHistory :: FilePath -> IO (C.Env, LedgerStateHistory)
getEnvAndInitialLedgerStateHistory configPath = do
(env, initialLedgerState) <- either IO.throw pure =<< (runExceptT $ C.initialLedgerState configPath)
let initialLedgerStateHistory = singletonLedgerStateHistory initialLedgerState
return (env, initialLedgerStateHistory)


applyBlockThrow :: C.Env -> C.LedgerState -> C.ValidationMode -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent])
applyBlockThrow env ledgerState validationMode block = case C.applyBlock env ledgerState validationMode block of
Left err -> IO.throw err
Right ls -> pure ls

-- | A history of k (security parameter) recent ledger states. The head is the
-- most recent item. Elements are:
--
-- * Slot number that a new block occurred
-- * The ledger state and events after applying the new block
-- * The new block
--
type LedgerStateHistory = History LedgerStateEvents
type History a = Seq (C.SlotNo, a, WithOrigin (C.BlockInMode C.CardanoMode))

singletonLedgerStateHistory :: C.LedgerState -> LedgerStateHistory
singletonLedgerStateHistory ledgerState = Seq.singleton (0, (ledgerState, []), Origin)

-- | Add a new ledger state to the history
pushLedgerState
:: C.Env -- ^ Environment used to get the security param, k.
-> History a -- ^ History of k items.
-> C.SlotNo -- ^ Slot number of the new item.
-> a -- ^ New item to add to the history
-> C.BlockInMode C.CardanoMode
-- ^ The block that (when applied to the previous
-- item) resulted in the new item.
-> (History a, History a)
-- ^ ( The new history with the new item appended
-- , Any existing items that are now past the security parameter
-- and hence can no longer be rolled back.
-- )
pushLedgerState env hist ix st block
= Seq.splitAt
(fromIntegral $ C.envSecurityParam env + 1)
((ix, st, At block) Seq.:<| hist)

rollBackLedgerStateHist :: History a -> C.SlotNo -> History a
rollBackLedgerStateHist hist maxInc = Seq.dropWhileL ((> maxInc) . (\(x,_,_) -> x)) hist

getLastLedgerState :: LedgerStateHistory -> C.LedgerState
getLastLedgerState ledgerStates' = maybe
(error "Impossible! Missing Ledger state")
(\(_,(ledgerState, _),_) -> ledgerState)
(Seq.lookup 0 ledgerStates')

type LedgerStateEvents = (C.LedgerState, [C.LedgerEvent])

0 comments on commit edd1090

Please sign in to comment.