Documentation and cleanup #78

Merged
merged 16 commits on Feb 19, 2015
Changes from 15 commits
2 changes: 1 addition & 1 deletion lib/Vaultaire/Broker.hs
@@ -8,7 +8,7 @@ import System.ZMQ4.Monadic

-- | Start a ZMQ proxy, capture is always a Pub socket.
--
- -- This should never return lest catastrophic failure.
+ -- This should never return except in the case of catastrophic failure.
startProxy :: (SocketType front_t, SocketType back_t)
=> (front_t, String) -- ^ Frontend, clients
-> (back_t, String) -- ^ Backend, workers
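For readers unfamiliar with zmq4, here is a minimal sketch of a proxy with a Pub capture socket. This is illustrative only, not Vaultaire's actual `startProxy`; the socket types and endpoints are assumptions.

```haskell
import System.ZMQ4.Monadic

main :: IO ()
main = runZMQ $ do
    front   <- socket Router          -- clients connect here
    back    <- socket Dealer          -- workers connect here
    capture <- socket Pub             -- mirrors all proxied traffic
    bind front   "tcp://*:5560"
    bind back    "tcp://*:5561"
    bind capture "tcp://*:5562"
    -- proxy runs forever, shuttling messages between front and back;
    -- it returns only on catastrophic failure.
    proxy front back (Just capture)
```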
17 changes: 17 additions & 0 deletions lib/Vaultaire/Contents.hs
@@ -35,6 +35,8 @@ import Vaultaire.Types
startContents :: DaemonArgs -> IO ()
startContents = flip handleMessages handleRequest

-- | Perform the action requested in the 'Message' and send the
-- appropriate response to the client.
handleRequest :: Message -> Daemon ()
handleRequest (Message reply origin payload) = do
case fromWire payload of
@@ -72,13 +74,17 @@ performListRequest reply o
(lift . reply . uncurry ContentsListBypass)
reply EndOfContentsList

-- | A request has been made to allocate a new unique address; do this
-- and return it to the client.
performRegisterRequest :: ReplyF -> Origin -> Daemon ()
performRegisterRequest reply o = do
liftIO $ infoM "Contents.performRegisterRequest"
(show o ++ " RegisterListRequest")

allocateNewAddressInVault o >>= reply . RandomAddress

-- | Generate a random address, make sure it's unused, and write an
-- empty source dict for it so it is no longer unused.
allocateNewAddressInVault :: Origin -> Daemon Address
allocateNewAddressInVault o = do
a <- Address . (`clearBit` 0) <$> liftIO rollDice
@@ -94,6 +100,10 @@ allocateNewAddressInVault o = do
rollDice = getStdRandom (randomR (0, maxBound :: Word64))
isAddressInVault a = isJust <$> InternalStore.readFrom o a

-- | Update the sourcedict associated with the provided address. New
-- tags will be added; new values for existing names will be updated
-- (in the case of sourcedict objects only, last write wins);
-- no tags will be removed.
performUpdateRequest
:: ReplyF
-> Origin
@@ -124,6 +134,9 @@ performUpdateRequest reply o a input
else writeSourceTagsForAddress o a update
reply UpdateSuccess

-- | Remove the tags specified in the provided sourcedict from the
-- provided address. Tags not specified in the provided sourcedict
-- will remain.
performRemoveRequest
:: ReplyF
-> Origin
@@ -148,13 +161,17 @@ performRemoveRequest reply o a input = do
(show o ++ " Complete")
reply RemoveSuccess

-- | Read the sourcedict associated with an address.
retreiveSourceTagsForAddress :: Origin -> Address -> Daemon (Maybe SourceDict)
retreiveSourceTagsForAddress o a = do
result <- InternalStore.readFrom o a
return $ case result of
Just b' -> either throw Just (fromWire b')
Nothing -> Nothing

-- | Pack the tags in the provided sourcedict and write them to Ceph.
-- This will overwrite any previously-associated sourcedict for that
-- address.
writeSourceTagsForAddress :: Origin -> Address -> SourceDict -> Daemon ()
writeSourceTagsForAddress o a s = do
liftIO $ infoM "Contents.writeSourceTagsForAddress"
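The update/remove semantics documented above can be pictured with plain `Data.Map` operations. This is a rough model of the behaviour described in the new haddocks, not the actual InternalStore-backed implementation.

```haskell
import qualified Data.Map.Strict as Map

-- A sourcedict modelled as a map from tag names to values.
type Dict = Map.Map String String

-- Update: new names are added, existing names take the new value
-- (last write wins), nothing is removed. Map.union is left-biased.
updateDict :: Dict -> Dict -> Dict
updateDict new existing = new `Map.union` existing

-- Remove: only the named tags are deleted; everything else remains.
removeDict :: Dict -> Dict -> Dict
removeDict toRemove existing = existing `Map.difference` toRemove

main :: IO ()
main = do
    let existing = Map.fromList [("host", "a"), ("unit", "ms")]
    print $ updateDict (Map.fromList [("host", "b")]) existing
    -- fromList [("host","b"),("unit","ms")]
    print $ removeDict (Map.fromList [("unit", "")]) existing
    -- fromList [("host","a")]
```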
5 changes: 5 additions & 0 deletions lib/Vaultaire/Daemon.hs
@@ -263,9 +263,11 @@ refreshOriginDays origin' = do
Lock management
-}

-- | Lock timeout period, in seconds.
timeout :: Int
timeout = 600 -- 10 minutes

-- | Duration of lock, in seconds.
release :: Double
release = fromIntegral $ timeout + 5

@@ -390,12 +392,15 @@ dayMapsFromCeph origin' = do
Right day_map ->
return $ Right (fromIntegral (BS.length contents), day_map)

-- | Ceph object ID of the origin's Simple DayMap.
simpleDayOID :: Origin -> ByteString
simpleDayOID (Origin origin') = "02_" `BS.append` origin' `BS.append` "_simple_days"
Contributor: I would probably have used <> here if I had known of its greatness back then. No need to change now.

Contributor (author): Ah, I missed that one - updated.
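For context, the two spellings are interchangeable for ByteString: `(<>)` is `mappend`, and `mappend` for ByteString is `BS.append`. A minimal sketch, using a made-up origin value:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Monoid ((<>))

-- Both build the same Ceph object ID; "ABCDEF" is a made-up origin.
oidAppend, oidMonoid :: ByteString
oidAppend = "02_" `BS.append` "ABCDEF" `BS.append` "_simple_days"
oidMonoid = "02_" <> "ABCDEF" <> "_simple_days"

main :: IO ()
main = print (oidAppend == oidMonoid)  -- True
```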


-- | Ceph object ID of the origin's Extended DayMap.
extendedDayOID :: Origin -> ByteString
extendedDayOID (Origin origin') = "02_" `BS.append` origin' `BS.append` "_extended_days"

-- | Ceph object ID of the bucket at the provided epoch.
bucketOID :: Origin -> Epoch -> Bucket -> String -> ByteString
bucketOID (Origin origin') epoch bucket kind = BS.pack $ printf "02_%s_%020d_%020d_%s"
(BS.unpack origin')
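The object-naming scheme above is easiest to see with a concrete origin. This sketch mirrors the `bucketOID` format string from the diff with a made-up origin; epoch and bucket are zero-padded to 20 digits.

```haskell
import qualified Data.ByteString.Char8 as BS
import Data.Word (Word64)
import Text.Printf (printf)

-- Mirrors the bucketOID format string; "ABCDEF" is a made-up origin.
demoBucketOID :: BS.ByteString
demoBucketOID = BS.pack $ printf "02_%s_%020d_%020d_%s"
    ("ABCDEF" :: String) (1 :: Word64) (42 :: Word64) ("simple" :: String)

main :: IO ()
main = BS.putStrLn demoBucketOID
-- 02_ABCDEF_00000000000000000001_00000000000000000042_simple
```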
19 changes: 13 additions & 6 deletions lib/Vaultaire/DayMap.hs
@@ -15,8 +15,9 @@ import qualified Data.Map as Map
import Data.Packer
import Vaultaire.Types

- -- | Simple corruption check of input is done by checking that it is a multiple
- -- of two Word64s
+ -- | Parses a DayMap. Simple corruption check of input is done by
+ -- checking that it is a multiple of two Word64s; Left is returned if
+ -- corruption is detected, or if the provided ByteString is empty.
loadDayMap :: ByteString -> Either String DayMap
loadDayMap bs
| BS.null bs =
@@ -31,11 +32,13 @@ loadDayMap bs
then Right loaded
else Left "bad first entry, must start at zero."


-- | Finds the 'DayMap' entry in effect at the provided 'TimeStamp': the
-- entry at that exact time if one exists, otherwise the latest one before it.
lookupFirst :: TimeStamp -> DayMap -> (Epoch, NumBuckets)
- lookupFirst = (fst .) . splitRemainder
+ lookupFirst start dm = fst $ splitRemainder start dm

- -- Return first and the remainder that is later than that.
+ -- | Return the entry in effect at the provided 'TimeStamp', along with
+ -- the remainder of the 'DayMap' that is later than it.
splitRemainder :: TimeStamp -> DayMap -> ((Epoch, NumBuckets), DayMap)
splitRemainder (TimeStamp t) (DayMap m) =
let (left, middle, right) = Map.splitLookup t m
@@ -46,14 +49,18 @@ splitRemainder (TimeStamp t) (DayMap m) =
Nothing -> Map.findMax left
in (first, DayMap right)

-- | Get the DayMap entries between two TimeStamps.
lookupRange :: TimeStamp -> TimeStamp -> DayMap -> [(Epoch, NumBuckets)]
lookupRange start (TimeStamp end) dm =
-     let (first, (DayMap remainder)) = splitRemainder start dm
+     let (first, DayMap remainder) = splitRemainder start dm
(rest,_) = Map.split end remainder
in first : Map.toList rest

-- Internal

-- | Unpack a ByteString consisting of one or more
-- ('Epoch','NumBuckets') pairs into a 'DayMap'. Will throw an
-- 'OutOfBoundUnpacking' error on badly-formed data.
mustLoadDayMap :: ByteString -> DayMap
mustLoadDayMap =
DayMap . Map.fromList . runUnpacking parse
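A toy model of the lookup semantics above (not the Vaultaire implementation): the entry in effect at time t is the one keyed at t, or failing that the greatest one before t. As in `loadDayMap`, the map is assumed to start at zero, so the fallback lookup never sees an empty map.

```haskell
import qualified Data.Map as Map

-- Simplified analogue of splitRemainder/lookupFirst over a plain Map.
lookupFirstLike :: Word -> Map.Map Word Int -> (Word, Int)
lookupFirstLike t m =
    let (left, middle, _) = Map.splitLookup t m
    in case middle of
        Just n  -> (t, n)            -- exact hit on an epoch boundary
        Nothing -> Map.findMax left  -- latest epoch before t

main :: IO ()
main = do
    let dm = Map.fromList [(0, 4), (100, 8)]
    print $ lookupFirstLike 50 dm   -- (0,4): falls in the epoch at 0
    print $ lookupFirstLike 100 dm  -- (100,8): exact hit
```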
19 changes: 13 additions & 6 deletions lib/Vaultaire/Profiler.hs
@@ -58,6 +58,8 @@ runProfiler e (Profiler x) = do
debugM "Profiler.runProfiler" "Profiler sealed and cleaned up."
return r

-- | Runs the profiling loop which reads reports from the worker and
-- publishes them via ZeroMQ.
startProfiler :: ProfilingEnv -> IO ()
startProfiler env@(ProfilingEnv{..}) =
Z.withContext $ \ctx ->
@@ -67,19 +69,21 @@ startProfiler env@(ProfilingEnv{..}) =

-- | Interface exposed to worker threads so they can report to the profiler.
-- In case of no profiling, these functions should be basically no-ops.
data ProfilingInterface = ProfilingInterface
{ -- Reporting functions, they will perform the necessary measurements
-- and send them to the profiler.
profCount :: MonadIO m => TeleMsgType -> Origin -> Int -> m ()
-- ^ Queue sending a count profiling message.
, profTime :: MonadIO m => TeleMsgType -> Origin -> m r -> m r
-- ^ Report the time taken by an action.
-- Raw measurement and sending functions.
, measureTime :: MonadIO m => m r -> m (r, Word64)
-- ^ Measure the time elapsed for the provided action (in ms).
, report :: MonadIO m => TeleMsgType -> Origin -> Word64 -> m () }
-- ^ Send a profiling report to be queued.

-- | Arguments needed to be specified by the user for profiling
- -- (name, publishing port, period, bound, shutdown signal)
- --
+ -- (name, publishing port, period, bound, shutdown signal).
type ProfilerArgs = (String, URI, Period, Int, MVar ())

-- | Profiling environment.
@@ -95,11 +99,11 @@ data ProfilingEnv = ProfilingEnv
}

-- | Values that can be sent to the profiling channel.
data ChanMsg = Barrier
| Tele TeleMsg
deriving Show

-- | Dummy profiler, does nothing.
noProfiler :: (ProfilingEnv, ProfilingInterface)
noProfiler
= ( ProfilingEnv
@@ -119,6 +123,8 @@ noProfiler
, measureTime = (>>= return . (,0))
, report = const $ const $ const $ return () } )

-- | Builds a (real, not-dummy) profiler interface. If the agent name
-- provided is invalid, an empty name will be used.
hasProfiler :: ProfilerArgs -> IO (ProfilingEnv, ProfilingInterface)
hasProfiler (name, broker, period, bound, quit) = do
n <- maybe (do errorM "Daemon.setupProfiler"
@@ -178,6 +184,8 @@ hasProfiler (name, broker, period, bound, quit) = do
in fromIntegral $ secInMilliSec + fromIntegral uSecInMilliSec
raw (CTime x) = x

-- | Reads profiling reports waiting in the channel, packs them into
-- 'TeleResp' messages and publishes them on the provided socket.
profile :: PublishSock -> Profiler ()
profile sock = do
ProfilingEnv{..} <- ask
Expand Down Expand Up @@ -206,7 +214,6 @@ profile sock = do

-- | Reads from input until we either hit a barrier or reach the cap.
-- Like pipes-concurrency's @fromInput@ but non-blocking.
fromInputUntil :: MonadIO m => Int -> Input ChanMsg -> Producer TeleMsg m ()
fromInputUntil n chan = evalStateP 0 go
where go = do
@@ -221,7 +228,6 @@ fromInputUntil n chan = evalStateP 0 go
-- *NOTE* Technically we do not need to report number of requests received,
-- since we can just count the number of latency samples,
-- but to keep things simple and modular we will leave them separate.
aggregate :: Monad m => Producer TeleMsg m () -> m [TeleMsg]
aggregate = evalStateT $ foldAll
(\acc x -> M.insertWith (go $ _type x) (_origin x, _type x) (1, _payload x) acc)
@@ -263,5 +269,6 @@ aggregate = evalStateT $ foldAll
keep (c1, v1) (c2, v2) = (c1 + c2, v1 + v2)
msg (x,y) z = TeleMsg x y z

-- | Suspends the current thread for the given number of milliseconds.
milliDelay :: Int -> IO ()
milliDelay = threadDelay . (*1000)
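A hypothetical sketch of how a worker might use the interface above, assuming `ProfilingInterface` and the `TeleMsgType` constructors seen in this diff are exported; `readPoints` and its payload are made up.

```haskell
import Vaultaire.Profiler (ProfilingInterface (..))
import Vaultaire.Types (Origin, TeleMsgType (..))

-- Wrap an action with profTime to report its latency, then report
-- how many items it produced with profCount.
handleRead :: ProfilingInterface -> Origin -> IO ()
handleRead iface origin = do
    points <- profTime iface ReaderRequestLatency origin readPoints
    profCount iface ReaderSimplePoints origin (length points)
  where
    readPoints = return [1, 2, 3 :: Int]  -- stand-in for a real read
```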
8 changes: 8 additions & 0 deletions lib/Vaultaire/Reader.hs
@@ -29,6 +29,8 @@ import Vaultaire.Types
startReader :: DaemonArgs -> IO ()
startReader = flip handleMessages handleRequest

-- | Accepts a request 'Message' from a client (either Simple or
-- Extended) and replies with a response terminated by 'EndOfStream'.
handleRequest :: Message -> Daemon ()
handleRequest (Message reply_f origin payload)
= profileTime ReaderRequestLatency origin $ do
@@ -47,9 +49,13 @@ handleRequest (Message reply_f origin payload)
liftIO . errorM "Reader.handleRequest" $
"failed to decode request: " ++ show e

-- | Yields the ByteString argument to the output Pipe if it's not
-- empty; otherwise yields nothing.
yieldNotNull :: Monad m => ByteString -> Pipe i ByteString m ()
yieldNotNull bs = unless (S.null bs) (yield bs)

-- | processSimple handles a request for a series of simple points,
-- sending the result back to the client.
processSimple :: Address -> TimeStamp -> TimeStamp -> Origin -> ReplyF -> Daemon ()
processSimple addr start end origin reply_f = do
profileCount ReaderSimplePoints origin
@@ -86,6 +92,8 @@ readSimple origin addr start end = forever $ do
lift $ profileCountN ReaderSimplePoints origin (S.length bs `div` 24)
yieldNotNull bs

-- | processExtended handles a read request for a series of extended
-- points, sending a response back to the client.
processExtended :: Address -> TimeStamp -> TimeStamp -> Origin -> ReplyF -> Daemon ()
processExtended addr start end origin reply_f = do
refreshOriginDays origin
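To see `yieldNotNull` in action, here is a small self-contained pipeline; the producer and consumer are made up for the demo.

```haskell
import Control.Monad (forever, unless)
import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as S8
import Pipes
import qualified Pipes.Prelude as P

-- Same definition as in the diff: pass non-empty chunks through.
yieldNotNull :: Monad m => ByteString -> Pipe i ByteString m ()
yieldNotNull bs = unless (S.null bs) (yield bs)

main :: IO ()
main = runEffect $
    each (map S8.pack ["a", "", "b"])
        >-> forever (await >>= yieldNotNull)
        >-> P.print
-- prints "a" then "b"; the empty chunk is dropped
```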
33 changes: 22 additions & 11 deletions lib/Vaultaire/RollOver.hs
@@ -13,39 +13,44 @@ module Vaultaire.RollOver
import Control.Monad.State
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Monoid
import Data.Packer
import System.Rados.Monadic
import Vaultaire.Daemon
import Vaultaire.DayMap
import Vaultaire.Types

-- | Roll the cluster onto a new "vault day"; this will block until all other
-- daemons are synchronized at acquiring any shared locks.
--
-- All day maps will be invalidated on roll over; it is up to you to ensure
-- that they are reloaded before next use.
rollOverSimpleDay :: Origin -> NumBuckets -> Daemon ()
rollOverSimpleDay origin' =
rollOver origin' (simpleDayOID origin') (simpleLatestOID origin')

-- | Equivalent of 'rollOverSimpleDay' for extended buckets.
rollOverExtendedDay :: Origin -> NumBuckets -> Daemon ()
rollOverExtendedDay origin' =
rollOver origin' (extendedDayOID origin') (extendedLatestOID origin')

-- | This compares the given time against the latest one in Ceph, and updates
-- if larger.
--
-- You should only call this once with the maximum time of whatever data set
-- you are writing down. This should be done within the same lock as that
-- write.
updateSimpleLatest :: Origin -> TimeStamp -> Daemon ()
updateSimpleLatest origin' = updateLatest (simpleLatestOID origin')

-- | Equivalent of 'updateSimpleLatest' for extended buckets.
updateExtendedLatest :: Origin -> TimeStamp -> Daemon ()
updateExtendedLatest origin' = updateLatest (extendedLatestOID origin')

-- Internal

-- | Updates the specified latest-time Ceph object to the provided
-- 'TimeStamp', if it is later than the one the object already holds.
updateLatest :: ByteString -> TimeStamp -> Daemon ()
updateLatest oid (TimeStamp time) = withLockExclusive oid . liftPool $ do
result <- runObject oid readFull
Expand All @@ -60,6 +65,8 @@ updateLatest oid (TimeStamp time) = withLockExclusive oid . liftPool $ do
value = runPacking 8 (putWord64LE time)
parse = either (const 0) id . tryUnpacking getWord64LE

-- | Roll an origin over to a new "vault day" - append an entry to the
-- 'DayMap' file with the most recent timestamp and bucket count.
rollOver :: Origin -> ByteString -> ByteString -> NumBuckets -> Daemon ()
rollOver origin day_file latest_file buckets =
withLockExclusive (originLockOID origin) $ do
@@ -72,7 +79,7 @@ rollOver origin day_file latest_file buckets =
error $ "corrupt latest file in origin': " ++ show origin

app <- liftPool . runObject day_file $
-         append (latest `BS.append` build buckets)
+         append (latest <> build buckets)

case app of
Just e -> error $ "failed to append for rollover: " ++ show e
@@ -82,14 +89,18 @@
mustLatest = either (\e -> error $ "could not get latest_file" ++ show e)
return

-- | Identifier of the object used to lock an origin during rollover.
originLockOID :: Origin -> ByteString
originLockOID = simpleLatestOID

-- | Construct the ID of the Ceph object storing the timestamp of the
-- latest 'SimplePoint' which was written to an origin (note that this is
-- the timestamp of the point, not the time at which it was written).
simpleLatestOID :: Origin -> ByteString
simpleLatestOID (Origin origin') =
"02_" `BS.append` origin' `BS.append` "_simple_latest"
"02_" <> origin' <> "_simple_latest"

-- | Analogous to 'simpleLatestOID' for extended points.
extendedLatestOID :: Origin -> ByteString
extendedLatestOID (Origin origin') =
"02_" `BS.append` origin' `BS.append` "_extended_latest"

"02_" <> origin' <> "_extended_latest"