Skip to content

Commit

Permalink
Marconi Utxos addiotional index-slot-info
Browse files Browse the repository at this point in the history
More property test. + add volatile events to query API
  • Loading branch information
kayvank committed Dec 2, 2022
1 parent 955481c commit 5fd6c3d
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 73 deletions.
89 changes: 48 additions & 41 deletions marconi/src/Marconi/Index/Utxos.hs
Expand Up @@ -40,17 +40,14 @@ module Marconi.Index.Utxos where
-- , open
-- ) whe re

import Cardano.Api qualified as C
import Codec.Serialise (Serialise (encode), deserialiseOrFail, serialise)
import Codec.Serialise.Class (Serialise (decode))
import Control.Concurrent.Async (concurrently_)
import Control.Exception (bracket_)
import Control.Lens (filtered, folded, traversed)
import Control.Lens.Operators ((%~), (&), (^.), (^..))
import Control.Lens.TH (makeLenses)

import Cardano.Binary (fromCBOR, toCBOR)
import Codec.Serialise (Serialise (encode), deserialiseOrFail, serialise)
import Codec.Serialise.Class (Serialise (decode))
import Control.Exception (bracket_)
import Control.Monad (unless)
import Control.Monad (unless, when)
import Data.Aeson (ToJSON (toJSON))
import Data.Aeson qualified
import Data.ByteString.Lazy (toStrict)
Expand All @@ -66,10 +63,14 @@ import Database.SQLite.Simple.FromRow (FromRow (fromRow), field)
import Database.SQLite.Simple.ToField (ToField (toField))
import Database.SQLite.Simple.ToRow (ToRow (toRow))
import GHC.Generics (Generic)
import System.Random.MWC (createSystemRandom, uniformR)
import Text.ParserCombinators.Parsec (parse)

import Cardano.Api qualified as C
import Cardano.Binary (fromCBOR, toCBOR)
import Marconi.Types (CurrentEra)
import RewindableIndex.Index.VSqlite (SqliteIndex)
import RewindableIndex.Index.VSqlite qualified as Ix
import Text.ParserCombinators.Parsec (parse)

data Utxo = Utxo
{ _utxoAddress :: !C.AddressAny
Expand Down Expand Up @@ -225,16 +226,14 @@ instance FromField C.BlockNo where
instance ToField C.BlockNo where
toField (C.BlockNo s) = SQLInteger $ fromIntegral s


open
:: FilePath
:: FilePath -- ^ sqlite file path
-> Depth
-> IO UtxoIndex
open dbPath (Depth k) = do
-- The second parameter ((k + 1) * 2) specifies the amount of events that are buffered.
-- The larger the number, the more RAM the indexer uses. However, we get improved SQL
-- queries due to batching more events together.
-- TODO this required for k > 0
ix <- fromJust <$> Ix.newBoxed query store onInsert k ((k + 1) * 2) dbPath
let conn = ix ^. Ix.handle
SQL.execute_ conn "DROP TABLE IF EXISTS unspent_transactions"
Expand All @@ -254,47 +253,56 @@ eventAtAddress addr event =
if null utxosAtAddress then []
else [event { _utxoEventUtxos = utxosAtAddress }]

addressFilteredEvents :: C.AddressAny -> [UtxoEvent] -> [UtxoEvent]
addressFilteredEvents addr = concatMap (eventAtAddress addr)
findByAddress :: C.AddressAny -> [UtxoEvent] -> [UtxoEvent]
findByAddress addr = concatMap (eventAtAddress addr)

rmSpentUtxos :: UtxoEvent -> UtxoEvent
rmSpentUtxos event =
event & utxoEventUtxos %~ (f (event ^. utxoEventInputs) )
where
f :: (Set.Set C.TxIn) -> [Utxo] -> [Utxo]
f txIns utxos = filter (not . isUtxoSpent txIns) utxos
isUtxoSpent :: (Set.Set C.TxIn) -> Utxo -> Bool
isUtxoSpent txIns u = ( C.TxIn (u ^. utxoTxId)(u ^. utxoTxIx)) `Set.member` txIns

toRows :: UtxoEvent -> [UtxoRow]
toRows event = event ^. utxoEventUtxos & traversed %~ f
where
f :: Utxo -> UtxoRow
f u = UtxoRow u (event ^. utxoEventSlotNo ) (event ^. utxoEventBlockNo)

-- | Query the data stored in the indexer as a whole from:
-- + hotStore : in-memory
-- + coldStore : SQL DB
-- + buffered : data that can still change (through rollbacks)
--
addressFilteredRows :: C.AddressAny -> [UtxoEvent] -> [UtxoRow]
addressFilteredRows addr = (concatMap toRows ) . addressFilteredEvents addr
addressFilteredRows addr = (concatMap toRows ) . findByAddress addr

-- | Query the data stored in the indexer
query
:: UtxoIndex
-> C.AddressAny
-> [UtxoEvent] -- ^ inflight events
-> IO Result
query ix addr events = do
fromColdStore <-
:: UtxoIndex -- ^ in-memory indexer
-> C.AddressAny -- ^ Address to filter for
-> [UtxoEvent] -- ^ volatile events that may be rollbacked
-> IO Result -- ^ search results
query ix addr volatiles = do
diskStored <-
SQL.query
(ix ^. Ix.handle)
"SELECT u.address, u.txId, u.txIx, u.datum, u.datumHash, u.value, u.slotNo, u.blockNo FROM unspent_transactions u LEFT JOIN spent s ON u.txId = s.txId AND u.txIx = s.txIx WHERE u.address = ?"
(Only addr) :: IO[UtxoRow]

-- putStrLn $ show fromColdStore <> "fromColdstore"
hotStore <- Ix.getEvents $ ix ^. Ix.storage :: IO [UtxoEvent]
-- putStrLn $ show hotStore <> " fromhotstore"
buffered <- Ix.getBuffer $ ix ^. Ix.storage :: IO [UtxoEvent]
let events = volatiles ++ buffered
pure . Just $
(addressFilteredRows addr events)
`union`
(addressFilteredRows addr hotStore)
( concatMap toRows . fmap rmSpentUtxos . (findByAddress addr) $ events)
`union`
fromColdStore
-- TODO
-- & filter (\u -> not (_reference u `Set.member` spentOutputs))
-- & map _reference
diskStored

-- | Query the data stored in the indexer as a whole from:
-- + volatile : in-memory, datat that may rollback
-- + diskStore : on-disk
-- + buffered : in-memeoy, data that will flush to storage
queryPlusVolatile
:: UtxoIndex -- ^ in-memory indexer
-> C.AddressAny -- ^ Address to filter for
-> IO Result -- ^ search results
queryPlusVolatile ix addr =
Ix.getEvents (ix ^. Ix.storage) >>= query ix addr

onInsert :: UtxoIndex -> UtxoEvent -> IO [Notification]
onInsert _ _ = pure []
Expand All @@ -319,12 +327,11 @@ store ix = do
"INSERT OR REPLACE INTO spent (txId, txIx) VALUES (?, ?)"
spent) >> putStrLn "inserted spent")
)

-- We want to perform vacuum about once every 100 * buffer ((k + 1) * 2)
-- rndCheck <- createSystemRandom >>= uniformR (1 :: Int, 100)
-- when (rndCheck == 42) $ do
-- SQL.execute_ conn "DELETE FROM unspent_transactions WHERE unspent_transactions.rowid IN (SELECT unspent_transactions.rowid FROM unspent_transactions LEFT JOIN spent on unspent_transactions.txId = spent.txId AND unspent_transactions.txIx = spent.txIx WHERE spent.txId IS NOT NULL)"
-- SQL.execute_ conn "VACUUM"
rndCheck <- createSystemRandom >>= uniformR (1 :: Int, 100)
when (rndCheck == 42) $ do
SQL.execute_ conn "DELETE FROM unspent_transactions WHERE unspent_transactions.rowid IN (SELECT unspent_transactions.rowid FROM unspent_transactions LEFT JOIN spent on unspent_transactions.txId = spent.txId AND unspent_transactions.txIx = spent.txIx WHERE spent.txId IS NOT NULL)"
SQL.execute_ conn "VACUUM"

toAddr :: C.AddressInEra CurrentEra -> C.AddressAny
toAddr (C.AddressInEra C.ByronAddressInAnyEra addr) = C.AddressByron addr
Expand Down
56 changes: 24 additions & 32 deletions marconi/test/Index/Spec.hs
Expand Up @@ -8,7 +8,6 @@

module Index.Spec (tests) where

import Cardano.Api qualified as C
import Control.Lens (folded)
import Control.Lens.Operators ((^.), (^..))
import Control.Monad (forM)
Expand All @@ -17,23 +16,29 @@ import Data.List (nub)
import Data.List.NonEmpty (nonEmpty, toList)
import Data.Maybe (catMaybes, fromJust)
import Database.SQLite.Simple qualified as SQLite
import Gen.Cardano.Api.Typed qualified as CGen

import Hedgehog (Gen, Property, diff, forAll, property, (===))
import Hedgehog qualified
import Hedgehog.Gen qualified as Gen
import Hedgehog.Range qualified as Range
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.Hedgehog (testPropertyNamed)

import Cardano.Api qualified as C
import Gen.Cardano.Api.Typed qualified as CGen
import Marconi.Index.Utxos qualified as Utxos
import Marconi.Indexers (uTxo, uTxoEvents)
import Marconi.Types (CurrentEra, TargetAddresses)
import RewindableIndex.Index.VSplit qualified as Ix
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.Hedgehog (testPropertyNamed)

tests :: TestTree
tests = testGroup "Marconi.Index.Specs" $
[testPropertyNamed "Index-utxos-build" "Spec. Utxo from Cardano.Api.Tx" txToUtxoTest
, testPropertyNamed "Index-utxos-event-at-address" "Spec. Filter UtxoEvents for an address" addressFilteredEventsTest
, testPropertyNamed "Index-utxos-store" "Spec. Save and retreive UtxoEvents" roundTripUtxoEventsTest
[testPropertyNamed "Index-utxos-build" "Spec. Utxo from Cardano.Api.Tx with targetAddresses"
txToUtxoTest
, testPropertyNamed "Index-utxos-event-at-address" "Spec. Filter UtxoEvents for an address"
addressFilteredEventsTest
, testPropertyNamed "Index-utxos-store" "Spec. Save and retreive UtxoEvents"
utxoStorageTest
, testPropertyNamed "Index-utxos-hot-store" "Spec. Save and retreive UtxoEvents from hot-store only"
hotstoreUtxoEventTest
]
Expand All @@ -51,11 +56,13 @@ txToUtxoTest = property $ do
t@(C.Tx (C.TxBody C.TxBodyContent{C.txOuts}) _) <- forAll $ CGen.genTx C.BabbageEra
let (targetAddresses :: Maybe TargetAddresses ) = addressesFromTxOuts txOuts
let (utxos :: [Utxos.Utxo]) = uTxo targetAddresses t
-- we should not see any target addresses in the utxos
(areTargetAddressesInUtxos targetAddresses utxos) === True
let utxos = uTxo Nothing t :: [Utxos.Utxo]
-- there should be some utxos
Hedgehog.assert (null utxos)
case targetAddresses of
Nothing -> (length utxos) === (length txOuts)
Just targets ->
( nub
. catMaybes
. fmap (\x -> addressAnyToShelley (x ^. Utxos.utxoAddress))
$ utxos) === (nub . toList $ targets)

genBlockNo :: Gen C.BlockNo
genBlockNo = C.BlockNo <$> Gen.word64 Range.constantBounded
Expand All @@ -74,26 +81,22 @@ hotstoreUtxoEventTest = property $ do
(addresses :: [C.AddressAny]) = nub . fmap (\r -> r ^. Utxos.utxoRowUtxo . Utxos.utxoAddress ) $ rows
ndx <- liftIO $ Utxos.open ":memory:" (Utxos.Depth 2196) -- no db writes, test memory only
ix <- liftIO $ Ix.insert event ndx
storeResults <- liftIO $ ( forM addresses (\addr -> Utxos.query ix addr []) :: IO [Utxos.Result ] )
storeResults <- liftIO $ ( forM addresses (\addr -> Utxos.queryPlusVolatile ix addr) :: IO [Utxos.Result ] )
let (fromQuery :: [Utxos.UtxoRow] ) = concat . catMaybes $ storeResults
let conn = ix ^. Ix.handle
liftIO $ SQLite.close conn
Hedgehog.diff (length rows) (==) (length fromQuery)
rows === fromQuery






roundTripUtxoEventsTest :: Property
roundTripUtxoEventsTest = property $ do
ndx <- liftIO $ Utxos.open ":memory:" (Utxos.Depth 2196)
utxoStorageTest :: Property
utxoStorageTest = property $ do
ndx <- liftIO $ Utxos.open ":memory:" (Utxos.Depth 2) -- force storage to use sqlite + buffer + volatile
events <- forAll $ Gen.list (Range.linear 2 4) genEvents -- force db flush
let (rows :: [Utxos.UtxoRow]) = nub . concatMap Utxos.toRows $ events
(addresses :: [C.AddressAny]) = nub . fmap (\r -> r ^. Utxos.utxoRowUtxo . Utxos.utxoAddress ) $ rows
ix <- liftIO $ Ix.insertL (events) ndx
rowsFromStore <- liftIO $ (concat . catMaybes) <$> forM addresses (\a -> (ix ^. Ix.query) ix a [] )
rowsFromStore <- liftIO $ (concat . catMaybes) <$> forM addresses (\a -> Utxos.queryPlusVolatile ix a )
(null rowsFromStore) === False
let (tid :: Utxos.UtxoRow -> C.AddressAny) = (\r -> r ^. Utxos.utxoRowUtxo . Utxos.utxoAddress)
(all (`elem` (tid <$> rows)) (tid <$> rowsFromStore)) === True
Expand All @@ -110,14 +113,3 @@ addressesFromTxOuts _ = Nothing
addressAnyToShelley :: C.AddressAny -> Maybe (C.Address C.ShelleyAddr)
addressAnyToShelley (C.AddressShelley a )=Just a
addressAnyToShelley _ = Nothing


areTargetAddressesInUtxos :: (Maybe TargetAddresses) -> [Utxos.Utxo] -> Bool
areTargetAddressesInUtxos Nothing _ = True
areTargetAddressesInUtxos (Just targets) utxos =
let
addressesFromUtxos = catMaybes . fmap (\x -> addressAnyToShelley ( x ^. Utxos.utxoAddress)) $ utxos
targetList = toList targets
in
all (`elem` targetList) addressesFromUtxos

0 comments on commit 5fd6c3d

Please sign in to comment.