Skip to content
Permalink
Browse files

cleanup exceptions and move some api calls in STM

  • Loading branch information...
kderme committed Apr 15, 2019
1 parent ee33991 commit c9ad5af9b20907a0ebb2ddf1a4fb6fb005f169c1
@@ -42,15 +42,16 @@ module Control.Monad.Class.MonadSTM

import Prelude hiding (read)

import qualified Control.Monad.STM as STM
import qualified Control.Concurrent.STM.TVar as STM
import qualified Control.Concurrent.STM.TBQueue as STM
import qualified Control.Concurrent.STM.TMVar as STM
import qualified Control.Concurrent.STM.TQueue as STM
import qualified Control.Concurrent.STM.TBQueue as STM
import qualified Control.Concurrent.STM.TVar as STM
import qualified Control.Monad.STM as STM

import Control.Exception
import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State
import GHC.Stack
import Numeric.Natural (Natural)

@@ -156,6 +157,45 @@ instance MonadSTM m => MonadSTM (ReaderT e m) where
isEmptyTBQueue = lift . isEmptyTBQueue
isFullTBQueue = lift . isFullTBQueue

instance MonadSTM m => MonadSTM (StateT s m) where
type STM (StateT s m) = StateT s (STM m)
type TVar (StateT s m) = TVar m
type TMVar (StateT s m) = TMVar m
type TQueue (StateT s m) = TQueue m
type TBQueue (StateT s m) = TBQueue m

atomically (StateT t) = StateT $ \e -> atomically (t e)
newTVar = lift . newTVar
readTVar = lift . readTVar
writeTVar t a = lift $ writeTVar t a
retry = lift retry

newTMVar = lift . newTMVar
newTMVarM = lift . newTMVarM
newEmptyTMVar = lift newEmptyTMVar
newEmptyTMVarM = lift newEmptyTMVarM
takeTMVar = lift . takeTMVar
tryTakeTMVar = lift . tryTakeTMVar
putTMVar t a = lift $ putTMVar t a
tryPutTMVar t a = lift $ tryPutTMVar t a
readTMVar = lift . readTMVar
tryReadTMVar = lift . tryReadTMVar
swapTMVar t a = lift $ swapTMVar t a
isEmptyTMVar = lift . isEmptyTMVar

newTQueue = lift $ newTQueue
readTQueue = lift . readTQueue
tryReadTQueue = lift . tryReadTQueue
writeTQueue q a = lift $ writeTQueue q a
isEmptyTQueue = lift . isEmptyTQueue

newTBQueue = lift . newTBQueue
readTBQueue = lift . readTBQueue
tryReadTBQueue = lift . tryReadTBQueue
writeTBQueue q a = lift $ writeTBQueue q a
isEmptyTBQueue = lift . isEmptyTBQueue
isFullTBQueue = lift . isFullTBQueue

instance (Show e, MonadSTM m) => MonadSTM (ExceptT e m) where
type STM (ExceptT e m) = ExceptT e (STM m)
type TVar (ExceptT e m) = TVar m
@@ -7,6 +7,7 @@ module Ouroboros.Storage.VolatileDB.API
, module Ouroboros.Storage.VolatileDB.Types
) where

import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow

import Data.ByteString (ByteString)
@@ -36,5 +37,5 @@ data VolatileDB blockId m = VolatileDB {
, getBlockIds :: HasCallStack => m [blockId]
, getSuccessors :: HasCallStack => m (Maybe blockId -> Set blockId)
, garbageCollect :: HasCallStack => SlotNo -> m ()
, getIsMember :: HasCallStack => m (blockId -> Bool)
, getIsMember :: HasCallStack => STM m (Maybe (blockId -> Bool))
}
@@ -44,6 +44,11 @@
-- + there is no modify block operation. Thanks to that we need not keep any rollback journals
-- to make sure we are safe in case of unexpected shutdowns.
--
-- We only throw VolatileDBError. All internal errors, like io errors, are cought, wrapped
-- and rethrown. For all new calls of HasFs, we must make sure that they are used properly
-- wrapped. All top-level function of this module are safe. You can safely use HasFs calls
-- in modifyState or wrapFsError actions.
--
-- Concurrency
--
-- The same db should only be opened once
@@ -92,13 +97,15 @@ import qualified Data.Map as Map
import Data.Maybe
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Typeable
import Data.Word (Word64)
import GHC.Stack
import qualified System.IO as IO

import Ouroboros.Consensus.Util (SomePair (..))

import Ouroboros.Storage.FS.API
import Ouroboros.Storage.FS.API.Types
import Ouroboros.Storage.Util.ErrorHandling (ErrorHandling (..))
import qualified Ouroboros.Storage.Util.ErrorHandling as EH
import Ouroboros.Storage.VolatileDB.API
@@ -131,15 +138,15 @@ data InternalState blockId h = InternalState {
VolatileDB API
------------------------------------------------------------------------------}

openDB :: (HasCallStack, MonadCatch m, MonadSTM m, Ord blockId)
openDB :: (HasCallStack, MonadCatch m, MonadSTM m, Ord blockId, Typeable blockId, Show blockId)
=> HasFS m h
-> ErrorHandling (VolatileDBError blockId) m
-> Parser (ParserError blockId) m blockId
-> Int
-> m (VolatileDB blockId m)
openDB h e p m = fst <$> openDBFull h e p m

openDBFull :: (HasCallStack, MonadCatch m, MonadSTM m, Ord blockId)
openDBFull :: (HasCallStack, MonadCatch m, MonadSTM m, Ord blockId, Show blockId, Typeable blockId)
=> HasFS m h
-> ErrorHandling (VolatileDBError blockId) m
-> Parser (ParserError blockId) m blockId
@@ -162,21 +169,21 @@ openDBFull hasFS err parser maxBlocksPerFile = do

-- After opening the db once, the same @maxBlocksPerFile@ must be provided all
-- next opens.
openDBImpl :: (HasCallStack, MonadThrow m, MonadSTM m, Ord blockId)
openDBImpl :: (HasCallStack, MonadCatch m, Typeable blockId, MonadThrow m, MonadSTM m, Ord blockId, Show blockId)
=> HasFS m h
-> ErrorHandling (VolatileDBError blockId) m
-> Parser (ParserError blockId) m blockId
-> Int
-> m (VolatileDBEnv m blockId)
openDBImpl hasFS@HasFS{..} err parser maxBlocksPerFile =
if maxBlocksPerFile <= 0
then EH.throwError err $ InvalidArgumentsError "maxBlocksPerFile can't be 0"
then EH.throwError err $ UserError . InvalidArgumentsError $ "maxBlocksPerFile should be positive"
else do
st <- mkInternalStateDB hasFS err parser maxBlocksPerFile
stVar <- atomically $ newTMVar $ Just st
return $ VolatileDBEnv hasFS err stVar maxBlocksPerFile parser

closeDBImpl :: (MonadSTM m)
closeDBImpl :: MonadSTM m
=> VolatileDBEnv m blockId
-> m ()
closeDBImpl VolatileDBEnv{..} = do
@@ -188,7 +195,7 @@ closeDBImpl VolatileDBEnv{..} = do
where
HasFS{..} = _dbHasFS

isOpenDBImpl :: (MonadSTM m)
isOpenDBImpl :: MonadSTM m
=> VolatileDBEnv m blockId
-> m Bool
isOpenDBImpl VolatileDBEnv{..} = do
@@ -197,7 +204,7 @@ isOpenDBImpl VolatileDBEnv{..} = do

-- closeDB . reOpenDB is a no-op. This is achieved because when we reOpen
-- we try to append on the latest created file.
reOpenDBImpl :: (HasCallStack, MonadCatch m, MonadSTM m, Ord blockId)
reOpenDBImpl :: (HasCallStack, MonadCatch m, MonadSTM m, Ord blockId, Show blockId, Typeable blockId)
=> VolatileDBEnv m blockId
-> m ()
reOpenDBImpl VolatileDBEnv{..} = do
@@ -207,7 +214,7 @@ reOpenDBImpl VolatileDBEnv{..} = do
st <- mkInternalStateDB _dbHasFS _dbErr _parser _maxBlocksPerFile
return (Just st, ())

getBlockImpl :: (MonadSTM m, MonadCatch m, Ord blockId)
getBlockImpl :: (Typeable blockId, Show blockId, MonadSTM m, MonadCatch m, Ord blockId)
=> VolatileDBEnv m blockId
-> blockId
-> m (Maybe ByteString)
@@ -233,7 +240,7 @@ getBlockImpl env@VolatileDBEnv{..} slot = do
-- We should be careful about not leaking open fds when we open a new file, since this can affect garbage
-- collection of files.
putBlockImpl :: forall m. (MonadCatch m, MonadSTM m)
=> forall blockId. (Ord blockId)
=> forall blockId. (Ord blockId, Show blockId, Typeable blockId)
=> VolatileDBEnv m blockId
-> BlockInfo blockId
-> BS.Builder
@@ -281,7 +288,8 @@ putBlockImpl env@VolatileDBEnv{..} BlockInfo{..} builder = do
-- This is ok only if any fs updates leave the fs in a consistent state every moment.
-- This approach works since we always close the Database in case of errors,
-- but we should rethink it if this changes in the future.
garbageCollectImpl :: forall m blockId. (MonadCatch m, MonadSTM m, Ord blockId)
garbageCollectImpl :: forall m blockId
. (Typeable blockId, Show blockId, MonadCatch m, MonadSTM m, Ord blockId)
=> VolatileDBEnv m blockId
-> SlotNo
-> m ()
@@ -331,25 +339,24 @@ getInternalState :: forall m blockId. MonadSTM m
getInternalState VolatileDBEnv{..} = do
mSt <- atomically (readTMVar _dbInternalState)
case mSt of
Nothing -> EH.throwError _dbErr ClosedDBError
Nothing -> EH.throwError _dbErr $ UserError ClosedDBError
Just st -> return (SomePair _dbHasFS st)

getIsMemberImpl :: forall m blockId. (MonadSTM m, Ord blockId)
=> VolatileDBEnv m blockId
-> m (blockId -> Bool)
-> STM m (Maybe (blockId -> Bool))
getIsMemberImpl VolatileDBEnv{..} = do
mSt <- atomically (readTMVar _dbInternalState)
case mSt of
Nothing -> EH.throwError _dbErr ClosedDBError
Just st -> return (\bid -> Map.member bid (_currentRevMap st))
mSt <- readTMVar _dbInternalState
return $ flip fmap mSt $ \st -> \bid ->
Map.member bid (_currentRevMap st)

getBlockIdsImpl :: forall m blockId. (MonadSTM m)
=> VolatileDBEnv m blockId
-> m [blockId]
getBlockIdsImpl VolatileDBEnv{..} = do
mSt <- atomically (readTMVar _dbInternalState)
case mSt of
Nothing -> EH.throwError _dbErr ClosedDBError
Nothing -> EH.throwError _dbErr $ UserError ClosedDBError
Just st -> return $ Map.keys $ _currentRevMap st

getSuccessorsImpl :: forall m blockId. (MonadSTM m, Ord blockId)
@@ -358,7 +365,7 @@ getSuccessorsImpl :: forall m blockId. (MonadSTM m, Ord blockId)
getSuccessorsImpl VolatileDBEnv{..} = do
mSt <- atomically (readTMVar _dbInternalState)
case mSt of
Nothing -> EH.throwError _dbErr ClosedDBError
Nothing -> EH.throwError _dbErr $ UserError ClosedDBError
Just st -> return $ \blockId ->
fromMaybe Set.empty (Map.lookup blockId (_currentSuccMap st))

@@ -415,26 +422,26 @@ reOpenFile HasFS{..} _err VolatileDBEnv{..} st@InternalState{..} = do
, _currentWriteOffset = 0
}

mkInternalStateDB :: (HasCallStack, MonadThrow m, Ord blockId)
mkInternalStateDB :: (HasCallStack, MonadThrow m, MonadCatch m, Ord blockId, Show blockId, Typeable blockId)
=> HasFS m h
-> ErrorHandling (VolatileDBError blockId) m
-> Parser (ParserError blockId) m blockId
-> Int
-> m (InternalState blockId h)
mkInternalStateDB hasFS@HasFS{..} err parser maxBlocksPerFile = do
mkInternalStateDB hasFS@HasFS{..} err parser maxBlocksPerFile = wrapFsError err $ do
allFiles <- do
createDirectoryIfMissing True []
listDirectory []
mkInternalState hasFS err parser maxBlocksPerFile allFiles

mkInternalState :: forall blockId m h. (MonadThrow m, HasCallStack, Ord blockId)
mkInternalState :: forall blockId m h. (MonadCatch m, HasCallStack, Ord blockId, Show blockId, Typeable blockId)
=> HasFS m h
-> ErrorHandling (VolatileDBError blockId) m
-> Parser (ParserError blockId) m blockId
-> Int
-> Set String
-> m (InternalState blockId h)
mkInternalState hasFS@HasFS{..} err parser n files = do
mkInternalState hasFS@HasFS{..} err parser n files = wrapFsError err $ do
lastFd <- findNextFd err files
let
go :: Index blockId
@@ -495,7 +502,7 @@ mkInternalState hasFS@HasFS{..} err parser n files = do
withFile hasFS path IO.AppendMode $ \hndl ->
hTruncate hndl (fromIntegral offset)
return ()
Just e -> EH.throwError err $ VParserError e
Just e -> EH.throwError err $ UnexpectedError $ ParserError e
let fileMpNoPred = sizeAndId <$> fileMp
let maxSlotOfFile = maxSlotMap fileMp
let nBlocks = Map.size fileMp
@@ -514,13 +521,23 @@ mkInternalState hasFS@HasFS{..} err parser n files = do
go newMp newRevMp newSuccMp newMaxSlot newHaveLessThanN restFiles
go Map.empty Map.empty Map.empty Nothing [] (Set.toList files)

tryVolDB :: (Show blockId, Typeable blockId, MonadCatch m) => m a -> m (Either (VolatileDBError blockId) a)
tryVolDB = tryDB (UnexpectedError . FileSystemError)

modifyState :: forall blockId m r. (HasCallStack, MonadSTM m, MonadCatch m)
tryDB :: forall e a m. (Exception e, MonadCatch m) => (FsError -> e) -> m a -> m (Either e a)
tryDB fromFS = fmap squash . try . try
where
squash :: Either FsError (Either e x) -> Either e x
squash = either (Left . fromFS) id

-- This is safe in terms of throwing FsErrors.
modifyState :: forall blockId m r
. (HasCallStack, MonadSTM m, MonadCatch m, Show blockId, Typeable blockId)
=> VolatileDBEnv m blockId
-> (forall h. HasFS m h -> (InternalState blockId h) -> m (InternalState blockId h, r))
-> m r
modifyState VolatileDBEnv{_dbHasFS = hasFS :: HasFS m h, ..} action = do
(mr, ()) <- generalBracket open close (EH.try _dbErr . mutation)
(mr, ()) <- generalBracket open close (tryVolDB . mutation)
case mr of
Left e -> throwError e
Right (_, r) -> return r
@@ -551,15 +568,15 @@ modifyState VolatileDBEnv{_dbHasFS = hasFS :: HasFS m h, ..} action = do

mutation :: Maybe (InternalState blockId h)
-> m (InternalState blockId h, r)
mutation Nothing = throwError ClosedDBError
mutation Nothing = throwError $ UserError ClosedDBError
mutation (Just oldState) = action hasFS oldState

-- TODO what if this fails?
closeOpenHandle :: Maybe (InternalState blockId h) -> m ()
closeOpenHandle Nothing = return ()
closeOpenHandle (Just InternalState {..}) = hClose _currentWriteHandle


--
reverseMap :: forall blockId. Ord blockId
=> String
-> ReverseIndex blockId
@@ -572,7 +589,7 @@ reverseMap file revMp mp = foldM f revMp (Map.toList mp)
-> Either (VolatileDBError blockId) (ReverseIndex blockId)
f rv (w, (n, BlockInfo {..})) = case Map.lookup bbid revMp of
Nothing -> Right $ Map.insert bbid (InternalBlockInfo file w n bslot bpreBid) rv
Just blockInfo -> Left $ VParserError
Just blockInfo -> Left $ UnexpectedError . ParserError
$ DuplicatedSlot $ Map.fromList [(bbid, ([file], [ibFile blockInfo]))]

-- Throws an error if one of the given file names does not parse.
@@ -588,5 +605,5 @@ findNextFd err files = foldM go Nothing files
Just a' -> max a' a
go :: Maybe FileId -> String -> m (Maybe FileId)
go fd file = case parseFd file of
Nothing -> EH.throwError err $ VParserError $ InvalidFilename file
Nothing -> EH.throwError err $ UnexpectedError . ParserError $ InvalidFilename file
Just fd' -> return $ Just $ maxMaybe fd fd'
@@ -38,10 +38,22 @@ type SuccessorsIndex blockId = Map (Maybe blockId) (Set blockId)

-- | Errors which might arise when working with this database.
data VolatileDBError blockId =
FileSystemError FsError
| VParserError (ParserError blockId)
| InvalidArgumentsError String
UserError UserError
-- ^ An error thrown because of incorrect usage of the volatile database
-- by the user.
| UnexpectedError (UnexpectedError blockId)
-- ^ An unexpected error thrown because something went wrong on a lower
-- layer.
deriving Show

data UserError =
InvalidArgumentsError String
| ClosedDBError
deriving (Show, Eq)

data UnexpectedError blockId =
FileSystemError FsError
| ParserError (ParserError blockId)
deriving (Show)

instance Eq blockId => Eq (VolatileDBError blockId) where
@@ -61,13 +73,24 @@ instance Eq blockId => Eq (ParserError blockId) where

sameVolatileDBError :: Eq blockId => VolatileDBError blockId -> VolatileDBError blockId -> Bool
sameVolatileDBError e1 e2 = case (e1, e2) of
(FileSystemError fs1, FileSystemError fs2) -> sameFsError fs1 fs2
(VParserError p1, VParserError p2) -> p1 == p2
(ClosedDBError, ClosedDBError) -> True
(InvalidArgumentsError _, InvalidArgumentsError _) -> True
_ -> False
(UserError ue1, UserError ue2) -> ue1 == ue2
(UnexpectedError ue1, UnexpectedError ue2) -> sameUnexpectedError ue1 ue2
_ -> False

-- (FileSystemError fs1, FileSystemError fs2) -> sameFsError fs1 fs2
-- (VParserError p1, VParserError p2) -> p1 == p2
-- (ClosedDBError, ClosedDBError) -> True
-- (InvalidArgumentsError _, InvalidArgumentsError _) -> True
-- _ -> False

-- TODO: Why is this not comparing the arguments to 'DuplicatedSlot'?
sameUnexpectedError :: Eq blockId => UnexpectedError blockId -> UnexpectedError blockId -> Bool
sameUnexpectedError e1 e2 = case (e1, e2) of
(FileSystemError fs1, FileSystemError fs2) -> sameFsError fs1 fs2
(ParserError p1, ParserError p2) -> p1 == p2
_ -> False


sameParseError :: ParserError blockId -> ParserError blockId -> Bool
sameParseError e1 e2 = case (e1, e2) of
(DuplicatedSlot _, DuplicatedSlot _) -> True
Oops, something went wrong.

0 comments on commit c9ad5af

Please sign in to comment.
You can’t perform that action at this time.