Skip to content

Commit

Permalink
CAD-2905: wip NodeId.
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Shevchenko committed May 12, 2021
1 parent 830d746 commit 2e4e1f9
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 36 deletions.
1 change: 1 addition & 0 deletions cardano-logger/cardano-logger.cabal
Expand Up @@ -53,6 +53,7 @@ library
, contra-tracer
, ekg-core
, ekg-forward
, hashable
, iohk-monitoring
, network
, optparse-applicative
Expand Down
14 changes: 7 additions & 7 deletions cardano-logger/src/Cardano/Logger/Acceptors.hs
Expand Up @@ -63,7 +63,7 @@ import System.Metrics.Network.Acceptor (acceptEKGMetrics)

import Cardano.Logger.Configuration
import Cardano.Logger.Types (AcceptedItems, LogObjects, Metrics,
prepareAcceptedItems)
addressToNodeId, prepareAcceptedItems)

runAcceptors
:: LoggerConfig
Expand Down Expand Up @@ -205,7 +205,7 @@ runEKGAcceptor
-> ConnectionId addr
-> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void ()
runEKGAcceptor ekgConfig acceptedItems connId = do
let (_, (ekgStore, localStore)) =
let (_, _, (ekgStore, localStore)) =
unsafePerformIO $ prepareStores acceptedItems connId
acceptEKGMetrics ekgConfig ekgStore localStore

Expand All @@ -216,18 +216,18 @@ runLogObjectsAcceptor
-> ConnectionId addr
-> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void ()
runLogObjectsAcceptor tfConfig acceptedItems connId = do
let (loQueue, _) =
let (niStore, loQueue, _) =
unsafePerformIO $ prepareStores acceptedItems connId
acceptLogObjects tfConfig loQueue
acceptLogObjects tfConfig loQueue niStore

prepareStores
:: Show addr
=> AcceptedItems
-> ConnectionId addr
-> IO (LogObjects, Metrics)
-> IO (TF.NodeInfoStore, LogObjects, Metrics)
prepareStores acceptedItems ConnectionId{..} = do
-- Remote address of the node is unique (from logger's point of view), use it as 'NodeId'.
let nodeId = show remoteAddress
-- Remote address of the node is unique identifier, from the logger's point of view.
let nodeId = addressToNodeId $ show remoteAddress
prepareAcceptedItems nodeId acceptedItems
items <- readIORef acceptedItems
return $ items ! nodeId
Expand Down
2 changes: 1 addition & 1 deletion cardano-logger/src/Cardano/Logger/Handlers/Logs/Run.hs
Expand Up @@ -22,7 +22,7 @@ runLogsHandler
runLogsHandler config acceptedItems = forever $ do
threadDelay 2000000
items <- HM.toList <$> readIORef acceptedItems
forM_ items $ \(nodeId, (loQueue, _)) ->
forM_ items $ \(nodeId, (_niStore, loQueue, _)) ->
atomically (getAllLogObjects loQueue) >>= writeLogObjects config nodeId

getAllLogObjects :: TBQueue lo -> STM [lo]
Expand Down
36 changes: 30 additions & 6 deletions cardano-logger/src/Cardano/Logger/Types.hs
@@ -1,36 +1,59 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Cardano.Logger.Types
( AcceptedItems
, LogObjects
, Metrics
, NodeId
, NodeId (..)
, addressToNodeId
, initAcceptedItems
, prepareAcceptedItems
) where

import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO)
import Control.Monad (unless)
import Data.Hashable (Hashable)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HM
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Text (Text)
import Data.Text (Text, pack, splitOn, unpack)
import Data.Word (Word16)
import GHC.Generics (Generic)
import qualified System.Metrics as EKG

import Cardano.BM.Data.LogItem (LogObject)

import Trace.Forward.Protocol.Type (NodeInfoStore)

import System.Metrics.Store.Acceptor (MetricsLocalStore, emptyMetricsLocalStore)

type NodeId = String
data NodeId = NodeId
{ nodeIP :: !String
, nodePort :: !Word16
} deriving (Eq, Generic, Hashable, Ord)

instance Show NodeId where
show (NodeId ip port) = ip <> "-" <> show port

addressToNodeId :: String -> NodeId
addressToNodeId remoteAddress =
-- We assume that 'remoteAddress' is a String-representation of the normal address (IP:port).
case splitOn ":" . pack $ remoteAddress of
[ip, port] -> NodeId (unpack ip) (read (unpack port) :: Word16)
_ -> NodeId remoteAddress 0 -- Unexpected format of 'remoteAddress'!

type LogObjects = TBQueue (LogObject Text)
type Metrics = (EKG.Store, IORef MetricsLocalStore)

type AcceptedItems = IORef (HashMap NodeId (LogObjects, Metrics))
type AcceptedItems = IORef (HashMap NodeId (NodeInfoStore, LogObjects, Metrics))

initAcceptedItems :: IO AcceptedItems
initAcceptedItems = newIORef HM.empty

prepareAcceptedItems
:: String
:: NodeId
-> AcceptedItems
-> IO ()
prepareAcceptedItems nodeId itemsIORef = do
Expand All @@ -39,9 +62,10 @@ prepareAcceptedItems nodeId itemsIORef = do
-- already worked with the logger and now it's re-connect to the logger.
-- No need to re-create its stores.
unless (nodeId `HM.member` items') $ do
niStore <- newIORef []
loQueue <- newTBQueueIO 2000
ekgStore <- EKG.newStore
localStore <- newIORef emptyMetricsLocalStore
let storesForNewNode = (loQueue, (ekgStore, localStore))
let storesForNewNode = (niStore, loQueue, (ekgStore, localStore))
atomicModifyIORef' itemsIORef $ \items ->
(HM.insert nodeId storesForNewNode items, ())
5 changes: 4 additions & 1 deletion trace-forward/demo/acceptor.hs
Expand Up @@ -42,10 +42,13 @@ main = do
-- Create an empty TBQueue where received 'LogObject's will be stored.
queue <- newTBQueueIO 100

-- Create an empty store where received node's info will be stored.
niStore <- newIORef []

-- Run the acceptor. It will listen to the forwarder, and after the connection
-- will be established, the acceptor will ask for N 'LogObject's from the forwarder.
-- After these 'LogObject's will be received, the acceptor will write them in the 'queue'.
runTraceAcceptor config queue
runTraceAcceptor config queue niStore

-- We need it for 'AcceptorConfiguration lo' (in this example it is 'LogObject Text').
instance ShowProxy (LogObject Text)
5 changes: 3 additions & 2 deletions trace-forward/demo/mux/Network/Acceptor.hs
Expand Up @@ -102,7 +102,8 @@ doListenToForwarder
doListenToForwarder snocket address timeLimits (ekgConfig, tfConfig) tidVar = do
store <- EKG.newStore
metricsStore <- newIORef emptyMetricsLocalStore
logObjectsQueue <- newTBQueueIO 1000000
loQueue <- newTBQueueIO 1000000
niStore <- newIORef []

networkState <- newNetworkMutableState
_ <- async $ cleanNetworkMutableState networkState
Expand All @@ -121,7 +122,7 @@ doListenToForwarder snocket address timeLimits (ekgConfig, tfConfig) tidVar = do
UnversionedProtocolData
(SomeResponderApplication $
acceptorApp [ (acceptEKGMetrics ekgConfig store metricsStore, 1)
, (acceptLogObjects tfConfig logObjectsQueue, 2)
, (acceptLogObjects tfConfig loQueue niStore, 2)
]
)
)
Expand Down
13 changes: 7 additions & 6 deletions trace-forward/src/Trace/Forward/Acceptor.hs
Expand Up @@ -16,6 +16,7 @@ import Ouroboros.Network.Util.ShowProxy (ShowProxy(..))

import Trace.Forward.Network.Acceptor (listenToForwarder)
import Trace.Forward.Configuration (AcceptorConfiguration (..))
import Trace.Forward.Protocol.Type (NodeInfoStore)

-- | Please note that acceptor is a server from the __networking__ point of view:
-- the forwarder establishes network connection with the acceptor. This is because
Expand All @@ -25,11 +26,11 @@ runTraceAcceptor
ShowProxy lo,
Typeable lo)
=> AcceptorConfiguration lo -- ^ Acceptor configuration.
-> TBQueue lo -- ^ The queue all received 'LogObject's will be write in.
-> TBQueue lo -- ^ The queue all received 'LogObject's will be written in.
-> NodeInfoStore -- ^ The store node's basic info will be written in.
-> IO ()
runTraceAcceptor config loQueue =
try (listenToForwarder config loQueue) >>= \case
Left (e :: SomeException) -> do
putStrLn $ "trace-forward, acceptor problem: " <> show e
runTraceAcceptor config loQueue
runTraceAcceptor config loQueue niStore =
try (listenToForwarder config loQueue niStore) >>= \case
Left (_e :: SomeException) ->
runTraceAcceptor config loQueue niStore
Right _ -> return ()
38 changes: 26 additions & 12 deletions trace-forward/src/Trace/Forward/Network/Acceptor.hs
Expand Up @@ -17,7 +17,7 @@ import Control.Monad.Class.MonadSTM.Strict (StrictTVar, atomically, mo
readTVar, retry)
import qualified Data.ByteString.Lazy as LBS
import Data.Functor ((<&>))
import Data.IORef (readIORef)
import Data.IORef (atomicModifyIORef', readIORef)
import qualified Data.Text as T
import Data.Typeable (Typeable)
import Data.Void (Void)
Expand All @@ -43,7 +43,8 @@ import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedPr
unversionedHandshakeCodec,
unversionedProtocolDataCodec)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake)
import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, simpleSingletonVersions)
import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion,
simpleSingletonVersions)
import Ouroboros.Network.Util.ShowProxy (ShowProxy(..))

import qualified Trace.Forward.Protocol.Acceptor as Acceptor
Expand All @@ -59,8 +60,9 @@ listenToForwarder
Typeable lo)
=> AcceptorConfiguration lo
-> TBQueue lo
-> NodeInfoStore
-> IO ()
listenToForwarder config@AcceptorConfiguration {..} loQueue = withIOManager $ \iocp ->
listenToForwarder config@AcceptorConfiguration{..} loQueue niStore = withIOManager $ \iocp ->
case forwarderEndpoint of
LocalPipe localPipe -> do
let snocket = localSnocket iocp localPipe
Expand All @@ -76,7 +78,7 @@ listenToForwarder config@AcceptorConfiguration {..} loQueue = withIOManager $ \i
[ MiniProtocol
{ miniProtocolNum = MiniProtocolNum 1
, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound }
, miniProtocolRun = acceptLogObjects config loQueue
, miniProtocolRun = acceptLogObjects config loQueue niStore
}
]

Expand Down Expand Up @@ -115,8 +117,9 @@ acceptLogObjects
Typeable lo)
=> AcceptorConfiguration lo
-> TBQueue lo
-> NodeInfoStore
-> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void ()
acceptLogObjects config loQueue =
acceptLogObjects config loQueue niStore =
ResponderProtocolOnly $
MuxPeerRaw $ \channel -> do
sv <- newEmptyTMVarIO
Expand All @@ -130,7 +133,8 @@ acceptLogObjects config loQueue =
(byteLimitsTraceForward (fromIntegral . LBS.length))
timeLimitsTraceForward
channel
(Acceptor.traceAcceptorPeer $ acceptorActions config loQueue False)
(Acceptor.traceAcceptorPeer $
acceptorActions config loQueue niStore True False)
atomically $ putTMVar sv r
waitSibling siblingVar
return ((), trailing)
Expand All @@ -148,14 +152,24 @@ acceptorActions
Typeable lo)
=> AcceptorConfiguration lo
-> TBQueue lo
-> NodeInfoStore
-> Bool
-> Bool
-> Acceptor.TraceAcceptor lo IO ()
acceptorActions config@AcceptorConfiguration{..} loQueue False =
Acceptor.SendMsgRequest TokBlocking whatToRequest $ \reply -> do
writeLogObjectsToQueue reply loQueue
actionOnReply $ logObjectsFromReply reply
readIORef shouldWeStop <&> acceptorActions config loQueue
acceptorActions config@AcceptorConfiguration{..} loQueue niStore askForNI False =
-- We can send request for the node's basic info or for the new 'LogObject's.
-- But request for node's info should be sent only once (in the beginning of session).
if askForNI
then
Acceptor.SendMsgNodeInfoRequest $ \reply -> do
atomicModifyIORef' niStore $ const $ (reply, ())
readIORef shouldWeStop <&> acceptorActions config loQueue niStore False
else
Acceptor.SendMsgRequest TokBlocking whatToRequest $ \reply -> do
writeLogObjectsToQueue reply loQueue
actionOnReply $ logObjectsFromReply reply
readIORef shouldWeStop <&> acceptorActions config loQueue niStore False

acceptorActions AcceptorConfiguration{..} _ True =
acceptorActions AcceptorConfiguration{..} _ _ _ True =
Acceptor.SendMsgDone
actionOnDone
5 changes: 5 additions & 0 deletions trace-forward/src/Trace/Forward/Protocol/Type.hs
Expand Up @@ -23,9 +23,11 @@ module Trace.Forward.Protocol.Type
, Request (..)
, BlockingReplyList (..)
, NodeInfo
, NodeInfoStore
) where

import Codec.Serialise (Serialise (..))
import Data.IORef (IORef)
import Data.List.NonEmpty (NonEmpty)
import Data.Proxy (Proxy(..))
import Data.Text (Text)
Expand Down Expand Up @@ -129,6 +131,9 @@ deriving instance Show lo => Show (BlockingReplyList blocking lo)
-- protocol, version, start time, unique name, etc.
type NodeInfo = [(Text, Text)]

-- | The store for 'NodeInfo', will be used on the acceptor's side to store received node's info.
type NodeInfoStore = IORef NodeInfo

instance Protocol (TraceForward lo) where

-- | The messages in the Trace forwarding/accepting protocol.
Expand Down
3 changes: 2 additions & 1 deletion trace-forward/test/Trace/Forward/Test/Demo.hs
Expand Up @@ -63,6 +63,7 @@ propDemoIO' maxLen endpoint (NonEmpty logObjects') = do
let logObjects = take maxLen logObjects' -- We don't want too big list here.
forwarderQueue <- newTBQueueIO . fromIntegral . length $ logObjects
acceptorQueue <- newTBQueueIO . fromIntegral . length $ logObjects
acceptorNodeInfo <- newIORef []
weAreDone <- newIORef False

let acceptorConfig = mkAcceptorConfig endpoint weAreDone $ GetLogObjects 1
Expand All @@ -76,7 +77,7 @@ propDemoIO' maxLen endpoint (NonEmpty logObjects') = do
threadDelay 1000000

-- Run the acceptor.
void . forkIO $ runTraceAcceptor acceptorConfig acceptorQueue
void . forkIO $ runTraceAcceptor acceptorConfig acceptorQueue acceptorNodeInfo
waitTillAcceptorReceiveObjects
atomicModifyIORef' weAreDone $ const (True, ())

Expand Down

0 comments on commit 2e4e1f9

Please sign in to comment.