Skip to content

Commit

Permalink
Swap UnversionedProtocol for HydraVersionedProtocol
Browse files Browse the repository at this point in the history
  • Loading branch information
locallycompact committed Apr 29, 2024
1 parent ba8e9b3 commit 1ea5d34
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 44 deletions.
1 change: 1 addition & 0 deletions hydra-node/hydra-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ library
Hydra.Network.Ouroboros.Client
Hydra.Network.Ouroboros.Server
Hydra.Network.Ouroboros.Type
Hydra.Network.Ouroboros.VersionedProtocol
Hydra.Network.Reliability
Hydra.Node
Hydra.Node.InputQueue
Expand Down
104 changes: 72 additions & 32 deletions hydra-node/src/Hydra/Network/Ouroboros.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@
-- This implements a dumb 'FireForget' protocol and maintains one connection to each peer.
-- Contrary to other protocols implemented in Ouroboros, this is a push-based protocol.
module Hydra.Network.Ouroboros (
withOuroborosNetwork,
withIOManager,
TraceOuroborosNetwork,
WithHost,
module Hydra.Network,
encodeTraceSendRecvFireForget,
module Hydra.Network.Ouroboros,
module Hydra.Network.Ouroboros.VersionedProtocol,
) where

import Control.Monad.Class.MonadAsync (wait)
import Hydra.Network.Ouroboros.VersionedProtocol (
HydraHandshakeRefused (..),
HydraNetworkConfig (..),
HydraVersionedProtocolData (..),
HydraVersionedProtocolNumber (..),
KnownHydraVersions (..),
hydraVersionedProtocolCodec,
hydraVersionedProtocolDataCodec,
)
import Hydra.Prelude

import Codec.CBOR.Term (Term)
Expand All @@ -27,7 +34,8 @@ import Data.Aeson (object, withObject, (.:), (.=))
import Data.Aeson qualified as Aeson
import Data.Aeson.Types qualified as Aeson
import Data.Map.Strict as Map
import Hydra.Logging (Tracer, nullTracer)
import Data.Text qualified as T
import Hydra.Logging (Tracer (..), nullTracer)
import Hydra.Network (
Host (..),
Network (..),
Expand All @@ -51,7 +59,16 @@ import Hydra.Network.Ouroboros.Type (
import Network.Mux.Compat (
WithMuxBearer (..),
)
import Network.Socket (AddrInfo (addrAddress), SockAddr, Socket, defaultHints, getAddrInfo)
import Network.Socket (
AddrInfo (addrAddress),
NameInfoFlag (..),
SockAddr,
Socket,
defaultHints,
getAddrInfo,
getNameInfo,
getPeerName,
)
import Network.TypedProtocol.Codec (
AnyMessageAndAgency (..),
)
Expand Down Expand Up @@ -81,15 +98,9 @@ import Ouroboros.Network.Mux (
RunMiniProtocol (..),
mkMiniProtocolCbFromPeer,
)
import Ouroboros.Network.Protocol.Handshake.Codec (noTimeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake, Message (..), RefuseReason (..))
import Ouroboros.Network.Protocol.Handshake.Unversioned (
UnversionedProtocol,
unversionedHandshakeCodec,
unversionedProtocol,
unversionedProtocolDataCodec,
)
import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, queryVersion)
import Ouroboros.Network.Protocol.Handshake.Codec (codecHandshake, noTimeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake, HandshakeProtocolError (..), Message (..), RefuseReason (..))
import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, queryVersion, simpleSingletonVersions)
import Ouroboros.Network.Server.Socket (AcceptedConnectionsLimit (AcceptedConnectionsLimit))
import Ouroboros.Network.Snocket (makeSocketBearer, socketSnocket)
import Ouroboros.Network.Socket (
Expand Down Expand Up @@ -117,10 +128,10 @@ withOuroborosNetwork ::
(ToCBOR outbound, FromCBOR outbound) =>
(ToCBOR inbound, FromCBOR inbound) =>
Tracer IO (WithHost (TraceOuroborosNetwork outbound)) ->
Host ->
[Host] ->
HydraNetworkConfig ->
(HydraHandshakeRefused -> IO ()) ->
NetworkComponent IO inbound outbound ()
withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
withOuroborosNetwork tracer HydraNetworkConfig{protocolVersion, localHost, remoteHosts} handshakeCallback networkCallback between = do
bchan <- newBroadcastTChanIO
let newBroadcastChannel = atomically $ dupTChan bchan
-- NOTE: There should only be one `IOManager` instance per process. Should we
Expand All @@ -141,6 +152,14 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
(info : _) -> pure $ addrAddress info
_ -> error "getAdrrInfo failed.. do proper error handling"

getHost :: SockAddr -> IO Host
getHost sockAddr = do
(mHost, mPort) <- getNameInfo [NI_NUMERICHOST, NI_NUMERICSERV] True True sockAddr
maybe (error "getNameInfo failed.. do proper error handling") pure $ do
host <- T.pack <$> mHost
port <- readMaybe =<< mPort
pure $ Host host port

connect ::
IOManager ->
IO t ->
Expand All @@ -167,7 +186,28 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
(contramap (WithHost localHost . TraceErrorPolicy) tracer)
networkState
(subscriptionParams localAddr remoteAddrs)
(actualConnect iomgr newBroadcastChannel app)
( \sock ->
actualConnect iomgr newBroadcastChannel app sock `catch` \e -> do
host <- getHost =<< getPeerName sock
onHandshakeError host e
)

onHandshakeError :: Host -> HandshakeProtocolError HydraVersionedProtocolNumber -> IO ()
onHandshakeError remoteHost = \case
HandshakeError (VersionMismatch theirVersions _) -> do
handshakeCallback
HydraHandshakeRefused
{ ourVersion = protocolVersion
, theirVersions = KnownHydraVersions theirVersions
, remoteHost
}
_ ->
handshakeCallback
HydraHandshakeRefused
{ ourVersion = protocolVersion
, theirVersions = NoKnownHydraVersions
, remoteHost
}

subscriptionParams ::
SockAddr ->
Expand All @@ -191,19 +231,19 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
chan <- newBroadcastChannel
connectToNodeSocket
iomgr
unversionedHandshakeCodec
(codecHandshake hydraVersionedProtocolCodec)
noTimeLimitsHandshake
unversionedProtocolDataCodec
hydraVersionedProtocolDataCodec
networkConnectTracers
(HandshakeCallbacks acceptableVersion queryVersion)
(unversionedProtocol (app chan))
(simpleSingletonVersions protocolVersion MkHydraVersionedProtocolData (app chan))
sn
where
networkConnectTracers :: NetworkConnectTracers a v
networkConnectTracers :: NetworkConnectTracers SockAddr HydraVersionedProtocolNumber
networkConnectTracers =
NetworkConnectTracers
{ nctMuxTracer = nullTracer
, nctHandshakeTracer = nullTracer
, nctHandshakeTracer = contramap (WithHost localHost . TraceHandshake) tracer
}

withServerListening ::
Expand All @@ -224,23 +264,23 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
networkState
(AcceptedConnectionsLimit maxBound maxBound 0)
localAddr
unversionedHandshakeCodec
(codecHandshake hydraVersionedProtocolCodec)
noTimeLimitsHandshake
unversionedProtocolDataCodec
hydraVersionedProtocolDataCodec
(HandshakeCallbacks acceptableVersion queryVersion)
(unversionedProtocol (SomeResponderApplication app))
(simpleSingletonVersions protocolVersion MkHydraVersionedProtocolData (SomeResponderApplication app))
nullErrorPolicies
$ \_addr serverAsync -> do
race_ (wait serverAsync) continuation
where
notConfigureSocket :: a -> b -> IO ()
notConfigureSocket _ _ = pure ()

networkServerTracers :: NetworkServerTracers SockAddr v
networkServerTracers :: NetworkServerTracers SockAddr HydraVersionedProtocolNumber
networkServerTracers =
NetworkServerTracers
{ nstMuxTracer = nullTracer
, nstHandshakeTracer = nullTracer
, nstHandshakeTracer = contramap (WithHost localHost . TraceHandshake) tracer
, nstErrorPolicyTracer = contramap (WithHost localHost . TraceErrorPolicy) tracer
, nstAcceptPolicyTracer = contramap (WithHost localHost . TraceAcceptPolicy) tracer
}
Expand Down Expand Up @@ -335,7 +375,7 @@ data TraceOuroborosNetwork msg
= TraceSubscriptions (WithIPList (SubscriptionTrace SockAddr))
| TraceErrorPolicy (WithAddr SockAddr ErrorPolicyTrace)
| TraceAcceptPolicy AcceptConnectionsPolicyTrace
| TraceHandshake (WithMuxBearer (ConnectionId SockAddr) (TraceSendRecv (Handshake UnversionedProtocol CBOR.Term)))
| TraceHandshake (WithMuxBearer (ConnectionId SockAddr) (TraceSendRecv (Handshake HydraVersionedProtocolNumber CBOR.Term)))
| TraceSendRecv (TraceSendRecv (FireForget msg))

-- NOTE: cardano-node would have orphan ToObject instances for most of these
Expand Down Expand Up @@ -374,7 +414,7 @@ encodeWithAddr (WithAddr addr ev) =
]

encodeTraceSendRecvHandshake ::
WithMuxBearer (ConnectionId SockAddr) (TraceSendRecv (Handshake UnversionedProtocol CBOR.Term)) ->
WithMuxBearer (ConnectionId SockAddr) (TraceSendRecv (Handshake HydraVersionedProtocolNumber CBOR.Term)) ->
[Aeson.Pair]
encodeTraceSendRecvHandshake = \case
WithMuxBearer peerId (TraceSendMsg (AnyMessageAndAgency agency msg)) ->
Expand All @@ -391,7 +431,7 @@ encodeTraceSendRecvHandshake = \case
++ encodeMsg msg
where
encodeMsg ::
Message (Handshake UnversionedProtocol Term) from to ->
Message (Handshake HydraVersionedProtocolNumber Term) from to ->
[Aeson.Pair]
encodeMsg = \case
MsgProposeVersions versions ->
Expand Down
75 changes: 75 additions & 0 deletions hydra-node/src/Hydra/Network/Ouroboros/VersionedProtocol.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
module Hydra.Network.Ouroboros.VersionedProtocol where

import Hydra.Prelude

import Codec.CBOR.Term qualified as CBOR
import Data.Text qualified as T
import Hydra.Network (Host (..))
import Network.TypedProtocol.Pipelined ()
import Ouroboros.Network.CodecCBORTerm (CodecCBORTerm (..))
import Ouroboros.Network.Protocol.Handshake.Codec (VersionDataCodec, cborTermVersionDataCodec)
import Ouroboros.Network.Protocol.Handshake.Version (Accept (..), Acceptable, Queryable, acceptableVersion, queryVersion)

hydraVersionedProtocolCodec :: CodecCBORTerm (String, Maybe Int) HydraVersionedProtocolNumber
hydraVersionedProtocolCodec = CodecCBORTerm{encodeTerm, decodeTerm}
where
encodeTerm :: HydraVersionedProtocolNumber -> CBOR.Term
encodeTerm x = CBOR.TInt $ hydraVersionedProtocolNumber x

decodeTerm :: CBOR.Term -> Either (String, Maybe Int) HydraVersionedProtocolNumber
decodeTerm (CBOR.TInt x) = Right $ MkHydraVersionedProtocolNumber x
decodeTerm _ = Left ("unknown tag", Nothing)

type HydraVersionedProtocolNumber :: Type
newtype HydraVersionedProtocolNumber = MkHydraVersionedProtocolNumber {hydraVersionedProtocolNumber :: Int}
deriving stock (Eq, Show, Generic, Ord)

type HydraVersionedProtocolData :: Type
data HydraVersionedProtocolData = MkHydraVersionedProtocolData
deriving stock (Eq, Show, Generic, Ord)

instance Acceptable HydraVersionedProtocolData where
acceptableVersion
MkHydraVersionedProtocolData
MkHydraVersionedProtocolData = Accept MkHydraVersionedProtocolData

instance Queryable HydraVersionedProtocolData where
queryVersion MkHydraVersionedProtocolData = False

hydraVersionedProtocolDataCodec ::
VersionDataCodec
CBOR.Term
HydraVersionedProtocolNumber
HydraVersionedProtocolData
hydraVersionedProtocolDataCodec =
cborTermVersionDataCodec
(const CodecCBORTerm{encodeTerm, decodeTerm})
where
encodeTerm :: HydraVersionedProtocolData -> CBOR.Term
encodeTerm MkHydraVersionedProtocolData = CBOR.TNull

decodeTerm :: CBOR.Term -> Either Text HydraVersionedProtocolData
decodeTerm CBOR.TNull = Right MkHydraVersionedProtocolData
decodeTerm t = Left $ T.pack $ "unexpected term: " ++ show t

type HydraNetworkConfig :: Type
data HydraNetworkConfig = HydraNetworkConfig
{ protocolVersion :: HydraVersionedProtocolNumber
, localHost :: Host
, remoteHosts :: [Host]
}
deriving stock (Eq, Show, Generic)

type KnownHydraVersions :: Type
data KnownHydraVersions
= KnownHydraVersions {fromKnownHydraVersions :: [HydraVersionedProtocolNumber]}
| NoKnownHydraVersions
deriving stock (Eq, Show, Generic)

type HydraHandshakeRefused :: Type
data HydraHandshakeRefused = HydraHandshakeRefused
{ remoteHost :: Host
, ourVersion :: HydraVersionedProtocolNumber
, theirVersions :: KnownHydraVersions
}
deriving stock (Eq, Show, Generic)
9 changes: 5 additions & 4 deletions hydra-node/src/Hydra/Node/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ import Hydra.Network (Host (..), IP, NetworkComponent, NodeId, PortNumber)
import Hydra.Network.Authenticate (Authenticated (..), Signed, withAuthentication)
import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat)
import Hydra.Network.Message (Connectivity, Message, NetworkEvent (..))
import Hydra.Network.Ouroboros (TraceOuroborosNetwork, WithHost, withOuroborosNetwork)
import Hydra.Network.Ouroboros (HydraNetworkConfig (..), HydraVersionedProtocolNumber, TraceOuroborosNetwork, WithHost, withOuroborosNetwork)
import Hydra.Network.Reliability (MessagePersistence, ReliableMsg, mkMessagePersistence, withReliability)
import Hydra.Node (HydraNodeLog (..))
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
Expand Down Expand Up @@ -116,12 +116,13 @@ withNetwork ::
IsTx tx =>
-- | Tracer to use for logging messages.
Tracer IO (LogEntry tx (Message tx)) ->
HydraVersionedProtocolNumber ->
-- | The network configuration
NetworkConfiguration IO ->
-- | Produces a `NetworkComponent` that can send `msg` and consumes `Authenticated` @msg@.
NetworkComponent IO (NetworkEvent (Message tx)) (Message tx) ()
withNetwork tracer configuration callback action = do
let localhost = Host{hostname = show host, port}
withNetwork tracer pVersion configuration callback action = do
let localHost = Host{hostname = show host, port}
me = deriveParty signingKey
numberOfParties = length $ me : otherParties
messagePersistence <- configureMessagePersistence (contramap Node tracer) persistenceDir numberOfParties
Expand All @@ -130,7 +131,7 @@ withNetwork tracer configuration callback action = do
withFlipHeartbeats $
withReliability (contramap Reliability tracer) messagePersistence me otherParties $
withAuthentication (contramap Authentication tracer) signingKey otherParties $
withOuroborosNetwork (contramap Network tracer) localhost peers
withOuroborosNetwork (contramap Network tracer) HydraNetworkConfig{protocolVersion = pVersion, localHost, remoteHosts = peers} (const $ pure ())

withHeartbeat nodeId reliability (callback . mapHeartbeat) $ \network ->
action network
Expand Down
6 changes: 5 additions & 1 deletion hydra-node/src/Hydra/Node/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Hydra.Ledger.Cardano.Configuration (
import Hydra.Logging (Verbosity (..), traceWith, withTracer)
import Hydra.Logging.Messages (HydraLog (..))
import Hydra.Logging.Monitoring (withMonitoring)
import Hydra.Network.Ouroboros (HydraVersionedProtocolNumber (..))
import Hydra.Node (
chainStateHistory,
connect,
Expand Down Expand Up @@ -60,6 +61,9 @@ instance Exception ConfigurationException where
ConfigurationException err ->
"Incorrect protocol parameters configuration provided: " <> show err

currentHydraVersionedProtocol :: HydraVersionedProtocolNumber
currentHydraVersionedProtocol = MkHydraVersionedProtocolNumber 0

run :: RunOptions -> IO ()
run opts = do
either (throwIO . InvalidOptionException) pure $ validateRunOptions opts
Expand Down Expand Up @@ -91,7 +95,7 @@ run opts = do
withAPIServer apiHost apiPort party apiPersistence (contramap APIServer tracer) chain pparams (wireClientInput wetHydraNode) $ \server -> do
-- Network
let networkConfiguration = NetworkConfiguration{persistenceDir, signingKey, otherParties, host, port, peers, nodeId}
withNetwork tracer networkConfiguration (wireNetworkInput wetHydraNode) $ \network -> do
withNetwork tracer currentHydraVersionedProtocol networkConfiguration (wireNetworkInput wetHydraNode) $ \network -> do
-- Main loop
connect chain network server wetHydraNode
>>= runHydraNode
Expand Down

0 comments on commit 1ea5d34

Please sign in to comment.