Skip to content

Commit

Permalink
Limit the buffer of the node server to K blocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
raduom committed Apr 8, 2021
1 parent 215b9a9 commit b23b7d5
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 21 deletions.
5 changes: 3 additions & 2 deletions plutus-pab/src/Cardano/ChainIndex/Server.hs
@@ -1,3 +1,4 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MonoLocalBinds #-}
Expand Down Expand Up @@ -54,7 +55,7 @@ main trace ChainIndexConfig{ciBaseUrl} socketPath availability = runLogEffects t
mVarState <- liftIO $ newMVar initialAppState

logInfo StartingNodeClientThread
_ <- liftIO $ runClientNode socketPath $ updateChainState mVarState
_ <- liftIO $ runClientNode socketPath (updateChainState mVarState)

logInfo $ StartingChainIndex servicePort
liftIO $ Warp.runSettings warpSettings $ app trace mVarState
Expand All @@ -64,4 +65,4 @@ main trace ChainIndexConfig{ciBaseUrl} socketPath availability = runLogEffects t
warpSettings = Warp.defaultSettings & Warp.setPort servicePort & Warp.setBeforeMainLoop isAvailable
updateChainState :: MVar AppState -> Block -> Slot -> IO ()
updateChainState mv block slot =
processIndexEffects trace mv $ syncState block slot
processIndexEffects trace mv $ syncState block slot
5 changes: 3 additions & 2 deletions plutus-pab/src/Cardano/Node/Server.hs
Expand Up @@ -58,17 +58,18 @@ main :: Trace IO MockServerLogMsg -> MockServerConfig -> Availability -> IO ()
main trace MockServerConfig { mscBaseUrl
, mscRandomTxInterval
, mscBlockReaper
, mscKeptBlocks
, mscSlotLength
, mscInitialTxWallets
, mscSocketPath} availability = LM.runLogEffects trace $ do
, mscSocketPath } availability = LM.runLogEffects trace $ do

-- make initial distribution of 1 billion Ada to all configured wallets
let dist = Map.fromList $ zip mscInitialTxWallets (repeat (Ada.adaValueOf 1000_000_000))
let appState = AppState
{ _chainState = initialChainState dist
, _eventHistory = mempty
}
serverHandler <- liftIO $ Server.runServerNode mscSocketPath (_chainState appState)
serverHandler <- liftIO $ Server.runServerNode mscSocketPath mscKeptBlocks (_chainState appState)
serverState <- liftIO $ newMVar appState
handleDelayEffect $ delayThread (2 :: Second)
clientHandler <- liftIO $ Client.runClientNode mscSocketPath (updateChainState serverState)
Expand Down
2 changes: 2 additions & 0 deletions plutus-pab/src/Cardano/Node/Types.hs
Expand Up @@ -88,6 +88,8 @@ data MockServerConfig =
-- ^ The wallets that receive money from the initial transaction.
, mscSocketPath :: FilePath
-- ^ Path to the socket used to communicate with the server.
, mscKeptBlocks :: Integer
-- ^ The number of blocks to keep for replaying to a newly connected clients
}
deriving (Show, Eq, Generic, FromJSON)

Expand Down
30 changes: 16 additions & 14 deletions plutus-pab/src/Cardano/Protocol/Socket/Client.hs
Expand Up @@ -14,6 +14,7 @@ import Data.Void (Void)

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Catch (catchAll)
import Control.Tracer

import Ouroboros.Network.Block (Point (..))
Expand Down Expand Up @@ -54,9 +55,6 @@ getCurrentSlot ::
getCurrentSlot ClientHandler { chCurrentSlot } =
readMVar chCurrentSlot

{-| Forks and starts a new client node, returning the newly allocated thread id.
The client will retry connecting if the the protocol connection drops, or
cannot be established. -}
runClientNode :: FilePath
-> (Block -> Slot -> IO ())
-> IO ClientHandler
Expand All @@ -76,17 +74,21 @@ runClientNode socketPath onNewBlock = do
where
loop :: TimeUnit a => a -> ClientHandler -> IOManager -> IO ()
loop timeout ch@ClientHandler{chSocketPath, chInputQueue, chCurrentSlot, chHandler} iocp = do
connectToNode
(localSnocket iocp socketPath)
unversionedHandshakeCodec
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
(unversionedProtocol (app chCurrentSlot chHandler chInputQueue))
Nothing
(localAddressFromPath chSocketPath)
threadDelay (fromIntegral $ toMicroseconds timeout)
loop timeout ch iocp
catchAll
(connectToNode
(localSnocket iocp socketPath)
unversionedHandshakeCodec
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
(unversionedProtocol (app chCurrentSlot chHandler chInputQueue))
Nothing
(localAddressFromPath chSocketPath))
{- If we receive any error or disconnect, try to reconnect.
This happens a lot on startup, until the server starts. -}
(\_ -> do
threadDelay (fromIntegral $ toMicroseconds timeout)
loop timeout ch iocp)

{- Application that communicates using 2 multiplexed protocols
(ChainSync and TxSubmission). -}
Expand Down
23 changes: 20 additions & 3 deletions plutus-pab/src/Cardano/Protocol/Socket/Server.hs
Expand Up @@ -88,14 +88,14 @@ data ServerCommand =
{- | The response from the server. Can be used for the information
passed back, or for synchronisation.
-}
data ServerResponse =
newtype ServerResponse =
-- A block was added. We are using this for synchronization.
BlockAdded Block
deriving Show

processBlock :: MonadIO m => ServerHandler -> m Block
processBlock ServerHandler {shCommandChannel} = do
liftIO $ atomically $ writeTQueue (ccCommand shCommandChannel) $ ProcessBlock
liftIO $ atomically $ writeTQueue (ccCommand shCommandChannel) ProcessBlock
-- Wait for the server to finish processing blocks.
liftIO $ atomically $ readTQueue (ccResponse shCommandChannel) >>= \case
BlockAdded block -> pure block
Expand All @@ -108,6 +108,21 @@ trimTo :: MonadIO m => ServerHandler -> Int -> m ()
trimTo ServerHandler {shCommandChannel} size =
liftIO $ atomically $ writeTQueue (ccCommand shCommandChannel) $ TrimTo size

pruneChain :: MonadIO m => Integer -> TChan Block -> m ThreadId
pruneChain k original = do
localChannel <- liftIO $ atomically $ cloneTChan original
liftIO . forkIO $ go k localChannel
where
go :: MonadIO m => Integer -> TChan Block -> m ()
go k' localChannel = do
-- Wait for data on the channel
block <- liftIO $ atomically $ readTChan localChannel
liftIO $ putStrLn $ "Read new block with size: " <> show (length block) <> "."
-- void $ Dbg.trace "[xxx] pruneChain" $ liftIO $ atomically $ readTChan localChannel
if k' == 0
then liftIO $ atomically (readTChan original) >> go 0 localChannel
else go (k' - 1) localChannel

handleCommand ::
MonadIO m
=> CommandChannel
Expand Down Expand Up @@ -137,13 +152,15 @@ handleCommand CommandChannel {ccCommand, ccResponse}
runServerNode ::
MonadIO m
=> FilePath
-> Integer
-> ChainState
-> m ServerHandler
runServerNode shSocketPath initialState = liftIO $ do
runServerNode shSocketPath k initialState = liftIO $ do
serverState <- initialiseInternalState initialState
shCommandChannel <- CommandChannel <$> newTQueueIO <*> newTQueueIO
void $ forkIO . void $ protocolLoop shSocketPath serverState
void $ forkIO . forever $ handleCommand shCommandChannel serverState
void $ pruneChain k (isBlocks serverState)
pure $ ServerHandler { shSocketPath, shCommandChannel }

-- * Internal state
Expand Down
1 change: 1 addition & 0 deletions plutus-pab/tx-inject/config.yaml
Expand Up @@ -15,6 +15,7 @@ nodeServerConfig:
mscBaseUrl: http://localhost:9082
mscSocketPath: ./node-server.sock
mscSlotLength: 5
mscKeptBlocks: 100000
mscBlockReaper:
brcInterval: 6000000
brcBlocksToKeep: 100000
Expand Down

0 comments on commit b23b7d5

Please sign in to comment.