Merge pull request #78 from anchor/dox
Documentation and cleanup
olorin committed Feb 19, 2015
2 parents c81b837 + 70989f6 commit d6a25fe
Showing 11 changed files with 115 additions and 46 deletions.
2 changes: 1 addition & 1 deletion lib/Vaultaire/Broker.hs
@@ -8,7 +8,7 @@ import System.ZMQ4.Monadic

-- | Start a ZMQ proxy, capture is always a Pub socket.
--
-- This should never return lest catastrophic failure.
-- This should never return except in the case of catastrophic failure.
startProxy :: (SocketType front_t, SocketType back_t)
=> (front_t, String) -- ^ Frontend, clients
-> (back_t, String) -- ^ Backend, workers
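
As a usage illustration (hypothetical; not part of this commit), a broker might wire a ROUTER frontend to a DEALER backend. The trailing capture argument and the surrounding monad are assumptions here, since the rest of the signature is elided in this diff:

    import System.ZMQ4.Monadic

    -- Sketch only: socket types and endpoints are assumed, not taken from this commit.
    main :: IO ()
    main = runZMQ $
        startProxy (Router, "tcp://*:5560") -- frontend: clients connect here
                   (Dealer, "tcp://*:5561") -- backend: workers connect here
                   "tcp://*:5000"           -- assumed capture (Pub) endpoint
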
17 changes: 17 additions & 0 deletions lib/Vaultaire/Contents.hs
@@ -35,6 +35,8 @@ import Vaultaire.Types
startContents :: DaemonArgs -> IO ()
startContents = flip handleMessages handleRequest

-- | Perform the action requested in the 'Message' and send the
-- appropriate response to the client.
handleRequest :: Message -> Daemon ()
handleRequest (Message reply origin payload) = do
case fromWire payload of
@@ -72,13 +74,17 @@ performListRequest reply o
(lift . reply . uncurry ContentsListBypass)
reply EndOfContentsList

-- | A request has been made to allocate a new unique address; do this
-- and return it to the client.
performRegisterRequest :: ReplyF -> Origin -> Daemon ()
performRegisterRequest reply o = do
liftIO $ infoM "Contents.performRegisterRequest"
(show o ++ " RegisterListRequest")

allocateNewAddressInVault o >>= reply . RandomAddress

-- | Generate a random address, make sure it's unused, and write an
-- empty source dict for it so it is no longer unused.
allocateNewAddressInVault :: Origin -> Daemon Address
allocateNewAddressInVault o = do
a <- Address . (`clearBit` 0) <$> liftIO rollDice
@@ -94,6 +100,10 @@ allocateNewAddressInVault o = do
rollDice = getStdRandom (randomR (0, maxBound :: Word64))
isAddressInVault a = isJust <$> InternalStore.readFrom o a
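
The body of the allocation loop is elided above; here is a minimal sketch of the generate-and-probe pattern the comment describes, assuming the where-bindings rollDice and isAddressInVault shown here (hypothetical reconstruction, not the committed code):

    probeForAddress :: Daemon Address
    probeForAddress = do
        a <- Address . (`clearBit` 0) <$> liftIO rollDice -- cleared low bit marks a simple address
        used <- isAddressInVault a
        if used
            then probeForAddress -- collision: roll again
            else return a        -- unused: the caller then writes the empty sourcedict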

-- | Update the sourcedict associated with the provided address. New
-- tags will be added; new values for existing names will be updated
-- (in the case of sourcedict objects only, last write wins);
-- no tags will be removed.
performUpdateRequest
:: ReplyF
-> Origin
@@ -124,6 +134,9 @@ performUpdateRequest reply o a input
else writeSourceTagsForAddress o a update
reply UpdateSuccess
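
The merge rule described above behaves like a left-biased map union: the incoming sourcedict wins on name collisions, and nothing already present is deleted. A toy illustration, assuming a sourcedict is essentially a map from tag names to values (hypothetical helper, not from this commit):

    import Data.Text (Text)
    import qualified Data.HashMap.Strict as HM

    -- "Last write wins": entries in the update shadow existing ones; no key is removed.
    mergeTags :: HM.HashMap Text Text -> HM.HashMap Text Text -> HM.HashMap Text Text
    mergeTags update existing = HM.union update existing -- left-biased union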

-- | Remove the tags specified in the provided sourcedict from the
-- provided address. Tags not specified in the provided sourcedict
-- will remain.
performRemoveRequest
:: ReplyF
-> Origin
@@ -148,13 +161,17 @@ performRemoveRequest reply o a input = do
(show o ++ " Complete")
reply RemoveSuccess
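
Removal, by contrast, is a plain map difference: only the named tags go away. Another toy sketch under the same map-like assumption (hypothetical helper, not from this commit):

    import Data.Text (Text)
    import qualified Data.HashMap.Strict as HM

    -- Drop exactly the tags named in the request; everything else remains.
    removeTags :: HM.HashMap Text Text -> HM.HashMap Text Text -> HM.HashMap Text Text
    removeTags toRemove existing = HM.difference existing toRemove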

-- | Read the sourcedict associated with an address.
retreiveSourceTagsForAddress :: Origin -> Address -> Daemon (Maybe SourceDict)
retreiveSourceTagsForAddress o a = do
result <- InternalStore.readFrom o a
return $ case result of
Just b' -> either throw Just (fromWire b')
Nothing -> Nothing

-- | Pack the tags in the provided sourcedict and write them to Ceph.
-- This will overwrite any previously-associated sourcedict for that
-- address.
writeSourceTagsForAddress :: Origin -> Address -> SourceDict -> Daemon ()
writeSourceTagsForAddress o a s = do
liftIO $ infoM "Contents.writeSourceTagsForAddress"
9 changes: 7 additions & 2 deletions lib/Vaultaire/Daemon.hs
@@ -263,9 +263,11 @@ refreshOriginDays origin' = do
Lock management
-}

-- | Lock timeout period, in seconds.
timeout :: Int
timeout = 600 -- 10 minutes

-- | Duration of lock, in seconds.
release :: Double
release = fromIntegral $ timeout + 5

@@ -390,12 +392,15 @@ dayMapsFromCeph origin' = do
Right day_map ->
return $ Right (fromIntegral (BS.length contents), day_map)

-- | Ceph object ID of the origin's Simple DayMap.
simpleDayOID :: Origin -> ByteString
simpleDayOID (Origin origin') = "02_" `BS.append` origin' `BS.append` "_simple_days"
simpleDayOID (Origin origin') = "02_" <> origin' <> "_simple_days"

-- | Ceph object ID of the origin's Extended DayMap.
extendedDayOID :: Origin -> ByteString
extendedDayOID (Origin origin') = "02_" `BS.append` origin' `BS.append` "_extended_days"
extendedDayOID (Origin origin') = "02_" <> origin' <> "_extended_days"

-- | Ceph object ID of the bucket at the provided epoch.
bucketOID :: Origin -> Epoch -> Bucket -> String -> ByteString
bucketOID (Origin origin') epoch bucket kind = BS.pack $ printf "02_%s_%020d_%020d_%s"
(BS.unpack origin')
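
Given the printf format above, a bucket object ID comes out as follows (illustrative values; the epoch and bucket numbers are zero-padded to 20 digits):

    >>> bucketOID (Origin "ABCDEF") 0 42 "simple"
    "02_ABCDEF_00000000000000000000_00000000000000000042_simple"
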
19 changes: 13 additions & 6 deletions lib/Vaultaire/DayMap.hs
@@ -15,8 +15,9 @@ import qualified Data.Map as Map
import Data.Packer
import Vaultaire.Types

-- | Simple corruption check of input is done by checking that it is a multiple
-- of two Word64s
-- | Parses a DayMap. Simple corruption check of input is done by
-- checking that it is a multiple of two Word64s; Left is returned if
-- corruption is detected, or if the provided ByteString is empty.
loadDayMap :: ByteString -> Either String DayMap
loadDayMap bs
| BS.null bs =
@@ -31,11 +32,13 @@ loadDayMap bs
then Right loaded
else Left "bad first entry, must start at zero."
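
Concretely, a well-formed blob is a sequence of (Epoch, NumBuckets) pairs of little-endian Word64s, so its length must be a non-zero multiple of 16 bytes and its first epoch must be zero. An illustrative GHCi session (the Show output and error string are assumptions; the exact messages are elided in this diff):

    >>> let good = runPacking 32 (mapM_ putWord64LE [0, 128, 1000, 256])
    >>> loadDayMap good
    Right (DayMap (fromList [(0,128),(1000,256)]))
    >>> loadDayMap (BS.take 15 good)
    Left "..." -- rejected: 15 bytes is not a multiple of 16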


-- | Finds the first entry in the provided 'DayMap' that's after the
-- provided 'TimeStamp'.
lookupFirst :: TimeStamp -> DayMap -> (Epoch, NumBuckets)
lookupFirst = (fst .) . splitRemainder
lookupFirst start dm = fst $ splitRemainder start dm

-- Return first and the remainder that is later than that.
-- | Return first entry and the remainder that is later than the provided
-- 'TimeStamp'.
splitRemainder :: TimeStamp -> DayMap -> ((Epoch, NumBuckets), DayMap)
splitRemainder (TimeStamp t) (DayMap m) =
let (left, middle, right) = Map.splitLookup t m
@@ -46,14 +49,18 @@ splitRemainder (TimeStamp t) (DayMap m) =
Nothing -> Map.findMax left
in (first, DayMap right)

-- | Get the DayMap entries between two TimeStamps.
lookupRange :: TimeStamp -> TimeStamp -> DayMap -> [(Epoch, NumBuckets)]
lookupRange start (TimeStamp end) dm =
let (first, (DayMap remainder)) = splitRemainder start dm
let (first, DayMap remainder) = splitRemainder start dm
(rest,_) = Map.split end remainder
in first : Map.toList rest
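
A worked example of these lookups against a hypothetical two-epoch map (Show output illustrative):

    >>> let dm = DayMap (Map.fromList [(0, 128), (1000, 256)])
    >>> lookupFirst (TimeStamp 500) dm -- 500 falls in the epoch starting at 0
    (0,128)
    >>> lookupRange (TimeStamp 500) (TimeStamp 1500) dm -- the range spans both epochs
    [(0,128),(1000,256)]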

-- Internal

-- | Unpack a ByteString consisting of one or more
-- ('Epoch','NumBuckets') pairs into a 'DayMap'. Will throw an
-- 'OutOfBoundUnpacking' error on badly-formed data.
mustLoadDayMap :: ByteString -> DayMap
mustLoadDayMap =
DayMap . Map.fromList . runUnpacking parse
19 changes: 13 additions & 6 deletions lib/Vaultaire/Profiler.hs
@@ -58,6 +58,8 @@ runProfiler e (Profiler x) = do
debugM "Profiler.runProfiler" "Profiler sealed and cleaned up."
return r

-- | Runs the profiling loop which reads reports from the worker and
-- publishes them via ZeroMQ.
startProfiler :: ProfilingEnv -> IO ()
startProfiler env@(ProfilingEnv{..}) =
Z.withContext $ \ctx ->
@@ -67,19 +69,21 @@ startProfiler env@(ProfilingEnv{..}) =

-- | Interface exposed to worker threads so they can report to the profiler.
-- In the case of no profiling, these functions should be no-ops.
data ProfilingInterface = ProfilingInterface
{ -- Reporting functions: they perform the necessary measurements
-- and send them to the profiler.
profCount :: MonadIO m => TeleMsgType -> Origin -> Int -> m ()
-- ^ Queue sending a count profiling message.
, profTime :: MonadIO m => TeleMsgType -> Origin -> m r -> m r
-- ^ Report the time taken by an action.
-- Raw measurement and sending functions.
, measureTime :: MonadIO m => m r -> m (r, Word64)
-- ^ Measure the time elapsed for the provided action (in ms).
, report :: MonadIO m => TeleMsgType -> Origin -> Word64 -> m () }
-- ^ Send a profiling report to be queued.
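
A hypothetical worker-side use of this interface (the message types are borrowed from the reader code later in this diff; the helper itself is not part of this commit):

    -- Time a read action and report how many 24-byte simple points it returned.
    reportedRead :: ProfilingInterface -> Origin -> Daemon ByteString -> Daemon ByteString
    reportedRead iface origin readAction = do
        bs <- profTime iface ReaderRequestLatency origin readAction
        profCount iface ReaderSimplePoints origin (BS.length bs `div` 24)
        return bs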

-- | Arguments needed to be specified by the user for profiling
-- (name, publishing port, period, bound, shutdown signal).
type ProfilerArgs = (String, URI, Period, Int, MVar ())

-- | Profiling environment.
@@ -95,11 +99,11 @@ data ProfilingEnv = ProfilingEnv
}

-- | Values that can be sent to the profiling channel.
--
data ChanMsg = Barrier
| Tele TeleMsg
deriving Show

-- | Dummy profiler, does nothing.
noProfiler :: (ProfilingEnv, ProfilingInterface)
noProfiler
= ( ProfilingEnv
@@ -119,6 +123,8 @@ noProfiler
, measureTime = (>>= return . (,0))
, report = const $ const $ const $ return () } )

-- | Builds a (real, not-dummy) profiler interface. If the agent name
-- provided is invalid, an empty name will be used.
hasProfiler :: ProfilerArgs -> IO (ProfilingEnv, ProfilingInterface)
hasProfiler (name, broker, period, bound, quit) = do
n <- maybe (do errorM "Daemon.setupProfiler"
@@ -178,6 +184,8 @@ hasProfiler (name, broker, period, bound, quit) = do
in fromIntegral $ secInMilliSec + fromIntegral uSecInMilliSec
raw (CTime x) = x

-- | Reads profiling reports waiting in the channel, packs them into
-- 'TeleResp' messages and publishes them on the provided socket.
profile :: PublishSock -> Profiler ()
profile sock = do
ProfilingEnv{..} <- ask
@@ -206,7 +214,6 @@ profile sock = do

-- | Reads from input until we either hit a barrier or reach the cap.
-- Like pipes-concurrency's @fromInput@ but non-blocking.
--
fromInputUntil :: MonadIO m => Int -> Input ChanMsg -> Producer TeleMsg m ()
fromInputUntil n chan = evalStateP 0 go
where go = do
@@ -221,7 +228,6 @@ fromInputUntil n chan = evalStateP 0 go
-- *NOTE* Technically we do not need to report number of requests received,
-- since we can just count the number of latency samples,
-- but to keep things simple and modular we will leave them separate.
--
aggregate :: Monad m => Producer TeleMsg m () -> m [TeleMsg]
aggregate = evalStateT $ foldAll
(\acc x -> M.insertWith (go $ _type x) (_origin x, _type x) (1, _payload x) acc)
@@ -263,5 +269,6 @@
keep (c1, v1) (c2, v2) = (c1 + c2, v1 + v2)
msg (x,y) z = TeleMsg x y z

-- | Suspends the current thread for the given number of milliseconds.
milliDelay :: Int -> IO ()
milliDelay = threadDelay . (*1000)
8 changes: 8 additions & 0 deletions lib/Vaultaire/Reader.hs
@@ -29,6 +29,8 @@ import Vaultaire.Types
startReader :: DaemonArgs -> IO ()
startReader = flip handleMessages handleRequest

-- | Accepts a request 'Message' from a client (either Simple or
-- Extended) and replies with a response terminated by 'EndOfStream'.
handleRequest :: Message -> Daemon ()
handleRequest (Message reply_f origin payload)
= profileTime ReaderRequestLatency origin $ do
@@ -47,9 +49,13 @@ handleRequest (Message reply_f origin payload)
liftIO . errorM "Reader.handleRequest" $
"failed to decode request: " ++ show e

-- | Yields the ByteString argument to the output Pipe if it's not
-- empty; otherwise yields nothing.
yieldNotNull :: Monad m => ByteString -> Pipe i ByteString m ()
yieldNotNull bs = unless (S.null bs) (yield bs)

-- | processSimple handles a request for a series of simple points,
-- sending the result back to the client.
processSimple :: Address -> TimeStamp -> TimeStamp -> Origin -> ReplyF -> Daemon ()
processSimple addr start end origin reply_f = do
profileCount ReaderSimplePoints origin
@@ -86,6 +92,8 @@ readSimple origin addr start end = forever $ do
lift $ profileCountN ReaderSimplePoints origin (S.length bs `div` 24)
yieldNotNull bs
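
The `div` 24 above reflects the simple-point wire format: three little-endian Word64s per point (address, timestamp, payload), 24 bytes in all. A decoder sketch using the same Data.Packer API seen elsewhere in this diff (field meanings are assumptions):

    import Data.Packer (runUnpacking, getWord64LE)
    import Data.Word (Word64)

    -- Split one 24-byte simple point into its three fields.
    decodeSimplePoint :: ByteString -> (Word64, Word64, Word64)
    decodeSimplePoint = runUnpacking $
        (,,) <$> getWord64LE -- address
             <*> getWord64LE -- timestamp
             <*> getWord64LE -- payload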

-- | processExtended handles a read request for a series of extended
-- points, sending a response back to the client.
processExtended :: Address -> TimeStamp -> TimeStamp -> Origin -> ReplyF -> Daemon ()
processExtended addr start end origin reply_f = do
refreshOriginDays origin
33 changes: 22 additions & 11 deletions lib/Vaultaire/RollOver.hs
@@ -13,39 +13,44 @@ module Vaultaire.RollOver
import Control.Monad.State
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Monoid
import Data.Packer
import System.Rados.Monadic
import Vaultaire.Daemon
import Vaultaire.DayMap
import Vaultaire.Types

-- | Roll the cluster onto a new "vault day"; this will block until all other
-- daemons are synchronized at acquiring any shared locks.
--
-- All day maps will be invalidated on roll over; it is up to you to ensure
-- that they are reloaded before next use.
rollOverSimpleDay :: Origin -> NumBuckets -> Daemon ()
rollOverSimpleDay origin' =
rollOver origin' (simpleDayOID origin') (simpleLatestOID origin')

-- | Equivalent of 'rollOverSimpleDay' for extended buckets.
rollOverExtendedDay :: Origin -> NumBuckets -> Daemon ()
rollOverExtendedDay origin' =
rollOver origin' (extendedDayOID origin') (extendedLatestOID origin')

-- | This compares the given time against the latest one in Ceph, and updates
-- it if larger.
--
-- You should only call this once with the maximum time of whatever data set
-- you are writing down. This should be done within the same lock as that
-- write.
updateSimpleLatest :: Origin -> TimeStamp -> Daemon ()
updateSimpleLatest origin' = updateLatest (simpleLatestOID origin')

-- | Equivalent of 'updateSimpleLatest' for extended buckets.
updateExtendedLatest :: Origin -> TimeStamp -> Daemon ()
updateExtendedLatest origin' = updateLatest (extendedLatestOID origin')

-- Internal

-- | Update the latest-time Ceph object named by the first argument to the
-- provided 'TimeStamp', if it is later than the one already stored.
updateLatest :: ByteString -> TimeStamp -> Daemon ()
updateLatest oid (TimeStamp time) = withLockExclusive oid . liftPool $ do
result <- runObject oid readFull
@@ -60,6 +65,8 @@ updateLatest oid (TimeStamp time) = withLockExclusive oid . liftPool $ do
value = runPacking 8 (putWord64LE time)
parse = either (const 0) id . tryUnpacking getWord64LE
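
The comparison itself is elided above; its pure core, reusing the parse binding shown, would look roughly like this (hypothetical reconstruction, not the committed code):

    -- Overwrite only when the stored time is absent or older than the new one.
    shouldWrite :: Either e ByteString -> Word64 -> Bool
    shouldWrite stored time = case stored of
        Left _   -> True            -- no latest object yet: write unconditionally
        Right bs -> parse bs < time -- stored timestamp is older: overwrite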

-- | Roll an origin over to a new "vault day" - append an entry to the
-- 'DayMap' file with the most recent timestamp and bucket count.
rollOver :: Origin -> ByteString -> ByteString -> NumBuckets -> Daemon ()
rollOver origin day_file latest_file buckets =
withLockExclusive (originLockOID origin) $ do
@@ -72,7 +79,7 @@ rollOver origin day_file latest_file buckets =
error $ "corrupt latest file in origin': " ++ show origin

app <- liftPool . runObject day_file $
append (latest `BS.append` build buckets)
append (latest <> build buckets)

case app of
Just e -> error $ "failed to append for rollover: " ++ show e
@@ -82,14 +89,18 @@
mustLatest = either (\e -> error $ "could not get latest_file" ++ show e)
return

-- | Identifier of the object used to lock an origin during rollover.
originLockOID :: Origin -> ByteString
originLockOID = simpleLatestOID

-- | Construct the ID of the Ceph object storing the timestamp of the
-- latest 'SimplePoint' which was written to an origin (note that this is
-- the timestamp of the point, not the time at which it was written).
simpleLatestOID :: Origin -> ByteString
simpleLatestOID (Origin origin') =
"02_" `BS.append` origin' `BS.append` "_simple_latest"
"02_" <> origin' <> "_simple_latest"

-- | Analogous to 'simpleLatestOID' for extended points.
extendedLatestOID :: Origin -> ByteString
extendedLatestOID (Origin origin') =
"02_" `BS.append` origin' `BS.append` "_extended_latest"

"02_" <> origin' <> "_extended_latest"