Skip to content
Permalink
Browse files

TxSubmission: switch client and server

  • Loading branch information...
coot committed May 14, 2019
1 parent e05bf98 commit e392b7d3423a3d7ce5c1b449927d8a636ae2fc81
@@ -1,138 +1,61 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.Protocol.TxSubmission.Client
( TxSubmissionClientPipelined (..)
, TxSubmissionSender (..)
, Collection
, txSubmissionClientPeerPipelined
, txSubmissionSender
)
where
module Ouroboros.Network.Protocol.TxSubmission.Client where

import Data.Word (Word16)

import Network.TypedProtocol.Core
import Network.TypedProtocol.Pipelined

import Ouroboros.Network.Protocol.TxSubmission.Type

--
-- Pipelined client
--

-- | This type represents what @'PeerReceiver'@ will collect, since
-- tx-submission client can pipeline both @'SendMsgGetHash'@ and @'MsgGetTx'@
-- we need a sum of both @[hash]@ and @tx@ types.
-- |
-- Client side of the tx-submission protocol.
--
type Collection hash tx = Either [hash] tx
newtype TxSubmissionClient hash tx m a = TxSubmissionClient {
runTxSubmissionClient :: m (TxSubmissionHandlers hash tx m a)
}

data TxSubmissionClientPipelined hash tx m a where
TxSubmissionClientPipelined
:: TxSubmissionSender hash tx Z (Collection hash tx) m a
-> TxSubmissionClientPipelined hash tx m a
instance Functor m => Functor (TxSubmissionClient hash tx m) where
fmap f (TxSubmissionClient msender) = TxSubmissionClient ((fmap . fmap) f msender)

-- |
-- @'TxSubmissionSender'@ can be transformed into a pipelined @'PeerSender'@.
-- We are only pipelining trasactions requests, e.g. @'MsgTx'@.
--
-- @c@ variable will be instantiated with @'Collect' hash tx@.
data TxSubmissionSender hash tx (n :: N) c m a where

-- |
-- Ask the server for a list of transaction hashes by sending @'MsgGetHashes'@.
--
SendMsgGetHashes
:: Word16
-> ([hash] -> m (TxSubmissionSender hash tx n c m a))
-> TxSubmissionSender hash tx n c m a

-- |
-- Piplined version of @'SendMsgGetHashes'@.
--
SendMsgGetHashesPipelined
:: Word16
-> m (TxSubmissionSender hash tx (S n) c m a)
-> TxSubmissionSender hash tx n c m a

-- |
-- Possibly pipelined @'MsgGetTx'@..
--
SendMsgGetTx
:: hash
-> m (TxSubmissionSender hash tx (S n) c m a)
-> TxSubmissionSender hash tx n c m a
-- (Recursive) handlers of the tx-submission client
--
data TxSubmissionHandlers hash tx m a = TxSubmissionHandlers {
getHashes :: Word16 -> m ([hash], TxSubmissionHandlers hash tx m a),
getTx :: hash -> m (tx, TxSubmissionHandlers hash tx m a),
done :: a
}

-- |
-- Collect pipelined responses, either @'MsgSendHashes'@ or @'MsgTx'@.
--
CollectPipelined
:: Maybe (TxSubmissionSender hash tx (S n) c m a)
-> (c -> m (TxSubmissionSender hash tx n c m a))
-> TxSubmissionSender hash tx (S n) c m a
instance Functor m => Functor (TxSubmissionHandlers hash tx m) where
fmap f TxSubmissionHandlers {getHashes, getTx, done} = TxSubmissionHandlers {
getHashes = fmap (\(hs, next) -> (hs, fmap f next)) . getHashes,
getTx = fmap (\(tx, next) -> (tx, fmap f next)) . getTx,
done = f done
}

-- |
-- Terminate the tx-submission protocol.
--
SendMsgDone
:: a
-> TxSubmissionSender hash tx Z c m a

-- |
-- Transform a @'TxSubmissionClientPipelined'@ into a @'PeerPipelined'@ which
-- pipelines @'MsgGetTx'@ messages.
-- A non-pipelined @'Peer'@ representing the @'TxSubmissionClient'@.
--
txSubmissionClientPeerPipelined
:: forall hash tx m a.
Functor m
=> TxSubmissionClientPipelined hash tx m a
-> PeerPipelined (TxSubmission hash tx) AsClient StIdle m a
txSubmissionClientPeerPipelined (TxSubmissionClientPipelined sender) =
PeerPipelined $ txSubmissionSender sender

-- |
-- The @'PeerSender'@ which asks for available transaction hashes and pipelines
-- @'MsgGetTx'@s.
--
txSubmissionSender
:: forall hash tx (n :: N) m a.
Functor m
=> TxSubmissionSender hash tx n (Collection hash tx) m a
-> PeerSender (TxSubmission hash tx) AsClient StIdle n (Collection hash tx) m a

txSubmissionSender (SendMsgDone a) =
SenderYield (ClientAgency TokIdle) MsgDone (SenderDone TokDone a)

txSubmissionSender (SendMsgGetHashes n next) =
SenderPipeline
(ClientAgency TokIdle)
(MsgGetHashes n)
(ReceiverAwait (ServerAgency TokSendHashes) $ \msg -> case msg of
MsgSendHashes hs -> ReceiverDone (Left hs))
(SenderCollect Nothing
$ \c -> case c of
Left hs -> SenderEffect (txSubmissionSender <$> (next hs))
_ -> error "txSubmissionSender: impossible happend")

txSubmissionSender (SendMsgGetHashesPipelined n next) =
SenderPipeline
(ClientAgency TokIdle)
(MsgGetHashes n)
(ReceiverAwait (ServerAgency TokSendHashes) $ \msg -> case msg of
MsgSendHashes hs -> ReceiverDone (Left hs))
(SenderEffect $ txSubmissionSender <$> next)

txSubmissionSender (SendMsgGetTx hash next) =
SenderPipeline
(ClientAgency TokIdle)
(MsgGetTx hash)
(ReceiverAwait (ServerAgency TokSendTx) $ \msg -> case msg of
MsgTx tx -> ReceiverDone (Right tx))
(SenderEffect $ txSubmissionSender <$> next)

txSubmissionSender (CollectPipelined next collect) =
SenderCollect
(txSubmissionSender <$> next)
(SenderEffect . fmap txSubmissionSender . collect)
txSubmissionClientPeer
:: forall hash tx m a. Monad m
=> TxSubmissionClient hash tx m a
-> Peer (TxSubmission hash tx) AsClient StIdle m a
txSubmissionClientPeer (TxSubmissionClient mclient) = Effect $ go <$> mclient
where
go :: TxSubmissionHandlers hash tx m a
-> Peer (TxSubmission hash tx) AsClient StIdle m a
go TxSubmissionHandlers {getHashes, getTx, done} =
Await (ServerAgency TokIdle) $ \msg -> case msg of
MsgGetHashes n -> Effect $ do
(hs, next) <- getHashes n
return $ Yield (ClientAgency TokSendHashes) (MsgSendHashes hs) (go next)
MsgGetTx hash -> Effect $ do
(tx, next) <- getTx hash
return $ Yield (ClientAgency TokSendTx) (MsgTx tx) (go next)
MsgDone -> Done TokDone done
@@ -37,15 +37,15 @@ codecTxSubmission = mkCodecCborLazyBS encode decode
PeerHasAgency pr st
-> Message (TxSubmission hash tx) st st'
-> CBOR.Encoding
encode (ClientAgency TokIdle) (MsgGetHashes n) =
encode (ServerAgency TokIdle) (MsgGetHashes n) =
CBOR.encodeListLen 2 <> CBOR.encodeWord 0 <> CBOR.encode n
encode (ServerAgency TokSendHashes) (MsgSendHashes hs) =
encode (ClientAgency TokSendHashes) (MsgSendHashes hs) =
CBOR.encodeListLen 2 <> CBOR.encodeWord 1 <> CBOR.encode hs
encode (ClientAgency TokIdle) (MsgGetTx hash) =
encode (ServerAgency TokIdle) (MsgGetTx hash) =
CBOR.encodeListLen 2 <> CBOR.encodeWord 2 <> CBOR.encode hash
encode (ServerAgency TokSendTx) (MsgTx tx) =
encode (ClientAgency TokSendTx) (MsgTx tx) =
CBOR.encodeListLen 2 <> CBOR.encodeWord 3 <> CBOR.encode tx
encode (ClientAgency TokIdle) MsgDone =
encode (ServerAgency TokIdle) MsgDone =
CBOR.encodeListLen 1 <> CBOR.encodeWord 4

decode :: forall (pr :: PeerRole) s (st :: TxSubmission hash tx).
@@ -55,12 +55,12 @@ codecTxSubmission = mkCodecCborLazyBS encode decode
len <- CBOR.decodeListLen
key <- CBOR.decodeWord
case (stok, len, key) of
(ClientAgency TokIdle, 2, 0) -> SomeMessage . MsgGetHashes <$> CBOR.decode
(ServerAgency TokSendHashes, 2, 1) -> SomeMessage . MsgSendHashes <$> CBOR.decode
(ClientAgency TokIdle, 2, 2) -> SomeMessage . MsgGetTx <$> CBOR.decode
(ServerAgency TokSendTx, 2, 3) -> SomeMessage . MsgTx <$> CBOR.decode
(ClientAgency TokIdle, 1, 4) -> return (SomeMessage MsgDone)
(ServerAgency TokIdle, 2, 0) -> SomeMessage . MsgGetHashes <$> CBOR.decode
(ClientAgency TokSendHashes, 2, 1) -> SomeMessage . MsgSendHashes <$> CBOR.decode
(ServerAgency TokIdle, 2, 2) -> SomeMessage . MsgGetTx <$> CBOR.decode
(ClientAgency TokSendTx, 2, 3) -> SomeMessage . MsgTx <$> CBOR.decode
(ServerAgency TokIdle, 1, 4) -> return (SomeMessage MsgDone)

(ClientAgency TokIdle, _, _) -> fail "codecTxSubmission.Idle: unexpected key"
(ServerAgency TokSendHashes, _, _) -> fail "codecTxSubmission.SendHashes: unexpected key"
(ServerAgency TokSendTx, _, _) -> fail "codecTxSubmission.SendTx: unexpected key"
(ServerAgency TokIdle, _, _) -> fail "codecTxSubmission.Idle: unexpected key"
(ClientAgency TokSendHashes, _, _) -> fail "codecTxSubmission.SendHashes: unexpected key"
(ClientAgency TokSendTx, _, _) -> fail "codecTxSubmission.SendTx: unexpected key"
@@ -13,11 +13,11 @@ import Ouroboros.Network.Protocol.TxSubmission.Server
direct
:: forall hash tx m a b.
Monad m
=> TxSubmissionClientPipelined hash tx m a
-> TxSubmissionServer hash tx m b
=> TxSubmissionServerPipelined hash tx m a
-> TxSubmissionClient hash tx m b
-> m (a, b)
direct (TxSubmissionClientPipelined client) (TxSubmissionServer mserver) =
mserver >>= directSender EmptyQ client
direct (TxSubmissionServerPipelined server) (TxSubmissionClient mclient) =
mclient >>= directSender EmptyQ server
where
directSender :: Queue n (Either [hash] tx)
-> TxSubmissionSender hash tx n (Collection hash tx) m a
@@ -16,22 +16,22 @@ import Ouroboros.Network.Protocol.TxSubmission.Server


-- |
-- An example @'TxSubmissionServer'@ which sends trasactions from a fixed pool
-- An example @'TxSubmissionClient'@ which sends trasactions from a fixed pool
-- of transactions. It returns a list of @tx@ sent back to the client.
--
-- It is only ment to be used in test. The server will error if a client will
-- ask for a transaction which is not in the pool or if a client will ask for
-- It is only ment to be used in test. The client will error if a server will
-- ask for a transaction which is not in the pool or if a server will ask for
-- the same trasaction twice.
--
txSubmissionServerFixed
txSubmissionClientFixed
:: forall hash tx m.
( Applicative m
, Eq hash
)
=> [tx]
-> (tx -> hash)
-> TxSubmissionServer hash tx m [tx]
txSubmissionServerFixed txs0 txHash = TxSubmissionServer (pure $ handlers [] txs0)
-> TxSubmissionClient hash tx m [tx]
txSubmissionClientFixed txs0 txHash = TxSubmissionClient (pure $ handlers [] txs0)
where
handlers :: [tx] -> [tx] -> TxSubmissionHandlers hash tx m [tx]
handlers !sent txs =
@@ -57,16 +57,16 @@ data ReqOrResp hash tx
deriving (Eq, Show)

-- |
-- A non pipelined tx-submission client. It send @'MsgGetHashes'@ awaits for
-- A non pipelined tx-submission server. It send @'MsgGetHashes'@ awaits for
-- the response and then requests each transaction awaiting for it before
-- sending the next @'MsgGetTx'@ request.
--
txSubmissionClient
txSubmissionServer
:: forall hash tx m.
Applicative m
=> [Word16] -- ^ each element corresponds to a single @'MsgGetHashes'@ request
-> TxSubmissionClientPipelined hash tx m [ReqOrResp hash tx]
txSubmissionClient ns0 = TxSubmissionClientPipelined (sender [] ns0)
-> TxSubmissionServerPipelined hash tx m [ReqOrResp hash tx]
txSubmissionServer ns0 = TxSubmissionServerPipelined (sender [] ns0)
where
sender
:: [ReqOrResp hash tx]
@@ -89,18 +89,18 @@ txSubmissionClient ns0 = TxSubmissionClientPipelined (sender [] ns0)
Right tx -> pure $ getHashes (RespTx tx:ReqTx h:txs) ns hs )

-- |
-- A piplined tx-submission client that sends @'MsgGetTx'@ eagerly but always tries to
-- A piplined tx-submission server that sends @'MsgGetTx'@ eagerly but always tries to
-- collect any replies as soon as they are available. This keeps pipelining to
-- bare minimum, and gives maximum choice to the environment (drivers).
--
-- It returns the interleaving of requests and received trasactions.
--
txSubmissionClientPipelinedMin
txSubmissionServerPipelinedMin
:: forall hash tx m.
Applicative m
=> [Word16]
-> TxSubmissionClientPipelined hash tx m [ReqOrResp hash tx]
txSubmissionClientPipelinedMin ns0 = TxSubmissionClientPipelined (sender [] ns0)
-> TxSubmissionServerPipelined hash tx m [ReqOrResp hash tx]
txSubmissionServerPipelinedMin ns0 = TxSubmissionServerPipelined (sender [] ns0)
where
sender
:: [ReqOrResp hash tx]
@@ -119,13 +119,13 @@ txSubmissionClientPipelinedMin ns0 = TxSubmissionClientPipelined (sender [] ns0)
getHashes txs ns [] (Succ o) = CollectPipelined
Nothing
(\c -> case c of
Left _ -> error "txSubmissionClientPipelinedMin"
Left _ -> error "txSubmissionServerPipelinedMin"
Right tx -> pure $ getHashes (RespTx tx:txs) ns [] o)

getHashes txs ns hs@(h:hs') (Succ o) = CollectPipelined
(Just $ requestMoreTx txs ns h hs' (Succ o))
(\c -> case c of
Left _ -> error "txSubmissionClientPiplinedMin"
Left _ -> error "txSubmissionServerPiplinedMin"
Right tx -> pure $ getHashes (RespTx tx:txs) ns hs o)

getHashes txs ns (h:hs) Zero = requestMoreTx txs ns h hs Zero
@@ -144,16 +144,16 @@ txSubmissionClientPipelinedMin ns0 = TxSubmissionClientPipelined (sender [] ns0)


-- |
-- An example tx-submission client which sends @'MsgGetHashes'@ awaits for the
-- An example tx-submission server which sends @'MsgGetHashes'@ awaits for the
-- response, and then piplines requests for each received hash. The responses
-- are collected before next @'MsgGetHashes'@.
--
txSubmissionClientPipelinedMax
txSubmissionServerPipelinedMax
:: forall hash tx m.
Applicative m
=> [Word16] -- ^ each element corresponds to a single @'MsgGetHashes'@ request
-> TxSubmissionClientPipelined hash tx m [ReqOrResp hash tx]
txSubmissionClientPipelinedMax ns0 = TxSubmissionClientPipelined (sender [] ns0)
-> TxSubmissionServerPipelined hash tx m [ReqOrResp hash tx]
txSubmissionServerPipelinedMax ns0 = TxSubmissionServerPipelined (sender [] ns0)
where
sender
:: [ReqOrResp hash tx]
@@ -177,20 +177,20 @@ txSubmissionClientPipelinedMax ns0 = TxSubmissionClientPipelined (sender [] ns0)
Right tx -> pure $ getHashes (RespTx tx:txs) ns [] o)

-- |
-- Like @'txSubmissionClientPipelinedMin'@ but it also pipelines @'MsgGetHashes'@.
-- Like @'txSubmissionServerPipelinedMin'@ but it also pipelines @'MsgGetHashes'@.
-- We pipeline the request for more hashes whenever we already get half of the
-- transactions.
--
-- It returns the interleaving of requests and received trasactions.
--
txSubmissionClientPipelinedAllMin
txSubmissionServerPipelinedAllMin
:: forall hash tx m.
( Applicative m
, Eq hash
)
=> [Word16]
-> TxSubmissionClientPipelined hash tx m [ReqOrResp hash tx]
txSubmissionClientPipelinedAllMin ns0 = TxSubmissionClientPipelined (sender [] ns0)
-> TxSubmissionServerPipelined hash tx m [ReqOrResp hash tx]
txSubmissionServerPipelinedAllMin ns0 = TxSubmissionServerPipelined (sender [] ns0)
where
middle :: [as] -> Maybe as
middle as = let l = length as
Oops, something went wrong.

0 comments on commit e392b7d

Please sign in to comment.
You can’t perform that action at this time.