Skip to content

Commit

Permalink
Fix NotReleasedConnections multinode_Sim bug
Browse files Browse the repository at this point in the history
The issue was noTimeoutHandshake was being used and there was a thread
 hanging forever waiting on timeout.

Refactor handshakeTimeLimits to top-level:

- This allows to decide when to pass timeLimitsHandhsake or
  noTimeLimitsHandshake, depending on the type of test we're running

Extended Server2 with Shutdown action

Make connectionLoop resilient so that it can't die
  • Loading branch information
bolt12 authored and coot committed Oct 14, 2021
1 parent c8bb993 commit 813be01
Showing 1 changed file with 66 additions and 17 deletions.
83 changes: 66 additions & 17 deletions ouroboros-network-framework/test/Test/Ouroboros/Network/Server2.hs
Expand Up @@ -61,6 +61,8 @@ import Test.Tasty (TestTree, testGroup)

import Control.Concurrent.JobPool

import Codec.CBOR.Term (Term)

import qualified Network.Mux as Mux
import Network.Mux.Types (MuxRuntimeError (..))
import qualified Network.Socket as Socket
Expand All @@ -76,14 +78,17 @@ import Ouroboros.Network.ConnectionId
import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.ConnectionManager.Core
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits)
import Ouroboros.Network.IOManager
import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..),
RemoteSt (..))
import qualified Ouroboros.Network.InboundGovernor.ControlChannel as Server
import Ouroboros.Network.Mux
import Ouroboros.Network.MuxMode
import Ouroboros.Network.Protocol.Handshake
import Ouroboros.Network.Protocol.Handshake.Codec (noTimeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Codec ( noTimeLimitsHandshake
, timeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake)
import Ouroboros.Network.Protocol.Handshake.Unversioned
import Ouroboros.Network.Protocol.Handshake.Version (Acceptable (..))
import Ouroboros.Network.RethrowPolicy
Expand Down Expand Up @@ -303,12 +308,15 @@ withInitiatorOnlyConnectionManager
-> Maybe peerAddr
-> Bundle (ConnectionId peerAddr -> STM m [req])
-- ^ Functions to get the next requests for a given connection
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-- ^ Handshake time limits
-> (MuxConnectionManager
InitiatorMode socket peerAddr
UnversionedProtocol ByteString m [resp] Void
-> m a)
-> m a
withInitiatorOnlyConnectionManager name timeouts cmTrTracer snocket localAddr nextRequests k = do
withInitiatorOnlyConnectionManager name timeouts cmTrTracer snocket localAddr
nextRequests handshakeTimeLimits k = do
mainThreadId <- myThreadId
let muxTracer = (name,) `contramap` nullTracer -- mux tracer
withConnectionManager
Expand Down Expand Up @@ -344,7 +352,7 @@ withInitiatorOnlyConnectionManager name timeouts cmTrTracer snocket localAddr ne
haHandshakeCodec = unversionedHandshakeCodec,
haVersionDataCodec = unversionedProtocolDataCodec,
haAcceptVersion = acceptableVersion,
haTimeLimits = noTimeLimitsHandshake
haTimeLimits = handshakeTimeLimits
}
(unversionedProtocol clientApplication)
(mainThreadId, debugMuxErrorRethrowPolicy
Expand Down Expand Up @@ -473,6 +481,8 @@ withBidirectionalConnectionManager
-- ^ Functions to get the next requests for a given connection
-- ^ series of request possible to do with the bidirectional connection
-- manager towards some peer.
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-- ^ Handshake time limits
-> (MuxConnectionManager
InitiatorResponderMode socket peerAddr
UnversionedProtocol ByteString m [resp] acc
Expand All @@ -483,7 +493,8 @@ withBidirectionalConnectionManager
withBidirectionalConnectionManager name timeouts
inboundTrTracer cmTrTracer inboundTracer
snocket socket localAddress
accumulatorInit nextRequests k = do
accumulatorInit nextRequests
handshakeTimeLimits k = do
mainThreadId <- myThreadId
inbgovControlChannel <- Server.newControlChannel
-- we are not using the randomness
Expand Down Expand Up @@ -523,7 +534,7 @@ withBidirectionalConnectionManager name timeouts
haHandshakeCodec = unversionedHandshakeCodec,
haVersionDataCodec = unversionedProtocolDataCodec,
haAcceptVersion = acceptableVersion,
haTimeLimits = noTimeLimitsHandshake
haTimeLimits = handshakeTimeLimits
}
(unversionedProtocol serverApplication)
(mainThreadId, debugMuxErrorRethrowPolicy
Expand Down Expand Up @@ -556,7 +567,6 @@ withBidirectionalConnectionManager name timeouts
(\serverAsync -> link serverAsync
>> k connectionManager serverAddr serverAsync)
`catch` \(e :: SomeException) -> do
say (show e)
throwIO e
where
-- for a bidirectional mux we need to define 'Mu.xMiniProtocolInfo' for each
Expand Down Expand Up @@ -733,13 +743,14 @@ unidirectionalExperiment
unidirectionalExperiment timeouts snocket socket clientAndServerData = do
nextReqs <- oneshotNextRequests clientAndServerData
withInitiatorOnlyConnectionManager
"client" timeouts nullTracer snocket Nothing nextReqs
"client" timeouts nullTracer snocket Nothing nextReqs timeLimitsHandshake
$ \connectionManager ->
withBidirectionalConnectionManager "server" timeouts
nullTracer nullTracer nullTracer
snocket socket Nothing
[accumulatorInit clientAndServerData]
noNextRequests
timeLimitsHandshake
$ \_ serverAddr _serverAsync -> do
-- client → server: connect
(rs :: [Either SomeException (Bundle [resp])]) <-
Expand Down Expand Up @@ -843,13 +854,15 @@ bidirectionalExperiment
(Just localAddr0)
[accumulatorInit clientAndServerData0]
nextRequests0
noTimeLimitsHandshake
(\connectionManager0 _serverAddr0 _serverAsync0 ->
withBidirectionalConnectionManager "node-1" timeouts
nullTracer nullTracer nullTracer
snocket socket1
(Just localAddr1)
[accumulatorInit clientAndServerData1]
nextRequests1
noTimeLimitsHandshake
(\connectionManager1 _serverAddr1 _serverAsync1 -> do
-- runInitiatorProtocols returns a list of results per each
-- protocol in each bucket (warm \/ hot \/ established); but
Expand Down Expand Up @@ -1014,6 +1027,8 @@ data ConnectionEvent req peerAddr
-- ^ Close an inbound connection.
| CloseOutboundConnection DiffTime peerAddr
-- ^ Close an outbound connection.
| ShutdownClientServer DiffTime peerAddr
-- ^ Shuts down a client/server (simulates power loss)
deriving (Show, Functor)

-- | A sequence of connection events that make up a test scenario for `prop_multinode_Sim`.
Expand All @@ -1040,6 +1055,8 @@ nextState e s@ScriptState{..} =
CloseOutboundConnection _ a -> s{ outboundConnections = delete a outboundConnections }
InboundMiniprotocols{} -> s
OutboundMiniprotocols{} -> s
ShutdownClientServer _ a -> s{ startedClients = delete a startedClients
, startedServers = delete a startedServers }

-- | Check if an event makes sense in a given state.
isValidEvent :: Eq peerAddr => ConnectionEvent req peerAddr -> ScriptState peerAddr -> Bool
Expand All @@ -1053,6 +1070,7 @@ isValidEvent e ScriptState{..} =
CloseOutboundConnection _ a -> elem a outboundConnections
InboundMiniprotocols _ a _ -> elem a inboundConnections
OutboundMiniprotocols _ a _ -> elem a outboundConnections
ShutdownClientServer _ a -> elem a (startedClients ++ startedServers)

-- This could be an Arbitrary instance, but it would be an orphan.
genBundle :: Arbitrary a => Gen (Bundle a)
Expand All @@ -1077,14 +1095,16 @@ instance (Arbitrary peerAddr, Arbitrary req, Eq peerAddr) =>
event <- frequency $
[ (4, StartClient <$> delay <*> newClient)
, (4, StartServer <$> delay <*> newServer <*> arbitrary) ] ++
[ (4, InboundConnection <$> delay <*> elements possibleInboundConnections) | not $ null possibleInboundConnections] ++
[ (4, OutboundConnection <$> delay <*> elements possibleOutboundConnections) | not $ null possibleOutboundConnections] ++
[ (4, CloseInboundConnection <$> delay <*> elements inboundConnections) | not $ null $ inboundConnections ] ++
[ (4, CloseOutboundConnection <$> delay <*> elements outboundConnections) | not $ null $ outboundConnections ] ++
[ (4, InboundConnection <$> delay <*> elements possibleInboundConnections) | not $ null possibleInboundConnections] ++
[ (4, OutboundConnection <$> delay <*> elements possibleOutboundConnections) | not $ null possibleOutboundConnections] ++
[ (4, CloseInboundConnection <$> delay <*> elements inboundConnections) | not $ null $ inboundConnections ] ++
[ (4, CloseOutboundConnection <$> delay <*> elements outboundConnections) | not $ null $ outboundConnections ] ++
[ (16, InboundMiniprotocols <$> delay <*> elements inboundConnections <*> genBundle) | not $ null inboundConnections ] ++
[ (16, OutboundMiniprotocols <$> delay <*> elements outboundConnections <*> genBundle) | not $ null outboundConnections ]
[ (16, OutboundMiniprotocols <$> delay <*> elements outboundConnections <*> genBundle) | not $ null outboundConnections ] ++
[ (8, ShutdownClientServer <$> delay <*> elements possibleStoppable) | not $ null possibleStoppable ]
(event :) <$> go (nextState event s) (n - 1)
where
possibleStoppable = (startedClients ++ startedServers)
possibleInboundConnections = (startedClients ++ startedServers) \\ inboundConnections
possibleOutboundConnections = startedServers \\ outboundConnections
newClient = arbitrary `suchThat` (`notElem` (startedClients ++ startedServers))
Expand Down Expand Up @@ -1115,6 +1135,7 @@ instance (Arbitrary peerAddr, Arbitrary req, Eq peerAddr) =>
shrinkEvent (OutboundMiniprotocols d a r) =
(shrinkBundle r <&> \ r' -> OutboundMiniprotocols d a r') ++
(shrinkDelay d <&> \ d' -> OutboundMiniprotocols d' a r)
shrinkEvent (ShutdownClientServer d a) = shrinkDelay d <&> \ d' -> ShutdownClientServer d' a


prop_generator_MultiNodeScript :: MultiNodeScript Int TestAddr -> Property
Expand Down Expand Up @@ -1156,6 +1177,15 @@ prop_generator_MultiNodeScript (MultiNodeScript script) =
_ -> False)
$ script
))
$ label ("Number of shutdown connections: "
++ ( within_ 2
. length
. filter (\ ev -> case ev of
ShutdownClientServer {} -> True
_ -> False
)
$ script
))
$ True


Expand All @@ -1182,6 +1212,8 @@ data ConnectionHandlerMessage peerAddr req
| RunMiniProtocols peerAddr (Bundle [req])
-- ^ Run a bundle of mini protocols against the server at the given address (requires an active
-- connection).
| Shutdown
-- ^ Shutdowns a server at the given address


data Name addr = Client addr
Expand Down Expand Up @@ -1293,6 +1325,11 @@ multinodeExperiment inboundTrTracer cmTrTracer inboundTracer
threadDelay delay
sendMsg serverAddr $ RunMiniProtocols nodeAddr reqs
loop nodeAccs servers events jobpool

ShutdownClientServer delay nodeAddr -> do
threadDelay delay
sendMsg nodeAddr $ Shutdown
loop nodeAccs servers events jobpool
where
sendMsg :: peerAddr -> ConnectionHandlerMessage peerAddr req -> m ()
sendMsg addr msg = atomically $
Expand Down Expand Up @@ -1325,6 +1362,7 @@ multinodeExperiment inboundTrTracer cmTrTracer inboundTracer
$ Job
( withInitiatorOnlyConnectionManager
name simTimeouts nullTracer snocket (Just localAddr) (mkNextRequests connVar)
timeLimitsHandshake
( \ connectionManager -> do
connectionLoop SingInitiatorMode localAddr cc connectionManager Map.empty connVar
return Nothing
Expand Down Expand Up @@ -1363,6 +1401,7 @@ multinodeExperiment inboundTrTracer cmTrTracer inboundTracer
inboundTrTracer cmTrTracer inboundTracer
snocket fd (Just localAddr) serverAcc
(mkNextRequests connVar)
timeLimitsHandshake
( \ connectionManager _ _serverAsync -> do
connectionLoop SingInitiatorResponderMode localAddr cc connectionManager Map.empty connVar
return Nothing
Expand All @@ -1381,6 +1420,7 @@ multinodeExperiment inboundTrTracer cmTrTracer inboundTracer
Job ( withInitiatorOnlyConnectionManager
name simTimeouts cmTrTracer snocket (Just localAddr)
(mkNextRequests connVar)
timeLimitsHandshake
( \ connectionManager -> do
connectionLoop SingInitiatorMode localAddr cc connectionManager Map.empty connVar
return Nothing
Expand Down Expand Up @@ -1418,13 +1458,15 @@ multinodeExperiment inboundTrTracer cmTrTracer inboundTracer
TokWarm -> "warm"
TokEstablished -> "cold"
q <$ labelTQueue q ("protoVar." ++ temp ++ "@" ++ show localAddr)
qs <- atomically $ traverse id $ makeBundle mkQueue
atomically $ modifyTVar connVar $ Map.insert (connId remoteAddr) qs
connHandle <- requestOutboundConnection cm remoteAddr
connHandle <- try @_ @SomeException
$ requestOutboundConnection cm remoteAddr
case connHandle of
Connected _ _ h ->
Left _ -> connectionLoop muxMode localAddr cc cm connMap connVar
Right (Connected _ _ h) -> do
qs <- atomically $ traverse id $ makeBundle mkQueue
atomically $ modifyTVar connVar $ Map.insert (connId remoteAddr) qs
connectionLoop muxMode localAddr cc cm (Map.insert remoteAddr h connMap) connVar
Disconnected {} -> return ()
Right Disconnected {} -> return ()
Disconnect remoteAddr -> do
atomically $ modifyTVar connVar $ Map.delete (connId remoteAddr)
_ <- unregisterOutboundConnection cm remoteAddr
Expand All @@ -1434,17 +1476,22 @@ multinodeExperiment inboundTrTracer cmTrTracer inboundTracer
mqs <- (Map.lookup $ connId remoteAddr) <$> readTVar connVar
case mqs of
Nothing ->
-- We want to throw because the generator invariant should never put us in
-- this case
throwIO (NoActiveConnection localAddr remoteAddr)
Just qs -> do
sequence_ $ writeTQueue <$> qs <*> reqs
case Map.lookup remoteAddr connMap of
-- We want to throw because the generator invariant should never put us in
-- this case
Nothing -> throwIO (NoActiveConnection localAddr remoteAddr)
Just (Handle mux muxBundle _) ->
-- TODO:
-- At times this throws 'ProtocolAlreadyRunning'.
void $ try @_ @SomeException
$ runInitiatorProtocols muxMode mux muxBundle
connectionLoop muxMode localAddr cc cm connMap connVar
Shutdown -> return ()
where
connId remoteAddr = ConnectionId { localAddress = localAddr
, remoteAddress = remoteAddr }
Expand Down Expand Up @@ -1926,6 +1973,7 @@ ppScript (MultiNodeScript script) = intercalate "\n" $ go 0 script
delay (OutboundMiniprotocols d _ _) = d
delay (CloseInboundConnection d _) = d
delay (CloseOutboundConnection d _) = d
delay (ShutdownClientServer d _) = d

ppEvent (StartServer _ a i) = "Start server " ++ show a ++ " with accInit=" ++ show i
ppEvent (StartClient _ a) = "Start client " ++ show a
Expand All @@ -1935,6 +1983,7 @@ ppScript (MultiNodeScript script) = intercalate "\n" $ go 0 script
ppEvent (OutboundMiniprotocols _ a p) = "Miniprotocols to " ++ show a ++ ": " ++ ppData p
ppEvent (CloseInboundConnection _ a) = "Close connection from " ++ show a
ppEvent (CloseOutboundConnection _ a) = "Close connection to " ++ show a
ppEvent (ShutdownClientServer _ a) = "Shutdown client/server " ++ show a

ppData (Bundle hot warm est) =
concat [ "hot:", show (withoutProtocolTemperature hot)
Expand Down

0 comments on commit 813be01

Please sign in to comment.