Skip to content
Permalink
Browse files

Implement ChainDB.streamBlocks

  • Loading branch information...
mrBliss committed May 14, 2019
1 parent e0faafb commit 93d381e7fb285a6071f143bd216ce1d5fee4f8b0
@@ -1,9 +1,9 @@
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}

module Ouroboros.Storage.ChainDB.API (
-- * Main ChainDB API
@@ -271,7 +271,7 @@ instance Eq (Iterator m blk) where
(==) = (==) `on` iteratorId

newtype IteratorId = IteratorId Int
deriving (Show, Eq, Ord)
deriving (Show, Eq, Ord, Enum)

data IteratorResult blk =
IteratorExhausted
@@ -448,6 +448,10 @@ data ChainDbError blk =
-- return it.
NoGenesisBlock

-- | When there is no chain/fork that satisfies the bounds passed to
-- 'streamBlocks'.
| InvalidIteratorRange (StreamFrom blk) (StreamTo blk)

deriving instance StandardHash blk => Show (ChainDbError blk)

instance (StandardHash blk, Typeable blk) => Exception (ChainDbError blk)
@@ -20,8 +20,9 @@ module Ouroboros.Storage.ChainDB.ImmDB (
, getBlock
, getBlob
-- * Streaming
, streamBlocks
, streamBlobs
, streamBlocksFrom
, streamBlocksFromUnchecked
, streamBlocksAfter
-- * Re-exports
, Iterator(..)
, IteratorResult(..)
@@ -36,6 +37,7 @@ import Control.Monad
import Data.Bifunctor (first)
import qualified Data.ByteString as Strict
import qualified Data.ByteString.Lazy as Lazy
import Data.Functor (($>))
import Data.Proxy
import Data.Word
import System.FilePath ((</>))
@@ -45,11 +47,12 @@ import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow

import Ouroboros.Network.Block (ChainHash (..), HasHeader (..),
Point (..), SlotNo)
Point (..), SlotNo, blockPoint)
import Ouroboros.Network.Chain (genesisSlotNo)

import qualified Ouroboros.Consensus.Util.CBOR as Util.CBOR


import Ouroboros.Storage.ChainDB.API (ChainDbError (..),
ChainDbFailure (..), StreamFrom (..))
import Ouroboros.Storage.Common
@@ -211,36 +214,96 @@ getBlob db epochOrSlot = withDB db $ \imm ->
Streaming
-------------------------------------------------------------------------------}

-- | Stream blocks
-- | Stream blocks from the given 'StreamFrom'.
--
-- Checks whether the block at the lower bound has the right hash. If not,
-- 'Nothing' is returned. This check comes at a cost, as to know the hash of a
-- block, it first has be read from disk.
--
-- When passed @'StreamFromInclusive' pt@ where @pt@ refers to Genesis, a
-- 'NoGenesisBlock' exception will be thrown.
streamBlocksFrom :: forall m blk.
( MonadCatch m
, HasHeader blk
)
=> ImmDB m blk
-> StreamFrom blk
-> m (Maybe (ImmDB.Iterator (HeaderHash blk) m blk))
streamBlocksFrom db from = withDB db $ \imm -> case from of
StreamFromGenesis ->
Just <$> stream imm Nothing Nothing
StreamFromExclusive pt@Point { pointHash = BlockHash hash } -> do
it <- stream imm (Just (pointSlot pt, hash)) Nothing
itRes <- iteratorNext it
if blockMatchesPoint pt itRes
then return $ Just it
else iteratorClose it $> Nothing
StreamFromExclusive Point { pointHash = GenesisHash } ->
Just <$> stream imm Nothing Nothing
StreamFromInclusive pt@Point { pointHash = BlockHash hash } -> do
it <- stream imm (Just (pointSlot pt, hash)) Nothing
itRes <- iteratorPeek it
if blockMatchesPoint pt itRes
then return $ Just it
else iteratorClose it $> Nothing
StreamFromInclusive Point { pointHash = GenesisHash } ->
throwM $ NoGenesisBlock @blk
where
stream imm start end = parseIterator db <$>
ImmDB.streamBinaryBlobs imm start end

-- | Check that the result of the iterator is a block that matches the
-- given point.
blockMatchesPoint :: Point blk
-> ImmDB.IteratorResult (HeaderHash blk) blk
-> Bool
blockMatchesPoint pt itRes = case itRes of
ImmDB.IteratorExhausted -> False
ImmDB.IteratorResult _ blk -> blockPoint blk == pt
ImmDB.IteratorEBB _ _ blk -> blockPoint blk == pt

-- | Same as 'streamBlocksFrom', but without checking the hash of the lower
-- bound.
--
-- There is still a cost when the lower bound is 'StreamFromExclusive': the
-- block will be read from disk, but it won't be parsed.
--
-- See also 'streamBlobs'.
streamBlocks :: forall m blk. (MonadCatch m, HasHeader blk)
=> ImmDB m blk
-> StreamFrom blk
-> m (Iterator (HeaderHash blk) m blk)
streamBlocks db low = wrap <$> streamBlobs db low
-- When passed @'StreamFromInclusive' pt@ where @pt@ refers to Genesis, a
-- 'NoGenesisBlock' exception will be thrown.
streamBlocksFromUnchecked :: forall m blk.
( MonadCatch m
, HasHeader blk
)
=> ImmDB m blk
-> StreamFrom blk
-> m (ImmDB.Iterator (HeaderHash blk) m blk)
streamBlocksFromUnchecked db from = withDB db $ \imm -> case from of
StreamFromGenesis ->
stream imm Nothing Nothing
StreamFromExclusive pt@Point { pointHash = BlockHash hash } -> do
it <- ImmDB.streamBinaryBlobs imm (Just (pointSlot pt, hash)) Nothing
void $ iteratorNext it
return $ parseIterator db it
StreamFromExclusive Point { pointHash = GenesisHash } ->
stream imm Nothing Nothing
StreamFromInclusive pt@Point { pointHash = BlockHash hash } ->
stream imm (Just (pointSlot pt, hash)) Nothing
StreamFromInclusive Point { pointHash = GenesisHash } ->
throwM $ NoGenesisBlock @blk
where
wrap :: Iterator (HeaderHash blk) m Strict.ByteString
-> Iterator (HeaderHash blk) m blk
wrap itr = Iterator {
iteratorNext = parseIteratorResult db =<< iteratorNext itr
, iteratorClose = iteratorClose itr
, iteratorID = ImmDB.DerivedIteratorID $ iteratorID itr
}

-- | Stream blobs
streamBlobs :: forall m blk. (MonadCatch m, HasHeader blk)
=> ImmDB m blk
-> StreamFrom blk
-> m (Iterator (HeaderHash blk) m Strict.ByteString)
streamBlobs db low = case low of
StreamFromExclusive pt -> streamBlobsAfter db pt
StreamFromGenesis -> withDB db $ \imm ->
ImmDB.streamBinaryBlobs imm Nothing Nothing
StreamFromInclusive pt -> case pointHash pt of
GenesisHash -> throwM $ NoGenesisBlock @blk
BlockHash h -> withDB db $ \imm ->
ImmDB.streamBinaryBlobs imm (Just (pointSlot pt, h)) Nothing
stream imm start end = parseIterator db <$>
ImmDB.streamBinaryBlobs imm start end

-- | Stream blocks after the given point
--
-- See also 'streamBlobsAfter'.
--
-- PRECONDITION: the exclusive lower bound is part of the ImmutableDB.
streamBlocksAfter :: forall m blk. (MonadCatch m, HasHeader blk)
=> ImmDB m blk
-> Point blk -- ^ Exclusive lower bound
-> m (Iterator (HeaderHash blk) m blk)
streamBlocksAfter db low = parseIterator db <$> streamBlobsAfter db low

-- | Stream blobs after the given point
--
@@ -292,6 +355,18 @@ streamBlobsAfter db low = withDB db $ \imm -> do
IteratorExhausted ->
throwM $ ImmDbUnexpectedIteratorExhausted low

-- | Parse the bytestrings returned by an iterator as blocks.
parseIterator :: (MonadThrow m, HasHeader blk)
=> ImmDB m blk
-> Iterator (HeaderHash blk) m Strict.ByteString
-> Iterator (HeaderHash blk) m blk
parseIterator db itr = Iterator {
iteratorNext = parseIteratorResult db =<< iteratorNext itr
, iteratorPeek = parseIteratorResult db =<< iteratorPeek itr
, iteratorClose = iteratorClose itr
, iteratorID = ImmDB.DerivedIteratorID $ iteratorID itr
}

parseIteratorResult :: (MonadThrow m, HasHeader blk)
=> ImmDB m blk
-> IteratorResult (HeaderHash blk) Strict.ByteString
Oops, something went wrong.

0 comments on commit 93d381e

Please sign in to comment.
You can’t perform that action at this time.