Skip to content

Commit

Permalink
pruning: set connection state to TerminatedState
Browse files Browse the repository at this point in the history
When pruning we can set the state to TerminatedState and then cancel the
connection handler thread, which will recognise this connection to be
pruned.  Note that this avoids our application level WAIT_TIME interval.
  • Loading branch information
coot committed Nov 22, 2021
1 parent ae79af7 commit ded160a
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 38 deletions.
Expand Up @@ -7,6 +7,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
-- Undecidable instances are need for 'Show' instance of 'ConnectionState'.
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE QuantifiedConstraints #-}
Expand All @@ -25,7 +26,7 @@ module Ouroboros.Network.ConnectionManager.Core
) where

import Control.Exception (assert)
import Control.Monad (guard, when)
import Control.Monad (forM_, guard, when)
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadThrow hiding (handle)
Expand Down Expand Up @@ -483,7 +484,13 @@ data DemoteToColdLocal peerAddr handlerTrace handle handleError version m
--
| PruneConnections (ConnectionId peerAddr)

(Map peerAddr (Async m ()))
(Map peerAddr ( Async m ()
, StrictTVar m
(ConnectionState
peerAddr
handle handleError
version m)
))
-- ^ a subset of connections to be prunned

Int
Expand Down Expand Up @@ -1880,31 +1887,44 @@ withConnectionManager ConnectionManagerArguments {
-- have 'ConnectionType' and are running (have a thread).
-- This excludes connections in 'ReservedOutboundState',
-- 'TerminatingState' and 'TerminatedState'.
(choiceMap' :: Map peerAddr (ConnectionType, Async m ()))
(choiceMap' :: Map peerAddr ( ConnectionType
, Async m ()
, StrictTVar m
(ConnectionState
peerAddr
handle handleError
version m)
))
<- flip Map.traverseMaybeWithKey state $ \_peerAddr MutableConnState { connVar = connVar' } ->
(\cs -> do
-- this expression returns @Maybe (connType, connThread)@;
-- 'traverseMaybeWithKey' collects all 'Just' cases.
guard (isInboundConn cs)
(,) <$> getConnType cs
<*> getConnThread cs)
(,,connVar') <$> getConnType cs
<*> getConnThread cs)
<$> readTVar connVar'
let choiceMap =
case getConnType connState' of
Nothing -> assert False choiceMap'
Just a -> Map.insert peerAddr (a, connThread)
Just a -> Map.insert peerAddr (a, connThread, connVar)
choiceMap'

pruneSet <-
cmPrunePolicy
(fst <$> choiceMap)
((\(a, _, _) -> a) <$> choiceMap)
numberToPrune

when (remoteAddress connId `Set.notMember` pruneSet)
$ writeTVar connVar connState'

let pruneMap = choiceMap `Map.restrictKeys` pruneSet
forM_ pruneMap $ \(_, _, connVar') ->

writeTVar connVar' (TerminatedState Nothing)
return
( PruneConnections connId
(snd <$> choiceMap `Map.restrictKeys` pruneSet)
((\(_, a, b) -> (a, b))
<$> pruneMap)
numberToPrune
(Map.keysSet choiceMap)
(Left connState)
Expand Down Expand Up @@ -1973,33 +1993,45 @@ withConnectionManager ConnectionManagerArguments {
-- have 'ConnectionType' and are running (have a thread).
-- This excludes connections in 'ReservedOutboundState',
-- 'TerminatingState' and 'TerminatedState'.
(choiceMap' :: Map peerAddr (ConnectionType, Async m ()))
(choiceMap' :: Map peerAddr ( ConnectionType
, Async m ()
, StrictTVar m
(ConnectionState
peerAddr
handle handleError
version m)
))
<- flip Map.traverseMaybeWithKey state $ \_peerAddr MutableConnState { connVar = connVar' } ->
(\cs -> do
-- this expression returns @Maybe (connType, connThread)@;
-- 'traverseMaybeWithKey' collects all 'Just' cases.
guard (isInboundConn cs)
(,) <$> getConnType cs
<*> getConnThread cs)
(,,connVar') <$> getConnType cs
<*> getConnThread cs)
<$> readTVar connVar'
let choiceMap =
case getConnType connState' of
Nothing -> assert False choiceMap'
Just a -> Map.insert peerAddr (a, connThread)
Just a -> Map.insert peerAddr (a, connThread, connVar)
choiceMap'

pruneSet <-
cmPrunePolicy
(fst <$> choiceMap)
((\(a,_,_) -> a) <$> choiceMap)
numberToPrune

let pruneMap = choiceMap `Map.restrictKeys` pruneSet
forM_ pruneMap $ \(_, _, connVar') ->
writeTVar connVar' (TerminatedState Nothing)

-- If this connection is in the to-prune set we do not let it
-- evolve to a new state. Otherwise we do.
if Set.member peerAddr pruneSet
then
return
( PruneConnections connId
(snd <$> choiceMap `Map.restrictKeys` pruneSet)
((\(_, a, b) -> (a, b))
<$> pruneMap)
numberToPrune
(Map.keysSet choiceMap)
(Left connState)
Expand All @@ -2009,7 +2041,8 @@ withConnectionManager ConnectionManagerArguments {
writeTVar connVar connState'
return
( PruneConnections connId
(snd <$> choiceMap `Map.restrictKeys` pruneSet)
((\(_, a, b) -> (a, b))
<$> pruneMap)
numberToPrune
(Map.keysSet choiceMap)
(Right tr)
Expand Down Expand Up @@ -2081,13 +2114,14 @@ withConnectionManager ConnectionManagerArguments {

PruneConnections _connId pruneMap numberToPrune choiceSet eTr -> do
traverse_ (traceWith trTracer . TransitionTrace peerAddr) eTr
traceCounters stateVar
traceWith tracer (TrPruneConnections (Map.keysSet pruneMap)
numberToPrune
choiceSet)
-- previous comment applies here as well.
traverse_ cancel pruneMap
forM_ pruneMap $ \(connThread', _) -> do
cancel connThread'

traceCounters stateVar
return (OperationSuccess (abstractState (either Known fromState eTr)))

DemoteToColdLocalError trace st -> do
Expand Down Expand Up @@ -2180,31 +2214,44 @@ withConnectionManager ConnectionManagerArguments {
-- have 'ConnectionType' and are running (have a thread).
-- This excludes connections in 'ReservedOutboundState',
-- 'TerminatingState' and 'TerminatedState'.
(choiceMap' :: Map peerAddr (ConnectionType, Async m ()))
(choiceMap' :: Map peerAddr ( ConnectionType
, Async m ()
, StrictTVar m
(ConnectionState
peerAddr
handle handleError
version m)
))
<- flip Map.traverseMaybeWithKey state $ \_peerAddr MutableConnState { connVar = connVar' } ->
(\cs -> do
-- this expression returns @Maybe (connType, connThread)@;
-- 'traverseMaybeWithKey' collects all 'Just' cases.
-- 'traverseMaybeWithKey' collects all 'Just' cases.
guard (isInboundConn cs)
(,) <$> getConnType cs
<*> getConnThread cs)
(,,connVar') <$> getConnType cs
<*> getConnThread cs)
<$> readTVar connVar'
let choiceMap =
case getConnType connState' of
Nothing -> assert False choiceMap'
Just a -> Map.insert peerAddr (a, connThread)
Just a -> Map.insert peerAddr (a, connThread, connVar)
choiceMap'

pruneSet <-
cmPrunePolicy
(fst <$> choiceMap)
((\(a, _, _) -> a)
<$> choiceMap)
numberToPrune

let pruneMap = choiceMap `Map.restrictKeys` pruneSet
forM_ pruneMap $ \(_, _, connVar') ->
writeTVar connVar' (TerminatedState Nothing)

when (remoteAddress connId `Set.notMember` pruneSet)
$ writeTVar connVar connState'

return
( OperationSuccess tr
, Just ( snd <$> choiceMap `Map.restrictKeys` pruneSet
, Just ( pruneMap
, numberToPrune
, Map.keysSet choiceMap
, Nothing
Expand Down Expand Up @@ -2244,30 +2291,43 @@ withConnectionManager ConnectionManagerArguments {
-- have 'ConnectionType' and are running (have a thread).
-- This excludes connections in 'ReservedOutboundState',
-- 'TerminatingState' and 'TerminatedState'.
(choiceMap' :: Map peerAddr (ConnectionType, Async m ()))
(choiceMap' :: Map peerAddr ( ConnectionType
, Async m ()
, StrictTVar m
(ConnectionState
peerAddr
handle handleError
version m)
))
<- flip Map.traverseMaybeWithKey state $ \_peerAddr MutableConnState { connVar = connVar' } ->
(\cs -> do
-- this expression returns @Maybe (connType, connThread)@;
-- 'traverseMaybeWithKey' collects all 'Just' cases.
guard (isInboundConn cs)
(,) <$> getConnType cs
<*> getConnThread cs)
(,,connVar') <$> getConnType cs
<*> getConnThread cs)
<$> readTVar connVar'
let choiceMap =
case getConnType connState' of
Nothing -> assert False choiceMap'
Just a -> Map.insert peerAddr (a, connThread) choiceMap'
Just a -> Map.insert peerAddr (a, connThread, connVar)
choiceMap'

pruneSet <-
cmPrunePolicy
(fst <$> choiceMap)
((\(a, _, _) -> a) <$> choiceMap)
numberToPrune

let pruneMap = choiceMap `Map.restrictKeys` pruneSet
forM_ pruneMap $ \(_, _, connVar') ->
writeTVar connVar' (TerminatedState Nothing)

when (remoteAddress connId `Set.notMember` pruneSet)
$ writeTVar connVar connState'

return
( OperationSuccess tr
, Just ( snd <$> choiceMap `Map.restrictKeys` pruneSet
, Just ( pruneMap
, numberToPrune
, Map.keysSet choiceMap
, Nothing
Expand Down Expand Up @@ -2340,11 +2400,11 @@ withConnectionManager ConnectionManagerArguments {
numberToPrune
choiceSet)

-- We relay on the `finally` handler of connection thread to:
--
-- - close the socket,
-- - set the state to 'TerminatedState'
traverse_ cancel pruneMap
-- We relay on the `finally` handler of connection thread to
-- close the socket.
forM_ pruneMap $ \ (_, connThread', _) -> cancel connThread'

traceCounters stateVar

_ -> return ()
return (abstractState . fromState <$> result)
Expand Down
Expand Up @@ -127,9 +127,8 @@ tests =
prop_connection_manager_pruning
, testProperty "inbound_governor_pruning"
prop_inbound_governor_pruning
-- The test fails at the moment. See issue #3487.
-- , testProperty "never_above_hardlimit"
-- prop_never_above_hardlimit
, testProperty "never_above_hardlimit"
prop_never_above_hardlimit
, testProperty "connection_manager_valid_transitions"
prop_connection_manager_valid_transitions
, testProperty "connection_manager_no_invalid_traces"
Expand Down

0 comments on commit ded160a

Please sign in to comment.