Skip to content

Commit

Permalink
Remove persistence from withReliability
Browse files Browse the repository at this point in the history
  • Loading branch information
locallycompact committed May 7, 2024
1 parent 45292ad commit 7ea77db
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 47 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jobs:
strategy:
matrix:
target: [ hydra-node, hydra-tui, hydraw, hydra-explorer ]
system: [ x86_64-linux, aarch64-linux ]

runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -49,7 +50,7 @@ jobs:
run: |
IMAGE_NAME=ghcr.io/${{github.repository_owner}}/${{matrix.target}}
echo "IMAGE_NAME=${IMAGE_NAME}" >> $GITHUB_ENV
nix build .#docker-${{ matrix.target }} && docker load < ./result
nix build .#packages.${{matrix.system}}.docker-${{ matrix.target }} && docker load < ./result
# Determine whether we are building a tag and if yes, set a VERSION_NAME
BUILDING_TAG=${{github.ref_type == 'tag'}}
[[ ${BUILDING_TAG} = true ]] && \
Expand Down
16 changes: 5 additions & 11 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
-- become a problem, this can be mitigated by closing and reopening a head.
module Hydra.Network.Reliability where

import Hydra.Prelude hiding (empty, fromList, length, replicate, zipWith)
import Hydra.Prelude hiding (empty, fromList, replicate, zipWith)

import Cardano.Binary (serialize')
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
Expand All @@ -95,13 +95,12 @@ import Control.Concurrent.Class.MonadSTM (
import Control.Tracer (Tracer)
import Data.IntMap qualified as IMap
import Data.Sequence.Strict ((|>))
import Data.Sequence.Strict qualified as Seq
import Data.Vector qualified as Vector
import Data.Vector (
Vector,
elemIndex,
fromList,
generate,
length,
replicate,
zipWith,
(!?),
Expand Down Expand Up @@ -216,18 +215,16 @@ withReliability ::
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
-- | Tracer for logging messages.
Tracer m ReliabilityLog ->
-- | Our persistence handle
MessagePersistence m outbound ->
-- | Our own party identifier.
Party ->
-- | Other parties' identifiers.
[Party] ->
-- | Underlying network component providing consuming and sending channels.
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a ->
NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do
acksCache <- loadAcks >>= newTVarIO
sentMessages <- loadMessages >>= newTVarIO . Seq.fromList
withReliability tracer me otherParties withRawNetwork callback action = do
acksCache <- newTVarIO (Vector.replicate (length (me : otherParties)) 0)
sentMessages <- newTVarIO mempty
resendQ <- newTQueueIO
let ourIndex = fromMaybe (error "This cannot happen because we constructed the list with our party inside.") (findPartyIndex me)
let resend = writeTQueue resendQ
Expand All @@ -243,13 +240,10 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa
case msg of
Data{} -> do
localCounter <- atomically $ cacheMessage msg >> incrementAckCounter
saveAcks localCounter
appendMessage msg
traceWith tracer BroadcastCounter{ourIndex, localCounter}
broadcast $ ReliableMsg localCounter msg
Ping{} -> do
localCounter <- readTVarIO acksCache
saveAcks localCounter
traceWith tracer BroadcastPing{ourIndex, localCounter}
broadcast $ ReliableMsg localCounter msg
}
Expand Down
6 changes: 2 additions & 4 deletions hydra-node/src/Hydra/Node/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,17 @@ withNetwork ::
withNetwork tracer configuration callback action = do
let localhost = Host{hostname = show host, port}
me = deriveParty signingKey
numberOfParties = length $ me : otherParties
messagePersistence <- configureMessagePersistence (contramap Node tracer) persistenceDir numberOfParties

let reliability =
withFlipHeartbeats $
withReliability (contramap Reliability tracer) messagePersistence me otherParties $
withReliability (contramap Reliability tracer) me otherParties $
withAuthentication (contramap Authentication tracer) signingKey otherParties $
withOuroborosNetwork (contramap Network tracer) localhost peers

withHeartbeat nodeId reliability (callback . mapHeartbeat) $ \network ->
action network
where
NetworkConfiguration{persistenceDir, signingKey, otherParties, host, port, peers, nodeId} = configuration
NetworkConfiguration{signingKey, otherParties, host, port, peers, nodeId} = configuration

mapHeartbeat :: Either Connectivity (Authenticated (Message tx)) -> NetworkEvent (Message tx)
mapHeartbeat = \case
Expand Down
36 changes: 5 additions & 31 deletions hydra-node/test/Hydra/Network/ReliabilitySpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ import Hydra.Network.Message (Connectivity)
import Hydra.Network.Reliability (MessagePersistence (..), ReliabilityLog (..), ReliableMsg (..), withReliability)
import Hydra.Node.Network (withFlipHeartbeats)
import Hydra.Persistence (
Persistence (..),
PersistenceIncremental (..),
createPersistence,
createPersistenceIncremental,
)
import System.Directory (doesFileExist)
Expand Down Expand Up @@ -99,9 +97,8 @@ spec = parallel $ do
prop "broadcast messages to the network assigning a sequential id" $ \(messages :: [String]) ->
let sentMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO empty
persistence <- mockMessagePersistence 1

withReliability nullTracer persistence alice [] (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do
withReliability nullTracer alice [] (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do
mapM_ (broadcast . Data "node-1") messages

fromList . Vector.toList <$> readTVarIO sentMessages
Expand All @@ -119,16 +116,14 @@ spec = parallel $ do
randomSeed <- newTVarIO $ mkStdGen seed
aliceToBob <- newTQueueIO
bobToAlice <- newTQueueIO
alicePersistence <- mockMessagePersistence 2
bobPersistence <- mockMessagePersistence 2
let
-- this is a NetworkComponent that broadcasts authenticated messages
-- mediated through a read and a write TQueue but drops 0.2 % of them
aliceFailingNetwork = failingNetwork randomSeed alice (bobToAlice, aliceToBob)
bobFailingNetwork = failingNetwork randomSeed bob (aliceToBob, bobToAlice)

bobReliabilityStack = reliabilityStack bobPersistence bobFailingNetwork (captureTraces emittedTraces) "bob" bob [alice]
aliceReliabilityStack = reliabilityStack alicePersistence aliceFailingNetwork (captureTraces emittedTraces) "alice" alice [bob]
bobReliabilityStack = reliabilityStack bobFailingNetwork (captureTraces emittedTraces) "bob" bob [alice]
aliceReliabilityStack = reliabilityStack aliceFailingNetwork (captureTraces emittedTraces) "alice" alice [bob]

runAlice = runPeer aliceReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByBob aliceToBobMessages bobToAliceMessages
runBob = runPeer bobReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages
Expand All @@ -150,10 +145,8 @@ spec = parallel $ do
it "broadcast updates counter from peers" $ do
let receivedMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO empty
alicePersistence <- mockMessagePersistence 2
withReliability
nullTracer
alicePersistence
alice
[bob]
( \incoming action -> do
Expand All @@ -173,26 +166,10 @@ spec = parallel $ do
withTempDir "network-messages-persistence" $ \tmpDir -> do
let networkMessagesFile = tmpDir <> "/network-messages"

Persistence{load, save} <- createPersistence $ tmpDir <> "/acks"
PersistenceIncremental{loadAll, append} <- createPersistenceIncremental networkMessagesFile

let messagePersistence =
MessagePersistence
{ loadAcks = do
mloaded <- load
case mloaded of
Nothing -> pure $ replicate (length [alice, bob]) 0
Just acks -> pure acks
, saveAcks = save
, loadMessages = loadAll
, appendMessage = append
}

receivedMsgs <- do
sentMessages <- newTVarIO empty
withReliability
nullTracer
messagePersistence
alice
[bob]
( \incoming action -> do
Expand All @@ -212,7 +189,6 @@ spec = parallel $ do
reloadAll networkMessagesFile `shouldReturn` [Data "node-1" msg]

doesFileExist (tmpDir </> "acks") `shouldReturn` True
load `shouldReturn` Just (fromList [1, 1])
where
runPeer reliability partyName receivedMessageContainer sentMessageContainer messagesToSend expectedMessages =
reliability (capturePayload receivedMessageContainer) $ \Network{broadcast} -> do
Expand All @@ -224,10 +200,10 @@ spec = parallel $ do
(waitForAllMessages expectedMessages receivedMessageContainer)
(waitForAllMessages messagesToSend sentMessageContainer)

reliabilityStack persistence underlyingNetwork tracer nodeId party peers =
reliabilityStack underlyingNetwork tracer nodeId party peers =
withHeartbeat nodeId $
withFlipHeartbeats $
withReliability tracer persistence party peers underlyingNetwork
withReliability tracer party peers underlyingNetwork

failingNetwork seed peer (readQueue, writeQueue) callback action =
withAsync
Expand Down Expand Up @@ -260,14 +236,12 @@ noop = const $ pure ()
aliceReceivesMessages :: [Authenticated (ReliableMsg (Heartbeat msg))] -> [Authenticated (Heartbeat msg)]
aliceReceivesMessages messages = runSimOrThrow $ do
receivedMessages <- newTVarIO empty
alicePersistence <- mockMessagePersistence 3

let baseNetwork incoming _ = mapM incoming messages

aliceReliabilityStack =
withReliability
nullTracer
alicePersistence
alice
[bob, carol]
baseNetwork
Expand Down

0 comments on commit 7ea77db

Please sign in to comment.