From c419fb551db09b7bbb2c15f50d142663c0591b82 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Wed, 18 Feb 2015 22:56:59 +0000 Subject: [PATCH 01/16] Add missing toplevel Haddocks for Reader --- lib/Vaultaire/Reader.hs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/Vaultaire/Reader.hs b/lib/Vaultaire/Reader.hs index 4443e16..3413ba6 100644 --- a/lib/Vaultaire/Reader.hs +++ b/lib/Vaultaire/Reader.hs @@ -29,6 +29,8 @@ import Vaultaire.Types startReader :: DaemonArgs -> IO () startReader = flip handleMessages handleRequest +-- | Accepts a request 'Message' from a client (either Simple or +-- Extended) and replies with a response terminated by 'EndOfStream'. handleRequest :: Message -> Daemon () handleRequest (Message reply_f origin payload) = profileTime ReaderRequestLatency origin $ do @@ -47,9 +49,13 @@ handleRequest (Message reply_f origin payload) liftIO . errorM "Reader.handleRequest" $ "failed to decode request: " ++ show e +-- | Yields the ByteString argument to the output Pipe if it's not +-- empty; otherwise yields nothing. yieldNotNull :: Monad m => ByteString -> Pipe i ByteString m () yieldNotNull bs = unless (S.null bs) (yield bs) +-- | processSimple handles a request for a series of simple points, +-- sending the result back to the client. processSimple :: Address -> TimeStamp -> TimeStamp -> Origin -> ReplyF -> Daemon () processSimple addr start end origin reply_f = do profileCount ReaderSimplePoints origin @@ -86,6 +92,8 @@ readSimple origin addr start end = forever $ do lift $ profileCountN ReaderSimplePoints origin (S.length bs `div` 24) yieldNotNull bs +-- | processExtended handles a read request for a series of extended +-- points, sending a response back to the client. processExtended :: Address -> TimeStamp -> TimeStamp -> Origin -> ReplyF -> Daemon () processExtended addr start end origin reply_f = do refreshOriginDays origin From 489c06ac665ba60287d57d83ec6d4e43b1372900 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Wed, 18 Feb 2015 23:17:23 +0000 Subject: [PATCH 02/16] Make lookupFirst pointy for clarity --- lib/Vaultaire/DayMap.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Vaultaire/DayMap.hs b/lib/Vaultaire/DayMap.hs index a693ebb..26500b6 100644 --- a/lib/Vaultaire/DayMap.hs +++ b/lib/Vaultaire/DayMap.hs @@ -33,7 +33,7 @@ loadDayMap bs lookupFirst :: TimeStamp -> DayMap -> (Epoch, NumBuckets) -lookupFirst = (fst .) . splitRemainder +lookupFirst ts dm = fst $ splitRemainder ts dm -- Return first and the remainder that is later than that. splitRemainder :: TimeStamp -> DayMap -> ((Epoch, NumBuckets), DayMap) From d10a3193752ffc8194796080ea26e093bc261120 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Wed, 18 Feb 2015 23:33:00 +0000 Subject: [PATCH 03/16] Add missing toplevel Haddocks to DayMap definitions --- lib/Vaultaire/DayMap.hs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/lib/Vaultaire/DayMap.hs b/lib/Vaultaire/DayMap.hs index 26500b6..288956c 100644 --- a/lib/Vaultaire/DayMap.hs +++ b/lib/Vaultaire/DayMap.hs @@ -15,8 +15,9 @@ import qualified Data.Map as Map import Data.Packer import Vaultaire.Types --- | Simple corruption check of input is done by checking that it is a multiple --- of two Word64s +-- | Parses a DayMap. Simple corruption check of input is done by +-- checking that it is a multiple of two Word64s; Left is returned if +-- corruption is detected, or if the provided ByteString is empty. loadDayMap :: ByteString -> Either String DayMap loadDayMap bs | BS.null bs = @@ -31,11 +32,13 @@ loadDayMap bs then Right loaded else Left "bad first entry, must start at zero." - +-- | Finds the first entry in the provided 'DayMap' that's after the +-- provided 'TimeStamp'. lookupFirst :: TimeStamp -> DayMap -> (Epoch, NumBuckets) -lookupFirst ts dm = fst $ splitRemainder ts dm +lookupFirst start dm = fst $ splitRemainder start dm --- Return first and the remainder that is later than that. +-- | Return first entry and the remainder that is later than the provided +-- 'TimeStamp'.. splitRemainder :: TimeStamp -> DayMap -> ((Epoch, NumBuckets), DayMap) splitRemainder (TimeStamp t) (DayMap m) = let (left, middle, right) = Map.splitLookup t m @@ -46,14 +49,18 @@ splitRemainder (TimeStamp t) (DayMap m) = Nothing -> Map.findMax left in (first, DayMap right) +-- | Get the DayMap entries between two TimeStamps. lookupRange :: TimeStamp -> TimeStamp -> DayMap -> [(Epoch, NumBuckets)] lookupRange start (TimeStamp end) dm = - let (first, (DayMap remainder)) = splitRemainder start dm + let (first, DayMap remainder) = splitRemainder start dm (rest,_) = Map.split end remainder in first : Map.toList rest -- Internal +-- | Unpack a ByteString consisting of one or more +-- ('Epoch','NumBuckets') pairs into a 'DayMap'. Will throw an +-- 'OutOfBoundUnpacking' error on badly-formed data. mustLoadDayMap :: ByteString -> DayMap mustLoadDayMap = DayMap . Map.fromList . runUnpacking parse From e49d8d59ca2e088c1b2ba9690a954bfaacb64f4a Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Wed, 18 Feb 2015 23:55:23 +0000 Subject: [PATCH 04/16] Toplevel Haddocks in RollOver code --- lib/Vaultaire/DayMap.hs | 2 +- lib/Vaultaire/RollOver.hs | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/Vaultaire/DayMap.hs b/lib/Vaultaire/DayMap.hs index 288956c..d28a2a6 100644 --- a/lib/Vaultaire/DayMap.hs +++ b/lib/Vaultaire/DayMap.hs @@ -38,7 +38,7 @@ lookupFirst :: TimeStamp -> DayMap -> (Epoch, NumBuckets) lookupFirst start dm = fst $ splitRemainder start dm -- | Return first entry and the remainder that is later than the provided --- 'TimeStamp'.. +-- 'TimeStamp'. splitRemainder :: TimeStamp -> DayMap -> ((Epoch, NumBuckets), DayMap) splitRemainder (TimeStamp t) (DayMap m) = let (left, middle, right) = Map.splitLookup t m diff --git a/lib/Vaultaire/RollOver.hs b/lib/Vaultaire/RollOver.hs index 3da6b41..8e2b03c 100644 --- a/lib/Vaultaire/RollOver.hs +++ b/lib/Vaultaire/RollOver.hs @@ -20,32 +20,36 @@ import Vaultaire.DayMap import Vaultaire.Types -- | Roll the cluster onto a new "vault day", this will block until all other --- daemons are synchronized at acquiring any shared locks. +-- daemons are synchronized at acquiring any shared locks. -- --- All day maps will be invalidated on roll over, it is up to you to ensure --- that they are reloaded before next use. +-- All day maps will be invalidated on roll over, it is up to you to ensure +-- that they are reloaded before next use. rollOverSimpleDay :: Origin -> NumBuckets -> Daemon () rollOverSimpleDay origin' = rollOver origin' (simpleDayOID origin') (simpleLatestOID origin') +-- | Equivalent of 'rollOverSimpleDay' for extended buckets. rollOverExtendedDay :: Origin -> NumBuckets -> Daemon () rollOverExtendedDay origin' = rollOver origin' (extendedDayOID origin') (extendedLatestOID origin') -- | This compares the given time against the latest one in ceph, and updates --- if larger. +-- if larger. -- --- You should only call this once with the maximum time of whatever data set --- you are writing down. This should be done within the same lock as that --- write. +-- You should only call this once with the maximum time of whatever data set +-- you are writing down. This should be done within the same lock as that +-- write. updateSimpleLatest :: Origin -> TimeStamp -> Daemon () updateSimpleLatest origin' = updateLatest (simpleLatestOID origin') +-- | Equivalent of 'updateSimpleLatest' for extended buckets. updateExtendedLatest :: Origin -> TimeStamp -> Daemon () updateExtendedLatest origin' = updateLatest (extendedLatestOID origin') -- Internal +-- | Updates the latest time specified Ceph object to the provided +-- 'TimeStamp', if it is later than the one the object already has. updateLatest :: ByteString -> TimeStamp -> Daemon () updateLatest oid (TimeStamp time) = withLockExclusive oid . liftPool $ do result <- runObject oid readFull From b9c38bd18d4150c924650bf60b1885111aeaee88 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Wed, 18 Feb 2015 23:57:36 +0000 Subject: [PATCH 05/16] Use mappend rather than explicit ByteString.append --- lib/Vaultaire/RollOver.hs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/Vaultaire/RollOver.hs b/lib/Vaultaire/RollOver.hs index 8e2b03c..40fbcbb 100644 --- a/lib/Vaultaire/RollOver.hs +++ b/lib/Vaultaire/RollOver.hs @@ -13,6 +13,7 @@ module Vaultaire.RollOver import Control.Monad.State import Data.ByteString (ByteString) import qualified Data.ByteString as BS +import Data.Monoid import Data.Packer import System.Rados.Monadic import Vaultaire.Daemon @@ -76,7 +77,7 @@ rollOver origin day_file latest_file buckets = error $ "corrupt latest file in origin': " ++ show origin app <- liftPool . runObject day_file $ - append (latest `BS.append` build buckets) + append (latest <> build buckets) case app of Just e -> error $ "failed to append for rollover: " ++ show e @@ -91,9 +92,9 @@ originLockOID = simpleLatestOID simpleLatestOID :: Origin -> ByteString simpleLatestOID (Origin origin') = - "02_" `BS.append` origin' `BS.append` "_simple_latest" + "02_" <> origin' <> "_simple_latest" extendedLatestOID :: Origin -> ByteString extendedLatestOID (Origin origin') = - "02_" `BS.append` origin' `BS.append` "_extended_latest" + "02_" <> origin' <> "_extended_latest" From 8a0a358efbb65814073364bbe8d1bd994b590688 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 00:08:18 +0000 Subject: [PATCH 06/16] More RollOver Haddocks --- lib/Vaultaire/RollOver.hs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/Vaultaire/RollOver.hs b/lib/Vaultaire/RollOver.hs index 40fbcbb..5c990c7 100644 --- a/lib/Vaultaire/RollOver.hs +++ b/lib/Vaultaire/RollOver.hs @@ -65,6 +65,8 @@ updateLatest oid (TimeStamp time) = withLockExclusive oid . liftPool $ do value = runPacking 8 (putWord64LE time) parse = either (const 0) id . tryUnpacking getWord64LE +-- | Roll an origin over to a new "vault day" - append an entry to the +-- 'DayMap' file with the most recent timestamp and bucket count. rollOver :: Origin -> ByteString -> ByteString -> NumBuckets -> Daemon () rollOver origin day_file latest_file buckets = withLockExclusive (originLockOID origin) $ do @@ -87,14 +89,18 @@ rollOver origin day_file latest_file buckets = mustLatest = either (\e -> error $ "could not get latest_file" ++ show e) return +-- | Identifier of the object used to lock an origin during rollover. originLockOID :: Origin -> ByteString originLockOID = simpleLatestOID +-- | Construct the ID of the Ceph object storing the timestamp of the +-- latest 'SimplePoint' written to an origin. simpleLatestOID :: Origin -> ByteString simpleLatestOID (Origin origin') = "02_" <> origin' <> "_simple_latest" +-- | Construct the ID of the Ceph object storing the timestamp of the +-- latest 'ExtendedPoint' written to an origin. extendedLatestOID :: Origin -> ByteString extendedLatestOID (Origin origin') = "02_" <> origin' <> "_extended_latest" - From 1c5819f6e10c12b8debf565861a026f31df89ce2 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 00:17:37 +0000 Subject: [PATCH 07/16] The _latest objects store the time at which a point was written, not the timestamp stored in the point --- lib/Vaultaire/RollOver.hs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/Vaultaire/RollOver.hs b/lib/Vaultaire/RollOver.hs index 5c990c7..0d3821b 100644 --- a/lib/Vaultaire/RollOver.hs +++ b/lib/Vaultaire/RollOver.hs @@ -93,14 +93,15 @@ rollOver origin day_file latest_file buckets = originLockOID :: Origin -> ByteString originLockOID = simpleLatestOID --- | Construct the ID of the Ceph object storing the timestamp of the --- latest 'SimplePoint' written to an origin. +-- | Construct the ID of the Ceph object storing the time at which the +-- latest 'SimplePoint' was written to an origin (note that this is +-- the time at which the point was written, *not* the timestamp of the +-- point itself). simpleLatestOID :: Origin -> ByteString simpleLatestOID (Origin origin') = "02_" <> origin' <> "_simple_latest" --- | Construct the ID of the Ceph object storing the timestamp of the --- latest 'ExtendedPoint' written to an origin. +-- | Analogous to 'simpleLatestOID' for extended points. extendedLatestOID :: Origin -> ByteString extendedLatestOID (Origin origin') = "02_" <> origin' <> "_extended_latest" From dc5a2496009236222545bcc90b0d9debd252ac8d Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 01:16:42 +0000 Subject: [PATCH 08/16] _latest is actually point time, not writer time --- lib/Vaultaire/RollOver.hs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/Vaultaire/RollOver.hs b/lib/Vaultaire/RollOver.hs index 0d3821b..dab9c9f 100644 --- a/lib/Vaultaire/RollOver.hs +++ b/lib/Vaultaire/RollOver.hs @@ -93,10 +93,9 @@ rollOver origin day_file latest_file buckets = originLockOID :: Origin -> ByteString originLockOID = simpleLatestOID --- | Construct the ID of the Ceph object storing the time at which the --- latest 'SimplePoint' was written to an origin (note that this is --- the time at which the point was written, *not* the timestamp of the --- point itself). +-- | Construct the ID of the Ceph object storing the timestamp of the +-- latest 'SimplePoint' which was written to an origin (note that this is +-- the timestamp of the point, not the time at which it was written). simpleLatestOID :: Origin -> ByteString simpleLatestOID (Origin origin') = "02_" <> origin' <> "_simple_latest" From 9e1e111268ff2c5ddeff7ec074ad0f27b3bc0294 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 01:31:44 +0000 Subject: [PATCH 09/16] Add Haddocks for writer code --- lib/Vaultaire/Writer.hs | 43 ++++++++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/lib/Vaultaire/Writer.hs b/lib/Vaultaire/Writer.hs index 7dd06b7..bff02a3 100644 --- a/lib/Vaultaire/Writer.hs +++ b/lib/Vaultaire/Writer.hs @@ -45,13 +45,14 @@ import Vaultaire.Util (fatal) type EpochMap = HashMap Epoch type BucketMap = HashMap Bucket +-- | State used by the writer when processing a batch of points. data BatchState = BatchState { simple :: !(EpochMap (BucketMap Builder)) , extended :: !(EpochMap (BucketMap Builder)) , pending :: !(EpochMap (BucketMap (Word64, [Word64 -> Builder]))) , latestSimple :: !TimeStamp , latestExtended :: !TimeStamp - , dayMaps :: !(DayMap, DayMap) -- Simple, extended + , dayMaps :: !(DayMap, DayMap) -- ^ Simple, extended , bucketSize :: !Word64 , start :: !UTCTime } @@ -62,7 +63,11 @@ data Event = Msg Message | Tick startWriter :: DaemonArgs -> BucketSize -> IO () startWriter args bucket_size = handleMessages args (processBatch bucket_size) -batchStateNow :: BucketSize -> (DayMap, DayMap) -> IO BatchState +-- | Gets the relevant batch state for a given BucketSize and simple +-- and extended DayMaps. The 'start' time is the current time. +batchStateNow :: BucketSize + -> (DayMap, DayMap) -- ^ (simple daymap, extended daymap) + -> IO BatchState batchStateNow bucket_size dms = BatchState mempty mempty mempty 0 0 dms bucket_size <$> getCurrentTime @@ -128,14 +133,15 @@ processBatch bucket_size (Message reply origin payload) writeRate :: Int -> NominalDiffTime -> Float writeRate bytes d = (((fromRational . toRational) bytes) / ((fromRational . toRational) d)) / 1000 - +-- | Given a message consisting of one or more simple or extended +-- points, write them to the vault. processPoints :: MonadState BatchState m - => Word64 - -> ByteString - -> (DayMap, DayMap) - -> Origin - -> TimeStamp - -> TimeStamp + => Word64 -- ^ Offset + -> ByteString -- ^ Raw message + -> (DayMap, DayMap) -- ^ (simple daymap, extended daymap) + -> Origin -- ^ Origin to write to + -> TimeStamp -- ^ Latest simple timestamp + -> TimeStamp -- ^ Latest extended timestamp -> m (Int, Int) -- ^ Number of (simple, extended) points processed processPoints offset message day_maps origin latest_simple latest_ext | fromIntegral offset >= S.length message = do @@ -171,20 +177,26 @@ processPoints offset message day_maps origin latest_simple latest_ext (s,e) <- processPoints (offset + 24) message day_maps origin t latest_ext return (s+1,e) +-- | Unpacks a message starting from the given offset. If it corresponds +-- to a simple point, the 'Payload' will be the value; if extended, +-- the 'Payload' will be the number of bytes in the value. parseMessageAt :: Word64 -> Unpacking (Address, TimeStamp, Payload) parseMessageAt offset = do unpackSetPosition (fromIntegral offset) (,,) <$> (Address <$> getWord64LE) <*> (TimeStamp <$> getWord64LE) <*> getWord64LE - -getBytesAt :: Word64 -> Word64 -> Unpacking ByteString +-- | Gets the specified number of bytes, starting from the specified +-- offset. +getBytesAt :: Word64 -- ^ Offset + -> Word64 -- ^ Number of bytes + -> Unpacking ByteString getBytesAt offset len = do unpackSetPosition (fromIntegral offset) getBytes (fromIntegral len) -- | This one is pretty simple, simply append to the builder within the bucket --- map, which is within an epoch map itself. Yes, this is two map lookups per --- insert. +-- map, which is within an epoch map itself. Yes, this is two map lookups per +-- insert. appendSimple :: MonadState BatchState m => Epoch -> Bucket -> ByteString -> m () appendSimple epoch bucket bytes = do @@ -195,6 +207,7 @@ appendSimple epoch bucket bytes = do let !simple' = HashMap.insert epoch simple_map' (simple s) put $ s { simple = simple' } +-- | Analogous to 'appendSimple' for extended points. appendExtended :: MonadState BatchState m => Epoch -> Bucket -> Address -> TimeStamp -> Word64 -> ByteString -> m () appendExtended epoch bucket (Address address) (TimeStamp time) len string = do @@ -361,19 +374,23 @@ write ns origin do_rollovers s = do forWithKey = flip HashMap.traverseWithKey +-- | Get the file size and mtime of an extended bucket. extendedOffset :: Origin -> Epoch -> Bucket -> Pool (AsyncRead StatResult) extendedOffset o e b = runAsync $ runObject (bucketOID o e b "extended") stat +-- | Writes an extended point to a bucket. writeExtended :: Origin -> Epoch -> Bucket -> ByteString -> Pool AsyncWrite writeExtended o e b payload = runAsync $ runObject (bucketOID o e b "extended") (append payload) +-- | Writes a simple point to a bucket. writeSimple :: Origin -> Epoch -> Bucket -> ByteString -> Pool (AsyncRead StatResult, AsyncWrite) writeSimple o e b payload = runAsync $ runObject (bucketOID o e b "simple") $ (,) <$> stat <*> append payload +-- | Object ID of the write lock object for an origin. writeLockOID :: Origin -> ByteString writeLockOID (Origin o') = "02_" `S.append` o' `S.append` "_write_lock" From 09bc5130f3d47e426c8cae2d263331c2426cf196 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 01:33:33 +0000 Subject: [PATCH 10/16] Replace explicit ByteString.append with mappend --- lib/Vaultaire/Writer.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Vaultaire/Writer.hs b/lib/Vaultaire/Writer.hs index bff02a3..c2bd7ab 100644 --- a/lib/Vaultaire/Writer.hs +++ b/lib/Vaultaire/Writer.hs @@ -393,4 +393,4 @@ writeSimple o e b payload = -- | Object ID of the write lock object for an origin. writeLockOID :: Origin -> ByteString writeLockOID (Origin o') = - "02_" `S.append` o' `S.append` "_write_lock" + "02_" <> o' <> "_write_lock" From 01cccabf354fd54b5ed412951227eefe8886a754 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 01:36:41 +0000 Subject: [PATCH 11/16] 'lest x' means 'so x doesn't happen'; I don't think that's what's meant --- lib/Vaultaire/Broker.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Vaultaire/Broker.hs b/lib/Vaultaire/Broker.hs index 33e7d65..699f775 100644 --- a/lib/Vaultaire/Broker.hs +++ b/lib/Vaultaire/Broker.hs @@ -8,7 +8,7 @@ import System.ZMQ4.Monadic -- | Start a ZMQ proxy, capture is always a Pub socket. -- --- This should never return lest catastrophic failure. +-- This should never return except in the case of catastrophic failure. startProxy :: (SocketType front_t, SocketType back_t) => (front_t, String) -- ^ Frontend, clients -> (back_t, String) -- ^ Backend, workers From bab6d3531e0c6e29ac643b4e8f55cd43d51149d7 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 01:44:28 +0000 Subject: [PATCH 12/16] Add missing profiler Haddocks --- lib/Vaultaire/Profiler.hs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/Vaultaire/Profiler.hs b/lib/Vaultaire/Profiler.hs index dfdb459..fea4538 100644 --- a/lib/Vaultaire/Profiler.hs +++ b/lib/Vaultaire/Profiler.hs @@ -67,7 +67,6 @@ startProfiler env@(ProfilingEnv{..}) = -- | Interface exposed to worker threads so they can report to the profiler. -- in case of no profiling, these functions should be basically noops. --- data ProfilingInterface = ProfilingInterface { -- Reporting functions, they will perform the necessary measurements -- and send them to the profiler. @@ -78,8 +77,7 @@ data ProfilingInterface = ProfilingInterface , report :: MonadIO m => TeleMsgType -> Origin -> Word64 -> m () } -- | Arguments needed to be specified by the user for profiling --- (name, publishing port, period, bound, shutdown signal) --- +-- (name, publishing port, period, bound, shutdown signal). type ProfilerArgs = (String, URI, Period, Int, MVar ()) -- | Profiling environment. @@ -95,11 +93,11 @@ data ProfilingEnv = ProfilingEnv } -- | Values that can be sent to the profiling channel. --- data ChanMsg = Barrier | Tele TeleMsg deriving Show +-- | Dummy profiler, does nothing. noProfiler :: (ProfilingEnv, ProfilingInterface) noProfiler = ( ProfilingEnv @@ -119,6 +117,8 @@ noProfiler , measureTime = (>>= return . (,0)) , report = const $ const $ const $ return () } ) +-- | Builds a (real, not-dummy) profiler interface. If the agent name +-- provided is invalid, an empty name will be used. hasProfiler :: ProfilerArgs -> IO (ProfilingEnv, ProfilingInterface) hasProfiler (name, broker, period, bound, quit) = do n <- maybe (do errorM "Daemon.setupProfiler" @@ -178,6 +178,8 @@ hasProfiler (name, broker, period, bound, quit) = do in fromIntegral $ secInMilliSec + fromIntegral uSecInMilliSec raw (CTime x) = x +-- | Reads profiling reports waiting in the channel, packs them into +-- 'TeleResp' messages and publishes them on the provided socket. profile :: PublishSock -> Profiler () profile sock = do ProfilingEnv{..} <- ask @@ -206,7 +208,6 @@ profile sock = do -- | Reads from input until we either hit a barrier or reach the cap. -- Like pipes-concurrency's @fromInput@ but non-blocking. --- fromInputUntil :: MonadIO m => Int -> Input ChanMsg -> Producer TeleMsg m () fromInputUntil n chan = evalStateP 0 go where go = do @@ -221,7 +222,6 @@ fromInputUntil n chan = evalStateP 0 go -- *NOTE* Technically we do not need to report number of requests received, -- since we can just count the number of latency samples, -- but to keep things simple and modular we will leave them separate. --- aggregate :: Monad m => Producer TeleMsg m () -> m [TeleMsg] aggregate = evalStateT $ foldAll (\acc x -> M.insertWith (go $ _type x) (_origin x, _type x) (1, _payload x) acc) @@ -263,5 +263,6 @@ aggregate = evalStateT $ foldAll keep (c1, v1) (c2, v2) = (c1 + c2, v1 + v2) msg (x,y) z = TeleMsg x y z +-- | Suspends current thread for one millisecond. milliDelay :: Int -> IO () milliDelay = threadDelay . (*1000) From 1222f37e78f6e01e8ace7671a835668cb06780fa Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 02:01:34 +0000 Subject: [PATCH 13/16] More Haddocks for Profiler and Daemon code --- lib/Vaultaire/Daemon.hs | 5 +++++ lib/Vaultaire/Profiler.hs | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/lib/Vaultaire/Daemon.hs b/lib/Vaultaire/Daemon.hs index 3234cb5..32d01fb 100644 --- a/lib/Vaultaire/Daemon.hs +++ b/lib/Vaultaire/Daemon.hs @@ -263,9 +263,11 @@ refreshOriginDays origin' = do Lock management -} +-- | Lock timeout period, in seconds. timeout :: Int timeout = 600 -- 10 minutes +-- | Duration of lock, in seconds. release :: Double release = fromIntegral $ timeout + 5 @@ -390,12 +392,15 @@ dayMapsFromCeph origin' = do Right day_map -> return $ Right (fromIntegral (BS.length contents), day_map) +-- | Ceph object ID of the origin's Simple DayMap. simpleDayOID :: Origin -> ByteString simpleDayOID (Origin origin') = "02_" `BS.append` origin' `BS.append` "_simple_days" +-- | Ceph object ID of the origin's Extended DayMap. extendedDayOID :: Origin -> ByteString extendedDayOID (Origin origin') = "02_" `BS.append` origin' `BS.append` "_extended_days" +-- | Ceph object ID of the bucket at the provided epoch. bucketOID :: Origin -> Epoch -> Bucket -> String -> ByteString bucketOID (Origin origin') epoch bucket kind = BS.pack $ printf "02_%s_%020d_%020d_%s" (BS.unpack origin') diff --git a/lib/Vaultaire/Profiler.hs b/lib/Vaultaire/Profiler.hs index fea4538..f360044 100644 --- a/lib/Vaultaire/Profiler.hs +++ b/lib/Vaultaire/Profiler.hs @@ -58,6 +58,8 @@ runProfiler e (Profiler x) = do debugM "Profiler.runProfiler" "Profiler sealed and cleaned up." return r +-- | Runs the profiling loop which reads reports from the worker and +-- publishes them via ZeroMQ. startProfiler :: ProfilingEnv -> IO () startProfiler env@(ProfilingEnv{..}) = Z.withContext $ \ctx -> @@ -71,10 +73,14 @@ data ProfilingInterface = ProfilingInterface { -- Reporting functions, they will perform the necessary measurements -- and send them to the profiler. profCount :: MonadIO m => TeleMsgType -> Origin -> Int -> m () + -- ^ Queue sending a count profiling message. , profTime :: MonadIO m => TeleMsgType -> Origin -> m r -> m r + -- ^ Report the time taken by an action. -- Raw measurement and sending functions. , measureTime :: MonadIO m => m r -> m (r, Word64) + -- ^ Measure the time elapsed for the provided action (in ms). , report :: MonadIO m => TeleMsgType -> Origin -> Word64 -> m () } + -- ^ Send a profiling report to be queued. -- | Arguments needed to be specified by the user for profiling -- (name, publishing port, period, bound, shutdown signal). From fce9993ccf55e40be529455b5e6b2a4fb6ebc594 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 02:03:18 +0000 Subject: [PATCH 14/16] Event type is dead code, should be removed This is a "breaking change" and so there's a case for making a major version bump here, but at this stage I think we're pretty safe to cowboy it - nothing uses or should use the Event type anymore in the Vaultaire-and-associated codebases, and it's very unlikely that anyone else would be relying on the presence of a dead sum type at this stage. --- lib/Vaultaire/Writer.hs | 3 --- tests/ContentsTest.hs | 2 +- tests/ReaderTest.hs | 2 +- tests/WriterTest.hs | 2 +- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/lib/Vaultaire/Writer.hs b/lib/Vaultaire/Writer.hs index c2bd7ab..3be8708 100644 --- a/lib/Vaultaire/Writer.hs +++ b/lib/Vaultaire/Writer.hs @@ -14,7 +14,6 @@ module Vaultaire.Writer write, batchStateNow, BatchState(..), - Event(..), ) where import Control.Applicative @@ -57,8 +56,6 @@ data BatchState = BatchState , start :: !UTCTime } -data Event = Msg Message | Tick - -- | Start a writer daemon, runs until shutdown. startWriter :: DaemonArgs -> BucketSize -> IO () startWriter args bucket_size = handleMessages args (processBatch bucket_size) diff --git a/tests/ContentsTest.hs b/tests/ContentsTest.hs index d57f373..6d0eeb8 100644 --- a/tests/ContentsTest.hs +++ b/tests/ContentsTest.hs @@ -15,7 +15,7 @@ module Main where -import System.ZMQ4.Monadic hiding (Event) +import System.ZMQ4.Monadic import Test.Hspec hiding (pending) diff --git a/tests/ReaderTest.hs b/tests/ReaderTest.hs index cb4d1e2..3787bab 100644 --- a/tests/ReaderTest.hs +++ b/tests/ReaderTest.hs @@ -7,7 +7,7 @@ import Control.Concurrent import Data.ByteString (ByteString) import qualified Data.ByteString as BS import Data.List.NonEmpty (fromList) -import System.ZMQ4.Monadic hiding (Event) +import System.ZMQ4.Monadic import Test.Hspec hiding (pending) import TestHelpers diff --git a/tests/WriterTest.hs b/tests/WriterTest.hs index e41e3af..b2aedf2 100644 --- a/tests/WriterTest.hs +++ b/tests/WriterTest.hs @@ -16,7 +16,7 @@ import Data.Maybe import Data.Time import Network.URI import System.Rados.Monadic -import System.ZMQ4.Monadic hiding (Event) +import System.ZMQ4.Monadic import Test.Hspec hiding (pending) import TestHelpers From 1609110174fef7d3c95e6ae12cc7a0b3b7ac5f87 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 02:14:02 +0000 Subject: [PATCH 15/16] Haddocks for contents code --- lib/Vaultaire/Contents.hs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/lib/Vaultaire/Contents.hs b/lib/Vaultaire/Contents.hs index c8bc3ed..ec555c8 100644 --- a/lib/Vaultaire/Contents.hs +++ b/lib/Vaultaire/Contents.hs @@ -35,6 +35,8 @@ import Vaultaire.Types startContents :: DaemonArgs -> IO () startContents = flip handleMessages handleRequest +-- | Perform the action requested in the 'Message' and send the +-- appropriate response to the client. handleRequest :: Message -> Daemon () handleRequest (Message reply origin payload) = do case fromWire payload of @@ -72,6 +74,8 @@ performListRequest reply o (lift . reply . uncurry ContentsListBypass) reply EndOfContentsList +-- | A request has been made to allocate a new unique address; do this +-- and return it to the client. performRegisterRequest :: ReplyF -> Origin -> Daemon () performRegisterRequest reply o = do liftIO $ infoM "Contents.performRegisterRequest" @@ -79,6 +83,8 @@ performRegisterRequest reply o = do allocateNewAddressInVault o >>= reply . RandomAddress +-- | Generate a random address, make sure it's unused, and write an +-- empty source dict for it so it is no longer unused. allocateNewAddressInVault :: Origin -> Daemon Address allocateNewAddressInVault o = do a <- Address . (`clearBit` 0) <$> liftIO rollDice @@ -94,6 +100,10 @@ allocateNewAddressInVault o = do rollDice = getStdRandom (randomR (0, maxBound :: Word64)) isAddressInVault a = isJust <$> InternalStore.readFrom o a +-- | Update the sourcedict associated with the provided address. New +-- tags will be added; new values for existing names will be updated +-- (in the case of sourcedict objects only, last write wins); +-- no tags will be removed. performUpdateRequest :: ReplyF -> Origin @@ -124,6 +134,9 @@ performUpdateRequest reply o a input else writeSourceTagsForAddress o a update reply UpdateSuccess +-- | Remove the tags specified in the provided sourcedict from the +-- provided address. Tags not specified in the provided sourcedict +-- will remain. performRemoveRequest :: ReplyF -> Origin @@ -148,6 +161,7 @@ performRemoveRequest reply o a input = do (show o ++ " Complete") reply RemoveSuccess +-- | Read the sourcedict associated with an address. retreiveSourceTagsForAddress :: Origin -> Address -> Daemon (Maybe SourceDict) retreiveSourceTagsForAddress o a = do result <- InternalStore.readFrom o a @@ -155,6 +169,9 @@ retreiveSourceTagsForAddress o a = do Just b' -> either throw Just (fromWire b') Nothing -> Nothing +-- | Pack the tags in the provided sourcedict and write them to Ceph. +-- This will overwrite any previously-associated sourcedict for that +-- address. writeSourceTagsForAddress :: Origin -> Address -> SourceDict -> Daemon () writeSourceTagsForAddress o a s = do liftIO $ infoM "Contents.writeSourceTagsForAddress" From 70989f6cdeaba94835e2ce863820639bf5562750 Mon Sep 17 00:00:00 2001 From: Sharif Olorin Date: Thu, 19 Feb 2015 02:32:33 +0000 Subject: [PATCH 16/16] Replace ByteString append with mappend in Daemon code --- lib/Vaultaire/Daemon.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Vaultaire/Daemon.hs b/lib/Vaultaire/Daemon.hs index 32d01fb..58f6a89 100644 --- a/lib/Vaultaire/Daemon.hs +++ b/lib/Vaultaire/Daemon.hs @@ -394,11 +394,11 @@ dayMapsFromCeph origin' = do -- | Ceph object ID of the origin's Simple DayMap. simpleDayOID :: Origin -> ByteString -simpleDayOID (Origin origin') = "02_" `BS.append` origin' `BS.append` "_simple_days" +simpleDayOID (Origin origin') = "02_" <> origin' <> "_simple_days" -- | Ceph object ID of the origin's Extended DayMap. extendedDayOID :: Origin -> ByteString -extendedDayOID (Origin origin') = "02_" `BS.append` origin' `BS.append` "_extended_days" +extendedDayOID (Origin origin') = "02_" <> origin' <> "_extended_days" -- | Ceph object ID of the bucket at the provided epoch. bucketOID :: Origin -> Epoch -> Bucket -> String -> ByteString