Skip to content

Commit

Permalink
Performance improvements re: logging and data output
Browse files Browse the repository at this point in the history
The log messages buffer was causing a lot of thread contention on a single
IORef -- switched it to another striped logging scheme, seems to be quite a bit
better in preliminary testing.

Also switched Snap.Internal.Http.Server to use binary's Put monad for building
up responses rather than B.concat -- profiling results suggest we alloc less
when doing this.
  • Loading branch information
gregorycollins committed Oct 27, 2010
1 parent 2ac275d commit 57308b6
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 83 deletions.
4 changes: 3 additions & 1 deletion snap-server.cabal
Expand Up @@ -109,6 +109,7 @@ Library
attoparsec >= 0.8.1 && < 0.9,
attoparsec-iteratee >= 0.1.1 && <0.2,
base >= 4 && < 5,
binary >=0.5 && <0.6,
bytestring,
bytestring-nums,
bytestring-show >= 0.3.2 && < 0.4,
Expand All @@ -127,7 +128,8 @@ Library
time,
transformers,
unix-compat,
vector >= 0.7 && <0.8
vector >= 0.7 && <0.8,
vector-algorithms >= 0.3.4 && <0.4

if flag(portable) || os(windows)
cpp-options: -DPORTABLE
Expand Down
69 changes: 33 additions & 36 deletions src/Snap/Internal/Http/Server.hs
Expand Up @@ -13,6 +13,7 @@ import Control.Concurrent.MVar
import Control.Exception
import Data.Char
import Data.CIByteString
import Data.Binary.Put
import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as SC
Expand Down Expand Up @@ -41,7 +42,7 @@ import Snap.Internal.Http.Types hiding (Enumerator)
import Snap.Internal.Http.Parser
import Snap.Internal.Http.Server.Date
import Snap.Internal.Iteratee.Debug
import Snap.Iteratee hiding (foldl', head, take, FileOffset)
import Snap.Iteratee hiding (foldl', head, take, mapM_, FileOffset)
import qualified Snap.Iteratee as I

#ifdef LIBEV
Expand Down Expand Up @@ -420,12 +421,13 @@ checkExpect100Continue req writeEnd = do
where
go = do
let (major,minor) = rqVersion req
let hl = [ "HTTP/"
, bsshow major
, "."
, bsshow minor
, " 100 Continue\r\n\r\n" ]
iter <- liftIO $ enumBS (S.concat hl) writeEnd
let hl = runPut $ do
putByteString "HTTP/"
showp major
putAscii '.'
showp minor
putByteString " 100 Continue\r\n\r\n"
iter <- liftIO $ enumLBS hl writeEnd
liftIO $ run iter


Expand Down Expand Up @@ -660,15 +662,17 @@ sendResponse req rsp' writeEnd onSendFile = do


--------------------------------------------------------------------------
fmtHdrs hdrs =
{-# SCC "fmtHdrs" #-}
concat xs
putHdrs hdrs =
{-# SCC "putHdrs" #-}
mapM_ putHeader $ Map.toList hdrs
where
xs = map f $ Map.toList hdrs
putHeader (k, ys) = mapM_ (putOne k) ys

f (k, ys) = map (g k) ys

g k y = S.concat [ unCI k, ": ", y, "\r\n" ]
putOne k y = do
putByteString $ unCI k
putByteString ": "
putByteString y
putByteString "\r\n"


--------------------------------------------------------------------------
Expand Down Expand Up @@ -759,24 +763,22 @@ sendResponse req rsp' writeEnd onSendFile = do

--------------------------------------------------------------------------
mkHeaderString :: Response -> ByteString
mkHeaderString r =
{-# SCC "mkHeaderString" #-}
S.concat $ concat [hl, hdr, eol]
mkHeaderString r = out
where
hl = [ "HTTP/"
, bsshow major
, "."
, bsshow minor
, " "
, bsshow $ rspStatus r
, " "
, rspStatusReason r
, "\r\n" ]

hdr = fmtHdrs $ headers r

eol = ["\r\n"]

!out = {-# SCC "mkHeaderString" #-}
S.concat $ L.toChunks $ runPut $ do
putByteString "HTTP/"
showp major
putAscii '.'
showp minor
putAscii ' '
showp $ rspStatus r
putAscii ' '
putByteString $ rspStatusReason r
putByteString "\r\n"
putHdrs $ headers r
putByteString "\r\n"

------------------------------------------------------------------------------
checkConnectionClose :: (Int, Int) -> Headers -> ServerMonad ()
Expand Down Expand Up @@ -818,8 +820,3 @@ toBS :: String -> ByteString
toBS = S.pack . map c2w


--------------------------------------------------------------------------
bsshow :: (Show a) => a -> ByteString
bsshow = l2s . show


145 changes: 103 additions & 42 deletions src/System/FastLogger.hs
Expand Up @@ -14,27 +14,87 @@ module System.FastLogger
import Control.Concurrent
import Control.Exception
import Control.Monad
import Data.Binary.Put
import Data.Bits
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Lazy.Char8 as L
import Data.ByteString.Internal (c2w)
import Data.DList (DList)
import qualified Data.DList as D
import Data.Function
import Data.Int
import Data.IORef
import Data.Maybe
import Data.Serialize.Put
import Data.Ord
import qualified Data.Vector as V
import Data.Vector (Vector)
import qualified Data.Vector.Algorithms.Merge as VA
import Data.Word
import Foreign.C.Types (CTime)
import GHC.Conc (numCapabilities)
import Prelude hiding (catch, show)
import qualified Prelude
import System.IO
import Text.Show.ByteString hiding (runPut)
import Text.Show.ByteString

------------------------------------------------------------------------------
import Data.Concurrent.HashMap (hashString, nextHighestPowerOf2)
import Snap.Internal.Http.Server.Date


------------------------------------------------------------------------------
defaultNumberOfLocks :: Word
defaultNumberOfLocks = nextHighestPowerOf2 $ toEnum $ 4 * numCapabilities

------------------------------------------------------------------------------
hashToBucket :: Word -> Word
hashToBucket x = x .&. (defaultNumberOfLocks-1)


------------------------------------------------------------------------------
type Queue = DList (CTime, ByteString)

newtype MessageBuffer = MessageBuffer {
_queues :: Vector (MVar Queue)
}

------------------------------------------------------------------------------
newMessageBuffer :: IO MessageBuffer
newMessageBuffer = liftM MessageBuffer $
V.replicateM (fromEnum defaultNumberOfLocks) (newMVar D.empty)


getAllMessages :: MessageBuffer -> IO (Vector ByteString)
getAllMessages (MessageBuffer queues) = do
vec <- liftM (V.concat . V.toList) $ V.mapM grabQ queues
mvec <- V.unsafeThaw vec

-- sort the list so the messages are emitted in time order
VA.sortBy cmp mvec
dvec <- V.unsafeFreeze mvec
return $ V.map snd dvec

where
grabQ mv = modifyMVar mv $ \q -> return (D.empty, V.fromList $ D.toList q)
cmp = compare `on` fst


addMessageToQueue :: MVar Queue -> CTime -> ByteString -> IO ()
addMessageToQueue mv tm s = modifyMVar_ mv $ \q -> return $ D.snoc q (tm,s)


addMessage :: MessageBuffer -> CTime -> ByteString -> IO ()
addMessage (MessageBuffer queues) tm !s = do
tid <- myThreadId
let hash = hashString $ Prelude.show tid
let bucket = hashToBucket hash
let mv = V.unsafeIndex queues $ fromEnum bucket
addMessageToQueue mv tm s


-- | Holds the state for a logger.
data Logger = Logger
{ _queuedMessages :: !(IORef (DList ByteString))
{ _queuedMessages :: !MessageBuffer
, _dataWaiting :: !(MVar ())
, _loggerPath :: !(FilePath)
, _loggingThread :: !(MVar ThreadId) }
Expand All @@ -46,24 +106,25 @@ data Logger = Logger
-- re-opened every 15 minutes to facilitate external log rotation.
newLogger :: FilePath -> IO Logger
newLogger fp = do
q <- newIORef D.empty
mb <- newMessageBuffer
dw <- newEmptyMVar
th <- newEmptyMVar

let lg = Logger q dw fp th
let lg = Logger mb dw fp th

tid <- forkIO $ loggingThread lg
putMVar th tid

return lg


-- | Prepares a log message with the time prepended.
timestampedLogEntry :: ByteString -> IO ByteString
timestampedLogEntry msg = do
timeStr <- getLogDateString

return $! runPut $! do
putWord8 $ c2w '['
return $! S.concat $! L.toChunks $! runPut $! do
putAscii '['
putByteString timeStr
putByteString "] "
putByteString msg
Expand All @@ -83,35 +144,29 @@ combinedLogEntry :: ByteString -- ^ remote host
-> IO ByteString
combinedLogEntry !host !mbUser !req !status !mbNumBytes !mbReferer !userAgent = do
let user = fromMaybe "-" mbUser
let numBytes = maybe "-" (\s -> strict $ show s) mbNumBytes
let referer = maybe "-" (\s -> S.concat ["\"", s, "\""]) mbReferer

timeStr <- getLogDateString

let !p = [ host
, " - "
, user
, " ["
, timeStr
, "] \""
, req
, "\" "
, strict $ show status
, " "
, numBytes
, " "
, referer
, " \""
, userAgent
, "\"" ]

let !output = S.concat p

return $! output


where
strict = S.concat . L.toChunks
return $ S.concat $ L.toChunks $ runPut $ do
putByteString host
putByteString " - "
putByteString user
putByteString " ["
putByteString timeStr
putByteString "] \""
putByteString req
putByteString "\" "
showp status
putAscii ' '
maybe (putAscii '-')
(showp)
mbNumBytes
putAscii ' '
putByteString referer
putByteString " \""
putByteString userAgent
putAscii '\"'


-- | Sends out a log message verbatim with a newline appended. Note:
Expand All @@ -120,7 +175,8 @@ combinedLogEntry !host !mbUser !req !status !mbNumBytes !mbReferer !userAgent =
logMsg :: Logger -> ByteString -> IO ()
logMsg !lg !s = do
let !s' = S.snoc s '\n'
atomicModifyIORef (_queuedMessages lg) $ \d -> (D.snoc d s',())
tm <- getCurrentDateTime
addMessage (_queuedMessages lg) tm s'
tryPutMVar (_dataWaiting lg) () >> return ()


Expand All @@ -129,16 +185,22 @@ loggingThread (Logger queue notifier filePath _) = do
initialize >>= go

where
--------------------------------------------------------------------------
openIt = if filePath == "-"
then return stdout
else if filePath == "stderr"
then return stderr
else openFile filePath AppendMode
else do
h <- openFile filePath AppendMode
hSetBuffering h $ BlockBuffering $ Just 32768
return h

--------------------------------------------------------------------------
closeIt h = if filePath == "-" || filePath == "stderr"
then return ()
else hClose h

--------------------------------------------------------------------------
go (href, lastOpened) =
(loop (href, lastOpened))
`catches`
Expand All @@ -148,28 +210,27 @@ loggingThread (Logger queue notifier filePath _) = do
threadDelay 20000000
go (href, lastOpened) ]


--------------------------------------------------------------------------
initialize = do
lh <- openIt
href <- newIORef lh
t <- getCurrentDateTime
tref <- newIORef t
return (href, tref)


--------------------------------------------------------------------------
killit (href, lastOpened) = do
flushIt (href, lastOpened)
h <- readIORef href
closeIt h


--------------------------------------------------------------------------
flushIt (!href, !lastOpened) = do
dl <- atomicModifyIORef queue $ \x -> (D.empty,x)
msgs <- getAllMessages queue

let !msgs = D.toList dl
let !s = L.fromChunks msgs
-- flush all messages out to buffer
h <- readIORef href
L.hPut h s
V.mapM_ (S.hPut h) msgs
hFlush h

-- close the file every 15 minutes (for log rotation)
Expand Down
3 changes: 2 additions & 1 deletion test/pongserver/Main.hs
Expand Up @@ -26,5 +26,6 @@ main = do

where
go m = do
httpServe "*" 3000 "localhost" Nothing Nothing pongServer
httpServe "*" 3000 "localhost" (Just "foo.log") Nothing pongServer
--httpServe "*" 3000 "localhost" Nothing Nothing pongServer
putMVar m ()

0 comments on commit 57308b6

Please sign in to comment.