-
Notifications
You must be signed in to change notification settings - Fork 44
/
NodeClient.hs
370 lines (335 loc) · 14.9 KB
/
NodeClient.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
module Language.Marlowe.Runtime.ChainIndexer.NodeClient
( Changes(..)
, CostModel(..)
, NodeClient(..)
, NodeClientDependencies(..)
, NodeClientSelector(..)
, RollBackwardField(..)
, RollForwardField(..)
, isEmptyChanges
, nodeClient
, toEmptyChanges
) where
import Cardano.Api
( Block(..)
, BlockHeader(..)
, BlockInMode(..)
, BlockNo
, CardanoMode
, ChainPoint(..)
, ChainSyncClientPipelined(..)
, ChainTip(..)
, LocalChainSyncClient(..)
, LocalNodeClientProtocols(..)
, LocalNodeClientProtocolsInMode
, SlotNo(..)
)
import Cardano.Api.ChainSync.ClientPipelined
( ClientPipelinedStIdle(..)
, ClientPipelinedStIntersect(..)
, ClientStNext(..)
, MkPipelineDecision
, N(..)
, Nat(..)
, PipelineDecision(..)
, mapChainSyncClientPipelined
, pipelineDecisionLowHighMark
, runPipelineDecision
)
import Control.Arrow ((&&&))
import Control.Concurrent.Component
import Control.Concurrent.STM (STM, TVar, atomically, modifyTVar, newTVar, readTVar, writeTVar)
import Control.Exception (SomeException, mask, throw, try)
import Control.Monad (guard)
import Data.IntMap.Lazy (IntMap)
import qualified Data.IntMap.Lazy as IntMap
import Data.List (sortOn)
import Data.Maybe (mapMaybe)
import Data.Ord (Down(..))
import Data.Void (Void)
import Language.Marlowe.Runtime.ChainIndexer.Database (CardanoBlock, GetIntersectionPoints(..))
import Observe.Event.Backend (simpleNewEventArgs)
import Observe.Event.Explicit (EventBackend, addField, finalize, newEvent, withEvent)
import Ouroboros.Network.Point (WithOrigin(..))
type NumberedCardanoBlock = (BlockNo, CardanoBlock)
type NumberedChainTip = (WithOrigin BlockNo, ChainTip)
-- | Describes a batch of chain data changes to write.
data Changes = Changes
{ changesRollback :: !(Maybe ChainPoint) -- ^ Point to rollback to before writing any blocks.
, changesBlocks :: ![CardanoBlock] -- ^ New blocks to write.
, changesTip :: !ChainTip -- ^ Most recently observed tip of the local node.
, changesLocalTip :: !ChainTip -- ^ Chain tip the changes will advance the local state to.
, changesBlockCount :: !Int -- ^ Number of blocks in the change set.
, changesTxCount :: !Int -- ^ Number of transactions in the change set.
}
-- | An empty Changes collection.
emptyChanges :: Changes
emptyChanges = Changes Nothing [] ChainTipAtGenesis ChainTipAtGenesis 0 0
-- | Make a set of changes into an empty set (preserves the tip and point fields).
toEmptyChanges :: Changes -> Changes
toEmptyChanges changes = changes
{ changesRollback = Nothing
, changesBlocks = []
, changesBlockCount = 0
, changesTxCount = 0
}
-- | Returns True if the change set is empty.
isEmptyChanges :: Changes -> Bool
isEmptyChanges (Changes Nothing [] _ _ _ _) = True
isEmptyChanges _ = False
-- | Parameters for estimating the cost of writing a batch of changes.
data CostModel = CostModel
{ blockCost :: Int
, txCost :: Int
} deriving (Show, Eq)
-- | Computes the cost of a change set. The value is a unitless heuristic.
-- Prevents large numbers of transactions and blocks being held in memory.
cost :: CostModel -> Changes -> Int
cost CostModel{..} Changes{..} = changesBlockCount * blockCost + changesTxCount * txCost
-- | The set of dependencies needed by the NodeClient component.
data NodeClientDependencies r = NodeClientDependencies
{ connectToLocalNode :: !(LocalNodeClientProtocolsInMode CardanoMode -> IO ()) -- ^ Connect to the local node.
, getIntersectionPoints :: !(GetIntersectionPoints IO) -- ^ How to load the set of initial intersection points for the chain sync client.
-- | The maximum cost a set of changes is allowed to incur before the
-- NodeClient blocks.
, maxCost :: Int
, costModel :: CostModel
, eventBackend :: !(EventBackend IO r NodeClientSelector)
}
-- | The public API of the NodeClient component.
data NodeClient = NodeClient
{ getChanges :: STM Changes -- ^ An STM action that atomically reads and clears the current change set.
, connected :: STM Bool
}
data NodeClientSelector f where
Connect :: NodeClientSelector Void
Intersect :: NodeClientSelector [ChainPoint]
IntersectFound :: NodeClientSelector ChainPoint
IntersectNotFound :: NodeClientSelector Void
RollForward :: NodeClientSelector RollForwardField
RollBackward :: NodeClientSelector RollBackwardField
data RollForwardField
= RollForwardBlock BlockHeader
| RollForwardTip ChainTip
| RollForwardNewCost Int
data RollBackwardField
= RollBackwardPoint ChainPoint
| RollBackwardTip ChainTip
| RollBackwardNewCost Int
-- | Create a new NodeClient component.
nodeClient :: Component IO (NodeClientDependencies r) NodeClient
nodeClient = component \NodeClientDependencies{..} -> do
changesVar <- newTVar emptyChanges
connectedVar <- newTVar False
let
getChanges :: STM Changes
getChanges = do
changes <- readTVar changesVar
modifyTVar changesVar toEmptyChanges
pure changes
pipelinedClient' :: ChainSyncClientPipelined CardanoBlock ChainPoint ChainTip IO ()
pipelinedClient' = mapChainSyncClientPipelined id id (blockToBlockNo &&& id) (chainTipToBlockNo &&& id)
$ pipelinedClient eventBackend costModel maxCost changesVar getIntersectionPoints
runNodeClient :: IO ()
runNodeClient = mask \restore -> do
ev <- newEvent eventBackend $ simpleNewEventArgs Connect
result <- try @SomeException $ restore $ connectToLocalNode LocalNodeClientProtocols
{ localChainSyncClient =
let ChainSyncClientPipelined client = pipelinedClient'
in LocalChainSyncClientPipelined $ ChainSyncClientPipelined do
finalize ev Nothing
atomically $ writeTVar connectedVar True
client
, localTxSubmissionClient = Nothing
, localTxMonitoringClient = Nothing
, localStateQueryClient = Nothing
}
atomically $ writeTVar connectedVar False
case result of
Left ex -> finalize ev (Just ex) *> throw ex
Right _ -> pure ()
connected = readTVar connectedVar
pure (runNodeClient, NodeClient { getChanges, connected })
blockHeaderToBlockNo :: BlockHeader -> BlockNo
blockHeaderToBlockNo (BlockHeader _ _ blockNo) = blockNo
blockToBlockNo :: CardanoBlock -> BlockNo
blockToBlockNo (BlockInMode (Block header _) _) = blockHeaderToBlockNo header
chainTipToBlockNo :: ChainTip -> WithOrigin BlockNo
chainTipToBlockNo = \case
ChainTipAtGenesis -> Origin
ChainTip _ _ blockNo -> At blockNo
pipelinedClient
:: EventBackend IO r NodeClientSelector
-> CostModel
-> Int
-> TVar Changes
-> GetIntersectionPoints IO
-> ChainSyncClientPipelined NumberedCardanoBlock ChainPoint NumberedChainTip IO ()
pipelinedClient eventBackend costModel maxCost changesVar getIntersectionPoints =
ChainSyncClientPipelined do
headers <- withEvent eventBackend Intersect \ev -> do
headers <- sortOn (Down . fmap blockHeaderToBlockNo) <$> runGetIntersectionPoints getIntersectionPoints
addField ev $ headerToPoint <$> headers
pure headers
pure $ SendMsgFindIntersect (headerToPoint <$> headers) ClientPipelinedStIntersect
{ recvMsgIntersectFound = \point tip -> withEvent eventBackend IntersectFound \ev -> do
let
getSlotAndBlock = case point of
ChainPointAtGenesis -> const Nothing
ChainPoint pointSlot _ -> \case
Origin -> Nothing
At (BlockHeader (SlotNo s) _ b)
| SlotNo s <= pointSlot -> Just (fromIntegral s, b)
| otherwise -> Nothing
slotNoToBlockNo = IntMap.fromList $ mapMaybe getSlotAndBlock headers
addField ev point
clientStIdle slotNoToBlockNo point tip
, recvMsgIntersectNotFound = withEvent eventBackend IntersectNotFound . const . clientStIdle mempty ChainPointAtGenesis
}
where
clientStIdle
:: IntMap BlockNo
-> ChainPoint
-> NumberedChainTip
-> IO (ClientPipelinedStIdle 'Z NumberedCardanoBlock ChainPoint NumberedChainTip IO ())
clientStIdle slotNoToBlockNo point nodeTip = do
let
clientTip = case point of
ChainPointAtGenesis -> Origin
ChainPoint (SlotNo s) _ -> case IntMap.lookup (fromIntegral s) slotNoToBlockNo of
Nothing -> error $ "Unable to find block number for chain point " <> show point
Just b -> At b
pure $ mkClientStIdle eventBackend costModel maxCost changesVar slotNoToBlockNo pipelinePolicy Zero clientTip nodeTip
headerToPoint Origin = ChainPointAtGenesis
headerToPoint (At (BlockHeader s h _)) = ChainPoint s h
-- How to pipeline. If we have fewer than 50 requests in flight, send
-- another request. When we hit 50, start collecting responses until we
-- have 1 request in flight, then repeat. If we are caught up to tip,
-- requests will not be pipelined.
pipelinePolicy :: MkPipelineDecision
pipelinePolicy = pipelineDecisionLowHighMark 1 50
mkClientStIdle
:: forall r n
. EventBackend IO r NodeClientSelector
-> CostModel
-> Int
-> TVar Changes
-> IntMap BlockNo
-> MkPipelineDecision
-> Nat n
-> WithOrigin BlockNo
-> NumberedChainTip
-> ClientPipelinedStIdle n NumberedCardanoBlock ChainPoint NumberedChainTip IO ()
mkClientStIdle eventBackend costModel maxCost changesVar slotNoToBlockNo pipelineDecision n clientTip nodeTip =
case (n, runPipelineDecision pipelineDecision n clientTip (fst nodeTip)) of
(_, (Request, pipelineDecision')) ->
SendMsgRequestNext (collect pipelineDecision' n) $ pure (collect pipelineDecision' n)
(_, (Pipeline, pipelineDecision')) ->
nextPipelineRequest pipelineDecision'
(Succ n', (CollectOrPipeline, pipelineDecision')) ->
CollectResponse
(Just $ pure $ nextPipelineRequest pipelineDecision')
(collect pipelineDecision' n')
(Succ n', (Collect, pipelineDecision')) ->
CollectResponse Nothing (collect pipelineDecision' n')
where
nextPipelineRequest
:: MkPipelineDecision
-> ClientPipelinedStIdle n NumberedCardanoBlock ChainPoint NumberedChainTip IO ()
nextPipelineRequest pipelineDecision' = SendMsgRequestNextPipelined
$ mkClientStIdle eventBackend costModel maxCost changesVar slotNoToBlockNo pipelineDecision' (Succ n) clientTip nodeTip
collect
:: forall n'
. MkPipelineDecision
-> Nat n'
-> ClientStNext n' NumberedCardanoBlock ChainPoint NumberedChainTip IO ()
collect pipelineDecision' = mkClientStNext eventBackend costModel maxCost changesVar slotNoToBlockNo pipelineDecision'
mkClientStNext
:: EventBackend IO r NodeClientSelector
-> CostModel
-> Int
-> TVar Changes
-> IntMap BlockNo
-> MkPipelineDecision
-> Nat n
-> ClientStNext n NumberedCardanoBlock ChainPoint NumberedChainTip IO ()
mkClientStNext eventBackend costModel maxCost changesVar slotNoToBlockNo pipelineDecision n = ClientStNext
{ recvMsgRollForward = \(blockNo, block@(BlockInMode (Block header@(BlockHeader slotNo hash _) txs) _)) tip -> withEvent eventBackend RollForward \ev -> do
addField ev $ RollForwardBlock header
addField ev $ RollForwardTip $ snd tip
nextChanges <- atomically do
changes <- readTVar changesVar
let
nextChanges = changes
{ changesBlocks = block : changesBlocks changes
, changesTip = snd tip
, changesLocalTip = ChainTip slotNo hash blockNo
, changesBlockCount = changesBlockCount changes + 1
, changesTxCount = changesTxCount changes + length txs
}
-- Retry unless either the current change set is empty, or the next
-- change set would not be too expensive.
guard $ isEmptyChanges changes || cost costModel nextChanges <= maxCost
writeTVar changesVar nextChanges
pure nextChanges
addField ev $ RollForwardNewCost $ cost costModel nextChanges
let clientTip = At blockNo
let slotNoToInt (SlotNo s) = fromIntegral s
let slotNoToBlockNo' = IntMap.insert (slotNoToInt slotNo) blockNo slotNoToBlockNo
pure $ mkClientStIdle eventBackend costModel maxCost changesVar slotNoToBlockNo' pipelineDecision n clientTip tip
, recvMsgRollBackward = \point tip -> withEvent eventBackend RollBackward \ev -> do
addField ev $ RollBackwardPoint point
addField ev $ RollBackwardTip $ snd tip
let
clientTip = case point of
ChainPointAtGenesis -> Origin
ChainPoint (SlotNo s) _ -> case IntMap.lookup (fromIntegral s) slotNoToBlockNo of
Nothing -> error $ "Unable to find block number for chain point " <> show point
Just b -> At b
newChanges <- atomically do
Changes{..} <- readTVar changesVar
let
changesBlocks' = case point of
ChainPointAtGenesis -> []
ChainPoint slot _ -> dropWhile ((> slot) . blockSlot) changesBlocks
blockTxCount (BlockInMode (Block _ txs) _) = length txs
newChanges = Changes
{ changesBlocks = changesBlocks'
, changesRollback = case changesRollback of
-- If there was no previous rollback, and we still have blocks
-- in the batch after the rollback, we don't need to actually
-- process the rollback.
Nothing -> point <$ guard (null changesBlocks')
-- Otherwise, we need to process whichever rollback was to an
-- earlier point: the previous one, or this new one.
Just prevRollback -> Just $ minPoint point prevRollback
, changesTip = snd tip
, changesLocalTip = case (point, clientTip) of
(ChainPointAtGenesis, _) -> ChainTipAtGenesis
(_, Origin) -> ChainTipAtGenesis
(ChainPoint slotNo hash, At blockNo) -> ChainTip slotNo hash blockNo
, changesBlockCount = length changesBlocks'
, changesTxCount = sum $ blockTxCount <$> changesBlocks
}
writeTVar changesVar newChanges
pure newChanges
addField ev $ RollBackwardNewCost $ cost costModel newChanges
let
slotNoToBlockNo' = case point of
ChainPointAtGenesis -> mempty
ChainPoint (SlotNo s) _ -> IntMap.fromDistinctAscList
$ reverse
$ dropWhile ((s <) . fromIntegral . fst)
$ IntMap.toDescList slotNoToBlockNo
pure $ mkClientStIdle eventBackend costModel maxCost changesVar slotNoToBlockNo' pipelineDecision n clientTip tip
}
minPoint :: ChainPoint -> ChainPoint -> ChainPoint
minPoint ChainPointAtGenesis _ = ChainPointAtGenesis
minPoint _ ChainPointAtGenesis = ChainPointAtGenesis
minPoint p1@(ChainPoint s1 _) p2@(ChainPoint s2 _)
| s1 < s2 = p1
| otherwise = p2
blockSlot :: CardanoBlock -> SlotNo
blockSlot (BlockInMode (Block (BlockHeader slot _ _) _) _) = slot