DIY LedgerState streaming PoC
andreabedini authored and raduom committed May 21, 2022
1 parent 06eb0a4 commit e92200a
14 changes: 13 additions & 1 deletion plutus-streaming/app/Main.hs
module Main where

import Cardano.Api
import Cardano.Api.Extras ()
import Control.Monad.Except (runExceptT)
import Data.Maybe qualified as Maybe
import Options.Applicative hiding (header)
import Plutus.Streaming
import Plutus.Streaming.ChainIndex
import Plutus.Streaming.LedgerState (ledgerState)
import Streaming
import Streaming.Prelude qualified as S
import Text.Pretty.Simple (pPrint)
nthBlock = nthBlockAt ChainPointAtGenesis
nthBlockAt :: ChainPoint -> Int -> IO (BlockInMode CardanoMode)
nthBlockAt point n = do
( fmap Maybe.fromJust
. S.drop n
. S.drop n
. (\case RollForward bim _ -> Just bim; _ -> Nothing)

testLedgerState :: IO ()
testLedgerState = do
ils <- runExceptT (initialLedgerState "mainnet-config.json")
case ils of
(Left e) -> error $ show e
(Right (env, ls)) ->
withSimpleChainSyncEventStream "node.socket" Mainnet ChainPointAtGenesis $
void . S.print . ledgerState env ls QuickValidation

7 changes: 3 additions & 4 deletions plutus-streaming/plutus-streaming.cabal
base >=4.9 && <5,
base >=4.9 && <5,
hs-source-dirs: src
hs-source-dirs: src
default-language: Haskell2010

executable main
executable plutus-streaming-cli
import: lang
main-is: Main.hs
base >=4.9 && <5,
4 changes: 4 additions & 0 deletions plutus-streaming/src/Cardano/Api/Extras.hs
deriving instance Show LedgerEvent
deriving instance Show MIRDistributionDetails

deriving instance Show PoolReapDetails

deriving instance Show InitialLedgerStateError

deriving instance Show GenesisConfigError
55 changes: 55 additions & 0 deletions plutus-streaming/src/Cardano/Api/IPC/Extras.hs
@@ -0,0 +1,55 @@
module Cardano.Api.IPC.Extras where

-- import Cardano.Api
-- import Control.Tracer (nullTracer)
-- import Ouroboros.Network.NodeToClient qualified as N2C
-- import Ouroboros.Network.NodeToNode qualified as N2N
-- import Ouroboros.Network.Snocket (socketSnocket)
-- import Ouroboros.Network.Socket (nullNetworkConnectTracers)

-- connectToLocalNode' :: LocalNodeConnectInfo mode
-- -> LocalNodeClientProtocolsInMode mode
-- -> IO ()
-- connectToLocalNode' localNodeConnectInfo handlers
-- = connectToLocalNodeWithVersion' localNodeConnectInfo (const handlers)

-- -- | Establish a connection to a local node and execute the given set of
-- -- protocol handlers parameterized on the negotiated node-to-client protocol
-- -- version.
-- --
-- connectToLocalNodeWithVersion' :: LocalNodeConnectInfo mode
-- -> (NodeToClientVersion -> LocalNodeClientProtocolsInMode mode)
-- -> IO ()
-- connectToLocalNodeWithVersion' LocalNodeConnectInfo {
-- localNodeSocketPath,
-- localNodeNetworkId,
-- localConsensusModeParams
-- } clients =
-- N2C.withIOManager $ \iomgr ->
-- N2C.connectTo
-- (N2C.localSnocket iomgr)
-- N2C.NetworkConnectTracers {
-- N2C.nctMuxTracer = nullTracer,
-- N2C.nctHandshakeTracer = nullTracer
-- }
-- versionedProtocls
-- localNodeSocketPath
-- where
-- versionedProtocls =
-- -- First convert from the mode-parametrised view of things to the
-- -- block-parametrised view and then do the final setup for the versioned
-- -- bundles of mini-protocols.
-- undefined
-- -- case mkLocalNodeClientParams localConsensusModeParams clients of
-- -- LocalNodeClientParams ptcl clients' ->
-- mkVersionedProtocols localNodeNetworkId ptcl clients'

-- connect ioManager =
-- N2N.connectTo
-- (socketSnocket ioManager)
-- N2N.nullNetworkConnectTracers
-- _versions
-- _maybeSockAddr
-- _sockAddr
20 changes: 20 additions & 0 deletions plutus-streaming/src/Cardano/Api/Lens.hs
@@ -0,0 +1,20 @@
module Cardano.Api.Lens where

-- import Control.Lens
-- import Control.Lens.Traversal
-- import Control.Lens.Combinators
-- import Cardano.Api

-- makePrisms ''Block
-- makePrisms ''EraInMode

-- _BlockHeader :: Getter (Block era) BlockHeader
-- _BlockHeader f b =
-- let g = \case (Block header _) -> header in
-- contramap g (f $ g b)

-- _Tx :: Traversal' (Block era) (Tx era)
-- _Tx f bl = _wa

-- makePrisms ''Tx
-- makePrisms ''TxBodyContent
81 changes: 81 additions & 0 deletions plutus-streaming/src/Plutus/Streaming/LedgerState.hs
@@ -0,0 +1,81 @@
module Plutus.Streaming.LedgerState
( ledgerState,
LedgerState (..),
LedgerEvent (..),

import Cardano.Api
import Data.Sequence (Seq (..))
import Data.Sequence qualified as Seq
import Plutus.Streaming
import Streaming
import Streaming.Prelude qualified as S
import Unsafe.Coerce (unsafeCoerce)

data LedgerStateEvents = LedgerStateEvents LedgerState [LedgerEvent]

applyBlock' ::
Env ->
LedgerState ->
ValidationMode ->
Block era ->
Either LedgerStateError LedgerStateEvents
applyBlock' = unsafeCoerce applyBlock

type History a = Seq (SlotNo, a)

-- | This function works under the assumption that the stream of blocks it
-- receives is valid. The function will trigger an exception if
-- 1. a block it receives does not apply on top of the ledger state
-- 2. a rollback goes past the security parameter
-- FIXME, for the moment I kept this function pure but it requires us to do
-- some up-front IO to obtain the initial ledger state from the network
-- config file.
ledgerState ::
forall m r.
Monad m =>
Env ->
LedgerState ->
ValidationMode ->
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of (LedgerState, [LedgerEvent])) m r
ledgerState env ls0 vm =
S.scan step initialHistory projection
step ::
(History LedgerState, [LedgerEvent]) ->
SimpleChainSyncEvent ->
(History LedgerState, [LedgerEvent])
step (history, _) (RollForward (BlockInMode blk _) _) =
unsafePushBlock history blk
step _ (RollBackward ChainPointAtGenesis _) =
step (history, _) (RollBackward (ChainPoint sn _) _) =
unsafeRollback history sn

initialHistory :: (History LedgerState, [LedgerEvent])
initialHistory = (Seq.singleton (0, ls0), [])

-- This function is unsafe because it might result in an empty history,
-- breaking the assumption of unsafePushBlock and projection
unsafeRollback :: History LedgerState -> SlotNo -> (History LedgerState, [LedgerEvent])
unsafeRollback history sn =
let history' = Seq.dropWhileL ((> sn) . fst) history
in (history', [])

-- This function is unsafe because it will assume the given block will
-- successfully apply on top of the ledger state.
unsafePushBlock :: History LedgerState -> Block era -> (History LedgerState, [LedgerEvent])
unsafePushBlock history@((_, ls) :<| _) blk@(Block (BlockHeader sn _ _) _) =
case applyBlock' env ls vm blk of
Left e ->
error $ "applyBlock failed " <> show e
Right (LedgerStateEvents ls' lse) ->
let history' = fst $ Seq.splitAt (fromIntegral $ envSecurityParam env + 1) ((sn, ls') :<| history)
in (history', lse)
unsafePushBlock Seq.Empty _ = error "Impossible! History should never be empty"

projection :: (History LedgerState, [LedgerEvent]) -> (LedgerState, [LedgerEvent])
projection ((_, ls) :<| _, lse) = (ls, lse)
projection (Seq.Empty, _) = error "Impossible! History should never be empty"

