Skip to content

Commit

Permalink
Add NetworkEvent type to distinguish between connectivity events and …
Browse files Browse the repository at this point in the history
…received messages on the network.

Co-authored by Franco Testagrossa
  • Loading branch information
locallycompact committed Apr 29, 2024
1 parent 4bbbe28 commit 61e81a2
Show file tree
Hide file tree
Showing 18 changed files with 291 additions and 185 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ changes.

- _DEPRECATED_ the `GetUTxO` client input and `GetUTxOResponse` server output. Use `GET /snapshot/utxo` instead.

- `hydra-node` logs will now report `NetworkEvents` to distinguish between `ConnectivityEvent`s and `ReceivedMessage`s on the network.

## [0.17.0] - UNRELEASED

- **BREAKING** `hydra-node` `/commit` enpoint now also accepts a _blueprint/draft_
Expand Down
63 changes: 58 additions & 5 deletions hydra-node/json-schemas/logs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1579,19 +1579,16 @@ definitions:
- tag
- ttl
- message
- party
description: >-
Input representing some message received from peers on the network.
properties:
tag:
type: string
enum: ["NetworkInput"]
message:
$ref: "logs.yaml#/definitions/Message"
networkEvent:
$ref: "logs.yaml#/definitions/NetworkEvent"
ttl:
type: number
party:
$ref: "api.yaml#/components/schemas/Party"
- title: ChainInput
type: object
additionalProperties: false
Expand All @@ -1608,6 +1605,62 @@ definitions:
chainEvent:
$ref: "logs.yaml#/definitions/ChainEvent"

NetworkEvent:
oneOf:
- title: ConnectivityEvent
type: object
additionalProperties: false
required:
- tag
- contents
properties:
tag:
type: string
enum: ["ConnectivityEvent"]
contents:
oneOf:
- title: Connected
type: object
additionalProperties: false
required:
- tag
- nodeId
properties:
tag:
type: string
enum: ["Connected"]
nodeId:
type: string

- title: Disconnected
type: object
additionalProperties: false
required:
- tag
- nodeId
properties:
tag:
type: string
enum: ["Disconnected"]
nodeId:
type: string

- title: ReceivedMessage
type: object
additionalProperties: false
required:
- tag
- sender
- msg
properties:
tag:
type: string
enum: ["ReceivedMessage"]
sender:
$ref: "api.yaml#/components/schemas/Party"
msg:
$ref: "logs.yaml#/definitions/Message"

Message:
description: >-
Messages exchanged by Hydra network peers over a broadcasting network.
Expand Down
21 changes: 15 additions & 6 deletions hydra-node/src/Hydra/HeadLogic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,21 @@ import Hydra.Ledger (
applyTransactions,
txId,
)
import Hydra.Network.Message (Message (..))
import Hydra.Network.Message (Connectivity (..), Message (..), NetworkEvent (..))
import Hydra.OnChainId (OnChainId)
import Hydra.Party (Party (vkey))
import Hydra.Snapshot (ConfirmedSnapshot (..), Snapshot (..), SnapshotNumber, getSnapshot)

defaultTTL :: TTL
defaultTTL = 5

onConnectionEvent :: Connectivity -> Outcome tx
onConnectionEvent = \case
Connected{nodeId} ->
causes [ClientEffect (ServerOutput.PeerConnected nodeId)]
Disconnected{nodeId} ->
causes [ClientEffect (ServerOutput.PeerDisconnected nodeId)]

-- * The Coordinated Head protocol

-- ** On-Chain Protocol
Expand Down Expand Up @@ -691,6 +698,8 @@ update ::
Input tx ->
Outcome tx
update env ledger st ev = case (st, ev) of
(_, NetworkInput _ (ConnectivityEvent conn)) ->
onConnectionEvent conn
(Idle _, ClientInput Init) ->
onIdleClientInit env
(Idle _, ChainInput Observation{observedTx = OnInitTx{headId, headSeed, headParameters, participants}, newChainState}) ->
Expand All @@ -713,14 +722,14 @@ update env ledger st ev = case (st, ev) of
onOpenClientClose openState
(Open{}, ClientInput (NewTx tx)) ->
onOpenClientNewTx tx
(Open openState, NetworkInput ttl _ (ReqTx tx)) ->
(Open openState, NetworkInput ttl (ReceivedMessage{msg = ReqTx tx})) ->
onOpenNetworkReqTx env ledger openState ttl tx
(Open openState, NetworkInput _ otherParty (ReqSn sn txIds)) ->
(Open openState, NetworkInput _ (ReceivedMessage{sender, msg = ReqSn sn txIds})) ->
-- XXX: ttl == 0 not handled for ReqSn
onOpenNetworkReqSn env ledger openState otherParty sn txIds
(Open openState, NetworkInput _ otherParty (AckSn snapshotSignature sn)) ->
onOpenNetworkReqSn env ledger openState sender sn txIds
(Open openState, NetworkInput _ (ReceivedMessage{sender, msg = AckSn snapshotSignature sn})) ->
-- XXX: ttl == 0 not handled for AckSn
onOpenNetworkAckSn env openState otherParty snapshotSignature sn
onOpenNetworkAckSn env openState sender snapshotSignature sn
( Open openState@OpenState{headId = ourHeadId}
, ChainInput Observation{observedTx = OnCloseTx{headId, snapshotNumber = closedSnapshotNumber, contestationDeadline}, newChainState}
)
Expand Down
5 changes: 2 additions & 3 deletions hydra-node/src/Hydra/HeadLogic/Input.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import Hydra.Prelude

import Hydra.API.ClientInput (ClientInput)
import Hydra.Chain (ChainEvent, IsChainState)
import Hydra.Network.Message (Message)
import Hydra.Party (Party)
import Hydra.Network.Message (Message, NetworkEvent)

type TTL = Natural

Expand All @@ -19,7 +18,7 @@ data Input tx
--
-- * `ttl` is a simple counter that's decreased every time the event is
-- reenqueued due to a wait. It's default value is `defaultTTL`
NetworkInput {ttl :: TTL, party :: Party, message :: Message tx}
NetworkInput {ttl :: TTL, networkEvent :: NetworkEvent (Message tx)}
| -- | Input received from the chain via a "Hydra.Chain".
ChainInput {chainEvent :: ChainEvent tx}
deriving stock (Generic)
Expand Down
4 changes: 2 additions & 2 deletions hydra-node/src/Hydra/Logging/Monitoring.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import Hydra.HeadLogic (
import Hydra.Ledger (IsTx (TxIdType), txId)
import Hydra.Logging.Messages (HydraLog (..))
import Hydra.Network (PortNumber)
import Hydra.Network.Message (Message (ReqTx))
import Hydra.Network.Message (Message (ReqTx), NetworkEvent (..))
import Hydra.Node (HydraNodeLog (BeginEffect, BeginInput, EndInput, input))
import Hydra.Snapshot (Snapshot (confirmed))
import System.Metrics.Prometheus.Http.Scrape (serveMetrics)
Expand Down Expand Up @@ -89,7 +89,7 @@ monitor ::
HydraLog tx net ->
m ()
monitor transactionsMap metricsMap = \case
(Node BeginInput{input = NetworkInput _ _ (ReqTx tx)}) -> do
(Node BeginInput{input = NetworkInput _ (ReceivedMessage{msg = ReqTx tx})}) -> do
t <- getMonotonicTime
-- NOTE: If a requested transaction never gets confirmed, it might stick
-- forever in the transactions map which could lead to unbounded growth and
Expand Down
29 changes: 13 additions & 16 deletions hydra-node/src/Hydra/Network/Heartbeat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ heartbeatDelay = 0.5
livenessDelay :: DiffTime
livenessDelay = 3

type ConnectionMessages m = Connectivity -> m ()

-- | Wrap a lower-level `NetworkComponent` and handle sending/receiving of heartbeats.
--
-- Note that the type of consumed and sent messages can be different.
Expand All @@ -89,35 +87,34 @@ withHeartbeat ::
) =>
-- | This node's id, used to identify `Heartbeat` messages broadcast to peers.
NodeId ->
-- | Callback listening to peers' status change as computed by the `withIncomingHeartbeat` layer.
ConnectionMessages m ->
-- | Underlying `NetworkComponent` for sending and consuming `Heartbeat` messages.
NetworkComponent m (Heartbeat inbound) (Heartbeat outbound) a ->
-- | Returns a network component that can be used to send and consume arbitrary messages.
-- This layer will take care of peeling out/wrapping messages into `Heartbeat`s.
NetworkComponent m inbound outbound a
withHeartbeat nodeId connectionMessages withNetwork callback action = do
NetworkComponent m (Either Connectivity inbound) outbound a
withHeartbeat nodeId withNetwork callback action = do
heartbeat <- newTVarIO initialHeartbeatState
lastSent <- newTVarIO Nothing
withNetwork (updateStateFromIncomingMessages heartbeat connectionMessages callback) $ \network ->
withAsync (checkRemoteParties heartbeat connectionMessages) $ \_ ->
withNetwork (updateStateFromIncomingMessages heartbeat callback) $ \network ->
withAsync (checkRemoteParties heartbeat onConnectivityChanged) $ \_ ->
withAsync (checkHeartbeatState nodeId lastSent network) $ \_ ->
action (updateStateFromOutgoingMessages nodeId lastSent network)
where
onConnectivityChanged = callback . Left

updateStateFromIncomingMessages ::
(MonadSTM m, MonadMonotonicTime m) =>
TVar m HeartbeatState ->
ConnectionMessages m ->
NetworkCallback inbound m ->
NetworkCallback (Either Connectivity inbound) m ->
NetworkCallback (Heartbeat inbound) m
updateStateFromIncomingMessages heartbeatState connectionMessages callback = \case
Data nodeId msg -> notifyAlive nodeId >> callback msg
updateStateFromIncomingMessages heartbeatState callback = \case
Data nodeId msg -> notifyAlive nodeId >> callback (Right msg)
Ping nodeId -> notifyAlive nodeId
where
notifyAlive peer = do
now <- getMonotonicTime
aliveSet <- alive <$> readTVarIO heartbeatState
unless (peer `Map.member` aliveSet) $ connectionMessages (Connected peer)
unless (peer `Map.member` aliveSet) $ callback (Left $ Connected peer)
atomically $
modifyTVar' heartbeatState $ \s ->
s
Expand Down Expand Up @@ -165,14 +162,14 @@ checkRemoteParties ::
, MonadSTM m
) =>
TVar m HeartbeatState ->
ConnectionMessages m ->
(Connectivity -> m ()) ->
m ()
checkRemoteParties heartbeatState connectionMessages =
checkRemoteParties heartbeatState onConnectivity =
forever $ do
threadDelay (heartbeatDelay * 2)
now <- getMonotonicTime
updateSuspected heartbeatState now
>>= mapM_ (connectionMessages . Disconnected)
>>= mapM_ (onConnectivity . Disconnected)

updateSuspected :: MonadSTM m => TVar m HeartbeatState -> Time -> m (Set NodeId)
updateSuspected heartbeatState now =
Expand Down
13 changes: 13 additions & 0 deletions hydra-node/src/Hydra/Network/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,27 @@ import Cardano.Crypto.Util (SignableRepresentation, getSignableRepresentation)
import Hydra.Crypto (Signature)
import Hydra.Ledger (IsTx (TxIdType), UTxOType)
import Hydra.Network (NodeId)
import Hydra.Party (Party)
import Hydra.Snapshot (Snapshot, SnapshotNumber)

data NetworkEvent msg
= ConnectivityEvent Connectivity
| ReceivedMessage {sender :: Party, msg :: msg}
deriving stock (Eq, Show, Generic)
deriving anyclass (ToJSON, FromJSON)

instance Arbitrary msg => Arbitrary (NetworkEvent msg) where
arbitrary = genericArbitrary

data Connectivity
= Connected {nodeId :: NodeId}
| Disconnected {nodeId :: NodeId}
deriving stock (Generic, Eq, Show)
deriving anyclass (ToJSON, FromJSON)

instance Arbitrary Connectivity where
arbitrary = genericArbitrary

data Message tx
= ReqTx {transaction :: tx}
| ReqSn {snapshotNumber :: SnapshotNumber, transactionIds :: [TxIdType tx]}
Expand Down
14 changes: 6 additions & 8 deletions hydra-node/src/Hydra/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ import Hydra.HeadLogic.State (getHeadParameters)
import Hydra.Ledger (Ledger)
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (Network (..))
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Message (Message)
import Hydra.Network.Message (Message, NetworkEvent (..))
import Hydra.Node.InputQueue (InputQueue (..), Queued (..), createInputQueue)
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
import Hydra.Options (ChainConfig (..), DirectChainConfig (..), RunOptions (..), defaultContestationPeriod)
Expand Down Expand Up @@ -202,9 +201,8 @@ wireClientInput node = enqueue . ClientInput
where
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node

wireNetworkInput :: DraftHydraNode tx m -> (Authenticated (Message tx) -> m ())
wireNetworkInput node (Authenticated msg otherParty) =
enqueue $ NetworkInput defaultTTL otherParty msg
wireNetworkInput :: DraftHydraNode tx m -> NetworkEvent (Message tx) -> m ()
wireNetworkInput node = enqueue . NetworkInput defaultTTL
where
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node

Expand Down Expand Up @@ -275,8 +273,8 @@ stepHydraNode node = do
where
maybeReenqueue q@Queued{queuedId, queuedItem} =
case queuedItem of
NetworkInput ttl aParty msg
| ttl > 0 -> reenqueue waitDelay q{queuedItem = NetworkInput (ttl - 1) aParty msg}
NetworkInput ttl msg
| ttl > 0 -> reenqueue waitDelay q{queuedItem = NetworkInput (ttl - 1) msg}
_ -> traceWith tracer $ DroppedFromQueue{inputId = queuedId, input = queuedItem}

Environment{party} = env
Expand Down Expand Up @@ -331,7 +329,7 @@ processEffects node tracer inputId effects = do
traceWith tracer $ BeginEffect party inputId effectId effect
case effect of
ClientEffect i -> sendOutput server i
NetworkEffect msg -> broadcast hn msg >> enqueue (NetworkInput defaultTTL party msg)
NetworkEffect msg -> broadcast hn msg >> enqueue (NetworkInput defaultTTL (ReceivedMessage{sender = party, msg}))
OnChainEffect{postChainTx} ->
postTx postChainTx
`catch` \(postTxError :: PostTxError tx) ->
Expand Down
28 changes: 16 additions & 12 deletions hydra-node/src/Hydra/Node/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ import Hydra.Prelude hiding (fromList, replicate)

import Control.Tracer (Tracer)
import Hydra.Crypto (HydraKey, SigningKey)
import Hydra.Ledger (IsTx)
import Hydra.Logging (traceWith)
import Hydra.Logging.Messages (HydraLog (..))
import Hydra.Network (Host (..), IP, NetworkComponent, NodeId, PortNumber)
import Hydra.Network.Authenticate (Authenticated (Authenticated), Signed, withAuthentication)
import Hydra.Network.Heartbeat (ConnectionMessages, Heartbeat (..), withHeartbeat)
import Hydra.Network.Authenticate (Authenticated (..), Signed, withAuthentication)
import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat)
import Hydra.Network.Message (Connectivity, Message, NetworkEvent (..))
import Hydra.Network.Ouroboros (TraceOuroborosNetwork, WithHost, withOuroborosNetwork)
import Hydra.Network.Reliability (MessagePersistence, ReliableMsg, mkMessagePersistence, withReliability)
import Hydra.Node (HydraNodeLog (..))
Expand Down Expand Up @@ -110,34 +112,36 @@ data NetworkConfiguration m = NetworkConfiguration

-- | Starts the network layer of a node, passing configured `Network` to its continuation.
withNetwork ::
forall msg tx.
(ToCBOR msg, ToJSON msg, FromJSON msg, FromCBOR msg) =>
forall tx.
IsTx tx =>
-- | Tracer to use for logging messages.
Tracer IO (LogEntry tx msg) ->
-- | Callback/observer for connectivity changes in peers.
ConnectionMessages IO ->
Tracer IO (LogEntry tx (Message tx)) ->
-- | The network configuration
NetworkConfiguration IO ->
-- | Produces a `NetworkComponent` that can send `msg` and consumes `Authenticated` @msg@.
NetworkComponent IO (Authenticated msg) msg ()
withNetwork tracer connectionMessages configuration callback action = do
NetworkComponent IO (NetworkEvent (Message tx)) (Message tx) ()
withNetwork tracer configuration callback action = do
let localhost = Host{hostname = show host, port}
me = deriveParty signingKey
numberOfParties = length $ me : otherParties
messagePersistence <- configureMessagePersistence (contramap Node tracer) persistenceDir numberOfParties

let reliability :: NetworkComponent IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
reliability =
let reliability =
withFlipHeartbeats $
withReliability (contramap Reliability tracer) messagePersistence me otherParties $
withAuthentication (contramap Authentication tracer) signingKey otherParties $
withOuroborosNetwork (contramap Network tracer) localhost peers

withHeartbeat nodeId connectionMessages reliability callback $ \network ->
withHeartbeat nodeId reliability (callback . mapHeartbeat) $ \network ->
action network
where
NetworkConfiguration{persistenceDir, signingKey, otherParties, host, port, peers, nodeId} = configuration

mapHeartbeat :: Either Connectivity (Authenticated (Message tx)) -> NetworkEvent (Message tx)
mapHeartbeat = \case
Left connectivity -> ConnectivityEvent connectivity
Right (Authenticated{payload, party}) -> ReceivedMessage{sender = party, msg = payload}

-- | Create `MessagePersistence` handle to be used by `Reliability` network layer.
--
-- This function will `throw` a `ParameterMismatch` exception if:
Expand Down

0 comments on commit 61e81a2

Please sign in to comment.