Skip to content

Commit

Permalink
wip: query on disk database
Browse files Browse the repository at this point in the history
  • Loading branch information
eyeinsky committed Dec 2, 2022
1 parent 04af26a commit e490494
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 51 deletions.
2 changes: 1 addition & 1 deletion cardano-streaming/src/Cardano/Streaming.hs
Expand Up @@ -156,7 +156,7 @@ blocksPipelined pipelineSize con chainPoint = do
-- * Ledger states

-- | Get a stream of permanent ledger states
ledgerStates :: FilePath -> FilePath -> C.ValidationMode -> S.Stream (S.Of C.LedgerState) IO ()
ledgerStates :: FilePath -> FilePath -> C.ValidationMode -> S.Stream (S.Of C.LedgerState) IO r
ledgerStates config socket validationMode = do
(env, initialLedgerStateHistory) <- liftIO $ getEnvAndInitialLedgerStateHistory config
blocks (mkConnectInfo env socket) C.ChainPointAtGenesis
Expand Down
126 changes: 87 additions & 39 deletions marconi/src/Marconi/Index/EpochStakepoolSize.hs
@@ -1,10 +1,12 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeSynonymInstances #-}

module Marconi.Index.EpochStakepoolSize where

Expand All @@ -21,11 +23,13 @@ import Data.Tuple (swap)
import Data.VMap qualified as VMap
import Data.Word (Word64)
import Database.SQLite.Simple qualified as SQL
import Database.SQLite.Simple.FromField qualified as SQL
import Database.SQLite.Simple.ToField qualified as SQL
import Streaming.Prelude qualified as S
import System.Environment qualified as IO

import Cardano.Api qualified as C
import Cardano.Api.Shelley qualified as CS
import Cardano.Api.Shelley qualified as C
import Cardano.Streaming qualified as CS

import Cardano.Ledger.Coin qualified as L
Expand All @@ -43,27 +47,28 @@ import Cardano.Streaming.Helpers (getEpochNo)

-- * Event

type Event = (CS.EpochNo, M.Map (LK.KeyHash 'LK.StakePool (LE.Crypto (O.ShelleyEra O.StandardCrypto))) L.Coin)
newtype Event = Event (C.EpochNo, M.Map C.PoolId C.Lovelace)

toEvents :: S.Stream (S.Of CS.LedgerState) IO r -> S.Stream (S.Of Event) IO r
toEvents :: S.Stream (S.Of C.LedgerState) IO r -> S.Stream (S.Of Event) IO r
toEvents source = source
& S.mapMaybe toNoByron
& firstEventOfEveryEpoch
where
-- | Skip Byron era as it doesn't have staking
-- Skip Byron era as it doesn't have staking.
toNoByron :: C.LedgerState -> Maybe Event
toNoByron ls = if
| Just epochNo <- getEpochNo ls
, Just m <- getStakeMap ls -> Just (epochNo, m)
, Just m <- getStakeMap ls -> Just $ Event (epochNo, m)
| otherwise -> Nothing

-- | We get LedgerState at every block from the @ledgerStates@
-- streamer but we only want the first one of every epoch
-- We get LedgerState at every block from the ledgerStates
-- streamer but we only want the first one of every epoch, so we
-- zip them and only emit ledger states at epoch boundaries.
firstEventOfEveryEpoch :: S.Stream (S.Of Event) IO r -> S.Stream (S.Of Event) IO r
firstEventOfEveryEpoch source' = source'
& S.slidingWindow 2 -- zip stream with tail
& S.slidingWindow 2
& S.mapMaybe (\case
((e0, _) Seq.:<| t@ (e1, _) Seq.:<| Seq.Empty)
(Event (e0, _) Seq.:<| t@(Event (e1, _)) Seq.:<| Seq.Empty)
| succ e0 == e1 -> Just t
| e0 == e1 -> Nothing
| otherwise -> error $ "This should never happen: consequent epochs wider apart than by one: " <> show (e0, e1)
Expand All @@ -72,20 +77,20 @@ toEvents source = source

getStakeMap
:: C.LedgerState
-> Maybe (M.Map (LK.KeyHash 'LK.StakePool (LE.Crypto (O.ShelleyEra O.StandardCrypto))) L.Coin)
-> Maybe (M.Map C.PoolId C.Lovelace)
getStakeMap ledgerState' = case ledgerState' of
C.LedgerStateByron _st -> Nothing
C.LedgerStateShelley st -> fromState st
C.LedgerStateAllegra st -> fromState st
C.LedgerStateMary st -> fromState st
C.LedgerStateAlonzo st -> fromState st
CS.LedgerState (O.LedgerStateBabbage st) -> fromState st -- TODO pattern missing from cardano-node: is it there on master? if not create PR.
C.LedgerStateByron _st -> Nothing
C.LedgerStateShelley st -> fromState st
C.LedgerStateAllegra st -> fromState st
C.LedgerStateMary st -> fromState st
C.LedgerStateAlonzo st -> fromState st
C.LedgerState (O.LedgerStateBabbage st) -> fromState st -- TODO pattern missing from cardano-node: is it there on master? if not create PR.
where
fromState
:: forall proto era c
. (c ~ LE.Crypto era)
=> O.LedgerState (O.ShelleyBlock proto era)
-> Maybe (M.Map (LK.KeyHash 'LK.StakePool (LE.Crypto era)) L.Coin)
-> Maybe (M.Map C.PoolId C.Lovelace)
fromState st = Just res
where
nes = O.shelleyLedgerState st :: SL.NewEpochState era
Expand All @@ -96,17 +101,21 @@ getStakeMap ledgerState' = case ledgerState' of
delegations :: VMap.VMap VMap.VB VMap.VB (LC.Credential 'LK.Staking c) (LK.KeyHash 'LK.StakePool c)
delegations = Shelley._delegations stakeSnapshot

res :: M.Map (LK.KeyHash 'LK.StakePool c) L.Coin
res = M.fromListWith (\(L.Coin a) (L.Coin b) -> L.Coin (a + b)) $ map swap $ P.catMaybes $ VMap.elems $
VMap.mapWithKey (\cred spkHash -> (\c -> (L.fromCompact c, spkHash)) <$> VMap.lookup cred stakes) delegations
res :: M.Map C.PoolId C.Lovelace
res = M.fromListWith (+) $ map swap $ P.catMaybes $ VMap.elems $
VMap.mapWithKey (\cred spkHash -> (\c -> (C.Lovelace $ coerce $ L.fromCompact c, f spkHash)) <$> VMap.lookup cred stakes) delegations

indexer :: FilePath -> FilePath -> FilePath -> IO ()
indexer conf socket db = S.effects $
f k = undefined -- C.StakePoolKeyHash k

indexer
:: FilePath -> FilePath -> FilePath
-> S.Stream (S.Of Event) IO r
indexer conf socket db =
CS.ledgerStates conf socket C.QuickValidation
& toEvents
& sqlite db

-- * Store in Sqlite
-- * Sqlite

sqlite
:: FilePath
Expand All @@ -120,19 +129,58 @@ sqlite db source = do

let loop source' = lift (S.next source') >>= \case
Left r -> pure r
Right (event@ (epochNo, map'), source'') -> do
let rows = M.toList map'
lift $ forM_ rows $ \(keyHash, coin) ->
SQL.execute c
"INSERT INTO stakepool_delegation (poolId, lovelace, epochId) VALUES (?, ?, ?)" $ let
in (toPoolId keyHash, coerce coin :: Integer, coerce epochNo :: Word64)
Right (event, source'') -> do
lift $ forM_ (toRows event) $ \row ->
SQL.execute c "INSERT INTO stakepool_delegation (poolId, lovelace, epochId) VALUES (?, ?, ?)" row
S.yield event
loop source''
loop source


toRows :: Event -> [[SQL.SQLData]]
toRows (Event (epochNo, m)) = map (SQL.toField epochNo:) m'
where
m' = map (\(keyHash, coin) -> [SQL.toField $ toPoolId keyHash, SQL.toField $ (coerce coin :: Integer)]) $ M.toList m

instance SQL.ToField C.EpochNo where
toField (C.EpochNo word64) = SQL.toField word64
instance SQL.FromField C.EpochNo where
fromField f = C.EpochNo <$> SQL.fromField f

-- instance SQL.ToField C.EpochNo where
-- toField _ = undefined
instance SQL.ToField C.Lovelace where
toField = undefined
instance SQL.FromField C.Lovelace where
fromField = undefined

instance SQL.FromField C.PoolId where
fromField = undefined

queryByEpoch :: SQL.Connection -> C.EpochNo -> IO Event
queryByEpoch c epochNo = do
xs :: [(C.PoolId, C.Lovelace)] <- SQL.query c "SELECT poolId, lovelace FROM stakepool_delegation WHERE epochId = ?" (SQL.Only epochNo)
return $ Event (epochNo, M.fromList xs)
queryPoolId' :: FilePath -> C.PoolId -> IO ()
queryPoolId' dbPath poolId = do
c <- SQL.open dbPath
let poolId' = case poolId of C.StakePoolKeyHash k -> k
-- result :: [Event] <- SQL.query c
-- "SELECT epochId, lovelace FROM stakepool_delegation WHERE poolId = ?" (SQL.Only $ toPoolId $ poolId' )
-- return result
return undefined -- TODO
queryPoolId :: SQL.Connection -> C.PoolId -> IO ()
queryPoolId c poolId = do
-- result :: [(C.EpochNo, C.Lovelace)] <- SQL.query c
-- "SELECT epochId, lovelace FROM stakepool_delegation WHERE poolId = ?" (SQL.Only poolId)
-- return result
return undefined -- TODO
-- | Convert stakepool's key hash to prefixed bech32 as described in CIP-5
toPoolId :: LK.KeyHash 'LK.StakePool O.StandardCrypto -> TS.Text
toPoolId kh = C.serialiseToBech32 (CS.StakePoolKeyHash kh)
toPoolId :: C.PoolId -> TS.Text
toPoolId kh = C.serialiseToBech32 kh
-- * Tmp; TODO: remove this
Expand All @@ -157,7 +205,7 @@ mainnet = ( "cardano/config/config.json"
hot :: IO ()
hot = S.effects $ hot2 preprod
& S.chain (\case
(epochNo, m) -> do
Event (epochNo, m) -> do
putStrLn $ show epochNo <> " (" <> show (M.size m) <> ")"
forM_ (sortBy (flip compare `on` snd) $ M.toList m) $ \(k, v) ->
putStrLn $ " " <> TS.unpack (toPoolId k) <> ": " <> show v
Expand Down
1 change: 1 addition & 0 deletions marconi/src/Marconi/Indexers.hs
Expand Up @@ -255,6 +255,7 @@ epochStakepoolSizeWorker configPath Coordinator{_barrier} tchan dbPath = do

S.effects indexer
where
-- Read blocks from TChan, emit them as a stream.
chainSyncEvents :: S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
chainSyncEvents = S.yield =<< (lift $ atomically $ readTChan tchan)

Expand Down
30 changes: 19 additions & 11 deletions marconi/test/EpochStakepoolSize.hs
Expand Up @@ -84,31 +84,39 @@ test = H.integration . HE.runFinallies . TN.workspace "chairman" $ \tempAbsPath
, C.WitnessStakeKey stakeSKey2
]

(poolVKey, tx, txBody) <- registerPool con networkId pparams tempAbsPath keyWitnesses [stakeCredential, stakeCredential2] genesisAddress
-- Prepare transaction to register stakepool and stake funds
(poolVKey :: C.PoolId, tx, txBody) <- registerPool con networkId pparams tempAbsPath keyWitnesses [stakeCredential, stakeCredential2] genesisAddress

-- start indexer
found <- liftIO IO.newEmptyMVar
let dbPath = "/home/markus/stakepool-test.db"
void $ liftIO $ do
chan <- IO.newChan
let indexer = CS.ledgerStates (TN.configurationFile runtime) socketPath C.QuickValidation
& EpochStakepoolSize.toEvents
& EpochStakepoolSize.sqlite "/home/markus/stakepool-test.db"
& S.chain (IO.writeChan chan)
& EpochStakepoolSize.sqlite dbPath
& S.chain (IO.writeChan chan) -- After indexer has written the event to database, we write it to the chan
void $ (IO.link =<<) $ IO.async $ void $ S.effects indexer

let poolVKey' = case poolVKey :: C.PoolId of C.StakePoolKeyHash k' -> k'
-- Consume the channel until an event is found which (1) has the
-- pool ID and (2) has the right amount of lovelace staked.
(IO.link =<<) $ IO.async $ forever $ do
(_epochNo, stakeMap) <- IO.readChan chan

-- In the following we test whether the stakepool we staked in
-- has the amount of lovelace that we staked.
case Map.lookup poolVKey' stakeMap of
Just coin -> when (coin == L.Coin 120_000_000) $ IO.putMVar found ()
_ -> return ()
EpochStakepoolSize.Event (_epochNo, stakeMap) <- IO.readChan chan
case Map.lookup poolVKey stakeMap of
Just lovelace -> when (lovelace == 120_000_000) $ IO.putMVar found () -- Event found!
_ -> return ()

-- Submit transaction to create stakepool and stake the funds
TN.submitAwaitTx con (tx, txBody)

liftIO $ IO.takeMVar found

-- After indexer as processed the event, we know that it must also be in the database

liftIO $ EpochStakepoolSize.queryPoolId' dbPath poolVKey
-- query db



-- | This is a pure version of `runStakePoolRegistrationCert` defined in /cardano-node/cardano-cli/src/Cardano/CLI/Shelley/Run/Pool.hs::60
makeStakePoolRegistrationCert_
Expand Down

0 comments on commit e490494

Please sign in to comment.