Skip to content

Commit

Permalink
You need to dup a broadcast channel in order to read. (#422)
Browse files Browse the repository at this point in the history
  • Loading branch information
raduom committed May 21, 2022
1 parent 52b4dd5 commit 1b0483a
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions plutus-streaming/src/Plutus/Streaming.hs
Expand Up @@ -16,7 +16,7 @@ import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgDone, SendMsgFindInter
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM (TChan, atomically, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Exception (Exception, throw)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
Expand Down Expand Up @@ -46,6 +46,7 @@ withSimpleChainSyncEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread and it will send us
-- block through this channel.
chan <- newBroadcastTChanIO
readerChannel <- atomically $ dupTChan chan

let client = chainSyncStreamingClient point chan

Expand Down Expand Up @@ -80,7 +81,7 @@ withSimpleChainSyncEventStream socketPath networkId point consumer = do
-- waiting on the channel we get a BlockedIndefinitelyOnMVar right away
-- before the exception that killed the client
withAsync clientThread $ \_ -> do
consumer $ S.repeatM $ atomically (readTChan chan)
consumer $ S.repeatM $ atomically (readTChan readerChannel)

-- | `chainSyncStreamingClient` is the client that connects to a local node
-- and runs the chain-sync mini-protocol.
Expand Down

0 comments on commit 1b0483a

Please sign in to comment.