Skip to content

Commit

Permalink
Integrate with eclipse-evasion changes
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed May 31, 2023
1 parent 933e72b commit 61d6e73
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 45 deletions.
4 changes: 2 additions & 2 deletions src/System/Metrics/Acceptor.hs
Expand Up @@ -25,8 +25,8 @@ runEKGAcceptor config ekgStore =
Left (_e :: SomeException) -> runEKGAcceptor config ekgStore
Right _ -> return ()
where
mkStores = do
mkStores _ = do
metricsStore <- newTVarIO emptyMetricsLocalStore
return (ekgStore, metricsStore)

peerErrorHandler = return ()
peerErrorHandler _ = return ()
59 changes: 34 additions & 25 deletions src/System/Metrics/Network/Acceptor.hs
Expand Up @@ -19,16 +19,18 @@ import qualified Data.Text as T
import Data.Time.Clock (NominalDiffTime)
import Data.Void (Void)
import qualified Network.Socket as Socket
import Ouroboros.Network.Context (MinimalInitiatorContext, ResponderContext)
import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits)
import Ouroboros.Network.ErrorPolicy (nullErrorPolicies)
import Ouroboros.Network.IOManager (withIOManager)
import Ouroboros.Network.Driver.Simple (runPeer)
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..),
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..),
MiniProtocolLimits (..),
MiniProtocolNum (..), MuxMode (..),
OuroborosApplication (..), MuxPeer (..),
OuroborosApplication (..),
RunMiniProtocol (..),
miniProtocolLimits, miniProtocolNum, miniProtocolRun)
import Ouroboros.Network.Snocket (MakeBearer, Snocket,
import Ouroboros.Network.Snocket (LocalAddress, MakeBearer, Snocket,
localAddressFromPath, localSnocket, socketSnocket,
makeLocalBearer, makeSocketBearer)
import Ouroboros.Network.Socket (AcceptedConnectionsLimit (..),
Expand All @@ -54,20 +56,21 @@ import System.Metrics.Configuration (AcceptorConfiguration (..), HowTo

listenToForwarder
:: AcceptorConfiguration
-> IO (EKG.Store, TVar MetricsLocalStore)
-> IO ()
-> (ResponderContext (Either LocalAddress Socket.SockAddr) -> IO (EKG.Store, TVar MetricsLocalStore))
-> (ResponderContext (Either LocalAddress Socket.SockAddr) -> IO ())
-> IO Void
listenToForwarder config mkStores peerErrorHandler = withIOManager $ \iocp -> do
let app = acceptorApp config mkStores peerErrorHandler
case forwarderEndpoint config of
LocalPipe localPipe -> do
let snocket = localSnocket iocp
let app = acceptorApp config (mkStores . fmap Left) (peerErrorHandler . fmap Left)
snocket = localSnocket iocp
address = localAddressFromPath localPipe
configureSocket = mempty
doListenToForwarder snocket makeLocalBearer configureSocket address noTimeLimitsHandshake app
RemoteSocket host port -> do
listenAddress:_ <- Socket.getAddrInfo Nothing (Just $ T.unpack host) (Just $ show port)
let snocket = socketSnocket iocp
let app = acceptorApp config (mkStores . fmap Right) (peerErrorHandler . fmap Right)
snocket = socketSnocket iocp
address = Socket.addrAddress listenAddress
configureSocket fd _addr =
Socket.setSocketOption fd Socket.ReuseAddr 1
Expand All @@ -80,7 +83,10 @@ doListenToForwarder
-> (fd -> addr -> IO ()) -- ^ configure socket
-> addr
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> OuroborosApplication 'ResponderMode addr LBS.ByteString IO Void ()
-> OuroborosApplication 'ResponderMode
(MinimalInitiatorContext addr)
(ResponderContext addr)
LBS.ByteString IO Void ()
-> IO Void
doListenToForwarder snocket makeBearer configureSocket address timeLimits app = do
networkState <- newNetworkMutableState
Expand All @@ -106,11 +112,14 @@ doListenToForwarder snocket makeBearer configureSocket address timeLimits app =

acceptorApp
:: AcceptorConfiguration
-> IO (EKG.Store, TVar MetricsLocalStore)
-> IO ()
-> OuroborosApplication 'ResponderMode addr LBS.ByteString IO Void ()
-> (ResponderContext addr -> IO (EKG.Store, TVar MetricsLocalStore))
-> (ResponderContext addr -> IO ())
-> OuroborosApplication 'ResponderMode
(MinimalInitiatorContext addr)
(ResponderContext addr)
LBS.ByteString IO Void ()
acceptorApp config mkStores peerErrorHandler =
OuroborosApplication $ \_connectionId _shouldStopSTM -> [
OuroborosApplication [
MiniProtocol
{ miniProtocolNum = MiniProtocolNum 2
, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound }
Expand All @@ -120,35 +129,35 @@ acceptorApp config mkStores peerErrorHandler =

acceptEKGMetricsResp
:: AcceptorConfiguration
-> IO (EKG.Store, TVar MetricsLocalStore)
-> IO ()
-> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void ()
-> (responderCtx -> IO (EKG.Store, TVar MetricsLocalStore))
-> (responderCtx -> IO ())
-> RunMiniProtocol 'ResponderMode initiatorCtx responderCtx LBS.ByteString IO Void ()
acceptEKGMetricsResp config mkStores peerErrorHandler =
ResponderProtocolOnly $ runPeerWithStores config mkStores peerErrorHandler

acceptEKGMetricsInit
:: AcceptorConfiguration
-> IO (EKG.Store, TVar MetricsLocalStore)
-> IO ()
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
-> (initiatorCtx -> IO (EKG.Store, TVar MetricsLocalStore))
-> (initiatorCtx -> IO ())
-> RunMiniProtocol 'InitiatorMode initiatorCtx responderCtx LBS.ByteString IO () Void
acceptEKGMetricsInit config mkStores peerErrorHandler =
InitiatorProtocolOnly $ runPeerWithStores config mkStores peerErrorHandler

runPeerWithStores
:: AcceptorConfiguration
-> IO (EKG.Store, TVar MetricsLocalStore)
-> IO ()
-> MuxPeer LBS.ByteString IO ()
-> (ctx -> IO (EKG.Store, TVar MetricsLocalStore))
-> (ctx -> IO ())
-> MiniProtocolCb ctx LBS.ByteString IO ()
runPeerWithStores config mkStores peerErrorHandler =
MuxPeerRaw $ \channel -> do
(ekgStore, metricsStore) <- mkStores
MiniProtocolCb $ \ctx channel -> do
(ekgStore, metricsStore) <- mkStores ctx
runPeer
(acceptorTracer config)
(Acceptor.codecEKGForward CBOR.encode CBOR.decode
CBOR.encode CBOR.decode)
channel
(Acceptor.ekgAcceptorPeer $ acceptorActions True config ekgStore metricsStore)
`finally` peerErrorHandler
`finally` peerErrorHandler ctx

acceptorActions
:: Bool
Expand Down
45 changes: 27 additions & 18 deletions src/System/Metrics/Network/Forwarder.hs
Expand Up @@ -18,11 +18,13 @@ import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text as T
import Data.Void (Void)
import qualified Network.Socket as Socket
import Ouroboros.Network.Context (MinimalInitiatorContext, ResponderContext)
import Ouroboros.Network.Driver.Simple (runPeer)
import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits)
import Ouroboros.Network.IOManager (withIOManager)
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..),
MiniProtocolNum (..), MuxMode (..),
OuroborosApplication (..), MuxPeer (..),
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..),
MiniProtocolLimits (..), MiniProtocolNum (..),
MuxMode (..), OuroborosApplication (..),
RunMiniProtocol (..),
miniProtocolLimits, miniProtocolNum, miniProtocolRun)
import Ouroboros.Network.Protocol.Handshake.Codec (noTimeLimitsHandshake,
Expand Down Expand Up @@ -67,7 +69,10 @@ doConnectToAcceptor
-> (fd -> IO ()) -- ^ configure socket
-> addr
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> OuroborosApplication 'InitiatorMode addr LBS.ByteString IO () Void
-> OuroborosApplication 'InitiatorMode
(MinimalInitiatorContext addr)
(ResponderContext addr)
LBS.ByteString IO () Void
-> IO ()
doConnectToAcceptor snocket makeBearer configureSocket address timeLimits app =
connectToNode
Expand All @@ -89,9 +94,9 @@ doConnectToAcceptor snocket makeBearer configureSocket address timeLimits app =
forwarderApp
:: ForwarderConfiguration
-> EKG.Store
-> OuroborosApplication 'InitiatorMode addr LBS.ByteString IO () Void
-> OuroborosApplication 'InitiatorMode initiatorCtx responderCtx LBS.ByteString IO () Void
forwarderApp config ekgStore =
OuroborosApplication $ \_connectionId _shouldStopSTM ->
OuroborosApplication
[ MiniProtocol
{ miniProtocolNum = MiniProtocolNum 2
, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound }
Expand All @@ -102,43 +107,47 @@ forwarderApp config ekgStore =
forwardEKGMetrics
:: ForwarderConfiguration
-> EKG.Store
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
-> RunMiniProtocol 'InitiatorMode initiatorCtx responderCtx LBS.ByteString IO () Void
forwardEKGMetrics config ekgStore =
InitiatorProtocolOnly $
MuxPeer
InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
runPeer
(forwarderTracer config)
(Forwarder.codecEKGForward CBOR.encode CBOR.decode
CBOR.encode CBOR.decode)
channel
(Forwarder.ekgForwarderPeer $ mkResponse config ekgStore)

forwardEKGMetricsResp
:: ForwarderConfiguration
-> EKG.Store
-> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void ()
-> RunMiniProtocol 'ResponderMode initiatorCtx responderCtx LBS.ByteString IO Void ()
forwardEKGMetricsResp config ekgStore =
ResponderProtocolOnly $
MuxPeer
ResponderProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
runPeer
(forwarderTracer config)
(Forwarder.codecEKGForward CBOR.encode CBOR.decode
CBOR.encode CBOR.decode)
channel
(Forwarder.ekgForwarderPeer $ mkResponse config ekgStore)

forwardEKGMetricsDummy
:: RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
:: RunMiniProtocol 'InitiatorMode initiatorCtx responderCtx LBS.ByteString IO () Void
forwardEKGMetricsDummy =
InitiatorProtocolOnly $
MuxPeer
InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
runPeer
nullTracer
(Forwarder.codecEKGForward CBOR.encode CBOR.decode
CBOR.encode CBOR.decode)
channel
(Forwarder.ekgForwarderPeer mkResponseDummy)

forwardEKGMetricsRespDummy
:: RunMiniProtocol 'ResponderMode LBS.ByteString IO Void ()
:: RunMiniProtocol 'ResponderMode initiatorCtx responderCtx LBS.ByteString IO Void ()
forwardEKGMetricsRespDummy =
ResponderProtocolOnly $
MuxPeer
ResponderProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
runPeer
nullTracer
(Forwarder.codecEKGForward CBOR.encode CBOR.decode
CBOR.encode CBOR.decode)
channel
(Forwarder.ekgForwarderPeer mkResponseDummy)

0 comments on commit 61d6e73

Please sign in to comment.