Permalink
Browse files

Merge

  • Loading branch information...
2 parents 46571bc + 5676737 commit 2845ae7171b8bc717a72c82c71e522585ad9e68f Bryan O'Sullivan committed May 9, 2010
Showing with 292 additions and 52 deletions.
  1. +1 −0 .hgignore
  2. +43 −11 benchmarks/EventHttp.hs
  3. +12 −4 benchmarks/EventSocket.hs
  4. +148 −0 benchmarks/HttpClient.hs
  5. +9 −2 benchmarks/Makefile
  6. +1 −1 benchmarks/PongServer.hs
  7. +56 −24 benchmarks/RFC2616.hs
  8. +22 −10 benchmarks/StaticHttp.hs
View
@@ -24,6 +24,7 @@ benchmarks/Simple
benchmarks/dead-conn
benchmarks/deadconn
benchmarks/event-http
+benchmarks/http-client
benchmarks/pong-server
benchmarks/simple
benchmarks/static-http
View
@@ -11,13 +11,15 @@ import Foreign.C.Error
import Foreign.C.Types
import Foreign.Marshal.Alloc
import Foreign.Marshal.Utils
+import Foreign.ForeignPtr
import Foreign.Ptr
import System.Posix.Types
import Network.Socket hiding (accept, recv)
import Network.Socket.Internal
-import EventSocket (recv, sendAll)
+import EventSocket (recv, sendAll, c_recv, c_send)
import EventUtil (setNonBlocking)
-import Data.ByteString.Char8 as B
+import Data.ByteString.Char8 as B hiding (zip)
+import Data.ByteString.Internal as B
main = do
ensureIOManagerIsRunning
@@ -28,12 +30,14 @@ main = do
sock <- socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai)
setSocketOption sock ReuseAddr 1
bindSocket sock (addrAddress ai)
- listen sock maxListenQueue
+ listen sock 1024
mgrs <- replicateM numCapabilities E.new
done <- newEmptyMVar
- forM_ mgrs $ \mgr -> do
- forkIO $ E.loop mgr >> putMVar done ()
- accept mgr sock client
+ forM_ (zip [0..] mgrs) $ \(cpu,mgr) -> do
+ forkOnIO cpu $ do
+ accept mgr sock clinet
+ loop mgr
+ putMVar done ()
takeMVar done
repeatOnIntr :: IO (Either Errno a) -> IO (Either Errno a)
@@ -58,7 +62,7 @@ blocking mgr efdk act on_success = do
ioError (errnoToIOError "accept" err Nothing Nothing)
| otherwise ->
case efdk of
- Left (fd,evts) -> registerFd mgr retry fd evts >> return ()
+ Left (fd,evts) -> registerFd_ mgr retry fd evts >> return ()
Right _ -> return ()
Right a -> case efdk of
Left (fd,_evts) -> on_success (Left fd) a
@@ -84,17 +88,45 @@ accept mgr sock@(MkSocket fd family stype proto _status) serve = do
nsock <- MkSocket nfd family stype proto `fmap` newMVar Connected
serve mgr nsock addr
+clinet :: EventManager -> Socket -> SockAddr -> IO ()
+clinet mgr sock addr = do
+ let MkSocket fd _ _ _ _ = sock
+ act = do
+ let bufSize = 4096
+ fp <- B.mallocByteString bufSize
+ withForeignPtr fp $ \ptr -> do
+ ret <- c_recv fd ptr (fromIntegral bufSize) 0
+ if ret == -1
+ then Left `fmap` getErrno
+ else if ret == 0
+ then return $! Right empty
+ else do
+ let !bs = PS (castForeignPtr fp) 0 (fromIntegral ret)
+ return $! Right bs
+ blocking mgr (Left (fromIntegral fd,evtRead)) act $ \efdk bs -> do
+ fd <- case efdk of
+ Left fd -> return fd
+ Right fdk -> unregisterFd_ mgr fdk >> return (keyFd fdk)
+ let (PS fp off len) = "HTTP/1.0 200 OK\r\nConnection: Close\r\nContent-Length: 5\r\n\r\nPong!"
+ withForeignPtr fp $ \s ->
+ c_send (fromIntegral fd) (s `plusPtr` off) (fromIntegral len) 0
+ sClose sock
+
client :: EventManager -> Socket -> SockAddr -> IO ()
-client _mgr sock _addr = (`finally` sClose sock) $ do
- recvRequest ""
- sendAll sock msg
+client _mgr sock _addr = loop `finally` sClose sock
where
+ loop = do
+ req <- recvRequest ""
+ sendAll sock msg
+ if "Connection: Keep-Alive" `isInfixOf` req
+ then loop
+ else return ()
msg = "HTTP/1.0 200 OK\r\nConnection: Close\r\nContent-Length: 5\r\n\r\nPong!"
recvRequest r = do
s <- recv sock 4096
let t = B.append r s
if B.null s || "\r\n\r\n" `B.isInfixOf` t
- then return ()
+ then return t
else recvRequest t
foreign import ccall unsafe "sys/socket.h accept"
View
@@ -8,18 +8,21 @@ module EventSocket
, recv
, send
, sendAll
+ , c_recv
+ , c_send
) where
import Control.Concurrent (modifyMVar_, newMVar)
import Control.Monad (liftM, when)
import Data.ByteString (ByteString)
-import Data.ByteString.Internal (createAndTrim)
import Data.Word (Word8)
import qualified Data.ByteString as B
+import qualified Data.ByteString.Internal as B
import Data.ByteString.Unsafe (unsafeUseAsCStringLen)
import Foreign.C.Types (CChar, CInt, CSize)
import Foreign.Marshal.Alloc (allocaBytes)
import Foreign.Marshal.Utils (with)
+import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Ptr (Ptr, castPtr)
import Foreign.C.Error (Errno(..), eINPROGRESS, eINTR,
errnoToIOError, getErrno, throwErrno)
@@ -72,15 +75,20 @@ connect sock@(MkSocket s _family _stype _protocol socketStatus) addr = do
Nothing Nothing)
foreign import ccall unsafe "connect"
- c_connect :: CInt -> Ptr SockAddr -> CInt{-CSockLen???-} -> IO CInt
+ c_connect :: CInt -> Ptr SockAddr -> CInt{-CSockLen?? -} -> IO CInt
------------------------------------------------------------------------
-- Receiving
recv :: Socket -> Int -> IO ByteString
recv (MkSocket s _ _ _ _) nbytes
| nbytes <= 0 = ioError (mkInvalidRecvArgError "Network.Socket.ByteString.recv")
- | otherwise = createAndTrim nbytes $ recvInner s nbytes
+ | otherwise = do
+ fp <- B.mallocByteString nbytes
+ n <- withForeignPtr fp $ recvInner s nbytes
+ if n <= 0
+ then return B.empty
+ else return $! B.PS fp 0 n
recvInner :: CInt -> Int -> Ptr Word8 -> IO Int
recvInner s nbytes ptr = do
@@ -145,7 +153,7 @@ mkInvalidRecvArgError loc = ioeSetErrorString (mkIOError
"non-positive length"
foreign import ccall unsafe "sys/socket.h accept"
- c_accept :: CInt -> Ptr SockAddr -> Ptr CInt{-CSockLen???-} -> IO CInt
+ c_accept :: CInt -> Ptr SockAddr -> Ptr CInt{-CSockLen?? -} -> IO CInt
foreign import ccall unsafe "sys/socket.h send"
c_send :: CInt -> Ptr a -> CSize -> CInt -> IO CInt
View
@@ -0,0 +1,148 @@
+{-# LANGUAGE BangPatterns, FlexibleContexts, OverloadedStrings #-}
+
+import Text.Printf
+import System.Event.Clock
+import qualified Data.Attoparsec as A (parseWith)
+import qualified Data.Attoparsec.Char8 as A
+import RFC2616
+import Control.Exception
+import Control.Concurrent.QSemN
+import Control.Monad
+import Network.Socket hiding (connect, recv)
+import System.Console.GetOpt
+import Data.Function
+import Data.Monoid
+import GHC.Conc (numCapabilities)
+import Args (ljust, parseArgs, positive, theLast)
+import Control.Concurrent (forkIO)
+import System.Environment (getArgs)
+import qualified Data.ByteString.Char8 as B
+import Text.Parsec
+import Text.Parsec.String
+import Control.Applicative hiding (many, (<|>))
+import Data.Char (isSpace)
+import System.Event.Thread
+import EventSocket
+
+type URL = (String, String, String)
+
+url :: Parser URL
+url =
+ (,,) <$> (string "http://" *> (many . satisfy $ \c -> c /= ':' && c /= '/'))
+ <*> ((char ':' *> many digit) <|> pure "80")
+ <*> ((many1 . satisfy $ not . isSpace) <|> pure "/")
+
+urlConnector :: String -> IO (IO (Socket, B.ByteString))
+urlConnector urlStr = do
+ let (host, port, uri) = case parse url "<cmdline>" urlStr of
+ Left err -> error (show err)
+ Right req -> req
+ myHints = defaultHints { addrSocketType = Stream }
+ (ai:_) <- getAddrInfo (Just myHints) (Just host) (Just port)
+ return $ do
+ sock <- socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai)
+ let req = B.concat ["GET ", B.pack uri, " HTTP/1.1\r\n"
+ ,"Host: ", B.pack host, ":", B.pack port, "\r\n"]
+ connect sock (addrAddress ai)
+ return (sock, req)
+
+client ctors reqs = do
+ forM_ ctors $ \connector -> do
+ let loop slop !reqno sock reqStart = do
+ let refill = recv sock 65536
+ req = B.concat [reqStart, "\r\n"]
+ sendAll sock req
+ resp <- (if B.null slop then refill else return slop) >>=
+ A.parseWith refill RFC2616.response
+ case resp of
+ err@(A.Partial _) -> print err
+ err@(A.Fail bs _ msg) -> print (msg, B.take 10 bs)
+ A.Done bs (st, chdrs) -> do
+ let hdrs = map lowerHeader chdrs
+ close = Header "connection" ["close"]
+ contentLength = case A.parse A.decimal (B.concat (lookupHeader "content-length" hdrs)) `A.feed` "" of
+ A.Done _ n -> n
+ err -> error (show chdrs)
+ let slurp !n s = do
+ let len = B.length s
+ if len == 0 || len >= n
+ then return $! B.drop n s
+ else slurp (n-len) =<< recv sock 65536
+ if B.length bs >= contentLength
+ then if reqno >= reqs || close `elem` hdrs
+ then return ()
+ else loop (B.drop contentLength bs) (reqno+1) sock reqStart
+ else slurp contentLength bs >>= \s ->
+ if reqno >= reqs || close `elem` hdrs
+ then return ()
+ else loop s (reqno+1) sock reqStart
+ bracket connector (sClose . fst) . uncurry $ loop "" 1
+
+
+main = do
+ (cfg, urls) <- parseArgs defaultConfig defaultOptions =<< getArgs
+ when (null urls) $ error "no URLs"
+ ensureIOManagerIsRunning
+ ctors <- mapM urlConnector urls
+ let clients = theLast cfgClients cfg
+ conns = theLast cfgConnections cfg
+ requests = theLast cfgRequests cfg
+ total = clients * conns * requests
+ putStrLn $ "issuing " ++ show total ++ " requests"
+ sem <- newQSemN 0
+ start <- getCurrentTime
+ replicateM_ clients $ do
+ forkIO $ (client (take conns (cycle ctors)) requests `finally` signalQSemN sem 1)
+ return ()
+ waitQSemN sem clients
+ end <- getCurrentTime
+ let elapsed = end - start
+ rate = fromIntegral total / elapsed
+ printf "%.6g reqs/sec in %.6g secs\n" rate elapsed
+
+------------------------------------------------------------------------
+-- Configuration
+
+data Config = Config {
+ cfgClients :: Last Int
+ , cfgConnections :: Last Int
+ , cfgRequests :: Last Int
+ }
+
+defaultConfig :: Config
+defaultConfig = Config {
+ cfgClients = ljust numCapabilities
+ , cfgConnections = ljust numCapabilities
+ , cfgRequests = ljust 1
+ }
+
+instance Monoid Config where
+ mempty = Config {
+ cfgClients = mempty
+ , cfgConnections = mempty
+ , cfgRequests = mempty
+ }
+
+ mappend a b = Config {
+ cfgClients = app cfgClients a b
+ , cfgConnections = app cfgConnections a b
+ , cfgRequests = app cfgRequests a b
+ }
+ where app :: (Monoid b) => (a -> b) -> a -> a -> b
+ app = on mappend
+
+defaultOptions :: [OptDescr (IO Config)]
+defaultOptions = [
+ Option ['c'] ["clients"]
+ (ReqArg (positive "number of concurrent clients" $ \n ->
+ mempty { cfgClients = n }) "N")
+ "number of concurrent clients"
+ , Option ['n'] ["connections"]
+ (ReqArg (positive "number of connections" $ \n ->
+ mempty { cfgConnections = n }) "N")
+ "number of connections"
+ , Option ['r'] ["requests"]
+ (ReqArg (positive "number of requests per connection" $ \n ->
+ mempty { cfgRequests = n }) "N")
+ "number of requests"
+ ]
View
@@ -1,7 +1,8 @@
include ../tests/common.mk
ghc-bench-flags := -package network -package network-bytestring \
- -package attoparsec -package bytestring-show -package mtl
+ -package parsec-3.1.0 -package attoparsec -package bytestring-show \
+ -package mtl
ifdef USE_GHC_IO_MANAGER
ghc-bench-flags += -DUSE_GHC_IO_MANAGER
@@ -10,7 +11,8 @@ endif
ghc-opt-flags = -O2
cc = gcc
cc-opt-flags = -O2
-programs := dead-conn deadconn pong-server signal simple static-http thread-delay timers
+programs := dead-conn deadconn pong-server signal simple static-http \
+ thread-delay timers http-client
.PHONY: all
all: $(programs)
@@ -39,6 +41,11 @@ static-http: $(lib) EventUtil.o EventFile.o EventSocket.o NoPush.o RFC2616.o Sta
ranlib $(lib)
$(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) $(lib)
+http-client: ghc-flags += $(ghc-bench-flags)
+http-client: $(lib) Args.o EventUtil.o EventSocket.o RFC2616.o HttpClient.o
+ ranlib $(lib)
+ $(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) $(lib)
+
signal: $(lib) Signal.o
ranlib $(lib)
$(ghc) $(ghc-flags) -threaded -o $@ $(filter %.o,$^) $(lib)
View
@@ -79,7 +79,7 @@ data Config = Config {
defaultConfig :: Config
defaultConfig = Config {
- cfgListenBacklog = ljust maxListenQueue
+ cfgListenBacklog = ljust 1024
, cfgMaxFds = ljust 256
, cfgPort = ljust "5002"
}
Oops, something went wrong.

0 comments on commit 2845ae7

Please sign in to comment.