Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabedini authored and raduom committed May 21, 2022
1 parent 8e70035 commit 06eb0a4
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 43 deletions.
72 changes: 33 additions & 39 deletions plutus-streaming/app/Main.hs
Expand Up @@ -5,7 +5,6 @@ module Main where

import Cardano.Api
import Cardano.Api.Extras ()
import Control.Monad ((>=>))
import Data.Maybe qualified as Maybe
import Options.Applicative hiding (header)
import Plutus.Streaming
Expand Down Expand Up @@ -80,7 +79,7 @@ pPrintStream = S.mapM_ pPrint
howManyBlocksBeforeRollback ::
Monad m =>
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of Int) m ()
Stream (Of Int) m r
howManyBlocksBeforeRollback =
S.scan
( \acc ->
Expand All @@ -90,12 +89,11 @@ howManyBlocksBeforeRollback =
)
0
id
. S.take 100

howManyBlocksBeforeRollbackImpure ::
(Monad m, MonadIO m) =>
Stream (Of SimpleChainSyncEvent) m r ->
Stream (Of Int) m ()
Stream (Of Int) m r
howManyBlocksBeforeRollbackImpure =
S.scanM
( \acc ->
Expand All @@ -108,63 +106,59 @@ howManyBlocksBeforeRollbackImpure =
)
(pure 0)
pure
. S.take 100

composePureAndImpure ::
Stream (Of SimpleChainSyncEvent) IO r ->
IO ()
composePureAndImpure =
(pPrintStream . howManyBlocksBeforeRollbackImpure)
. (pPrintStream . howManyBlocksBeforeRollback)
. S.copy
-- composePureAndImpure ::
-- Stream (Of SimpleChainSyncEvent) IO r ->
-- IO r
-- composePureAndImpure =
-- (pPrintStream . howManyBlocksBeforeRollbackImpure)
-- . (pPrintStream . howManyBlocksBeforeRollback)
-- . S.copy

--
-- Main
--

deriving instance Show BlockHeader

main :: IO ()
main = do
options <- execParser $ info (optionsParser <**> helper) mempty

case options of
Simple {optionsSocketPath, optionsChainPoint, optionsExample} -> do
Simple {optionsSocketPath, optionsChainPoint, optionsExample} ->
withSimpleChainSyncEventStream
optionsSocketPath
Mainnet
optionsChainPoint
$ case optionsExample of
Print ->
S.stdoutLn . S.map (
\case
RollForward (BlockInMode (Block header _txs) _era) _ct -> "RollForward, header: " <> show header
RollBackward cp _ct -> "RollBackward, point: " <> show cp
) . S.take 10 >=> print
HowManyBlocksBeforeRollback ->
pPrintStream . howManyBlocksBeforeRollback >=> print
HowManyBlocksBeforeRollbackImpure ->
pPrintStream . howManyBlocksBeforeRollbackImpure >=> print
ComposePureAndImpure ->
composePureAndImpure >=> print
ChainIndex ->
-- pPrintStream . utxoState . S.print . S.map (fmap f) . S.copy . S.take 10 >=> print
pPrintStream . utxoState . S.take 10 >=> print
(doSimple optionsExample)
>>= print
WithLedgerState {optionsNetworkConfigPath, optionsSocketPath, optionsChainPoint} ->
withChainSyncEventStreamWithLedgerState
optionsNetworkConfigPath
optionsSocketPath
Mainnet
optionsChainPoint
(pPrintStream . S.take 10 >=> print)

deriving instance Show LedgerState
pPrintStream
>>= print

deriving instance Show LedgerEvent

deriving instance Show MIRDistributionDetails

deriving instance Show PoolReapDetails
doSimple ::
Example ->
Stream (Of SimpleChainSyncEvent) IO r ->
IO r
doSimple Print =
S.print
. S.map
( \case
RollForward (BlockInMode (Block header _txs) _era) _ct -> "RollForward, header: " <> show header
RollBackward cp _ct -> "RollBackward, point: " <> show cp
)
doSimple HowManyBlocksBeforeRollback =
S.print . howManyBlocksBeforeRollback
doSimple HowManyBlocksBeforeRollbackImpure =
S.print . howManyBlocksBeforeRollbackImpure
doSimple ComposePureAndImpure =
error "Not implemented"
doSimple ChainIndex =
S.print . utxoState

--
-- Utilities for development
Expand Down
11 changes: 11 additions & 0 deletions plutus-streaming/src/Cardano/Api/Extras.hs
@@ -1,4 +1,5 @@
{-# LANGUAGE FlexibleInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module Cardano.Api.Extras where

Expand All @@ -21,3 +22,13 @@ instance IsString (Hash BlockHeader) where
Left msg -> Left ("invalid hex " ++ show str ++ ", " ++ msg)
where
ttoken = proxyToAsType (Proxy :: Proxy a)

deriving instance Show BlockHeader

deriving instance Show LedgerState

deriving instance Show LedgerEvent

deriving instance Show MIRDistributionDetails

deriving instance Show PoolReapDetails
6 changes: 2 additions & 4 deletions plutus-streaming/src/Plutus/Streaming/ChainIndex.hs
Expand Up @@ -16,13 +16,11 @@ utxoState ::
utxoState =
S.scan step initial projection
where
step index (RollForward block cardanoTip) =
step index (RollForward block _) =
case CI.fromCardanoBlock block of
Left err -> error ("FromCardanoError: " <> show err)
Right txs ->
-- this is wrong, there's a tip-vs-point confusion here
-- TxUtxoBalance.fromBlock wants a tip but it's a point instead
let tip = CI.fromCardanoTip cardanoTip
let tip = CI.tipFromCardanoBlock block
balance = TxUtxoBalance.fromBlock tip txs
in case UtxoState.insert balance index of
Left err ->
Expand Down

0 comments on commit 06eb0a4

Please sign in to comment.