Skip to content

Commit

Permalink
Marconi - add addtional utxo slot into
Browse files Browse the repository at this point in the history
more property tests for the in memory hot store
  • Loading branch information
kayvank committed Dec 2, 2022
1 parent 56f6437 commit 955481c
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 71 deletions.
1 change: 1 addition & 0 deletions marconi/marconi.cabal
Expand Up @@ -164,6 +164,7 @@ test-suite marconi-test
, lens
, prettyprinter
, serialise
, sqlite-simple
, stm
, streaming
, tasty
Expand Down
133 changes: 71 additions & 62 deletions marconi/src/Marconi/Index/Utxos.hs
Expand Up @@ -21,39 +21,41 @@
| Slot
| BlockNumber
| transactionIndexWithinTheBlock (-}
module Marconi.Index.Utxos
( Utxo (..)
, UtxoEvent (..)
, UtxoRow (..)
, C.BlockNo (..)
, C.SlotNo (..)
, Depth (..)
, Result
, toRows
, addressFilteredRows
, toAddr
, UtxoIndex
, HasUtxo (..)
, HasUtxoRow (..)
, HasUtxoEvent (..)
, open
) where
module Marconi.Index.Utxos where
-- ( eventAtAddress
-- , Utxo (..)
-- , UtxoEvent (..)
-- , UtxoRow (..)
-- , C.BlockNo (..)
-- , C.SlotNo (..)
-- , Depth (..)
-- , Result
-- , toRows
-- , addressFilteredRows
-- , toAddr
-- , UtxoIndex
-- , HasUtxo (..)
-- , HasUtxoRow (..)
-- , HasUtxoEvent (..)
-- , open
-- ) whe re

import Cardano.Api qualified as C
import Control.Concurrent.Async (concurrently_)
import Control.Lens (filtered, folded, traversed)
import Control.Lens.Operators ((%~), (&), (^.), (^..))
import Control.Lens.TH (makeClassy)
import Control.Lens.TH (makeLenses)

import Cardano.Binary (fromCBOR, toCBOR)
import Codec.Serialise (Serialise (encode), deserialiseOrFail, serialise)
import Codec.Serialise.Class (Serialise (decode))
import Control.Exception (bracket_)
import Control.Monad (unless, when)
import Data.Aeson (ToJSON (toEncoding, toJSON), defaultOptions, genericToEncoding)
import Control.Monad (unless)
import Data.Aeson (ToJSON (toJSON))
import Data.Aeson qualified
import Data.ByteString.Lazy (toStrict)
import Data.Maybe (catMaybes, fromJust)
import Data.List (union)
import Data.Maybe (fromJust)
import Data.Proxy (Proxy (Proxy))
import Data.Set qualified as Set
import Data.Text (pack)
Expand All @@ -67,7 +69,6 @@ import GHC.Generics (Generic)
import Marconi.Types (CurrentEra)
import RewindableIndex.Index.VSqlite (SqliteIndex)
import RewindableIndex.Index.VSqlite qualified as Ix
import System.Random.MWC (createSystemRandom, uniformR)
import Text.ParserCombinators.Parsec (parse)

data Utxo = Utxo
Expand All @@ -81,7 +82,7 @@ data Utxo = Utxo
-- , _inlineScript :: Maybe (C.ScriptDatum C.WitCtxTxIn)
} deriving (Show, Generic)

$(makeClassy ''Utxo)
$(makeLenses ''Utxo)

instance ToJSON C.AddressAny where
toJSON = Data.Aeson.String . C.serialiseAddress
Expand All @@ -106,15 +107,15 @@ data UtxoEvent = UtxoEvent
, _utxoEventBlockNo :: !C.BlockNo
} deriving (Show, Eq)

$(makeClassy ''UtxoEvent)
$(makeLenses ''UtxoEvent)

data UtxoRow = UtxoRow
{ _utxoRowUtxo :: Utxo
, _utxoRowSlotNo :: !C.SlotNo
, _utxoRowBlockNo :: !C.BlockNo
} deriving (Show, Eq, Ord, Generic)

$(makeClassy ''UtxoRow)
$(makeLenses ''UtxoRow)

instance ToJSON C.BlockNo

Expand All @@ -137,6 +138,16 @@ instance FromRow C.TxIn where
fromRow = C.TxIn <$> field <*> field
instance ToRow C.TxIn where
toRow (C.TxIn txid txix) = toRow (txid, txix)
instance ToRow UtxoRow where
toRow u = toRow (
(u ^. utxoRowUtxo . utxoAddress)
, (u ^. utxoRowUtxo . utxoTxId)
, (u ^. utxoRowUtxo . utxoTxIx)
, (u ^. utxoRowUtxo . utxoDatum)
, (u ^. utxoRowUtxo . utxoDatumHash)
, (u ^. utxoRowUtxo . utxoValue)
, (u ^. utxoRowSlotNo)
, (u ^. utxoRowBlockNo))

instance FromRow UtxoRow where
fromRow
Expand Down Expand Up @@ -234,17 +245,17 @@ open dbPath (Depth k) = do
SQL.execute_ conn "CREATE UNIQUE INDEX IF NOT EXISTS unspent_transaction_txid ON unspent_transactions (txId)"
pure ix

isAtAddress :: C.AddressAny -> Utxo -> Bool
isAtAddress address' utx = (utx ^. utxoAddress) == address'

onlyAt :: C.AddressAny -> UtxoEvent -> Maybe UtxoEvent
onlyAt address' event =
eventAtAddress :: C.AddressAny -> UtxoEvent -> [UtxoEvent]
eventAtAddress addr event =
let
uts = event ^. utxoEventUtxos ^.. folded . filtered (isAtAddress address')
utxosAtAddress :: [Utxo]
utxosAtAddress = event ^. utxoEventUtxos ^.. folded . filtered (\u -> (u ^. utxoAddress ) == addr)
in
if null uts then
Nothing
else Just event {_utxoEventUtxos = uts}
if null utxosAtAddress then []
else [event { _utxoEventUtxos = utxosAtAddress }]

addressFilteredEvents :: C.AddressAny -> [UtxoEvent] -> [UtxoEvent]
addressFilteredEvents addr = concatMap (eventAtAddress addr)

toRows :: UtxoEvent -> [UtxoRow]
toRows event = event ^. utxoEventUtxos & traversed %~ f
Expand All @@ -257,65 +268,63 @@ toRows event = event ^. utxoEventUtxos & traversed %~ f
-- + coldStore : SQL DB
-- + buffered : data that can still change (through rollbacks)
--
addressFilteredRows :: C.AddressAny -> [UtxoEvent] -> Set.Set UtxoRow
addressFilteredRows addr = Set.fromList . concatMap toRows . catMaybes . fmap (onlyAt addr)
addressFilteredRows :: C.AddressAny -> [UtxoEvent] -> [UtxoRow]
addressFilteredRows addr = (concatMap toRows ) . addressFilteredEvents addr

query
:: UtxoIndex
-> C.AddressAny
-> [UtxoEvent] -- ^ inflight events
-> IO Result
query ix address' events = do
query ix addr events = do
fromColdStore <-
SQL.query
(ix ^. Ix.handle)
"SELECT u.address, u.txId, u.txIx, u.datum, u.datumHash, u.value, u.slotNo, u.blockNo FROM unspent_transactions u LEFT JOIN spent s ON u.txId = s.txId AND u.txIx = s.txIx WHERE u.address = ?"
(Only address')
buffered <- Ix.getBuffer $ ix ^. Ix.storage
pure . Just . Set.toList
$ (Set.fromList fromColdStore
`Set.union` (addressFilteredRows address' buffered)
`Set.union` (addressFilteredRows address' events) )
(Only addr) :: IO[UtxoRow]

-- putStrLn $ show fromColdStore <> "fromColdstore"
hotStore <- Ix.getEvents $ ix ^. Ix.storage :: IO [UtxoEvent]
-- putStrLn $ show hotStore <> " fromhotstore"
pure . Just $
(addressFilteredRows addr events)
`union`
(addressFilteredRows addr hotStore)
`union`
fromColdStore
-- TODO
-- & filter (\u -> not (_reference u `Set.member` spentOutputs))
-- & map _reference

onInsert :: UtxoIndex -> UtxoEvent -> IO [Notification]
onInsert _ _ = pure []
onInsert _ _ = pure []

store :: UtxoIndex -> IO ()
store ix = do
buffer <- Ix.getBuffer $ ix ^. Ix.storage
let asTuple :: UtxoRow -> (C.AddressAny, C.TxId, C.TxIx, Maybe C.ScriptData, Maybe (C.Hash C.ScriptData), C.Value , C.SlotNo, C.BlockNo)
asTuple u =
( (u ^. utxoRowUtxo . utxoAddress)
, (u ^. utxoRowUtxo . utxoTxId)
, (u ^. utxoRowUtxo . utxoTxIx)
, (u ^. utxoRowUtxo . utxoDatum)
, (u ^. utxoRowUtxo . utxoDatumHash)
, (u ^. utxoRowUtxo . utxoValue)
, (u ^. utxoRowSlotNo)
, (u ^. utxoRowBlockNo))
rows = (fmap asTuple) . (concatMap toRows) $ buffer
putStrLn "storing row"
let rows = (concatMap toRows) $ buffer
spent = concatMap (Set.toList . _utxoEventInputs) buffer
conn = ix ^. Ix.handle

bracket_
(SQL.execute_ conn "BEGIN")
(SQL.execute_ conn "COMMIT")
( concurrently_ (
unless (null rows)
(SQL.executeMany conn
"INSERT OR REPLACE INTO unspent_transactions (address, txId, txIx, datum, datumHash, value, slotNo, blockNo) VALUES (?,?,?,?,?,?,?,?)"
rows) )
rows) >> putStrLn "inserted utxo")
(unless (null spent)
(SQL.executeMany conn
"INSERT OR REPLACE INTO spent (txId, txIx) VALUES (?, ?)"
spent))
spent) >> putStrLn "inserted spent")
)

-- We want to perform vacuum about once every 100 * buffer ((k + 1) * 2)
rndCheck <- createSystemRandom >>= uniformR (1 :: Int, 100)
when (rndCheck == 42) $ do
SQL.execute_ conn "DELETE FROM unspent_transactions WHERE unspent_transactions.rowid IN (SELECT unspent_transactions.rowid FROM unspent_transactions LEFT JOIN spent on unspent_transactions.txId = spent.txId AND unspent_transactions.txIx = spent.txIx WHERE spent.txId IS NOT NULL)"
SQL.execute_ conn "VACUUM"
-- rndCheck <- createSystemRandom >>= uniformR (1 :: Int, 100)
-- when (rndCheck == 42) $ do
-- SQL.execute_ conn "DELETE FROM unspent_transactions WHERE unspent_transactions.rowid IN (SELECT unspent_transactions.rowid FROM unspent_transactions LEFT JOIN spent on unspent_transactions.txId = spent.txId AND unspent_transactions.txIx = spent.txIx WHERE spent.txId IS NOT NULL)"
-- SQL.execute_ conn "VACUUM"

toAddr :: C.AddressInEra CurrentEra -> C.AddressAny
toAddr (C.AddressInEra C.ByronAddressInAnyEra addr) = C.AddressByron addr
Expand Down
48 changes: 39 additions & 9 deletions marconi/test/Index/Spec.hs
Expand Up @@ -9,14 +9,17 @@
module Index.Spec (tests) where

import Cardano.Api qualified as C
import Control.Lens.Operators ((^.))
import Control.Lens (folded)
import Control.Lens.Operators ((^.), (^..))
import Control.Monad (forM)
import Control.Monad.IO.Class (liftIO)
import Data.List (nub)
import Data.List.NonEmpty (nonEmpty, toList)
import Data.Maybe (catMaybes, fromJust)
import Database.SQLite.Simple qualified as SQLite
import Gen.Cardano.Api.Typed qualified as CGen
import Hedgehog (Gen, Property, forAll, property, (===))
import Hedgehog (Gen, Property, diff, forAll, property, (===))
import Hedgehog qualified
import Hedgehog.Gen qualified as Gen
import Hedgehog.Range qualified as Range
import Marconi.Index.Utxos qualified as Utxos
Expand All @@ -29,20 +32,30 @@ import Test.Tasty.Hedgehog (testPropertyNamed)
tests :: TestTree
tests = testGroup "Marconi.Index.Specs" $
[testPropertyNamed "Index-utxos-build" "Spec. Utxo from Cardano.Api.Tx" txToUtxoTest
, testPropertyNamed "Index-utxos-event-at-address" "Spec. Filter UtxoEvents for an address" addressFilteredEventsTest
, testPropertyNamed "Index-utxos-store" "Spec. Save and retreive UtxoEvents" roundTripUtxoEventsTest
, testPropertyNamed "Index-utxos-hot-store" "Spec. Save and retreive UtxoEvents from hot-store only"
hotstoreUtxoEventTest
]

addressFilteredEventsTest :: Property
addressFilteredEventsTest = property $ do
event <- forAll $ genEvents -- force db flush
let (addresses :: [C.AddressAny]) = nub( event ^. Utxos.utxoEventUtxos ^.. folded . Utxos.utxoAddress)
event' = Utxos.eventAtAddress (head addresses) event
Hedgehog.assert ((null event') == False)
1 === (length . nub) ( (head event') ^. Utxos.utxoEventUtxos ^.. folded . Utxos.utxoAddress)

txToUtxoTest :: Property
txToUtxoTest = property $ do
t@(C.Tx (C.TxBody C.TxBodyContent{C.txOuts}) _) <- forAll $ CGen.genTx C.BabbageEra
let (targetAddresses :: Maybe TargetAddresses ) = addressesFromTxOuts txOuts
let (utxos :: [Utxos.Utxo]) = uTxo targetAddresses t
-- we should not see any target addresses in the utxos
(areTargetAddressesInUtxos targetAddresses utxos) === True

let utxos = uTxo Nothing t :: [Utxos.Utxo]
-- there should be some utxos
null utxos === False
Hedgehog.assert (null utxos)

genBlockNo :: Gen C.BlockNo
genBlockNo = C.BlockNo <$> Gen.word64 Range.constantBounded
Expand All @@ -54,16 +67,33 @@ genEvents = do
txs <- Gen.list (Range.linear 2 5)(CGen.genTx C.ShelleyEra)
pure . fromJust $ uTxoEvents Nothing slotNo blockNo txs

hotstoreUtxoEventTest :: Property
hotstoreUtxoEventTest = property $ do
event <- forAll $ genEvents
let (rows :: [Utxos.UtxoRow]) = Utxos.toRows event
(addresses :: [C.AddressAny]) = nub . fmap (\r -> r ^. Utxos.utxoRowUtxo . Utxos.utxoAddress ) $ rows
ndx <- liftIO $ Utxos.open ":memory:" (Utxos.Depth 2196) -- no db writes, test memory only
ix <- liftIO $ Ix.insert event ndx
storeResults <- liftIO $ ( forM addresses (\addr -> Utxos.query ix addr []) :: IO [Utxos.Result ] )
let (fromQuery :: [Utxos.UtxoRow] ) = concat . catMaybes $ storeResults
let conn = ix ^. Ix.handle
liftIO $ SQLite.close conn
Hedgehog.diff (length rows) (==) (length fromQuery)
rows === fromQuery






roundTripUtxoEventsTest :: Property
roundTripUtxoEventsTest = property $ do
ndx <- liftIO $ Utxos.open ":memory:" (Utxos.Depth 1)
events <- forAll $ Gen.list (Range.linear 2 3) genEvents -- force db flush
ndx <- liftIO $ Utxos.open ":memory:" (Utxos.Depth 2196)
events <- forAll $ Gen.list (Range.linear 2 4) genEvents -- force db flush
let (rows :: [Utxos.UtxoRow]) = nub . concatMap Utxos.toRows $ events
(addresses :: [C.AddressAny]) = nub . fmap (\r -> r ^. Utxos.utxoRowUtxo . Utxos.utxoAddress ) $ rows
ix <- liftIO $ Ix.insertL (events) ndx
let queryIx addr = (ix ^. Ix.query) ix addr [] -- events finding them in the events is the trivial case
rowsFromStore <- liftIO $ (concat . catMaybes) <$> forM addresses queryIx -- queryByAddress
liftIO $ print (length addresses)
rowsFromStore <- liftIO $ (concat . catMaybes) <$> forM addresses (\a -> (ix ^. Ix.query) ix a [] )
(null rowsFromStore) === False
let (tid :: Utxos.UtxoRow -> C.AddressAny) = (\r -> r ^. Utxos.utxoRowUtxo . Utxos.utxoAddress)
(all (`elem` (tid <$> rows)) (tid <$> rowsFromStore)) === True
Expand Down

0 comments on commit 955481c

Please sign in to comment.