Skip to content

Commit

Permalink
LocalStateQuery: allow acquire tip
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed Jan 15, 2021
1 parent 77798f5 commit 10ad383
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ tests =
, testProperty "codec 2-splits" prop_codec_splits2
, testProperty "codec 3-splits" $ withMaxSuccess 30
prop_codec_splits3
, testProperty "codecs V7/V8 compatible"
prop_codec_V7_compatible
, testProperty "codec cbor" prop_codec_cbor
, testProperty "channel ST" prop_channel_ST
, testProperty "channel IO" prop_channel_IO
Expand All @@ -76,25 +78,25 @@ tests =
--

data Query result where
QueryPoint :: Query (Point Block)
QueryPoint :: Query (Maybe (Point Block))

deriving instance Show (Query result)
instance ShowProxy Query where

-- | Information to test an example server and client.
data Setup = Setup
{ clientInput :: [(Point Block, Query (Point Block))]
{ clientInput :: [(Maybe (Point Block), Query (Maybe (Point Block)))]
-- ^ Input for 'localStateQueryClient'
, serverAcquire :: Point Block -> Either AcquireFailure (Point Block)
, serverAcquire :: Maybe (Point Block) -> Either AcquireFailure (Maybe (Point Block))
-- ^ First input parameter for 'localStateQueryServer'
, serverAnswer :: forall result. Point Block -> Query result -> result
, serverAnswer :: forall result. Maybe (Point Block) -> Query result -> result
-- ^ Second input parameter for 'localStateQueryServer'
, expected :: [(Point Block, Either AcquireFailure (Point Block))]
, expected :: [(Maybe (Point Block), Either AcquireFailure (Maybe (Point Block)))]
-- ^ Expected result for the 'localStateQueryClient'.
}

mkSetup
:: Map (Point Block) (Maybe AcquireFailure, Query (Point Block))
:: Map (Maybe (Point Block)) (Maybe AcquireFailure, Query (Maybe (Point Block)))
-- ^ For each point, the given state queries will be executed. In case of
-- the second field is an 'AcquireFailure', the server will fail with
-- that failure.
Expand All @@ -118,7 +120,7 @@ mkSetup input = Setup {
]
}
where
answer :: Point Block -> Query result -> result
answer :: Maybe (Point Block) -> Query result -> result
answer pt q = case q of
QueryPoint -> pt

Expand All @@ -130,7 +132,7 @@ mkSetup input = Setup {
-- | Run a simple local state query client and server, directly on the wrappers,
-- without going via the 'Peer'.
--
prop_direct :: Map (Point Block) (Maybe AcquireFailure, Query (Point Block))
prop_direct :: Map (Maybe (Point Block)) (Maybe AcquireFailure, Query (Maybe (Point Block)))
-> Property
prop_direct input =
runSimOrThrow
Expand All @@ -150,7 +152,7 @@ prop_direct input =
-- | Run a simple local state query client and server, going via the 'Peer'
-- representation, but without going via a channel.
--
prop_connect :: Map (Point Block) (Maybe AcquireFailure, Query (Point Block))
prop_connect :: Map (Maybe (Point Block)) (Maybe AcquireFailure, Query (Maybe (Point Block)))
-> Property
prop_connect input =
case runSimOrThrow
Expand All @@ -176,7 +178,7 @@ prop_channel :: ( MonadAsync m
, MonadST m
)
=> m (Channel m ByteString, Channel m ByteString)
-> Map (Point Block) (Maybe AcquireFailure, Query (Point Block))
-> Map (Maybe (Point Block)) (Maybe AcquireFailure, Query (Maybe (Point Block)))
-> m Property
prop_channel createChannels input =

Expand All @@ -185,7 +187,7 @@ prop_channel createChannels input =
runConnectedPeers
createChannels
nullTracer
codec
(codec True)
(localStateQueryClientPeer $
localStateQueryClient clientInput)
(localStateQueryServerPeer $
Expand All @@ -195,22 +197,22 @@ prop_channel createChannels input =

-- | Run 'prop_channel' in the simulation monad.
--
prop_channel_ST :: Map (Point Block) (Maybe AcquireFailure, Query (Point Block))
prop_channel_ST :: Map (Maybe (Point Block)) (Maybe AcquireFailure, Query (Maybe (Point Block)))
-> Property
prop_channel_ST input =
runSimOrThrow
(prop_channel createConnectedChannels input)

-- | Run 'prop_channel' in the IO monad.
--
prop_channel_IO :: Map (Point Block) (Maybe AcquireFailure, Query (Point Block))
prop_channel_IO :: Map (Maybe (Point Block)) (Maybe AcquireFailure, Query (Maybe (Point Block)))
-> Property
prop_channel_IO input =
ioProperty (prop_channel createConnectedChannels input)

-- | Run 'prop_channel' in the IO monad using local pipes.
--
prop_pipe_IO :: Map (Point Block) (Maybe AcquireFailure, Query (Point Block))
prop_pipe_IO :: Map (Maybe (Point Block)) (Maybe AcquireFailure, Query (Maybe (Point Block)))
-> Property
prop_pipe_IO input =
ioProperty (prop_channel createPipeConnectedChannels input)
Expand All @@ -226,13 +228,32 @@ instance Arbitrary AcquireFailure where
, AcquireFailurePointNotOnChain
]

instance Arbitrary (Query (Point Block)) where
instance Arbitrary (Query (Maybe (Point Block))) where
arbitrary = pure QueryPoint

instance Arbitrary (AnyMessageAndAgency (LocalStateQuery Block (Point Block) Query)) where
arbitrary = oneof
[ getAnyMessageAndAgencyV7 <$> arbitrary

, pure $ AnyMessageAndAgency (ClientAgency TokIdle)
(MsgAcquire Nothing)

, pure $ AnyMessageAndAgency (ClientAgency TokAcquired)
(MsgReAcquire Nothing)
]

-- Newtype wrapper which generates only valid data for 'NodeToClientV7' protocol.
--
newtype AnyMessageAndAgencyV7 = AnyMessageAndAgencyV7 {
getAnyMessageAndAgencyV7
:: AnyMessageAndAgency (LocalStateQuery Block (Point Block) Query)
}
deriving Show

instance Arbitrary AnyMessageAndAgencyV7 where
arbitrary = AnyMessageAndAgencyV7 <$> oneof
[ AnyMessageAndAgency (ClientAgency TokIdle) <$>
(MsgAcquire <$> arbitrary)
(MsgAcquire . Just <$> arbitrary)

, AnyMessageAndAgency (ServerAgency TokAcquiring) <$>
pure MsgAcquired
Expand All @@ -241,7 +262,7 @@ instance Arbitrary (AnyMessageAndAgency (LocalStateQuery Block (Point Block) Que
(MsgFailure <$> arbitrary)

, AnyMessageAndAgency (ClientAgency TokAcquired) <$>
(MsgQuery <$> (arbitrary :: Gen (Query (Point Block))))
(MsgQuery <$> (arbitrary :: Gen (Query (Maybe (Point Block)))))

, AnyMessageAndAgency (ServerAgency (TokQuerying QueryPoint)) <$>
(MsgResult QueryPoint <$> arbitrary)
Expand All @@ -250,12 +271,13 @@ instance Arbitrary (AnyMessageAndAgency (LocalStateQuery Block (Point Block) Que
pure MsgRelease

, AnyMessageAndAgency (ClientAgency TokAcquired) <$>
(MsgReAcquire <$> arbitrary)
(MsgReAcquire . Just <$> arbitrary)

, AnyMessageAndAgency (ClientAgency TokIdle) <$>
pure MsgDone
]


instance ShowQuery Query where
showResult QueryPoint = show

Expand Down Expand Up @@ -293,13 +315,16 @@ instance Eq (AnyMessage (LocalStateQuery Block (Point Block) Query)) where


codec :: MonadST m
=> Codec (LocalStateQuery Block (Point Block) Query)
=> Bool
-> Codec (LocalStateQuery Block (Point Block) Query)
DeserialiseFailure
m ByteString
codec = codecLocalStateQuery
Serialise.encode Serialise.decode
encodeQuery decodeQuery
encodeResult decodeResult
codec canAcquireTip =
codecLocalStateQuery
canAcquireTip
Serialise.encode Serialise.decode
encodeQuery decodeQuery
encodeResult decodeResult
where
encodeQuery :: Query result -> CBOR.Encoding
encodeQuery QueryPoint = Serialise.encode ()
Expand All @@ -321,26 +346,32 @@ prop_codec
:: AnyMessageAndAgency (LocalStateQuery Block (Point Block) Query)
-> Bool
prop_codec msg =
runST (prop_codecM codec msg)
runST (prop_codecM (codec True) msg)

-- | Check for data chunk boundary problems in the codec using 2 chunks.
--
prop_codec_splits2
:: AnyMessageAndAgency (LocalStateQuery Block (Point Block) Query)
-> Bool
prop_codec_splits2 msg =
runST (prop_codec_splitsM splits2 codec msg)
runST (prop_codec_splitsM splits2 (codec True) msg)

-- | Check for data chunk boundary problems in the codec using 3 chunks.
--
prop_codec_splits3
:: AnyMessageAndAgency (LocalStateQuery Block (Point Block) Query)
-> Bool
prop_codec_splits3 msg =
runST (prop_codec_splitsM splits3 codec msg)
runST (prop_codec_splitsM splits3 (codec True) msg)

prop_codec_cbor
:: AnyMessageAndAgency (LocalStateQuery Block (Point Block) Query)
-> Bool
prop_codec_cbor msg =
runST (prop_codec_cborM codec msg)
runST (prop_codec_cborM (codec True) msg)

prop_codec_V7_compatible
:: AnyMessageAndAgencyV7
-> Bool
prop_codec_V7_compatible (AnyMessageAndAgencyV7 msg) =
runST (prop_codecs_compatM (codec False) (codec True) msg)
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ data NodeToClientVersion
-- ^ enabled @CardanoNodeToClientVersion5@, i.e., Mary
| NodeToClientV_7
-- ^ enabled @CardanoNodeToClientVersion6@, adding a query
| NodeToClientV_8
-- ^ 'LocalStateQuery' protocol codec change, allows to acquire tip point.
deriving (Eq, Ord, Enum, Bounded, Show, Typeable)

-- | We set 16ths bit to distinguish `NodeToNodeVersion` and
Expand All @@ -56,6 +58,7 @@ nodeToClientVersionCodec = CodecCBORTerm { encodeTerm, decodeTerm }
encodeTerm NodeToClientV_5 = CBOR.TInt (5 `setBit` nodeToClientVersionBit)
encodeTerm NodeToClientV_6 = CBOR.TInt (6 `setBit` nodeToClientVersionBit)
encodeTerm NodeToClientV_7 = CBOR.TInt (7 `setBit` nodeToClientVersionBit)
encodeTerm NodeToClientV_8 = CBOR.TInt (8 `setBit` nodeToClientVersionBit)

decodeTerm (CBOR.TInt tag) =
case ( tag `clearBit` nodeToClientVersionBit
Expand All @@ -68,6 +71,7 @@ nodeToClientVersionCodec = CodecCBORTerm { encodeTerm, decodeTerm }
(5, True) -> Right NodeToClientV_5
(6, True) -> Right NodeToClientV_6
(7, True) -> Right NodeToClientV_7
(8, True) -> Right NodeToClientV_8
(n, _) -> Left ( T.pack "decode NodeToClientVersion: unknown tag: " <> T.pack (show tag)
, Just n)
decodeTerm _ = Left ( T.pack "decode NodeToClientVersion: unexpected term"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ localStateQueryClientNull =
-- * a termination messge
--
data ClientStIdle block point query (m :: Type -> Type) a where
SendMsgAcquire :: point
SendMsgAcquire :: Maybe point
-> ClientStAcquiring block point query m a
-> ClientStIdle block point query m a

Expand Down Expand Up @@ -85,7 +85,7 @@ data ClientStAcquired block point query m a where
-> ClientStQuerying block point query m a result
-> ClientStAcquired block point query m a

SendMsgReAcquire :: point
SendMsgReAcquire :: Maybe point
-> ClientStAcquiring block point query m a
-> ClientStAcquired block point query m a

Expand Down Expand Up @@ -123,7 +123,7 @@ mapLocalStateQueryClient fpoint fquery fresult =
goIdle :: ClientStIdle block point query m a
-> ClientStIdle block' point' query' m a
goIdle (SendMsgAcquire pt k) =
SendMsgAcquire (fpoint pt) (goAcquiring k)
SendMsgAcquire (fpoint <$> pt) (goAcquiring k)

goIdle (SendMsgDone a) = SendMsgDone a

Expand All @@ -139,7 +139,7 @@ mapLocalStateQueryClient fpoint fquery fresult =
-> ClientStAcquired block' point' query' m a
goAcquired (SendMsgQuery q k) = case fquery q of
Some q' -> SendMsgQuery q' (goQuerying q q' k)
goAcquired (SendMsgReAcquire pt k) = SendMsgReAcquire (fpoint pt) (goAcquiring k)
goAcquired (SendMsgReAcquire pt k) = SendMsgReAcquire (fpoint <$> pt) (goAcquiring k)
goAcquired (SendMsgRelease k) = SendMsgRelease (fmap goIdle k)

goQuerying :: forall result result'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ codecLocalStateQuery
( MonadST m
, ShowQuery query
)
=> (point -> CBOR.Encoding)
=> Bool -- allow @Maybe 'Point'@ in 'MsgAcquire' and 'MsgReAcquire'.
-> (point -> CBOR.Encoding)
-> (forall s . CBOR.Decoder s point)
-> (forall result . query result -> CBOR.Encoding)
-> (forall s . CBOR.Decoder s (Some query))
-> (forall result . query result -> result -> CBOR.Encoding)
-> (forall result . query result -> forall s . CBOR.Decoder s result)
-> Codec (LocalStateQuery block point query) CBOR.DeserialiseFailure m ByteString
codecLocalStateQuery encodePoint decodePoint
codecLocalStateQuery canAcquireTip
encodePoint decodePoint
encodeQuery decodeQuery
encodeResult decodeResult =
mkCodecCborLazyBS encode decode
Expand All @@ -64,11 +66,20 @@ codecLocalStateQuery encodePoint decodePoint
PeerHasAgency pr st
-> Message (LocalStateQuery block point query) st st'
-> CBOR.Encoding
encode (ClientAgency TokIdle) (MsgAcquire pt) =
encode (ClientAgency TokIdle) (MsgAcquire (Just pt)) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 0
<> encodePoint pt

encode (ClientAgency TokIdle) (MsgAcquire Nothing) | canAcquireTip =
CBOR.encodeListLen 1
<> CBOR.encodeWord 8

encode (ClientAgency TokIdle) (MsgAcquire Nothing) =
error $ "encodeFailure: local state query: using acquire without a "
++ "Point must be conditional on negotiating v8 of the "
++ "node-to-client protocol"

encode (ServerAgency TokAcquiring) MsgAcquired =
CBOR.encodeListLen 1
<> CBOR.encodeWord 1
Expand All @@ -92,11 +103,18 @@ codecLocalStateQuery encodePoint decodePoint
CBOR.encodeListLen 1
<> CBOR.encodeWord 5

encode (ClientAgency TokAcquired) (MsgReAcquire pt) =
encode (ClientAgency TokAcquired) (MsgReAcquire (Just pt)) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 6
<> encodePoint pt

encode (ClientAgency TokAcquired) (MsgReAcquire Nothing) | canAcquireTip =
CBOR.encodeListLen 1
<> CBOR.encodeWord 9

encode (ClientAgency TokAcquired) (MsgReAcquire Nothing) =
error "encodeFailure: this version does not support re-acquiring tip"

encode (ClientAgency TokIdle) MsgDone =
CBOR.encodeListLen 1
<> CBOR.encodeWord 7
Expand All @@ -110,7 +128,10 @@ codecLocalStateQuery encodePoint decodePoint
case (stok, len, key) of
(ClientAgency TokIdle, 2, 0) -> do
pt <- decodePoint
return (SomeMessage (MsgAcquire pt))
return (SomeMessage (MsgAcquire (Just pt)))

(ClientAgency TokIdle, 1, 8) | canAcquireTip -> do
return (SomeMessage (MsgAcquire Nothing))

(ServerAgency TokAcquiring, 1, 1) ->
return (SomeMessage MsgAcquired)
Expand All @@ -132,7 +153,10 @@ codecLocalStateQuery encodePoint decodePoint

(ClientAgency TokAcquired, 2, 6) -> do
pt <- decodePoint
return (SomeMessage (MsgReAcquire pt))
return (SomeMessage (MsgReAcquire (Just pt)))

(ClientAgency TokAcquired, 1, 9) | canAcquireTip -> do
return (SomeMessage (MsgReAcquire Nothing))

(ClientAgency TokIdle, 1, 7) ->
return (SomeMessage MsgDone)
Expand Down
Loading

0 comments on commit 10ad383

Please sign in to comment.