-
Notifications
You must be signed in to change notification settings - Fork 86
/
OnDisk.hs
419 lines (381 loc) · 16.6 KB
/
OnDisk.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
-- | On-disk side of the ledger DB
--
-- Provides initialization of the ledger DB from snapshots stored on disk
-- ('initLedgerDB'), writing and trimming of snapshots ('takeSnapshot',
-- 'trimSnapshots'), and the trace events emitted while doing so.
module Ouroboros.Consensus.Storage.LedgerDB.OnDisk (
-- * Opening the database
initLedgerDB
, InitLog(..)
, InitFailure(..)
-- ** Abstraction over the stream API
, NextBlock(..)
, StreamAPI(..)
-- * Write to disk
, takeSnapshot
, trimSnapshots
-- * Low-level API (primarily exposed for testing)
, DiskSnapshot -- opaque
, deleteSnapshot
, snapshotToPath
-- * Trace events
, TraceEvent(..)
, TraceReplayEvent(..)
) where
import qualified Codec.CBOR.Write as CBOR
import Codec.Serialise.Decoding (Decoder)
import Codec.Serialise.Encoding (Encoding)
import Control.Monad.Except
import Control.Tracer
import qualified Data.Bifunctor.TH as TH
import qualified Data.List as List
import Data.Maybe (mapMaybe)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Word
import GHC.Generics (Generic)
import GHC.Stack
import Text.Read (readMaybe)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Util.CBOR (ReadIncrementalErr,
readIncremental)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Storage.FS.API
import Ouroboros.Consensus.Storage.FS.API.Types
import Ouroboros.Consensus.Storage.LedgerDB.DiskPolicy
import Ouroboros.Consensus.Storage.LedgerDB.InMemory
{-------------------------------------------------------------------------------
Abstraction over the streaming API provided by the Chain DB
-------------------------------------------------------------------------------}
-- | Result of asking the stream for its next element
data NextBlock r b
  = NoMoreBlocks
    -- ^ The stream is exhausted
  | NextBlock (r, b)
    -- ^ The next block @b@, paired with its reference @r@
-- | Stream blocks from the immutable DB
--
-- During initialization of the ledger DB we look for a snapshot close to the
-- tip of the immutable DB and then stream the blocks between that snapshot
-- and the tip, so that the ledger can be brought up to date with the
-- immutable DB.
--
-- The API is in continuation-passing style so that implementations can be
-- built on top of @withXYZ@-style iterator initialisation functions.
data StreamAPI m r b = StreamAPI {
      -- | Start streaming after the specified block
      streamAfter :: forall a. HasCallStack
        => WithOrigin r
        -- Reference to the block the snapshot we found corresponds to
        -- (or the origin if we did not find any snapshot)
        -> (Maybe (m (NextBlock r b)) -> m a)
        -- Continuation, given an action that gets the next block (by value)
        --
        -- The argument should be 'Nothing' if the snapshot we found is more
        -- recent than the tip of the immutable DB. Since snapshots are only
        -- written for blocks in the immutable DB, this can only happen if
        -- the immutable DB got truncated due to disk corruption.
        -> m a
    }
-- | Stream all blocks
--
-- Folds the update function over every block the stream yields after the
-- given starting point. If the stream API reports that the starting point
-- cannot be found, the supplied error constructor is applied to the starting
-- point and returned in 'ExceptT'.
streamAll :: forall m r b e a. (Monad m, HasCallStack)
          => StreamAPI m r b
          -> WithOrigin r         -- ^ Starting point for streaming
          -> (WithOrigin r -> e)  -- ^ Error when tip not found
          -> a                    -- ^ Starting point when tip /is/ found
          -> ((r, b) -> a -> m a) -- ^ Update function for each block
          -> ExceptT e m a
streamAll StreamAPI{..} tip notFound start step =
    ExceptT $ streamAfter tip $
      maybe (return (Left (notFound tip)))
            (\getNext -> Right <$> consume getNext start)
  where
    -- Keep pulling blocks until the stream reports 'NoMoreBlocks', threading
    -- the accumulator through the update function.
    consume :: m (NextBlock r b) -> a -> m a
    consume getNext = loop
      where
        loop acc = getNext >>= \case
          NoMoreBlocks  -> return acc
          NextBlock blk -> step blk acc >>= loop
{-------------------------------------------------------------------------------
Initialize the DB
-------------------------------------------------------------------------------}
-- | Initialization log
--
-- Records which on-disk snapshots were considered during initialization, in
-- which order, and why the rejected ones were rejected. Primarily useful for
-- monitoring purposes.
data InitLog r =
    -- | Defaulted to initialization from genesis
    --
    -- NOTE: Unless the blockchain is near genesis, we should see this /only/
    -- if data corruption occurred.
    InitFromGenesis

    -- | Used a snapshot corresponding to the specified tip
  | InitFromSnapshot DiskSnapshot (WithOrigin r)

    -- | Initialization skipped a snapshot
    --
    -- Carries the skipped snapshot, the reason it was skipped, and the log
    -- of what happened next.
    --
    -- NOTE: We should /only/ see this if data corruption occurred.
  | InitFailure DiskSnapshot (InitFailure r) (InitLog r)
  deriving (Show, Eq, Generic)
-- | Initialize the ledger DB from the most recent snapshot on disk
--
-- Snapshots are tried newest first. A snapshot that cannot be used is deleted
-- from disk, reported as 'InvalidSnapshot' on the tracer, and the next older
-- one is tried. When no snapshot is left, the genesis ledger is used instead.
--
-- Returns
--
-- * the 'InitLog' describing which snapshots were tried and which was used,
-- * the initialized ledger DB, and
-- * the number of blocks that were replayed on top of the starting state.
--
-- We do /not/ catch any exceptions thrown during streaming; should any be
-- thrown, it is the responsibility of the 'ChainDB' to catch these
-- and trigger (further) validation. We only discard snapshots if
--
-- * We cannot deserialise them, or
-- * they are /ahead/ of the chain
--
-- It is possible that the Ledger DB will not be able to roll back @k@ blocks
-- after initialization if the chain has been truncated (data corruption).
--
-- We do /not/ attempt to use multiple ledger states from disk to construct the
-- ledger DB. Instead we load only a /single/ ledger state from disk, and
-- /compute/ all subsequent ones. This is important, because the ledger states
-- obtained in this way will (hopefully) share much of their memory footprint
-- with their predecessors.
initLedgerDB :: forall m h l r b. (IOLike m, ApplyBlock l b, HasCallStack)
             => Tracer m (TraceReplayEvent r ())
             -> Tracer m (TraceEvent r)
             -> HasFS m h
             -> (forall s. Decoder s l)
             -> (forall s. Decoder s r)
             -> LedgerDbParams
             -> FullBlockConfig l b
             -> m l -- ^ Genesis ledger state
             -> StreamAPI m r b
             -> m (InitLog r, LedgerDB l r, Word64)
initLedgerDB replayTracer
             tracer
             hasFS
             decLedger
             decRef
             params
             conf
             getGenesisLedger
             streamAPI =
    listSnapshots hasFS >>= tryNewestFirst id
  where
    -- The first argument accumulates the log of snapshots rejected so far; it
    -- is applied to the final outcome once initialization succeeds.
    tryNewestFirst :: (InitLog r -> InitLog r)
                   -> [DiskSnapshot]
                   -> m (InitLog r, LedgerDB l r, Word64)
    tryNewestFirst mkLog = \case
      [] -> do
        -- We're out of snapshots. Start at genesis
        traceWith replayTracer $ ReplayFromGenesis ()
        genesisDb <- ledgerDbFromGenesis params <$> getGenesisLedger
        outcome   <- runExceptT $
                       initStartingWith replayTracer conf streamAPI genesisDb
        case outcome of
          Left _ ->
            error "invariant violation: invalid current chain"
          Right (ledgerDb, replayed) ->
            return (mkLog InitFromGenesis, ledgerDb, replayed)

      (candidate : older) -> do
        outcome <- runExceptT $
                     initFromSnapshot
                       replayTracer
                       hasFS
                       decLedger
                       decRef
                       params
                       conf
                       streamAPI
                       candidate
        case outcome of
          Right (snapTip, ledgerDb, replayed) ->
            return (mkLog (InitFromSnapshot candidate snapTip), ledgerDb, replayed)
          Left failure -> do
            -- This snapshot is unusable: delete it and try an older one
            deleteSnapshot hasFS candidate
            traceWith tracer $ InvalidSnapshot candidate failure
            tryNewestFirst (mkLog . InitFailure candidate failure) older
{-------------------------------------------------------------------------------
Internal: initialize using the given snapshot
-------------------------------------------------------------------------------}
-- | Reason why a snapshot could not be used for initialization
data InitFailure r =
    -- | We failed to deserialise the snapshot
    --
    -- This can happen due to data corruption in the ledger DB.
    InitFailureRead ReadIncrementalErr

    -- | This snapshot is too recent (ahead of the tip of the chain)
    --
    -- Carries the tip the snapshot corresponds to.
  | InitFailureTooRecent (WithOrigin r)
  deriving (Show, Eq, Generic)
-- | Attempt to initialize the ledger DB from the given snapshot
--
-- Reads the snapshot from disk, traces 'ReplayFromSnapshot', and then replays
-- the blocks the stream API provides after the snapshot's tip. On success
-- returns the snapshot's tip, the resulting ledger DB, and the number of
-- blocks replayed.
--
-- If the chain DB or ledger layer reports an error, the whole thing is aborted
-- and an error is returned. This should not throw any errors itself (ignoring
-- unexpected exceptions such as asynchronous exceptions, of course).
initFromSnapshot :: forall m h l r b. (IOLike m, ApplyBlock l b, HasCallStack)
                 => Tracer m (TraceReplayEvent r ())
                 -> HasFS m h
                 -> (forall s. Decoder s l)
                 -> (forall s. Decoder s r)
                 -> LedgerDbParams
                 -> FullBlockConfig l b
                 -> StreamAPI m r b
                 -> DiskSnapshot
                 -> ExceptT (InitFailure r) m (WithOrigin r, LedgerDB l r, Word64)
initFromSnapshot tracer hasFS decLedger decRef params conf streamAPI ss = do
    -- A read failure is reported as 'InitFailureRead'
    anchorSummary <- withExceptT InitFailureRead $
                       readSnapshot hasFS decLedger decRef ss
    let snapTip = csTip anchorSummary
    lift $ traceWith tracer (ReplayFromSnapshot ss snapTip ())
    (ledgerDb, replayed) <-
      initStartingWith tracer conf streamAPI $
        ledgerDbWithAnchor params anchorSummary
    return (snapTip, ledgerDb, replayed)
-- | Attempt to initialize the ledger DB starting from the given ledger DB
--
-- Streams all blocks after the tip of the given ledger DB and pushes each of
-- them onto it, tracing 'ReplayedBlock' for every block. Returns the final
-- ledger DB together with the number of blocks replayed. Fails with
-- 'InitFailureTooRecent' if the stream API cannot find the starting tip.
initStartingWith :: forall m l r b. (Monad m, ApplyBlock l b, HasCallStack)
                 => Tracer m (TraceReplayEvent r ())
                 -> FullBlockConfig l b
                 -> StreamAPI m r b
                 -> LedgerDB l r
                 -> ExceptT (InitFailure r) m (LedgerDB l r, Word64)
initStartingWith tracer conf streamAPI initDb =
    streamAll
      streamAPI
      (ledgerDbTip initDb)
      InitFailureTooRecent
      (initDb, 0)
      replayOne
  where
    -- Both components of the accumulator are forced on every step so that no
    -- chain of thunks builds up while replaying.
    replayOne :: (r, b) -> (LedgerDB l r, Word64) -> m (LedgerDB l r, Word64)
    replayOne (blockRef, block) (!ledgerDb, !count) = do
      traceWith tracer (ReplayedBlock blockRef ())
      ledgerDb' <- ledgerDbPush conf (ReapplyVal blockRef block) ledgerDb
      return (ledgerDb', count + 1)
{-------------------------------------------------------------------------------
Write to disk
-------------------------------------------------------------------------------}
-- | Take a snapshot of the /oldest ledger state/ in the ledger DB
--
-- The /oldest/ state (the anchor of the ledger DB) is the one written,
-- because the intention is to only write ledger states to disk that we know
-- to be immutable. The snapshot is stored under the next free snapshot
-- number and a 'TookSnapshot' event is traced. Primarily for testing
-- purposes, the block reference corresponding to the written snapshot is
-- returned alongside the snapshot itself.
--
-- NOTE: This is a lower-level API that unconditionally takes a snapshot
-- (i.e., independent from whether this snapshot corresponds to a state that
-- is more than @k@ back).
--
-- TODO: Should we delete the file if an error occurs during writing?
takeSnapshot :: forall m l r h. MonadThrow m
             => Tracer m (TraceEvent r)
             -> HasFS m h
             -> (l -> Encoding)
             -> (r -> Encoding)
             -> LedgerDB l r -> m (DiskSnapshot, WithOrigin r)
takeSnapshot tracer hasFS encLedger encRef db = do
    existing <- listSnapshots hasFS
    let target    = nextAvailable existing
        anchorTip = csTip anchor
    writeSnapshot hasFS encLedger encRef target anchor
    traceWith tracer (TookSnapshot target anchorTip)
    return (target, anchorTip)
  where
    anchor :: ChainSummary l r
    anchor = ledgerDbAnchor db
-- | Trim the number of on disk snapshots so that at most 'onDiskNumSnapshots'
-- snapshots are stored on disk. The oldest snapshots are deleted, and a
-- 'DeletedSnapshot' event is traced for each of them.
--
-- The deleted snapshots are returned.
trimSnapshots :: Monad m
              => Tracer m (TraceEvent r)
              -> HasFS m h
              -> DiskPolicy
              -> m [DiskSnapshot]
trimSnapshots tracer hasFS DiskPolicy{..} = do
    newestFirst <- listSnapshots hasFS
    -- The snapshots are listed most recent first, so everything past the
    -- first 'onDiskNumSnapshots' entries is "too" old.
    let surplus = drop (fromIntegral onDiskNumSnapshots) newestFirst
    forM surplus $ \old -> do
      deleteSnapshot hasFS old
      traceWith tracer (DeletedSnapshot old)
      return old
{-------------------------------------------------------------------------------
Internal: reading from disk
-------------------------------------------------------------------------------}
-- | Identifier of a snapshot on disk
--
-- On disk snapshots are numbered monotonically; the number doubles as the
-- snapshot's file name (see 'snapshotToPath').
newtype DiskSnapshot = DiskSnapshot Int
  deriving (Show, Eq, Ord, Generic)
-- | Number of the next snapshot, given snapshots currently on disk
--
-- Numbering starts at 1 when there are no snapshots yet; otherwise the result
-- is one past the highest number in use.
nextAvailable :: [DiskSnapshot] -> DiskSnapshot
nextAvailable existing
    | null existing = DiskSnapshot 1
    | otherwise     = DiskSnapshot (latest + 1)
  where
    -- Only demanded in the non-empty branch, so 'maximum' is safe here
    DiskSnapshot latest = maximum existing
-- | Read snapshot from disk
--
-- Incrementally decodes the file at 'snapshotToPath' as a chain summary,
-- using the given decoders for the ledger state and the block reference.
-- Decoding problems are returned as 'ReadIncrementalErr'.
readSnapshot :: forall m l r h. (IOLike m)
             => HasFS m h
             -> (forall s. Decoder s l)
             -> (forall s. Decoder s r)
             -> DiskSnapshot
             -> ExceptT ReadIncrementalErr m (ChainSummary l r)
readSnapshot hasFS decLedger decRef ss =
    ExceptT $ readIncremental hasFS summaryDecoder (snapshotToPath ss)
  where
    summaryDecoder :: Decoder s (ChainSummary l r)
    summaryDecoder = decodeChainSummary decLedger decRef
-- | Write snapshot to disk
--
-- Encodes the chain summary with 'encodeChainSummary' (using the given
-- encoders for the ledger state and the block reference) and writes the
-- result to the file at 'snapshotToPath'. The file is opened with
-- @WriteMode MustBeNew@, so presumably this fails when a snapshot with the
-- same number already exists (NOTE(review): confirm against 'withFile').
-- The value returned by 'hPut' is discarded.
writeSnapshot :: forall m l r h. MonadThrow m
              => HasFS m h
              -> (l -> Encoding)
              -> (r -> Encoding)
              -> DiskSnapshot -> ChainSummary l r -> m ()
writeSnapshot hasFS encLedger encRef ss cs =
    -- 'withFile' and 'hPut' both take the 'HasFS' record explicitly, so there
    -- is no need to bring its fields into scope with a record wildcard.
    withFile hasFS (snapshotToPath ss) (WriteMode MustBeNew) $ \h ->
      void $ hPut hasFS h $ CBOR.toBuilder (encode cs)
  where
    encode :: ChainSummary l r -> Encoding
    encode = encodeChainSummary encLedger encRef
-- | Delete snapshot from disk
--
-- Removes the file at the path 'snapshotToPath' assigns to the snapshot.
deleteSnapshot :: HasCallStack => HasFS m h -> DiskSnapshot -> m ()
deleteSnapshot HasFS{..} snapshot = removeFile (snapshotToPath snapshot)
-- | List on-disk snapshots, most recent first
--
-- Directory entries whose name 'snapshotFromPath' does not recognise as a
-- snapshot are ignored.
listSnapshots :: Monad m => HasFS m h -> m [DiskSnapshot]
listSnapshots HasFS{..} = do
    entries <- listDirectory (mkFsPath [])
    return (newestFirst (recognised entries))
  where
    recognised :: Set String -> [DiskSnapshot]
    recognised = mapMaybe snapshotFromPath . Set.toList

    -- Descending order on the snapshot number
    newestFirst :: [DiskSnapshot] -> [DiskSnapshot]
    newestFirst = List.sortBy (flip compare)
-- | Path of the file a snapshot is stored in
--
-- The file name is the decimal rendering of the snapshot number.
snapshotToPath :: DiskSnapshot -> FsPath
snapshotToPath (DiskSnapshot number) = mkFsPath [fileName]
  where
    fileName = show number
-- | Recognise a file name as a snapshot
--
-- A name is accepted only if it is exactly what 'snapshotToPath' would
-- produce for the parsed number, i.e. it must round-trip through 'show'.
-- 'readMaybe' on its own is more liberal than that: it also accepts leading
-- zeros, surrounding whitespace and hexadecimal literals. Accepting such a
-- name would yield a 'DiskSnapshot' whose canonical path differs from the
-- file actually on disk, so reading or deleting it would target the wrong
-- path.
snapshotFromPath :: String -> Maybe DiskSnapshot
snapshotFromPath fileName =
    case readMaybe fileName of
      Just number
        | show (number :: Int) == fileName -> Just (DiskSnapshot number)
      _otherwise                           -> Nothing
{-------------------------------------------------------------------------------
Trace events
-------------------------------------------------------------------------------}
-- | Events traced when snapshots are rejected, written, or deleted
data TraceEvent r
  = InvalidSnapshot DiskSnapshot (InitFailure r)
    -- ^ An on disk snapshot was skipped because it was invalid; the reason
    -- is included.
  | TookSnapshot DiskSnapshot (WithOrigin r)
    -- ^ A snapshot was written to disk; includes the tip it corresponds to.
  | DeletedSnapshot DiskSnapshot
    -- ^ An old or invalid on-disk snapshot was deleted
  deriving (Generic, Eq, Show)
-- | Events traced while replaying blocks against the ledger to bring it up to
-- date w.r.t. the tip of the ImmutableDB during initialisation. As this
-- process takes a while, we trace events to inform higher layers of our
-- progress.
--
-- The @replayTo@ parameter is meant to be filled in by a higher layer,
-- i.e., the ChainDB. In every constructor it corresponds to the block at the
-- tip of the ImmutableDB, i.e., the last block to replay.
data TraceReplayEvent r replayTo
  = ReplayFromGenesis replayTo
    -- ^ There were no usable LedgerDB snapshots on disk, so we're replaying
    -- all blocks starting from Genesis against the initial ledger.
  | ReplayFromSnapshot DiskSnapshot (WithOrigin r) replayTo
    -- ^ There was a LedgerDB snapshot on disk corresponding to the given tip.
    -- We're replaying more recent blocks against it.
  | ReplayedBlock r replayTo
    -- ^ We replayed the given block (reference) during the initialisation of
    -- the LedgerDB.
    --
    -- The @r@ argument identifies the replayed block.
  deriving (Generic, Eq, Show, Functor, Foldable, Traversable)
-- Template Haskell splices deriving the two-parameter (@r@ and @replayTo@)
-- functor-style instances for 'TraceReplayEvent'.
TH.deriveBifunctor ''TraceReplayEvent
TH.deriveBifoldable ''TraceReplayEvent
TH.deriveBitraversable ''TraceReplayEvent