Skip to content
Permalink
Browse files

Added SocketStateChange Callback

  • Loading branch information...
coot committed Aug 13, 2019
1 parent 958ec66 commit 6fc85e4a82f527c3cd402e9f48fa2c1eb9531f9e
Showing with 28 additions and 8 deletions.
  1. +28 −8 ouroboros-network/src/Ouroboros/Network/Subscription/Worker.hs
@@ -10,6 +10,8 @@

module Ouroboros.Network.Subscription.Worker
( BeforeConnect
, SocketStateChange
, SocketState (..)
, CompleteApplication
, Result (..)
, Main
@@ -139,6 +141,16 @@ type ThreadsVar m = TVar m (Set (Async m ()))
--
type BeforeConnect m s addr = Time m -> addr -> s -> STM m (s, Bool)

data SocketState m addr
= CreatedSocket !addr !(Async m ())
| ClosedSocket !addr !(Async m ())

-- | Callback which firest: when we create or close a socket.
--
-- Note: this callback runs with async exceptions masked.
--
type SocketStateChange m s addr = SocketState m addr -> s -> STM m s

-- | Complete a connection, which receive application result (or exception).
--
type CompleteApplication m s addr r = Result m addr r -> s -> STM m (s, m ())
@@ -262,6 +274,7 @@ subscriptionLoop

-> Socket m addr sock
-> BeforeConnect m s addr
-> SocketStateChange m s addr
-> CompleteApplication m s addr a
-- ^ callback which fires when either 'connect' fails or the application
-- returns.
@@ -281,7 +294,7 @@ subscriptionLoop
-- ^ application
-> m Void
subscriptionLoop
tr tbl resQ sVar threadsVar socket beforeConn complete
tr tbl resQ sVar threadsVar socket beforeConn sockStChange complete
mbLocalIPv4 mbLocalIPv6 selectAddr connectionAttemptDelay
getTargets valency k = do
valencyVar <- atomically $ newValencyCounter tbl valency
@@ -391,11 +404,17 @@ subscriptionLoop
localAddr
(atomically $ do
modifyTVar' conThreads (Set.insert thread)
modifyTVar' threadsVar (Set.insert thread))
(atomically $
modifyTVar' threadsVar (Set.insert thread)
readTVar sVar
>>= sockStChange (CreatedSocket remoteAddr thread)
>>= (writeTVar sVar $!))
(atomically $ do
-- The thread is removed from 'conThreads'
-- inside 'connAction'.
modifyTVar' threadsVar (Set.delete thread))
modifyTVar' threadsVar (Set.delete thread)
readTVar sVar
>>= sockStChange (ClosedSocket remoteAddr thread)
>>= (writeTVar sVar $!))
(connAction
thread conThreads valencyVar
remoteAddr)
@@ -546,6 +565,7 @@ worker

-- callbacks
-> BeforeConnect IO s addr
-> SocketStateChange IO s addr
-> CompleteApplication IO s addr a
-> Main IO s t

@@ -565,15 +585,15 @@ worker
-> (sock -> IO a)
-- ^ application
-> IO t
worker tr tbl sVar socket beforeConn
worker tr tbl sVar socket beforeConn sockStChange
complete main mbLocalIPv4 mbLocalIPv6 selectAddr
connectionAttemptDelay getTargets valency k = do
resQ <- newResultQ
threadsVar <- atomically $ newTVar Set.empty
withAsync
(subscriptionLoop tr tbl resQ sVar threadsVar socket beforeConn complete
mbLocalIPv4 mbLocalIPv6 selectAddr connectionAttemptDelay
getTargets valency k) $ \_ ->
(subscriptionLoop tr tbl resQ sVar threadsVar socket beforeConn
sockStChange complete mbLocalIPv4 mbLocalIPv6 selectAddr
connectionAttemptDelay getTargets valency k) $ \_ ->
mainLoop resQ threadsVar sVar complete main
`finally` killThreads threadsVar
where

0 comments on commit 6fc85e4

Please sign in to comment.
You can’t perform that action at this time.