Skip to content


Streams of no return
Browse files Browse the repository at this point in the history
The Stream of blocks never ends, move out-of-band communication to be
exception based.
  • Loading branch information
andreabedini authored and raduom committed May 21, 2022
1 parent 95e6cd3 commit d914a5e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 49 deletions.
1 change: 0 additions & 1 deletion plutus-streaming/app/Main.hs
Expand Up @@ -139,7 +139,6 @@ main = do
(doSimple optionsExample)
>>= print

doSimple ::
Example ->
Expand Down
79 changes: 31 additions & 48 deletions plutus-streaming/src/Plutus/Streaming.hs
Expand Up @@ -2,7 +2,7 @@ module Plutus.Streaming
( SimpleChainSyncEvent,
ChainSyncEvent (..),
EventStreamResult (..),
ChainSyncEventException (..),

Expand All @@ -15,9 +15,9 @@ import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (Chain
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgDone, SendMsgFindIntersect, SendMsgRequestNext),
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent (Chan, MVar, newChan, newEmptyMVar, putMVar, readChan, takeMVar, writeChan)
import Control.Concurrent (Chan, newChan, readChan, writeChan)
import Control.Concurrent.Async (withAsync)
import Control.Exception (SomeException, catch)
import Control.Exception (Exception, throw)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S
Expand All @@ -29,38 +29,25 @@ data ChainSyncEvent a

type SimpleChainSyncEvent = ChainSyncEvent (BlockInMode CardanoMode)

data EventStreamResult
data ChainSyncEventException
= NoIntersectionFound
deriving (Show)

instance Exception ChainSyncEventException

withSimpleChainSyncEventStream ::
FilePath ->
NetworkId ->
-- | The point on the chain to start streaming from
ChainPoint ->
(Stream (Of SimpleChainSyncEvent) IO EventStreamResult -> IO b) ->
(Stream (Of SimpleChainSyncEvent) IO r -> IO b) ->
IO b
withSimpleChainSyncEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread. It needs to send us
-- two kind of information 1) if it has managed to establish a connection
-- and found an intersection 2) the blocks it gets from the protocol.
-- I encapsulated both this information in a single MVar (Maybe Chan _)
-- The MVar needs to be written to by the client.
-- If the MVar has Nothing written to it, the client has run into issues
-- preventing it from finding an intersection.
-- If the MVar has (Just c) written to it, the client has succesfully
-- found an intersection and blocks are going to be available from the
-- channel c.
-- TODO the client needs to be able to reinitialise and keep going if the
-- connection fails.
mChan <- newEmptyMVar
-- The chain-sync client runs in a different thread and it will send us
-- block through this channel.
chan <- newChan

let client = chainSyncStreamingClient point mChan
let client = chainSyncStreamingClient point chan

localNodeClientProtocols =
Expand All @@ -79,21 +66,21 @@ withSimpleChainSyncEventStream socketPath networkId point consumer = do
-- FIXME this comes from the config file but Cardano.Api does not expose readNetworkConfig!
epochSlots = EpochSlots 40

clientThread =
clientThread = do
connectToLocalNode connectInfo localNodeClientProtocols
-- FIXME this is still not good enough, if an exception arises
-- after the client has started streaming, the consumer code
-- below will ignore the value of the MVar and will be stuck
-- waiting on the chan.
`catch` \(_ :: SomeException) -> putMVar mChan Nothing
-- the only reason the clien can terminate successfully is if it
-- doesn't find an intersection, we report that case to the
-- consumer as an exception
throw NoIntersectionFound

-- All exceptions in the client thread are passed to the consumer thread
-- TODO the client should be able to reinitialise and keep going if the
-- connection fails.
-- FIXME we still have a problem here, if the client dies while we are
-- waiting on the channel we get a BlockedIndefinitelyOnMVar right away
-- before the exception that killed the client
withAsync clientThread $ \_ -> do
mc <- takeMVar mChan
case mc of
Nothing ->
consumer $ return NoIntersectionFound
Just c -> do
consumer $ S.repeatM $ readChan c
consumer $ S.repeatM $ readChan chan

-- | `chainSyncStreamingClient` is the client that connects to a local node
-- and runs the chain-sync mini-protocol.
Expand All @@ -109,35 +96,31 @@ withSimpleChainSyncEventStream socketPath networkId point consumer = do
-- note in `withSimpleChainSyncEventStream`
chainSyncStreamingClient ::
ChainPoint ->
MVar (Maybe (Chan (ChainSyncEvent e))) ->
Chan (ChainSyncEvent e) ->
ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient point mChan =
chainSyncStreamingClient point chan =
ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
onIntersect =
{ recvMsgIntersectFound = \_ _ ->
ChainSyncClient $ do
c <- newChan
putMVar mChan (Just c)
sendRequestNext c,
ChainSyncClient sendRequestNext,
recvMsgIntersectNotFound = \_ ->
ChainSyncClient $ do
putMVar mChan Nothing
pure $ SendMsgDone ()

sendRequestNext c =
sendRequestNext =
pure $ SendMsgRequestNext onNext (pure onNext)
onNext =
{ recvMsgRollForward = \bim ct ->
ChainSyncClient $ do
writeChan c (RollForward bim ct)
sendRequestNext c,
writeChan chan (RollForward bim ct)
recvMsgRollBackward = \cp ct ->
ChainSyncClient $ do
writeChan c (RollBackward cp ct)
sendRequestNext c
writeChan chan (RollBackward cp ct)

0 comments on commit d914a5e

Please sign in to comment.