Skip to content

Commit

Permalink
Replace ChainTip by ChainPoint the event streaming type
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabedini authored and raduom committed May 21, 2022
1 parent 77b65b1 commit a51aa29
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 117 deletions.
16 changes: 8 additions & 8 deletions plutus-streaming/app/Main.hs
Expand Up @@ -4,14 +4,14 @@

module Main where

import Cardano.Api (Block (Block), BlockInMode (BlockInMode), ChainPoint (ChainPoint, ChainPointAtGenesis),
NetworkId (Mainnet, Testnet), NetworkMagic (NetworkMagic), SlotNo (SlotNo))
import Cardano.Api (ChainPoint (ChainPoint, ChainPointAtGenesis), NetworkId (Mainnet, Testnet),
NetworkMagic (NetworkMagic), SlotNo (SlotNo))
import Cardano.Api.Extras ()
import Data.Aeson.Text qualified as Aeson
import Data.Text.Lazy qualified as TL
import Options.Applicative (Alternative ((<|>)), Parser, auto, execParser, flag', help, helper, info, long, metavar,
option, str, strOption, (<**>))
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), withSimpleChainSyncEventStream)
import Plutus.Streaming (StreamerEvent (Append, Revert), withSimpleStreamerEventStream)
import Streaming.Prelude qualified as S

--
Expand Down Expand Up @@ -71,15 +71,15 @@ main = do
Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <-
execParser $ info (optionsParser <**> helper) mempty

withSimpleChainSyncEventStream
withSimpleStreamerEventStream
optionsSocketPath
optionsNetworkId
optionsChainPoint
$ S.stdoutLn
. S.map
( \case
RollForward (BlockInMode (Block header _txs) _era) _ct ->
"RollForward, header: " <> TL.unpack (Aeson.encodeToLazyText header)
RollBackward cp _ct ->
"RollBackward, point: " <> TL.unpack (Aeson.encodeToLazyText cp)
Append cp _bim ->
"Append block: " <> TL.unpack (Aeson.encodeToLazyText cp)
Revert cp ->
"Revert to point: " <> TL.unpack (Aeson.encodeToLazyText cp)
)
39 changes: 9 additions & 30 deletions plutus-streaming/app/Main2.hs
Expand Up @@ -6,15 +6,15 @@

module Main where

import Cardano.Api qualified
import Cardano.Api (ChainPoint (ChainPoint, ChainPointAtGenesis), NetworkId (Mainnet, Testnet),
NetworkMagic (NetworkMagic), SlotNo (SlotNo), ToJSON)
import Cardano.Api.Extras ()
import Data.Aeson qualified
import Data.ByteString.Lazy.Char8 qualified
import Data.Set (Set)
import Ledger qualified
import Options.Applicative (Alternative ((<|>)), Parser, auto, execParser, flag', help, helper, info, long, metavar,
option, str, strOption, (<**>))
import Plutus.Streaming (withSimpleChainSyncEventStream)
import Plutus.Streaming (withSimpleStreamerEventStream)
import Plutus.Streaming.Transactions (transactions, txInsAndOuts')
import Streaming.Prelude qualified as S

--
Expand Down Expand Up @@ -65,41 +65,20 @@ chainPointParser =
<*> option str (long "block-hash" <> metavar "BLOCK-HASH")
)

printJson :: ToJSON a => S.Stream (S.Of a) IO r -> IO r
printJson = S.mapM_ Data.ByteString.Lazy.Char8.putStrLn . S.map Data.Aeson.encode

--
-- Main
--

transactions ::
Cardano.Api.BlockInMode Cardano.Api.CardanoMode ->
[Ledger.CardanoTx]
transactions (Cardano.Api.BlockInMode (Cardano.Api.Block _ txs) eim) =
map (\tx -> Ledger.CardanoApiTx (workaround (Ledger.SomeTx tx) eim)) txs

-- https://github.com/input-output-hk/cardano-node/pull/3665
workaround ::
(Cardano.Api.IsCardanoEra era => Cardano.Api.EraInMode era Cardano.Api.CardanoMode -> a) ->
Cardano.Api.EraInMode era Cardano.Api.CardanoMode ->
a
workaround k Cardano.Api.ByronEraInCardanoMode = k Cardano.Api.ByronEraInCardanoMode
workaround k Cardano.Api.ShelleyEraInCardanoMode = k Cardano.Api.ShelleyEraInCardanoMode
workaround k Cardano.Api.AllegraEraInCardanoMode = k Cardano.Api.AllegraEraInCardanoMode
workaround k Cardano.Api.MaryEraInCardanoMode = k Cardano.Api.MaryEraInCardanoMode
workaround k Cardano.Api.AlonzoEraInCardanoMode = k Cardano.Api.AlonzoEraInCardanoMode

txInsAndOuts ::
Ledger.CardanoTx ->
(Set Ledger.TxIn, [(Ledger.TxOut, Ledger.TxOutRef)])
txInsAndOuts tx =
(Ledger.getCardanoTxInputs tx, Ledger.getCardanoTxOutRefs tx)

main :: IO ()
main = do
Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <-
execParser $ info (optionsParser <**> helper) mempty

withSimpleChainSyncEventStream
withSimpleStreamerEventStream
optionsSocketPath
optionsNetworkId
optionsChainPoint
(S.mapM_ Data.ByteString.Lazy.Char8.putStrLn . S.map Data.Aeson.encode . S.map (fmap (map txInsAndOuts . transactions)))

(printJson . S.map (fmap (map txInsAndOuts' . transactions)))
1 change: 1 addition & 0 deletions plutus-streaming/plutus-streaming.cabal
Expand Up @@ -37,6 +37,7 @@ library
Plutus.Streaming
Plutus.Streaming.ChainIndex
Plutus.Streaming.LedgerState
Plutus.Streaming.Transactions
build-depends:
base >=4.9 && <5,
async,
Expand Down
54 changes: 30 additions & 24 deletions plutus-streaming/src/Plutus/Streaming.hs
@@ -1,17 +1,17 @@
module Plutus.Streaming
( SimpleChainSyncEvent,
withSimpleChainSyncEventStream,
ChainSyncEvent (..),
ChainSyncEventException (..),
( SimpleStreamerEvent,
withSimpleStreamerEventStream,
StreamerEvent (..),
StreamerEventException (..),
)
where

import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (ChainSyncClient), ChainTip,
ConsensusModeParams (CardanoModeParams), EpochSlots (EpochSlots),
import Cardano.Api (BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode, ChainPoint (ChainPoint),
ChainSyncClient (ChainSyncClient), ConsensusModeParams (CardanoModeParams), EpochSlots (EpochSlots),
LocalChainSyncClient (LocalChainSyncClient),
LocalNodeClientProtocols (LocalNodeClientProtocols, localChainSyncClient, localStateQueryClient, localTxSubmissionClient),
LocalNodeConnectInfo (LocalNodeConnectInfo, localConsensusModeParams, localNodeNetworkId, localNodeSocketPath),
NetworkId, ToJSON, connectToLocalNode)
NetworkId, ToJSON, connectToLocalNode, getBlockHeader)
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgDone, SendMsgFindIntersect, SendMsgRequestNext),
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
Expand All @@ -23,29 +23,29 @@ import GHC.Generics (Generic)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S

data ChainSyncEvent a
= RollForward a ChainTip
| RollBackward ChainPoint ChainTip
data StreamerEvent a
= Append ChainPoint a
| Revert ChainPoint
deriving (Show, Functor, Generic)

instance ToJSON a => ToJSON (ChainSyncEvent a)
instance ToJSON a => ToJSON (StreamerEvent a)

type SimpleChainSyncEvent = ChainSyncEvent (BlockInMode CardanoMode)
type SimpleStreamerEvent = StreamerEvent (BlockInMode CardanoMode)

data ChainSyncEventException
data StreamerEventException
= NoIntersectionFound
deriving (Show)

instance Exception ChainSyncEventException
instance Exception StreamerEventException

withSimpleChainSyncEventStream ::
withSimpleStreamerEventStream ::
FilePath ->
NetworkId ->
-- | The point on the chain to start streaming from
ChainPoint ->
(Stream (Of SimpleChainSyncEvent) IO r -> IO b) ->
(Stream (Of SimpleStreamerEvent) IO r -> IO b) ->
IO b
withSimpleChainSyncEventStream socketPath networkId point consumer = do
withSimpleStreamerEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread and it will send us
-- block through this channel.
chan <- newBroadcastTChanIO
Expand Down Expand Up @@ -97,11 +97,15 @@ withSimpleChainSyncEventStream socketPath networkId point consumer = do
--
-- Blocks obtained from the chain-sync mini-protocol are passed to a
-- consumer through a channel. To understand the MVar-Maybe-Chan dance see
-- note in `withSimpleChainSyncEventStream`
-- note in `withSimpleStreamerEventStream`
-- chainSyncStreamingClient ::
-- ChainPoint ->
-- TChan (StreamerEvent e) ->
-- ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient ::
ChainPoint ->
TChan (ChainSyncEvent e) ->
ChainSyncClient e ChainPoint ChainTip IO ()
TChan (StreamerEvent (BlockInMode mode)) ->
ChainSyncClient (BlockInMode mode) ChainPoint tip IO ()
chainSyncStreamingClient point chan =
ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
where
Expand All @@ -119,12 +123,14 @@ chainSyncStreamingClient point chan =
where
onNext =
ClientStNext
{ recvMsgRollForward = \bim ct ->
{ recvMsgRollForward = \bim@(BlockInMode blk _eim) _ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollForward bim ct)
let (BlockHeader slotNo blockHash _blockNo) = getBlockHeader blk
let cp = ChainPoint slotNo blockHash
atomically $ writeTChan chan (Append cp bim)
sendRequestNext,
recvMsgRollBackward = \cp ct ->
recvMsgRollBackward = \cp _ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollBackward cp ct)
atomically $ writeTChan chan (Revert cp)
sendRequestNext
}
49 changes: 7 additions & 42 deletions plutus-streaming/src/Plutus/Streaming/ChainIndex.hs
@@ -1,6 +1,3 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}

module Plutus.Streaming.ChainIndex
( utxoState,
utxoState',
Expand All @@ -9,40 +6,30 @@ module Plutus.Streaming.ChainIndex
)
where

import Cardano.Api (AddressAny, AddressInEra (AddressInEra), Block (Block), BlockInMode (BlockInMode), CardanoMode,
ChainPoint (ChainPointAtGenesis), CtxTx, NetworkId (Mainnet), Tx (Tx), TxBody (TxBody),
TxBodyContent (TxBodyContent, txOuts), TxOut (TxOut), toAddressAny)
import Data.Set (Set)
import Ledger (TxIn, TxOut, TxOutRef)
import Ledger.Tx.CardanoAPI (FromCardanoError)
import Plutus.ChainIndex (TxUtxoBalance)
import Plutus.ChainIndex.Compatibility qualified
import Plutus.ChainIndex.Compatibility qualified as CI
import Plutus.ChainIndex.Tx (ChainIndexTx (_citxInputs), txOutsWithRef)
import Plutus.ChainIndex.TxUtxoBalance qualified as TxUtxoBalance
import Plutus.ChainIndex.UtxoState (UtxoIndex, UtxoState)
import Plutus.ChainIndex.UtxoState qualified as UtxoState
import Plutus.Contract.CardanoAPI qualified
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), SimpleChainSyncEvent,
withSimpleChainSyncEventStream)
import Plutus.Streaming (SimpleStreamerEvent, StreamerEvent (Append, Revert))
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S

utxoState ::
Monad m =>
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of SimpleStreamerEvent) m r ->
Stream (Of (UtxoState TxUtxoBalance)) m r
utxoState =
S.map snd . utxoState'

utxoState' ::
Monad m =>
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of (SimpleChainSyncEvent, UtxoState TxUtxoBalance)) m r
Stream (Of SimpleStreamerEvent) m r ->
Stream (Of (SimpleStreamerEvent, UtxoState TxUtxoBalance)) m r
utxoState' =
S.scanned step initial projection
where
step index (RollForward block _) =
step index (Append _cp block) =
case CI.fromCardanoBlock block of
Left err -> error ("FromCardanoError: " <> show err)
Right txs ->
Expand All @@ -53,8 +40,8 @@ utxoState' =
error (show err)
Right (UtxoState.InsertUtxoSuccess newIndex _insertPosition) ->
newIndex
step index (RollBackward cardanoPoint _) =
let point = CI.fromCardanoPoint cardanoPoint
step index (Revert cp) =
let point = CI.fromCardanoPoint cp
in case TxUtxoBalance.rollback point index of
Left err -> error (show err)
Right (UtxoState.RollbackResult _newTip rolledBackIndex) ->
Expand All @@ -64,25 +51,3 @@ utxoState' =
initial = mempty

projection = UtxoState.utxoState

--
-- Experimental stuff
--

data Some f = forall a. Some (f a)

f ::
BlockInMode CardanoMode ->
[Some (Cardano.Api.TxOut CtxTx)]
f (BlockInMode (Block _bh txs) _eim) =
concatMap (\(Tx (TxBody TxBodyContent {txOuts}) _kws) -> map Some txOuts) txs

getTxOuts ::
BlockInMode CardanoMode ->
[AddressAny]
getTxOuts (BlockInMode (Block _bh txs) _eim) =
foldMap go txs
where
go (Tx (TxBody TxBodyContent {txOuts}) _kws) =
map (\(TxOut (AddressInEra _ addr) _ _) -> toAddressAny addr) txOuts

26 changes: 13 additions & 13 deletions plutus-streaming/src/Plutus/Streaming/LedgerState.hs
@@ -1,16 +1,16 @@
module Plutus.Streaming.LedgerState
( ledgerState,
ledgerState',
LedgerState (..),
LedgerEvent (..),
)
where

import Cardano.Api
import Data.Sequence (Seq (..))
import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode),
ChainPoint (ChainPoint, ChainPointAtGenesis), Env, LedgerEvent, LedgerState, LedgerStateError,
SlotNo, ValidationMode, applyBlock, envSecurityParam)
import Data.Sequence (Seq ((:<|)))
import Data.Sequence qualified as Seq
import Plutus.Streaming
import Streaming
import Plutus.Streaming (SimpleStreamerEvent, StreamerEvent (Append, Revert))
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S
import Unsafe.Coerce (unsafeCoerce)

Expand All @@ -34,7 +34,7 @@ ledgerState ::
Env ->
LedgerState ->
ValidationMode ->
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of SimpleStreamerEvent) m r ->
Stream (Of (LedgerState, [LedgerEvent])) m r
ledgerState env ls0 vm = S.map snd . ledgerState' env ls0 vm

Expand All @@ -51,20 +51,20 @@ ledgerState' ::
Env ->
LedgerState ->
ValidationMode ->
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of (SimpleChainSyncEvent, (LedgerState, [LedgerEvent]))) m r
Stream (Of SimpleStreamerEvent) m r ->
Stream (Of (SimpleStreamerEvent, (LedgerState, [LedgerEvent]))) m r
ledgerState' env ls0 vm =
S.scanned step initialHistory projection
where
step ::
(History LedgerState, [LedgerEvent]) ->
SimpleChainSyncEvent ->
SimpleStreamerEvent ->
(History LedgerState, [LedgerEvent])
step (history, _) (RollForward (BlockInMode blk _) _) =
step (history, _) (Append _ (BlockInMode blk _)) =
unsafePushBlock history blk
step _ (RollBackward ChainPointAtGenesis _) =
step _ (Revert ChainPointAtGenesis) =
initialHistory
step (history, _) (RollBackward (ChainPoint sn _) _) =
step (history, _) (Revert (ChainPoint sn _)) =
unsafeRollback history sn

initialHistory :: (History LedgerState, [LedgerEvent])
Expand Down

0 comments on commit a51aa29

Please sign in to comment.