Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
v0d1ch committed Jun 5, 2023
1 parent dc7fff5 commit dad3bbb
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 309 deletions.
17 changes: 10 additions & 7 deletions hydra-node/exe/hydra-node/Main.hs
@@ -1,9 +1,12 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE TypeApplications #-}

module Main where

import Hydra.Prelude

import Data.Default (def)
import Data.Reflection (reify)
import Hydra.API.Server (withAPIServer)
import Hydra.Cardano.Api (serialiseToRawBytesHex)
import Hydra.Chain (HeadParameters (..))
Expand Down Expand Up @@ -103,13 +106,13 @@ main = do
let RunOptions{host, port, peers, nodeId} = opts
withNetwork (contramap Network tracer) host port peers nodeId (putEvent . NetworkEvent defaultTTL) $ \hn -> do
let RunOptions{apiHost, apiPort} = opts

apiPersistence <- createPersistenceIncremental $ persistenceDir <> "/server-output"
withAPIServer apiHost apiPort party apiPersistence (contramap APIServer tracer) (putEvent . ClientEvent) $ \server -> do
let RunOptions{ledgerConfig} = opts
withCardanoLedger ledgerConfig chainConfig $ \ledger ->
runHydraNode (contramap Node tracer) $
HydraNode{eq, hn, nodeState, oc = chain, server, ledger, env, persistence}
reify def $ \(Proxy :: Proxy r) -> do
apiPersistence <- createPersistenceIncremental $ persistenceDir <> "/server-output"
withAPIServer @r apiHost apiPort party apiPersistence (contramap APIServer tracer) (putEvent . ClientEvent) $ \server -> do
let RunOptions{ledgerConfig} = opts
withCardanoLedger ledgerConfig chainConfig $ \ledger ->
runHydraNode (contramap Node tracer) $
HydraNode{eq, hn, nodeState, oc = chain, server, ledger, env, persistence}

publish opts = do
(_, sk) <- readKeyPair (publishSigningKey opts)
Expand Down
6 changes: 4 additions & 2 deletions hydra-node/hydra-node.cabal
Expand Up @@ -152,8 +152,6 @@ library
, io-classes >=0.3.0.0
, iohk-monitoring
, iproute
, lens
, lens-aeson
, modern-uri
, network
, network-mux
Expand Down Expand Up @@ -192,9 +190,11 @@ executable hydra-node
main-is: Main.hs
build-depends:
, base
, data-default
, hydra-cardano-api
, hydra-node
, hydra-prelude
, reflection

ghc-options: -threaded -rtsopts

Expand Down Expand Up @@ -328,6 +328,7 @@ test-suite tests
, cborg
, containers
, contra-tracer
, data-default
, directory
, filepath
, hspec
Expand All @@ -352,6 +353,7 @@ test-suite tests
, quickcheck-dynamic >=3.0.3
, quickcheck-instances
, req
, reflection
, silently
, text
, time
Expand Down
46 changes: 24 additions & 22 deletions hydra-node/src/Hydra/API/Server.hs
Expand Up @@ -13,6 +13,7 @@ import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO, readTVar)
import Control.Exception (IOException)
import qualified Data.Aeson as Aeson
import Data.Reflection (Reifies, reify)
import Hydra.API.ClientInput (ClientInput)
import Hydra.API.Projection (Projection (..), mkProjection)
import Hydra.API.ServerOutput (
Expand All @@ -22,6 +23,7 @@ import Hydra.API.ServerOutput (
ServerOutputConfig (..),
TimedServerOutput (..),
WithUTxO (..),
WrappedServerOutput (WrappedServerOutput, unWrapped),
headStatus,
me,
projectHeadStatus,
Expand Down Expand Up @@ -93,21 +95,21 @@ type ServerCallback tx m = ClientInput tx -> m ()
type ServerComponent tx m a = ServerCallback tx m -> (Server tx m -> m a) -> m a

withAPIServer ::
forall tx.
(IsChainState tx) =>
forall r tx.
(IsChainState tx, Reifies r ServerOutputConfig) =>
IP ->
PortNumber ->
Party ->
PersistenceIncremental (TimedServerOutput tx) IO ->
PersistenceIncremental r (TimedServerOutput r tx) IO ->
Tracer IO APIServerLog ->
ServerComponent tx IO ()
withAPIServer host port party PersistenceIncremental{loadAll, append} tracer callback action = do
responseChannel <- newBroadcastTChanIO
timedOutputEvents <- reverse <$> loadAll

-- Intialize our read model from stored events
headStatusP <- mkProjection Idle (output <$> timedOutputEvents) projectHeadStatus
snapshotUtxoP <- mkProjection Nothing (output <$> timedOutputEvents) projectSnapshotUtxo
headStatusP <- mkProjection Idle (unWrapped . output <$> timedOutputEvents) projectHeadStatus
snapshotUtxoP <- mkProjection Nothing (unWrapped . output <$> timedOutputEvents) projectSnapshotUtxo

-- NOTE: we need to reverse the list because we store history in a reversed
-- list in memory but in order on disk
Expand All @@ -120,7 +122,7 @@ withAPIServer host port party PersistenceIncremental{loadAll, append} tracer cal
action $
Server
{ sendOutput = \output -> do
timedOutput <- appendToHistory history output
timedOutput <- appendToHistory history (WrappedServerOutput output)
atomically $ do
update headStatusP output
update snapshotUtxoP output
Expand All @@ -138,7 +140,7 @@ withAPIServer host port party PersistenceIncremental{loadAll, append} tracer cal
append timedOutput
pure timedOutput

nextSequenceNumber :: TVar [TimedServerOutput tx] -> STM.STM Natural
nextSequenceNumber :: TVar [TimedServerOutput r tx] -> STM.STM Natural
nextSequenceNumber historyList =
STM.readTVar historyList >>= \case
[] -> pure 0
Expand All @@ -156,19 +158,19 @@ setupServerNotification = do
pure (putMVar mv (), takeMVar mv)

runAPIServer ::
forall tx.
(IsChainState tx) =>
forall r tx.
(IsChainState tx, Reifies r ServerOutputConfig) =>
IP ->
PortNumber ->
Party ->
Tracer IO APIServerLog ->
TVar [TimedServerOutput tx] ->
TVar [TimedServerOutput r tx] ->
(ClientInput tx -> IO ()) ->
-- | Read model to enhance 'Greetings' messages with 'HeadStatus'.
Projection STM.STM (ServerOutput tx) HeadStatus ->
-- | Read model to enhance 'Greetings' messages with snapshot UTxO.
Projection STM.STM (ServerOutput tx) (Maybe (UTxOType tx)) ->
TChan (TimedServerOutput tx) ->
TChan (TimedServerOutput r tx) ->
-- | Called when the server is listening before entering the main loop.
NotifyServerRunning ->
IO ()
Expand All @@ -194,6 +196,7 @@ runAPIServer host port party tracer history callback headStatusP snapshotUtxoP r
traceWith tracer NewAPIConnection
let path = requestPath $ pendingRequest pending
queryParams <- uriQuery <$> mkURIBs path
let outConfig = mkServerOutputConfig queryParams
con <- acceptRequest pending
chan <- STM.atomically $ dupTChan responseChannel

Expand All @@ -203,8 +206,6 @@ runAPIServer host port party tracer history callback headStatusP snapshotUtxoP r

forwardGreetingOnly con

let outConfig = mkServerOutputConfig queryParams

withPingThread con 30 (pure ()) $
race_ (receiveInputs con) (sendOutputs chan con outConfig)

Expand All @@ -225,12 +226,13 @@ runAPIServer host port party tracer history callback headStatusP snapshotUtxoP r
{ time
, seq
, output =
Greetings
{ me = party
, headStatus
, snapshotUtxo
} ::
ServerOutput tx
WrappedServerOutput
Greetings
{ me = party
, headStatus
, snapshotUtxo
} ::
WrappedServerOutput r tx
}

Projection{getLatest = getLatestHeadStatus} = headStatusP
Expand Down Expand Up @@ -275,8 +277,8 @@ runAPIServer host port party tracer history callback headStatusP snapshotUtxoP r
sendOutputs chan con outConfig = forever $ do
response <- STM.atomically $ readTChan chan
let sentResponse =
reify outConfig $ \(Proxy :: Proxy r) ->
(toJSON response :: ToJSON TimedServerOutput r tx)
reify outConfig $ \_ ->
Aeson.encode $ toJSON (response :: TimedServerOutput r tx)

sendTextData con sentResponse
traceWith tracer (APIOutputSent $ toJSON response)
Expand All @@ -293,7 +295,7 @@ runAPIServer host port party tracer history callback headStatusP snapshotUtxoP r
let clientInput = decodeUtf8With lenientDecode $ toStrict msg
time <- getCurrentTime
seq <- atomically $ nextSequenceNumber history
let timedOutput = TimedServerOutput{output = InvalidInput @tx e clientInput, time, seq}
let timedOutput = TimedServerOutput{output = WrappedServerOutput @r (InvalidInput @tx e clientInput), time, seq}
sendTextData con $ Aeson.encode timedOutput
traceWith tracer (APIInvalidInput e clientInput)

Expand Down

0 comments on commit dad3bbb

Please sign in to comment.