Skip to content

Commit

Permalink
Switch to implicit syncs
Browse files Browse the repository at this point in the history
  • Loading branch information
raduom committed Feb 21, 2021
1 parent 1a58aec commit ee1d6e1
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 98 deletions.
4 changes: 2 additions & 2 deletions plutus-pab/app/PSGenerator.hs
Expand Up @@ -52,7 +52,7 @@ import Plutus.PAB.Events.Contract (ContractEven
import Plutus.PAB.Events.Node (NodeEvent)
import Plutus.PAB.Events.User (UserEvent)
import Plutus.PAB.Events.Wallet (WalletEvent)
import Plutus.PAB.MockApp (defaultWallet, syncAll)
import Plutus.PAB.MockApp (defaultWallet)
import qualified Plutus.PAB.MockApp as MockApp
import Plutus.PAB.Types (ContractExe)
import qualified Plutus.PAB.Webserver.API as API
Expand Down Expand Up @@ -184,7 +184,7 @@ writeTestData outputDir = do
schema :: ContractSignatureResponse TestContracts <-
Webserver.contractSchema (csContract currencyInstance1)
pure (report, schema)
syncAll
-- syncAll
void Chain.processBlock
pure result
BSL.writeFile
Expand Down
112 changes: 55 additions & 57 deletions plutus-pab/src/Cardano/ChainIndex/Server.hs
Expand Up @@ -67,13 +67,19 @@ main :: Trace IO ChainIndexServerMsg -> ChainIndexConfig -> FilePath -> Availabi
main trace ChainIndexConfig{ciBaseUrl} socketPath availability = runLogEffects trace $ do
let port = baseUrlPort ciBaseUrl
mVarState <- liftIO $ newMVar initialAppState
clientHandler <- liftIO $ runClientNode socketPath
logInfo StartingNodeClientThread
void $ liftIO $ forkIO $ runLogEffects trace $ updateThread 10 mVarState clientHandler
clientHandler <-
liftIO $ runClientNode socketPath $ updateChainState mVarState
-- void $ liftIO $ forkIO $ runLogEffects trace $ updateThread 10 mVarState clientHandler
let warpSettings :: Warp.Settings
warpSettings = Warp.defaultSettings & Warp.setPort port & Warp.setBeforeMainLoop (available availability)
logInfo $ StartingChainIndex port
liftIO $ Warp.runSettings warpSettings $ app mVarState
where
updateChainState :: MVar AppState -> Block -> Slot -> IO ()
updateChainState mv block slot =
processIndexEffects mv $ do
syncState block slot

healthcheck :: Monad m => m NoContent
healthcheck = pure NoContent
Expand All @@ -87,67 +93,59 @@ watchedAddresses = WalletEffects.watchedAddresses
confirmedBlocks :: (Member ChainIndexEffect effs) => Eff effs [Block]
confirmedBlocks = WalletEffects.confirmedBlocks


-- | Update the chain index by asking the node for new blocks since the last
-- time.
syncState ::
( -- Member (State AppState) effs
-- , Member (LogMsg ChainIndexServerMsg) effs
-- , Member NodeFollowerEffect effs
Member ChainIndexControlEffect effs
( -- Member (LogMsg ChainIndexServerMsg) effs
Member ChainIndexControlEffect effs
)
=> Eff effs ()
syncState = do
-- followerID <- use indexFollowerID >>=
-- maybe (logInfo ObtainingFollowerID >> NodeFollower.newFollower >>= \i -> assign indexFollowerID (Just i) >> return i) pure
-- logDebug $ UpdatingChainIndex followerID
-- newBlocks <- NodeFollower.getBlocks followerID
let newBlocks = []
-- logInfo $ ReceivedBlocksTxns (length newBlocks) (length $ fold newBlocks)
-- currentSlot <- SlotChanged <$> getSlot
let currentSlot = SlotChanged (Slot 0)
let notifications = BlockValidated <$> newBlocks
traverse_ ChainIndex.chainIndexNotify (notifications ++ [currentSlot])
=> Block
-> Slot
-> Eff effs ()
syncState block (Slot slot) = do
let slotChanged = SlotChanged (Slot slot)
traverse_ ChainIndex.chainIndexNotify [BlockValidated block, slotChanged]


-- | Get the latest transactions from the node and update the index accordingly
updateThread ::
forall effs.
( LastMember IO effs
, Member (LogMsg ChainIndexServerMsg) effs
)
=> Second
-- ^ Interval between two queries for new blocks
-> MVar AppState
-- ^ A handler to the ChainSync client.
-> ClientHandler
-- ^ Servant client for the node API
-> Eff effs ()
updateThread updateInterval mv clientHandler = do
forever $ do
watching <- processIndexEffects mv WalletEffects.watchedAddresses
unless (watching == mempty) (Dbg.trace "fetching new blocks" $ fetchNewBlocks mv clientHandler)
liftIO $ threadDelay $ fromIntegral $ toMicroseconds updateInterval

fetchNewBlocks ::
forall effs.
( LastMember IO effs
, Member (LogMsg ChainIndexServerMsg) effs
)
=> MVar AppState
-> ClientHandler
-> Eff effs ()
fetchNewBlocks mv clientHandler = do
logInfo AskingNodeForNewBlocks
receivedBlocks <- liftIO $ getBlocks clientHandler
let newBlocks = map snd receivedBlocks
currentSlot = fst $ last receivedBlocks
logInfo (ReceivedBlocksTxns (length newBlocks) (length $ fold newBlocks))
-- logInfoN $ "Received " <> tshow (length newBlocks) <> " blocks (" <> tshow (length $ fold newBlocks) <> " transactions) @" <> tshow (Ledger.getSlot currentSlot)
let notifications = BlockValidated <$> newBlocks
Dbg.trace ("[server] Notifications: " <> show notifications) $
traverse_
(processIndexEffects mv . ChainIndex.chainIndexNotify)
(notifications ++ [SlotChanged currentSlot])
-- updateThread ::
-- forall effs.
-- ( LastMember IO effs
-- , Member (LogMsg ChainIndexServerMsg) effs
-- )
-- => Second
-- -- ^ Interval between two queries for new blocks
-- -> MVar AppState
-- -- ^ A handler to the ChainSync client.
-- -> ClientHandler
-- -- ^ Servant client for the node API
-- -> Eff effs ()
-- updateThread updateInterval mv clientHandler = do
-- forever $ do
-- watching <- processIndexEffects mv WalletEffects.watchedAddresses
-- unless (watching == mempty) (Dbg.trace "fetching new blocks" $ fetchNewBlocks mv clientHandler)
-- liftIO $ threadDelay $ fromIntegral $ toMicroseconds updateInterval

-- fetchNewBlocks ::
-- forall effs.
-- ( LastMember IO effs
-- , Member (LogMsg ChainIndexServerMsg) effs
-- )
-- => MVar AppState
-- -> ClientHandler
-- -> Eff effs ()
-- fetchNewBlocks mv clientHandler = do
-- logInfo AskingNodeForNewBlocks
-- receivedBlocks <- liftIO $ getBlocks clientHandler
-- let newBlocks = map snd receivedBlocks
-- currentSlot = fst $ last receivedBlocks
-- logInfo (ReceivedBlocksTxns (length newBlocks) (length $ fold newBlocks))
-- -- logInfoN $ "Received " <> tshow (length newBlocks) <> " blocks (" <> tshow (length $ fold newBlocks) <> " transactions) @" <> tshow (Ledger.getSlot currentSlot)
-- let notifications = BlockValidated <$> newBlocks
-- Dbg.trace ("[server] Notifications: " <> show notifications) $
-- traverse_
-- (processIndexEffects mv . ChainIndex.chainIndexNotify)
-- (notifications ++ [SlotChanged currentSlot])

type ChainIndexEffects m
= '[ ChainIndexControlEffect
Expand Down
10 changes: 5 additions & 5 deletions plutus-pab/src/Cardano/Protocol/Socket/Client.hs
Expand Up @@ -47,7 +47,7 @@ queueTx (ClientHandler inputQueue _ _) tx =

-- | Forks and starts a new client node, returning the newly allocated thread id.
runClientNode :: FilePath
-> ((Slot, Block) -> IO ())
-> (Block -> Slot -> IO ())
-> IO ClientHandler
runClientNode socketPath onNewBlock = do
inputQueue <- newTQueueIO
Expand All @@ -74,15 +74,15 @@ runClientNode socketPath onNewBlock = do
{- Application that communicates using 2 multiplexed protocols
(ChainSync and TxSubmission). -}
app :: MVar Slot
-> ((Slot, Block) -> IO ())
-> (Block -> Slot -> IO ())
-> TQueue Tx
-> OuroborosApplication 'InitiatorMode addr
LBS.ByteString IO () Void
app mSlot onNewBlock' inputQueue =
nodeApplication (chainSync mSlot onNewBlock') (txSubmission inputQueue)

chainSync :: MVar Slot
-> ((Slot, Block) -> IO ())
-> (Block -> Slot -> IO ())
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
chainSync mSlot onNewBlock' =
InitiatorProtocolOnly $
Expand All @@ -104,7 +104,7 @@ runClientNode socketPath onNewBlock = do

-- | The client updates the application state when the protocol state changes.
chainSyncClient :: MVar Slot
-> ((Slot, Block) -> IO ())
-> (Block -> Slot -> IO ())
-> ChainSync.ChainSyncClient Block (Point Block) Tip IO ()
chainSyncClient mSlot onNewBlock =
ChainSync.ChainSyncClient $ pure requestNext
Expand All @@ -122,7 +122,7 @@ chainSyncClient mSlot onNewBlock =
ChainSync.recvMsgRollForward = \block _ ->
ChainSync.ChainSyncClient $ do
currentSlot <- takeMVar mSlot
onNewBlock (currentSlot + 1, block)
onNewBlock block (currentSlot + 1)
putMVar mSlot (currentSlot + 1)
return requestNext
, ChainSync.recvMsgRollBackward = error "Not supported."
Expand Down
32 changes: 16 additions & 16 deletions plutus-pab/src/Plutus/PAB/MockApp.hs
Expand Up @@ -19,8 +19,8 @@
-- No networking, no filesystem.
module Plutus.PAB.MockApp
( runScenario
, sync
, syncAll
-- , sync
-- , syncAll
, MockAppEffects
, defaultWallet
, TestState
Expand Down Expand Up @@ -161,7 +161,7 @@ runScenario ::
runScenario action = do
(result, finalState) <- runMockApp initialTestState $ do
void Wallet.Emulator.Chain.processBlock
syncAll
-- syncAll
action
case result of
Left err -> do
Expand Down Expand Up @@ -260,21 +260,21 @@ handleErrors action =
& flip handleError (throwError . WalletError)

-- | Synchronise the agent's view of the blockchain with the node.
sync ::
forall effs.
( Member MultiAgentPABEffect effs
)
=> Wallet
-> Eff effs ()
sync wllt = do
PAB.MultiAgent.agentControlAction wllt ChainIndex.syncState
PAB.MultiAgent.agentAction wllt $ do
processAllContractInboxes @TestContracts
processAllContractOutboxes @TestContracts defaultMaxIterations
-- sync ::
-- forall effs.
-- ( Member MultiAgentPABEffect effs
-- )
-- => Wallet
-- -> Eff effs ()
-- sync wllt = do
-- PAB.MultiAgent.agentControlAction wllt ChainIndex.syncState
-- PAB.MultiAgent.agentAction wllt $ do
-- processAllContractInboxes @TestContracts
-- processAllContractOutboxes @TestContracts defaultMaxIterations

-- | Run 'sync' for all agents
syncAll :: Member MultiAgentPABEffect effs => Eff effs ()
syncAll = traverse_ sync (Wallet <$> [1..10])
-- syncAll :: Member MultiAgentPABEffect effs => Eff effs ()
-- syncAll = traverse_ sync (Wallet <$> [1..10])

-- | Statistics about the transactions that have been validated by the emulated node.
data TxCounts =
Expand Down
36 changes: 18 additions & 18 deletions plutus-pab/test/Plutus/PAB/CoreSpec.hs
Expand Up @@ -37,8 +37,8 @@ import Plutus.PAB.Effects.MultiAgent (PABClientEff
import Plutus.PAB.Events (ChainEvent, ContractInstanceId,
ContractInstanceState (..), hooks)
import Plutus.PAB.MockApp (TestState, TxCounts (..), blockchainNewestFirst,
defaultWallet, runScenario, syncAll, txCounts,
txValidated, valueAt)
defaultWallet, runScenario, txCounts, txValidated,
valueAt)
import qualified Plutus.PAB.Query as Query
import Plutus.PAB.Types (PABError (..), chainOverviewBlockchain,
mkChainOverview)
Expand Down Expand Up @@ -107,16 +107,16 @@ currencyTest =
agentAction defaultWallet (installContract Currency)
contractState <- agentAction defaultWallet (activateContract Currency)
let instanceId = csContract contractState
syncAll
-- syncAll
assertTxCounts
"Activating the currency contract does not generate transactions."
initialTxCounts
agentAction defaultWallet $ createCurrency instanceId mps
syncAll
-- syncAll
void Chain.processBlock
syncAll
-- syncAll
void Chain.processBlock
syncAll
-- syncAll
assertTxCounts
"Forging the currency should produce two valid transactions."
(initialTxCounts & txValidated +~ 2)
Expand All @@ -129,13 +129,13 @@ rpcTest =
agentAction defaultWallet (installContract RPCServer)
ContractInstanceState{csContract=clientId} <- agentAction defaultWallet (activateContract RPCClient)
ContractInstanceState{csContract=serverId} <- agentAction defaultWallet (activateContract RPCServer)
syncAll
-- syncAll
agentAction defaultWallet $ void $ callContractEndpoint @TestContracts serverId "serve" ()
syncAll
-- syncAll
agentAction defaultWallet $ callAdder clientId serverId
syncAll
syncAll
syncAll
-- syncAll
-- syncAll
-- syncAll
agentAction defaultWallet $ do
assertDone clientId
assertDone serverId
Expand All @@ -157,7 +157,7 @@ guessingGameTest =
-- need to add contract address to wallet's watched addresses
contractState <- agentAction defaultWallet (activateContract Game)
let instanceId = csContract contractState
syncAll
-- syncAll
assertTxCounts
"Activating the game does not generate transactions."
initialTxCounts
Expand All @@ -167,9 +167,9 @@ guessingGameTest =
{ Contracts.Game.amount = lovelaceValueOf lockAmount
, Contracts.Game.secretWord = "password"
}
syncAll
-- syncAll
void Chain.processBlock
syncAll
-- syncAll
assertTxCounts
"Locking the game should produce one transaction"
(initialTxCounts & txValidated +~ 1)
Expand All @@ -180,23 +180,23 @@ guessingGameTest =
balance1

game1State <- agentAction defaultWallet (activateContract Game)
syncAll
-- syncAll
agentAction defaultWallet $ guess
(csContract game1State)
Contracts.Game.GuessParams
{Contracts.Game.guessWord = "wrong"}
syncAll
-- syncAll
void Chain.processBlock
assertTxCounts
"A wrong guess still produces a transaction."
(initialTxCounts & txValidated +~ 2)
game2State <- agentAction defaultWallet (activateContract Game)
syncAll
-- syncAll
agentAction defaultWallet $ guess
(csContract game2State)
Contracts.Game.GuessParams
{Contracts.Game.guessWord = "password"}
syncAll
-- syncAll
void Chain.processBlock
assertTxCounts
"A correct guess creates a third transaction."
Expand Down

0 comments on commit ee1d6e1

Please sign in to comment.