diff --git a/cardano-byron-proxy.cabal b/cardano-byron-proxy.cabal index 96312078..d21bfae0 100644 --- a/cardano-byron-proxy.cabal +++ b/cardano-byron-proxy.cabal @@ -36,6 +36,7 @@ library cardano-sl-crypto, cardano-sl-db, cardano-sl-infra, + cardano-sl-util, cborg, conduit, containers, @@ -57,6 +58,7 @@ library tagged, text, time, + time-units, transformers, typed-protocols, unliftio-core, diff --git a/nix/.stack.nix/cardano-byron-proxy.nix b/nix/.stack.nix/cardano-byron-proxy.nix index 07bb5c9d..2e16ce28 100644 --- a/nix/.stack.nix/cardano-byron-proxy.nix +++ b/nix/.stack.nix/cardano-byron-proxy.nix @@ -30,6 +30,7 @@ (hsPkgs.cardano-sl-crypto) (hsPkgs.cardano-sl-db) (hsPkgs.cardano-sl-infra) + (hsPkgs.cardano-sl-util) (hsPkgs.cborg) (hsPkgs.conduit) (hsPkgs.containers) @@ -51,6 +52,7 @@ (hsPkgs.tagged) (hsPkgs.text) (hsPkgs.time) + (hsPkgs.time-units) (hsPkgs.transformers) (hsPkgs.typed-protocols) (hsPkgs.unliftio-core) diff --git a/src/exec/Byron.hs b/src/exec/Byron.hs index a5a3c879..0b570264 100644 --- a/src/exec/Byron.hs +++ b/src/exec/Byron.hs @@ -13,13 +13,16 @@ module Byron import Control.Concurrent.STM (STM, atomically, check, readTVar, registerDelay, retry) import Control.Exception (IOException, catch, throwIO) -import Control.Monad (forM_, when) +import Control.Monad (when) import Control.Tracer (Tracer, traceWith) import qualified Data.ByteString.Lazy as Lazy (fromStrict) +import Data.Foldable (foldlM) import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as NE +import Data.Maybe (mapMaybe) import qualified Data.Text.Lazy.Builder as Text import Data.Typeable (Typeable) +import Data.Word (Word64) import System.Random (StdGen, getStdGen, randomR) import qualified Cardano.Binary as Binary @@ -28,126 +31,159 @@ import qualified Cardano.Chain.Slotting as Cardano import qualified Pos.Binary.Class as CSL (decodeFull, serialize) import qualified Pos.Chain.Block as CSL (Block, BlockHeader (..), GenesisBlock, - MainBlockHeader, headerHash) + MainBlockHeader, HeaderHash, headerHash) import qualified Pos.Infra.Diffusion.Types as CSL import Ouroboros.Byron.Proxy.Block (Block, ByronBlockOrEBB (..), coerceHashToLegacy, unByronHeaderOrEBB, headerHash) import Ouroboros.Byron.Proxy.Main +import Ouroboros.Consensus.Block (Header) import Ouroboros.Consensus.Ledger.Byron (ByronGiven) +import Ouroboros.Consensus.Protocol.Abstract (SecurityParam (maxRollbacks)) +import Ouroboros.Network.Block (ChainHash (..), Point, pointHash) import qualified Ouroboros.Network.AnchoredFragment as AF import qualified Ouroboros.Network.ChainFragment as CF import Ouroboros.Storage.ChainDB.API (ChainDB) import qualified Ouroboros.Storage.ChainDB.API as ChainDB -- | Download the best available chain from Byron peers and write to the --- database, over and over again. +-- database, over and over again. It will download the best chain from its +-- Byron peers regardless of whether it has a better one in the database. -- --- No exception handling is done. +-- The ByronGiven and Typeable constraints are needed in order to use +-- AF.selectPoints, that's all. download - :: Tracer IO Text.Builder + :: forall cfg void . + ( ByronGiven, Typeable cfg ) + => Tracer IO Text.Builder -> CSL.GenesisBlock -- ^ For use as checkpoint when DB is empty. Also will -- be put into an empty DB. -- Sadly, old Byron net API doesn't give any meaning to an -- empty checkpoint set; it'll just fall over. -> Cardano.EpochSlots + -> SecurityParam -> ChainDB IO (Block cfg) -> ByronProxy - -> (CSL.Block -> Block cfg -> IO ()) - -> IO x -download tracer genesisBlock epochSlots db bp k = getStdGen >>= mainLoop Nothing + -> IO void +download tracer genesisBlock epochSlots securityParam db bp = do + gen <- getStdGen + mTip <- ChainDB.getTipHeader db + tipHash <- case mTip of + Nothing -> do + traceWith tracer "Seeding database with genesis" + genesisBlock' :: Block cfg <- recodeBlockOrFail epochSlots throwIO (Left genesisBlock) + ChainDB.addBlock db genesisBlock' + pure $ CSL.headerHash genesisBlock + Just header -> pure $ coerceHashToLegacy (headerHash header) + mainLoop gen tipHash where + -- The BestTip always gives the longest chain seen so far by Byron. All we + -- need to do here is wait until it actually changes, then try to download. + -- For checkpoints, we just need to choose some good ones up to k blocks + -- back, and everything should work out fine. NB: the checkpoints will only + -- be on the main chain. + -- getCurrentChain will give exactly what we need. waitForNext - :: Maybe (BestTip CSL.BlockHeader) - -> STM (Either (BestTip CSL.BlockHeader) Atom) - waitForNext mBt = do - mBt' <- bestTip bp - if mBt == mBt' - -- If recvAtom retries then the whole STM will retry and we'll check again - -- for the best tip to have changed. - then fmap Right (recvAtom bp) - else case mBt' of - Nothing -> retry - Just bt -> pure (Left bt) - - mainLoop :: Maybe (BestTip CSL.BlockHeader) -> StdGen -> IO x - mainLoop mBt rndGen = do + :: CSL.HeaderHash + -> STM (BestTip CSL.BlockHeader) + waitForNext lastDownloadedHash = do + mBt <- bestTip bp + case mBt of + -- Haven't seen any tips from Byron peers. + Nothing -> retry + Just bt -> + if thisHash == lastDownloadedHash + then retry + else pure bt + where + thisHash = CSL.headerHash (btTip bt) + + mainLoop :: StdGen -> CSL.HeaderHash -> IO void + mainLoop rndGen tipHash = do -- Wait until the best tip has changed from the last one we saw. That can -- mean the header changed and/or the list of peers who announced it -- changed. - next <- atomically $ waitForNext mBt - case next of - -- TODO we don't get to know from where it was received. Problem? Maybe - -- not. - Right atom -> do - traceWith tracer $ mconcat - [ "Got atom: " - , Text.fromString (show atom) - ] - mainLoop mBt rndGen - Left bt -> do - mTip <- ChainDB.getTipHeader db - tipHash <- case mTip of - -- If the DB is empty, we use the genesis hash as our tip, but also - -- we need to put the genesis block into the database, because the - -- Byron peer _will not serve it to us_! - Nothing -> do - traceWith tracer "Seeding database with genesis" - genesisBlock' :: Block cfg <- recodeBlockOrFail epochSlots throwIO (Left genesisBlock) - ChainDB.addBlock db genesisBlock' - pure $ CSL.headerHash genesisBlock - Just header -> pure $ coerceHashToLegacy (headerHash header) - -- Pick a peer from the list of announcers at random and download - -- the chain. - let (peer, rndGen') = pickRandom rndGen (btPeers bt) - remoteTipHash = CSL.headerHash (btTip bt) - traceWith tracer $ mconcat - [ "Attempting to download chain with hash " - , Text.fromString (show remoteTipHash) - , " from " - , Text.fromString (show peer) - ] - -- Try to download the chain, but do not die in case of IOExceptions. - _ <- downloadChain - bp - peer - remoteTipHash - [tipHash] - streamer - `catch` - exceptionHandler - mainLoop (Just bt) rndGen' - - -- If it ends at an EBB, the EBB will _not_ be written. The tip will be the - -- parent of the EBB. - -- This should be OK. - streamer :: CSL.StreamBlocks CSL.Block IO () - streamer = CSL.StreamBlocks + bt <- atomically $ waitForNext tipHash + -- Pick a peer from the list of announcers at random and download + -- the chain. + let (peer, rndGen') = pickRandom rndGen (btPeers bt) + chain <- atomically $ ChainDB.getCurrentChain db + traceWith tracer $ mconcat + [ "Attempting to download chain with hash " + , Text.fromString (show tipHash) + , " from " + , Text.fromString (show peer) + ] + -- Try to download the chain, but do not die in case of IOExceptions. + -- The hash of the last downloaded block is returned, so that on the next + -- recursive call, that chain won't be downloaded again. If there's an + -- exception, or if batch downloaded was used, this hash may not be the + -- hash of the tip of the chain that was to be downloaded. + tipHash' <- downloadChain + bp + peer + (CSL.headerHash (btTip bt)) + (checkpoints chain) + (streamer tipHash) + `catch` + exceptionHandler tipHash + mainLoop rndGen' tipHash' + + checkpoints + :: AF.AnchoredFragment (Header (Block cfg)) + -> [CSL.HeaderHash] + checkpoints = mapMaybe pointToHash . AF.selectPoints (fmap fromIntegral offsets) + + pointToHash :: Point (Header (Block cfg)) -> Maybe CSL.HeaderHash + pointToHash pnt = case pointHash pnt of + GenesisHash -> Nothing + BlockHash hash -> Just $ coerceHashToLegacy hash + + -- Offsets for selectPoints. Defined in the same way as for the Shelley + -- chain sync client: fibonacci numbers including 0 and k. + offsets :: [Word64] + offsets = 0 : foldr includeK ([] {- this is never forced -}) (tail fibs) + + includeK :: Word64 -> [Word64] -> [Word64] + includeK w ws | w >= k = [k] + | otherwise = w : ws + + fibs :: [Word64] + fibs = 1 : 1 : zipWith (+) fibs (tail fibs) + + streamer :: CSL.HeaderHash -> CSL.StreamBlocks CSL.Block IO CSL.HeaderHash + streamer tipHash = CSL.StreamBlocks { CSL.streamBlocksMore = \blocks -> do -- List comes in newest-to-oldest order. let orderedBlocks = NE.toList (NE.reverse blocks) -- The blocks are legacy CSL blocks. To put them into the DB, we must -- convert them to new cardano-ledger blocks. That's done by -- encoding and decoding. - forM_ orderedBlocks $ \blk -> do - blk' <- recodeBlockOrFail epochSlots throwIO blk - ChainDB.addBlock db blk' - k blk blk' - pure streamer - , CSL.streamBlocksDone = pure () + tipHash' <- foldlM commitBlock tipHash orderedBlocks + pure (streamer tipHash') + , CSL.streamBlocksDone = pure tipHash } + commitBlock :: CSL.HeaderHash -> CSL.Block -> IO CSL.HeaderHash + commitBlock _ blk = do + blk' <- recodeBlockOrFail epochSlots throwIO blk + ChainDB.addBlock db blk' + pure $ CSL.headerHash blk + -- No need to trace it; cardano-sl libraries will do that. - exceptionHandler :: IOException -> IO (Maybe ()) - exceptionHandler _ = pure Nothing + exceptionHandler :: CSL.HeaderHash -> IOException -> IO CSL.HeaderHash + exceptionHandler h _ = pure h pickRandom :: StdGen -> NonEmpty t -> (t, StdGen) pickRandom rndGen ne = let (idx, rndGen') = randomR (0, NE.length ne - 1) rndGen in (ne NE.!! idx, rndGen') + k :: Word64 + k = maxRollbacks securityParam + recodeBlockOrFail :: Cardano.EpochSlots -> (forall x . Binary.DecoderError -> IO x) diff --git a/src/exec/DB.hs b/src/exec/DB.hs index 4e28711f..c048e676 100644 --- a/src/exec/DB.hs +++ b/src/exec/DB.hs @@ -27,7 +27,8 @@ import qualified Ouroboros.Consensus.Ledger.Byron as Byron import Ouroboros.Consensus.Ledger.Byron.Config (ByronConfig) import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState) import Ouroboros.Consensus.Protocol (NodeConfig) -import Ouroboros.Consensus.Protocol.Abstract (SecurityParam (..)) +import Ouroboros.Consensus.Protocol.Abstract (SecurityParam (..), + protocolSecurityParam) import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry) import qualified Ouroboros.Consensus.Util.ResourceRegistry as ResourceRegistry import Ouroboros.Storage.ChainDB.API (ChainDB) @@ -60,12 +61,11 @@ withDB -> Tracer IO (ChainDB.TraceEvent (Block ByronConfig)) -> Tracer IO Sqlite.TraceEvent -> ResourceRegistry IO - -> SecurityParam -> NodeConfig (BlockProtocol (Block ByronConfig)) -> ExtLedgerState (Block ByronConfig) -> (Index IO (Header (Block ByronConfig)) -> ChainDB IO (Block ByronConfig) -> IO t) -> IO t -withDB dbOptions dbTracer indexTracer rr securityParam nodeConfig extLedgerState k = do +withDB dbOptions dbTracer indexTracer rr nodeConfig extLedgerState k = do -- The ChainDB/Storage layer will not create a directory for us, we have -- to ensure it exists. System.Directory.createDirectoryIfMissing True (dbFilePath dbOptions) @@ -116,7 +116,7 @@ withDB dbOptions dbTracer indexTracer rr securityParam nodeConfig extLedgerState , cdbValidation = ValidateMostRecentEpoch , cdbBlocksPerFile = 21600 -- ? - , cdbMemPolicy = defaultMemPolicy securityParam + , cdbMemPolicy = defaultMemPolicy (protocolSecurityParam nodeConfig) , cdbDiskPolicy = ledgerDiskPolicy , cdbNodeConfig = nodeConfig diff --git a/src/exec/Main.hs b/src/exec/Main.hs index b82a6d47..1f3852da 100644 --- a/src/exec/Main.hs +++ b/src/exec/Main.hs @@ -19,7 +19,7 @@ import Control.Monad.Trans.Except (runExceptT) import Control.Tracer (Tracer (..), contramap, nullTracer, traceWith) import Data.Functor.Contravariant (Op (..)) import Data.List (intercalate) -import qualified Data.Reflection as Reflection (give) +import qualified Data.Reflection as Reflection (give, given) import Data.String (fromString) import Data.Text (Text) import qualified Data.Text as Text @@ -39,17 +39,15 @@ import qualified Cardano.Chain.Update as Cardano import qualified Cardano.Crypto as Cardano import qualified Pos.Chain.Block as CSL (genesisBlock0) -import qualified Pos.Chain.Block as CSL (recoveryHeadersMessage, streamWindow, - withBlockConfiguration) +import qualified Pos.Chain.Block as CSL (BlockConfiguration, withBlockConfiguration) import qualified Pos.Chain.Delegation as CSL import qualified Pos.Chain.Lrc as CSL (genesisLeaders) import qualified Pos.Chain.Genesis as CSL.Genesis (Config) import qualified Pos.Chain.Genesis as CSL import qualified Pos.Chain.Update as CSL import qualified Pos.Chain.Ssc as CSL (withSscConfiguration) -import qualified Pos.Configuration as CSL (networkConnectionTimeout, withNodeConfiguration) +import qualified Pos.Configuration as CSL (NodeConfiguration, withNodeConfiguration) import qualified Pos.Crypto as CSL -import qualified Pos.Diffusion.Full as CSL (FullDiffusionConfiguration (..)) import qualified Pos.Infra.Network.CLI as CSL (NetworkConfigOpts (..), externalNetworkAddressOption, @@ -59,8 +57,7 @@ import qualified Pos.Infra.Network.CLI as CSL (NetworkConfigOpts (..), import Pos.Infra.Network.Types (NetworkConfig (..)) import qualified Pos.Infra.Network.Policy as Policy import qualified Pos.Launcher.Configuration as CSL (Configuration (..), - ConfigurationOptions (..), - HasConfigurations) + ConfigurationOptions (..)) import qualified Pos.Client.CLI.Options as CSL (configurationOptionsParser) import qualified Pos.Util.Config as CSL (parseYamlConfig) @@ -78,7 +75,7 @@ import Ouroboros.Consensus.Block (GetHeader (Header)) import Ouroboros.Consensus.BlockchainTime (SlotLength (..), SystemStart (..), realBlockchainTime) import Ouroboros.Consensus.Ledger.Byron (ByronGiven) import Ouroboros.Consensus.Ledger.Byron.Config (ByronConfig) -import Ouroboros.Consensus.Protocol.Abstract (SecurityParam (..)) +import Ouroboros.Consensus.Protocol.Abstract (SecurityParam (..), protocolSecurityParam) import Ouroboros.Consensus.Mempool.API (Mempool) import Ouroboros.Consensus.Mempool.TxSeq (TicketNo) import Ouroboros.Consensus.Node (getMempool) @@ -364,48 +361,39 @@ runShelleyClient producerAddrs _ ctable iversions = do ) runByron - :: ( CSL.HasConfigurations, ByronGiven ) + :: ( ByronGiven ) => Tracer IO (Monitoring.LoggerName, Monitoring.Severity, Text.Builder) -> ByronOptions -> CSL.Genesis.Config - -> Cardano.ProtocolMagicId + -> CSL.BlockConfiguration + -> CSL.UpdateConfiguration + -> CSL.NodeConfiguration -> Cardano.EpochSlots + -> SecurityParam -> Index IO (Header (Block ByronConfig)) -> ChainDB IO (Block ByronConfig) -> Mempool IO (Block ByronConfig) TicketNo -> IO () -runByron tracer byronOptions genesisConfig _ epochSlots idx db mempool = do +runByron tracer byronOptions genesisConfig blockConfig updateConfig nodeConfig epochSlots k idx db mempool = do let cslTrace = mkCSLTrace tracer + trace = Trace.appendName "diffusion" cslTrace -- Get the `NetworkConfig` from the options networkConfig <- CSL.intNetworkConfigOpts (Trace.named cslTrace) (boNetworkOptions byronOptions) - let bpc :: ByronProxyConfig - bpc = ByronProxyConfig - { bpcAdoptedBVData = CSL.configBlockVersionData genesisConfig - -- ^ Hopefully that never needs to change. - , bpcEpochSlots = epochSlots - , bpcNetworkConfig = networkConfig - { ncEnqueuePolicy = Policy.defaultEnqueuePolicyRelay - , ncDequeuePolicy = Policy.defaultDequeuePolicyRelay - } - -- ^ These default relay policies should do what we want. - -- If not, could give a --policy option and use yaml files as in - -- cardano-sl - , bpcDiffusionConfig = CSL.FullDiffusionConfiguration - { CSL.fdcProtocolMagic = CSL.configProtocolMagic genesisConfig - , CSL.fdcProtocolConstants = CSL.configProtocolConstants genesisConfig - , CSL.fdcRecoveryHeadersMessage = CSL.recoveryHeadersMessage - , CSL.fdcLastKnownBlockVersion = CSL.lastKnownBlockVersion CSL.updateConfiguration - , CSL.fdcConvEstablishTimeout = CSL.networkConnectionTimeout - -- Diffusion layer logs will have "diffusion" in their names. - , CSL.fdcTrace = Trace.appendName "diffusion" cslTrace - , CSL.fdcStreamWindow = CSL.streamWindow - , CSL.fdcBatchSize = 64 - } - , bpcSendQueueSize = 1 - , bpcRecvQueueSize = 1 + let networkConfig' = networkConfig + { ncEnqueuePolicy = Policy.defaultEnqueuePolicyRelay + , ncDequeuePolicy = Policy.defaultDequeuePolicyRelay } + bpc :: ByronProxyConfig + bpc = configFromCSLConfigs + genesisConfig + blockConfig + updateConfig + nodeConfig + networkConfig' + 64 -- Batch size. + trace genesisBlock = CSL.genesisBlock0 (CSL.configProtocolMagic genesisConfig) (CSL.configGenesisHash genesisConfig) (CSL.genesisLeaders genesisConfig) @@ -421,10 +409,8 @@ runByron tracer byronOptions genesisConfig _ epochSlots idx db mempool = do where byronClient genesisBlock bp = void $ concurrently - (Byron.download textTracer genesisBlock epochSlots db bp k) - (Byron.announce Nothing db bp) - where - k _ _ = pure () + (Byron.download textTracer genesisBlock epochSlots k db bp) + (Byron.announce Nothing db bp) textTracer :: Tracer IO Text.Builder textTracer = contramap @@ -527,9 +513,7 @@ main = do -- Thread registry is needed by ChainDB and by the network protocols. -- I assume it's supposed to be shared? ResourceRegistry.withRegistry $ \rr -> do - let -- TODO Grab this from the newGenesisConfig config - securityParam = SecurityParam 2160 - protocolVersion = Cardano.ProtocolVersion 1 0 0 + let protocolVersion = Cardano.ProtocolVersion 1 0 0 softwareVersion = Cardano.SoftwareVersion (Cardano.ApplicationName (fromString "cardano-byron-proxy")) 2 protocolInfo = protocolInfoByron @@ -547,7 +531,7 @@ main = do slotDuration = SlotLength (fromRational (toRational slotMs / 1000)) systemStart = SystemStart (Cardano.gdStartTime (Cardano.configGenesisData newGenesisConfig)) btime <- realBlockchainTime rr slotDuration systemStart - withDB dbc dbTracer indexTracer rr securityParam nodeConfig extLedgerState $ \idx cdb -> do + withDB dbc dbTracer indexTracer rr nodeConfig extLedgerState $ \idx cdb -> do traceWith (Logging.convertTrace' trace) ("", Monitoring.Info, fromString "Database opened") Shelley.withShelley rr cdb nodeConfig nodeState btime $ \kernel ctable iversions rversions -> do let server = runShelleyServer (soLocalAddress (bpoShelleyOptions bpo)) rr ctable rversions @@ -558,8 +542,11 @@ main = do (Logging.convertTrace' trace) bopts oldGenesisConfig - protocolMagic + (Reflection.given :: CSL.BlockConfiguration) + (Reflection.given :: CSL.UpdateConfiguration) + (Reflection.given :: CSL.NodeConfiguration) epochSlots + (protocolSecurityParam nodeConfig) idx cdb (getMempool kernel) diff --git a/src/lib/Ouroboros/Byron/Proxy/Block.hs b/src/lib/Ouroboros/Byron/Proxy/Block.hs index 6aaf8d9c..329e4e88 100644 --- a/src/lib/Ouroboros/Byron/Proxy/Block.hs +++ b/src/lib/Ouroboros/Byron/Proxy/Block.hs @@ -6,6 +6,11 @@ {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE GADTSyntax #-} +{-| +Module : Ouroboros.Byron.Proxy.Block +Description : Some block-related definitions. +-} + module Ouroboros.Byron.Proxy.Block ( Block , ByronBlockOrEBB (..) @@ -34,15 +39,22 @@ import Ouroboros.Consensus.Ledger.Byron (ByronBlockOrEBB (..), -- Anyone who imports this module will almost certainly want that instance. import Ouroboros.Consensus.Block () +-- | The block type, mutually understandable between Byron and Shelley programs. +-- Suitable for storage in ChainDB and transport over the block fetch protocol. type Block cfg = ByronBlockOrEBB cfg +-- | Part of the Byron Logic layer interface requires making a serialized block, +-- which is just the block's encoding. toSerializedBlock :: Block cfg -> SerializedBlock toSerializedBlock = Serialized . CBOR.toStrictByteString . encodeByronBlock --- TODO: Move these functions to a compatibility module +-- | Convert from a new header hash to a legacy header hash. They are +-- structurally the same, nominally different. coerceHashToLegacy :: Cardano.HeaderHash -> CSL.HeaderHash coerceHashToLegacy (AbstractHash digest) = Legacy.AbstractHash digest +-- | Convert from a legacy header hash to a new header hash. They are +-- structurally the same, nominally different. coerceHashFromLegacy :: CSL.HeaderHash -> Cardano.HeaderHash coerceHashFromLegacy (Legacy.AbstractHash digest) = AbstractHash digest @@ -52,6 +64,8 @@ headerHash hdr = case unByronHeaderOrEBB hdr of Left ebb -> Cardano.boundaryHeaderHashAnnotated ebb Right mh -> Cardano.headerHashAnnotated mh +-- | Gives `Just` with the block's header's hash, whenever it's an epoch +-- boundary block. isEBB :: Block cfg -> Maybe Cardano.HeaderHash isEBB blk = case unByronBlockOrEBB blk of Cardano.ABOBBlock _ -> Nothing diff --git a/src/lib/Ouroboros/Byron/Proxy/Genesis/Convert.hs b/src/lib/Ouroboros/Byron/Proxy/Genesis/Convert.hs index 6dc71709..bdc6d862 100644 --- a/src/lib/Ouroboros/Byron/Proxy/Genesis/Convert.hs +++ b/src/lib/Ouroboros/Byron/Proxy/Genesis/Convert.hs @@ -40,6 +40,9 @@ import qualified Pos.Crypto as CSL convertHash :: CSL.AbstractHash algo a -> Cardano.AbstractHash algo b convertHash (CSL.AbstractHash it) = Cardano.AbstractHash it +convertEpochSlots :: CSL.SlotCount -> Cardano.EpochSlots +convertEpochSlots = Cardano.EpochSlots . fromIntegral + convertRequiresNetworkMagic :: CSL.RequiresNetworkMagic -> Cardano.RequiresNetworkMagic convertRequiresNetworkMagic rnm = case rnm of CSL.RequiresNoMagic -> Cardano.RequiresNoMagic diff --git a/src/lib/Ouroboros/Byron/Proxy/Main.hs b/src/lib/Ouroboros/Byron/Proxy/Main.hs index 81443a89..50de4f14 100644 --- a/src/lib/Ouroboros/Byron/Proxy/Main.hs +++ b/src/lib/Ouroboros/Byron/Proxy/Main.hs @@ -9,7 +9,23 @@ {-# OPTIONS_GHC "-fwarn-incomplete-patterns" #-} -module Ouroboros.Byron.Proxy.Main where +{-| +Module : Ouroboros.Byron.Proxy.Main +Description : Definition of the Byron proxy interface and implementation. + +The 'ByronProxy' type gives the interface to a Byron network. +'withByronProxy' will create one, allowing the caller to see the tips-of-chains +of its Byron peers, to download one of these chains, and to announce its own +tip-of-chain header to its peers. +-} + +module Ouroboros.Byron.Proxy.Main + ( ByronProxyConfig (..) + , configFromCSLConfigs + , ByronProxy (..) + , withByronProxy + , BestTip (..) + ) where import Codec.CBOR.Decoding (Decoder) import qualified Codec.CBOR.Write as CBOR (toLazyByteString) @@ -17,17 +33,12 @@ import Control.Arrow (first) import Control.Concurrent.Async (concurrently, race) import Control.Concurrent.STM (STM, atomically, check) -import Control.Concurrent.STM.TBQueue (TBQueue, - newTBQueueIO, - readTBQueue, - writeTBQueue) import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO, readTVar) import Control.Exception (Exception, bracket, throwIO) import Control.Lens ((^.)) -import Control.Monad (forM, join, void, - when) +import Control.Monad (forM, join, when) import Control.Monad.Trans.Class (lift) import Control.Tracer (Tracer, traceWith) import Data.Conduit (ConduitT, await, @@ -47,7 +58,8 @@ import Data.Tagged (Tagged (..), tagWith, untag) import Data.Text (Text) import qualified Data.Text.Lazy.Builder as Text (Builder) -import Numeric.Natural (Natural) +import Data.Word (Word32) +import Data.Time.Units (fromMicroseconds) import qualified Cardano.Binary as Binary import Cardano.BM.Data.Severity (Severity (..)) @@ -64,7 +76,8 @@ import qualified Pos.Chain.Block as Byron.Legacy (Bloc MainBlockHeader, getBlockHeader, headerHash) -import Pos.Chain.Delegation (ProxySKHeavy) +import qualified Pos.Chain.Block as CSL (BlockConfiguration (..)) +import qualified Pos.Chain.Genesis as CSL.Genesis import Pos.Chain.Ssc (MCCommitment (..), MCOpening (..), MCShares (..), @@ -75,7 +88,10 @@ import Pos.Chain.Txp (TxAux (..), TxId, import Pos.Chain.Update (BlockVersionData, UpdateProposal (..), UpdateVote (..)) +import qualified Pos.Chain.Update as CSL (UpdateConfiguration, + lastKnownBlockVersion) import Pos.Communication (NodeId) +import qualified Pos.Configuration as CSL (NodeConfiguration (..)) import Pos.Core (HasDifficulty (difficultyL), addressHash, getEpochOrSlot) @@ -94,6 +110,9 @@ import Pos.Infra.DHT.Real.Param (KademliaParams) import Pos.Infra.Network.Types (NetworkConfig (..)) import Pos.Logic.Types hiding (streamBlocks) import qualified Pos.Logic.Types as Logic +import Pos.Util.Trace (Trace) +import Pos.Util.Trace.Named (LogNamed) +import qualified Pos.Util.Wlog as Wlog (Severity) import Ouroboros.Byron.Proxy.Block (Block, Header, coerceHashFromLegacy, @@ -102,6 +121,7 @@ import Ouroboros.Byron.Proxy.Block (Block, Header, toSerializedBlock) import Ouroboros.Byron.Proxy.Index.Types (Index) import qualified Ouroboros.Byron.Proxy.Index.Types as Index +import Ouroboros.Byron.Proxy.Genesis.Convert (convertEpochSlots) import Ouroboros.Consensus.Block (getHeader) import Ouroboros.Consensus.Ledger.Byron (byronTx, byronTxId, encodeByronBlock, @@ -122,23 +142,60 @@ import qualified Ouroboros.Network.TxSubmission.Outbound as Tx.Out import Ouroboros.Storage.ChainDB.API (ChainDB) import qualified Ouroboros.Storage.ChainDB.API as ChainDB --- | Definitions required in order to run the Byron proxy. +-- | Configuration used to get a 'ByronProxy'. +-- 'configFromGenesis' will make one from a genesis configuration. data ByronProxyConfig = ByronProxyConfig - { -- TODO see if we can't derive the block version data from the database. - bpcAdoptedBVData :: !BlockVersionData + { -- | The Byron Logic layer needs the BlockVersionData. Technically this + -- is not constant over a whole blockchain: it can be changed by an update. + -- However, in cardano-byron-proxy it is only used to determine message + -- size limits on Byron protocols, so as long as header, block, and tx + -- size limits do not change, it's ok. + -- + -- To get a BlockVersionData, + -- Pos.Chain.Genesis.Config.configBlockVersionData can be used on a Byron + -- genesis configuration. + bpcAdoptedBVData :: !BlockVersionData -- | Number fo slots per epoch. Assumed to never change. - , bpcEpochSlots :: !EpochSlots - , bpcNetworkConfig :: !(NetworkConfig KademliaParams) - , bpcDiffusionConfig :: !FullDiffusionConfiguration - -- | Size of the send queue. Sending atomic (non-block data) to Byron - -- will block if this queue is full. - , bpcSendQueueSize :: !Natural - -- | Size of the recv queue. - -- TODO should probably let it be unlimited, since there is no backpressure - -- in the Byron diffusion layer anyway, so failing to clear this queue - -- will still cause a memory leak. - , bpcRecvQueueSize :: !Natural + , bpcEpochSlots :: !EpochSlots + -- | Byron network configuration. To get one, consider using + -- Pos.Infra.Network.CLI.intNetworkConfigOpts + , bpcNetworkConfig :: !(NetworkConfig KademliaParams) + -- | Configuration for the Byron Diffusion layer (i.e. the network part). + , bpcDiffusionConfig :: !FullDiffusionConfiguration + } + +-- | Make a 'ByronProxyConfig' using cardano-sl configuration types. +-- This will ensure that the proxy is configured in such a way that it is +-- able to communicate with a Byron peer using the same config. +configFromCSLConfigs + :: CSL.Genesis.Config + -> CSL.BlockConfiguration + -> CSL.UpdateConfiguration + -> CSL.NodeConfiguration + -> NetworkConfig KademliaParams + -> Word32 -- ^ Batch size for block streaming. + -> Trace IO (LogNamed (Wlog.Severity, Text)) + -> ByronProxyConfig +configFromCSLConfigs genesisConfig blockConfig updateConfig nodeConfig networkConfig batchSize trace = ByronProxyConfig + { bpcAdoptedBVData = CSL.Genesis.configBlockVersionData genesisConfig + , bpcEpochSlots = epochSlots + , bpcNetworkConfig = networkConfig + , bpcDiffusionConfig = FullDiffusionConfiguration + { fdcProtocolMagic = CSL.Genesis.configProtocolMagic genesisConfig + , fdcProtocolConstants = CSL.Genesis.configProtocolConstants genesisConfig + -- fromIntergal :: Int -> Word + , fdcRecoveryHeadersMessage = fromIntegral $ CSL.ccRecoveryHeadersMessage blockConfig + , fdcLastKnownBlockVersion = CSL.lastKnownBlockVersion updateConfig + , fdcConvEstablishTimeout = timeout + -- fromIntegral :: Int -> Word32 + , fdcStreamWindow = fromIntegral $ CSL.ccStreamWindow blockConfig + , fdcBatchSize = batchSize + , fdcTrace = trace + } } + where + epochSlots = convertEpochSlots (CSL.Genesis.configEpochSlots genesisConfig) + timeout = fromMicroseconds (1000 * fromIntegral (CSL.ccNetworkConnectionTimeout nodeConfig)) -- | Interface presented by the Byron proxy. data ByronProxy = ByronProxy @@ -156,26 +213,18 @@ data ByronProxy = ByronProxy -- Those data can be taken from 'bestTip', but of course may no longer be -- correct at the time of the call. -- - -- TODO deal with the `Maybe t` in this type. Should it be there? It's - -- used to indicate whether streaming is available, for fallback to - -- batching. - -- -- FIXME should not use legacy header hash type here. , downloadChain :: forall t . NodeId -> Byron.Legacy.HeaderHash -- of tip to request -> [Byron.Legacy.HeaderHash] -- of checkpoints -> StreamBlocks Byron.Legacy.Block IO t - -> IO (Maybe t) + -> IO t -- | Make Byron peers aware of this chain. It's expected that they will - -- request it, which will be served by some database, so the blocks for - -- this chain should be in it. + -- request it if it's better than their own, and the download will be + -- served by a backing ChainDB, so the blocks for this chain should be in + -- it. , announceChain :: Byron.Legacy.MainBlockHeader -> IO () - -- | Take the next atom from the Byron network (non-block data). - , recvAtom :: STM Atom - -- | Send an atom to the Byron network. It's in STM because the send is - -- performed asynchronously. - , sendAtom :: Atom -> STM () } taggedKeyValNoOp @@ -346,44 +395,6 @@ fromByronTxAux txAux = case Binary.decodeFullAnnotatedBytes "TxAux" decoder cslB decoder :: Decoder s (Cardano.ATxAux Binary.ByteSpan) decoder = Binary.fromCBOR --- | Atoms are data which are not blocks. -data Atom where - Transaction :: TxMsgContents -> Atom - UpdateProposal :: (UpdateProposal, [UpdateVote]) -> Atom - UpdateVote :: UpdateVote -> Atom - Commitment :: MCCommitment -> Atom - Opening :: MCOpening -> Atom - Shares :: MCShares -> Atom - VssCertificate :: MCVssCertificate -> Atom - Delegation :: ProxySKHeavy -> Atom - -deriving instance Show Atom - --- To get atoms from Shelley to Byron we put them into the pool and then --- send them using the diffusion layer. --- --- To get them from Byron to Shelley we use the relay mechanism built in to --- the diffusion layer: it will put the thing into the relevant pool, then --- make and deposit an `Atom` into a queue. - -sendAtomToByron :: Diffusion IO -> Atom -> IO () -sendAtomToByron diff atom = case atom of - - Transaction tx -> void $ sendTx diff (getTxMsgContents tx) - - UpdateProposal (up, uvs) -> sendUpdateProposal diff (hash up) up uvs - UpdateVote uv -> sendVote diff uv - - Opening (MCOpening sid opening) -> sendSscOpening diff sid opening - Shares (MCShares sid shares) -> sendSscShares diff sid shares - VssCertificate (MCVssCertificate vc) -> sendSscCert diff (getCertId vc) vc - Commitment (MCCommitment commitment) -> sendSscCommitment diff sid commitment - where - (pk, _, _) = commitment - sid = addressHash pk - - Delegation psk -> sendPskHeavy diff psk - -- | Information about the best tip from the Byron network. data BestTip tip = BestTip { -- | This tip ... @@ -713,13 +724,6 @@ withByronProxy trace bpc idx db mempool k = do -- never go from `Just` to `Nothing`, it only starts as `Nothing`. tipsTVar :: TVar (Maybe (BestTip Byron.Legacy.BlockHeader)) <- newTVarIO Nothing - -- Send and receive bounded queues for atomic data (non-block). - -- The receive queue is populated by the relay system by way of the logic - -- layer. The send queue is emptied by a thread spawned here which uses - -- the diffusion layer to send (ultimately by way of the outbound queue). - atomRecvQueue :: TBQueue Atom <- newTBQueueIO (bpcRecvQueueSize bpc) - atomSendQueue :: TBQueue Atom <- newTBQueueIO (bpcSendQueueSize bpc) - -- Associates Byron transaction identifiers (their hashes) with Shelley -- mempool ticket numbers. Needed because the Byron relay system is -- random access by TxId. @@ -729,10 +733,20 @@ withByronProxy trace bpc idx db mempool k = do let byronProxy :: Diffusion IO -> ByronProxy byronProxy diff = ByronProxy { bestTip = takeBestTip - , downloadChain = streamBlocks diff + , downloadChain = \peer tipHash checkpointHashes sbK -> do + streamResult <- streamBlocks diff peer tipHash checkpointHashes sbK + case streamResult of + Just t -> pure t + -- Nothing means streaming is not supported. Fall back to + -- batching. This will do one batch then finish the + -- StreamBlocks callback. That may not give all of the + -- blocks requested. + Nothing -> do + batchResult <- getBlocks diff peer tipHash checkpointHashes + case getOldestFirst batchResult of + [] -> streamBlocksDone sbK + (b : bs) -> streamBlocksMore sbK (b NE.:| bs) >>= streamBlocksDone , announceChain = announceBlockHeader diff - , recvAtom = readTBQueue atomRecvQueue - , sendAtom = writeTBQueue atomSendQueue } epochSlots :: EpochSlots @@ -741,19 +755,10 @@ withByronProxy trace bpc idx db mempool k = do takeBestTip :: STM (Maybe (BestTip Byron.Legacy.BlockHeader)) takeBestTip = readTVar tipsTVar - -- FIXME this probably isn't necessary anymore. - -- Transactions are sent indirectly, by adding them to the mempool - sendingThread :: forall x . Diffusion IO -> IO x - sendingThread diff = do - atom <- atomically $ readTBQueue atomSendQueue - sendAtomToByron diff atom - sendingThread diff - background :: forall x . Diffusion IO -> IO x background diff = fmap (\(x, _) -> x) $ - concurrently (sendingThread diff) $ - concurrently (sendTxsFromMempool mempool diff) - (updateMempoolIdMap mempool txTicketMapVar) + concurrently (sendTxsFromMempool mempool diff) + (updateMempoolIdMap mempool txTicketMapVar) blockDecodeError :: forall x . Text -> IO x blockDecodeError text = throwIO $ MalformedBlock text