Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabedini authored and raduom committed May 21, 2022
1 parent e92200a commit 1d014e8
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 26 deletions.
19 changes: 5 additions & 14 deletions plutus-streaming/app/Main.hs
Expand Up @@ -6,6 +6,7 @@ module Main where
import Cardano.Api
import Cardano.Api.Extras ()
import Control.Monad.Except (runExceptT)
import Data.Aeson qualified as Aeson
import Data.Maybe qualified as Maybe
import Options.Applicative hiding (header)
import Plutus.Streaming
Expand Down Expand Up @@ -71,8 +72,8 @@ chainPointParser =
-- Utilities
--

pPrintStream :: (Show a, MonadIO m) => Stream (Of a) m r -> m r
pPrintStream = S.mapM_ pPrint
printJson :: (MonadIO m, ToJSON a) => Stream (Of a) m r -> m r
printJson = S.print . S.map Aeson.encode

--
-- Example consumers
Expand Down Expand Up @@ -139,7 +140,7 @@ main = do
optionsSocketPath
Mainnet
optionsChainPoint
pPrintStream
S.print
>>= print

doSimple ::
Expand Down Expand Up @@ -172,7 +173,7 @@ nthBlock = nthBlockAt ChainPointAtGenesis
nthBlockAt :: ChainPoint -> Int -> IO (BlockInMode CardanoMode)
nthBlockAt point n = do
withSimpleChainSyncEventStream
"node.socket"
"socket/node.socket"
Mainnet
point
( fmap Maybe.fromJust
Expand All @@ -182,13 +183,3 @@ nthBlockAt point n = do
. S.drop n
. S.map (\case RollForward bim _ -> Just bim; _ -> Nothing)
)

testLedgerState :: IO ()
testLedgerState = do
ils <- runExceptT (initialLedgerState "mainnet-config.json")
case ils of
(Left e) -> error $ show e
(Right (env, ls)) ->
withSimpleChainSyncEventStream "node.socket" Mainnet ChainPointAtGenesis $
void . S.print . ledgerState env ls QuickValidation

37 changes: 37 additions & 0 deletions plutus-streaming/app/TestLedgerState1.hs
@@ -0,0 +1,37 @@
{-# OPTIONS_GHC -Wno-orphans #-}

import Cardano.Api
import Control.Monad.Except (runExceptT)
import Plutus.Streaming (ChainSyncEvent (..), EventStreamResult, SimpleChainSyncEvent, withSimpleChainSyncEventStream)
import Plutus.Streaming.LedgerState (ledgerState')
import Streaming.Prelude qualified as S

deriving instance Show InitialLedgerStateError

deriving instance Show GenesisConfigError

main :: IO ()
main = do
ils <- runExceptT (initialLedgerState "/home/andrea/work/cardano-mainnet/config/mainnet-config.json")
case ils of
(Left e) -> error $ show e
(Right (env, ls)) -> do
r <-
withSimpleChainSyncEventStream
"/home/andrea/work/cardano-mainnet/socket/node.socket"
Mainnet
ChainPointAtGenesis $
consume . ledgerState' env ls FullValidation
print r

consume ::
S.Stream (S.Of (SimpleChainSyncEvent, (LedgerState, [LedgerEvent]))) IO EventStreamResult ->
IO EventStreamResult
consume = S.mapM_ g

g :: (SimpleChainSyncEvent, (LedgerState, [LedgerEvent])) -> IO ()
g (RollForward (BlockInMode blk _eim) _, _) = print cp
where
(Block (BlockHeader sn ha _bn) _) = blk
cp = ChainPoint sn ha
g (rb@(RollBackward _ _), _) = print rb
20 changes: 20 additions & 0 deletions plutus-streaming/app/TestLedgerState2.hs
@@ -0,0 +1,20 @@
import Cardano.Api
import Plutus.Streaming (ChainSyncEvent, ChainSyncEventWithLedgerState, withChainSyncEventStreamWithLedgerState)
import Streaming.Prelude qualified as S

main :: IO ()
main = do
r <-
withChainSyncEventStreamWithLedgerState
"/home/andrea/work/cardano-mainnet/config/mainnet-config.json"
"/home/andrea/work/cardano-mainnet/socket/node.socket"
Mainnet
ChainPointAtGenesis
$ S.print . S.map f
print r

f :: ChainSyncEventWithLedgerState -> ChainSyncEvent ChainPoint
f = fmap g

g :: (BlockInMode CardanoMode, Either LedgerStateError (LedgerState, [LedgerEvent])) -> ChainPoint
g (BlockInMode (Block (BlockHeader sn ha _bn) _eim) _ct, _) = ChainPoint sn ha
37 changes: 37 additions & 0 deletions plutus-streaming/app/TestLedgerState3.hs
@@ -0,0 +1,37 @@
{-# OPTIONS_GHC -Wno-orphans #-}

import Cardano.Api
import Control.Monad.Except (runExceptT)
import Data.Time.LocalTime

deriving instance Show InitialLedgerStateError

deriving instance Show FoldBlocksError

deriving instance Show GenesisConfigError

main :: IO ()
main = do
r <-
runExceptT $
foldBlocks
"/home/andrea/work/cardano-mainnet/config/mainnet-config.json"
"/home/andrea/work/cardano-mainnet/socket/node.socket"
FullValidation
()
myfold
print r

myfold ::
Env ->
LedgerState ->
[LedgerEvent] ->
BlockInMode CardanoMode ->
() ->
IO ()
myfold _env _ls _le (BlockInMode blk _eim) _ = do
t <- getZonedTime
putStrLn $ show t <> " " <> show cp
where
(Block (BlockHeader sn ha _bn) _) = blk
cp = ChainPoint sn ha
42 changes: 39 additions & 3 deletions plutus-streaming/plutus-streaming.cabal
Expand Up @@ -32,17 +32,16 @@ library
Plutus.Streaming.LedgerState
build-depends:
base >=4.9 && <5,
aeson,
async,
base16-bytestring,
bytestring,
cardano-api,
containers,
plutus-chain-index-core,
plutus-ledger,
streaming,
transformers,
ouroboros-network,
ouroboros-network-framework,
hs-source-dirs: src
default-language: Haskell2010

Expand All @@ -52,12 +51,49 @@ executable plutus-streaming-cli
build-depends:
plutus-streaming,
base >=4.9 && <5,
aeson,
cardano-api,
mtl,
optparse-applicative,
plutus-chain-index-core,
pretty-simple,
streaming,

hs-source-dirs: app
default-language: Haskell2010

executable test-ledger-state-1
import: lang
main-is: TestLedgerState1.hs
build-depends:
plutus-streaming,
base >=4.9 && <5,
cardano-api,
mtl,
streaming,
hs-source-dirs: app
default-language: Haskell2010

executable test-ledger-state-2
import: lang
main-is: TestLedgerState2.hs
build-depends:
plutus-streaming,
base >=4.9 && <5,
cardano-api,
mtl,
streaming,
hs-source-dirs: app
default-language: Haskell2010

executable test-ledger-state-3
import: lang
main-is: TestLedgerState3.hs
build-depends:
plutus-streaming,
base >=4.9 && <5,
cardano-api,
mtl,
streaming,
time,
hs-source-dirs: app
default-language: Haskell2010
15 changes: 12 additions & 3 deletions plutus-streaming/src/Plutus/Streaming.hs
@@ -1,10 +1,12 @@
{-# LANGUAGE FlexibleInstances #-}

module Plutus.Streaming
( SimpleChainSyncEvent,
withSimpleChainSyncEventStream,
ChainSyncEventWithLedgerState,
withChainSyncEventStreamWithLedgerState,
ChainSyncEvent(..),
EventStreamResult(..)
ChainSyncEvent (..),
EventStreamResult (..),
)
where

Expand All @@ -13,13 +15,20 @@ import Cardano.Api.ChainSync.Client
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad.Trans.Except (runExceptT)
-- import Data.Aeson (ToJSON (..))
import GHC.Generics (Generic)
import Streaming
import Streaming.Prelude qualified as S

data ChainSyncEvent a
= RollForward a ChainTip
| RollBackward ChainPoint ChainTip
deriving (Show, Functor)
deriving (Show, Functor, Generic)

-- deriving instance Generic ChainPoint

-- instance ToJSON ChainPoint
-- instance ToJSON a => ToJSON (ChainSyncEvent a)

type SimpleChainSyncEvent = ChainSyncEvent (BlockInMode CardanoMode)

Expand Down
16 changes: 14 additions & 2 deletions plutus-streaming/src/Plutus/Streaming/ChainIndex.hs
@@ -1,4 +1,9 @@
module Plutus.Streaming.ChainIndex where
module Plutus.Streaming.ChainIndex (
utxoState,
utxoState',
UtxoState,
TxUtxoBalance
) where

import Plutus.ChainIndex (TxUtxoBalance)
import Plutus.ChainIndex.Compatibility qualified as CI
Expand All @@ -14,7 +19,14 @@ utxoState ::
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of (UtxoState TxUtxoBalance)) m r
utxoState =
S.scan step initial projection
S.map snd . utxoState'

utxoState' ::
Monad m =>
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of (SimpleChainSyncEvent, UtxoState TxUtxoBalance)) m r
utxoState' =
S.scanned step initial projection
where
step index (RollForward block _) =
case CI.fromCardanoBlock block of
Expand Down
21 changes: 17 additions & 4 deletions plutus-streaming/src/Plutus/Streaming/LedgerState.hs
@@ -1,5 +1,6 @@
module Plutus.Streaming.LedgerState
( ledgerState,
ledgerState',
LedgerState (..),
LedgerEvent (..),
)
Expand All @@ -15,6 +16,8 @@ import Unsafe.Coerce (unsafeCoerce)

data LedgerStateEvents = LedgerStateEvents LedgerState [LedgerEvent]

-- This terrible hack is because Cardano.Api does not export
-- LedgerStateEvents
applyBlock' ::
Env ->
LedgerState ->
Expand All @@ -25,23 +28,33 @@ applyBlock' = unsafeCoerce applyBlock

type History a = Seq (SlotNo, a)

ledgerState ::
forall m r.
Monad m =>
Env ->
LedgerState ->
ValidationMode ->
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of (LedgerState, [LedgerEvent])) m r
ledgerState env ls0 vm = S.map snd . ledgerState' env ls0 vm

-- | This function works under the assumption that the stream of blocks it
-- receives is valid. The function will trigger an exception if
-- 1. a block it receives does not apply on top of the ledger state
-- 2. a rollback goes past the security parameter
-- FIXME, for the moment I kept this function pure but it requires us to do
-- some up-front IO to obtain the initial ledger state from the network
-- config file.
ledgerState ::
ledgerState' ::
forall m r.
Monad m =>
Env ->
LedgerState ->
ValidationMode ->
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of (LedgerState, [LedgerEvent])) m r
ledgerState env ls0 vm =
S.scan step initialHistory projection
Stream (Of (SimpleChainSyncEvent, (LedgerState, [LedgerEvent]))) m r
ledgerState' env ls0 vm =
S.scanned step initialHistory projection
where
step ::
(History LedgerState, [LedgerEvent]) ->
Expand Down

0 comments on commit 1d014e8

Please sign in to comment.