Skip to content

Commit

Permalink
More tidy up, switch cli to JSON (as much as possible)
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabedini authored and raduom committed May 23, 2022
1 parent 6721053 commit 7273926
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 208 deletions.
133 changes: 18 additions & 115 deletions plutus-streaming/app/Main.hs
@@ -1,39 +1,27 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module Main where

import Cardano.Api (Block (Block), BlockInMode (BlockInMode), CardanoMode, ChainPoint (ChainPoint, ChainPointAtGenesis),
NetworkId (Mainnet, Testnet), NetworkMagic (NetworkMagic), SlotNo (SlotNo), ToJSON)
import Cardano.Api (Block (Block), BlockInMode (BlockInMode), ChainPoint (ChainPoint, ChainPointAtGenesis),
NetworkId (Mainnet, Testnet), NetworkMagic (NetworkMagic), SlotNo (SlotNo))
import Cardano.Api.Extras ()
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Aeson qualified as Aeson
import Data.Maybe qualified as Maybe
import Data.Aeson.Text qualified as Aeson
import Data.Text.Lazy qualified as TL
import Options.Applicative (Alternative ((<|>)), Parser, auto, execParser, flag', help, helper, info, long, metavar,
option, str, strOption, value, (<**>))
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), SimpleChainSyncEvent,
withSimpleChainSyncEventStream)
import Plutus.Streaming.ChainIndex (utxoState)
import Streaming (Of, Stream)
option, str, strOption, (<**>))
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), withSimpleChainSyncEventStream)
import Streaming.Prelude qualified as S

--
-- Options parsing
--

data Example
= Print
| HowManyBlocksBeforeRollback
| HowManyBlocksBeforeRollbackImpure
| ComposePureAndImpure
| ChainIndex
deriving (Show, Read)

data Options = Options
{ optionsSocketPath :: String,
optionsNetworkId :: NetworkId,
optionsChainPoint :: ChainPoint,
optionsExample :: Example
optionsChainPoint :: ChainPoint
}
deriving (Show)

Expand All @@ -43,7 +31,6 @@ optionsParser =
<$> strOption (long "socket-path" <> help "Node socket path")
<*> networkIdParser
<*> chainPointParser
<*> option auto (long "example" <> value Print)

networkIdParser :: Parser NetworkId
networkIdParser =
Expand Down Expand Up @@ -75,108 +62,24 @@ chainPointParser =
<*> option str (long "block-hash" <> metavar "BLOCK-HASH")
)

--
-- Utilities
--

printJson :: (MonadIO m, ToJSON a) => Stream (Of a) m r -> m r
printJson = S.print . S.map Aeson.encode

--
-- Example consumers
--

howManyBlocksBeforeRollback ::
Monad m =>
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of Int) m r
howManyBlocksBeforeRollback =
S.scan
( \acc ->
\case
RollForward _ _ -> acc + 1
RollBackward _ _ -> acc
)
0
id

howManyBlocksBeforeRollbackImpure ::
(Monad m, MonadIO m) =>
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of Int) m r
howManyBlocksBeforeRollbackImpure =
S.scanM
( \acc ->
\case
RollForward _ _ ->
pure $ acc + 1
RollBackward _ _ -> do
liftIO $ putStrLn $ "Rollback after " ++ show acc ++ " blocks"
pure acc
)
(pure 0)
pure

-- composePureAndImpure ::
-- Stream (Of SimpleChainSyncEvent) IO r ->
-- IO r
-- composePureAndImpure =
-- (pPrintStream . howManyBlocksBeforeRollbackImpure)
-- . (pPrintStream . howManyBlocksBeforeRollback)
-- . S.copy

--
-- Main
--

main :: IO ()
main = do
Options {optionsSocketPath, optionsNetworkId, optionsChainPoint, optionsExample} <-
Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <-
execParser $ info (optionsParser <**> helper) mempty

withSimpleChainSyncEventStream
optionsSocketPath
optionsNetworkId
optionsChainPoint
(doSimple optionsExample)

doSimple ::
Example ->
Stream (Of SimpleChainSyncEvent) IO r ->
IO r
doSimple Print =
S.print
. S.map
( \case
RollForward (BlockInMode (Block header _txs) _era) _ct -> "RollForward, header: " <> show header
RollBackward cp _ct -> "RollBackward, point: " <> show cp
)
doSimple HowManyBlocksBeforeRollback =
S.print . howManyBlocksBeforeRollback
doSimple HowManyBlocksBeforeRollbackImpure =
S.print . howManyBlocksBeforeRollbackImpure
doSimple ComposePureAndImpure =
error "Not implemented"
doSimple ChainIndex =
S.print . utxoState

--
-- Utilities for development
--

nthBlock :: Int -> IO (BlockInMode CardanoMode)
nthBlock = nthBlockAt ChainPointAtGenesis

nthBlockAt :: ChainPoint -> Int -> IO (BlockInMode CardanoMode)
nthBlockAt point n = do
withSimpleChainSyncEventStream
"socket/node.socket"
Mainnet
point
( fmap Maybe.fromJust
. S.head_
. S.drop n
. S.catMaybes
. S.drop n
. S.map (\case RollForward bim _ -> Just bim; _ -> Nothing)
)
$ S.stdoutLn
. S.map
( \case
RollForward (BlockInMode (Block header _txs) _era) _ct ->
"RollForward, header: " <> TL.unpack (Aeson.encodeToLazyText header)
RollBackward cp _ct ->
"RollBackward, point: " <> TL.unpack (Aeson.encodeToLazyText cp)
)
2 changes: 1 addition & 1 deletion plutus-streaming/plutus-streaming.cabal
Expand Up @@ -47,7 +47,6 @@ library
plutus-chain-index-core,
stm,
streaming,
transformers,
ouroboros-network,
hysterical-screams

Expand All @@ -62,3 +61,4 @@ executable plutus-streaming-cli
cardano-api,
optparse-applicative,
streaming,
text
24 changes: 8 additions & 16 deletions plutus-streaming/src/Cardano/Api/Extras.hs
Expand Up @@ -3,17 +3,13 @@

module Cardano.Api.Extras where

import Cardano.Api (BlockHeader (BlockHeader),
GenesisConfigError (NEAlonzoConfig, NEByronConfig, NECardanoConfig, NEError, NEShelleyConfig),
HasTypeProxy (proxyToAsType), Hash,
InitialLedgerStateError (ILSEConfigFile, ILSEGenesisFile, ILSELedgerConsensusConfig),
LedgerEvent (MIRDistribution, PoolReRegistration, PoolReap, PoolRegistration, RewardsDistribution),
LedgerState (LedgerState), MIRDistributionDetails (MIRDistributionDetails),
PoolReapDetails (PoolReapDetails), SerialiseAsRawBytes (deserialiseFromRawBytes))
import Cardano.Api (BlockHeader (BlockHeader), BlockNo, ChainPoint (ChainPoint, ChainPointAtGenesis),
HasTypeProxy (proxyToAsType), Hash, SerialiseAsRawBytes (deserialiseFromRawBytes), ToJSON)
import Data.ByteString.Base16 qualified as Base16
import Data.ByteString.Char8 qualified as C8
import Data.Proxy (Proxy (Proxy))
import Data.String (IsString (fromString))
import GHC.Generics (Generic)

-- FIXME orphan instance
-- https://github.com/input-output-hk/cardano-node/pull/3608
Expand All @@ -29,16 +25,12 @@ instance IsString (Hash BlockHeader) where
where
ttoken = proxyToAsType (Proxy :: Proxy a)

deriving instance Show BlockHeader
deriving instance Generic ChainPoint

deriving instance Show LedgerState
instance ToJSON ChainPoint

deriving instance Show LedgerEvent
instance ToJSON BlockNo

deriving instance Show MIRDistributionDetails
deriving instance Generic BlockHeader

deriving instance Show PoolReapDetails

deriving instance Show InitialLedgerStateError

deriving instance Show GenesisConfigError
instance ToJSON BlockHeader
55 changes: 0 additions & 55 deletions plutus-streaming/src/Cardano/Api/IPC/Extras.hs

This file was deleted.

20 changes: 0 additions & 20 deletions plutus-streaming/src/Cardano/Api/Lens.hs

This file was deleted.

5 changes: 4 additions & 1 deletion plutus-streaming/src/Plutus/Streaming.hs
Expand Up @@ -11,10 +11,11 @@ import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (Chain
LocalChainSyncClient (LocalChainSyncClient),
LocalNodeClientProtocols (LocalNodeClientProtocols, localChainSyncClient, localStateQueryClient, localTxSubmissionClient),
LocalNodeConnectInfo (LocalNodeConnectInfo, localConsensusModeParams, localNodeNetworkId, localNodeSocketPath),
NetworkId, connectToLocalNode)
NetworkId, ToJSON, connectToLocalNode)
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgDone, SendMsgFindIntersect, SendMsgRequestNext),
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Cardano.Api.Extras ()
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Exception (Exception, throw)
Expand All @@ -27,6 +28,8 @@ data ChainSyncEvent a
| RollBackward ChainPoint ChainTip
deriving (Show, Functor, Generic)

instance ToJSON a => ToJSON (ChainSyncEvent a)

type SimpleChainSyncEvent = ChainSyncEvent (BlockInMode CardanoMode)

data ChainSyncEventException
Expand Down

0 comments on commit 7273926

Please sign in to comment.