-
Notifications
You must be signed in to change notification settings - Fork 0
/
Logster.hs
120 lines (102 loc) · 3.59 KB
/
Logster.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
{-# LANGUAGE CPP #-}
import Buffer (getResultsBufferedBySecond)
import Carbon
import Config
import Control.Concurrent
import Control.Exception (finally)
import Control.Monad (when)
import qualified Data.ByteString.Lazy.Char8 as B
import qualified Data.ByteString.Char8 as SB
import Data.List
import Data.Time.LocalTime (TimeZone, getCurrentTimeZone)
import Metrics.Common
import Metrics
import Network
import System.Console.GetOpt
import System.Environment
import System.Exit
import System.IO
import System.IO.Unsafe (unsafePerformIO)
#ifdef USE_EKG
import System.Remote.Monitoring
#endif
-- | Plain alias for @String@.
-- NOTE(review): not referenced anywhere in the visible part of this file;
-- presumably meant to name one line of log input - confirm or remove.
type Line = String
-- | Flags produced by command-line parsing (see @options@).
data Flag
  = Help             -- ^ @-h@ / @--help@: print the usage text and exit.
  | Graphite String  -- ^ @-g@ / @--graphite@: raw HOST:PORT argument.
#ifdef USE_EKG
  | EKGPort String   -- ^ @-s@ / @--stats-port@: raw PORT argument (USE_EKG builds only).
#endif
  | Debug            -- ^ @-d@ / @--debug@: selects the local debug output.
  deriving (Eq, Show)
-- | True for the flags that select an output destination (@Graphite@ and
-- @Debug@); False for every other flag.
isOutputFlag :: Flag -> Bool
isOutputFlag flag =
  case flag of
    Graphite _ -> True
    Debug      -> True
    _          -> False
-- | Option table handed to @getOpt@ and @usageInfo@.
-- The stats-port entry only exists when the program is built with USE_EKG.
options :: [OptDescr Flag]
options =
  [ Option "h" ["help"] (NoArg Help) "Show usage"
  , Option "g" ["graphite"] (ReqArg Graphite "HOST:PORT") "Send metric data to HOST:PORT"
    -- outputFlagToAction runs the Debug action against stdout, so the help
    -- text names stdout (it previously claimed stderr).
  , Option "d" ["debug"] (NoArg Debug) "Send metric data to stdout"
#ifdef USE_EKG
  , Option "s" ["stats-port"] (ReqArg EKGPort "PORT") "Expose process stats on PORT"
#endif
  ]
-- | First line of the usage message; passed to @usageInfo@ together with
-- @options@.
-- NOTE(review): main discards the non-option arguments returned by
-- parseOptions, so the CONFIG operand advertised here looks unused - confirm.
header :: String
header = "Usage: hlogster [OPTION...] CONFIG"
-- | Run @getOpt@ in @Permute@ mode over the raw argument list.
--
-- Returns the recognised flags together with the remaining non-option
-- arguments. If @getOpt@ reports any errors, raises a user @IOError@ whose
-- message is the concatenated errors followed by the usage text.
parseOptions :: [String] -> IO ([Flag], [String])
parseOptions argv
  | null problems = return (flags, operands)
  | otherwise     = ioError (userError (concat problems ++ usageInfo header options))
  where
    (flags, operands, problems) = getOpt Permute options argv
-- | Turn an output flag into a runner that supplies a @Handle@ to an action.
--
-- * @Debug@ runs the action against @stdout@.
-- * @Graphite hostport@ parses @HOST:PORT@, connects, switches the handle to
--   line buffering, runs the action and always closes the handle afterwards
--   (also when the action throws), so buffered data is flushed and the
--   socket is not leaked.
--
-- A malformed or out-of-range @HOST:PORT@ raises a user @IOError@ naming the
-- offending value instead of failing with a pattern-match or @read@ error.
-- Any other flag is a programming error: callers are expected to filter with
-- @isOutputFlag@ first.
outputFlagToAction :: Flag -> (Handle -> IO a) -> IO a
outputFlagToAction Debug action = action stdout
outputFlagToAction (Graphite hostport) action =
  case parseHostPort hostport of
    Nothing ->
      ioError . userError $
        "Invalid Graphite address " ++ show hostport ++ " (expected HOST:PORT)"
    Just (host, portNum) -> do
      h <- connectTo host (PortNumber (fromIntegral portNum))
      (hSetBuffering h LineBuffering >> action h) `finally` hClose h
  where
    -- Split at the first colon; the host must be non-empty and the port
    -- must be a complete decimal number inside the TCP port range.
    parseHostPort :: String -> Maybe (String, Int)
    parseHostPort s =
      case break (== ':') s of
        (host@(_ : _), ':' : portStr) ->
          case reads portStr of
            [(n, "")] | n > 0 && n <= 65535 -> Just (host, n)
            _                               -> Nothing
        _ -> Nothing
outputFlagToAction _ _ = error "Something has gone horribly wrong."
-- | Run the input lines through @getResultsBufferedBySecond@ for one metric,
-- flatten the resulting batches and send every result to the handle with
-- @sendToCarbon@, in order.
--
-- The literal 10 is passed straight to @getResultsBufferedBySecond@; its
-- meaning is defined by that function.
produceOutput :: IMetricState a => TimeZone -> Metric a -> [B.ByteString] -> Handle -> IO ()
produceOutput tz metric input handle =
  mapM_
    (\item -> sendToCarbon item handle)
    (concat (getResultsBufferedBySecond tz 10 input metric))
-- | Process-wide registry of completion signals: one @MVar ()@ per thread
-- started with forkChild, drained by waitForChildren.
--
-- The element type is pinned to @MVar ()@ (what forkChild stores) because a
-- polymorphic mutable cell created through unsafePerformIO is not type safe.
-- NOINLINE stops the compiler from inlining the definition at its use
-- sites, which could otherwise create a separate MVar per use instead of
-- the single shared cell this code relies on.
children :: MVar [MVar ()]
children = unsafePerformIO (newMVar [])
{-# NOINLINE children #-}
-- | Block until every thread registered in @children@ has signalled
-- completion.
--
-- Pops one completion MVar at a time, puts the remainder back, waits on the
-- popped one and then starts over, so threads registered while waiting are
-- picked up as well.
-- NOTE: once the registry is found empty it is left taken (not put back),
-- so a forkChild issued after that point would block.
waitForChildren :: IO ()
waitForChildren = takeMVar children >>= drain
  where
    drain [] = return ()
    drain (done : pending) = do
      putMVar children pending
      takeMVar done
      waitForChildren
-- | Fork the action in a new thread and register a completion signal for it
-- in @children@. The signal is filled when the action ends, whether it
-- returns normally or throws. Returns the id of the new thread.
forkChild :: IO () -> IO ThreadId
forkChild io = do
  done <- newEmptyMVar
  -- Prepend the new signal to the registry before the thread starts.
  takeMVar children >>= putMVar children . (done :)
  forkIO $ finally io (putMVar done ())
-- | Program entry point.
--
-- 1. Parses the command line; with @Help@ present, prints the usage text and
--    exits successfully.
-- 2. Requires at least one output flag (Graphite or Debug), otherwise fails
--    with a user error.
-- 3. Builds one metric per entry of @config@ using @makeMetric@.
-- 4. In USE_EKG builds, starts the monitoring server on localhost, on the
--    port given with the stats-port option or 1030 when absent. A port that
--    is not a complete number is rejected up front with a user error
--    instead of going through the partial @read@.
-- 5. Reads standard input through the lazy ByteString @getContents@, forks
--    one child thread per metric that runs every output action over the
--    newline-split input, then waits for all children to finish.
main :: IO ()
main = do
  args <- getArgs
  -- Non-option arguments are parsed but not used.
  (opts, _) <- parseOptions args
  when (Help `elem` opts) $ do
    putStrLn (usageInfo header options)
    exitSuccess
  let outputActions = map outputFlagToAction (filter isOutputFlag opts)
  when (null outputActions) $
    ioError (userError "Please specify at least one output destination (-g or -d)")
  let metrics = map makeMetric config
#ifdef USE_EKG
  -- The first stats-port flag wins; fall back to 1030 when none was given.
  ekgPort <- case [p | EKGPort p <- opts] of
    [] -> return 1030
    (p : _) -> case reads p of
      [(n, "")] -> return n
      _         -> ioError (userError ("Invalid stats port: " ++ show p))
  _ <- forkServer (SB.pack "localhost") ekgPort
#endif
  tz <- getCurrentTimeZone
  input <- B.getContents
  -- One job per metric: feed the metric output to every selected destination.
  let runMetric metric =
        mapM_ ($ produceOutput tz metric (B.split '\n' input)) outputActions
  mapM_ (forkChild . runMetric) metrics
  waitForChildren