Skip to content

Commit

Permalink
Snocket.Accept
Browse files Browse the repository at this point in the history
Rescue Alex Vieth's 'Accept' modification.  I couldn't cherry-pick the
commit since it was burried inside a merge commit.

There's no proper way to fix `Ouroboros.Network.Soocket.fromSnocket`,
but this is ok, as it will be removed in a later commit.
  • Loading branch information
coot committed Oct 15, 2020
1 parent 4c2e75c commit 8d4859c
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 65 deletions.
115 changes: 80 additions & 35 deletions ouroboros-network-framework/src/Ouroboros/Network/Snocket.hs
Expand Up @@ -6,6 +6,7 @@
module Ouroboros.Network.Snocket
( -- * Snocket Interface
Accept (..)
, Accepted (..)
, fmapAccept
, AddressFamily (..)
, Snocket (..)
Expand Down Expand Up @@ -86,44 +87,67 @@ import Ouroboros.Network.IOManager
-- created by 'open' and only subsequent calls will create a new file
-- descriptor by `createNamedPipe`, see 'namedPipeSnocket'.
--
newtype Accept addr fd = Accept
{ runAccept :: IO (fd, addr, Accept addr fd)
newtype Accept m err addr fd = Accept
{ runAccept :: m (Accepted err addr fd, Accept m err addr fd)
}

data Accepted err addr fd where
AcceptException :: !err -> Accepted err addr fd
Accepted :: !fd -> !addr -> Accepted err addr fd

-- | Arguments of 'Accept' are in the wrong order.
-- TODO: 'Accept' should have one argument for addr and fd instantiated to a tuple.
-- This would make it possible to have both 'Foldable' and 'Functor' instances.
--
-- TODO: this can be fixed later.
--
fmapAccept :: (addr -> addr') -> Accept addr fd -> Accept addr' fd
instance Foldable (Accepted err addr) where
foldMap f (Accepted fd _) = f fd
foldMap _f (AcceptException _) = mempty

fmapAccept :: Functor m
=> (addr -> addr')
-> Accept m err addr fd -> Accept m err addr' fd
fmapAccept f ac = Accept $ g <$> runAccept ac
where
g (fd, addr, next) = (fd, f addr, fmapAccept f next)
g (AcceptException acceptException, next) =
(AcceptException acceptException, f `fmapAccept` next)
g (Accepted fd addr, next) =
(Accepted fd (f addr), f `fmapAccept` next)



-- | BSD accept loop.
--
berkeleyAccept :: IOManager
-> Socket
-> Accept SockAddr Socket
-> Accept IO SomeException SockAddr Socket
berkeleyAccept ioManager sock = go
where
go = Accept $ do
(sock', addr') <-
go = Accept (acceptOne `catch` handleIOException)

acceptOne
:: IO ( Accepted SomeException SockAddr Socket
, Accept IO SomeException SockAddr Socket
)
acceptOne =
bracketOnError
#if !defined(mingw32_HOST_OS)
Socket.accept sock
(Socket.accept sock)
#else
Win32.Async.accept sock
(Win32.Async.accept sock)
#endif
associateWithIOManager ioManager (Right sock')
`catch` \(e :: IOException) -> do
Socket.close sock'
throwIO e
`catch` \(SomeAsyncException _) -> do
Socket.close sock'
throwIO e
return (sock', addr', go)
(Socket.close . fst)
$ \(sock', addr') -> do
associateWithIOManager ioManager (Right sock')
return (Accepted sock' addr', go)

-- Only IOExceptions will be caught and put into the AcceptException
-- variant. Other exceptions cause the entire Accept chain to become
-- useless (no subsequent Accept term is given).
handleIOException
:: IOException
-> IO ( Accepted SomeException SockAddr Socket
, Accept IO SomeException SockAddr Socket
)
handleIOException err = pure (AcceptException (toException err), go)

-- | Local address, on Unix is associated with `Socket.AF_UNIX` family, on
--
Expand Down Expand Up @@ -182,7 +206,10 @@ data Snocket m fd addr = Snocket {
, bind :: fd -> addr -> m ()
, listen :: fd -> m ()

, accept :: fd -> Accept addr fd
-- SomeException is chosen here to avoid having to include it in the Snocket
-- type, and therefore refactoring a bunch of stuff.
-- FIXME probably a good idea to abstract it.
, accept :: fd -> Accept m SomeException addr fd

, close :: fd -> m ()

Expand Down Expand Up @@ -333,7 +360,7 @@ namedPipeSnocket ioManager path = Snocket {

, accept = \hpipe -> Accept $ do
Win32.Async.connectNamedPipe hpipe
return (hpipe, localAddress, acceptNext)
return (Accepted hpipe localAddress, acceptNext)

, close = Win32.closeHandle

Expand All @@ -343,23 +370,41 @@ namedPipeSnocket ioManager path = Snocket {
localAddress :: LocalAddress
localAddress = LocalAddress path

acceptNext :: Accept LocalAddress Win32.HANDLE
acceptNext = Accept $ do
hpipe <- Win32.createNamedPipe
acceptNext :: Accept IO SomeException LocalAddress Win32.HANDLE
acceptNext = go
where
go = Accept (acceptOne `catch` handleIOException)

handleIOException
:: IOException
-> IO ( Accepted SomeException LocalAddress Win32.HANDLE
, Accept IO SomeException LocalAddress Win32.HANDLE
)
handleIOException err =
pure ( AcceptException (toException err)
, go
)

acceptOne
:: IO ( Accepted SomeException LocalAddress Win32.HANDLE
, Accept IO SomeException LocalAddress Win32.HANDLE
)
acceptOne =
bracketOnError
(Win32.createNamedPipe
path
(Win32.pIPE_ACCESS_DUPLEX .|. Win32.fILE_FLAG_OVERLAPPED)
(Win32.pIPE_TYPE_BYTE .|. Win32.pIPE_READMODE_BYTE)
Win32.pIPE_UNLIMITED_INSTANCES
65536 -- outbound pipe size
16384 -- inbound pipe size
0 -- default timeout
Nothing -- default security
`catch` \(e :: IOException) -> do
putStrLn $ "accept: " ++ show e
throwIO e
associateWithIOManager ioManager (Left hpipe)
Win32.Async.connectNamedPipe hpipe
return (hpipe, localAddress, acceptNext)
65536 -- outbound pipe size
16384 -- inbound pipe size
0 -- default timeout
Nothing) -- default security
Win32.closeHandle
$ \hpipe -> do
associateWithIOManager ioManager (Left hpipe)
Win32.Async.connectNamedPipe hpipe
return (Accepted hpipe localAddress, go)
#endif


Expand Down
17 changes: 11 additions & 6 deletions ouroboros-network-framework/src/Ouroboros/Network/Socket.hs
Expand Up @@ -415,13 +415,18 @@ fromSnocket
-> Server.Socket addr fd
fromSnocket tblVar sn sd = go (Snocket.accept sn sd)
where
go :: Snocket.Accept addr fd -> Server.Socket addr fd
go :: Snocket.Accept IO SomeException addr fd -> Server.Socket addr fd
go (Snocket.Accept accept) = Server.Socket $ do
(sd', remoteAddr, next) <- accept
-- TOOD: we don't need to that on each accept
localAddr <- Snocket.getLocalAddr sn sd'
atomically $ addConnection tblVar remoteAddr localAddr Nothing
pure (remoteAddr, sd', close remoteAddr localAddr sd', go next)
(result, next) <- accept
case result of
Snocket.Accepted sd' remoteAddr -> do
-- TOOD: we don't need to that on each accept
localAddr <- Snocket.getLocalAddr sn sd'
atomically $ addConnection tblVar remoteAddr localAddr Nothing
pure (remoteAddr, sd', close remoteAddr localAddr sd', go next)
Snocket.AcceptException err ->
-- the is no way to construct 'Server.Socket'; This will be removed in a later commit!
throwIO err

close remoteAddr localAddr sd' = do
removeConnection tblVar remoteAddr localAddr
Expand Down
50 changes: 26 additions & 24 deletions ouroboros-network-framework/test/Test/Ouroboros/Network/Socket.hs
Expand Up @@ -35,6 +35,7 @@ import Control.Monad.Class.MonadThrow
import Control.Concurrent (ThreadId)
import Control.Exception (IOException)
import Control.Tracer
import Data.Foldable (traverse_)

import Network.TypedProtocol.Core
import qualified Network.TypedProtocol.ReqResp.Type as ReqResp
Expand Down Expand Up @@ -330,17 +331,19 @@ prop_socket_recv_error f rerr =
-- accept a connection and start mux on it
bracket
(runAccept $ accept snocket sd)
(\(sd', _, _) -> Socket.close sd')
$ \(sd', _, _) -> do
remoteAddress <- Socket.getPeerName sd'
let timeout = if rerr == RecvSDUTimeout then 0.10
else (-1) -- No timeout
bearer = Mx.socketAsMuxBearer timeout nullTracer sd'
connectionId = ConnectionId {
localAddress = Socket.addrAddress muxAddress,
remoteAddress
}
Mx.muxStart nullTracer (toApplication connectionId (continueForever (Proxy :: Proxy IO)) app) bearer
(traverse_ Socket.close . fst)
$ \(accepted, _acceptNext) -> case accepted of
AcceptException err -> throwIO err
Accepted sd' _ -> do
remoteAddress <- Socket.getPeerName sd'
let timeout = if rerr == RecvSDUTimeout then 0.10
else (-1) -- No timeout
bearer = Mx.socketAsMuxBearer timeout nullTracer sd'
connectionId = ConnectionId {
localAddress = Socket.addrAddress muxAddress,
remoteAddress
}
Mx.muxStart nullTracer (toApplication connectionId (continueForever (Proxy :: Proxy IO)) app) bearer
)
$ \muxAsync -> do

Expand Down Expand Up @@ -404,22 +407,21 @@ prop_socket_send_error rerr =
-- accept a connection and start mux on it
bracket
(runAccept $ accept snocket sd)
(\(sd', _, _) -> Socket.close sd')
(\(sd', _, _) ->
let sduTimeout = if rerr == SendSDUTimeout then 0.10
else (-1) -- No timeout
bearer = Mx.socketAsMuxBearer sduTimeout nullTracer sd'
blob = BL.pack $ replicate 0xffff 0xa5 in
withTimeoutSerial $ \timeout ->
-- send maximum mux sdus until we've filled the window.
replicateM 100 $ do
((), Nothing) <$ write bearer timeout (wrap blob ResponderDir (MiniProtocolNum 0))
)

(traverse_ Socket.close . fst)
$ \(accepted, _acceptNext) -> case accepted of
AcceptException err -> throwIO err
Accepted sd' _ -> do
let sduTimeout = if rerr == SendSDUTimeout then 0.10
else (-1) -- No timeout
bearer = Mx.socketAsMuxBearer sduTimeout nullTracer sd'
blob = BL.pack $ replicate 0xffff 0xa5
withTimeoutSerial $ \timeout ->
-- send maximum mux sdus until we've filled the window.
replicateM 100 $ do
((), Nothing) <$ write bearer timeout (wrap blob ResponderDir (MiniProtocolNum 0))
)
$ \muxAsync -> do


sd' <- openToConnect snocket addr
-- connect to muxAddress
_ <- connect snocket sd' addr
Expand Down

0 comments on commit 8d4859c

Please sign in to comment.