Skip to content


Rework documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabedini authored and raduom committed May 21, 2022
1 parent cc71792 commit 95e6cd3
Showing 1 changed file with 37 additions and 18 deletions.
55 changes: 37 additions & 18 deletions plutus-streaming/src/Plutus/Streaming.hs
@@ -1,5 +1,3 @@
{-# LANGUAGE FlexibleInstances #-}

module Plutus.Streaming
( SimpleChainSyncEvent,
Expand Down Expand Up @@ -38,11 +36,30 @@ data EventStreamResult
withSimpleChainSyncEventStream ::
FilePath ->
NetworkId ->
-- | The point on the chain to start streaming from
ChainPoint ->
(Stream (Of SimpleChainSyncEvent) IO EventStreamResult -> IO b) ->
IO b
withSimpleChainSyncEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread. It needs to send us
-- two kind of information 1) if it has managed to establish a connection
-- and found an intersection 2) the blocks it gets from the protocol.
-- I encapsulated both this information in a single MVar (Maybe Chan _)
-- The MVar needs to be written to by the client.
-- If the MVar has Nothing written to it, the client has run into issues
-- preventing it from finding an intersection.
-- If the MVar has (Just c) written to it, the client has succesfully
-- found an intersection and blocks are going to be available from the
-- channel c.
-- TODO the client needs to be able to reinitialise and keep going if the
-- connection fails.
mChan <- newEmptyMVar

let client = chainSyncStreamingClient point mChan

localNodeClientProtocols =
Expand All @@ -64,6 +81,10 @@ withSimpleChainSyncEventStream socketPath networkId point consumer = do

clientThread =
connectToLocalNode connectInfo localNodeClientProtocols
-- FIXME this is still not good enough, if an exception arises
-- after the client has started streaming, the consumer code
-- below will ignore the value of the MVar and will be stuck
-- waiting on the chan.
`catch` \(_ :: SomeException) -> putMVar mChan Nothing

withAsync clientThread $ \_ -> do
Expand All @@ -74,36 +95,34 @@ withSimpleChainSyncEventStream socketPath networkId point consumer = do
Just c -> do
consumer $ S.repeatM $ readChan c

-- | This is the "core" client that connects to a local node and
-- runs the chain-sync mini-protocol. The only job of this client is to
-- keep sending requests for new blocks, and passing the results (a
-- `ChainSyncEvent`) to the consumer. In particular, this client is
-- fire-and-forget and does not require any control from the consumer.
-- | `chainSyncStreamingClient` is the client that connects to a local node
-- and runs the chain-sync mini-protocol.
-- I am using the term "streaming client" because the only things it does
-- is to keep sending requests for new blocks.
-- In particular, this client is fire-and-forget and does not require any
-- control.
-- Blocks obtained from the chain-sync mini-protocol are passed to a
-- consumer through a channel. To understand the MVar-Maybe-Chan dance see
-- note in `withSimpleChainSyncEventStream`
chainSyncStreamingClient ::
-- | The point on the chain to start from
ChainPoint ->
-- | This MVar is how we communicate back to the consumer. The idea here
-- is that the client might fail to initialise but once it's initalised
-- it will always be able to spit out `ChainSyncEvent`s.
MVar (Maybe (Chan (ChainSyncEvent e))) ->
-- | The entry point to the client to pass to `connectToLocalNode`
ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient point mChan =
ChainSyncClient $ do
putStrLn "Connecting ..."
pure $ SendMsgFindIntersect [point] onIntersect
ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
onIntersect =
{ recvMsgIntersectFound = \point' _ ->
{ recvMsgIntersectFound = \_ _ ->
ChainSyncClient $ do
putStrLn $ "Intersection found at " ++ show point'
c <- newChan
putMVar mChan (Just c)
sendRequestNext c,
recvMsgIntersectNotFound = \_ ->
ChainSyncClient $ do
putStrLn "Intersection not found"
putMVar mChan Nothing
pure $ SendMsgDone ()
Expand Down

0 comments on commit 95e6cd3

Please sign in to comment.