Skip to content

Commit

Permalink
Huge merge of refactoring of everything.
Browse files Browse the repository at this point in the history
  • Loading branch information
raduom committed Feb 22, 2021
1 parent f503770 commit c80bba2
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 352 deletions.
2 changes: 1 addition & 1 deletion plutus-pab/app/Cli.hs
Expand Up @@ -168,7 +168,7 @@ runCliCommand t _ Config {nodeServerConfig, chainIndexConfig} serviceAvailabilit
liftIO $ ChainIndex.main
(toChainIndexLog t)
chainIndexConfig
(NodeServer.mscBaseUrl nodeServerConfig)
(NodeServer.mscSocketPath nodeServerConfig)
serviceAvailability


Expand Down
111 changes: 34 additions & 77 deletions plutus-pab/src/Cardano/ChainIndex/Server.hs
Expand Up @@ -13,43 +13,35 @@ module Cardano.ChainIndex.Server(
, syncState
) where

-- <<<<<<< HEAD
import Cardano.BM.Data.Trace (Trace)
import Cardano.Node.Types (FollowerID)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, putMVar, takeMVar)
import Control.Monad (forever, unless, void)
import Control.Monad.Freer hiding (run)
import Cardano.BM.Data.Trace (Trace)
import Control.Concurrent.MVar (MVar, newMVar, putMVar, takeMVar)
import Control.Monad.Freer hiding (run)
import Control.Monad.Freer.Extras.Log
import Control.Monad.Freer.Extras.State (assign, use)
import Control.Monad.Freer.State
import qualified Control.Monad.Freer.State as Eff
import Control.Monad.IO.Class (MonadIO (..))
import Data.Foldable (fold, traverse_)
import Data.Function ((&))
import Data.Proxy (Proxy (Proxy))
import Data.Time.Units (Second, toMicroseconds)
import Ledger.Blockchain (Block)
import Network.HTTP.Client (defaultManagerSettings, newManager)
import qualified Network.Wai.Handler.Warp as Warp
import Servant (Application, NoContent (NoContent), hoistServer, serve,
(:<|>) ((:<|>)))
import Servant.Client (BaseUrl (baseUrlPort), ClientEnv, mkClientEnv, runClientM)
import qualified Control.Monad.Freer.State as Eff
import Control.Monad.IO.Class (MonadIO (..))
import Data.Foldable (traverse_)
import Data.Function ((&))
import Data.Proxy (Proxy (Proxy))
import Ledger.Blockchain (Block)
import qualified Network.Wai.Handler.Warp as Warp
import Servant (Application, NoContent (NoContent), hoistServer, serve,
(:<|>) ((:<|>)))
import Servant.Client (BaseUrl (baseUrlPort))

import Cardano.ChainIndex.API
import Cardano.ChainIndex.Types
import qualified Cardano.Node.Client as NodeClient
import Cardano.Node.Follower (NodeFollowerEffect, getSlot)
import qualified Cardano.Node.Follower as NodeFollower
import Control.Concurrent.Availability (Availability, available)
import Ledger.Address (Address)
import Ledger.AddressMap (AddressMap)
import Plutus.PAB.Monitoring (convertLog, handleLogMsgTrace, runLogEffects)
import Wallet.Effects (ChainIndexEffect)
import qualified Wallet.Effects as WalletEffects
import Wallet.Emulator.ChainIndex (ChainIndexControlEffect, ChainIndexEvent, ChainIndexState)
import qualified Wallet.Emulator.ChainIndex as ChainIndex
import Wallet.Emulator.NodeClient (ChainClientNotification (BlockValidated, SlotChanged))
import Cardano.Protocol.Socket.Client (runClientNode)
import Control.Concurrent.Availability (Availability, available)
import Ledger.Address (Address)
import Ledger.AddressMap (AddressMap)
import Ledger.Slot (Slot (..))
import Plutus.PAB.Monitoring (convertLog, handleLogMsgTrace, runLogEffects)
import Wallet.Effects (ChainIndexEffect)
import qualified Wallet.Effects as WalletEffects
import Wallet.Emulator.ChainIndex (ChainIndexControlEffect, ChainIndexEvent, ChainIndexState)
import qualified Wallet.Emulator.ChainIndex as ChainIndex
import Wallet.Emulator.NodeClient (ChainClientNotification (BlockValidated, SlotChanged))

import qualified Debug.Trace as Dbg

Expand All @@ -67,42 +59,22 @@ app trace stateVar =
(liftIO . processIndexEffects trace stateVar)
(healthcheck :<|> startWatching :<|> watchedAddresses :<|> confirmedBlocks :<|> WalletEffects.transactionConfirmed :<|> WalletEffects.nextTx)

-- <<<<<<< HEAD
main :: ChainIndexTrace -> ChainIndexConfig -> BaseUrl -> Availability -> IO ()
main trace ChainIndexConfig{ciBaseUrl} nodeBaseUrl availability = runLogEffects trace $ do
nodeClientEnv <- liftIO getNode
main :: ChainIndexTrace -> ChainIndexConfig -> FilePath -> Availability -> IO ()
main trace ChainIndexConfig{ciBaseUrl} socketPath availability = runLogEffects trace $ do
mVarState <- liftIO $ newMVar initialAppState

logInfo StartingNodeClientThread
void $ liftIO $ forkIO $ runLogEffects trace $ updateThread trace 10 mVarState nodeClientEnv
_ <- liftIO $ runClientNode socketPath $ updateChainState mVarState

logInfo $ StartingChainIndex servicePort
liftIO $ Warp.runSettings warpSettings $ app trace mVarState
where
isAvailable = available availability
servicePort = baseUrlPort ciBaseUrl
warpSettings = Warp.defaultSettings & Warp.setPort servicePort & Warp.setBeforeMainLoop isAvailable
getNode = newManager defaultManagerSettings >>= \manager -> pure $ mkClientEnv manager nodeBaseUrl

-- =======
-- main :: Trace IO ChainIndexServerMsg -> ChainIndexConfig -> FilePath -> Availability -> IO ()
-- main trace ChainIndexConfig{ciBaseUrl} socketPath availability = runLogEffects trace $ do
-- let port = baseUrlPort ciBaseUrl
-- mVarState <- liftIO $ newMVar initialAppState
-- logInfo StartingNodeClientThread
-- clientHandler <-
-- liftIO $ runClientNode socketPath $ updateChainState mVarState
-- -- void $ liftIO $ forkIO $ runLogEffects trace $ updateThread 10 mVarState clientHandler
-- let warpSettings :: Warp.Settings
-- warpSettings = Warp.defaultSettings & Warp.setPort port & Warp.setBeforeMainLoop (available availability)
-- logInfo $ StartingChainIndex port
-- liftIO $ Warp.runSettings warpSettings $ app mVarState
-- where
-- updateChainState :: MVar AppState -> Block -> Slot -> IO ()
-- updateChainState mv block slot =
-- processIndexEffects mv $ do
-- syncState block slot
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)
updateChainState :: MVar AppState -> Block -> Slot -> IO ()
updateChainState mv block slot =
processIndexEffects trace mv $ do syncState block slot

healthcheck :: Monad m => m NoContent
healthcheck = pure NoContent
Expand All @@ -119,15 +91,13 @@ confirmedBlocks = WalletEffects.confirmedBlocks
-- | Update the chain index by asking the node for new blocks since the last
-- time.
syncState ::
( -- Member (LogMsg ChainIndexServerMsg) effs
Member ChainIndexControlEffect effs
)
( Member ChainIndexControlEffect effs)
=> Block
-> Slot
-> Eff effs ()
syncState block (Slot slot) = do
let slotChanged = SlotChanged (Slot slot)
traverse_ ChainIndex.chainIndexNotify [BlockValidated block, slotChanged]
Dbg.trace "[ zzz ] Synchronising state" $ traverse_ ChainIndex.chainIndexNotify [BlockValidated block, slotChanged]

type ChainIndexEffects m
= '[ ChainIndexControlEffect
Expand All @@ -143,28 +113,15 @@ processIndexEffects ::
-> MVar AppState
-> Eff (ChainIndexEffects IO) a
-> m a
-- <<<<<<< HEAD
processIndexEffects trace stateVar eff = do
AppState {_indexState, _indexEvents, _indexFollowerID} <- liftIO $ takeMVar stateVar
AppState {_indexState, _indexEvents} <- liftIO $ takeMVar stateVar
(result, newState) <- liftIO
$ ChainIndex.handleChainIndexControl eff
& ChainIndex.handleChainIndex
& Eff.runState _indexState
& handleLogMsgTrace (toChainIndexServerMsg trace)
& runM
liftIO $ putMVar stateVar AppState{_indexState=newState, _indexEvents=_indexEvents, _indexFollowerID=_indexFollowerID }
-- =======
-- processIndexEffects stateVar eff = do
-- AppState{_indexState, _indexEvents} <- liftIO $ takeMVar stateVar
-- (result, newState) <- liftIO
-- $ runM
-- $ runStderrLog
-- $ interpret renderLogMessages
-- $ Eff.runState _indexState
-- $ ChainIndex.handleChainIndex
-- $ ChainIndex.handleChainIndexControl eff
-- liftIO $ putMVar stateVar AppState{ _indexState=newState, _indexEvents=_indexEvents }
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)
liftIO $ putMVar stateVar AppState{_indexState=newState, _indexEvents=_indexEvents}
pure result
where
toChainIndexServerMsg :: Trace m ChainIndexServerMsg -> Trace m ChainIndexEvent
Expand Down
16 changes: 2 additions & 14 deletions plutus-pab/src/Cardano/Node/API.hs
Expand Up @@ -6,17 +6,10 @@ module Cardano.Node.API
, NodeAPI
) where

-- <<<<<<< HEAD
import Cardano.Node.Types (MockServerLogMsg)
import Control.Monad.Freer.Extras.Log (LogMessage)
import Ledger (Block, Slot, Tx)
import Servant.API (Capture, Get, JSON, NoContent, Post, Put, ReqBody, (:<|>), (:>))
-- =======
-- import Control.Monad.Freer.Log (LogMessage)
-- import Ledger (Slot, Tx)
-- import Servant.API (Get, JSON, NoContent, Post, ReqBody, (:<|>), (:>))
-- import Wallet.Emulator.Chain (ChainEvent)
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)
import Ledger (Slot, Tx)
import Servant.API (Get, JSON, NoContent, Post, ReqBody, (:<|>), (:>))

type API
= "healthcheck" :> Get '[ JSON] NoContent
Expand All @@ -27,9 +20,4 @@ type API
-- Routes that are not guaranteed to exist on the real node
type NodeAPI
= "random-tx" :> Get '[ JSON] Tx
-- <<<<<<< HEAD
:<|> "consume-event-history" :> Post '[ JSON] [LogMessage MockServerLogMsg]

-- =======
-- :<|> "consume-event-history" :> Post '[ JSON] [LogMessage ChainEvent]
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)
25 changes: 2 additions & 23 deletions plutus-pab/src/Cardano/Node/Client.hs
Expand Up @@ -8,47 +8,26 @@

module Cardano.Node.Client where

-- <<<<<<< HEAD
import Cardano.Node.API (API)
import Cardano.Node.Follower (NodeFollowerEffect (..))
import Cardano.Node.RandomTx (GenRandomTx (..))
import Cardano.Node.Types (FollowerID, MockServerLogMsg)
import Cardano.Node.Types (MockServerLogMsg)
import Control.Monad (void)
-- =======
-- import Cardano.Node.API (API)
-- import Cardano.Node.RandomTx (GenRandomTx (..))
-- import Control.Monad (void)
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)
import Control.Monad.Freer
import Control.Monad.Freer.Error
import Control.Monad.Freer.Extras.Log (LogMessage)
import Control.Monad.IO.Class
-- <<<<<<< HEAD
import Data.Proxy (Proxy (Proxy))
import Ledger (Block, Slot, Tx)
import Ledger (Slot, Tx)
import Servant (NoContent, (:<|>) (..))
import Servant.Client (ClientEnv, ClientError, ClientM, client, runClientM)
import Wallet.Effects (NodeClientEffect (..))
-- =======
-- import Data.Proxy (Proxy (Proxy))
-- import Ledger (Slot, Tx)
-- import Servant (NoContent, (:<|>) (..))
-- import Servant.Client (ClientEnv, ClientError, ClientM, client, runClientM)
-- import Wallet.Effects (NodeClientEffect (..))
-- import Wallet.Emulator.Chain (ChainEvent)
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)

healthcheck :: ClientM NoContent
getCurrentSlot :: ClientM Slot
addTx :: Tx -> ClientM NoContent
randomTx :: ClientM Tx
-- <<<<<<< HEAD
consumeEventHistory :: ClientM [LogMessage MockServerLogMsg]
(healthcheck, addTx, getCurrentSlot, randomTx, consumeEventHistory) =
-- =======
-- consumeEventHistory :: ClientM [LogMessage ChainEvent]
-- (healthcheck, addTx, getCurrentSlot, randomTx, consumeEventHistory) =
-- >>>>>>> d3fabb83b (Add slot information for fetched blocks)
( healthcheck_
, addTx_
, getCurrentSlot_
Expand Down
57 changes: 0 additions & 57 deletions plutus-pab/src/Cardano/Node/Follower.hs

This file was deleted.

16 changes: 7 additions & 9 deletions plutus-pab/src/Cardano/Node/Mock.hs
Expand Up @@ -8,7 +8,6 @@

module Cardano.Node.Mock where

-- <<<<<<< HEAD
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (MVar, modifyMVar_, putMVar, takeMVar)
import Control.Lens (over, set, unto, view)
Expand All @@ -33,9 +32,6 @@ import Servant (NoContent (NoContent))
import Cardano.BM.Data.Trace (Trace)
import Cardano.Node.RandomTx
import Cardano.Node.Types
import Cardano.Protocol.ChainEffect as CE
import Cardano.Protocol.FollowerEffect as FE
import qualified Cardano.Protocol.Socket.Client as Client
import qualified Cardano.Protocol.Socket.Server as Server
import Ledger (Block, Slot (Slot), Tx)
import Ledger.Tx (outputs)
Expand All @@ -45,6 +41,8 @@ import qualified Wallet.Emulator as EM
import Wallet.Emulator.Chain (ChainControlEffect, ChainEffect, ChainState)
import qualified Wallet.Emulator.Chain as Chain

import qualified Debug.Trace as Dbg

healthcheck :: Monad m => m NoContent
healthcheck = pure NoContent

Expand Down Expand Up @@ -108,7 +106,7 @@ runChainEffects ::
-> Eff (NodeServerEffects IO) a
-> IO ([LogMessage MockServerLogMsg], a)
runChainEffects trace serverHandler stateVar eff = do
oldAppState <- liftIO $ takeMVar stateVar
oldAppState <- Dbg.trace "[ xxx ] Running chain effects" (liftIO $ takeMVar stateVar)
((a, events), newState) <- liftIO
$ processBlock eff
& runRandomTx
Expand All @@ -126,7 +124,7 @@ runChainEffects trace serverHandler stateVar eff = do

runRandomTx = subsume . runGenRandomTx

runChain = interpret (mapLog ProcessingChainEvent) . reinterpret CE.handleChain . interpret (mapLog ProcessingChainEvent) . reinterpret Chain.handleControlChain
runChain = interpret (mapLog ProcessingChainEvent) . reinterpret Chain.handleChain . interpret (mapLog ProcessingChainEvent) . reinterpret Chain.handleControlChain

mergeState = interpret (handleZoomedState chainState)

Expand All @@ -136,9 +134,9 @@ runChainEffects trace serverHandler stateVar eff = do

newlyAddedBlocks :: AppState -> AppState -> [Block]
newlyAddedBlocks oldState newState =
let chainLens = T.chainState . Chain.chainNewestFirst
oldChain = view chainLens oldState
newChain = view chainLens newState
let chainLens = chainState . Chain.chainNewestFirst
oldChain = view chainLens oldState
newChain = view chainLens newState
in take (length newChain - length oldChain) newChain

processChainEffects ::
Expand Down
6 changes: 2 additions & 4 deletions plutus-pab/src/Cardano/Node/Types.hs
Expand Up @@ -26,12 +26,10 @@ module Cardano.Node.Types
, _NodeFollowerState
, initialAppState
, initialChainState
, initialFollowerState

-- * Lens functions
, chainState
, eventHistory
, followerState

-- * Config types
, MockServerConfig (..)
Expand Down Expand Up @@ -183,8 +181,8 @@ makePrisms 'NodeFollowerState
-- | Application State
data AppState =
AppState
{ _chainState :: ChainState
, _eventHistory :: [LogMessage ChainEvent]
{ _chainState :: ChainState -- ^ blockchain state
, _eventHistory :: [LogMessage MockServerLogMsg] -- ^ history of all log messages
}
deriving (Show)

Expand Down

0 comments on commit c80bba2

Please sign in to comment.