Skip to content

Commit

Permalink
Fix composition of two stream consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabedini authored and raduom committed May 21, 2022
1 parent 6b5d107 commit 97eca22
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions plutus-streaming/app/Main.hs
Expand Up @@ -66,8 +66,9 @@ justBlocks =
. S.take 10

howManyBlocksBeforeRollback ::
Stream (Of ChainSyncEvent) IO () ->
Stream (Of Int) IO ()
Monad m =>
Stream (Of ChainSyncEvent) m () ->
Stream (Of Int) m ()
howManyBlocksBeforeRollback =
S.scan
( \acc ->
Expand All @@ -80,32 +81,30 @@ howManyBlocksBeforeRollback =
. S.take 100

howManyBlocksBeforeRollbackImpure ::
Stream (Of ChainSyncEvent) IO () ->
Stream (Of Int) IO ()
(Monad m, MonadIO m) =>
Stream (Of ChainSyncEvent) m () ->
Stream (Of Int) m ()
howManyBlocksBeforeRollbackImpure =
S.scanM
( \acc ->
\case
RollForward _ _ ->
pure $ acc + 1
RollBackward _ _ -> do
putStrLn $ "Rollback after " ++ show acc ++ " blocks"
liftIO $ putStrLn $ "Rollback after " ++ show acc ++ " blocks"
pure acc
)
(pure 0)
pure
. S.take 100

--
-- Ah! This doesn't do what I meant and I think for good reasons
--
composePureAndImpure ::
Stream (Of ChainSyncEvent) IO () ->
Stream (Of (Int, Int)) IO ()
composePureAndImpure s =
S.zip
(howManyBlocksBeforeRollback s)
(howManyBlocksBeforeRollbackImpure s)
IO ()
composePureAndImpure =
(S.print . howManyBlocksBeforeRollbackImpure)
. (S.print . howManyBlocksBeforeRollback)
. S.copy

--
-- Main
Expand All @@ -130,7 +129,7 @@ main = do
JustBlocks -> S.print . justBlocks
HowManyBlocksBeforeRollback -> S.print . howManyBlocksBeforeRollback
HowManyBlocksBeforeRollbackImpure -> S.print . howManyBlocksBeforeRollbackImpure
ComposePureAndImpure -> S.print . composePureAndImpure
ComposePureAndImpure -> composePureAndImpure

--
-- Utilities for development
Expand Down

0 comments on commit 97eca22

Please sign in to comment.