Skip to content

Commit

Permalink
WIP - allow close to take extra argument controlling shutdown behavio…
Browse files Browse the repository at this point in the history
…ur; use it to shut down connection when Connection:close is used in request
  • Loading branch information
sof committed Nov 16, 2009
1 parent 97563bf commit 8bfade3
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Network/Browser.hs
Expand Up @@ -956,7 +956,7 @@ updateConnectionPool c = do
pool <- getBS bsConnectionPool
let len_pool = length pool
when (len_pool > maxPoolSize)
(ioAction $ close (last pool))
(ioAction $ close (last pool) Nothing)
let pool'
| len_pool > maxPoolSize = init pool
| otherwise = pool
Expand Down
31 changes: 16 additions & 15 deletions Network/HTTP/HandleStream.hs
Expand Up @@ -36,6 +36,7 @@ import Network.BufferType
import Network.Stream ( fmapE, Result )
import Network.StreamDebugger ( debugByteStream )
import Network.TCP (HStream(..), HandleStream )
import Network.Socket ( ShutdownCmd(..) )

import Network.HTTP.Base
import Network.HTTP.Headers
Expand Down Expand Up @@ -86,7 +87,7 @@ sendHTTP_notify :: HStream ty
-> IO (Result (Response ty))
sendHTTP_notify conn rq onSendComplete =
catchIO (sendMain conn rq onSendComplete providedClose)
(\e -> do { close conn; ioError e })
(\e -> do { close conn Nothing; ioError e })
where
providedClose = findConnClose (rqHeaders rq)

Expand All @@ -106,45 +107,45 @@ sendMain :: HStream ty
-> (IO ())
-> Bool
-> IO (Result (Response ty))
sendMain conn rqst onSendComplete closeOnEOF = do
sendMain conn rqst onSendComplete closeOnSend = do
--let str = if null (rqBody rqst)
-- then show rqst
-- else show (insertHeader HdrExpect "100-continue" rqst)
writeBlock conn (buf_fromStr bufferOps $ show rqst)
-- write body immediately, don't wait for 100 CONTINUE
writeBlock conn (rqBody rqst)
when closeOnSend (close conn (Just ShutdownSend))
onSendComplete
rsp <- getResponseHead conn
switchResponse conn closeOnEOF True False rsp rqst
switchResponse conn True False rsp rqst

-- Hmmm, this could go bad if we keep getting "100 Continue"
-- responses... Except this should never happen according
-- to the RFC.

switchResponse :: HStream ty
=> HandleStream ty
-> Bool {- close on EOF -}
-> Bool {- allow retry? -}
-> Bool {- is body sent? -}
-> Result ResponseData
-> Request ty
-> IO (Result (Response ty))
switchResponse _ _ _ _ (Left e) _ = return (Left e)
switchResponse _ _ _ (Left e) _ = return (Left e)
-- retry on connreset?
-- if we attempt to use the same socket then there is an excellent
-- chance that the socket is not in a completely closed state.

switchResponse conn closeOnEOF allow_retry bdy_sent (Right (cd,rn,hdrs)) rqst =
switchResponse conn allow_retry bdy_sent (Right (cd,rn,hdrs)) rqst =
case matchResponse (rqMethod rqst) cd of
Continue
| not bdy_sent -> do {- Time to send the body -}
writeBlock conn (rqBody rqst) >>= either (return . Left)
(\ _ -> do
rsp <- getResponseHead conn
switchResponse conn closeOnEOF allow_retry True rsp rqst)
switchResponse conn allow_retry True rsp rqst)
| otherwise -> do {- keep waiting -}
rsp <- getResponseHead conn
switchResponse conn closeOnEOF allow_retry bdy_sent rsp rqst
switchResponse conn allow_retry bdy_sent rsp rqst

Retry -> do {- Request with "Expect" header failed.
Trouble is the request contains Expects
Expand All @@ -153,15 +154,15 @@ switchResponse conn closeOnEOF allow_retry bdy_sent (Right (cd,rn,hdrs)) rqst =
(buf_fromStr bufferOps (show rqst))
(rqBody rqst))
rsp <- getResponseHead conn
switchResponse conn closeOnEOF False bdy_sent rsp rqst
switchResponse conn False bdy_sent rsp rqst

Done -> do
when (closeOnEOF || findConnClose hdrs)
(close conn)
when (findConnClose hdrs)
(close conn Nothing)
return (Right $ Response cd rn hdrs (buf_empty bufferOps))

DieHorribly str -> do
close conn
close conn Nothing
return (responseParseError "Invalid response:" str)
ExpectEntity -> do
r <- fmapE (\ (ftrs,bdy) -> Right (Response cd rn (hdrs++ftrs) bdy)) $
Expand All @@ -176,11 +177,11 @@ switchResponse conn closeOnEOF allow_retry bdy_sent (Right (cd,rn,hdrs)) rqst =
tc
case r of
Left{} -> do
close conn
close conn Nothing
return r
Right (Response _ _ hs _) -> do
when (closeOnEOF || findConnClose hs)
(close conn)
when (findConnClose hs)
(close conn Nothing)
return r

where
Expand Down
29 changes: 15 additions & 14 deletions Network/HTTP/Stream.hs
Expand Up @@ -41,6 +41,7 @@ module Network.HTTP.Stream
import Network.Stream
import Network.StreamDebugger (debugStream)
import Network.TCP (openTCPPort)
import Network.Socket (ShutdownCmd(..))
import Network.BufferType ( stringBufferOp )

import Network.HTTP.Base
Expand Down Expand Up @@ -88,7 +89,7 @@ sendHTTP conn rq = sendHTTP_notify conn rq (return ())
sendHTTP_notify :: Stream s => s -> Request_String -> IO () -> IO (Result Response_String)
sendHTTP_notify conn rq onSendComplete =
catchIO (sendMain conn rq onSendComplete providedClose)
(\e -> do { close conn; ioError e })
(\e -> do { close conn Nothing; ioError e })
where
providedClose = findConnClose (rqHeaders rq)

Expand All @@ -111,8 +112,9 @@ sendMain conn rqst onSendComplete closeOnEOF = do
-- write body immediately, don't wait for 100 CONTINUE
writeBlock conn (rqBody rqst)
onSendComplete
when closeOnEOF (close conn (Just ShutdownSend))
rsp <- getResponseHead conn
switchResponse conn closeOnEOF True False rsp rqst
switchResponse conn True False rsp rqst

-- reads and parses headers
getResponseHead :: Stream s => s -> IO (Result ResponseData)
Expand All @@ -125,17 +127,16 @@ getResponseHead conn = do
-- to the RFC.
switchResponse :: Stream s
=> s
-> Bool
-> Bool {- allow retry? -}
-> Bool {- is body sent? -}
-> Result ResponseData
-> Request_String
-> IO (Result Response_String)
switchResponse _ _ _ _ (Left e) _ = return (Left e)
switchResponse _ _ _ (Left e) _ = return (Left e)
-- retry on connreset?
-- if we attempt to use the same socket then there is an excellent
-- chance that the socket is not in a completely closed state.
switchResponse conn closeOnEOF allow_retry bdy_sent (Right (cd,rn,hdrs)) rqst =
switchResponse conn allow_retry bdy_sent (Right (cd,rn,hdrs)) rqst =
case matchResponse (rqMethod rqst) cd of
Continue
| not bdy_sent -> {- Time to send the body -}
Expand All @@ -144,29 +145,29 @@ switchResponse conn closeOnEOF allow_retry bdy_sent (Right (cd,rn,hdrs)) rqst =
Left e -> return (Left e)
Right _ ->
do { rsp <- getResponseHead conn
; switchResponse conn closeOnEOF allow_retry True rsp rqst
; switchResponse conn allow_retry True rsp rqst
}
}
| otherwise -> {- keep waiting -}
do { rsp <- getResponseHead conn
; switchResponse conn closeOnEOF allow_retry bdy_sent rsp rqst
; switchResponse conn allow_retry bdy_sent rsp rqst
}

Retry -> {- Request with "Expect" header failed.
Trouble is the request contains Expects
other than "100-Continue" -}
do { writeBlock conn (show rqst ++ rqBody rqst)
; rsp <- getResponseHead conn
; switchResponse conn closeOnEOF False bdy_sent rsp rqst
; switchResponse conn False bdy_sent rsp rqst
}

Done -> do
when (closeOnEOF || findConnClose hdrs)
(close conn)
when (findConnClose hdrs)
(close conn Nothing)
return (Right $ Response cd rn hdrs "")

DieHorribly str -> do
close conn
close conn Nothing
return $ responseParseError "sendHTTP" ("Invalid response: " ++ str)

ExpectEntity ->
Expand All @@ -184,10 +185,10 @@ switchResponse conn closeOnEOF allow_retry bdy_sent (Right (cd,rn,hdrs)) rqst =
(readLine conn) (readBlock conn)
_ -> uglyDeathTransfer "sendHTTP"
; case rslt of
Left e -> close conn >> return (Left e)
Left e -> close conn Nothing >> return (Left e)
Right (ftrs,bdy) -> do
when (closeOnEOF || findConnClose (hdrs++ftrs))
(close conn)
when (findConnClose (hdrs++ftrs))
(close conn Nothing)
return (Right (Response cd rn (hdrs++ftrs) bdy))
}

Expand Down
6 changes: 5 additions & 1 deletion Network/Stream.hs
Expand Up @@ -22,6 +22,7 @@
-----------------------------------------------------------------------------
module Network.Stream
( Stream(..)
, ShutdownCmd
, ConnError(..)
, Result
, bindE
Expand All @@ -32,6 +33,7 @@ module Network.Stream
) where

import Control.Monad.Error
import Network.Socket (ShutdownCmd)

data ConnError
= ErrorReset
Expand Down Expand Up @@ -78,4 +80,6 @@ class Stream x where
readLine :: x -> IO (Result String)
readBlock :: x -> Int -> IO (Result String)
writeBlock :: x -> String -> IO (Result ())
close :: x -> IO ()
close :: x -> Maybe ShutdownCmd -> IO ()
-- ^ Nothing => close down stream right away.
-- Just f => shut down the stream, for reading, writing, or both.
4 changes: 2 additions & 2 deletions Network/StreamDebugger.hs
Expand Up @@ -44,10 +44,10 @@ instance (Stream x) => Stream (StreamDebugger x) where
do val <- writeBlock x str
hPutStrLn h ("writeBlock " ++ show val ++ ' ' : show str)
return val
close (Dbg h x) =
close (Dbg h x) f =
do hPutStrLn h "closing..."
hFlush h
close x
close x f
hPutStrLn h "...closed"
hClose h

Expand Down
9 changes: 7 additions & 2 deletions Network/StreamSocket.hs
Expand Up @@ -55,8 +55,13 @@ instance Stream Socket where
readBlock sk n = readBlockSocket sk n
readLine sk = readLineSocket sk
writeBlock sk str = writeBlockSocket sk str
close sk = shutdown sk ShutdownBoth >> sClose sk
-- This slams closed the connection (which is considered rude for TCP\/IP)
close sk closer =
-- This slams closed the connection (which is considered rude for TCP\/IP)
case closer of
Nothing -> do
shutdown sk ShutdownBoth
sClose sk
Just flg -> shutdown sk flg

readBlockSocket :: Socket -> Int -> IO (Result String)
readBlockSocket sk n = (liftM Right $ fn n) `catchIO` (handleSocketError sk)
Expand Down
44 changes: 26 additions & 18 deletions Network/TCP.hs
Expand Up @@ -36,7 +36,7 @@ import Network.BSD (getHostByName, hostAddresses)
import Network.Socket
( Socket, SockAddr(SockAddrInet), SocketOption(KeepAlive)
, SocketType(Stream), inet_addr, connect
, shutdown, ShutdownCmd(ShutdownSend, ShutdownReceive)
, shutdown, ShutdownCmd(..)
, sClose, setSocketOption, getPeerName
, socket, Family(AF_INET)
)
Expand All @@ -55,7 +55,7 @@ import Network.Socket ( socketToHandle )
import Data.Char ( toLower )
import Data.Word ( Word8 )
import Control.Concurrent
import Control.Monad ( liftM )
import Control.Monad ( liftM, when )
import System.IO ( Handle, hFlush, IOMode(..), hClose )
import System.IO.Error ( isEOFError )

Expand Down Expand Up @@ -132,29 +132,29 @@ class BufferType bufType => HStream bufType where
readLine :: HandleStream bufType -> IO (Result bufType)
readBlock :: HandleStream bufType -> Int -> IO (Result bufType)
writeBlock :: HandleStream bufType -> bufType -> IO (Result ())
close :: HandleStream bufType -> IO ()
close :: HandleStream bufType -> Maybe ShutdownCmd -> IO ()

instance HStream Strict.ByteString where
openStream = openTCPConnection
openSocketStream = socketConnection
readBlock c n = readBlockBS c n
readLine c = readLineBS c
writeBlock c str = writeBlockBS c str
close c = closeIt c Strict.null
close c f = closeIt c f Strict.null

instance HStream Lazy.ByteString where
openStream = \ a b -> openTCPConnection_ a b True
openSocketStream = \ a b -> socketConnection_ a b True
readBlock c n = readBlockBS c n
readLine c = readLineBS c
writeBlock c str = writeBlockBS c str
close c = closeIt c Lazy.null
close c f = closeIt c f Lazy.null

instance Stream.Stream Connection where
readBlock (Connection c) = Network.TCP.readBlock c
readLine (Connection c) = Network.TCP.readLine c
writeBlock (Connection c) = Network.TCP.writeBlock c
close (Connection c) = Network.TCP.close c
close (Connection c) f = Network.TCP.close c f

instance HStream String where
openStream = openTCPConnection
Expand All @@ -172,7 +172,7 @@ instance HStream String where
-- allow any of the other Stream functions. Notice that a Connection may close
-- at any time before a call to this function. This function is idempotent.
-- (I think the behaviour here is TCP specific)
close c = closeIt c null
close c f = closeIt c f null

-- | @openTCPPort uri port@ establishes a connection to a remote
-- host, using 'getHostByName' which possibly queries the DNS system, hence
Expand Down Expand Up @@ -234,26 +234,34 @@ socketConnection_ hst sock stashInput = do
v <- newMVar conn
return (HandleStream v)

closeConnection :: HandleStream a -> IO Bool -> IO ()
closeConnection ref readL = do
closeConnection :: HandleStream a -> Maybe ShutdownCmd -> IO Bool -> IO ()
closeConnection ref shut readL = do
-- won't hold onto the lock for the duration
-- we are draining it...ToDo: have Connection
-- into a shutting-down state so that other
-- threads will simply back off if/when attempting
-- to also close it.
c <- readMVar (getRef ref)
closeConn c `catchIO` (\_ -> return ())
modifyMVar_ (getRef ref) (\ _ -> return ConnClosed)
when doClose $ modifyMVar_ (getRef ref) (\ _ -> return ConnClosed)
where
-- Be kind to peer & close gracefully.
closeConn ConnClosed = return ()
closeConn conn = do
let sk = connSock conn
shutdown sk ShutdownSend
suck readL
hClose (connHandle conn)
shutdown sk ShutdownReceive
sClose sk
hFlush (connHandle conn)
when shutSend $ shutdown sk ShutdownSend
when doClose $ suck readL
when doClose $ hClose (connHandle conn)
when shutRecv $ shutdown sk ShutdownReceive
when doClose $ sClose sk

(shutSend, doClose, shutRecv) =
case shut of
Nothing -> (True, True, True)
Just ShutdownBoth -> (True, False, True)
Just ShutdownSend -> (True, False, False)
Just ShutdownReceive -> (False, False, True)

suck :: IO Bool -> IO ()
suck rd = do
Expand Down Expand Up @@ -311,9 +319,9 @@ writeBlockBS ref b = onNonClosedDo ref $ \ conn -> do
(connHooks' conn)
return x

closeIt :: HandleStream ty -> (ty -> Bool) -> IO ()
closeIt c p = do
closeConnection c (readLineBS c >>= \ x -> case x of { Right xs -> return (p xs); _ -> return True})
closeIt :: HandleStream ty -> Maybe ShutdownCmd -> (ty -> Bool) -> IO ()
closeIt c f p = do
closeConnection c f (readLineBS c >>= \ x -> case x of { Right xs -> return (p xs); _ -> return True})
conn <- readMVar (getRef c)
maybe (return ())
(hook_close)
Expand Down

0 comments on commit 8bfade3

Please sign in to comment.