Skip to content

Commit

Permalink
moar debug + possible fix
Browse files Browse the repository at this point in the history
  • Loading branch information
karknu committed Sep 28, 2021
1 parent 655aa1f commit cf0dc47
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
Expand Up @@ -148,9 +148,9 @@ data AcceptConnectionsPolicyTrace
| ServerTraceAcceptConnectionHardLimit Word32
| ServerTraceAcceptConnectionResume Int
| ServerTraceAcceptConnections Int
| ServerTraceAcceptAccept String
| ServerTraceAcceptClose String
| ServerTraceAcceptReject String
| ServerTraceAcceptAccept String Int
| ServerTraceAcceptClose String Int
| ServerTraceAcceptReject String Int
deriving (Eq, Ord, Typeable)

instance Show AcceptConnectionsPolicyTrace where
Expand All @@ -166,6 +166,6 @@ instance Show AcceptConnectionsPolicyTrace where
printf "hard rate limit over, accepting connections again, currently serving %d connections"
numberOfConnections
show (ServerTraceAcceptConnections count) = printf "current connetions %d" count
show (ServerTraceAcceptAccept peer) = printf "XXX accept %s" peer
show (ServerTraceAcceptClose peer) = printf "XXX close %s" peer
show (ServerTraceAcceptReject peer) = printf "XXX reject %s" peer
show (ServerTraceAcceptAccept peer size) = printf "XXX accept %s,%d" peer size
show (ServerTraceAcceptClose peer size) = printf "XXX close %s,%d" peer size
show (ServerTraceAcceptReject peer size) = printf "XXX reject %s,%d" peer size
25 changes: 15 additions & 10 deletions ouroboros-network-framework/src/Ouroboros/Network/Server/Socket.hs
Expand Up @@ -117,7 +117,6 @@ data Result addr r = Result
-- the server shuts down.
type ThreadsVar = STM.TVar (Set (Async ()))


-- | The action runs inside `try`, and when it finishes, puts its result
-- into the `ResultQ`. Takes care of inserting/deleting from the `ThreadsVar`.
--
Expand Down Expand Up @@ -227,11 +226,13 @@ acceptOne acceptPolicyTrace resQ threadsVar statusVar acceptedConnectionsLimit b
choice <- decision `onException` close
case choice of
Nothing -> do
traceWith acceptPolicyTrace $ ServerTraceAcceptReject $ show addr
size <- STM.atomically $ Set.size <$> STM.readTVar threadsVar
traceWith acceptPolicyTrace $ ServerTraceAcceptReject (show addr) size
close
Just io -> do
traceWith acceptPolicyTrace $ ServerTraceAcceptAccept $ show addr
spawnOne addr statusVar resQ threadsVar applicationStart (io channel `finally` close)
size <- STM.atomically $ Set.size <$> STM.readTVar threadsVar
traceWith acceptPolicyTrace $ ServerTraceAcceptAccept (show addr) size
pure (Just nextSocket)

trackConnections :: ThreadsVar
Expand All @@ -250,24 +251,24 @@ trackConnections threadsVar tracer = go 0

trackConnectionDeath :: forall addr.
Show addr
=> STM.TQueue addr
=> STM.TQueue (addr, Int)
-> Tracer IO AcceptConnectionsPolicyTrace
-> IO ()
trackConnectionDeath tq tracer = forever $ do
deadPeer <- STM.atomically $ STM.readTQueue tq
traceWith tracer $ ServerTraceAcceptClose $ show deadPeer

(deadPeer, size) <- STM.atomically $ STM.readTQueue tq
traceWith tracer $ ServerTraceAcceptClose (show deadPeer) size


-- | Main server loop, which runs alongside the `acceptLoop`. It waits for
-- the results of connection threads, as well as the `Main` action, which
-- determines when/if the server should stop.
mainLoop
:: forall addr st tr r t .
Tracer IO (WithAddr addr ErrorPolicyTrace)
Show addr
=> Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ResultQ addr r
-> ThreadsVar
-> STM.TQueue addr
-> STM.TQueue (addr, Int)
-> StatusVar st
-> CompleteConnection addr st tr r
-> Main st t
Expand All @@ -290,6 +291,9 @@ mainLoop errorPolicyTrace resQ threadsVar deathsVar statusVar complete main =
connectionTx :: STM (IO t)
connectionTx = do
result <- STM.readTQueue resQ
-- Make sure the don't cleanup before spawnOne has inserted the thread
isMember <- Set.member (resultThread result) <$> STM.readTVar threadsVar
STM.check isMember
st <- STM.readTVar statusVar
CompleteApplicationResult
{ carState
Expand All @@ -301,7 +305,8 @@ mainLoop errorPolicyTrace resQ threadsVar deathsVar statusVar complete main =
STM.writeTVar statusVar carState
-- It was inserted by `spawnOne`.
STM.modifyTVar' threadsVar (Set.delete (resultThread result))
STM.writeTQueue deathsVar (resultAddr result)
size <- Set.size <$> STM.readTVar threadsVar
STM.writeTQueue deathsVar ((resultAddr result), size)
pure $ do
traverse_ Async.cancel carThreads
traverse_ (traceWith errorPolicyTrace) carTrace
Expand Down

0 comments on commit cf0dc47

Please sign in to comment.