Skip to content
Permalink
Browse files

Adjust the storage CBOR decoders to provide the input bytes

This is needed to be able to use annotations with the raw input bytes,
without having to use an extra layer of CBOR-in-CBOR encoding (which is
an undersirable change in the on-disk format).

This also makes it easier to have a single definition for the storage
format, rather than different instantiations using different encoders
and decoders that might not agree. This reduces opportunities for
confusion.

The change to readIncrementalOffsets requires a test, since it is quite
subtle and prone to off-by-one errors, but this patch does not include
the tests.
  • Loading branch information...
dcoutts committed Aug 13, 2019
1 parent d1a045a commit 2fce7a2b834b636326add1be19b382880ca94036
@@ -23,6 +23,7 @@ import Control.Monad
import Control.Monad.ST
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import Data.ByteString.Builder.Extra (defaultChunkSize)
import Data.IORef
import Data.Word (Word64)
@@ -147,7 +148,7 @@ readIncremental hasFS@HasFS{..} decoder fp = withLiftST $ \liftST -> do
-- and the error.
readIncrementalOffsets :: forall m h a. (MonadST m, MonadThrow m)
=> HasFS m h
-> (forall s . CBOR.Decoder s a)
-> (forall s . CBOR.Decoder s (LBS.ByteString -> a))
-> FsPath
-> m ([(Word64, (Word64, a))], Maybe ReadIncrementalErr)
-- ^ ((the offset of the start of @a@ in the file,
@@ -161,37 +162,49 @@ readIncrementalOffsets hasFS@HasFS{..} decoder fp = withLiftST $ \liftST ->
-- If the file is empty, we will immediately get "end of input"
then return ([], Nothing)
else liftST (CBOR.deserialiseIncremental decoder) >>=
go liftST h 0 [] Nothing fileSize
go liftST h 0 [] Nothing [] fileSize
where
go :: (forall x. ST s x -> m x)
-> h
-> Word64 -- ^ Offset
-> [(Word64, (Word64, a))] -- ^ Already deserialised (reverse order)
-> Maybe ByteString -- ^ Unconsumed bytes from last time
-> [ByteString] -- ^ Chunks pushed for this item (rev order)
-> Word64 -- ^ Total file size
-> CBOR.IDecode s a
-> CBOR.IDecode s (LBS.ByteString -> a)
-> m ([(Word64, (Word64, a))], Maybe ReadIncrementalErr)
go liftST h offset deserialised mbUnconsumed fileSize dec = case dec of
go liftST h offset deserialised mbUnconsumed bss fileSize dec = case dec of
CBOR.Partial k -> do
-- First use the unconsumed bytes from a previous read before read
-- some more bytes from the file.
bs <- case mbUnconsumed of
Just unconsumed -> return unconsumed
Nothing -> hGetSome h (fromIntegral defaultChunkSize)
dec' <- liftST $ k (checkEmpty bs)
go liftST h offset deserialised Nothing fileSize dec'
go liftST h offset deserialised Nothing (bs:bss) fileSize dec'

CBOR.Done leftover size a -> do
let nextOffset = offset + fromIntegral size
deserialised' = (offset, (fromIntegral size, a)) : deserialised
-- We've been keeping track of the bytes pushed into the decoder
-- for this item so far in bss. Now there's some trailing data to
-- remove and we can get the whole bytes used for this item. We
-- supply the bytes to the final decoded value. This is to support
-- annotating values with their original input bytes.
aBytes = case bss of
[] -> LBS.empty
bs:bss' -> LBS.fromChunks (reverse (bs' : bss'))
where
bs' = BS.take (BS.length bs - BS.length leftover) bs
a' = a aBytes
deserialised' = (offset, (fromIntegral size, a')) : deserialised
case checkEmpty leftover of
Nothing
| nextOffset == fileSize
-- We're at the end of the file, so stop
-> return (reverse deserialised', Nothing)
-- Some more bytes, so try to read the next @a@.
mbLeftover -> liftST (CBOR.deserialiseIncremental decoder) >>=
go liftST h nextOffset deserialised' mbLeftover fileSize
go liftST h nextOffset deserialised' mbLeftover [] fileSize

CBOR.Fail _ _ err -> return (reverse deserialised, Just (ReadFailed err))

@@ -13,6 +13,7 @@ module Ouroboros.Storage.ChainDB.Impl.Args

import Codec.CBOR.Decoding (Decoder)
import Codec.CBOR.Encoding (Encoding)
import qualified Data.ByteString.Lazy as Lazy
import Data.Time.Clock (DiffTime, secondsToDiffTime)

import Control.Monad.Class.MonadSTM
@@ -45,7 +46,7 @@ data ChainDbArgs m blk = forall h1 h2 h3. ChainDbArgs {

-- Decoders
cdbDecodeHash :: forall s. Decoder s (HeaderHash blk)
, cdbDecodeBlock :: forall s. Decoder s blk
, cdbDecodeBlock :: forall s. Decoder s (Lazy.ByteString -> blk)
, cdbDecodeLedger :: forall s. Decoder s (LedgerState blk)
, cdbDecodeChainState :: forall s. Decoder s (ChainState (BlockProtocol blk))

@@ -91,7 +91,7 @@ import qualified Ouroboros.Storage.Util.ErrorHandling as EH
-- | Thin wrapper around the ImmutableDB (opaque type)
data ImmDB m blk = ImmDB {
immDB :: ImmutableDB (HeaderHash blk) m
, decBlock :: forall s. Decoder s blk
, decBlock :: forall s. Decoder s (Lazy.ByteString -> blk)
, encBlock :: blk -> Encoding
, epochInfo :: EpochInfo m
, isEBB :: blk -> Maybe (HeaderHash blk)
@@ -107,7 +107,7 @@ data ImmDB m blk = ImmDB {
-- See also 'defaultArgs'.
data ImmDbArgs m blk = forall h. ImmDbArgs {
immDecodeHash :: forall s. Decoder s (HeaderHash blk)
, immDecodeBlock :: forall s. Decoder s blk
, immDecodeBlock :: forall s. Decoder s (Lazy.ByteString -> blk)
, immEncodeHash :: HeaderHash blk -> Encoding
, immEncodeBlock :: blk -> Encoding
, immErr :: ErrorHandling ImmDB.ImmutableDBError m
@@ -168,7 +168,7 @@ openDB args@ImmDbArgs{..} = do

-- | For testing purposes
mkImmDB :: ImmutableDB (HeaderHash blk) m
-> (forall s. Decoder s blk)
-> (forall s. Decoder s (Lazy.ByteString -> blk))
-> (blk -> Encoding)
-> EpochInfo m
-> (blk -> Maybe (HeaderHash blk))
@@ -508,8 +508,8 @@ epochFileParser ImmDbArgs{..} =
where
-- It is important that we don't first parse all blocks, storing them all
-- in memory, and only /then/ extract the information we need.
decoder' :: forall s. Decoder s (SlotNo, Maybe (HeaderHash blk))
decoder' = (\b -> (blockSlot b, immIsEBB b)) <$> immDecodeBlock
decoder' :: forall s. Decoder s (Lazy.ByteString -> (SlotNo, Maybe (HeaderHash blk)))
decoder' = ((\b -> (blockSlot b, immIsEBB b)) .) <$> immDecodeBlock

-- | Verify that there is at most one EBB in the epoch file and that it
-- lives at the start of the file
@@ -578,17 +578,18 @@ mustExist epochOrSlot Nothing = Left $ ImmDbMissingBlock epochOrSlot
mustExist _ (Just b) = Right $ b

parse :: forall blk.
(forall s. Decoder s blk)
(forall s. Decoder s (Lazy.ByteString -> blk))
-> Either EpochNo SlotNo
-> Lazy.ByteString
-> Either (ChainDbFailure blk) blk
parse dec epochOrSlot =
aux . CBOR.deserialiseFromBytes dec
parse dec epochOrSlot bytes =
aux (CBOR.deserialiseFromBytes dec bytes)
where
aux :: Either CBOR.DeserialiseFailure (Lazy.ByteString, blk)
aux :: Either CBOR.DeserialiseFailure
(Lazy.ByteString, Lazy.ByteString -> blk)
-> Either (ChainDbFailure blk) blk
aux (Right (bs, b))
| Lazy.null bs = Right b
aux (Right (bs, blk))
| Lazy.null bs = Right (blk bytes)
| otherwise = Left $ ImmDbTrailingData epochOrSlot bs
aux (Left err) = Left $ ImmDbParseFailure epochOrSlot err

@@ -87,7 +87,7 @@ import qualified Ouroboros.Storage.VolatileDB as VolDB
-- module.
data VolDB m blk = VolDB {
volDB :: VolatileDB (HeaderHash blk) m
, decBlock :: forall s. Decoder s blk
, decBlock :: forall s. Decoder s (Lazy.ByteString -> blk)
, encBlock :: blk -> Encoding
, err :: ErrorHandling (VolatileDBError (HeaderHash blk)) m
, errSTM :: ThrowCantCatch (VolatileDBError (HeaderHash blk)) (STM m)
@@ -102,7 +102,7 @@ data VolDbArgs m blk = forall h. VolDbArgs {
, volErr :: ErrorHandling (VolatileDBError (HeaderHash blk)) m
, volErrSTM :: ThrowCantCatch (VolatileDBError (HeaderHash blk)) (STM m)
, volBlocksPerFile :: Int
, volDecodeBlock :: forall s. Decoder s blk
, volDecodeBlock :: forall s. Decoder s (Lazy.ByteString -> blk)
, volEncodeBlock :: blk -> Encoding
}

@@ -144,7 +144,7 @@ openDB args@VolDbArgs{..} = do

-- | For testing purposes
mkVolDB :: VolatileDB (HeaderHash blk) m
-> (forall s. Decoder s blk)
-> (forall s. Decoder s (Lazy.ByteString -> blk))
-> (blk -> Encoding)
-> ErrorHandling (VolatileDBError (HeaderHash blk)) m
-> ThrowCantCatch (VolatileDBError (HeaderHash blk)) (STM m)
@@ -432,8 +432,8 @@ blockFileParser :: forall m blk. (MonadST m, MonadThrow m, HasHeader blk)
blockFileParser VolDbArgs{..} =
VolDB.Parser $ Util.CBOR.readIncrementalOffsets volHasFS decoder'
where
decoder' :: forall s. Decoder s (VolDB.BlockInfo (HeaderHash blk))
decoder' = extractInfo <$> volDecodeBlock
decoder' :: forall s. Decoder s (Lazy.ByteString -> VolDB.BlockInfo (HeaderHash blk))
decoder' = (extractInfo .) <$> volDecodeBlock

{-------------------------------------------------------------------------------
Error handling
@@ -471,17 +471,18 @@ mustExist hash Nothing = Left $ VolDbMissingBlock hash
mustExist _ (Just b) = Right $ b

parse :: forall blk.
(forall s. Decoder s blk)
(forall s. Decoder s (Lazy.ByteString -> blk))
-> HeaderHash blk
-> Lazy.ByteString
-> Either (ChainDbFailure blk) blk
parse dec hash =
aux . CBOR.deserialiseFromBytes dec
parse dec hash bytes =
aux (CBOR.deserialiseFromBytes dec bytes)
where
aux :: Either CBOR.DeserialiseFailure (Lazy.ByteString, blk)
aux :: Either CBOR.DeserialiseFailure
(Lazy.ByteString, Lazy.ByteString -> blk)
-> Either (ChainDbFailure blk) blk
aux (Right (bs, b))
| Lazy.null bs = Right b
aux (Right (bs, blk))
| Lazy.null bs = Right (blk bytes)
| otherwise = Left $ VolDbTrailingData hash bs
aux (Left err) = Left $ VolDbParseFailure hash err

0 comments on commit 2fce7a2

Please sign in to comment.
You can’t perform that action at this time.