Skip to content

Commit

Permalink
p2p-governor: interface to ledger peers
Browse files Browse the repository at this point in the history
withLedgerPeers runs ledger peers worker thread, creates a two way
communication channel with it and exposes a function which allows to
request 'NumberOfPeer' and blocks until the ledger peers will respond.
  • Loading branch information
coot committed Sep 28, 2021
1 parent fcfb5c0 commit 0b9328b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 20 deletions.
Expand Up @@ -17,7 +17,7 @@ module Ouroboros.Network.PeerSelection.LedgerPeers (
NumberOfPeers (..),
pickPeers,
accPoolStake,
runLedgerPeers,
withLedgerPeers,
UseLedgerAfter (..),

Socket.PortNumber
Expand Down Expand Up @@ -201,20 +201,21 @@ pickPeers inRng tracer pools (NumberOfPeers cnt) = go inRng cnt []


-- | Run the LedgerPeers worker thread.
runLedgerPeers :: forall m.
( MonadAsync m
, MonadTime m
)
=> StdGen
-> Tracer m TraceLedgerPeers
-> STM m UseLedgerAfter
-> LedgerPeersConsensusInterface m
-> ([DomainAddress] -> m (Map DomainAddress (Set SockAddr)))
-> STM m NumberOfPeers
-> (Maybe (Set SockAddr, DiffTime) -> STM m ())
-> m Void
runLedgerPeers inRng tracer readUseLedgerAfter LedgerPeersConsensusInterface{..} doResolve
getReq putRsp = do
--
ledgerPeersThread :: forall m.
( MonadAsync m
, MonadTime m
)
=> StdGen
-> Tracer m TraceLedgerPeers
-> STM m UseLedgerAfter
-> LedgerPeersConsensusInterface m
-> ([DomainAddress] -> m (Map DomainAddress (Set SockAddr)))
-> STM m NumberOfPeers
-> (Maybe (Set SockAddr, DiffTime) -> STM m ())
-> m Void
ledgerPeersThread inRng tracer readUseLedgerAfter LedgerPeersConsensusInterface{..} doResolve
getReq putRsp =
go inRng (Time 0) Map.empty
where
go :: StdGen -> Time -> Map AccPoolStake (PoolStake, NonEmpty RelayAddress) -> m Void
Expand Down Expand Up @@ -290,3 +291,34 @@ runLedgerPeers inRng tracer readUseLedgerAfter LedgerPeersConsensusInterface{..}
splitPeers (addrs, domains) (RelayAddress ip port) =
let !addr = IP.toSockAddr (ip, port) in
(Set.insert addr addrs, domains)


-- | For a LederPeers worker thread and submit request and receive responses.
--
withLedgerPeers :: forall m a.
( MonadAsync m
, MonadTime m
)
=> StdGen
-> Tracer m TraceLedgerPeers
-> STM m UseLedgerAfter
-> LedgerPeersConsensusInterface m
-> ([DomainAddress] -> m (Map DomainAddress (Set SockAddr)))
-> ( (NumberOfPeers -> m (Maybe (Set SockAddr, DiffTime)))
-> Async m Void
-> m a )
-> m a
withLedgerPeers inRng tracer readUseLedgerAfter interface doResolve k = do
reqVar <- newEmptyTMVarIO
respVar <- newEmptyTMVarIO
let getRequest = takeTMVar reqVar
putResponse = putTMVar respVar
request :: NumberOfPeers -> m (Maybe (Set SockAddr, DiffTime))
request = \numberOfPeers -> atomically $ do
putTMVar reqVar numberOfPeers
takeTMVar respVar
withAsync
( ledgerPeersThread inRng tracer readUseLedgerAfter interface doResolve
getRequest putResponse )
$ \ thread -> k request thread

Expand Up @@ -42,8 +42,7 @@ withPeerSelectionActions
-> STM IO [RelayAddress]
-- ^ public root peers
-> PeerStateActions Socket.SockAddr peerconn IO
-> (NumberOfPeers -> STM IO ())
-> STM IO (Maybe (Set Socket.SockAddr, DiffTime))
-> (NumberOfPeers -> IO (Maybe (Set Socket.SockAddr, DiffTime)))
-> (Maybe (Async IO Void)
-> PeerSelectionActions Socket.SockAddr peerconn IO
-> IO a)
Expand All @@ -58,7 +57,6 @@ withPeerSelectionActions
readLocalRootPeers
readPublicRootPeers
peerStateActions
reqLedgerPeers
getLedgerPeers
k = do
localRootsVar <- newTVarIO mempty
Expand All @@ -85,8 +83,7 @@ withPeerSelectionActions
requestLedgerPeers :: DNSActions DNS.Resolver IOException IO
-> Int -> IO (Set Socket.SockAddr, DiffTime)
requestLedgerPeers dnsActions n = do
atomically $ reqLedgerPeers $ NumberOfPeers $ fromIntegral n
peers_m <- atomically getLedgerPeers
peers_m <- getLedgerPeers (NumberOfPeers $ fromIntegral n)
case peers_m of
Nothing -> requestPublicRootPeers dnsActions n
Just peers -> return peers
Expand Down

0 comments on commit 0b9328b

Please sign in to comment.