Permalink
Browse files

Basic CSV logging works, still need to deal with headers / server con…

…fig changing
  • Loading branch information...
1 parent 309c77e commit d322f4d931477e346ca42ddae763048cd6b16a3b Jacob Stanley committed with Jun 12, 2012
Showing with 146 additions and 14 deletions.
  1. +42 −8 app/Main.hs
  2. +5 −0 ntpmon.cabal
  3. +16 −6 src/Network/NTP.hs
  4. +83 −0 src/System/Win32File.hs
View
@@ -23,6 +23,7 @@ import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy as LB
import Data.Conduit (ResourceT, ($$))
import Data.Conduit.Attoparsec (sinkParser)
+import Data.Double.Conversion.Text (toFixed, toShortest)
import Data.Function (on)
import Data.List (find)
import Data.Maybe (catMaybes)
@@ -33,19 +34,22 @@ import qualified Data.Text.Lazy.Encoding as LT
import qualified Data.Text.Read as T
import Data.Time.Clock (UTCTime, NominalDiffTime, addUTCTime)
import qualified Data.Time.Clock as UTC
+import Data.Time.Format (formatTime)
import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as U
import Network (withSocketsDo)
import Network.HTTP.Types
import Network.Wai
import Network.Wai.Application.Static hiding (FilePath)
import Network.Wai.Handler.Warp
+import System.Locale (defaultTimeLocale)
import System.IO
import Network.NTP
import Network.NTP.Config
import Network.NTP.ConfigFinder (findConfig)
import Network.NTP.Types (Time(..), toUTCTime, toSeconds)
+import System.Win32File
------------------------------------------------------------------------
@@ -75,16 +79,18 @@ updateConfig state cfg = do
servers <- catMaybes <$> mapM (resolveServer clock . T.unpack) hosts
atomically $ do
writeTVar (svcConfig state) cfg
- modifyTVar (svcServers state) (mergeServers servers)
+ modifyTVar' (svcServers state) (mergeServers servers)
updateData :: ServiceState -> [Server] -> STM ()
updateData state servers =
- modifyTVar (svcServers state) (flip mergeServers servers)
+ modifyTVar' (svcServers state) (flip mergeServers servers)
-- | Returns the server addresses from the first list combined with
-- the server data from the second list.
mergeServers :: [Server] -> [Server] -> [Server]
-mergeServers xs ys = map (findByName ys) xs
+mergeServers xs ys = length zs `seq` zs
+ where
+ zs = map (findByName ys) xs
-- Finds the server which has a matching address, or if not, simply
-- returns the server used as the search candidate.
@@ -229,10 +235,11 @@ responseJSON s hs x = responseLazyText s hs' (enc x)
runMonitor :: ServiceState -> IO ()
runMonitor state = do
time <- UTC.getCurrentTime
- withNTP (hPutStrLn stderr) (monitor state time)
+ file <- newDailyLogger "."
+ withNTP (hPutStrLn stderr) (monitor state file time)
-monitor :: ServiceState -> UTCTime -> NTP ()
-monitor state transmitTime = do
+monitor :: ServiceState -> FileLogger -> UTCTime -> NTP ()
+monitor state file transmitTime = do
-- give up our CPU time for the next 50ms
liftIO $ threadDelay 50000
@@ -247,7 +254,7 @@ monitor state transmitTime = do
then return transmitTime
else do
sendRequests servers
- -- TODO: Write CSV log here
+ liftIO (writeCSV file servers)
return (sampleInterval `addUTCTime` currentTime)
-- update any servers which received replies
@@ -258,7 +265,7 @@ monitor state transmitTime = do
atomically $ updateData state (map fst servers')
-- loop again
- monitor state transmitTime'
+ monitor state file transmitTime'
where
-- sendRequests inserts a 5ms delay after each transmission so that
@@ -269,6 +276,33 @@ monitor state transmitTime = do
sampleInterval :: NominalDiffTime
sampleInterval = realToFrac (1 / samplesPerSecond)
+writeCSV :: FileLogger -> [Server] -> IO ()
+writeCSV file xs | empty = return ()
+ | otherwise = write
+ where
+ write = do
+ utc <- toUTCTime <$> getCurrentTime clock
+ let time = T.pack (formatTime defaultTimeLocale "%Y-%m-%d %H:%M:%S" utc)
+ line = T.intercalate "," (time : fields) `T.append` "\r\n"
+ appendText file utc line
+
+ clock = svrClock (head xs)
+
+ fields = offsets ++ delays ++ [origin, freq]
+
+ freq = (toShortest . clockFrequency) clock
+ origin = (T.pack . show . clockIndex0) clock
+
+ empty = null xs || any (U.null . svrRawSamples) xs
+ samples = map (U.head . svrRawSamples) xs
+ offsets = map (showMilli . toSeconds . offset clock) samples
+ delays = map (showMilli . fromDiff clock . roundtrip) samples
+
+showMilli :: Seconds -> T.Text
+showMilli t = toFixed 4 ms
+ where
+ ms = (1000 :: Double) * (realToFrac t)
+
------------------------------------------------------------------------
-- Lifted versions of STM IO functions
View
@@ -22,6 +22,7 @@ library
Network.NTP.Config
Network.NTP.ConfigFinder
System.Counter
+ System.Win32File
Text.PrefixSI
build-depends:
@@ -63,6 +64,7 @@ executable ntpmon
, blaze-builder == 0.3.*
, bytestring == 0.9.*
, conduit == 0.4.*
+ , double-conversion == 0.2.*
, http-types == 0.6.*
, mtl == 2.0.*
, network == 2.3.*
@@ -77,6 +79,9 @@ executable ntpmon
, wai-extra == 1.2.*
, warp == 1.2.*
+ ghc-prof-options:
+ -fprof-auto
+
ghc-options:
-O2 -Wall -rtsopts -threaded
-funbox-strict-fields
View
@@ -20,6 +20,7 @@ import Control.Monad.Loops (unfoldM)
import Control.Monad.STM (atomically)
import Control.Monad.State
import qualified Data.ByteString as B
+import Data.IORef
import Data.Int (Int64)
import Data.Maybe (listToMaybe)
import Data.Serialize (encode, decode)
@@ -49,27 +50,36 @@ newtype NTP a = NTP { runNTP :: StateT NTPData IO a }
data NTPData = NTPData {
ntpSocket :: Socket
, ntpIncoming :: TChan AddrPacket
+ , ntpRunning :: IORef Bool
}
data AddrPacket = AddrPacket ClockIndex SockAddr Packet
type Logger = String -> IO ()
withNTP :: Logger -> NTP a -> IO a
-withNTP logger =
+withNTP writeLog =
fmap fst . withSocketsDo . bracket create destroy . runStateT . runNTP
where
create = do
ntpSocket <- initSocket
ntpIncoming <- newTChanIO
+ ntpRunning <- newIORef True
- -- receive packet loop
- forkIO $ forever $ do
- pkt <- receive ntpSocket
- either logger (atomically . writeTChan ntpIncoming) pkt
+ let enqueue = atomically . writeTChan ntpIncoming
+ receiveLoop = do
+ pkt <- receive ntpSocket
+ ok <- readIORef ntpRunning
+ if ok then either writeLog enqueue pkt >> receiveLoop
+ else return ()
+
+ -- start receive packet loop
+ forkIO receiveLoop
return NTPData{..}
- destroy NTPData{..} = sClose ntpSocket
+ destroy NTPData{..} = do
+ writeIORef ntpRunning False
+ sClose ntpSocket
initSocket = do
sock <- socket AF_INET Datagram defaultProtocol
@@ -0,0 +1,83 @@
+{-# LANGUAGE RecordWildCards #-}
+
+module System.Win32File (
+ FileLogger
+ , newDailyLogger
+ , appendText
+ ) where
+
+import Control.Monad (void)
+import Data.Bits ((.|.))
+import qualified Data.ByteString as B
+import Data.ByteString.Internal (ByteString(..))
+import Data.IORef
+import qualified Data.Text as T
+import Data.Text.Encoding (encodeUtf8)
+import Data.Time (UTCTime(..))
+import Data.Time.Calendar (Day(..), showGregorian)
+import Foreign.ForeignPtr (withForeignPtr)
+import Foreign.Ptr (plusPtr)
+import System.FilePath ((</>), (<.>))
+import System.Win32.File
+import System.Win32.Types
+
+------------------------------------------------------------------------
+
+data FileLogger = FileLogger {
+ fileDir :: FilePath
+ , fileRef :: IORef (Day, HANDLE)
+ }
+
+newDailyLogger :: FilePath -> IO FileLogger
+newDailyLogger fileDir = do
+ fileRef <- newIORef (ModifiedJulianDay 0, nullHANDLE)
+ return FileLogger{..}
+
+appendText :: FileLogger -> UTCTime -> T.Text -> IO ()
+appendText FileLogger{..} (UTCTime day' _) text = do
+ (_, h) <- modifyIORef' fileRef update
+ writeBS h (encodeUtf8 text)
+ where
+ update (day, h)
+ | day == day' = return (day, h)
+ | otherwise = do
+ tryCloseHandle h
+ h' <- openOrCreateFile (fileDir </> showGregorian day' <.> "csv")
+ return (day', h')
+
+writeBS :: HANDLE -> B.ByteString -> IO ()
+writeBS _ (PS _ _ 0) = return ()
+writeBS h (PS ps s n) = withForeignPtr ps write
+ where
+ -- TODO: This ignores the returned error code
+ write p = void $ win32_WriteFile h (p `plusPtr` s) (fromIntegral n) Nothing
+
+openOrCreateFile :: FilePath -> IO HANDLE
+openOrCreateFile path = do
+ h <- createFile path fILE_APPEND_DATA (fILE_SHARE_READ .|. fILE_SHARE_DELETE)
+ Nothing oPEN_ALWAYS fILE_ATTRIBUTE_NORMAL Nothing
+
+ err <- getLastError
+ if err /= 0 && err /= 183
+ then failWith "openOrCreateFile" err
+ else return h
+
+tryCloseHandle :: HANDLE -> IO ()
+tryCloseHandle h | h == nullHANDLE = return ()
+ | otherwise = closeHandle h
+
+------------------------------------------------------------------------
+-- Utils
+
+fILE_APPEND_DATA :: AccessMode
+fILE_APPEND_DATA = 4
+
+fILE_SHARE_DELETE :: ShareMode
+fILE_SHARE_DELETE = 4
+
+modifyIORef' :: IORef a -> (a -> IO a) -> IO a
+modifyIORef' ref io = do
+ x <- readIORef ref
+ x' <- io x
+ writeIORef ref x'
+ return x'

0 comments on commit d322f4d

Please sign in to comment.